Reactor – Simple Ways to create Flux/Mono

Reactive Streams is an API and pattern that provides a standard for asynchronous stream processing with non-blocking back pressure. It is also being introduced in Java 9 Flow API with four simple interfaces: Publisher, Subscriber, Subscription and Processor.

But Reactive Streams API is just low level to make practical use in reactive applications. So Reactor Core provides two main implementations of Publisher: Flux and Mono. In this tutorial, we’re gonna know what they are and simple ways to create them.

I. Overview

For more details about Reactive Streams and Publish-Subscribe Pattern, please visit:
Java 9 Flow API – Reactive Streams

1. Flux & Mono

A Flux<T> is a standard Publisher<T> representing a reactive sequence of 0..N items, optionally terminated by either a success signal or an error.

While Mono<T> is a specialized Publisher<T> that emits at most single-valued-or-empty result.

2. Simple ways to create Publishers with Flux and Mono

2.1 Mono

Empty Mono:

Mono<String> noData = Mono.empty();

– Mono with value:

Mono<String> data = Mono.just("JSA");

– Mono that emits an Exception:


Mono.error(new CustomException());

2.2 Flux

Empty Flux:

Flux<String> noData = Flux.empty();

– Flux with items:

Flux<String> data = Flux.just("Java", "Sample", "Approach", ".com");

– Flux from Collections:

List<String> list = Arrays.asList("JAVA", "SAMPLE", "APPROACH", ".COM");
Flux<String> sequence = Flux.fromIterable(list);

– Flux that is infinite and ticks every x units of time with an increasing Long:

Flux<Long> counter = Flux.interval(Duration.ofMillis(x));

– Flux that emits an Exception:


Flux.error(new CustomException());

II. Practice

0. Technology

– Java 8
– Maven 3.6.1
– Reactor Core 3.0.4, with the Aluminium release train.

1. Reactor installation in Maven

– First, import the BOM by adding the following to pom.xml:

<dependencyManagement>
	<dependencies>
		<dependency>
			<groupId>io.projectreactor</groupId>
			<artifactId>reactor-bom</artifactId>
			<version>Aluminium-SR1</version>
			<type>pom</type>
			<scope>import</scope>
		</dependency>
	</dependencies>
</dependencyManagement>
</pre>
- Next, add dependency:
<pre class="lang:xml">
<dependencies>
	<dependency>
		<groupId>io.projectreactor</groupId>
		<artifactId>reactor-core</artifactId>
	</dependency>
</dependencies>

2. Write Code

package com.javasampleapproach.reactorpublisher;

import java.time.Duration;
import java.util.Arrays;
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class MainApp {

	public static void main(String[] args) throws InterruptedException {

		System.out.println("=== Empty Mono ===");
		Mono.empty().subscribe(System.out::println);

		System.out.println("=== Mono.just ===");
		Mono.just("JSA")
			.map(item -> "Mono item: " + item)
			.subscribe(System.out::println);
		
		System.out.println("=== Empty Flux ===");
		Flux.empty()
			.subscribe(System.out::println);

		System.out.println("=== Flux.just ===");
		Flux.just("Java", "Sample", "Approach", ".com")
			.map(item -> item.toUpperCase())
			.subscribe(System.out::print);

		System.out.println("\n=== Flux from List ===");
		List<String> list = Arrays.asList("JAVA", "SAMPLE", "APPROACH", ".COM");
		Flux.fromIterable(list)
			.map(item -> item.toLowerCase())
			.subscribe(System.out::print);

		System.out.println("\n=== Flux emits increasing values each 100ms ===");
		Flux.interval(Duration.ofMillis(100))
			.map(item -> "tick: " + item)
			.take(10)
			.subscribe(System.out::println);
		
		Thread.sleep(1500);

		System.out.println("=== Mono emits an Exception ===");
		Mono.error(new CustomException("Mono"))
			.doOnError(e -> {System.out.println("inside Mono doOnError()");})
			.subscribe(System.out::println);
		
		System.out.println("=== Flux emits an Exception ===");
		Flux.error(new CustomException("Flux"))
			.subscribe(System.out::println);
	}
}
package com.javasampleapproach.reactorpublisher;

public class CustomException extends RuntimeException {

	private static final long serialVersionUID = -5970845585469454688L;

	public CustomException(String type) {
		System.out.println(type + ": throw CustomException!");
	}
}

3. Run & Check Result


=== Empty Mono ===
=== Mono.just ===
Mono item: JSA
=== Empty Flux ===
=== Flux.just ===
grokonez.com
=== Flux from List ===
grokonez.com
=== Flux emits increasing values each 100ms ===
tick: 0
tick: 1
tick: 2
tick: 3
tick: 4
tick: 5
tick: 6
tick: 7
tick: 8
tick: 9
=== Mono emits an Exception ===
Mono: throw CustomException!
inside Mono doOnError()
=== Flux emits an Exception ===
Flux: throw CustomException!
Exception in thread "main" reactor.core.Exceptions$ErrorCallbackNotImplemented: com.javasampleapproach.reactorpublisher.CustomException
Caused by: com.javasampleapproach.reactorpublisher.CustomException
	at com.javasampleapproach.reactorpublisher.MainApp.main(MainApp.java:50)


By grokonez | June 28, 2017.

Last updated on May 3, 2021.



Related Posts


1 thought on “Reactor – Simple Ways to create Flux/Mono”

Got Something To Say:

Your email address will not be published. Required fields are marked *

*