Rozproszona blokada przy użyciu Hazelcast’a

Jakub Kosiorek

Wprowadzenie

Podczas rozwijania aplikacji często trafiamy na problemy z wydajnością. Oczywistym rozwiązaniem (poza optymalizacją kodu) jest użycie skalowania. Zazwyczaj zaczyna się od użycia wielowątkowości. Minusem tego rozwiązania jest konieczność synchronizowania dostępu do wspólnych zasobów, zazwyczaj przy użyciu blokad. Skalując wertykalnie (dodając zasoby do maszyny, na której działa aplikacja) szybko napotykamy na fizyczną i kosztową barierę. Atrakcyjniejszym wyborem wydaję się użycie skalowania horyzontalnego (dodanie kolejnych maszyn/węzłów). To niestety wiąże się z kolejnym wyzwaniem – synchronizacją dostępu do wspólnych zasobów przez wiele węzłów. W tym artykule chciałbym podzielić się implementacją synchronizacji za pomocą rozproszonych blokad, którą miałem przyjemność współtworzyć.

Opis problemu

Rozważmy następujący scenariusz. Mamy do zaprojektowania system, który powinien rejestrować lokalizację pociągów. Pociąg jest aktualizowany za pomocą trzech typów wiadomości: utworzenie pociągu, przemieszczenie pociągu i usunięcie pociągu. Jako pierwszą otrzymujemy wiadomość tworzącą pociąg, następnie otrzymujemy wiadomości przemieszczające pociąg do kolejnych lokalizacji i na końcu otrzymujemy wiadomość, która usuwa pociąg. Załóżmy, że chcemy śledzić ile pociągów zostało utworzonych/usuniętych na poszczególnych lokalizacjach i ile pociągów przyjechało/odjechało z każdej lokalizacji. Aby to osiągnąć musimy zapisać te dane i zapewnić synchronizacje, ponieważ mogą być one zmieniane przez różne pociągi.

Implementacja – domena

Odnośniki do ostatecznej implementacji są umieszczone na końcu artykułu. Teraz przejdziemy krok po kroku wyjaśniając jak zostało to zaimplementowane. Z opisu problemu wiemy, że potrzebujemy dwóch obiektów by przetrzymywać dane – pociągu (Train) oraz lokalizacji (Location). Potrzebne są nam też obiekty dla każdej z typów wiadomości.

public class Train {
    private boolean exists;
    private String trainId;
    private String lastLocationId;

    public static Train getFromStore(String trainId) {
        // implementation
    }

    public void save() {
        // implementation
    }

    public void delete() {
        // implementation   
    }

    public boolean exists() {
        return exists;
    }

    // Getters, setters, equals, hash code, toString
}

W obiekcie typu Train potrzebujemy unikalnego identyfikatora pociągu oraz pola by przetrzymywać ostatnią lokalizację (unikalny identyfikator lokalizacji). Poza tym mamy metody, które pozwolą nam utworzyć/zmodyfikować/usunąć obiekt w magazynie oraz pomocniczą metodę, która sprawdza czy obiekt już tam istnieje.

public class Location {
    private boolean exists;
    private String id;
    private Integer numberOfTrainsCreated = 0;
    private Integer numberOfTrainsDeleted = 0;
    private Integer numberOfTrainsWhichArrived = 0;
    private Integer numberOfTrainsWhichDeparted = 0;

    public static Location getFromStore(String locationId) {
        // implementation
    }

    public void save() throws SavingException {
        // implementation
    }

    public void delete() {
        // implementation
    }

    public boolean exists() {
        return exists;
    }

    public void trainCreated() {
        ++numberOfTrainsCreated;
    }

    public void trainDeleted() {
        ++numberOfTrainsDeleted;
    }

    public void trainArrived() {
        ++numberOfTrainsWhichArrived;
    }

    public void trainDeparted() {
        ++numberOfTrainsWhichDeparted;
    }

    // Getters, setters, equals, hash code, toString
}

Obiekt typu Location pozwala na przechowywanie ile pociągów zostało utworzonych/usuniętych oraz ile przyjechało/odjechało z danej lokalizacji. Podobnie jak w przypadku pociągu mamy tu metody, które pozwalają zapisać/zmodyfikować/usunąć obiekt. Dodatkowe metody pozwalają na zwiększanie liczników poszczególnych zdarzeń pociągu.

public class TrainCreationMessage {
    private String id;
    private String trainId;
    private String location;

    // Getters, setters, equals, hash code, toString
}
public class TrainDeletionMessage {
    private String id;
    private String trainId;

    // Getters, setters, equals, hash code, toString
}
public class TrainMovementMessage {
    private String id;
    private String trainId;
    private String toLocation;

    // Getters, setters, equals, hash code, toString
}

