In the previous post, we covered the components and behaviour of Reactive Streams and the Java 9 Flow API. In this tutorial, we're going to look at an example that implements a Publisher and a Subscriber, with a Processor as a bridge between them, for reactive programming.
Related Articles:
– Java 9 Flow API – Reactive Streams
– Java 9 Flow API example – Publisher and Subscriber
– Java 9 Flow SubmissionPublisher – A Concrete Publisher
I. Technologies
– Java 9
– Eclipse with Java 9 Support for Oxygen (4.7)
II. Overview
1. Processor
A Processor is a component that sits between the Publisher and Subscriber. It acts as:
+ a Subscriber when emitting a request signal to Publisher
+ a Publisher when pushing items to Subscriber.
We can chain one or more Processors to link a Publisher to a Subscriber.
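The dual role described above can be sketched with the JDK's own SubmissionPublisher, which already handles the Publisher side. This is only an illustration, not the implementation built later in this tutorial: the class names MappingProcessor and ProcessorChainDemo are hypothetical, and the sketch requests one item at a time from upstream for simplicity.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.function.Function;

// Hypothetical Processor: acts as a Subscriber towards upstream and,
// through SubmissionPublisher, as a Publisher towards downstream.
class MappingProcessor<T, R> extends SubmissionPublisher<R> implements Flow.Processor<T, R> {
    private final Function<? super T, ? extends R> mapper;
    private Flow.Subscription upstream;

    MappingProcessor(Function<? super T, ? extends R> mapper) {
        this.mapper = mapper;
    }

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        upstream = subscription;
        upstream.request(1);            // Subscriber role: signal demand upstream
    }

    @Override
    public void onNext(T item) {
        submit(mapper.apply(item));     // Publisher role: push the mapped item downstream
        upstream.request(1);
    }

    @Override
    public void onError(Throwable t) {
        closeExceptionally(t);
    }

    @Override
    public void onComplete() {
        close();                        // propagate completion downstream
    }
}

class ProcessorChainDemo {
    // Chains Publisher -> Processor -> Processor -> Subscriber and collects the results.
    static List<String> run() {
        List<String> received = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        try (SubmissionPublisher<Integer> source = new SubmissionPublisher<>()) {
            MappingProcessor<Integer, Integer> timesTen = new MappingProcessor<>(i -> i * 10);
            MappingProcessor<Integer, String> toText = new MappingProcessor<>(i -> "item value = " + i);

            source.subscribe(timesTen);
            timesTen.subscribe(toText);
            toText.subscribe(new Flow.Subscriber<String>() {
                @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
                @Override public void onNext(String item) { received.add(item); }
                @Override public void onError(Throwable t) { done.countDown(); }
                @Override public void onComplete() { done.countDown(); }
            });

            for (int i = 1; i <= 3; i++) {
                source.submit(i);
            }
        } // closing the source sends onComplete down the chain

        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [item value = 10, item value = 20, item value = 30]
    }
}
```

Each Processor in the chain only knows its immediate upstream Subscription and its own downstream subscribers, which is what makes chaining several of them possible.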
2. Project
We will create a Publisher that a Processor subscribes to, and that Processor will publish data to a Subscriber.
– The Publisher defines a Subscription to work with the Processor.
– The Processor defines its own Subscription to work with the Subscriber.
– Using the Subscriber::onNext() method, the Publisher pushes items to the Processor, and the Processor pushes items to the Subscriber.
– Using the Subscription::request() method, the Processor requests items from the Publisher, and the Subscriber requests items from the Processor.
– The Publisher and the Processor each define an Executor for multi-threading, so the request() and onNext() methods work asynchronously.
– The Processor has a data buffer to store items in case the number of items requested by the Subscriber differs from the number requested by the Processor.
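The role of the data buffer in the last point can be illustrated in isolation. The following is a simplified, single-threaded sketch and not the Processor code from this tutorial: the class DemandBufferSketch and its helpers serve and bufferOf are hypothetical, and signals are recorded as strings instead of being sent to a real Subscriber.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

class DemandBufferSketch {

    // Hypothetical helper: answers a request for n items from the buffer.
    // It emits at most n onNext signals and adds onComplete when the
    // buffer runs out before the demand is satisfied.
    static List<String> serve(Queue<String> buffer, long n) {
        List<String> signals = new ArrayList<>();
        long delivered = 0;
        String item;
        while (delivered < n && (item = buffer.poll()) != null) {
            signals.add("onNext(" + item + ")");
            delivered++;
        }
        if (delivered < n) {
            signals.add("onComplete()");
        }
        return signals;
    }

    // Hypothetical helper: a buffer holding the values 10, 20, ... as strings.
    static Queue<String> bufferOf(int count) {
        Queue<String> buffer = new ConcurrentLinkedQueue<>();
        for (int i = 1; i <= count; i++) {
            buffer.add(String.valueOf(i * 10));
        }
        return buffer;
    }

    public static void main(String[] args) {
        // Buffered items == demand: everything is delivered, nothing remains.
        System.out.println(serve(bufferOf(3), 3)); // [onNext(10), onNext(20), onNext(30)]

        // Buffered items < demand: all items are delivered, then onComplete.
        System.out.println(serve(bufferOf(3), 5)); // [onNext(10), onNext(20), onNext(30), onComplete()]

        // Buffered items > demand: the surplus stays in the buffer.
        Queue<String> buffer = bufferOf(5);
        System.out.println(serve(buffer, 3));      // [onNext(10), onNext(20), onNext(30)]
        System.out.println(buffer.size());         // 2
    }
}
```

These three situations correspond to the three demand combinations tried in the "Check Result" section.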
III. Practice
To understand how Publisher, Subscriber, Subscription and Processor behave and how to implement them, please visit: Java 9 Flow API – Reactive Streams
Publisher<Integer> —— Processor<Integer, String> —— Subscriber<String>
    // --------- Publisher ---------
    public class MyPublisher implements Publisher<Integer> {
        final ExecutorService executor = Executors.newFixedThreadPool(4);
        private MySubscription subscription;

        @Override
        public void subscribe(Subscriber<? super Integer> subscriber) { }

        private class MySubscription implements Subscription {
            private Subscriber<? super Integer> subscriber;

            @Override
            public void request(long n) { }

            @Override
            public void cancel() { }
        }
    }

    // --------- Processor ---------
    public class MyProcessor implements Processor<Integer, String> {
        private Subscription publisherSubscription;
        final ExecutorService executor = Executors.newFixedThreadPool(4);
        private MySubscription subscription;
        private ConcurrentLinkedQueue<String> dataItems;

        @Override
        public void subscribe(Subscriber<? super String> subscriber) { }

        @Override
        public void onSubscribe(Subscription subscription) { }

        @Override
        public void onNext(Integer item) { }

        @Override
        public void onComplete() { }

        @Override
        public void onError(Throwable t) { }

        private class MySubscription implements Subscription {
            private Subscriber<? super String> subscriber;

            @Override
            public void request(long n) { }

            @Override
            public void cancel() { }
        }
    }

    // --------- Subscriber ---------
    public class MySubscriber implements Subscriber<String> {
        private Subscription subscription;

        @Override
        public void onSubscribe(Subscription subscription) { }

        @Override
        public void onNext(String item) { }

        @Override
        public void onComplete() { }

        @Override
        public void onError(Throwable t) { }
    }
1. Create implementation of Publisher
    package com.javasampleapproach.java9flow.pubprocsub;

    import static java.lang.Thread.currentThread;
    import static java.util.concurrent.Executors.newSingleThreadExecutor;

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Flow.Publisher;
    import java.util.concurrent.Flow.Subscriber;
    import java.util.concurrent.Flow.Subscription;
    import java.util.concurrent.atomic.AtomicBoolean;
    import java.util.concurrent.atomic.AtomicInteger;

    public class MyPublisher implements Publisher<Integer> {

        private static final String LOG_MESSAGE_FORMAT = "Publisher >> [%s] %s%n";

        final ExecutorService executor = Executors.newFixedThreadPool(4);
        private MySubscription subscription;

        // Completed once the subscription has been cancelled and shut down.
        private final CompletableFuture<Void> terminated = new CompletableFuture<>();

        @Override
        public void subscribe(Subscriber<? super Integer> subscriber) {
            subscription = new MySubscription(subscriber, executor);
            subscriber.onSubscribe(subscription);
        }

        // Blocks the caller until the subscription is cancelled.
        public void waitUntilTerminated() throws InterruptedException {
            try {
                terminated.get();
            } catch (ExecutionException e) {
                System.out.println(e);
            }
        }

        private class MySubscription implements Subscription {

            private final ExecutorService executor;
            private Subscriber<? super Integer> subscriber;
            private final AtomicInteger value;
            private AtomicBoolean isCanceled;

            public MySubscription(Subscriber<? super Integer> subscriber, ExecutorService executor) {
                this.subscriber = subscriber;
                this.executor = executor;
                value = new AtomicInteger();
                isCanceled = new AtomicBoolean(false);
            }

            @Override
            public void request(long n) {
                if (isCanceled.get())
                    return;

                // Reactive Streams requires a positive demand; anything else is an error.
                if (n <= 0)
                    executor.execute(() -> subscriber.onError(new IllegalArgumentException()));
                else
                    publishItems(n);
            }

            @Override
            public void cancel() {
                isCanceled.set(true);
                shutdown();
            }

            // Each item is published from a task on the executor, so delivery is asynchronous.
            private void publishItems(long n) {
                for (long i = 0; i < n; i++) {
                    executor.execute(() -> {
                        int v = value.incrementAndGet();
                        log("publish item: [" + v + "] ...");
                        subscriber.onNext(v);
                    });
                }
            }

            private void shutdown() {
                log("Shut down executor...");
                executor.shutdown();
                newSingleThreadExecutor().submit(() -> {
                    log("Shutdown complete.");
                    terminated.complete(null);
                });
            }
        }

        private void log(String message, Object... args) {
            String fullMessage = String.format(LOG_MESSAGE_FORMAT, currentThread().getName(), message);
            System.out.printf(fullMessage, args);
        }
    }
2. Create implementation of Processor
    package com.javasampleapproach.java9flow.pubprocsub;

    import static java.lang.Thread.currentThread;
    import static java.util.concurrent.Executors.newSingleThreadExecutor;

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ConcurrentLinkedQueue;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Flow.Processor;
    import java.util.concurrent.Flow.Subscriber;
    import java.util.concurrent.Flow.Subscription;
    import java.util.concurrent.atomic.AtomicBoolean;

    public class MyProcessor implements Processor<Integer, String> {

        private static final String LOG_MESSAGE_FORMAT = "Processor >> [%s] %s%n";

        private Subscription publisherSubscription;
        final ExecutorService executor = Executors.newFixedThreadPool(4);
        private MySubscription subscription;
        private long DEMAND;

        // Buffer for processed items that the Subscriber has not requested yet.
        private ConcurrentLinkedQueue<String> dataItems;
        private final CompletableFuture<Void> terminated = new CompletableFuture<>();

        public MyProcessor() {
            DEMAND = 0;
            dataItems = new ConcurrentLinkedQueue<String>();
        }

        public void setDEMAND(long n) {
            this.DEMAND = n;
        }

        // Publisher side: called by the downstream Subscriber.
        @Override
        public void subscribe(Subscriber<? super String> subscriber) {
            subscription = new MySubscription(subscriber, executor);
            subscriber.onSubscribe(subscription);
        }

        // Subscriber side: called by the upstream Publisher.
        @Override
        public void onSubscribe(Subscription subscription) {
            log("Subscribed...");
            publisherSubscription = subscription;
            requestItems();
        }

        private void requestItems() {
            log("Requesting %d new items...", DEMAND);
            publisherSubscription.request(DEMAND);
        }

        @Override
        public void onNext(Integer item) {
            if (null == item)
                throw new NullPointerException();

            dataItems.add("item value = " + item * 10 + " after processing");
            log("processing item: [" + item + "] ...");
        }

        @Override
        public void onComplete() {
            log("Complete!");
        }

        @Override
        public void onError(Throwable t) {
            log("Error >> %s", t);
        }

        private class MySubscription implements Subscription {

            private final ExecutorService executor;
            private Subscriber<? super String> subscriber;
            private AtomicBoolean isCanceled;

            public MySubscription(Subscriber<? super String> subscriber, ExecutorService executor) {
                this.executor = executor;
                this.subscriber = subscriber;
                isCanceled = new AtomicBoolean(false);
            }

            @Override
            public void request(long n) {
                if (isCanceled.get())
                    return;

                // Reactive Streams requires a positive demand; anything else is an error.
                if (n <= 0)
                    executor.execute(() -> subscriber.onError(new IllegalArgumentException()));
                else if (!dataItems.isEmpty())
                    publishItems(n);
                else
                    subscriber.onComplete();
            }

            private void publishItems(long n) {
                int remainItems = dataItems.size();

                if (remainItems >= n) {
                    // Enough items are buffered to satisfy the whole request.
                    for (long i = 0; i < n; i++) {
                        executor.execute(() -> subscriber.onNext(dataItems.poll()));
                    }
                    // Use the snapshot: the queue is drained concurrently by the tasks above.
                    log("Remaining " + (remainItems - n) + " items to be published to Subscriber!");
                } else if (remainItems > 0) {
                    // Fewer items than requested: publish what is left, then complete.
                    for (int i = 0; i < remainItems; i++) {
                        executor.execute(() -> subscriber.onNext(dataItems.poll()));
                    }
                    subscriber.onComplete();
                } else {
                    log("Processor contains no item!");
                }
            }

            @Override
            public void cancel() {
                isCanceled.set(true);
                shutdown();
                // Propagate the cancellation upstream to the Publisher.
                publisherSubscription.cancel();
            }

            private void shutdown() {
                log("Shut down executor...");
                executor.shutdown();
                newSingleThreadExecutor().submit(() -> {
                    log("Shutdown complete.");
                    terminated.complete(null);
                });
            }
        }

        private void log(String message, Object... args) {
            String fullMessage = String.format(LOG_MESSAGE_FORMAT, currentThread().getName(), message);
            System.out.printf(fullMessage, args);
        }
    }
3. Create implementation of Subscriber
    package com.javasampleapproach.java9flow.pubprocsub;

    import static java.lang.Thread.currentThread;

    import java.util.concurrent.Flow.Subscriber;
    import java.util.concurrent.Flow.Subscription;

    public class MySubscriber implements Subscriber<String> {

        private static final String LOG_MESSAGE_FORMAT = "Subscriber >> [%s] %s%n";

        private long DEMAND = 0;
        private Subscription subscription;

        // Number of items still expected before the subscription is cancelled.
        private long count;

        public void setDEMAND(long n) {
            this.DEMAND = n;
            count = DEMAND;
        }

        @Override
        public void onSubscribe(Subscription subscription) {
            log("Subscribed");
            this.subscription = subscription;
            requestItems(DEMAND);
        }

        private void requestItems(long n) {
            log("Requesting %d new items...", n);
            subscription.request(n);
        }

        @Override
        public void onNext(String item) {
            if (item != null) {
                log(item);

                // onNext() may be called from several executor threads at once.
                synchronized (this) {
                    count--;
                    if (count == 0) {
                        log("Cancelling subscription...");
                        subscription.cancel();
                    }
                }
            } else {
                log("Null Item!");
            }
        }

        @Override
        public void onComplete() {
            log("onComplete(): There is no remaining item in Processor.");
        }

        @Override
        public void onError(Throwable t) {
            log("Error >> %s", t);
        }

        private void log(String message, Object... args) {
            String fullMessage = String.format(LOG_MESSAGE_FORMAT, currentThread().getName(), message);
            System.out.printf(fullMessage, args);
        }
    }
4. Check Result
We use this class to test:
    package com.javasampleapproach.java9flow.pubprocsub;

    public class MainApp {

        public static void main(String[] args) throws InterruptedException {
            MyPublisher publisher = new MyPublisher();

            MySubscriber subscriber = new MySubscriber();
            subscriber.setDEMAND(...); // MUST set number of items to be requested here!

            MyProcessor processor = new MyProcessor();
            processor.setDEMAND(...); // MUST set number of items to be requested here!

            publisher.subscribe(processor);
            processor.subscribe(subscriber);

            publisher.waitUntilTerminated();
        }
    }
    // ...
    subscriber.setDEMAND(3);
    // ...
    processor.setDEMAND(3);
    // ...
The result:
    Processor >> [main] Subscribed...
    Processor >> [main] Requesting 3 new items...
    Publisher >> [pool-1-thread-1] publish item: [1] ...
    Publisher >> [pool-1-thread-3] publish item: [3] ...
    Publisher >> [pool-1-thread-2] publish item: [2] ...
    Processor >> [pool-1-thread-2] processing item: [2] ...
    Processor >> [pool-1-thread-1] processing item: [1] ...
    Processor >> [pool-1-thread-3] processing item: [3] ...
    Subscriber >> [main] Subscribed
    Subscriber >> [main] Requesting 3 new items...
    Processor >> [main] Remaining 0 items to be published to Subscriber!
    Subscriber >> [pool-2-thread-2] item value = 20 after processing
    Subscriber >> [pool-2-thread-1] item value = 30 after processing
    Subscriber >> [pool-2-thread-3] item value = 10 after processing
    Subscriber >> [pool-2-thread-3] Cancelling subscription...
    Processor >> [pool-2-thread-3] Shut down executor...
    Publisher >> [pool-2-thread-3] Shut down executor...
    Processor >> [pool-3-thread-1] Shutdown complete.
    Publisher >> [pool-4-thread-1] Shutdown complete.
    // ...
    subscriber.setDEMAND(5);
    // ...
    processor.setDEMAND(3);
    // ...
In this case, we invoke the Subscriber::onComplete() method to notify the Subscriber that the Processor has already processed all its items and pushed them to the Subscriber. The result:
    Processor >> [main] Subscribed...
    Processor >> [main] Requesting 3 new items...
    Publisher >> [pool-1-thread-1] publish item: [1] ...
    Publisher >> [pool-1-thread-2] publish item: [2] ...
    Publisher >> [pool-1-thread-3] publish item: [3] ...
    Subscriber >> [main] Subscribed
    Processor >> [pool-1-thread-3] processing item: [3] ...
    Subscriber >> [main] Requesting 5 new items...
    Processor >> [pool-1-thread-2] processing item: [2] ...
    Processor >> [pool-1-thread-1] processing item: [1] ...
    Subscriber >> [main] onComplete(): There is no remaining item in Processor.
    Subscriber >> [pool-2-thread-1] item value = 20 after processing
    Subscriber >> [pool-2-thread-2] item value = 30 after processing
    Subscriber >> [pool-2-thread-3] item value = 10 after processing
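In this run the Subscriber receives only 3 of the 5 items it asked for, so its counter never reaches 0, it never cancels the subscription, and no shutdown messages appear. As a consequence, waitUntilTerminated() in MainApp keeps blocking. One possible way to avoid waiting forever is a bounded wait on the CompletableFuture. The sketch below is an assumption for illustration, not part of the original classes: BoundedWaitSketch and the three-argument waitUntilTerminated are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class BoundedWaitSketch {

    // Hypothetical variant of waitUntilTerminated(): waits at most the given time.
    // Returns true if the future completed in time, false on timeout or interruption.
    static boolean waitUntilTerminated(CompletableFuture<Void> terminated, long timeout, TimeUnit unit) {
        try {
            terminated.get(timeout, unit);
            return true;
        } catch (TimeoutException e) {
            return false;
        } catch (ExecutionException e) {
            System.out.println(e);
            return true; // the future did complete, although exceptionally
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        // Never completed, like the Publisher's future when nobody cancels.
        CompletableFuture<Void> pending = new CompletableFuture<>();
        System.out.println(waitUntilTerminated(pending, 200, TimeUnit.MILLISECONDS)); // false

        // Already completed, like the Publisher's future after cancel().
        CompletableFuture<Void> finished = CompletableFuture.completedFuture(null);
        System.out.println(waitUntilTerminated(finished, 200, TimeUnit.MILLISECONDS)); // true
    }
}
```

A caller that gets false back can then decide to shut the executors down itself instead of hanging.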
    // ...
    subscriber.setDEMAND(3);
    // ...
    processor.setDEMAND(5);
    // ...
The result:
    Processor >> [main] Subscribed...
    Processor >> [main] Requesting 5 new items...
    Publisher >> [pool-1-thread-1] publish item: [1] ...
    Publisher >> [pool-1-thread-2] publish item: [2] ...
    Processor >> [pool-1-thread-1] processing item: [1] ...
    Publisher >> [pool-1-thread-1] publish item: [3] ...
    Processor >> [pool-1-thread-1] processing item: [3] ...
    Processor >> [pool-1-thread-2] processing item: [2] ...
    Subscriber >> [main] Subscribed
    Subscriber >> [main] Requesting 3 new items...
    Publisher >> [pool-1-thread-3] publish item: [4] ...
    Processor >> [pool-1-thread-3] processing item: [4] ...
    Publisher >> [pool-1-thread-4] publish item: [5] ...
    Processor >> [pool-1-thread-4] processing item: [5] ...
    Processor >> [main] Remaining 2 items to be published to Subscriber!
    Subscriber >> [pool-2-thread-1] item value = 10 after processing
    Subscriber >> [pool-2-thread-2] item value = 20 after processing
    Subscriber >> [pool-2-thread-3] item value = 30 after processing
    Subscriber >> [pool-2-thread-3] Cancelling subscription...
    Processor >> [pool-2-thread-3] Shut down executor...
    Publisher >> [pool-2-thread-3] Shut down executor...
    Publisher >> [pool-4-thread-1] Shutdown complete.
    Processor >> [pool-3-thread-1] Shutdown complete.
Last updated on September 12, 2018.