Distributed lock using Hazelcast

Jakub Kosiorek

Introduction

During application development we often encounter performance issues. An obvious solution (apart from optimizing the code) is to scale the application. Usually we start by parallelizing processing across multiple threads. The drawback of this approach is that access to shared data must be synchronized, usually with locks. Vertical scaling (adding more resources to the machine on which the application runs) has physical and cost limits, so eventually you may consider horizontal scaling (adding more machines/nodes). This creates another challenge – access to shared data must now be synchronized across multiple nodes. In this article I would like to share the implementation of a distributed lock that I had the pleasure of co-creating.

Problem description

Let’s consider the following scenario. We have a system that needs to track train locations. A train is updated based on three message types – train creation, train movement and train deletion. First we receive the message that creates the train, then we receive messages which report its movement from one location to another, and lastly the train is deleted by a deletion message. Let’s assume that we want to track how many trains were created/deleted at a particular location and how many arrived at/departed from it. To do that we need to store this data and synchronize access to it, as it can be modified by different trains.

Implementation – domain

A reference to the full implementation is located at the end of the article. Here we go step by step through how it was implemented. From the problem description we know that we need two objects to store the needed data – Train and Location. We also need objects for all message types.

In the Train object we have a unique identifier, which identifies a train, and a field to store its last location (a unique identifier of the location). Apart from that we have methods which allow us to create/update/delete the object in the store, and a helper method to check whether the train is already present there.
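A minimal sketch of such a Train object (the field and method names are assumptions, and a plain ConcurrentHashMap stands in for the actual store):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of the Train domain object; a ConcurrentHashMap stands in
// for the real store here.
class Train {
    static final Map<String, Train> STORE = new ConcurrentHashMap<>();

    final String id;            // unique train identifier
    String lastLocationId;      // unique identifier of the last known location

    Train(String id, String lastLocationId) {
        this.id = id;
        this.lastLocationId = lastLocationId;
    }

    void save()   { STORE.put(id, this); }      // create or update
    void delete() { STORE.remove(id); }

    static boolean exists(String id) { return STORE.containsKey(id); }
    static Train load(String id)     { return STORE.get(id); }
}
```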

The Location object allows us to track the number of trains that were created or deleted at, and arrived at or departed from, that location. As with the Train object, we have helper methods which allow us to save/update/delete the object. Additional methods allow us to increment the counters for the different train events.
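A sketch of what the Location object could look like, under the same assumptions (hypothetical names, a plain map as the store):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of the Location domain object with per-event counters.
class Location {
    static final Map<String, Location> STORE = new ConcurrentHashMap<>();

    final String id;
    int created, deleted, arrived, departed;  // train event counters

    Location(String id) { this.id = id; }

    void save()   { STORE.put(id, this); }
    void delete() { STORE.remove(id); }

    static Location loadOrCreate(String id) {
        return STORE.computeIfAbsent(id, Location::new);
    }

    void incrementCreated()  { created++; }
    void incrementDeleted()  { deleted++; }
    void incrementArrived()  { arrived++; }
    void incrementDeparted() { departed++; }
}
```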

All of the messages have a unique identifier, which helps to track how a particular message is processed, and a field to store the unique train identifier. TrainCreationMessage and TrainMovementMessage also have a field with a location identifier, indicating at which location the train needs to be created or to which location the train is moving.
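The message types might be sketched as follows; the class and field names are assumptions based on the description above:

```java
// Sketch of the message objects.
abstract class TrainMessage {
    final String messageId;  // unique id used to trace how the message is processed
    final String trainId;    // unique id of the train the message refers to

    TrainMessage(String messageId, String trainId) {
        this.messageId = messageId;
        this.trainId = trainId;
    }
}

class TrainCreationMessage extends TrainMessage {
    final String locationId;  // location at which the train is created

    TrainCreationMessage(String messageId, String trainId, String locationId) {
        super(messageId, trainId);
        this.locationId = locationId;
    }
}

class TrainMovementMessage extends TrainMessage {
    final String locationId;  // location to which the train is moving

    TrainMovementMessage(String messageId, String trainId, String locationId) {
        super(messageId, trainId);
        this.locationId = locationId;
    }
}

class TrainDeletionMessage extends TrainMessage {
    TrainDeletionMessage(String messageId, String trainId) {
        super(messageId, trainId);
    }
}
```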

Implementation – the message processing logic

Now that we have the initial domain model, we can specify how each message will be processed.

TrainCreationMessage is processed as follows:
– If we already have a train in memory – discard the message
– Otherwise – store the last location in the Train object and save it, then increment the number of created trains in the Location object and also save it.
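The steps above can be sketched as follows; compact map-based stand-ins for the Train and Location objects keep the example self-contained:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of TrainCreationMessage processing: discard if the train exists,
// otherwise save the train and bump the location's created counter.
class CreationProcessor {
    static final Map<String, String> TRAINS = new ConcurrentHashMap<>();   // trainId -> lastLocationId
    static final Map<String, Integer> CREATED = new ConcurrentHashMap<>(); // locationId -> created count

    // Returns false when the message is discarded (train already exists).
    static boolean process(String trainId, String locationId) {
        if (TRAINS.containsKey(trainId)) return false;  // already in memory: discard
        TRAINS.put(trainId, locationId);                // store last location and save the train
        CREATED.merge(locationId, 1, Integer::sum);     // increment created-trains counter
        return true;
    }
}
```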

TrainMovementMessage not only updates the train and the location from the message, but also updates the previous location of the train by incrementing its departed-trains counter.
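A sketch of this logic, again using map stand-ins for the domain objects:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of TrainMovementMessage processing: the previous location gets a
// departure, the new location gets an arrival, and the train is updated.
class MovementProcessor {
    static final Map<String, String> TRAINS = new ConcurrentHashMap<>();    // trainId -> lastLocationId
    static final Map<String, Integer> ARRIVED = new ConcurrentHashMap<>();  // locationId -> arrived count
    static final Map<String, Integer> DEPARTED = new ConcurrentHashMap<>(); // locationId -> departed count

    static boolean process(String trainId, String toLocationId) {
        String from = TRAINS.get(trainId);
        if (from == null) return false;               // unknown train: discard
        DEPARTED.merge(from, 1, Integer::sum);        // previous location: departed++
        ARRIVED.merge(toLocationId, 1, Integer::sum); // new location: arrived++
        TRAINS.put(trainId, toLocationId);            // update the train's last location
        return true;
    }
}
```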

TrainDeletionMessage deletes the Train object (if it exists) and updates the deleted-trains counter for the location at which the train was before deletion.
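And a sketch of the deletion logic in the same style:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of TrainDeletionMessage processing: remove the train and count
// the deletion at its last known location.
class DeletionProcessor {
    static final Map<String, String> TRAINS = new ConcurrentHashMap<>();   // trainId -> lastLocationId
    static final Map<String, Integer> DELETED = new ConcurrentHashMap<>(); // locationId -> deleted count

    static boolean process(String trainId) {
        String at = TRAINS.remove(trainId);  // delete the train if it exists
        if (at == null) return false;        // nothing to delete: discard
        DELETED.merge(at, 1, Integer::sum);  // count deletion at the last location
        return true;
    }
}
```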

Implementation – synchronization

Looking at the message processing logic, we can see that modifying the Train and Location objects from multiple threads will cause problems. To synchronize the threads we will use Hazelcast IMDG. We have two types of objects to synchronize, so let’s create an enum for that:
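A sketch of what such an enum might look like; the prefix values are assumptions:

```java
// Enum for the two object types we synchronize on; each carries a prefix
// used later when building synchronization keys.
enum SyncObjectType {
    TRAIN("TRAIN_"),
    LOCATION("LOCATION_");

    private final String prefix;

    SyncObjectType(String prefix) { this.prefix = prefix; }

    String getPrefix() { return prefix; }
}
```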

This enum will be used to create a key on which we will synchronize in a separate object:
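A sketch of such a key object (names and structure are assumptions; the enum is nested here only to keep the example self-contained):

```java
// Sketch of the synchronization key object: it combines the enum prefix
// with the object's unique id, so train and location keys never collide.
class SynchronizationKey {
    enum SyncObjectType {
        TRAIN("TRAIN_"), LOCATION("LOCATION_");

        final String prefix;
        SyncObjectType(String prefix) { this.prefix = prefix; }
    }

    private final SyncObjectType type;
    private final String id;

    SynchronizationKey(SyncObjectType type, String id) {
        this.type = type;
        this.id = id;
    }

    String asKey() { return type.prefix + id; }
}
```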

As a key for synchronization we use the prefix from the enum and the unique identifier of the Location or Train object. This way, even if a train and a location have the same unique id, the synchronization key is still unique. For locking we use the tryLock method of a Hazelcast IMap. We try to obtain the lock for 10 milliseconds; if we fail to do so, the method returns false. The lock TTL (time to live) is set to 2 minutes in case we fail to release the lock, which allows processing to continue after an error. HazelcastManager is a singleton class which creates and holds a HazelcastInstance object; HazelcastInstance allows us to interact with Hazelcast.
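Hazelcast's IMap.tryLock with a lease time requires a running cluster, so the sketch below only mimics its semantics with a plain map: try for a bounded time to take a per-key lock that auto-expires after a lease. This is an illustration of the behavior described above, not the Hazelcast implementation:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Stand-in for IMap.tryLock(key, time, unit, leaseTime, leaseUnit):
// per-key locks with a wait timeout and a lease (TTL) after which
// the lock is considered released.
class LeaseLocks {
    private final Map<String, Long> expiry = new ConcurrentHashMap<>();  // key -> lease deadline (ms)

    // Try to obtain the lock for up to waitMs; the lock expires after leaseMs.
    boolean tryLock(String key, long waitMs, long leaseMs) {
        long deadline = System.currentTimeMillis() + waitMs;
        do {
            long now = System.currentTimeMillis();
            Long newDeadline = now + leaseMs;
            Long prev = expiry.putIfAbsent(key, newDeadline);
            if (prev == null) return true;  // lock was free
            if (prev <= now && expiry.replace(key, prev, newDeadline)) {
                return true;                // previous lease expired: take over
            }
        } while (System.currentTimeMillis() < deadline);
        return false;                       // timed out, like tryLock returning false
    }

    void unlock(String key) { expiry.remove(key); }
}
```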

Now that we have means for synchronization, let’s implement the synchronization for all of the message types. First we create a new interface:
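A sketch of what such an interface might look like; the exact signatures are assumptions, and a shared in-memory key set stands in for Hazelcast's per-key IMap locks:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of a Synchronizable interface with all-or-nothing lock
// acquisition: try every key, release everything on a partial failure,
// and retry up to 10 times.
interface Synchronizable {
    Set<String> HELD = ConcurrentHashMap.newKeySet();  // stand-in for distributed locks

    List<String> getSynchronizationKeys();  // keys this message must lock

    default boolean obtainLocks() {
        for (int attempt = 0; attempt < 10; attempt++) {
            List<String> obtained = new ArrayList<>();
            boolean all = true;
            for (String key : getSynchronizationKeys()) {
                if (HELD.add(key)) obtained.add(key);
                else { all = false; break; }
            }
            if (all) return true;            // every key locked: success
            obtained.forEach(HELD::remove);  // release partial locks before retrying
        }
        return false;
    }

    default void releaseLocks() {
        getSynchronizationKeys().forEach(HELD::remove);
    }
}
```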

The obtainLocks method from the Synchronizable interface takes all the synchronization keys we need and tries to obtain the lock for each of them. When we fail to obtain the lock for one of the keys, we release the already obtained locks and retry, up to 10 times. If we manage to obtain all of them, we return true to indicate success.

The next step is to select the keys that need to be synchronized for each message type. For the TrainCreationMessage these are the id of the train and the id of the location at which the train will be created.

For the TrainMovementMessage the synchronization keys are: the id of the moving train, the id of the current train location and the id of the location to which the train will move.

For the TrainDeletionMessage the synchronization keys are: the id of the train to be deleted and the id of the location at which the train currently is.
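The key selection for all three message types can be sketched as follows, using the prefixed keys from earlier (names and prefixes are assumptions):

```java
import java.util.List;

// Sketch of per-message synchronization keys: every message locks its
// train plus the location(s) it touches.
class SyncKeys {
    static List<String> forCreation(String trainId, String locationId) {
        return List.of("TRAIN_" + trainId, "LOCATION_" + locationId);
    }

    static List<String> forMovement(String trainId, String fromLocationId, String toLocationId) {
        return List.of("TRAIN_" + trainId,
                       "LOCATION_" + fromLocationId,
                       "LOCATION_" + toLocationId);
    }

    static List<String> forDeletion(String trainId, String currentLocationId) {
        return List.of("TRAIN_" + trainId, "LOCATION_" + currentLocationId);
    }
}
```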

Now that we have implemented the synchronization in all messages, let’s use it in message processing:
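A simplified sketch of such a processing wrapper, with an in-memory key set standing in for the distributed locks (the 3-attempt retry mirrors the logic described below; each attempt here tries the keys once):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of message processing under locks: obtain all keys, run the
// message-specific logic, always release, and retry up to 3 times.
class MessageProcessor {
    private final Set<String> locks = ConcurrentHashMap.newKeySet();

    boolean process(List<String> keys, Runnable logic) {
        for (int attempt = 0; attempt < 3; attempt++) {
            List<String> obtained = new ArrayList<>();
            boolean all = true;
            for (String key : keys) {
                if (locks.add(key)) obtained.add(key);
                else { all = false; break; }
            }
            if (all) {
                try {
                    logic.run();  // message-type-specific processing
                    return true;
                } finally {
                    obtained.forEach(locks::remove);  // always release
                }
            }
            obtained.forEach(locks::remove);  // partial failure: release and retry
        }
        return false;  // gave up after 3 attempts
    }
}
```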

To process a Synchronizable message we obtain the locks for all the needed keys. Once this is done, we call the method responsible for processing the specific message type. If we fail to get the locks, we retry up to 3 times. You may wonder where the SavingException came from. The problem tackled in this article is fairly simple; if you expand on this logic, you will introduce other objects which need synchronization. In order to detect a synchronization problem (an object was changed by another thread before the current thread finished modifying it), additional logic is introduced when saving the Train and Location objects:
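A sketch of this stale-write detection; a HashMap stands in for the Hazelcast IMap (whose put also returns the previous value), an IllegalStateException stands in for the SavingException, and the names are assumptions:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Store each object's hash code alongside it; on save, compare the hash
// loaded earlier with the hash of the previous value returned by put.
class HashCheckedStore {
    static class Entry {
        final String payload;
        final int originalHash;  // hash computed when the object was stored

        Entry(String payload, int originalHash) {
            this.payload = payload;
            this.originalHash = originalHash;
        }
    }

    private final Map<String, Entry> map = new HashMap<>();

    Entry get(String key) { return map.get(key); }

    // Throws when another writer changed the object since we loaded it
    // (the role played by SavingException in the article).
    void save(String key, String payload, Integer hashWhenLoaded) {
        Entry previous = map.put(key, new Entry(payload, Objects.hashCode(payload)));
        if (previous != null && hashWhenLoaded != null
                && previous.originalHash != hashWhenLoaded) {
            throw new IllegalStateException(
                "Object at " + key + " was modified by another thread");
        }
    }
}
```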

Before storing the object in the Hazelcast map we calculate the object’s hash code and store it alongside the other fields. When we get the object from Hazelcast, the original hash code is loaded as well. This way, when saving, we can check whether the original hash code matches the one in the object returned by Hazelcast (the put method of IMap returns the previous value). If they differ, some other thread modified the object in the meantime. This helps to identify where a synchronization key is missing.

Summary

Distributed locking is not an ideal solution and, like everything, has some drawbacks. Its pros and cons need to be evaluated before applying it to your system. In my experience it works well enough to encourage me to keep using it.

Reference

Code Reference
