In this tutorial, JavaSampleApproach introduces simple ways to create a Publisher (Flux) that emits items at a specified interval of time.
I. Overview
1. Steps to do
Our goal is to create a Flux of (data1, data2, data3, …) that emits items at a specified interval of time.
– First, we need to produce a Flux<Long> that is infinite and emits regular ticks from a clock -> use Flux.interval(Duration).
Now we have a Flux<Long>: (i1, i2, i3, …)
– Then we have several ways to convert each Long item (i) to the data item (data) that we want to publish:
+ transform i to data -> use map(i -> data).
+ zip i with data -> use zipWith(publisher, combinator) or zipWithIterable(iterable). With zipWithIterable, each result is a pair (i, data), so we then unpack the pair to get data.
2. Methods’ details
1. Flux#interval
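This method creates a new Flux that emits incrementing Long values, starting with 0, once every period. The first value arrives after an initial delay of one period, and values are emitted on a timer thread rather than on the subscribing thread.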
    static Flux<Long> interval(Duration period);
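As a quick check of this behaviour, the sketch below collects the first few ticks into a list. The class name IntervalTicks, the helper firstTicks and the 100 ms period are illustrative choices, not part of the original example. take() is needed because interval never completes on its own, and block() is used here only to wait for the result on the calling thread.

```java
import java.time.Duration;
import java.util.List;

import reactor.core.publisher.Flux;

class IntervalTicks {

    // Hypothetical helper: collects the first `count` ticks of an interval Flux.
    // block() waits on the calling thread until the bounded Flux completes.
    static List<Long> firstTicks(int count, Duration period) {
        return Flux.interval(period)   // 0, 1, 2, ... one value per period
                .take(count)           // interval is infinite, so bound it
                .collectList()         // gather the ticks into a single List
                .block();
    }

    public static void main(String[] args) {
        // prints [0, 1, 2] after roughly 300 ms
        System.out.println(firstTicks(3, Duration.ofMillis(100)));
    }
}
```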
2. Flux.zipWithIterable
Pairwise combines the elements of this Flux and an Iterable sequence into Tuple2 pairs. The resulting Flux completes as soon as either side runs out of elements.
    Flux<Tuple2<T, T2>> zipWithIterable(Iterable<T2> iterable)
To get the contents of a Tuple2 in a type-safe way:
    Tuple2<T, T2> tuple2 = ...; // tuple2(object1, object2)
    T object1 = tuple2.getT1();
    T2 object2 = tuple2.getT2();
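To see the pairing and the accessors together, here is a minimal sketch that zips a five-element Flux with a shorter list. The class ZipWithIterableDemo and its helpers pairUp and format are hypothetical names introduced for illustration. Flux.range is used instead of interval only to keep the example independent of timing.

```java
import java.util.Arrays;
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.util.function.Tuple2;

class ZipWithIterableDemo {

    // Hypothetical helper: zips the numbers 0..4 with the given list.
    static List<String> pairUp(List<String> data) {
        return Flux.range(0, 5)                  // 0, 1, 2, 3, 4
                .zipWithIterable(data)           // Flux<Tuple2<Integer, String>>
                .map(ZipWithIterableDemo::format)
                .collectList()
                .block();
    }

    // Reads both halves of the pair through the typed accessors.
    static String format(Tuple2<Integer, String> pair) {
        Integer index = pair.getT1();
        String value = pair.getT2();
        return index + "=" + value;
    }

    public static void main(String[] args) {
        // The list has only two elements, so only two pairs are produced.
        System.out.println(pairUp(Arrays.asList("a", "b"))); // prints [0=a, 1=b]
    }
}
```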
3. Flux.zipWith
The operator pairs items by position: it waits until both source<T> (the Flux on which zipWith() is invoked) and source2<T2> (the Publisher supplying the data items to zip with) have each emitted an item, passes the pair to the combinator, and forwards the result. It continues until either source completes:
    final Flux<V> zipWith(Publisher<T2> source2, final BiFunction<T,T2,V> combinator);
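The pairing rule can be modelled without Reactor. The sketch below is a synchronous, plain-Java analogy and not how Reactor implements zipWith: the helper zip and the class PairwiseZipModel are hypothetical, and real sources emit asynchronously. It only shows that items are combined position by position and that the output ends with the shorter input.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.BiFunction;

class PairwiseZipModel {

    // Hypothetical helper: a synchronous model of the pairing rule only.
    static <T, T2, V> List<V> zip(Iterable<T> source, Iterable<T2> source2,
                                  BiFunction<T, T2, V> combinator) {
        List<V> out = new ArrayList<>();
        Iterator<T> left = source.iterator();
        Iterator<T2> right = source2.iterator();
        // Take one item from each side; stop as soon as either side is exhausted.
        while (left.hasNext() && right.hasNext()) {
            out.add(combinator.apply(left.next(), right.next()));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Long> ticks = Arrays.asList(0L, 1L, 2L, 3L, 4L); // stands in for interval
        List<String> data = Arrays.asList("{A}", "{B}", "{C}");
        // prints [item 0: {A}, item 1: {B}, item 2: {C}]
        System.out.println(zip(ticks, data, (i, item) -> "item " + i + ": " + item));
    }
}
```

The two unused ticks (3 and 4) are simply dropped, which is why zipping an infinite interval with a finite source still terminates.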
II. Practice
0. Technology
– Java 8
– Maven 3.6.1
– Reactor Core 3.0.4, with the Aluminium release train.
1. Reactor installation in Maven
– First, import the BOM by adding the following to pom.xml:
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>io.projectreactor</groupId>
                <artifactId>reactor-bom</artifactId>
                <version>Aluminium-SR1</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
– Next, add the dependency (no version is needed because the BOM manages it):
    <dependencies>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-core</artifactId>
        </dependency>
    </dependencies>
2. Code
2.1 Using map()
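    List<String> data = new ArrayList<String>(Arrays.asList("{A}", "{B}", "{C}"));

    Flux<String> intervalFlux1 = Flux
            .interval(Duration.ofMillis(500))
            .map(tick -> {
                // ticks 0..2 index into the list; the next tick marks the end
                if (tick < data.size())
                    return "item " + tick + ": " + data.get(tick.intValue());
                return "Done (tick == data.size())";
            });

    // interval is infinite, so take() bounds it to data.size() + 1 items
    intervalFlux1.take(data.size() + 1).subscribe(System.out::println);

    // keep the main thread alive while the ticks arrive on another thread
    Thread.sleep(3000);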
Result:
    item 0: {A}
    item 1: {B}
    item 2: {C}
    Done (tick == data.size())
2.2 Using zipWithIterable()
    List<String> data = new ArrayList<String>(Arrays.asList("{A}", "{B}", "{C}"));

    Flux<String> intervalFlux2 = Flux
            .interval(Duration.ofMillis(500))
            .zipWithIterable(data)   // Tuple2<Long, String>: (tick, item)
            .map(source -> "item " + source.getT1() + ": " + source.getT2());

    // completes by itself once the list is exhausted, so no take() is needed
    intervalFlux2.subscribe(System.out::println);

    Thread.sleep(3000);
Result:
    item 0: {A}
    item 1: {B}
    item 2: {C}
2.3 Using zipWith()
    Flux<String> flux = Flux.just("{A}", "{B}", "{C}");

    Flux<String> intervalFlux3 = Flux
            .interval(Duration.ofMillis(500))
            // the combinator builds the final String directly, so no Tuple2 is involved
            .zipWith(flux, (i, item) -> "item " + i + ": " + item);

    intervalFlux3.subscribe(System.out::println);

    Thread.sleep(3000);
Result:
    item 0: {A}
    item 1: {B}
    item 2: {C}
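All three snippets end with Thread.sleep(3000) because subscribe() returns immediately while interval emits on a separate timer thread, so main could otherwise exit before any item is printed. One possible alternative, sketched below under the assumption that blocking the calling thread is acceptable (as it is in a demo main method), is to wait for completion with blockLast(). The class name BlockInsteadOfSleep and the helper run are illustrative only, and this works only for a Flux that completes, such as the zipped one here.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

import reactor.core.publisher.Flux;

class BlockInsteadOfSleep {

    // Hypothetical helper: runs the zipWith() example and returns what was emitted.
    static List<String> run(Duration period) {
        List<String> seen = new ArrayList<>();
        Flux.interval(period)
                .zipWith(Flux.just("{A}", "{B}", "{C}"),
                        (i, item) -> "item " + i + ": " + item)
                .doOnNext(seen::add)   // record each item as it arrives
                .blockLast();          // subscribe and wait until the Flux completes
        return seen;
    }

    public static void main(String[] args) {
        // Returns as soon as the third item has been emitted; no fixed sleep needed.
        run(Duration.ofMillis(500)).forEach(System.out::println);
    }
}
```

Compared with a fixed sleep, this waits exactly as long as the sequence needs, at the cost of blocking the caller.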
III. Source Code
    package com.javasampleapproach.reactor.publisherinterval;

    import java.time.Duration;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    import reactor.core.publisher.Flux;

    public class MainApp {

        public static void main(String[] args) throws InterruptedException {

            System.out.println("=== from Collection using map() ===");
            List<String> data = new ArrayList<String>(Arrays.asList("{A}", "{B}", "{C}"));

            Flux<String> intervalFlux1 = Flux
                    .interval(Duration.ofMillis(500))
                    .map(tick -> {
                        if (tick < data.size())
                            return "item " + tick + ": " + data.get(tick.intValue());
                        return "Done (tick == data.size())";
                    });

            intervalFlux1.take(data.size() + 1).subscribe(System.out::println);
            Thread.sleep(3000);

            System.out.println("=== from Collection using zipWithIterable() and map() ===");
            Flux<String> intervalFlux2 = Flux
                    .interval(Duration.ofMillis(500))
                    .zipWithIterable(data)
                    .map(source -> "item " + source.getT1() + ": " + source.getT2());

            intervalFlux2.subscribe(System.out::println);
            Thread.sleep(3000);

            System.out.println("=== from Flux using zipWith() ===");
            Flux<String> flux = Flux.just("{A}", "{B}", "{C}");
            Flux<String> intervalFlux3 = Flux
                    .interval(Duration.ofMillis(500))
                    .zipWith(flux, (i, item) -> "item " + i + ": " + item);

            intervalFlux3.subscribe(System.out::println);
            Thread.sleep(3000);
        }
    }
Run and check the results:
    === from Collection using map() ===
    item 0: {A}
    item 1: {B}
    item 2: {C}
    Done (tick == data.size())
    === from Collection using zipWithIterable() and map() ===
    item 0: {A}
    item 1: {B}
    item 2: {C}
    === from Flux using zipWith() ===
    item 0: {A}
    item 1: {B}
    item 2: {C}
Last updated on March 13, 2018.