Java 8 Multiple CompletableFutures

Previous posts introduced the basics of CompletableFuture. This tutorial shows how to combine multiple CompletableFutures, one of the most useful capabilities of asynchronous processing in Java 8.

Related articles:
Java Future
CompletableFutures
CompletableFuture Handle Exception

I. Usage

1. thenCompose()

<U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn)

thenCompose() chains two CompletableFutures: the result of the invoking future is passed to a function that returns the next future.

private static void testCompose() throws InterruptedException, ExecutionException {
	CompletableFuture<String> future = createCF(2); // inside future
	CompletableFuture<String> combinedFuture = future.thenCompose(MainApp::calculateCF);

	combinedFuture.thenAccept(result -> System.out.println("accept: " + result));
	// check results
	System.out.println("Future result>> " + future.get());
	System.out.println("combinedFuture result>> " + combinedFuture.get());
}

private static CompletableFuture<String> calculateCF(String s) {

	// supplyAsync() accepts a Supplier, so a lambda replaces the anonymous class
	return CompletableFuture.supplyAsync(() -> {
		System.out.println("> inside new Future");
		return "new Completable Future: " + s;
	});
}

combinedFuture is created by calling thenCompose() on future with the calculateCF function, which receives the result of future and returns a new future that builds on it.

Run the code; the console shows:


inside future: waiting for detecting index: 2...
inside future: done...
> inside new Future
accept: new Completable Future: 2
Future result>> 2
combinedFuture result>> new Completable Future: 2
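
To see why thenCompose() is needed here, it helps to compare it with thenApply(). The following is a minimal, self-contained sketch, not the tutorial's source code: the class name ComposeSketch and the helper decorate() are made up for illustration. With a function that itself returns a future, thenApply() leaves a future nested inside a future, while thenCompose() flattens it.

```java
import java.util.concurrent.CompletableFuture;

class ComposeSketch {

    // Hypothetical second step: wraps its input in a new asynchronous computation.
    static CompletableFuture<String> decorate(String s) {
        return CompletableFuture.supplyAsync(() -> "new Completable Future: " + s);
    }

    // thenCompose() flattens the future returned by decorate().
    static String composed() {
        CompletableFuture<String> first = CompletableFuture.supplyAsync(() -> "2");
        CompletableFuture<String> flat = first.thenCompose(ComposeSketch::decorate);
        return flat.join();
    }

    // thenApply() with the same function leaves a future nested inside a future,
    // so the caller has to unwrap it twice.
    static String applied() {
        CompletableFuture<String> first = CompletableFuture.supplyAsync(() -> "2");
        CompletableFuture<CompletableFuture<String>> nested = first.thenApply(ComposeSketch::decorate);
        return nested.join().join();
    }

    public static void main(String[] args) {
        System.out.println(composed());
        System.out.println(applied());
    }
}
```

Both methods return the same string; the difference is only in the type the caller has to deal with.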

2. thenCombine()

<U,V> CompletableFuture<V> thenCombine(
        CompletionStage<? extends U> other,
        BiFunction<? super T,? super U,? extends V> fn)

thenCombine() combines the results of two independent futures once both are done. It differs from thenCompose(), which starts the second future only after the first one is done.

private static void testCombine() throws InterruptedException, ExecutionException {
	CompletableFuture<String> future = createCFLongTime(1); // future 1
	CompletableFuture<String> newFuture = createCF(2); // future 2

	CompletableFuture<String> combinedFuture = future.thenCombine(newFuture, MainApp::appendString);

	// check results
	System.out.println("Future result>> " + future.get());
	System.out.println("newFuture result>> " + newFuture.get());
	System.out.println("combinedFuture result>> " + combinedFuture.get());
}

private static String appendString(String a, String b) {
	return a + " now appended to " + b;
}

Run the code; the console shows:


inside future: waiting for detecting index: 1...
inside future: waiting for detecting index: 2...
inside future: done...
running inside Future 1... 1 sec
running inside Future 1... 2 sec
running inside Future 1... 3 sec
running inside Future 1... 4 sec
running inside Future 1... 5 sec
inside future: done...
Future result>> 1
newFuture result>> 2
combinedFuture result>> 1 now appended to 2
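
The two futures passed to thenCombine() do not need to have the same result type. As a small sketch under that assumption (the class name CombineSketch and the inline lambdas are illustrative, not part of the tutorial's source), a String future and an Integer future can be merged by a BiFunction:

```java
import java.util.concurrent.CompletableFuture;

class CombineSketch {

    static String combine() {
        // Two independent computations with different result types.
        CompletableFuture<String> left = CompletableFuture.supplyAsync(() -> "1");
        CompletableFuture<Integer> right = CompletableFuture.supplyAsync(() -> 2);

        // The BiFunction runs only after both futures have completed.
        CompletableFuture<String> combined =
                left.thenCombine(right, (a, b) -> a + " now appended to " + b);

        return combined.join();
    }

    public static void main(String[] args) {
        System.out.println(combine());
    }
}
```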

3. thenAcceptBoth()

<U> CompletableFuture<Void> thenAcceptBoth(
        CompletionStage<? extends U> other,
        BiConsumer<? super T, ? super U> action)

thenAcceptBoth() works like thenCombine(), but its action is a BiConsumer that returns nothing, so the resulting future is a CompletableFuture<Void> that carries no value.


private static void testAcceptBoth() throws InterruptedException, ExecutionException {
	CompletableFuture<String> future = createCF(1); // future 1
	CompletableFuture<String> newFuture = createCFLongTime(2); // future 2

	newFuture.thenAcceptBoth(future, MainApp::showAcceptBoth);

	// check results
	System.out.println("Future result: " + newFuture.get() + " and " + future.get());
}

private static void showAcceptBoth(String a, String b) {
	System.out.println("Accept Both: now finish both Futures>> " + a + " and " + b);
}

Run the code; the console shows:


inside future: waiting for detecting index: 1...
inside future: done...
inside future: waiting for detecting index: 2...
running inside Future 2... 1 sec
running inside Future 2... 2 sec
running inside Future 2... 3 sec
running inside Future 2... 4 sec
running inside Future 2... 5 sec
inside future: done...
Future result: 2 and 1
Accept Both: now finish both Futures>> 2 and 1
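
The future returned by thenAcceptBoth() can still be useful: joining it waits until the action has actually run. The sketch below illustrates this with made-up names (AcceptBothSketch, the seen list); it is an illustration, not the tutorial's source code.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

class AcceptBothSketch {

    static List<String> run() {
        // Thread-safe list, because the action may run on a pool thread.
        List<String> seen = new CopyOnWriteArrayList<>();

        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "1");
        CompletableFuture<String> newFuture = CompletableFuture.supplyAsync(() -> "2");

        // The first argument of the action is the invoking future's result,
        // the second is the result of the future passed in.
        CompletableFuture<Void> done =
                newFuture.thenAcceptBoth(future, (a, b) -> seen.add(a + " and " + b));

        // join() yields null, but it guarantees the action has finished.
        done.join();
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```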

4. acceptEither()

CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action)

Use acceptEither() when we only need the result of whichever future finishes first.

private static void testAcceptEither() throws InterruptedException, ExecutionException {
	CompletableFuture<String> future = createCFLongTime(1);
	CompletableFuture<String> newFuture = createCF(2);

	future.acceptEither(newFuture, s -> {
		System.out.println("the future which finishs first is: " + s);
	});

	// check results
	System.out.println("Future result>> " + newFuture.get() + " and " + future.get());
}

Run the code; the console shows:


inside future: waiting for detecting index: 1...
inside future: waiting for detecting index: 2...
inside future: done...
the future which finishs first is: 2
running inside Future 1... 1 sec
running inside Future 1... 2 sec
running inside Future 1... 3 sec
running inside Future 1... 4 sec
running inside Future 1... 5 sec
inside future: done...
Future result>> 2 and 1
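
Which future wins normally depends on timing. To make the behaviour reproducible, the following sketch holds the slow future back with a CountDownLatch until the action has run. The class name AcceptEitherSketch and the latch-based setup are assumptions made for illustration only; they are not how the tutorial's helper methods work.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

class AcceptEitherSketch {

    static String firstFinished() {
        CountDownLatch release = new CountDownLatch(1);

        // The slow future cannot finish until the latch is released.
        CompletableFuture<String> slow = CompletableFuture.supplyAsync(() -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "1";
        });
        CompletableFuture<String> fast = CompletableFuture.supplyAsync(() -> "2");

        // The consumer receives the result of whichever future completes first.
        AtomicReference<String> winner = new AtomicReference<>();
        CompletableFuture<Void> handled = slow.acceptEither(fast, winner::set);

        handled.join();       // the action has run with the fast result
        release.countDown();  // now let the slow future finish
        slow.join();
        return winner.get();
    }

    public static void main(String[] args) {
        System.out.println("the future which finishes first is: " + firstFinished());
    }
}
```

Note that the slower future is not cancelled; it keeps running to completion, as the console output above also shows.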

5. allOf()

static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)

allOf() takes any number of futures and returns a new future that completes only when all of them are done.


private static void testAllOf() throws InterruptedException, ExecutionException {
	CompletableFuture<String> future0 = createCF(0);
	CompletableFuture<String> future1 = createCFLongTime(1);
	CompletableFuture<String> future2 = createCF(2);

	// isDone() does not block; future1 is still running here, so this is false
	boolean isDone = CompletableFuture.allOf(future0, future1, future2).isDone();

	// check results
	System.out.println("All futures are completed now>> " + isDone);
	System.out.println("Future result>> " + future0.get() + " | " + future1.get() + " | " + future2.get());
}

Run the code; the console shows:


inside future: waiting for detecting index: 1...
inside future: waiting for detecting index: 2...
inside future: done...
inside future: waiting for detecting index: 0...
inside future: done...
All futures are completed now>> false
running inside Future 1... 1 sec
running inside Future 1... 2 sec
running inside Future 1... 3 sec
running inside Future 1... 4 sec
running inside Future 1... 5 sec
inside future: done...
Future result>> 0 | 1 | 2
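
Because allOf() returns a CompletableFuture<Void>, it does not hand back the individual results. A common pattern, sketched here with illustrative names (AllOfSketch, collect()), is to wait on the combined future and then read each input future, which is guaranteed to be complete at that point:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class AllOfSketch {

    static List<String> collect() {
        CompletableFuture<String> future0 = CompletableFuture.supplyAsync(() -> "0");
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "1");
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "2");

        CompletableFuture<Void> all = CompletableFuture.allOf(future0, future1, future2);

        // Once 'all' completes, every input is done, so join() returns immediately.
        CompletableFuture<List<String>> results = all.thenApply(ignored ->
                Stream.of(future0, future1, future2)
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));

        return results.join();
    }

    public static void main(String[] args) {
        System.out.println(collect());
    }
}
```

Calling join() or get() on the future returned by allOf() blocks until everything is finished, whereas isDone() only reports the state at the moment it is called.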

6. anyOf()

static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)

anyOf() is the opposite of allOf(): it completes as soon as any one of the input futures is done, with that future's result.


private static void testAnyOf() throws InterruptedException, ExecutionException {
	CompletableFuture<String> future0 = createCFLongTime(0);
	CompletableFuture<String> future1 = createCF(1);
	CompletableFuture<String> future2 = createCFLongTime(2);

	CompletableFuture<Object> future = CompletableFuture.anyOf(future0, future1, future2);

	// check results
	System.out.println("Future result>> " + future.get());
	System.out.println(
			"All combined Futures result>> " + future0.get() + " | " + future1.get() + " | " + future2.get());
}

Run the code; the console shows:


inside future: waiting for detecting index: 0...
inside future: waiting for detecting index: 2...
inside future: waiting for detecting index: 1...
inside future: done...
Future result>> 1
running inside Future 0... 1 sec
running inside Future 2... 1 sec
running inside Future 0... 2 sec
running inside Future 2... 2 sec
running inside Future 0... 3 sec
running inside Future 2... 3 sec
running inside Future 0... 4 sec
running inside Future 2... 4 sec
running inside Future 0... 5 sec
inside future: done...
running inside Future 2... 5 sec
inside future: done...
All combined Futures result>> 0 | 1 | 2
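
Since anyOf() returns a CompletableFuture<Object>, the caller has to cast the result back to the expected type. The sketch below makes the outcome predictable by leaving two futures uncompleted until the result has been read; the class name AnyOfSketch and the manually completed futures are assumptions for illustration, not the tutorial's helpers.

```java
import java.util.concurrent.CompletableFuture;

class AnyOfSketch {

    static String first() {
        // Two futures that stay pending until completed by hand.
        CompletableFuture<String> pending0 = new CompletableFuture<>();
        CompletableFuture<String> quick = CompletableFuture.supplyAsync(() -> "1");
        CompletableFuture<String> pending2 = new CompletableFuture<>();

        CompletableFuture<Object> any = CompletableFuture.anyOf(pending0, quick, pending2);

        // The result is typed as Object, so a cast is required.
        String result = (String) any.join();

        // Completing the others afterwards does not change the anyOf() result.
        pending0.complete("0");
        pending2.complete("2");
        return result;
    }

    public static void main(String[] args) {
        System.out.println("Future result>> " + first());
    }
}
```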

II. Source code

MultiCompletableFuture



By grokonez | December 17, 2016.

Last updated on April 19, 2021.


