Skip to content
Home » Exploring Distributed Locks Across Various Platforms

Exploring Distributed Locks Across Various Platforms

Introduction to Distributed Locks

Distributed locks are a critical concept in the world of distributed systems, necessary for maintaining data consistency, coordination, and synchronization across various nodes of a system. They serve as a concurrency control mechanism to restrict multiple processes from accessing or modifying shared resources simultaneously.

A distributed lock enables only one process to own it at a time across all nodes in a distributed environment. This is essential when you have a set of operations that need to be executed atomically in a distributed system, thus preserving the system’s consistency and avoiding potential conflicts and race conditions.

A Practical Scenario: A Job Scheduler

Let’s consider a real-world scenario where a distributed lock is required: A Job Scheduling System. 

Such a system might be responsible for executing tasks like data processing, sending out newsletters, generating reports, etc., across multiple servers.

Suppose a task T is scheduled to be executed at a specific time. It’s crucial that this task isn’t executed more than once, leading to data inconsistency or duplicated work. However, without a locking mechanism, it’s possible that multiple instances of the scheduler might pick up and execute task T simultaneously.

In such a situation, distributed locks prove to be vital. When a scheduler instance decides to execute a task, it would first acquire a distributed lock for that task. If it’s successful in acquiring the lock, it executes the task, and if not, it implies another scheduler instance has already taken up the task. Once the task is executed, the lock is released, thus ensuring each task is executed just once.

Implementation Examples

Redis

Redis, an in-memory data structure store renowned for its versatility, is often used for implementing distributed locks. Here’s a simplified way to implement it:

import redis
import time
import uuid

# Establishing a connection to Redis
r = redis.Redis(host='localhost', port=6379, db=0)

def acquire_lock(lockname, acquire_timeout=10, lock_timeout=10):
    identifier = str(uuid.uuid4())
    lockname = 'lock:' + lockname
    lock_timeout = int(time.time()) + lock_timeout + 1

    end = time.time() + acquire_timeout
    while time.time() < end:
        if r.setnx(lockname, identifier):
            r.expire(lockname, lock_timeout)
            return identifier
        elif not r.ttl(lockname):
            r.expire(lockname, lock_timeout)
        time.sleep(.01)
    return False
Python

In this script, the function acquire_lock tries to acquire a lock with a unique identifier. The SETNX command is used to establish the lock if it doesn’t already exist, and an expiration time is set for the lock to prevent situations where a lock is never released.

Zookeeper

Apache Zookeeper is a centralized service for maintaining configuration information, naming, and providing distributed synchronization. Zookeeper’s data nodes, known as znodes, and ephemeral nodes, make it ideal for creating distributed locks.

Here’s a simple example of how to do so:

import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.KeeperException;

public class DistributedLock {
    private ZooKeeper zk;
    private String lockPath;

    public DistributedLock() throws Exception {
        zk = new ZooKeeper("localhost:2181", 3000, new Watcher() {
            public void process(WatchedEvent event) {
                // do nothing
            }
        });
    }

    public boolean lock(String path) throws KeeperException, InterruptedException {
        lockPath = zk.create(path, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        return lockPath != null;
    }

    public void unlock() throws KeeperException, InterruptedException {
        zk.delete(lockPath, -1);
    }
}
Java

In this Java code, the DistributedLock class connects to a Zookeeper instance and provides lock and unlock methods. The lock method tries to create an ephemeral znode. If the creation succeeds, the lock has been acquired. If not, it means the lock is already held by another process. The unlock method deletes the znode to release the lock.

Amazon DynamoDB

Amazon’s DynamoDB offers a lock client library for creating distributed locks. Here’s an example of how to use the DynamoDBLockClient:

import java.time.Duration;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughput;
import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughputDescription;
import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBLockClient;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBLockClientOptions;

// Instantiate the lock client
AmazonDynamoDBLockClient client = new AmazonDynamoDBLockClient(
    AmazonDynamoDBLockClientOptions.builder(dynamoDBClient, "myLockTable")
    .withLeaseDuration(Duration.ofSeconds(10))
    .withHeartbeatPeriod(Duration.ofSeconds(3))
    .build());

// Acquire lock
LockItem lockItem = null;
try {
    lockItem = client.acquireLock(AcquireLockOptions.builder("myLock").build());
    // Lock is now acquired for this item, do some work...
} catch (LockNotGrantedException e) {
    // Lock could not be acquired
} finally {
    if (lockItem != null) {
        client.releaseLock(lockItem);
    }
}
Java

Important lines explained:

  • AmazonDynamoDBLockClient client = new AmazonDynamoDBLockClient(...): This line creates a new instance of the AmazonDynamoDBLockClient which is used to interact with the lock table in DynamoDB.
  • lockItem = client.acquireLock(AcquireLockOptions.builder("myLock").build()): Attempts to acquire a lock. If the lock cannot be acquired, a LockNotGrantedException will be thrown.
  • client.releaseLock(lockItem): This line releases the lock once you’re finished with it.

Apache Curator

Apache Curator is a library offering a high-level API for Zookeeper. Here’s how to use the InterProcessMutex class to create a distributed lock:

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.framework.recipes.locks.LockConsumer;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", new ExponentialBackoffRetry(1000, 3));
client.start();

InterProcessMutex lock = new InterProcessMutex(client, "/my_lock");

if (lock.acquire(10, TimeUnit.SECONDS)) {
    try {
        // do some work inside of the critical section here
    } finally {
        lock.release();
    }
}
Java

Key lines explained:

  • CuratorFramework client = CuratorFrameworkFactory.newClient(...): This line creates a new instance of the CuratorFramework which is used to interact with ZooKeeper.
  • InterProcessMutex lock = new InterProcessMutex(client, "/my_lock"): The InterProcessMutex is a mutex (a form of lock) that works across JVMs and processes.
  • if (lock.acquire(10, TimeUnit.SECONDS)): This line attempts to acquire the lock within the specified time limit.
  • `lock.release()`: This line releases the lock after you have completed your work.

These are just basic examples and real-world scenarios might require more advanced usage and handling of the respective APIs, but these examples give an idea of how you can implement distributed locks using different technologies.

etcd

package main

import (
	"context"
	"log"
	"time"

	"go.etcd.io/etcd/clientv3"
	"go.etcd.io/etcd/clientv3/concurrency"
)

func main() {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"localhost:2379"},
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

	// create a sessions to use for the lock
	s, _ := concurrency.NewSession(cli)
	defer s.Close()

	// create a mutex
	m := concurrency.NewMutex(s, "/my-lock/")

	// acquire the lock
	if err := m.Lock(context.Background()); err != nil {
		log.Fatal(err)
	}
	log.Println("acquired lock")

	// do some work...

	// release the lock
	if err := m.Unlock(context.Background()); err != nil {
		log.Fatal(err)
	}
	log.Println("released lock")
}
Go

Important lines explained:

  • cli, err := clientv3.New(clientv3.Config{...}): Creates a new client that connects to the etcd server.
  • s, _ := concurrency.NewSession(cli): Starts a new session for the client.
  • m := concurrency.NewMutex(s, "/my-lock/"): Initializes a new distributed mutex (lock).
  • m.Lock(context.Background()): Acquires the lock. If the lock is already held by another, this call blocks until the lock is available.
  • m.Unlock(context.Background()): Releases the lock so that other clients can acquire it.

Final Thoughts

Distributed locks are a fundamental tool for ensuring order and preserving data integrity across distributed systems. They offer a way to make certain that operations are conducted in an orderly fashion, mitigating problems such as data corruption and race conditions. Implementing distributed locks through platforms like Redis and Zookeeper can offer a solid framework for managing these synchronization requirements effectively. However, these mechanisms should be used judiciously, as they do add an extra layer of complexity and overhead to the system.