Chapters
Interrelated interfaces and static methods for establishing flow-controlled components in which Publishers produce items consumed by one or more Subscribers, each managed by a Subscription.
These interfaces correspond to the reactive-streams specification. They apply in both concurrent and distributed asynchronous settings: All (seven) methods are defined in void "one-way" message style. More information can be found in the documentation
To create a simple reactive-stream using java Flow API, we need three components: Publisher, Subscriber and Subscription. First off, let's create a Publisher. SubmissionPublisher implements Flow.Publisher. Thus we can use it as a Publisher.
Next, we need to create subscribers. Java doesn't have class implementation of Flow.Subscriber. We need to create a class that will implement
This example demonstrates creation of reactive-stream using Flow API in java.
We need to use some kind of flag like boolean or counter flag or some kind of timing if we want all subscribers to be done. I think the purpose of Flow API is to stream indefinite amount of data to subscribers. Thus, Publisher is not required to guarantee that all subscribers have completed before it closes.
defaultBufferSize() Returns a default value for Publisher or Subscriber buffering, that may be used in the absence of other constraints. The current value returned is 256.
SubmissionPublisher constructor has three forms. In the example above, I used this form:
This form creates a new SubmissionPublisher using the given Executor for async delivery to subscribers, with the given maximum buffer size for each subscriber. More information about SubmissionPublisher's constructors can be found in the documentation.
Note that if any Subscriber method throws an exception, its subscription is cancelled. To explicitly cancel a subscription, use the
Flow.Processor is a Flow API component that acts as both a Subscriber and Publisher. This component is mainly used for transforming messages that we sent to subscribers.
This example demonstrates Flow.Processor.
I extended SubmissionPublisher so that I didn't need to override
Now, let's discuss the Flow mechanism in the example above. Transformer class is a Publisher/Subscriber component. Thus, this class can subscribe to a Publisher. In the example above, I created a SubmissionPublisher
In the example above, the parameter types of Processor are
The reason why
Next,
Flow API
Interrelated interfaces and static methods for establishing flow-controlled components in which Publishers produce items consumed by one or more Subscribers, each managed by a Subscription.
These interfaces correspond to the reactive-streams specification. They apply in both concurrent and distributed asynchronous settings: All (seven) methods are defined in void "one-way" message style. More information can be found in the documentation
To create a simple reactive-stream using java Flow API, we need three components: Publisher, Subscriber and Subscription. First off, let's create a Publisher. SubmissionPublisher implements Flow.Publisher. Thus we can use it as a Publisher.
Next, we need to create subscribers. Java doesn't have class implementation of Flow.Subscriber. We need to create a class that will implement
Flow.Subscriber
.
This example demonstrates creation of reactive-stream using Flow API in java.
import java.util.concurrent.Flow.*; import java.util.concurrent.SubmissionPublisher; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.Random; public class SampleClass{ public static void main(String[] args) throws InterruptedException{ Random rand = new Random(); int subCount = 2; ExecutorService es = Executors.newFixedThreadPool(subCount); SubmissionPublisher<Integer> sp = new SubmissionPublisher<> (es, java.util.concurrent.Flow .defaultBufferSize()); for(int i = 0; i < subCount; i++) sp.subscribe(new Receiver("Receiver"+i)); for(int i = 0; i < 3; i++) sp.submit(rand.nextInt(50)); //pause main thread for a bit //before closing executor and //publisher //to assure all tasks are //complete Thread.sleep(200); sp.close(); es.shutdown(); } } class Receiver implements Subscriber<Integer>{ private Subscription subscription; private String receiverName; Receiver(String receiverName){ this.receiverName = receiverName; } @Override public void onSubscribe(Subscription subscription){ System.out.println(receiverName + " has subscribed!"); this.subscription = subscription; this.subscription.request(1); } @Override public void onNext(Integer item){ System.out.println(receiverName + " got: " + item); this.subscription.request(1); } @Override public void onError(Throwable e){ e.printStackTrace(); } @Override public void onComplete(){ System.out.println(receiverName + " is done!"); } } Result(may vary) Receiver0 has subscribed! Receiver1 has subscribed! Receiver0 got: 47 Receiver1 got: 47 Receiver0 got: 20 Receiver1 got: 20 Receiver0 got: 21 Receiver1 got: 21 Receiver0 is done! Receiver1 is done!First off, let's examine Receiver class that implements Subscriber interface. Subscriber interface has four methods that are needed to be overriden.
onSubscribe()
method is the first method to be invoked once Publisher makes Subscriber subscribe to it by invoking subscribe()
method.
onNext()
method invoked everytime Subscriber fulfills a request from the Subscription. onError()
method
is invoked upon an unrecoverable error encountered by a Publisher or Subscription, after which no other Subscriber methods are invoked by the Subscription. onComplete()
method is invoked if Publisher is not producing any data or Publisher is closed. If there are leftover submitted data and then Publisher is closed, onComplete()
may not be invoked.
request()
method adds the given number n
of items to the current unfulfilled demand for this subscription. If n
is less than or equal to zero, the Subscriber will receive an onError signal with an IllegalArgumentException argument. Otherwise, the Subscriber will receive up to n
additional onNext invocations (or fewer if terminated). Everytime a request is fulfilled, n
will be reduced.
submit()
method submits a value to Publisher that will be distributed to Subscribers. close()
method issues onComplete signals to current subscribers, and disallows subsequent attempts to publish, unless already closed. Upon return, this method does NOT guarantee that all subscribers have yet completed.
We need to use some kind of flag like boolean or counter flag or some kind of timing if we want all subscribers to be done. I think the purpose of Flow API is to stream indefinite amount of data to subscribers. Thus, Publisher is not required to guarantee that all subscribers have completed before it closes.
defaultBufferSize() Returns a default value for Publisher or Subscriber buffering, that may be used in the absence of other constraints. The current value returned is 256.
SubmissionPublisher constructor has three forms. In the example above, I used this form:
SubmissionPublisher(Executor executor, int maxBufferCapacity)
This form creates a new SubmissionPublisher using the given Executor for async delivery to subscribers, with the given maximum buffer size for each subscriber. More information about SubmissionPublisher's constructors can be found in the documentation.
Note that if any Subscriber method throws an exception, its subscription is cancelled. To explicitly cancel a subscription, use the
cancel()
method in the Flow.Subscription
interface.
Flow.Processor Interface
Flow.Processor is a Flow API component that acts as both a Subscriber and Publisher. This component is mainly used for transforming messages that we sent to subscribers.
This example demonstrates Flow.Processor.
import java.util.concurrent.Flow.*; import java.util.concurrent.SubmissionPublisher; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.Random; import java.util.function.Function; public class SampleClass{ public static void main(String[] args) throws InterruptedException{ Random rand = new Random(); ExecutorService es = Executors.newFixedThreadPool(2); SubmissionPublisher<Number> sp = new SubmissionPublisher<> (es, java.util.concurrent.Flow .defaultBufferSize()); Transformer t1 = new Transformer("t1", String::valueOf); sp.subscribe(t1); t1.subscribe(new Receiver("r1")); t1.subscribe(new Receiver("r2")); //submit three values sp.submit(rand.nextInt(50)); sp.submit(rand.nextDouble()); sp.submit(rand.nextInt(10)); //pause main thread for a bit //before closing executor and //publishers //to assure all tasks are //complete Thread.sleep(200); sp.close(); t1.close(); es.shutdown(); } } class Transformer extends SubmissionPublisher<String> implements Processor<Number, String>{ private Subscription subscription; private String processorName; private Function<Number, String> transform; Transformer(String processorName, Function<Number, String> transform){ this.processorName = processorName; this.transform = transform; } @Override public void onSubscribe(Subscription subscription){ System.out.println(processorName + " has subscribed!"); this.subscription = subscription; this.subscription.request(3); } @Override public void onNext(Number item){ submit(transform.apply(item)); } @Override public void onError(Throwable e){ e.printStackTrace(); } @Override public void onComplete(){ System.out.println(processorName + " is done!"); } } class Receiver implements Subscriber<String>{ private Subscription subscription; private String receiverName; Receiver(String receiverName){ this.receiverName = receiverName; } @Override public void onSubscribe(Subscription subscription){ System.out.println(receiverName + " has subscribed!"); this.subscription = subscription; this.subscription.request(3); } @Override public void onNext(String item){ System.out.println(receiverName + " got: " + item); } @Override public void onError(Throwable e){ e.printStackTrace(); } @Override public void onComplete(){ System.out.println(receiverName + " is done!"); } } Result(may vary) t1 has subscribed! r1 has subscribed! r2 has subscribed! r1 got: 15 r1 got: 0.5124233779675397 r1 got: 4 r2 got: 15 r2 got: 0.5124233779675397 r2 got: 4 r1 is done! t1 is done! r2 is done!First off, let's examine Transformer class that extends SubmissionPublisher and implements Processor. When a class implements Processor, it needs to override 5 methods:
subscribe
in Publisher and onSubscribe
, onNext
, onError
and onComplete
methods in Subscriber.
I extended SubmissionPublisher so that I didn't need to override
subscribe
method because SubmissionPublisher already overrides subscribe
. Moreover, we can use some SubmissionPublisher methods in Transformer class if we needed. In the example above, I used the submit()
method in onNext()
method.
Now, let's discuss the Flow mechanism in the example above. Transformer class is a Publisher/Subscriber component. Thus, this class can subscribe to a Publisher. In the example above, I created a SubmissionPublisher
sp
with a Number
generic type. The generic type of sp
is based on the Processor<T, R>
.
In the example above, the parameter types of Processor are
<Number, String>
. Parameter type T
is the subscribed item type. Therefore, Number is the subscribed item type in Transformer class. Parameter type R
is the published item type. Therefore, String is the published item type.
The reason why
sp
has Number generic-type argument is because the subscriber of sp
is Transformer class. Number type is the subscribed item type of Transformer class. Therefore, Publisher sp
needs to publish data with Number type.
Next,
Receiver
is Subscriber of Transformer class. The reason why Receiver has String generic-type argument is because String is the published item type of Transformer. As you can see in the example above, Processor transform Number to String by using a Function functional interface. submit()
method in onNext()
method of Transformer class submits the transformed data to the subscriber of Transformer which is the Receiver.
No comments:
Post a Comment