Wszystkie wiadomości mają unikalny identyfikator, który pomoże nam śledzić jak dana wiadomość została przetworzona oraz pole zawierające unikalny identyfikator pociągu. Wiadomość typu TrainCreationMessage oraz TrainMovementMessage posiadają również pole z identyfikatorem lokalizacji by wskazać gdzie pociąg ma być utworzony/gdzie powinien się przemieścić.

Implementacja – logika przetwarzania wiadomości

Skoro mamy już początkowy model domeny, możemy określić jak chcemy przetworzyć każdą z wiadomości. Rozpoczniemy z wiadomością typu TrainCreationMessage:

public class MessageProcessor {

    private static final Logger log = LoggerFactory.getLogger(MessageProcessor.class);

    private void process(TrainCreationMessage message) {
        Train train = Train.getFromStore(message.getTrainId());
        if (!train.exists()) {
            train.setLastLocationId(message.getLocation());
            train.save();

            Location location = Location.getFromStore(message.getLocation());
            location.trainCreated();
            location.save();
            log.info("{} Train '{}' created at location '{}'", message.getMessageId(), train.getTrainId(),
                    train.getLastLocationId());
        } else {
            log.info("{} Cannot create train '{}' at location '{}' as it already exists. Train location '{}'", message.getMessageId(), train.getTrainId(), message.getLocation(), train.getLastLocationId());
        }
    }

    // [...]
}

Wiadomość TrainCreationMessage jest przetwarzana następująco:
– Jeśli pociąg już istnieje – wiadomość jest ignorowana
– W przeciwnym wypadku – ustawiamy ostatnią lokalizację pociągu w obiekcie typu Train, zapisujemy go, następnie zwiększamy licznik utworzonych pociągów w obiekcie typu Location i również go zapisujemy

public class MessageProcessor {
    // [...]
    private void process(TrainMovementMessage message) {
        Train train = Train.getFromStore(message.getTrainId());
        if (train.exists()) {
            Location lastTrainLocation = Location.getFromStore(train.getLastLocationId());
            lastTrainLocation.trainDeparted();
            lastTrainLocation.save();

            Location newTrainLocation = Location.getFromStore(message.getToLocation());
            newTrainLocation.trainArrived();
            newTrainLocation.save();

            train.setLastLocationId(message.getToLocation());
            train.save();
            log.info("{} Train '{}' moved from '{}' to '{}'", message.getMessageId(), train.getTrainId(), lastTrainLocation.getId(),
                    message.getToLocation());
        } else {
            log.info("{} Train {} does not exists - needs to be created first", message.getMessageId(), train.getTrainId());
        }
    }
}

Wiadomość typu TrainMovementMessage poza aktualizacją pociągu i lokalizacji, uaktualnia też poprzednią lokalizację pociągu – zwiększa licznik pociągów, które odjechały.

public class MessageProcessor {
    // [...]
    private void process(TrainDeletionMessage message) {
        Train train = Train.getFromStore(message.getTrainId());
        if (train.exists()) {
            Location location = Location.getFromStore(train.getLastLocationId());
            location.trainDeleted();
            location.save();

            train.delete();
            log.info("{} Train '{}' deleted", message.getMessageId(), train.getTrainId());
        } else {
            log.info("{} Train {} cannot be deleted as it does not exists", message.getMessageId(), train.getTrainId());
        }
    }
}

Wiadomość typu TrainDeletionMessage usuwa obiekt pociągu (jeśli istnieje) oraz aktualizuje licznik usuniętych pociągów dla lokalizacji na której pociąg znajdował się ostatnio.

Implementacja – synchronizacja

Analizując logikę przetwarzania wiadomości widać, że modyfikowanie obiektów typu Train oraz Location przez wiele wątków będzie sprawiać problemy. Do synchronizacji pomiędzy wątkami użyjemy Hazelcast IMDG. Mamy dwa typy obiektów więc utwórzmy dla nich enum:

public enum SynchronizationKeyType {
    LOCATION("LOC_"),
    TRAIN("TRA_");

    private SynchronizationKeyType(String prefix) {
        this.prefix = prefix;
    }

    private String prefix;

    public String getPrefix() {
        return prefix;
    }
}

Ten enum pozwoli nam utworzyć klucz, którego użyjemy do synchronizacji w osobnym obiekcie:

public class Synchronization {

    private SynchronizationKeyType key;
    private String value;

    public Synchronization(SynchronizationKeyType key, String value) {
        this.key = key;
        this.value = value;
    }

    private String getKey() {
        return key.getPrefix() + value;
    };

    public boolean synchronize() throws InterruptedException {
        String keyToSync = getKey();
        return getMap().tryLock(keyToSync, 10, TimeUnit.MILLISECONDS, 2, TimeUnit.MINUTES);
    }

    public void desynchronize() {
        String keyToSync = getKey();
        getMap().unlock(keyToSync);
    }

