Rozproszona blokada przy użyciu Hazelcast’a
Wprowadzenie
Podczas rozwijania aplikacji często trafiamy na problemy z wydajnością. Oczywistym rozwiązaniem (poza optymalizacją kodu) jest użycie skalowania. Zazwyczaj zaczyna się od użycia wielowątkowości. Minusem tego rozwiązania jest konieczność synchronizowania dostępu do wspólnych zasobów, zazwyczaj przy użyciu blokad. Skalując wertykalnie (dodając zasoby do maszyny, na której działa aplikacja) szybko napotykamy na fizyczną i kosztową barierę. Atrakcyjniejszym wyborem wydaję się użycie skalowania horyzontalnego (dodanie kolejnych maszyn/węzłów). To niestety wiąże się z kolejnym wyzwaniem – synchronizacją dostępu do wspólnych zasobów przez wiele węzłów. W tym artykule chciałbym podzielić się implementacją synchronizacji za pomocą rozproszonych blokad, którą miałem przyjemność współtworzyć.
Opis problemu
Rozważmy następujący scenariusz. Mamy do zaprojektowania system, który powinien rejestrować lokalizację pociągów. Pociąg jest aktualizowany za pomocą trzech typów wiadomości: utworzenie pociągu, przemieszczenie pociągu i usunięcie pociągu. Jako pierwszą otrzymujemy wiadomość tworzącą pociąg, następnie otrzymujemy wiadomości przemieszczające pociąg do kolejnych lokalizacji i na końcu otrzymujemy wiadomość, która usuwa pociąg. Załóżmy, że chcemy śledzić ile pociągów zostało utworzonych/usuniętych na poszczególnych lokalizacjach i ile pociągów przyjechało/odjechało z każdej lokalizacji. Aby to osiągnąć musimy zapisać te dane i zapewnić synchronizacje, ponieważ mogą być one zmieniane przez różne pociągi.
Implementacja – domena
Odnośniki do ostatecznej implementacji są umieszczone na końcu artykułu. Teraz przejdziemy krok po kroku wyjaśniając jak zostało to zaimplementowane. Z opisu problemu wiemy, że potrzebujemy dwóch obiektów by przetrzymywać dane – pociągu (Train) oraz lokalizacji (Location). Potrzebne są nam też obiekty dla każdej z typów wiadomości.
W obiekcie typu Train potrzebujemy unikalnego identyfikatora pociągu oraz pola by przetrzymywać ostatnią lokalizację (unikalny identyfikator lokalizacji). Poza tym mamy metody, które pozwolą nam utworzyć/zmodyfikować/usunąć obiekt w magazynie oraz pomocniczą metodę, która sprawdza czy obiekt już tam istnieje.
Obiekt typu Location pozwala na przechowywanie ile pociągów zostało utworzonych/usuniętych oraz ile przyjechało/odjechało z danej lokalizacji. Podobnie jak w przypadku pociągu mamy tu metody, które pozwalają zapisać/zmodyfikować/usunąć obiekt. Dodatkowe metody pozwalają na zwiększanie liczników poszczególnych zdarzeń pociągu.
Wszystkie wiadomości mają unikalny identyfikator, który pomoże nam śledzić jak dana wiadomość została przetworzona oraz pole zawierające unikalny identyfikator pociągu. Wiadomość typu TrainCreationMessage oraz TrainMovementMessage posiadają również pole z identyfikatorem lokalizacji by wskazać gdzie pociąg ma być utworzony/gdzie powinien się przemieścić.
Implementacja – logika przetwarzania wiadomości
Skoro mamy już początkowy model domeny, możemy określić jak chcemy przetworzyć każdą z wiadomości. Rozpoczniemy z wiadomością typu TrainCreationMessage:
Wiadomość TrainCreationMessage jest przetwarzana następująco:
– Jeśli pociąg już istnieje – wiadomość jest ignorowana
– W przeciwnym wypadku – ustawiamy ostatnią lokalizację pociągu w obiekcie typu Train, zapisujemy go, następnie zwiększamy licznik utworzonych pociągów w obiekcie typu Location i również go zapisujemy
Wiadomość typu TrainMovementMessage poza aktualizacją pociągu i lokalizacji, uaktualnia też poprzednią lokalizację pociągu – zwiększa licznik pociągów, które odjechały.
Wiadomość typu TrainDeletionMessage usuwa obiekt pociągu (jeśli istnieje) oraz aktualizuje licznik usuniętych pociągów dla lokalizacji na której pociąg znajdował się ostatnio.
Implementacja – synchronizacja
Analizując logikę przetwarzania wiadomości widać, że modyfikowanie obiektów typu Train oraz Location przez wiele wątków będzie sprawiać problemy. Do synchronizacji pomiędzy wątkami użyjemy Hazelcast IMDG. Mamy dwa typy obiektów więc utwórzmy dla nich enum:
Ten enum pozwoli nam utworzyć klucz, którego użyjemy do synchronizacji w osobnym obiekcie:
Jako klucz do synchronizacji użyjemy prefiksu z enum’a oraz unikalnego identyfikatora obiektu typu Location lub Train. Dzięki temu klucz do synchronizacji będzie unikalny nawet jeśli pociąg i lokalizacja będą miały ten sam unikalny identyfikator. Do uzyskania blokad użyjemy metody tryLock z Hazelcast’owej IMap’y. Próbujemy uzyskać blokadę przez 10 milisekund. Gdy to się nie powiedzie, metoda zwraca false. Dodatkowo ustawiamy TTL (time to live) na blokadzie na 2 minuty na wypadek gdybyśmy nie zwolnili blokady. To pozwoli na kontynuowanie przetwarzania wiadomości dla danego klucza w przypadku błędu. HazelcastManager jest to klasa typu singleton, która tworzy i przetrzymuje obiekt typu HazelcastInstance. HazelcastInstance umożliwia interakcje z Hazelcast’em.
Skoro mamy już klasę do synchronizacji, możemy wziąć się za jej implementację dla każdego typu wiadomości. Na początku stwórzmy nowy interfejs:
Metoda obtainLocks z interfejsu Synchronizable stara się założyć blokadę na wszystkich kluczach zdefiniowanych do synchronizacji. Jeśli dla któregoś z nich się nie uda, już założone blokady są zwalniane i próbujemy ponownie (do 10 razy). Gdy uda się założyć blokadę dla wszystkich kluczy, zwracamy wartość true by oznajmić sukces.
Następnym krokiem jest ustalenie jakie klucze należy synchronizować dla każdego typu wiadomości. Dla wiadomości typu TrainCreationMessage jest to identyfikator pociągu i lokalizacji na której pociąg będzie utworzony.
Dla wiadomości typu TrainMovementMessage jest to identyfikator pociągu, który będzie się przemieszczał, identyfikator lokalizacji na której obecnie znajduję się pociąg oraz identyfikator lokalizacji do której pociąg się przemieści.
Dla wiadomości typu TrainDeletionMessage jest to identyfikator pociągu, który zostanie usunięty oraz identyfikator lokalizacji na której obecnie znajduję się pociąg.
Teraz gdy mamy zaimplementowaną synchronizacje dla każdego z typów wiadomości, użyjmy jej w logice przetwarzania wiadomości:
Zanim przetworzymy synchronizowalną (Synchronizable) wiadomość potrzebujemy założyć blokady na wszystkie potrzebne klucze. Gdy już je mamy, wywoływana jest odpowiednia metoda odpowiedzialna za przetworzenie odpowiedniego typu wiadomości. Jeśli założenie blokad się nie uda, próbujemy ponownie (do trzech razy). Uważna lektura powyższego kodu pozwoli zauważyć blok obsługi błędu SavingException. Problem poruszany w tym artykule, jest w stosunkowo prosty. Jeśli chcemy go rozbudować, wprowadzimy nowe wiadomości i nowe obiekty, które będą musiały być synchronizowane. By zidentyfikować potencjalne problemy z synchronizacją (modyfikacja obiektu przez inny wątek zanim obecny skończył z modyfikacją), dodano dodatkową logikę przy zapisywaniu obiektów typu Train i Location:
Zanim zapiszemy obiekt w Hazelcast’owej mapie obliczamy kod skrótu obiektu, by zapisać go wewnątrz obiektu z innymi polami. Kiedy dostajemy obiekt z Hazelcast’a oryginalny kod skrótu jest również wgrywany. To pozwala nam porównać czy oryginalny kod skrótu jest taki same jak ten, który jest zwrócony z Hazelcast’a przy zapisywaniu (metoda put klasy IMap zwraca poprzednio przechowywany obiekt). Jeśli są różne oznacza to, że inny wątek zmodyfikował ten obiekt. Pozwala to ustalić jakich kluczy synchronizacji nam jeszcze brakuje.
Podsumowanie
Użycie rozproszonych blokad nie jest rozwiązaniem idealnym i jak wszystko ma swoje problemy. Przed ich użyciem w konkretnym systemie, należy przeanalizować jego wady i zalety. W tym konkretnym przypadku działało to na tyle dobrze, iż uzasadniało jego użycie.
Odnośniki
- https://docs.hazelcast.com/hazelcast/5.3/
- https://www.linkedin.com/advice/0/what-some-common-distributed-locking-patterns
- https://www.baeldung.com/java-concurrent-locks
Odnośniki do kodu
Poznaj mageek of j‑labs i daj się zadziwić, jak może wyglądać praca z j‑People!
Skontaktuj się z nami


