Advanced Concurrency in Java

Krzysztof Dymek

The concurrent programming model in the Java language has changed significantly in the past years. From “green threads” to native thread support, from synchronized, blocking operations to asynchronous, non-blocking, from mutable objects to immutable shared state.

Some of those changes happen because of increasing platform capabilities, some were introduced as new API in the standard library, some of them were just a result of the paradigm and a way of thinking shift. In this article, we will go through three ways of solving the same problem, using different tools and paradigms.

The Producer-Consumer problem

Producer-Consumer problem is a classic problem of multi-process synchronization, where two groups of threads are using a common buffer. One of the groups called producers is pushing to the buffer, while the other, called consumers is constantly pulling. Our goal is to write code that guarantees that:

  • All produced items will be consumed
  • Consumers will not pull from an empty buffer
  • Threads will not fall into a deadlock
  • Throughput will be maximized We will use an unbounded buffer but the same approach will allow you to solve this problem with a bounded buffer.

A student’s solution

The first solution will be provided by an IT student. Our student has just finished the “Java Concurrency Crash Course” and knows a bit about the synchronized keyword and wait/notify methods decides to solve the problem on his own. After all, what can go wrong? The student doesn’t need a lot, a good, old List plus two synchronized methods and we have a solution:

package concurency;

import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class Student {
    List<integer> buffer = new LinkedList<>();

    private Runnable producer = () -> {
        Random r = new Random();
        while (true) {
            try {
                int item = r.nextInt();
                System.out.println("Produced: " + item);
                produce(item);
                Thread.sleep(1000 + r.nextInt(1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

    };

    private Runnable consumer = () -> {
        while(true) {
            try {
                Integer consumed = consume();
                System.out.println("Consumed: " + consumed);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    };


    private void produce(Integer item) {
        synchronized (buffer) {
            buffer.add(item);
            buffer.notifyAll();
        }
    }

    private Integer consume() throws InterruptedException {
        synchronized (buffer) {
            while (buffer.isEmpty()) {
                buffer.wait();
            }
            Integer removed = buffer.remove(0);
            buffer.notifyAll();
            return removed;
        }
    }

    public void play() {
        List<thread> threads = IntStream.range(0, 10).boxed()
                .map(i -> new Thread(producer))
                .collect(Collectors.toList());
        IntStream.range(0, 10).boxed()
                .map(i -> new Thread(consumer))
                .forEach(threads::add);
        threads.forEach(Thread::start);
        threads.forEach(thread -> {
            try {
                thread.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
    }

    public static void main(String[] args) {
        new Student().play();
    }
}

The code is simple and straightforward. All critical sections are protected using the synchronized block, consumers correctly check for an item to consume presence. Threads are notified after producing a new item in the buffer so deadlock shouldn’t occur. Although the code is correct, it could be simpler. Let’s try with the next solution.

An engineer’s solution

The next solution will be provided by a Java Engineer. He already has some experience, has heard about new packages introduced in Java 1.5 (sic!), and feels he can do it better. After a brief lecture on “Java concurrency in practice” and a quick StackOverflow search, he came up with the following code:

package concurency;

import java.util.Random;
import java.util.concurrent.*;

import java.util.stream.IntStream;

public class Engineer {
    BlockingQueue<integer> buffer = new LinkedBlockingQueue<>();

    private Runnable producer = () -> {
        Random r = new Random();
        while (true) {
            try {
                int item = r.nextInt();
                System.out.println("Produced: " + item);
                buffer.put(item);
                Thread.sleep(1000 + r.nextInt(1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

    };

    private Runnable consumer = () -> {
        while(true) {
            try {
                Integer consumed = buffer.take();
                System.out.println("Consumed: " + consumed);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    };

    public void play() {
        ExecutorService executor = Executors.newCachedThreadPool();

        IntStream.range(0, 10).boxed()
                .map(i -> producer)
                .forEach(executor::submit);
        IntStream.range(0, 10).boxed()
                .map(i -> consumer)
                .forEach(executor::submit);
        try {
            executor.awaitTermination(Long.MAX_VALUE,TimeUnit.DAYS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        new Engineer().play();
    }
}

The code above is definitely better than the first solution. It is significantly shorter and does not have synchronized blocks. Instead, a blocking queue was introduced to provide synchronization between threads. As synchronized blocks together with wait/notify methods are extremely error-prone we can consider this as a good step forward. Moreover, instead of manually running and managing threads, our Engineer decides to use ExecutorService – easy to use abstraction rather than the raw threads used by the Student. ExecutorService also gives us the ability to submit a Callable interface instead of Runnable, which might be useful when solving complicated, real-life problems.

The biggest issue which is still not resolved is the usage of blocking operations. Items are produced slower than they are consumed so consumer threads are blocked by most of their time. To solve this we need our third solution:

A Geek’s solution

A Geek is our last guest today. He likes a non-blocking approach to solving concurrency problems, he’s already familiar with new APIs introduced in Java 1.8 (new APIs… it was 6 years ago!) and will try to solve the Producer-Consumer problem in a purely asynchronous way. Here is his code:

package concurency;

import java.util.Random;
import java.util.concurrent.*;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.IntStream;

public class Geek {
    private Random r = new Random();

    private Supplier<integer> producer = () -> {
        int item = r.nextInt();
        System.out.println("Produced: " + item + " by " + Thread.currentThread().getName());
        return item;
    };

    private Consumer<integer> consumer = i -> {
        System.out.println("Consumed: " + i + " by " + Thread.currentThread().getName());
    };

    public void play() {
        while (true) {
            IntStream.range(0, 10).boxed().forEach(i ->
            CompletableFuture.supplyAsync(producer)
                    .thenAcceptAsync(consumer));
            try {
                Thread.sleep(500 + r.nextInt(500));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        new Geek().play();
    }
}

This time not only is the code shorter and simpler but we can also observe a complete paradigm shift. Instead of having Runnable implementations, we have Supplier and Consumer – interfaces completely not connected to concurrency. Instead of creating ExecutionService with long-running tasks submitted, we have CompletableFuture where our one-off tasks are supplied.

We can also skip the buffer – the producer task result is passed directly to the consumer which is asynchronously executed when the producer finishes his work. And all of this without even a single line of blocking operation.

The solution above just scratches the surface of possibilities provided by CompletableFuture. When facing complex, real-life problems we can execute stages expressed as FunctionConsumer, or Runnable (depending on whether they require arguments and/or produces results ). Execution stages can be chained and triggered based on another stage (or stages) execution. Even exceptions can be handled in an asynchronous way, as they were ordinal computation results.

Conclusion

The modern Java concurrency programming model changed significantly during the last few releases. CompletableFuture supported by lambda expressions makes the language more adapted to our times – it allows developers to write concise, function-oriented, asynchronous code without worrying about low-level synchronization and manual Thread management. If you are still curious about how to stop Thread in java after it was started, all I can tell you is that there is no good answer to this question. The only safe and universal solution is System.exit() although this one is rarely possible. But this is a topic for another article.

https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/concurrent/package-summary.html

Poznaj mageek of j‑labs i daj się zadziwić, jak może wyglądać praca z j‑People!

Skontaktuj się z nami