JDK 9 provides a concrete Publisher named SubmissionPublisher that acts as a compliant Reactive Streams Publisher, relying on drop handling and/or blocking for flow control. In this tutorial, we'll take a look at SubmissionPublisher and an example that generates items for Subscribers.
Related Articles:
– Java 9 Flow API – Reactive Streams
– Java 9 Flow API example – Publisher and Subscriber
– Java 9 Flow API example – Processor
I. Technologies
– Java 9
– Eclipse with Java 9 Support for Oxygen (4.7)
II. Overview
1. SubmissionPublisher
SubmissionPublisher is an implementation of Java 9 Flow.Publisher that asynchronously issues items to its subscribers until it is closed.
Depending on usage, we can specify the Executor for SubmissionPublisher in its constructor:
– If we want to submit items in separate threads and can estimate the number of subscribers, consider using Executors.newFixedThreadPool(int) with this constructor:
SubmissionPublisher(Executor executor, int maxBufferCapacity); // maxBufferCapacity: the maximum capacity for each subscriber's buffer
– Otherwise, just call the default constructor (no parameters), which normally uses ForkJoinPool.commonPool().
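The two constructor choices above can be sketched as follows. This is only an illustration: the class name ConstructorSketch, the pool size of 4 and the buffer capacity of 64 are assumed values, not part of the original project.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

// Hypothetical demo class, not part of the tutorial project.
class ConstructorSketch {

    /** Creates both publisher variants and returns the buffer capacity of the custom one. */
    static int run() {
        // Variant 1: our own executor plus an explicit per-subscriber buffer capacity.
        // Pool size 4 and capacity 64 are assumed values for illustration.
        ExecutorService pool = Executors.newFixedThreadPool(4);
        SubmissionPublisher<String> custom = new SubmissionPublisher<>(pool, 64);

        // Variant 2: default constructor, with the default executor
        // and a buffer capacity of Flow.defaultBufferSize().
        SubmissionPublisher<String> byDefault = new SubmissionPublisher<>();

        int customCapacity = custom.getMaxBufferCapacity();
        System.out.println("custom capacity:  " + customCapacity);
        System.out.println("default capacity: " + byDefault.getMaxBufferCapacity()
                + " (Flow.defaultBufferSize() = " + Flow.defaultBufferSize() + ")");

        custom.close();
        byDefault.close();
        pool.shutdown(); // our own pool has non-daemon threads, so release it explicitly
        return customCapacity;
    }

    public static void main(String[] args) {
        run();
    }
}
```

Note that the requested capacity is rounded up to a power of two, which is why 64 is used here.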
If a Subscriber has only one action that requests and processes all items, we can consider using the consume(Consumer) method (which returns a CompletableFuture object) like this:
publisher.consume((data) -> process(data));
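As a small, self-contained sketch of consume(Consumer): the returned CompletableFuture completes once the publisher has been closed and all items have been processed, so it can be used to wait for the end of the stream. The class name ConsumeSketch and the use of a list as the "process" step are assumptions made for this illustration.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.SubmissionPublisher;

// Hypothetical demo class, not part of the tutorial project.
class ConsumeSketch {

    /** Publishes 1, 2, 3 and returns the items seen by the consumer. */
    static List<Integer> run() {
        List<Integer> received = new CopyOnWriteArrayList<>();
        CompletableFuture<Void> done;

        // SubmissionPublisher is AutoCloseable: leaving the block calls close(),
        // which sends onComplete to the internal consumer subscriber.
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            done = publisher.consume(received::add); // stands in for process(data)
            for (int i = 1; i <= 3; i++) {
                publisher.submit(i);
            }
        }

        done.join(); // wait until every item has been consumed
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [1, 2, 3]
    }
}
```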
There are 2 publication methods:
– submit(): asynchronously publishes the given item to each subscriber, but blocks until resources are available.
– offer(): publishes the given item asynchronously, to each current subscriber if possible, but the item may be dropped by one or more subscribers if resource limits are exceeded.
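To make the difference concrete, here is a minimal sketch of offer() with a drop handler. It assumes a deliberately idle subscriber (it never calls request(n)) and a tiny buffer capacity of 4, so the buffer fills up and later items are dropped. The names OfferSketch and IdleSubscriber are hypothetical. With submit() the same loop would block instead of dropping.

```java
import java.util.concurrent.Flow;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the tutorial project.
class OfferSketch {

    /** A subscriber that never requests items, so its buffer can only fill up. */
    static final class IdleSubscriber implements Flow.Subscriber<Integer> {
        @Override public void onSubscribe(Flow.Subscription subscription) { /* no request(n) on purpose */ }
        @Override public void onNext(Integer item) { }
        @Override public void onError(Throwable error) { }
        @Override public void onComplete() { }
    }

    /** Offers 100 items and returns how many times the drop handler was called. */
    static int run() {
        AtomicInteger dropped = new AtomicInteger();
        // Capacity 4 is an assumed, deliberately small value.
        SubmissionPublisher<Integer> publisher =
                new SubmissionPublisher<>(ForkJoinPool.commonPool(), 4);
        publisher.subscribe(new IdleSubscriber());

        for (int i = 0; i < 100; i++) {
            // offer() does not block: when the subscriber's buffer is full,
            // the onDrop handler is called; returning false means "do not retry".
            publisher.offer(i, (subscriber, item) -> {
                dropped.incrementAndGet();
                return false;
            });
        }

        publisher.close();
        return dropped.get();
    }

    public static void main(String[] args) {
        System.out.println("dropped items: " + run());
    }
}
```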
Some more useful methods:
– close(): issues onComplete signals to all subscribers, and disallows subsequent attempts to publish.
– closeExceptionally(Throwable error): issues onError signals to all subscribers with the given error, and disallows subsequent attempts to publish.
– estimateMaximumLag(): returns an estimate of the maximum number of items produced but not yet consumed among all subscribers.
– estimateMinimumDemand(): returns an estimate of the minimum number of items requested (via request) but not yet produced, among all subscribers.
– getNumberOfSubscribers(): returns the number of current subscribers.
– hasSubscribers(): returns true if this publisher has any subscribers.
– isSubscribed(Subscriber): returns true if the given Subscriber is currently subscribed.
– getSubscribers(): returns a list of current subscribers.
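A short sketch of some of these methods in action, assuming a no-op subscriber and a hypothetical class name StatusSketch. It also shows that publishing after close() is rejected with an IllegalStateException.

```java
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

// Hypothetical demo class, not part of the tutorial project.
class StatusSketch {

    /** Returns a comma-separated report of the publisher state before and after close(). */
    static String run() {
        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
        Flow.Subscriber<String> subscriber = new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription subscription) { }
            @Override public void onNext(String item) { }
            @Override public void onError(Throwable error) { }
            @Override public void onComplete() { }
        };
        publisher.subscribe(subscriber);

        StringBuilder report = new StringBuilder();
        report.append(publisher.hasSubscribers()).append(',')
              .append(publisher.getNumberOfSubscribers()).append(',')
              .append(publisher.isSubscribed(subscriber)).append(',');

        publisher.close(); // onComplete goes to the subscriber, publishing is now disallowed
        report.append(publisher.isClosed()).append(',');

        try {
            publisher.submit("late item");
            report.append("accepted");
        } catch (IllegalStateException e) {
            report.append("rejected"); // submit() after close() is not allowed
        }
        return report.toString();
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints true,1,true,true,rejected
    }
}
```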
2. Project
We will create a Publisher (extends SubmissionPublisher) that is subscribed to by two Subscribers:
– We don't need to define any implementation of the Subscription interface. Why? SubmissionPublisher keeps a linked list of BufferedSubscriptions; every time we invoke the subscribe() method with a Subscriber, a new BufferedSubscription for that Subscriber is added to the list automatically.
– Using the submit(T item) method, Publisher periodically publishes the items generated from a Supplier to Subscribers (each submitted item is then delivered asynchronously via the Subscriber onNext() method).
– After receiving all requested items successfully, a Subscriber either requests new data or cancels its Subscription (chosen at random).
– When the generated item reaches MAX_ITEM_TO_PUBLISH, we stop Publisher by calling the close() method (which sends the onComplete signal to Subscribers).
III. Practice
1. Create subclass of SubmissionPublisher
package com.javasampleapproach.java9flow.submissionpublisher;

import static java.lang.Thread.currentThread;

import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class MyPublisher extends SubmissionPublisher<Integer> {

    private static final String LOG_MESSAGE_FORMAT = "Publisher >> [%s] %s%n";

    // 5 matches Case 3 below; the output shown for Cases 1 and 2 runs up to item 11.
    private final int MAX_ITEM_TO_PUBLISH = 5;

    private final ScheduledFuture<?> periodicTask;
    private final ScheduledExecutorService scheduler;
    private final AtomicInteger i;

    MyPublisher(Executor executor, int maxBufferCapacity, long period, TimeUnit unit) {
        super(executor, maxBufferCapacity);
        // if using the default, normally the ForkJoinPool.commonPool(), call:
        // super();

        i = new AtomicInteger(0);
        scheduler = new ScheduledThreadPoolExecutor(1);

        // Publish one generated item per period until MAX_ITEM_TO_PUBLISH is reached.
        periodicTask = scheduler.scheduleAtFixedRate(() -> {
            Integer item = supplier.get();
            log("publishing item: " + item + " ...");
            submit(item);
            log("estimateMaximumLag: " + super.estimateMaximumLag());
            log("estimateMinimumDemand: " + super.estimateMinimumDemand());

            if (item == MAX_ITEM_TO_PUBLISH) {
                close();
            }
        }, 0, period, unit);
    }

    @Override
    public void subscribe(Subscriber<? super Integer> subscriber) {
        super.subscribe(subscriber);
    }

    // Stops the periodic task and its scheduler, then sends onComplete to all Subscribers.
    public void close() {
        log("shutting down...");

        List<Subscriber<? super Integer>> subscribers = getSubscribers();
        for (Subscriber<? super Integer> subscriber : subscribers) {
            log("Subscriber " + subscriber.toString() + " isSubscribed(): " + isSubscribed(subscriber));
        }

        periodicTask.cancel(false);
        scheduler.shutdown();
        super.close();
    }

    // Generates the items 1, 2, 3, ...
    Supplier<? extends Integer> supplier = new Supplier<>() {
        @Override
        public Integer get() {
            int value = i.incrementAndGet();
            return Integer.valueOf(value);
        }
    };

    private void log(String message, Object... args) {
        String fullMessage = String.format(LOG_MESSAGE_FORMAT, currentThread().getName(), message);
        System.out.printf(fullMessage, args);
    }
}
2. Create implementation of Subscriber
package com.javasampleapproach.java9flow.submissionpublisher;

import java.util.Random;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.atomic.AtomicInteger;

import static java.lang.Thread.currentThread;

public class MySubscriber implements Subscriber<Object> {

    private static final String LOG_MESSAGE_FORMAT = "~~ Subscriber %s >> [%s] %s%n";
    private static final Random RANDOM = new Random();

    private Subscription subscription;
    private AtomicInteger count; // items still expected from the current request
    private String name;
    private int DEMAND = 0;

    public MySubscriber(String name) {
        this.name = name;
    }

    @Override
    public void onSubscribe(Subscription subscription) {
        log("Subscribed...");
        this.subscription = subscription;
        request(DEMAND);
    }

    // Must be called with a positive value before subscribing: count is created here,
    // and request(0) is not a valid request.
    public void setDEMAND(int n) {
        this.DEMAND = n;
        count = new AtomicInteger(DEMAND);
    }

    private void request(int n) {
        log("request new " + n + " items...");
        subscription.request(n);
    }

    @Override
    public void onNext(Object item) {
        log("itemValue: " + item);

        // Once all requested items have arrived, randomly request more or cancel.
        if (count.decrementAndGet() == 0) {
            if (RANDOM.nextBoolean()) {
                request(DEMAND);
                count.set(DEMAND);
            } else {
                log("Cancel subscribe...");
                subscription.cancel();
            }
        }
    }

    @Override
    public void onComplete() {
        log("Complete!");
    }

    @Override
    public void onError(Throwable t) {
        log("Error: " + t.getMessage());
    }

    private void log(String message, Object... args) {
        String fullMessage = String.format(LOG_MESSAGE_FORMAT, this.name, currentThread().getName(), message);
        System.out.printf(fullMessage, args);
    }
}
3. Create Test Class
package com.javasampleapproach.java9flow.submissionpublisher;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class MainApp {

    public static void main(String[] args) {
        final int MAX_BUFFER_CAPACITY = 128;
        final ExecutorService executor = Executors.newFixedThreadPool(4);

        MyPublisher publisher = new MyPublisher(executor, MAX_BUFFER_CAPACITY, 200, TimeUnit.MILLISECONDS);

        MySubscriber subscriberA = new MySubscriber("A");
        subscriberA.setDEMAND(3);

        MySubscriber subscriberB = new MySubscriber("B");
        subscriberB.setDEMAND(6);

        publisher.subscribe(subscriberA);
        publisher.subscribe(subscriberB);
    }
}
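One thing to keep in mind with MainApp: Executors.newFixedThreadPool(4) creates non-daemon threads, and nothing in the example shuts that executor down (MyPublisher.close() only stops its own scheduler), so the JVM can keep running after the Subscribers have received onComplete. The sketch below shows one possible way to handle this, using consume() to know when delivery has finished. The class name ShutdownSketch and the 5-second timeout are assumptions, not part of the original project.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the tutorial project.
class ShutdownSketch {

    /** Publishes five items, then shuts the executor down; returns true if it terminated. */
    static boolean run() {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>(executor, 128);

        // The future completes after onComplete has been signalled.
        CompletableFuture<Void> done =
                publisher.consume(item -> System.out.println("got " + item));

        for (int i = 1; i <= 5; i++) {
            publisher.submit(i);
        }
        publisher.close();

        done.join();         // wait until all items have been delivered
        executor.shutdown(); // let the non-daemon pool threads exit
        try {
            return executor.awaitTermination(5, TimeUnit.SECONDS); // assumed timeout
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("executor terminated: " + run());
    }
}
```

In the tutorial's own project, an equivalent step would be to call executor.shutdown() once the publisher has closed and the Subscribers have finished.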
4. Check Result
Case 1:
– Subscriber A requests 3 items, then cancels its subscription.
– Subscriber B requests 6 items, then cancels its subscription.
Publisher >> [pool-2-thread-1] publishing item: 1 ...
~~ Subscriber A >> [pool-1-thread-1] Subscribed...
~~ Subscriber B >> [pool-1-thread-2] Subscribed...
~~ Subscriber A >> [pool-1-thread-1] request new 3 items...
Publisher >> [pool-2-thread-1] estimateMaximumLag: 1
~~ Subscriber B >> [pool-1-thread-2] request new 6 items...
Publisher >> [pool-2-thread-1] estimateMinimumDemand: -1
~~ Subscriber B >> [pool-1-thread-2] itemValue: 1
~~ Subscriber A >> [pool-1-thread-1] itemValue: 1
Publisher >> [pool-2-thread-1] publishing item: 2 ...
Publisher >> [pool-2-thread-1] estimateMaximumLag: 1
~~ Subscriber A >> [pool-1-thread-3] itemValue: 2
~~ Subscriber B >> [pool-1-thread-4] itemValue: 2
Publisher >> [pool-2-thread-1] estimateMinimumDemand: 1
Publisher >> [pool-2-thread-1] publishing item: 3 ...
Publisher >> [pool-2-thread-1] estimateMaximumLag: 1
~~ Subscriber A >> [pool-1-thread-2] itemValue: 3
~~ Subscriber B >> [pool-1-thread-1] itemValue: 3
~~ Subscriber A >> [pool-1-thread-2] Cancel subscribe...
Publisher >> [pool-2-thread-1] estimateMinimumDemand: 0
Publisher >> [pool-2-thread-1] publishing item: 4 ...
Publisher >> [pool-2-thread-1] estimateMaximumLag: 1
~~ Subscriber B >> [pool-1-thread-3] itemValue: 4
Publisher >> [pool-2-thread-1] estimateMinimumDemand: 2
Publisher >> [pool-2-thread-1] publishing item: 5 ...
~~ Subscriber B >> [pool-1-thread-4] itemValue: 5
Publisher >> [pool-2-thread-1] estimateMaximumLag: 0
Publisher >> [pool-2-thread-1] estimateMinimumDemand: 1
Publisher >> [pool-2-thread-1] publishing item: 6 ...
Publisher >> [pool-2-thread-1] estimateMaximumLag: 1
~~ Subscriber B >> [pool-1-thread-1] itemValue: 6
Publisher >> [pool-2-thread-1] estimateMinimumDemand: 0
~~ Subscriber B >> [pool-1-thread-1] Cancel subscribe...
Publisher >> [pool-2-thread-1] publishing item: 7 ...
Publisher >> [pool-2-thread-1] estimateMaximumLag: 0
Publisher >> [pool-2-thread-1] estimateMinimumDemand: 0
Publisher >> [pool-2-thread-1] publishing item: 8 ...
Publisher >> [pool-2-thread-1] estimateMaximumLag: 0
Publisher >> [pool-2-thread-1] estimateMinimumDemand: 0
Publisher >> [pool-2-thread-1] publishing item: 9 ...
Publisher >> [pool-2-thread-1] estimateMaximumLag: 0
Publisher >> [pool-2-thread-1] estimateMinimumDemand: 0
Publisher >> [pool-2-thread-1] publishing item: 10 ...
Publisher >> [pool-2-thread-1] estimateMaximumLag: 0
Publisher >> [pool-2-thread-1] estimateMinimumDemand: 0
Publisher >> [pool-2-thread-1] publishing item: 11 ...
Publisher >> [pool-2-thread-1] estimateMaximumLag: 0
Publisher >> [pool-2-thread-1] estimateMinimumDemand: 0
Case 2:
– Subscriber A requests 3 items, then cancels its subscription.
– Subscriber B requests 6 items, then requests 6 more items.
– Because Publisher submits only 11 items, Subscriber B receives only 11 items (while requesting 12 in total), then receives the onComplete signal from Publisher (via the Publisher close() method) while it is still subscribed.
~~ Subscriber B >> [pool-1-thread-2] Subscribed...
Publisher >> [pool-2-thread-1] publishing item: 1 ...
Publisher >> [pool-2-thread-1] estimateMaximumLag: 1
Publisher >> [pool-2-thread-1] estimateMinimumDemand: -1
~~ Subscriber A >> [pool-1-thread-1] Subscribed...
~~ Subscriber A >> [pool-1-thread-1] request new 3 items...
~~ Subscriber B >> [pool-1-thread-2] request new 6 items...
~~ Subscriber A >> [pool-1-thread-1] itemValue: 1
~~ Subscriber B >> [pool-1-thread-2] itemValue: 1
Publisher >> [pool-2-thread-1] publishing item: 2 ...
Publisher >> [pool-2-thread-1] estimateMaximumLag: 1
~~ Subscriber A >> [pool-1-thread-3] itemValue: 2
~~ Subscriber B >> [pool-1-thread-4] itemValue: 2
Publisher >> [pool-2-thread-1] estimateMinimumDemand: 1
Publisher >> [pool-2-thread-1] publishing item: 3 ...
~~ Subscriber B >> [pool-1-thread-2] itemValue: 3
~~ Subscriber A >> [pool-1-thread-1] itemValue: 3
Publisher >> [pool-2-thread-1] estimateMaximumLag: 1
Publisher >> [pool-2-thread-1] estimateMinimumDemand: 0
~~ Subscriber A >> [pool-1-thread-1] Cancel subscribe...
Publisher >> [pool-2-thread-1] publishing item: 4 ...
Publisher >> [pool-2-thread-1] estimateMaximumLag: 1
~~ Subscriber B >> [pool-1-thread-3] itemValue: 4
Publisher >> [pool-2-thread-1] estimateMinimumDemand: 2
Publisher >> [pool-2-thread-1] publishing item: 5 ...
Publisher >> [pool-2-thread-1] estimateMaximumLag: 1
~~ Subscriber B >> [pool-1-thread-4] itemValue: 5
Publisher >> [pool-2-thread-1] estimateMinimumDemand: 1
Publisher >> [pool-2-thread-1] publishing item: 6 ...
~~ Subscriber B >> [pool-1-thread-2] itemValue: 6
Publisher >> [pool-2-thread-1] estimateMaximumLag: 1
~~ Subscriber B >> [pool-1-thread-2] request new 6 items...
Publisher >> [pool-2-thread-1] estimateMinimumDemand: 0
Publisher >> [pool-2-thread-1] publishing item: 7 ...
Publisher >> [pool-2-thread-1] estimateMaximumLag: 1
~~ Subscriber B >> [pool-1-thread-1] itemValue: 7
Publisher >> [pool-2-thread-1] estimateMinimumDemand: 5
Publisher >> [pool-2-thread-1] publishing item: 8 ...
Publisher >> [pool-2-thread-1] estimateMaximumLag: 1
~~ Subscriber B >> [pool-1-thread-3] itemValue: 8
Publisher >> [pool-2-thread-1] estimateMinimumDemand: 4
Publisher >> [pool-2-thread-1] publishing item: 9 ...
Publisher >> [pool-2-thread-1] estimateMaximumLag: 1
~~ Subscriber B >> [pool-1-thread-4] itemValue: 9
Publisher >> [pool-2-thread-1] estimateMinimumDemand: 3
Publisher >> [pool-2-thread-1] publishing item: 10 ...
Publisher >> [pool-2-thread-1] estimateMaximumLag: 1
~~ Subscriber B >> [pool-1-thread-2] itemValue: 10
Publisher >> [pool-2-thread-1] estimateMinimumDemand: 2
Publisher >> [pool-2-thread-1] publishing item: 11 ...
Publisher >> [pool-2-thread-1] estimateMaximumLag: 1
~~ Subscriber B >> [pool-1-thread-1] itemValue: 11
Publisher >> [pool-2-thread-1] estimateMinimumDemand: 1
Publisher >> [pool-2-thread-1] shutting down...
Publisher >> [pool-2-thread-1] Subscriber com.javasampleapproach.java9flow.submissionpublisher.MySubscriber@4278bf01 isSubscribed(): true
~~ Subscriber B >> [pool-1-thread-3] Complete!
Case 3: We change MAX_ITEM_TO_PUBLISH to 5:
– Subscriber A requests 3 items, then requests 3 more items.
– Subscriber B requests 6 items.
– Because Publisher submits only 5 items, Subscribers A and B each receive only 5 items (while each has requested 6 items in total), then receive the onComplete signal from Publisher (via the Publisher close() method) while they are still subscribed.
~~ Subscriber A >> [pool-1-thread-1] Subscribed...
~~ Subscriber B >> [pool-1-thread-2] Subscribed...
Publisher >> [pool-2-thread-1] publishing item: 1 ...
~~ Subscriber B >> [pool-1-thread-2] request new 6 items...
~~ Subscriber A >> [pool-1-thread-1] request new 3 items...
Publisher >> [pool-2-thread-1] estimateMaximumLag: 1
Publisher >> [pool-2-thread-1] estimateMinimumDemand: 2
~~ Subscriber B >> [pool-1-thread-2] itemValue: 1
~~ Subscriber A >> [pool-1-thread-1] itemValue: 1
Publisher >> [pool-2-thread-1] publishing item: 2 ...
Publisher >> [pool-2-thread-1] estimateMaximumLag: 1
~~ Subscriber B >> [pool-1-thread-4] itemValue: 2
Publisher >> [pool-2-thread-1] estimateMinimumDemand: 1
~~ Subscriber A >> [pool-1-thread-3] itemValue: 2
Publisher >> [pool-2-thread-1] publishing item: 3 ...
Publisher >> [pool-2-thread-1] estimateMaximumLag: 1
~~ Subscriber B >> [pool-1-thread-1] itemValue: 3
~~ Subscriber A >> [pool-1-thread-2] itemValue: 3
Publisher >> [pool-2-thread-1] estimateMinimumDemand: 0
~~ Subscriber A >> [pool-1-thread-2] request new 3 items...
Publisher >> [pool-2-thread-1] publishing item: 4 ...
~~ Subscriber A >> [pool-1-thread-4] itemValue: 4
~~ Subscriber B >> [pool-1-thread-3] itemValue: 4
Publisher >> [pool-2-thread-1] estimateMaximumLag: 1
Publisher >> [pool-2-thread-1] estimateMinimumDemand: 2
Publisher >> [pool-2-thread-1] publishing item: 5 ...
Publisher >> [pool-2-thread-1] estimateMaximumLag: 1
~~ Subscriber B >> [pool-1-thread-2] itemValue: 5
~~ Subscriber A >> [pool-1-thread-1] itemValue: 5
Publisher >> [pool-2-thread-1] estimateMinimumDemand: 1
Publisher >> [pool-2-thread-1] shutting down...
Publisher >> [pool-2-thread-1] Subscriber com.javasampleapproach.java9flow.submissionpublisher.MySubscriber@42d273fa isSubscribed(): true
Publisher >> [pool-2-thread-1] Subscriber com.javasampleapproach.java9flow.submissionpublisher.MySubscriber@dd73db isSubscribed(): true
~~ Subscriber B >> [pool-1-thread-3] Complete!
~~ Subscriber A >> [pool-1-thread-4] Complete!
Special case: Using the consume(Consumer) method:
package com.javasampleapproach.java9flow.submissionpublisher;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class MainApp {

    public static void main(String[] args) {
        final int MAX_BUFFER_CAPACITY = 128;
        final ExecutorService executor = Executors.newFixedThreadPool(4);

        MyPublisher publisher = new MyPublisher(executor, MAX_BUFFER_CAPACITY, 200, TimeUnit.MILLISECONDS);

        publisher.consume((data) -> process(data));
    }

    static void process(Integer i) {
        System.out.println("consume() testing: " + i.toString());
    }
}
The result:
Publisher >> [pool-2-thread-1] publishing item: 1 ...
Publisher >> [pool-2-thread-1] estimateMaximumLag: 1
Publisher >> [pool-2-thread-1] estimateMinimumDemand: 9223372036854775806 //7FFFFFFFFFFFFFFE : Long.MAX_VALUE - 1
consume() testing: 1
Publisher >> [pool-2-thread-1] publishing item: 2 ...
Publisher >> [pool-2-thread-1] estimateMaximumLag: 1
consume() testing: 2
Publisher >> [pool-2-thread-1] estimateMinimumDemand: 9223372036854775805 //7FFFFFFFFFFFFFFD : Long.MAX_VALUE - 2
Publisher >> [pool-2-thread-1] publishing item: 3 ...
Publisher >> [pool-2-thread-1] estimateMaximumLag: 1
consume() testing: 3
Publisher >> [pool-2-thread-1] estimateMinimumDemand: 9223372036854775804 //7FFFFFFFFFFFFFFC : Long.MAX_VALUE - 3
Publisher >> [pool-2-thread-1] publishing item: 4 ...
consume() testing: 4
Publisher >> [pool-2-thread-1] estimateMaximumLag: 1
Publisher >> [pool-2-thread-1] estimateMinimumDemand: 9223372036854775803 //7FFFFFFFFFFFFFFB : Long.MAX_VALUE - 4
Publisher >> [pool-2-thread-1] publishing item: 5 ...
consume() testing: 5
Publisher >> [pool-2-thread-1] estimateMaximumLag: 1
Publisher >> [pool-2-thread-1] estimateMinimumDemand: 9223372036854775802 //7FFFFFFFFFFFFFFA : Long.MAX_VALUE - 5
Publisher >> [pool-2-thread-1] shutting down...
Publisher >> [pool-2-thread-1] Subscriber java.util.concurrent.SubmissionPublisher$ConsumerSubscriber@1c39c86 isSubscribed(): true
Look at the estimateMinimumDemand values, which are returned by the estimateMinimumDemand() method. When using the consume() method, we don't specify any Subscriber for Publisher; the internal subscriber created by consume() requests Long.MAX_VALUE items, so the estimate starts at Long.MAX_VALUE and decreases by 1 with each published item.
Last updated on September 13, 2018.
Thanks for this great example!
One issue I have is that the main app never terminates after the publisher has closed itself and the subscribers are notified that the subscription is complete. What do I do wrong or is there anything missing in the example?