Reactor – How to Create Flux (Publisher) with Interval

In this tutorial, JavaSampleApproach introduces simple ways to create Publisher (Flux) that emits items every specified Interval of time.

I. Overview

1. Steps to do

Our goal is to create Flux for (data1, data2, data3, …) that emits items every specified Interval of time.

– First, we need to produce a Flux<Long> that is infinite and emits regular ticks from a clock -> use Flux.interval(Duration).
Now we have Flux<Long>: (i1, i2, i3, …)

– Then we have several ways to convert each Long item (i) to each data item (data) that we want to publish:
+ transform i to data -> use map(i -> data).
+ zip i with data together -> use zipWith(publisher,combinator) or zipWithIterable(collection). Then separate Zip(i,data) to get data.

2. Methods’ details
1. Flux#interval

This method creates a new Flux that emits incrementing Long starting with 0 every period.

static Flux interval(Duration period);
2. Flux.zipWithIterable

Pairwise combines as Tuple2 elements of the Flux and an Iterable sequence.

Flux> zipWithIterable(Iterable iterable)

To get the content of Tuple2 in Type-safe way:

Tuple2 tuple2 = ...; // tuple2(object1,object2)

T object1 = tuple2.getT1();
T2 object2 = tuple2.getT2();
3. Flux.zipWith

The operator will forward all combinations produced by the combinator from the most recent items emitted by source<T> (that invokes zipWith() method) and source2<T2> (that contains data item to be zipped with source item):

final Flux zipWith(Publisher source2, final BiFunction combinator);

II. Practice

0. Technology

– Java 8
– Maven 3.6.1
– Reactor Core 3.0.4, with the Aluminium release train.

1. Reactor installation in Maven

– First, import the BOM by adding the following to pom.xml:


	
		
			io.projectreactor
			reactor-bom
			Aluminium-SR1
			pom
			import
		
	

– Next, add dependency:


	
		io.projectreactor
		reactor-core
	

2. Code
2.1 Using map()
List data = new ArrayList(Arrays.asList("{A}", "{B}", "{C}"));
Flux intervalFlux1 = Flux
							.interval(Duration.ofMillis(500))
							.map(tick -> {
								if (tick < data.size())
									return "item " + tick + ": " + data.get(tick.intValue());
								return "Done (tick == data.size())";
							});

intervalFlux1.take(data.size() + 1).subscribe(System.out::println);
Thread.sleep(3000);

Result:

item 0: {A}
item 1: {B}
item 2: {C}
Done (tick == data.size())
2.2 Using zipWithIterable()
List data = new ArrayList(Arrays.asList("{A}", "{B}", "{C}"));
Flux intervalFlux2 = Flux
							.interval(Duration.ofMillis(500))
							.zipWithIterable(data)
							.map(source -> "item " + source.getT1() + ": " + source.getT2());

intervalFlux2.subscribe(System.out::println);
Thread.sleep(3000);

Result:

item 0: {A}
item 1: {B}
item 2: {C}
2.3 Using zipWith()
Flux flux = Flux.just("{A}", "{B}", "{C}");
Flux intervalFlux3 = Flux
							.interval(Duration.ofMillis(500))
							.zipWith(flux, (i, item) -> "item " + i + ": " + item);
		
intervalFlux3.subscribe(System.out::println);
Thread.sleep(3000);

Result:

item 0: {A}
item 1: {B}
item 2: {C}

III. Source Code

package com.javasampleapproach.reactor.publisherinterval;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import reactor.core.publisher.Flux;

public class MainApp {

	public static void main(String[] args) throws InterruptedException {

		System.out.println("=== from Collection using map() ===");
		List data = new ArrayList(Arrays.asList("{A}", "{B}", "{C}"));
		Flux intervalFlux1 = Flux
									.interval(Duration.ofMillis(500))
									.map(tick -> {
										if (tick < data.size())
											return "item " + tick + ": " + data.get(tick.intValue());
										return "Done (tick == data.size())";
									});
		
		intervalFlux1.take(data.size() + 1).subscribe(System.out::println);
		Thread.sleep(3000);
		
		System.out.println("=== from Collection using zipWithIterable() and map() ===");
		Flux intervalFlux2 = Flux
									.interval(Duration.ofMillis(500))
									.zipWithIterable(data)
									.map(source -> "item " + source.getT1() + ": " + source.getT2());

		intervalFlux2.subscribe(System.out::println);
		Thread.sleep(3000);
		
		System.out.println("=== from Flux using zipWith() ===");
		Flux flux = Flux.just("{A}", "{B}", "{C}");
		Flux intervalFlux3 = Flux
									.interval(Duration.ofMillis(500))
									.zipWith(flux, (i, item) -> "item " + i + ": " + item);
		
		intervalFlux3.subscribe(System.out::println);
		Thread.sleep(3000);

	}
}

Run and check Results:

=== from Collection using map() ===
item 0: {A}
item 1: {B}
item 2: {C}
Done (tick == data.size())
=== from Collection using zipWithIterable() and map() ===
item 0: {A}
item 1: {B}
item 2: {C}
=== from Flux using zipWith() ===
item 0: {A}
item 1: {B}
item 2: {C}


By grokonez | June 28, 2017.

Last updated on March 13, 2018.



Related Posts


Got Something To Say:

Your email address will not be published. Required fields are marked *

*