    private IMap<String, Synchronization> getMap() {
        return HazelcastManager.getInstance().getSynchronizationMap();
    }
}

Jako klucz do synchronizacji użyjemy prefiksu z enum’a oraz unikalnego identyfikatora obiektu typu Location lub Train. Dzięki temu klucz do synchronizacji będzie unikalny nawet jeśli pociąg i lokalizacja będą miały ten sam unikalny identyfikator. Do uzyskania blokad użyjemy metody tryLock z Hazelcast’owej IMap’y. Próbujemy uzyskać blokadę przez 10 milisekund. Gdy to się nie powiedzie, metoda zwraca false. Dodatkowo ustawiamy TTL (time to live) na blokadzie na 2 minuty na wypadek gdybyśmy nie zwolnili blokady. To pozwoli na kontynuowanie przetwarzania wiadomości dla danego klucza w przypadku błędu. HazelcastManager jest to klasa typu singleton, która tworzy i przetrzymuje obiekt typu HazelcastInstance. HazelcastInstance umożliwia interakcje z Hazelcast’em.

Skoro mamy już klasę do synchronizacji, możemy wziąć się za jej implementację dla każdego typu wiadomości. Na początku stwórzmy nowy interfejs:

public interface Synchronizable {
    public static final Logger log = LoggerFactory.getLogger(Synchronizable.class);
    public static final int MAX_RETRIES = 10;

    public Set<Synchronization> getSynchronizationKeys();
    public String getMessageId();

    public default boolean obtainLocks(Set<Synchronization> keys) throws InterruptedException {
        Set<Synchronization> synchronizedKeys = new HashSet<>();
        int counter = 0;
        while (counter < MAX_RETRIES) {
            ++counter;
            for (Synchronization key : keys) {
                if (key.synchronize()) {
                    synchronizedKeys.add(key);
                } else {
                    releaseLocks(synchronizedKeys);
                    synchronizedKeys.clear();
                    log.debug("{} Failed to get all required locks for message {}", getMessageId());
                    break;
                }
            }
            if (keys.size() == synchronizedKeys.size()) {
                log.info("{} Acquired all required locks for message on {} attempt", getMessageId(), counter);
                return true;
            }
        }
        log.error("{} Failed to get all required locks for message {}", getMessageId());
        return false;
    }

    public default void releaseLocks(Set<Synchronization> keys) {
        for (Synchronization key : keys) {
            key.desynchronize();
        }
    }
}

Metoda obtainLocks z interfejsu Synchronizable stara się założyć blokadę na wszystkich kluczach zdefiniowanych do synchronizacji. Jeśli dla któregoś z nich się nie uda, już założone blokady są zwalniane i próbujemy ponownie (do 10 razy). Gdy uda się założyć blokadę dla wszystkich kluczy, zwracamy wartość true by oznajmić sukces.

Następnym krokiem jest ustalenie jakie klucze należy synchronizować dla każdego typu wiadomości. Dla wiadomości typu TrainCreationMessage jest to identyfikator pociągu i lokalizacji na której pociąg będzie utworzony.

public class TrainCreationMessage implements Synchronizable {
    //[...]

    @Override
    public Set<Synchronization> getSynchronizationKeys() {
        Set<Synchronization> result = new LinkedHashSet<>(); 
        result.add(new Synchronization(SynchronizationKeyType.TRAIN, trainId));
        result.add(new Synchronization(SynchronizationKeyType.LOCATION, location));
        return result;
    }
}

Dla wiadomości typu TrainMovementMessage jest to identyfikator pociągu, który będzie się przemieszczał, identyfikator lokalizacji na której obecnie znajduję się pociąg oraz identyfikator lokalizacji do której pociąg się przemieści.

public class TrainMovementMessage implements Synchronizable {
    //[...]

    @Override
    public Set<Synchronization> getSynchronizationKeys() {
        Set<Synchronization> result = new LinkedHashSet<>(); 
        result.add(new Synchronization(SynchronizationKeyType.TRAIN, trainId));
        Train train = Train.getFromStore(trainId);
        if (train.exists()) {
            result.add(new Synchronization(SynchronizationKeyType.LOCATION, train.getLastLocationId()));
        }
        result.add(new Synchronization(SynchronizationKeyType.LOCATION, toLocation));
        return result;
    }

Dla wiadomości typu TrainDeletionMessage jest to identyfikator pociągu, który zostanie usunięty oraz identyfikator lokalizacji na której obecnie znajduję się pociąg.

public class TrainDeletionMessage implements Synchronizable {
    //[...]

