Structured Concurrency – nowe podejście do współbieżności w Javie

Mateusz Joniak

Uruchamianie wielu zadań równolegle

Pracując z mikroserwisami, często musimy wykonywać pewne operacje równolegle. Przykładowo trzeba wysłać zapytanie HTTP do innego serwisu, albo przenieść kosztowne obliczenia do innego wątku, a w międzyczasie wykonać resztę przetwarzania. Można to zrobić albo uruchamiając nowy wątek systemowy z klasy Thread lub ExecutorService. Niestety, ma to kilka wad.

Po pierwsze, nie mamy żadnej kontroli nad tym, jak pozostałe zadania zachowają się, kiedy jedno z nich zostanie anulowane:

try (var executorService = Executors.newCachedThreadPool()) {
    Future<Invoice> invoiceFuture = executorService.submit(
        () -> fetchFromInvoiceService(invoiceId)
    );
    Future<PaymentStatus> paymentStatusFuture = executorService.submit(
        () -> fetchFromPaymentsService(invoiceId)
    );

    processInvoicePayment(invoiceFuture.get(), paymentStatusFuture.get());
}

Jeśli przykładowo serwis Invoice aktualnie nie działa, otrzymasz wyjątek po wywołaniu get(). Wyjątek ten jednak nie przerwie zapytania do serwisu Payments, który wykonuje się w innym wątku, i dlatego i tak będziesz musiał zaczekać na jego wynik. W wypadku krótkiego zapytania HTTP to może nie być duży problem, ale równie dobrze mogą to być jakieś intensywne obliczenia wykorzystujące zasoby bez potrzeby.

Moglibyśmy to rozwiązać np. umieszczając każde wywołanie get() w blokach try ... catch i anulować odpowiednie wątki w wypadku błędu, ale w miarę jak rośnie liczba podzadań, taki kod będzie się stawał coraz bardziej skomplikowany.

Analogicznie, jeśli zapytanie do Payments skończy się błędem, to i tak musimy poczekać na zakończenie invoiceFuture zanim dostaniemy informację o wyjątku.

Inny problem z kodem powyżej tkwi w tym, że w wypadku zapytań HTTP, bazodanowych oraz innych operacji i/o, wątek zostanie zablokowany do zakończenia odczytu, przez co marnujemy czas CPU. Zamiast tworzyć wątek systemowy i go blokować, lepiej by było wykorzystać tutaj wątki wirtualne z Javy 21.

Nowy model Structured Concurrency, umieszczony w Javie 21 jako preview feature, stara się rozwiązać wszystkie te problemy.

Structured Concurrency: Task Scope’y i Subtaski

W Javie SE 21 (obecny LTS) nowy model współbieżności jest oznaczony jako preview feature. Dlatego musimy go najpierw włączyć, dodając do parametrów kompilatora --enable-preview. Na przykład w projekcie Mavenowym Spring Boot, trzeba dodać jvmArguments do pom.xml:

<build>
    <plugins>
        <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
            <configuration>
                <jvmArguments>--enable-preview</jvmArguments>
            </configuration>
        </plugin>
    </plugins>
</build>

Główne API, które można rozumieć jako zamiennik dla ExecutorService to klasa StructuredTaskScope. Zamiast jak dotychczas Future, zwraca ona Subtaski (podzadania):

public void processInvoice(long invoiceId) throws ExecutionException, InterruptedException {
    try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
        StructuredTaskScope.Subtask<Invoice> invoiceSubtask = scope.fork(
            () -> fetchFromInvoiceService(invoiceId)
        );
        StructuredTaskScope.Subtask<PaymentStatus> paymentStatusSubtask = scope.fork(
            () -> fetchFromPayments(invoiceId)
        );
        scope.join().throwIfFailed();

        processInvoicePayment(invoiceSubtask.get(), paymentStatusSubtask.get());
    }
}

