Java 8 CompletableFutures

CompletableFuture is one of the most important additions to multithreaded programming in Java 8.
In the previous post about Java Future, we introduced the Future object, which lets us do other work while waiting for a result. However, although we can inspect a Future to see whether it is done, there is no way to register code that runs as soon as the result is ready. We still have to block until the result is available.

CompletableFuture meets that requirement, and does more besides.

Related articles:
Java Future
CompletableFuture Handle Exception
Java 8 Multiple CompletableFutures

I. Create CompletableFuture

We use one of these static factory methods to create a CompletableFuture object:

static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor);
static CompletableFuture<Void> runAsync(Runnable runnable);
static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor);

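If we don't provide an Executor as an input parameter, the method uses ForkJoinPool.commonPool(), unless the common pool's parallelism is 1 or less, in which case a new thread is created for each task. The relevant lines in the Java 8 source of CompletableFuture are: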


private static final boolean useCommonPool = (ForkJoinPool.getCommonPoolParallelism() > 1);

/**
 * Default executor -- ForkJoinPool.commonPool() unless it cannot
 * support parallelism.
 */
private static final Executor asyncPool = useCommonPool ?
	ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();

In this example, we use that global, general-purpose pool, ForkJoinPool.commonPool() (numbers and index are defined in the full source code in section IV):

CompletableFuture.supplyAsync(new Supplier<String>() {
	@Override
	public String get() {
		try {
			System.out.println("inside future: waiting for detecting...");
			// process data
			System.out.println("inside future: done...");

			return numbers.get(index);
		} catch (Throwable e) {
			return "not detected";
		}
	}
});
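
As a minimal sketch of the overloads that accept an Executor, the class below passes a fixed thread pool to supplyAsync() and runAsync(). The class name CustomExecutorExample, the helper detectOn() and the pool size are illustrative assumptions, not part of the original example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CustomExecutorExample {

	// Hypothetical helper: runs the supplier on the given pool and waits for the value.
	static String detectOn(ExecutorService pool) {
		CompletableFuture<String> future =
				CompletableFuture.supplyAsync(() -> "1", pool);
		// join() blocks like get(), but throws unchecked exceptions
		return future.join();
	}

	public static void main(String[] args) {
		// Pool size 2 is an arbitrary choice for the sketch
		ExecutorService pool = Executors.newFixedThreadPool(2);
		try {
			System.out.println("supplied: " + detectOn(pool));

			// runAsync() takes a Runnable, so the future carries no value
			CompletableFuture<Void> done = CompletableFuture.runAsync(
					() -> System.out.println("side effect only"), pool);
			done.join();
		} finally {
			// Unlike the common pool, a pool we create must be shut down by us
			pool.shutdown();
		}
	}
}
```

Supplying our own pool keeps long-running or blocking tasks away from the shared common pool, at the cost of having to manage the pool's lifecycle ourselves.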

II. Transform results

If we want to do more work once the result is available, CompletableFuture provides the thenApply() method:

<U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn);

In the example, we chain several operations: parse the String to an Integer, do a math operation, and build a String result:

future = future.thenApply(Integer::parseInt).thenApply(r -> r * 2 * Math.PI).thenApply(s -> "apply>> " + s);

Just as with supplyAsync() above, we can run these operations asynchronously, either on the default pool or on an Executor of our choice, by using:

<U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn);
<U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor);
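
The following sketch shows both thenApplyAsync() overloads in one chain, under the assumption that we want one step on the default pool and one on our own pool. The class name ThenApplyAsyncExample and the helper transform() are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThenApplyAsyncExample {

	// Hypothetical helper: parses the input, doubles it, and formats the result.
	static String transform(String input, ExecutorService pool) {
		return CompletableFuture.supplyAsync(() -> input)
				.thenApplyAsync(Integer::parseInt)   // submitted to the default async executor
				.thenApplyAsync(n -> n * 2, pool)    // submitted to the pool passed in
				.thenApply(n -> "apply>> " + n)      // no new task is submitted for this step
				.join();
	}

	public static void main(String[] args) {
		ExecutorService pool = Executors.newFixedThreadPool(2);
		try {
			System.out.println(transform("21", pool)); // prints "apply>> 42"
		} finally {
			pool.shutdown();
		}
	}
}
```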

III. Complete a future

1. Final stage

thenApply() returns a CompletableFuture of a specific type, whereas thenAccept() consumes the result and returns a CompletableFuture<Void>:

CompletableFuture<Void> thenAccept(Consumer<? super T> action);

We don't need thenAccept() as the final stage for the future to do its work; it is simply a convenient place to consume the result. To obtain the final value ourselves we call get(), and we can call complete() to set the value manually.
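
A small sketch of thenAccept() as the last stage of a chain follows. The class name ThenAcceptExample, the helper consume() and the use of an AtomicReference to capture what the consumer received are assumptions made for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

public class ThenAcceptExample {

	// Hypothetical helper: returns the value that the thenAccept() consumer received.
	static String consume(String value) {
		AtomicReference<String> seen = new AtomicReference<>();

		CompletableFuture<Void> last = CompletableFuture.supplyAsync(() -> value)
				.thenApply(s -> "apply>> " + s)
				.thenAccept(seen::set); // the consumer returns nothing, hence Void

		// Wait for the consumer to run; join() on a Void future returns null
		last.join();
		return seen.get();
	}

	public static void main(String[] args) {
		System.out.println("accept: " + consume("1")); // prints "accept: apply>> 1"
	}
}
```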

2. complete method

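The complete() method sets the future's value directly if the future has not completed yet, regardless of the transformations chained before it. In the code below, future refers to the last thenApply() stage, so completing it with "foo" bypasses the earlier stages, provided they have not finished yet: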


future = future.thenApply(Integer::parseInt).thenApply(r -> r * 2 * Math.PI).thenApply(s -> "apply>> " + s);
future.complete("foo");
future = future.thenApply(s -> "apply>> " + s);
future.thenAccept(result -> System.out.println("accept: " + result));

If the earlier stages are still running when complete() is called, the code above prints:


accept: apply>> foo
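
To illustrate that only the first completion takes effect, here is a minimal sketch that calls complete() twice on a future created by hand. The class name CompleteExample and the helper completeTwice() are hypothetical; complete() returns true only when the call actually moved the future to a completed state.

```java
import java.util.concurrent.CompletableFuture;

public class CompleteExample {

	// Hypothetical helper: reports both return values and the value the future ends up with.
	static String completeTwice() {
		CompletableFuture<String> future = new CompletableFuture<>();

		boolean first = future.complete("foo");  // future was pending, so this succeeds
		boolean second = future.complete("bar"); // already completed, so this is ignored

		return first + " " + second + " " + future.join();
	}

	public static void main(String[] args) {
		System.out.println(completeTwice()); // prints "true false foo"
	}
}
```

This is why the position of the complete() call matters in the full source code: if the asynchronous work has already finished, the call has no effect.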

3. get method

Just as with a Java Future, we call get() to retrieve the result; the call blocks until the future completes and can throw InterruptedException or ExecutionException:


String contents = future.get();
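
As a hedged sketch of how the checked exceptions of get() might be handled, the class below uses the get(long, TimeUnit) overload so that the caller never waits indefinitely. The class name GetExample, the helper safeGet(), the 200 ms timeout and the fallback strings are assumptions for illustration only.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class GetExample {

	// Hypothetical helper: waits up to 200 ms and maps each failure to a message.
	static String safeGet(CompletableFuture<String> future) {
		try {
			return future.get(200, TimeUnit.MILLISECONDS);
		} catch (ExecutionException e) {
			// The exception that failed the future is available as the cause
			return "failed: " + e.getCause().getMessage();
		} catch (TimeoutException e) {
			return "timed out";
		} catch (InterruptedException e) {
			Thread.currentThread().interrupt(); // restore the interrupt flag
			return "interrupted";
		}
	}

	public static void main(String[] args) {
		CompletableFuture<String> failed = new CompletableFuture<>();
		failed.completeExceptionally(new IllegalStateException("boom"));

		System.out.println(safeGet(CompletableFuture.completedFuture("ok"))); // prints "ok"
		System.out.println(safeGet(failed));                                  // prints "failed: boom"
		System.out.println(safeGet(new CompletableFuture<>()));               // prints "timed out"
	}
}
```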

IV. Source code


package com.grokonez.completablefuture;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;

public class MainApp {

	private static ArrayList<String> numbers = new ArrayList<String>(Arrays.asList("0", "1", "2"));

	public static void main(String[] args) throws InterruptedException, ExecutionException {

		System.out.println("Requesting");

		CompletableFuture<String> future = createCF(1);

		future = future.thenApply(Integer::parseInt).thenApply(r -> r * 2 * Math.PI).thenApply(s -> "apply>> " + s);
//		future.complete("foo");
		future.thenAccept(result -> System.out.println("accept: " + result));
		
		// other statements
		for (int i = 1; i <= 3; i++) {
			try {
				Thread.sleep(500);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
			System.out.println("running outside... " + i + " time");
		}

//		future.complete("foo");
		String contents = future.get();
		System.out.println("Future result: " + contents);
	}

	private static CompletableFuture<String> createCF(int index) {
		return CompletableFuture.supplyAsync(new Supplier<String>() {
			@Override
			public String get() {
				try {
					System.out.println("inside future: waiting for detecting...");
					for (int i = 1; i <= 5; i++) {
						try {
							Thread.sleep(1000);
						} catch (InterruptedException e) {
							e.printStackTrace();
						}
						System.out.println("running inside Future... " + i + " sec");
					}
					System.out.println("inside future: done...");

					return numbers.get(index);
				} catch (Throwable e) {
					return "not detected";
				}
			}
		});
	}
}

Check the results in the console:


Requesting
inside future: waiting for detecting...
running outside... 1 time
running inside Future... 1 sec
running outside... 2 time
running outside... 3 time
running inside Future... 2 sec
running inside Future... 3 sec
running inside Future... 4 sec
running inside Future... 5 sec
inside future: done...
accept: apply>> 6.283185307179586
Future result: apply>> 6.283185307179586


By grokonez | December 14, 2016.

Last updated on April 3, 2021.


