Distributed lock using Hazelcast
Introduction
During application development we often run into performance issues. An obvious solution (apart from optimizing the code) is to scale the application. Usually we start by parallelizing the processing across multiple threads. The drawback is that you need to synchronize access to shared data, usually with locks. Vertical scaling (adding more resources to the machine on which the application runs) has physical and cost limits, so in the end you may consider horizontal scaling (adding more machines/nodes). This creates another challenge: you need to synchronize access to the shared data across multiple nodes. In this article I would like to share an implementation of distributed lock synchronization that I had the pleasure of co-creating.
Problem description
Let’s consider the following scenario. We have a system that needs to track train locations. The train is updated based on three message types: train creation, train movement and train deletion. First we receive the message that creates the train, then we receive messages that report its movement from one location to another, and lastly the train is deleted by a deletion message. Let’s assume that we want to track how many trains were created/deleted at a particular location and how many arrived at/departed from a particular location. To do that we need to store that data and synchronize access to it, as the same location can be modified by messages for different trains.
Implementation – domain
A reference to the full implementation is located at the end of the article. Here we go through the implementation step by step. From the problem description we know that we need two objects to store the needed data: Train and Location. We also need objects for all message types.
public class Train {
    private boolean exists;
    private String trainId;
    private String lastLocationId;
    public static Train getFromStore(String trainId) {
        // implementation
    }
    public void save() {
        // implementation
    }
    public void delete() {
        // implementation
    }
    public boolean exists() {
        return exists;
    }
    // Getters, setters, equals, hash code, toString
}
In the Train object we have a unique identifier for the train and a field to store its last location (the unique identifier of that location). Apart from that we have methods that allow us to create/update/delete the object in the store and a helper method to check whether the train is already present there.
public class Location {
    private boolean exists;
    private String id;
    private Integer numberOfTrainsCreated = 0;
    private Integer numberOfTrainsDeleted = 0;
    private Integer numberOfTrainsWhichArrived = 0;
    private Integer numberOfTrainsWhichDeparted = 0;
    public static Location getFromStore(String locationId) {
        // implementation
    }
    public void save() throws SavingException {
        // implementation
    }
    public void delete() {
        // implementation
    }
    public boolean exists() {
        return exists;
    }
    public void trainCreated() {
        ++numberOfTrainsCreated;
    }
    public void trainDeleted() {
        ++numberOfTrainsDeleted;
    }
    public void trainArrived() {
        ++numberOfTrainsWhichArrived;
    }
    public void trainDeparted() {
        ++numberOfTrainsWhichDeparted;
    }
    // Getters, setters, equals, hash code, toString
}
The Location object allows us to track the number of trains which were created/deleted at or arrived/departed from that location. As with the train object we have helper methods which allow us to save/update/delete the object. Additional methods allow us to increment counters for different train events.
public class TrainCreationMessage {
    private String id; // read through getMessageId() in the processing code
    private String trainId;
    private String location;
    // Getters, setters, equals, hash code, toString
}
public class TrainDeletionMessage {
    private String id; // read through getMessageId() in the processing code
    private String trainId;
    // Getters, setters, equals, hash code, toString
}
public class TrainMovementMessage {
    private String id; // read through getMessageId() in the processing code
    private String trainId;
    private String toLocation;
    // Getters, setters, equals, hash code, toString
}
All of the messages have a unique identifier, which helps to track how a particular message is processed, and a field to store the unique train identifier. TrainCreationMessage and TrainMovementMessage also have a field with a location identifier, which indicates the location at which the train is created or to which it is moving.
Implementation – the message processing logic
Now that we have the initial domain model, we can specify how each message will be processed. We start with TrainCreationMessage:
public class MessageProcessor {
    private static final Logger log = LoggerFactory.getLogger(MessageProcessor.class);
    // save() may throw SavingException, so it is declared here and handled by the caller
    private void process(TrainCreationMessage message) throws SavingException {
        Train train = Train.getFromStore(message.getTrainId());
        if (!train.exists()) {
            train.setLastLocationId(message.getLocation());
            train.save();
            Location location = Location.getFromStore(message.getLocation());
            location.trainCreated();
            location.save();
            log.info("{} Train '{}' created at location '{}'", message.getMessageId(), train.getTrainId(),
                    train.getLastLocationId());
        } else {
            log.info("{} Cannot create train '{}' at location '{}' as it already exists. Train location '{}'",
                    message.getMessageId(), train.getTrainId(), message.getLocation(), train.getLastLocationId());
        }
    }
    // [...]
}
TrainCreationMessage is processed as follows:
– If we already have a train in memory – discard the message
– Otherwise – store the last location in the Train object and save it, then increment the number of created trains in the Location object and also save it.
public class MessageProcessor {
    // [...]
    private void process(TrainMovementMessage message) throws SavingException {
        Train train = Train.getFromStore(message.getTrainId());
        if (train.exists()) {
            Location lastTrainLocation = Location.getFromStore(train.getLastLocationId());
            lastTrainLocation.trainDeparted();
            lastTrainLocation.save();
            Location newTrainLocation = Location.getFromStore(message.getToLocation());
            newTrainLocation.trainArrived();
            newTrainLocation.save();
            train.setLastLocationId(message.getToLocation());
            train.save();
            log.info("{} Train '{}' moved from '{}' to '{}'", message.getMessageId(), train.getTrainId(),
                    lastTrainLocation.getId(), message.getToLocation());
        } else {
            log.info("{} Train {} does not exist - needs to be created first", message.getMessageId(), train.getTrainId());
        }
    }
}
TrainMovementMessage not only updates the train and the location from the message, it also updates the previous location of the train by incrementing its number of departed trains.
public class MessageProcessor {
    // [...]
    private void process(TrainDeletionMessage message) throws SavingException {
        Train train = Train.getFromStore(message.getTrainId());
        if (train.exists()) {
            Location location = Location.getFromStore(train.getLastLocationId());
            location.trainDeleted();
            location.save();
            train.delete();
            log.info("{} Train '{}' deleted", message.getMessageId(), train.getTrainId());
        } else {
            log.info("{} Train {} cannot be deleted as it does not exist", message.getMessageId(), train.getTrainId());
        }
    }
}
TrainDeletionMessage deletes the train object (if it exists) and updates the counter of the deleted trains for the location at which the train was before the deletion.
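To make the counter rules concrete, here is a minimal in-memory walkthrough of the three message types. It is only a sketch: the class CounterWalkthrough and its plain HashMap store are hypothetical stand-ins for the Train/Location objects, and there is no Hazelcast or locking involved.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for the Train/Location store (single thread, no locks).
class CounterWalkthrough {
    // Index of each counter inside the per-location array
    static final int CREATED = 0, DELETED = 1, ARRIVED = 2, DEPARTED = 3;

    final Map<String, String> trainLocation = new HashMap<>(); // trainId -> last location id
    final Map<String, int[]> counters = new HashMap<>();       // locationId -> counters

    private int[] at(String locationId) {
        return counters.computeIfAbsent(locationId, id -> new int[4]);
    }

    void create(String trainId, String location) {
        if (trainLocation.containsKey(trainId)) {
            return; // train already exists: the message is discarded
        }
        trainLocation.put(trainId, location);
        at(location)[CREATED]++;
    }

    void move(String trainId, String toLocation) {
        String from = trainLocation.get(trainId);
        if (from == null) {
            return; // unknown train: it has to be created first
        }
        at(from)[DEPARTED]++;
        at(toLocation)[ARRIVED]++;
        trainLocation.put(trainId, toLocation);
    }

    void delete(String trainId) {
        String last = trainLocation.remove(trainId);
        if (last != null) {
            at(last)[DELETED]++; // counted at the location where the train was last seen
        }
    }

    // Create at A, ignore a duplicate creation, move to B, delete.
    static CounterWalkthrough demo() {
        CounterWalkthrough w = new CounterWalkthrough();
        w.create("T1", "A");
        w.create("T1", "B"); // ignored, T1 already exists
        w.move("T1", "B");
        w.delete("T1");
        return w;
    }
}
```

After demo(), location A has one created and one departed train, and location B has one arrived and one deleted train. Every step touches counters that other trains can touch as well, which is why the real implementation needs the synchronization described next.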
Implementation – synchronization
Looking at the message processing logic we can see that changing the Train and Location objects from multiple threads will cause problems. To synchronize the threads we will use Hazelcast IMDG. We have two types of objects to synchronize, so let’s create an enum for that:
public enum SynchronizationKeyType {
    LOCATION("LOC_"),
    TRAIN("TRA_");
    private final String prefix;
    private SynchronizationKeyType(String prefix) {
        this.prefix = prefix;
    }
    public String getPrefix() {
        return prefix;
    }
}
This enum will be used to create a key on which we will synchronize in a separate object:
public class Synchronization {
    private SynchronizationKeyType key;
    private String value;
    public Synchronization(SynchronizationKeyType key, String value) {
        this.key = key;
        this.value = value;
    }
    private String getKey() {
        return key.getPrefix() + value;
    }
    public boolean synchronize() throws InterruptedException {
        String keyToSync = getKey();
        // wait up to 10 ms for the lock; once acquired it is held for at most 2 minutes
        return getMap().tryLock(keyToSync, 10, TimeUnit.MILLISECONDS, 2, TimeUnit.MINUTES);
    }
    public void desynchronize() {
        String keyToSync = getKey();
        getMap().unlock(keyToSync);
    }
    private IMap<String, Synchronization> getMap() {
        return HazelcastManager.getInstance().getSynchronizationMap();
    }
}
As a key for synchronization we use the prefix from the enum and the unique identifier of the Location or Train object. This way, even if a train and a location have the same unique id, our synchronization key is still unique. For locking we use the Hazelcast IMap tryLock method. We try to obtain the lock for 10 milliseconds. If we fail to do so, the method returns false. The lock TTL (time to live) is set to 2 minutes in case we fail to release the lock, which allows processing to continue after an error. HazelcastManager is a singleton class which creates and holds a HazelcastInstance object. HazelcastInstance is the entry point for interacting with Hazelcast.
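The behaviour of prefixed keys and a timed tryLock can be tried out without a cluster. The sketch below is a single-JVM analogue, not the Hazelcast API: LocalKeyLocks is a hypothetical class that keeps one java.util.concurrent ReentrantLock per key. Unlike IMap.tryLock it has no lease time, so a lock that is never released stays locked.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical single-JVM stand-in for the Hazelcast map lock: one ReentrantLock per key.
// There is no lease time here, so a lock is only freed by an explicit unlock().
class LocalKeyLocks {
    private final ConcurrentHashMap<String, ReentrantLock> locks = new ConcurrentHashMap<>();

    // Same idea as Synchronization.getKey(): the prefix keeps train and location ids apart.
    static String key(String prefix, String id) {
        return prefix + id;
    }

    // Waits up to waitMillis for the lock; false means another thread still holds it.
    boolean tryLock(String key, long waitMillis) {
        ReentrantLock lock = locks.computeIfAbsent(key, k -> new ReentrantLock());
        try {
            return lock.tryLock(waitMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    void unlock(String key) {
        locks.get(key).unlock();
    }

    // Attempts the lock from a second thread, the way a competing worker would.
    boolean tryLockFromOtherThread(String key, long waitMillis) {
        boolean[] acquired = new boolean[1];
        Thread other = new Thread(() -> {
            acquired[0] = tryLock(key, waitMillis);
            if (acquired[0]) {
                unlock(key);
            }
        });
        other.start();
        try {
            other.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return acquired[0];
    }
}
```

While one thread holds the key built from "TRA_" and "42", a second thread's tryLock on that key times out and returns false, whereas the key built from "LOC_" and "42" can still be locked because it is a different key.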
Now that we have a means of synchronization, let’s implement it for all of the message types. First we create a new interface:
public interface Synchronizable {
    public static final Logger log = LoggerFactory.getLogger(Synchronizable.class);
    public static final int MAX_RETRIES = 10;
    public Set<Synchronization> getSynchronizationKeys();
    public String getMessageId();
    public default boolean obtainLocks(Set<Synchronization> keys) throws InterruptedException {
        Set<Synchronization> synchronizedKeys = new HashSet<>();
        int counter = 0;
        while (counter < MAX_RETRIES) {
            ++counter;
            for (Synchronization key : keys) {
                if (key.synchronize()) {
                    synchronizedKeys.add(key);
                } else {
                    // one lock is unavailable: give back everything obtained so far and start over
                    releaseLocks(synchronizedKeys);
                    synchronizedKeys.clear();
                    log.debug("{} Failed to get all required locks for message on attempt {}", getMessageId(), counter);
                    break;
                }
            }
            if (keys.size() == synchronizedKeys.size()) {
                log.info("{} Acquired all required locks for message on {} attempt", getMessageId(), counter);
                return true;
            }
        }
        log.error("{} Failed to get all required locks for message after {} attempts", getMessageId(), MAX_RETRIES);
        return false;
    }
    public default void releaseLocks(Set<Synchronization> keys) {
        for (Synchronization key : keys) {
            key.desynchronize();
        }
    }
}
The method obtainLocks from the Synchronizable interface takes all the synchronization keys that we need and tries to obtain the lock for each of them. When we fail to obtain the lock for one of the keys, we release the locks already obtained and try again, up to 10 attempts in total. If we manage to obtain all of them, we return true to indicate success; if every attempt fails, we return false.
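The all-or-nothing behaviour described above can be observed with plain JDK locks. This is a sketch under assumptions, not the Hazelcast-based code: AllOrNothingLocks and its methods are hypothetical names, and ReentrantLock stands in for the distributed lock.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical single-JVM analogue of obtainLocks/releaseLocks.
class AllOrNothingLocks {
    // Tries to take every lock; on the first failure releases the ones already held.
    static boolean lockAll(List<ReentrantLock> locks, long waitMillis) {
        Deque<ReentrantLock> held = new ArrayDeque<>();
        try {
            for (ReentrantLock lock : locks) {
                if (!lock.tryLock(waitMillis, TimeUnit.MILLISECONDS)) {
                    releaseAll(held);
                    return false;
                }
                held.push(lock);
            }
            return true;
        } catch (InterruptedException e) {
            releaseAll(held);
            Thread.currentThread().interrupt();
            return false;
        }
    }

    static void releaseAll(Deque<ReentrantLock> held) {
        while (!held.isEmpty()) {
            held.pop().unlock();
        }
    }

    static boolean lockAllWithRetries(List<ReentrantLock> locks, long waitMillis, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (lockAll(locks, waitMillis)) {
                return true;
            }
        }
        return false;
    }

    // A second thread holds lock b, so taking {a, b} must fail and leave a unlocked.
    // Returns {acquiredWhileContended, firstLockRolledBack, acquiredAfterRelease}.
    static boolean[] demo() {
        ReentrantLock a = new ReentrantLock();
        ReentrantLock b = new ReentrantLock();
        CountDownLatch bHeld = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        Thread holder = new Thread(() -> {
            b.lock();
            bHeld.countDown();
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                b.unlock();
            }
        });
        holder.start();
        try {
            bHeld.await();
            boolean contended = lockAllWithRetries(List.of(a, b), 5, 3);
            boolean rolledBack = !a.isLocked();
            release.countDown();
            holder.join();
            boolean afterRelease = lockAllWithRetries(List.of(a, b), 5, 3);
            return new boolean[] {contended, rolledBack, afterRelease};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return new boolean[] {false, false, false};
        }
    }
}
```

Because a worker never waits indefinitely while holding some of its locks, two workers that want the same keys in a different order cannot block each other forever. The price is that under heavy contention a worker can use up all its attempts without making progress.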
The next step is to select the keys that need to be synchronized for each message type. For the TrainCreationMessage these are the id of the train and the id of the location at which the train will be created:
public class TrainCreationMessage implements Synchronizable {
    //[...]
    @Override
    public Set<Synchronization> getSynchronizationKeys() {
        Set<Synchronization> result = new LinkedHashSet<>();
        result.add(new Synchronization(SynchronizationKeyType.TRAIN, trainId));
        result.add(new Synchronization(SynchronizationKeyType.LOCATION, location));
        return result;
    }
}
For the TrainMovementMessage the synchronization keys are: the id of the moving train, the id of the current train location and the id of the location to which the train will move.
public class TrainMovementMessage implements Synchronizable {
    //[...]
    @Override
    public Set<Synchronization> getSynchronizationKeys() {
        Set<Synchronization> result = new LinkedHashSet<>();
        result.add(new Synchronization(SynchronizationKeyType.TRAIN, trainId));
        // the current location is only known for a train that already exists
        Train train = Train.getFromStore(trainId);
        if (train.exists()) {
            result.add(new Synchronization(SynchronizationKeyType.LOCATION, train.getLastLocationId()));
        }
        result.add(new Synchronization(SynchronizationKeyType.LOCATION, toLocation));
        return result;
    }
}
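One detail is worth checking in your own implementation. The Synchronization class as listed does not override equals and hashCode, so when a movement message names the train's current location as its destination, the set ends up with two entries for the same location key. Hazelcast map locks are reentrant, so this still works as long as every lock call is matched by an unlock, but giving the key value semantics lets the set deduplicate it. A minimal sketch, assuming a hypothetical LockKey record in place of Synchronization:

```java
import java.util.LinkedHashSet;
import java.util.Set;

class KeySetSketch {
    // Hypothetical value-type key: a record derives equals() and hashCode() from its components.
    record LockKey(String prefix, String id) {
        String asString() {
            return prefix + id;
        }
    }

    // Builds the keys for a movement message; lastLocationId is null for an unknown train.
    static Set<LockKey> movementKeys(String trainId, String lastLocationId, String toLocation) {
        Set<LockKey> keys = new LinkedHashSet<>(); // keeps insertion order: train key first
        keys.add(new LockKey("TRA_", trainId));
        if (lastLocationId != null) {
            keys.add(new LockKey("LOC_", lastLocationId));
        }
        keys.add(new LockKey("LOC_", toLocation)); // no-op if equal to the previous key
        return keys;
    }
}
```

With this key type, a move from A to A yields two keys (train and location), while a move from A to B yields three.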
For the TrainDeletionMessage the synchronization keys are: the id of the train to be deleted and the id of the location at which the train currently is.
public class TrainDeletionMessage implements Synchronizable {
    //[...]
    @Override
    public Set<Synchronization> getSynchronizationKeys() {
        Set<Synchronization> result = new LinkedHashSet<>();
        result.add(new Synchronization(SynchronizationKeyType.TRAIN, trainId));
        Train train = Train.getFromStore(trainId);
        if (train.exists()) {
            result.add(new Synchronization(SynchronizationKeyType.LOCATION, train.getLastLocationId()));
        }
        return result;
    }
}
Now that we have implemented the synchronization in all messages, let’s use it in message processing:
public class MessageProcessor {
    private static final Logger log = LoggerFactory.getLogger(MessageProcessor.class);
    public static final int MAX_RETRIES = 3;
    public void process(Synchronizable message) throws InterruptedException {
        boolean processed = false;
        int attempt = 0;
        while (!processed && attempt < MAX_RETRIES) {
            ++attempt;
            // keys are re-read on every attempt, as the train may have moved in the meantime
            Set<Synchronization> keysToLock = message.getSynchronizationKeys();
            if (message.obtainLocks(keysToLock)) {
                try {
                    if (message instanceof TrainCreationMessage) {
                        process((TrainCreationMessage) message);
                    } else if (message instanceof TrainDeletionMessage) {
                        process((TrainDeletionMessage) message);
                    } else if (message instanceof TrainMovementMessage) {
                        process((TrainMovementMessage) message);
                    }
                    processed = true;
                } catch (SavingException e) {
                    log.error("Error while processing message {} with id {}", message.getClass().getSimpleName(),
                            message.getMessageId(), e);
                } finally {
                    message.releaseLocks(keysToLock);
                    log.info("{} released all locks for message", message.getMessageId());
                }
            }
        }
    }
    //[...]
}
To process a Synchronizable message we obtain the locks for all the needed keys and then call the method responsible for the specific message type. If we fail to get the locks, or if saving fails with a SavingException, we try again, up to 3 attempts in total. You may wonder where the SavingException came from. The problem tackled in this article is fairly simple. If you expand on this logic, you will introduce other objects that need synchronization. To detect a synchronization problem (an object was changed by another thread before the current thread finished modifying it), additional logic is introduced when saving Train and Location objects:
public class Train implements Serializable {
    private static final long serialVersionUID = 7105498476179814166L;
    private boolean exists;
    private Integer hashCode;
    private String trainId;
    private String lastLocationId;
    public static Train getFromStore(String trainId) {
        Train train = getMap().get(trainId);
        if (train == null) {
            train = new Train();
            train.setTrainId(trainId);
            train.setToNotExists();
        }
        return train;
    }
    private static IMap<String, Train> getMap() {
        return HazelcastManager.getInstance().getTrainMap();
    }
    public void save() throws SavingException {
        boolean prevExists = exists;
        Integer originalHashCode = hashCode;
        hashCode = hashCode();
        setToExists();
        // put returns the value that was stored under this key before the write
        Train previous = getMap().put(trainId, this);
        if (prevExists && previous != null && !previous.getHashCode().equals(originalHashCode)) {
            String message = """
                    Other process modified the object. Saved object
                    {%s},
                    object in hazelcast
                    {%s}
                    """.formatted(this, previous);
            throw new SavingException(message);
        }
    }
    public void delete() {
        getMap().remove(trainId);
        setToNotExists();
    }
    public void setToExists() {
        exists = true;
    }
    public void setToNotExists() {
        exists = false;
    }
    public boolean exists() {
        return exists;
    }
    //[...]
}
Before storing the object in the Hazelcast map we calculate the hash code of the object, so that it is stored alongside the other fields. When we get the object from Hazelcast, the original hash code is loaded from Hazelcast as well. This way, when saving, we can check whether the original hash code is the same as the one in the object returned from Hazelcast (the put method of IMap returns the previous object). If they are different, it means that some other thread modified the object. This helps to identify where a synchronization key is missing.
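The same check can be illustrated without Hazelcast, because java.util.Map.put also returns the previous value. The sketch below is an assumption-laden analogue: StaleWriteCheck, Stored and the use of Objects.hash are hypothetical, and a ConcurrentHashMap stands in for the IMap.

```java
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical analogue of the check in Train.save(), using a local map instead of an IMap.
class StaleWriteCheck {
    // Stored value: the payload plus the hash that was computed when it was saved.
    record Stored(String lastLocationId, int savedHash) { }

    final Map<String, Stored> store = new ConcurrentHashMap<>();

    // loadedHash is the hash the caller saw when it read the entry (null if there was none).
    // Returns false when the overwritten entry is not the one the caller originally read.
    // As in the article's code, the write has already happened when the conflict is detected.
    boolean save(String trainId, String newLocation, Integer loadedHash) {
        Stored next = new Stored(newLocation, Objects.hash(trainId, newLocation));
        Stored previous = store.put(trainId, next);
        boolean conflict = loadedHash != null && previous != null && previous.savedHash() != loadedHash;
        return !conflict;
    }
}
```

If two workers read the same entry and both save, the first save succeeds and the second one is reported as a conflict, because the hash it read no longer matches the hash of the entry it overwrote. With correct locking this situation should not occur, which is what makes the check useful as a detector of missing synchronization keys.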
Summary
Distributed locking is not an ideal solution and like everything has some drawbacks. Its pros and cons need to be evaluated before applying it to your system. In my experience it works well enough, which encourages me to use it.
Reference
- https://docs.hazelcast.com/hazelcast/5.3/
- https://www.linkedin.com/advice/0/what-some-common-distributed-locking-patterns
- https://www.baeldung.com/java-concurrent-locks
Code Reference