Warto zwrócić uwagę na kilka szczegółów. Po pierwsze, możemy w jawny sposób określić, co dzieje się, kiedy jedno z zadań zakończy się przed pozostałymi. Polityka ShutdownOnFailure oznacza, że pierwszy błąd, który wystąpi w jednym z zadań automatycznie anuluje pozostałe. Druga opcja to ShutdownOnSuccess, które anuluje pozostałe zadania jeśli jedno z nich zakończy się poprawnie. Przydaje się ona w sytuacjach, gdzie wystarczy nam tylko jedna, dowolna z odpowiedzi.

Możemy też tworzyć własne polityki, rozszerzając klasę StructuredTaskScope.

Żeby poczekać na zakończenie podzadań trzeba wprost wywołać metodę join(). Dostępna jest też metoda joinUntil(), która pozwala przekazać termin wykonania (deadline) dla podzadań, dzięki czemu nie zblokujemy się na zawsze.

ShutdownOnSuccess ma też metodę result(), która pozwala zwrócić jedno z poprawnie wykonanych podzadań.

public WeatherForecast getFirstForecast(List<WeatherService> weatherServices, Instant deadline) {
    try (var scope = new StructuredTaskScope.ShutdownOnSuccess<WeatherForecast>()) {
        for (var weatherService : weatherServices) {
            scope.fork(() -> getWeather(weatherService.url()));
        }

        return scope.joinUntil(deadline).result();
    } catch (ExecutionException | InterruptedException | TimeoutException e) {
        // handle exceptions here
        throw new RuntimeException(e);
    }
}

StructuredTaskScope używa za kulisami wątków wirtualnych, co jest zaletą dla naturalnie asynchronicznych operacji i/o, ale niekoniecznie dla obliczeń zależnych od procesora. Można to nadpisać przekazując ThreadFactory:

try (var scope = new StructuredTaskScope.ShutdownOnFailure(null, Thread.ofPlatform().factory())) {
    // ...
}

Scoped Values

Kolejną nowością dostępną w Javie 21 jako preview feature są Scoped Values: sposób na przekazywanie wspólnego kontekstu w dół hierarchii podzadań. Przypominają one trochę zmienne thread local z dodatkowymi zabezpieczeniami.

Wróćmy do przykładu z serwisami Invoice i Payment. Powiedzmy, że potrzebujemy przekazać informacje kontekstowe o obecnie zalogowanym użytkowniku (nazwiemy tę klasę UserContext). Zamiast przekazywać ją do każdego podzadania jako parametr, możemy przechowywać ją jako Scoped Value:

private final static ScopedValue<UserContext> USER_CONTEXT = ScopedValue.newInstance();

Następnie możemy zapisać nasze dane kontekstowe i uruchomić processInvoice:

var userContext = new UserContext(userId, /* ... */);
ScopedValue.where(USER_CONTEXT, userContext).run(() -> {
    processInvoice(invoiceId);
});

Warto zwrócić uwagę, że ustawiona wartość nie wycieknie poza wywołanie naszej funkcji.

Ostatecznie, gdzieś w dole hierarchii podzadań, możemy wyciągnąć dane z odziedziczonego kontekstu:

private PaymentStatus fetchFromPaymentsService(long invoiceId) {
    long userId = USER_CONTEXT.get().userId();
    // authorize and send the HTTP call
    // ...
}

Nie musieliśmy zmieniać niczego w processInvoice, ponieważ Scoped Values zostały zaprojektowane z myślą o współpracy ze Structured Concurrency. Jeśli używamy StructuredTaskScope, to ona obsłuży dziedziczenie za nas.

Wnioski

Structured Concurrency to nowy, w wielu aspektach lepszy sposób pisania kodu zrównoleglonego. Pozwala na definiowanie polityki obsługi błędów, warunków anulowania podzadań, a w połączeniu ze Scoped Values pozwala na dziedziczenie stanu, a to wszystko bez konieczności używania np. skomplikowanych bibliotek reaktywnych.

W Javie 21 wciąż są to tylko zapowiedzi, ale wyglądają one obiecująco i warto zastanowić się nad ich użyciem, szczególnie jeśli chcemy wykorzystać również wątki wirtualne.

Bibliografia

Poznaj mageek of j‑labs i daj się zadziwić, jak może wyglądać praca z j‑People!

Skontaktuj się z nami