    @Override
    public Set<Synchronization> getSynchronizationKeys() {
        Set<Synchronization> result = new LinkedHashSet<>(); 
        result.add(new Synchronization(SynchronizationKeyType.TRAIN, trainId));
        Train train = Train.getFromStore(trainId);
        if (train.exists()) {
            result.add(new Synchronization(SynchronizationKeyType.LOCATION, train.getLastLocationId()));
        }
        return result;
    }

Teraz gdy mamy zaimplementowaną synchronizacje dla każdego z typów wiadomości, użyjmy jej w logice przetwarzania wiadomości:

public class MessageProcessor {

    private static final Logger log = LoggerFactory.getLogger(MessageProcessor.class);
    public static final int MAX_RETRIES = 3;

    public void process(Synchronizable message) throws InterruptedException {
        boolean processed = false;
        int attempt = 0;
        while (!processed && attempt < MAX_RETRIES) {
            ++attempt;
            Set<Synchronization> keysToLock = message.getSynchronizationKeys();
            if (message.obtainLocks(keysToLock)) {
                try {
                    if (message instanceof TrainCreationMessage) {
                        process((TrainCreationMessage) message);
                    } else if (message instanceof TrainDeletionMessage) {
                        process((TrainDeletionMessage) message);
                    } else if (message instanceof TrainMovementMessage) {
                        process((TrainMovementMessage) message);
                    }
                    processed = true;
                } catch (SavingException e) {
                    log.error("Error while processing message " + message.getClass().getSimpleName() + " with id "
                            + message.getMessageId(), e);
                } finally {
                    message.releaseLocks(keysToLock);
                    log.info("{} released all locks for message", message.getMessageId());
                }
            }
        }
    }

    //[...]
}

Zanim przetworzymy synchronizowalną (Synchronizable) wiadomość potrzebujemy założyć blokady na wszystkie potrzebne klucze. Gdy już je mamy, wywoływana jest odpowiednia metoda odpowiedzialna za przetworzenie odpowiedniego typu wiadomości. Jeśli założenie blokad się nie uda, próbujemy ponownie (do trzech razy). Uważna lektura powyższego kodu pozwoli zauważyć blok obsługi błędu SavingException. Problem poruszany w tym artykule, jest w stosunkowo prosty. Jeśli chcemy go rozbudować, wprowadzimy nowe wiadomości i nowe obiekty, które będą musiały być synchronizowane. By zidentyfikować potencjalne problemy z synchronizacją (modyfikacja obiektu przez inny wątek zanim obecny skończył z modyfikacją), dodano dodatkową logikę przy zapisywaniu obiektów typu Train i Location:

public class Train implements Serializable {
    private static final long serialVersionUID = 7105498476179814166L;
    private boolean exists;
    private Integer hashCode;
    private String trainId;
    private String lastLocationId;

    public static Train getFromStore(String trainId) {
        Train train = getMap().get(trainId);
        if (train == null) {
            train = new Train();
            train.setTrainId(trainId);
            train.setToNotExists();
        }
        return train;
    }

    private static IMap<String, Train> getMap() {
        return HazelcastManager.getInstance().getTrainMap();
    }

    public void save() throws SavingException {
        boolean prevExists = exists;
        Integer originalHashCode = hashCode;
        hashCode = hashCode();
        setToExists();
        Train previous = getMap().put(trainId, this);
        if (prevExists && previous != null && !previous.getHashCode().equals(originalHashCode)) {
            String message = """
                    Other process modified the object. Saved object 
                    {%s},
                    object in hazelcast 
                    {%s}
                    """.formatted(this, previous);
            throw new SavingException(message);
        }
    }

    public void delete() {
        getMap().remove(trainId);
        setToNotExists();
    }

    public void setToExists() {
        exists = true;
    }

    public void setToNotExists() {
        exists = false;
    }

    public boolean exists() {
        return exists;
    }

    //[...]
}

Zanim zapiszemy obiekt w Hazelcast’owej mapie obliczamy kod skrótu obiektu, by zapisać go wewnątrz obiektu z innymi polami. Kiedy dostajemy obiekt z Hazelcast’a oryginalny kod skrótu jest również wgrywany. To pozwala nam porównać czy oryginalny kod skrótu jest taki same jak ten, który jest zwrócony z Hazelcast’a przy zapisywaniu (metoda put klasy IMap zwraca poprzednio przechowywany obiekt). Jeśli są różne oznacza to, że inny wątek zmodyfikował ten obiekt. Pozwala to ustalić jakich kluczy synchronizacji nam jeszcze brakuje.

Podsumowanie

Użycie rozproszonych blokad nie jest rozwiązaniem idealnym i jak wszystko ma swoje problemy. Przed ich użyciem w konkretnym systemie, należy przeanalizować jego wady i zalety. W tym konkretnym przypadku działało to na tyle dobrze, iż uzasadniało jego użycie.

Odnośniki

Odnośniki do kodu

Poznaj mageek of j‑labs i daj się zadziwić, jak może wyglądać praca z j‑People!

Skontaktuj się z nami