Reactor – Handle Error

In this tutorial, JavaSampleApproach shows several ways to handle errors in Reactor: catching an exception and falling back, recovering from errors, and dealing with backpressure errors.

I. Ways to handle Errors

0. Declare & Initialize Flux
Flux<String> flux = Flux
//						.error(new IllegalArgumentException());
						.range(1, 4)
						.map(item -> {
							if (item <= 3)
								return "item: " + item;
							else {
								System.out.println(">> Exception occurs on map()");
								throw new RuntimeException();
							}
						});
1. Catching Exception
1.1 Then do something
flux.doOnError(e -> System.out.println("doOnError: " + e))
	.subscribe(System.out::println);
/**
item: 1
item: 2
item: 3
>> Exception occurs on map()
doOnError: java.lang.RuntimeException
*/
1.2 Then fall back to a default value
flux.onErrorReturn("onErrorReturn: Value!")
	.subscribe(System.out::println);
/**
item: 1
item: 2
item: 3
>> Exception occurs on map()
onErrorReturn: Value!
*/
1.3 Then rethrow as a new Exception
flux.mapError(e -> new CustomException("mapError"))
	.subscribe(System.out::println);
/**
item: 1
item: 2
item: 3
>> Exception occurs on map()
mapError: throw CustomException!
Exception in thread "main" reactor.core.Exceptions$ErrorCallbackNotImplemented: com.javasampleapproach.reactor.handleerror.CustomException
Caused by: com.javasampleapproach.reactor.handleerror.CustomException
	at com.javasampleapproach.reactor.handleerror.MainApp.lambda$6(MainApp.java:115)
	at reactor.core.publisher.Flux.lambda$mapError$23(Flux.java:4189)
	...
*/
Note: the ErrorCallbackNotImplemented wrapper indicates that subscribe(System.out::println) registers no error consumer. Passing a second argument, such as subscribe(System.out::println, System.err::println), delivers the CustomException to that consumer instead.

public class CustomException extends RuntimeException {

	private static final long serialVersionUID = -5970845585469454688L;

	public CustomException(String type) {
		System.out.println(type + ": throw CustomException!");
	}
}
2. Recover from Errors
2.1 By falling back to another Flux
flux.onErrorResumeWith(e -> {
		System.out.println("-> inside onErrorResumeWith()");
		return Flux.just(1,2)
			.map(item -> {return "-> new Flux item: " + item;});
	})
	.subscribe(System.out::println);
/**
item: 1
item: 2
item: 3
>> Exception occurs on map()
-> inside onErrorResumeWith()
-> new Flux item: 1
-> new Flux item: 2
*/
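The two fallback styles shown so far differ only in what replaces the failed tail of the sequence: a single default value or a whole new sequence. The following is a minimal plain-Java sketch of that idea, assuming no Reactor on the classpath. `FallbackSketch`, `source` and `runWithFallback` are hypothetical names used only for illustration, and the code models the observable behavior, not Reactor's implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

public class FallbackSketch {

	// Hypothetical stand-in for the tutorial's flux: emits three items, then fails.
	static void source(Consumer<String> onNext) {
		for (int item = 1; item <= 3; item++)
			onNext.accept("item: " + item);
		throw new RuntimeException("boom");
	}

	// On failure, asks the fallback function for replacement items and appends them.
	// Items received before the failure are kept.
	static List<String> runWithFallback(Function<Throwable, List<String>> fallback) {
		List<String> received = new ArrayList<>();
		try {
			source(received::add);
		} catch (RuntimeException e) {
			received.addAll(fallback.apply(e));
		}
		return received;
	}

	public static void main(String[] args) {
		// A single default value, in the spirit of onErrorReturn
		System.out.println(runWithFallback(e -> Arrays.asList("default value")));
		// A replacement sequence, in the spirit of onErrorResumeWith
		System.out.println(runWithFallback(e -> Arrays.asList("-> new item: 1", "-> new item: 2")));
	}
}
```

In both cases the three items received before the failure stay in the result, which matches the outputs printed by the Reactor examples.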
2.2 By retrying
flux.retry(1)
	.doOnError(System.out::println)
	.subscribe(System.out::println);
/**
item: 1
item: 2
item: 3
>> Exception occurs on map()
item: 1
item: 2
item: 3
>> Exception occurs on map()
java.lang.RuntimeException
*/

With Predicate:

flux.retry(1, e -> {
		boolean shouldRetry = RANDOM.nextBoolean();
		System.out.println("shouldRetry? -> " + shouldRetry);
		return shouldRetry;
	})
	.doOnError(System.out::println)
	.subscribe(System.out::println);
/**
item: 1
item: 2
item: 3
>> Exception occurs on map()
shouldRetry? -> true
item: 1
item: 2
item: 3
>> Exception occurs on map()
java.lang.RuntimeException
*/
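Items 1 to 3 appear twice in the outputs above because a retry subscribes to the source again, so the whole sequence restarts from the beginning. The following is a minimal plain-Java sketch of that behavior, assuming no Reactor on the classpath. `RetrySketch`, `source` and `runWithRetry` are hypothetical names used only for illustration, and the code models the semantics, not Reactor's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class RetrySketch {

	// Hypothetical stand-in for the tutorial's flux: emits three items, then fails on the fourth.
	static void source(Consumer<String> onNext) {
		for (int item = 1; item <= 4; item++) {
			if (item <= 3)
				onNext.accept("item: " + item);
			else
				throw new RuntimeException("failed on item " + item);
		}
	}

	// Runs the source once, then re-runs it from the start up to maxRetries times on failure.
	// When the retries are used up, the last failure is recorded as a final "error: ..." entry.
	static List<String> runWithRetry(int maxRetries) {
		List<String> received = new ArrayList<>();
		for (int attempt = 0; ; attempt++) {
			try {
				source(received::add);
				return received;
			} catch (RuntimeException e) {
				if (attempt >= maxRetries) {
					received.add("error: " + e.getMessage());
					return received;
				}
			}
		}
	}

	public static void main(String[] args) {
		runWithRetry(1).forEach(System.out::println);
	}
}
```

With `runWithRetry(1)` the list holds items 1 to 3 twice, followed by one error entry. A retry does not resume from the failing item, so a source that fails deterministically, like this one, fails again on every attempt.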
3. Deal with Backpressure Errors
3.1 By throwing a special Exception

When Receiver is overrun by more signals than expected:

// Publisher emits 3 items before failing, but Subscriber requests only 1
flux.onBackpressureError()
	.doOnError(System.out::println)
	.subscribe(new BaseSubscriber<String>() {
	
		@Override
		protected void hookOnSubscribe(Subscription subscription) {
			System.out.println("Subscriber > request only 1 item...");
			request(1);
		}
	
		@Override
		protected void hookOnNext(String value) {
			System.out.println("Subscriber > process... [" + value + "]");
		}
	});
/**
Subscriber > request only 1 item...
Subscriber > process... [item: 1]
reactor.core.Exceptions$OverflowException: The receiver is overrun by more signals than expected (bounded queue...)
*/
3.2 By dropping excess values
// Publisher emits 3 items, but Subscriber requests only 1
flux.onBackpressureDrop(item -> System.out.println("Drop: [" + item+ "]"))
	.doOnError(System.out::println)
	.subscribe(new BaseSubscriber<String>() {
	
		@Override
		protected void hookOnSubscribe(Subscription subscription) {
			System.out.println("Subscriber > request only 1 item...");
			request(1);
		}
	
		@Override
		protected void hookOnNext(String value) {
			System.out.println("Subscriber > process... [" + value + "]");
		}
	});
/**
Subscriber > request only 1 item...
Subscriber > process... [item: 1]
Drop: [item: 2]
Drop: [item: 3]
>> Exception occurs on map()
java.lang.RuntimeException
*/
3.3 By buffering excess values
BaseSubscriber<String> subscriber = new BaseSubscriber<String>() {
	
	@Override
	protected void hookOnSubscribe(Subscription subscription) {
		System.out.println("Subscriber > request only 1 item...");
		request(1);
	}

	@Override
	protected void hookOnNext(String value) {
		System.out.println("Subscriber > process... [" + value + "]");
	}
};

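// Publisher emits 3 items, but Subscriber requests only 1 [item 1]
// Buffer holds the other 2 items [item 2, item 3]
// The callback passed to onBackpressureBuffer() runs only for items that overflow the buffer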
flux.onBackpressureBuffer(2, item -> System.out.println("Buffer: [" + item+ "]"))
	.doOnError(System.out::println)
	.subscribe(subscriber);
		
System.out.println("Subscriber > request more items:");
subscriber.request(1); // receive one more item (from Buffer)
/**
Subscriber > request only 1 item...
Subscriber > process... [item: 1]
>> Exception occurs on map()
Subscriber > request more items:
Subscriber > process... [item: 2]
*/
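The three backpressure strategies above differ only in what happens to an item that arrives when the subscriber has no outstanding demand. The following is a minimal plain-Java sketch of that decision, assuming no Reactor on the classpath. `BackpressureSketch`, `Strategy` and `push` are hypothetical names used only for illustration, and the code is a simplified synchronous model, not Reactor's implementation.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

public class BackpressureSketch {

	enum Strategy { ERROR, DROP, BUFFER }

	// Pushes items to a consumer that requested `demand` items and returns a log of what happened.
	static List<String> push(List<String> items, long demand, Strategy strategy, int bufferSize) {
		List<String> log = new ArrayList<>();
		Queue<String> buffer = new ArrayDeque<>();
		for (String item : items) {
			if (demand > 0) {
				demand--;                         // consumer asked for this item
				log.add("process: " + item);
			} else if (strategy == Strategy.DROP) {
				log.add("drop: " + item);         // discard and keep going
			} else if (strategy == Strategy.BUFFER && buffer.size() < bufferSize) {
				buffer.add(item);                 // keep for a later request
			} else {
				log.add("overflow at: " + item);  // ERROR strategy, or the buffer is full
				break;
			}
		}
		log.add("buffered: " + buffer);
		return log;
	}

	public static void main(String[] args) {
		List<String> items = Arrays.asList("item: 1", "item: 2", "item: 3");
		for (Strategy s : Strategy.values())
			System.out.println(s + " -> " + push(items, 1, s, 2));
	}
}
```

With a demand of 1 and a buffer size of 2, the ERROR strategy stops at item 2, DROP discards items 2 and 3, and BUFFER holds both until the subscriber requests more.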

II. Source Code

1. Technology

– Java 8
– Maven 3.6.1
– Reactor Core 3.0.4, with the Aluminium release train.

2. Reactor installation in Maven

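– First, import the BOM by adding the following to pom.xml:

<dependencyManagement>
	<dependencies>
		<dependency>
			<groupId>io.projectreactor</groupId>
			<artifactId>reactor-bom</artifactId>
			<version>Aluminium-SR1</version>
			<type>pom</type>
			<scope>import</scope>
		</dependency>
	</dependencies>
</dependencyManagement>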

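– Next, add the dependency:

<dependencies>
	<dependency>
		<groupId>io.projectreactor</groupId>
		<artifactId>reactor-core</artifactId>
	</dependency>
</dependencies>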

3. Code
package com.javasampleapproach.reactor.handleerror;

import java.util.Random;

import org.reactivestreams.Subscription;

import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;

public class MainApp {

	public static void main(String[] args) {

		final Random RANDOM = new Random();
		
		Flux<String> flux = Flux
//								.error(new IllegalArgumentException());
								.range(1, 4)
								.map(item -> {
									if (item <= 3)
										return "item: " + item;
									else {
										System.out.println(">> Exception occurs on map()");
										throw new RuntimeException();
									}
								});

		System.out.println("=== do when error ===");
		flux.doOnError(e -> System.out.println("doOnError: " + e))
			.subscribe(System.out::println);
		
		System.out.println("=== fall back to a default value ===");
		flux.onErrorReturn("onErrorReturn: Value!")
			.subscribe(System.out::println);

		System.out.println("=== fall back to another Flux ===");
		flux.onErrorResumeWith(e -> {
				System.out.println("-> inside onErrorResumeWith()");
				return Flux.just(1,2)
							.map(item -> {return "-> new Flux item: " + item;});
			})
			.subscribe(System.out::println);

		System.out.println("=== retry ===");
		flux.retry(1)
			.doOnError(System.out::println)
			.subscribe(System.out::println);
		
		System.out.println("=== retry with Predicate ===");
		flux.retry(1, e -> {
				boolean shouldRetry = RANDOM.nextBoolean();
				System.out.println("shouldRetry? -> " + shouldRetry);
				return shouldRetry;
			})
			.doOnError(System.out::println)
			.subscribe(System.out::println);
		
		System.out.println("=== deal with backpressure Error ===");
		flux.onBackpressureError()
			.doOnError(System.out::println)
			.subscribe(new BaseSubscriber<String>() {
	
				@Override
				protected void hookOnSubscribe(Subscription subscription) {
					System.out.println("Subscriber > request only 1 item...");
					request(1);
				}
	
				@Override
				protected void hookOnNext(String value) {
					System.out.println("Subscriber > process... [" + value + "]");
				}
			});
		
		System.out.println("===  dropping excess values ===");
		flux.onBackpressureDrop(item -> System.out.println("Drop: [" + item+ "]"))
			.doOnError(System.out::println)
			.subscribe(new BaseSubscriber<String>() {
	
				@Override
				protected void hookOnSubscribe(Subscription subscription) {
					System.out.println("Subscriber > request only 1 item...");
					request(1);
				}
	
				@Override
				protected void hookOnNext(String value) {
					System.out.println("Subscriber > process... [" + value + "]");
				}
			});
		
		System.out.println("===  buffer excess values ===");
		BaseSubscriber<String> subscriber = new BaseSubscriber<String>() {
			
			@Override
			protected void hookOnSubscribe(Subscription subscription) {
				System.out.println("Subscriber > request only 1 item...");
				request(1);
			}

			@Override
			protected void hookOnNext(String value) {
				System.out.println("Subscriber > process... [" + value + "]");
			}
		};
		
		flux.onBackpressureBuffer(2, item -> System.out.println("Buffer: [" + item+ "]"))
			.doOnError(System.out::println)
			.subscribe(subscriber);
		
		System.out.println("Subscriber > request more items:");
		subscriber.request(1);
		
		System.out.println("=== catch and rethrow ===");
		flux.mapError(e -> new CustomException("mapError"))
			.subscribe(System.out::println);
	}
}
4. Results
=== do when error ===
item: 1
item: 2
item: 3
>> Exception occurs on map()
doOnError: java.lang.RuntimeException
=== fall back to a default value ===
item: 1
item: 2
item: 3
>> Exception occurs on map()
onErrorReturn: Value!
=== fall back to another Flux ===
item: 1
item: 2
item: 3
>> Exception occurs on map()
-> inside onErrorResumeWith()
-> new Flux item: 1
-> new Flux item: 2
=== retry ===
item: 1
item: 2
item: 3
>> Exception occurs on map()
item: 1
item: 2
item: 3
>> Exception occurs on map()
java.lang.RuntimeException
=== retry with Predicate ===
item: 1
item: 2
item: 3
>> Exception occurs on map()
shouldRetry? -> true
item: 1
item: 2
item: 3
>> Exception occurs on map()
java.lang.RuntimeException
=== deal with backpressure Error ===
Subscriber > request only 1 item...
Subscriber > process... [item: 1]
reactor.core.Exceptions$OverflowException: The receiver is overrun by more signals than expected (bounded queue...)
===  dropping excess values ===
Subscriber > request only 1 item...
Subscriber > process... [item: 1]
Drop: [item: 2]
Drop: [item: 3]
>> Exception occurs on map()
java.lang.RuntimeException
===  buffer excess values ===
Subscriber > request only 1 item...
Subscriber > process... [item: 1]
>> Exception occurs on map()
Subscriber > request more items:
Subscriber > process... [item: 2]
=== catch and rethrow ===
item: 1
item: 2
item: 3
>> Exception occurs on map()
mapError: throw CustomException!
Exception in thread "main" reactor.core.Exceptions$ErrorCallbackNotImplemented: com.javasampleapproach.reactor.handleerror.CustomException
Caused by: com.javasampleapproach.reactor.handleerror.CustomException
	at com.javasampleapproach.reactor.handleerror.MainApp.lambda$6(MainApp.java:115)
	at reactor.core.publisher.Flux.lambda$mapError$23(Flux.java:4189)
	at reactor.core.publisher.FluxResume$ResumeSubscriber.onError(FluxResume.java:88)
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onError(FluxMapFuseable.java:132)
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:115)
	at reactor.core.publisher.FluxRange$RangeSubscription.fastPath(FluxRange.java:119)
	at reactor.core.publisher.FluxRange$RangeSubscription.request(FluxRange.java:97)
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:172)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:1476)
	at reactor.core.publisher.FluxResume$ResumeSubscriber.onSubscribe(FluxResume.java:68)
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:94)
	at reactor.core.publisher.FluxRange.subscribe(FluxRange.java:68)
	at reactor.core.publisher.FluxMapFuseable.subscribe(FluxMapFuseable.java:67)
	at reactor.core.publisher.FluxResume.subscribe(FluxResume.java:47)
	at reactor.core.publisher.Flux.subscribe(Flux.java:5780)
	at reactor.core.publisher.Flux.subscribe(Flux.java:5747)
	at reactor.core.publisher.Flux.subscribe(Flux.java:5679)
	at com.javasampleapproach.reactor.handleerror.MainApp.main(MainApp.java:116)


By grokonez | July 1, 2017.

Last updated on March 13, 2018.


