Monday, January 24, 2022

Java Tutorial: Flow API

Flow API

Interrelated interfaces and static methods for establishing flow-controlled components in which Publishers produce items consumed by one or more Subscribers, each managed by a Subscription.

These interfaces correspond to the reactive-streams specification. They apply in both concurrent and distributed asynchronous settings: all (seven) methods are defined in void "one-way" message style. More information can be found in the documentation.

To create a simple reactive stream with the Java Flow API, we need three components: a Publisher, a Subscriber and a Subscription. First, let's create a Publisher. SubmissionPublisher implements Flow.Publisher, so we can use it as our Publisher.

Next, we need to create subscribers. The java.util.concurrent package doesn't provide a general-purpose public class that implements Flow.Subscriber, so we need to write a class that implements it ourselves.

This example demonstrates the creation of a reactive stream using the Flow API in Java.
import java.util.concurrent.Flow.*;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.Random;

public class SampleClass{

  public static void main(String[] args)
                throws InterruptedException{
    Random rand = new Random();
    int subCount = 2;
    ExecutorService es = 
    Executors.newFixedThreadPool(subCount);
    
    SubmissionPublisher<Integer> sp =
    new SubmissionPublisher<>
    (es, java.util.concurrent.Flow
         .defaultBufferSize());
    
    for(int i = 0; i < subCount; i++)
      sp.subscribe(new Receiver("Receiver"+i));
      
    for(int i = 0; i < 3; i++)
      sp.submit(rand.nextInt(50));
    
    // Pause the main thread briefly so pending
    // deliveries can finish before the publisher
    // and executor are closed. A fixed sleep is
    // a heuristic, not a guarantee.
    Thread.sleep(200);
    
    sp.close();
    es.shutdown();
  }
}

class Receiver implements Subscriber<Integer>{
  
  private Subscription subscription;
  private String receiverName;
  
  Receiver(String receiverName){
    this.receiverName = receiverName;
  }
  
  @Override
  public void onSubscribe(Subscription subscription){
    System.out.println(receiverName + " has subscribed!");
    this.subscription = subscription;
    this.subscription.request(1);
  }
  
  @Override
  public void onNext(Integer item){
    System.out.println(receiverName + " got: " + 
    item);
    this.subscription.request(1);
  }
  
  @Override
  public void onError(Throwable e){
    e.printStackTrace();
  }
  
  @Override
  public void onComplete(){
    System.out.println(receiverName + 
    " is done!");
  }
  
}

Result (may vary)
Receiver0 has subscribed!
Receiver1 has subscribed!
Receiver0 got: 47
Receiver1 got: 47
Receiver0 got: 20
Receiver1 got: 20
Receiver0 got: 21
Receiver1 got: 21
Receiver0 is done!
Receiver1 is done!
First, let's examine the Receiver class, which implements the Subscriber interface. Subscriber has four methods that must be implemented. onSubscribe() is the first method invoked on a Subscriber after it has been registered with a Publisher through the Publisher's subscribe() method.

onNext() is invoked each time the Publisher delivers an item to the Subscriber, in response to demand the Subscriber has requested through its Subscription. onError() is invoked upon an unrecoverable error encountered by a Publisher or Subscription, after which no other Subscriber methods are invoked by the Subscription. onComplete() is invoked when no further items will arrive and the Subscription has not already been terminated by an error, for example after a SubmissionPublisher is closed. If submitted items are still buffered when the Publisher is closed and the Subscriber never requests them, onComplete() may not be invoked.
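To see the error path in isolation, here is a minimal sketch. It assumes the failure is triggered with SubmissionPublisher's closeExceptionally() method; the class name ErrorSignalSketch and the helper run() are hypothetical names made up for this illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class ErrorSignalSketch {

  // Hypothetical helper: closes a publisher exceptionally and returns
  // the message of the Throwable that reaches the subscriber's onError().
  static String run() {
    CompletableFuture<String> seen = new CompletableFuture<>();

    try (SubmissionPublisher<Integer> sp = new SubmissionPublisher<>()) {
      sp.subscribe(new Flow.Subscriber<Integer>() {
        @Override public void onSubscribe(Flow.Subscription s) { s.request(1); }
        @Override public void onNext(Integer item) { }
        @Override public void onError(Throwable e) { seen.complete(e.getMessage()); }
        @Override public void onComplete() { seen.complete("completed"); }
      });

      // Signals onError (not onComplete) to the current subscribers.
      sp.closeExceptionally(new IllegalStateException("source failed"));

      // Wait for the asynchronous signal, giving up after five seconds.
      return seen.orTimeout(5, TimeUnit.SECONDS).join();
    }
  }

  public static void main(String[] args) {
    System.out.println("onError received: " + run());
  }
}
```

Because the publisher was closed exceptionally, the subscriber sees onError() and never sees onComplete().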

request(n) adds the given number n of items to the current unfulfilled demand for this subscription. If n is less than or equal to zero, the Subscriber will receive an onError signal with an IllegalArgumentException argument. Otherwise, the Subscriber will receive up to n additional onNext invocations (or fewer if terminated). Each delivered item reduces the outstanding demand by one.
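The effect of demand is easier to see with a subscriber that asks for fewer items than the publisher submits. The following is a rough sketch, not part of the original example: DemandSketch and run() are hypothetical names, and a CompletableFuture is used only so the main thread can wait for the second item.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class DemandSketch {

  // Hypothetical helper: submits five items to a subscriber that only
  // ever requests two, and returns what the subscriber actually received.
  static List<Integer> run() {
    List<Integer> received = new CopyOnWriteArrayList<>();
    CompletableFuture<Void> demandMet = new CompletableFuture<>();

    try (SubmissionPublisher<Integer> sp = new SubmissionPublisher<>()) {
      sp.subscribe(new Flow.Subscriber<Integer>() {
        @Override public void onSubscribe(Flow.Subscription s) {
          s.request(2); // total demand: two items, never topped up
        }
        @Override public void onNext(Integer item) {
          received.add(item);
          if (received.size() == 2) demandMet.complete(null);
        }
        @Override public void onError(Throwable e) { demandMet.completeExceptionally(e); }
        @Override public void onComplete() { }
      });

      // Five items are submitted, but only two were requested.
      for (int i = 1; i <= 5; i++) sp.submit(i);

      demandMet.orTimeout(5, TimeUnit.SECONDS).join();
      return List.copyOf(received);
    }
  }

  public static void main(String[] args) {
    System.out.println("received: " + run()); // received: [1, 2]
  }
}
```

Items 3 to 5 stay in the subscriber's buffer because no demand is left for them.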

submit() publishes the given item to each current subscriber by asynchronously invoking its onNext method, blocking while resources for any subscriber are unavailable. close() issues onComplete signals to current subscribers, and disallows subsequent attempts to publish, unless already closed. Upon return, this method does NOT guarantee that all subscribers have yet completed.

If we want to be sure that all subscribers are done, we need our own signal, such as a boolean flag, a counter or some form of timing. I think the purpose of the Flow API is to stream an indefinite amount of data to subscribers, so a Publisher is not required to guarantee that all subscribers have completed before it closes.
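One possible way to wait without Thread.sleep() is to let the subscriber signal its own completion. The sketch below assumes a CompletableFuture as that signal (a CountDownLatch would work as well); CompletionSketch and run() are hypothetical names used only here.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class CompletionSketch {

  // Hypothetical helper: publishes three items, closes the publisher and
  // waits until the subscriber's onComplete() has actually run.
  static List<Integer> run() {
    List<Integer> received = new CopyOnWriteArrayList<>();
    CompletableFuture<Void> done = new CompletableFuture<>();

    SubmissionPublisher<Integer> sp = new SubmissionPublisher<>();
    sp.subscribe(new Flow.Subscriber<Integer>() {
      private Flow.Subscription subscription;

      @Override public void onSubscribe(Flow.Subscription s) {
        subscription = s;
        s.request(1);
      }
      @Override public void onNext(Integer item) {
        received.add(item);
        subscription.request(1); // keep asking so buffered items drain
      }
      @Override public void onError(Throwable e) { done.completeExceptionally(e); }
      @Override public void onComplete() { done.complete(null); }
    });

    for (int i = 1; i <= 3; i++) sp.submit(i);

    sp.close(); // returns immediately; delivery may still be in flight
    done.orTimeout(5, TimeUnit.SECONDS).join(); // blocks until onComplete()
    return List.copyOf(received);
  }

  public static void main(String[] args) {
    System.out.println("received before completion: " + run()); // [1, 2, 3]
  }
}
```

With several subscribers, each one would need its own signal, or they could share a counter that reaches zero when the last one completes.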

defaultBufferSize() returns a default value for Publisher or Subscriber buffering, that may be used in the absence of other constraints. The current value returned is 256.

SubmissionPublisher constructor has three forms. In the example above, I used this form:
SubmissionPublisher(Executor executor, int maxBufferCapacity)
This form creates a new SubmissionPublisher using the given Executor for async delivery to subscribers, with the given maximum buffer size for each subscriber. More information about SubmissionPublisher's constructors can be found in the documentation.
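For comparison, the no-argument form uses ForkJoinPool.commonPool() for delivery and Flow.defaultBufferSize() as the maximum buffer capacity. A small sketch to check the capacity follows; BufferSizeSketch and defaultCapacity() are hypothetical names.

```java
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

class BufferSizeSketch {

  // Hypothetical helper: reports the per-subscriber buffer capacity
  // of a publisher created with the no-argument constructor.
  static int defaultCapacity() {
    try (SubmissionPublisher<Integer> sp = new SubmissionPublisher<>()) {
      return sp.getMaxBufferCapacity();
    }
  }

  public static void main(String[] args) {
    System.out.println("Flow.defaultBufferSize(): " + Flow.defaultBufferSize());
    System.out.println("no-arg publisher capacity: " + defaultCapacity());
  }
}
```

When a capacity is passed explicitly, the documentation notes that the enforced value may be rounded up to a power of two, so getMaxBufferCapacity() is the way to read the actual value.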

Note that if any Subscriber method throws an exception, its subscription is cancelled. To explicitly cancel a subscription, use the cancel() method in the Flow.Subscription interface.
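Here is a small sketch of explicit cancellation. The documentation describes cancel() as best-effort, so this sketch requests one item at a time and simply stops requesting when it cancels, which keeps the outcome predictable. CancelSketch, run() and the "stop at 3" rule are assumptions made up for the illustration.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class CancelSketch {

  // Hypothetical helper: the subscriber cancels its subscription once it
  // has seen the value 3, although five values are submitted.
  static List<Integer> run() {
    List<Integer> received = new CopyOnWriteArrayList<>();
    CompletableFuture<Void> cancelled = new CompletableFuture<>();

    try (SubmissionPublisher<Integer> sp = new SubmissionPublisher<>()) {
      sp.subscribe(new Flow.Subscriber<Integer>() {
        private Flow.Subscription subscription;

        @Override public void onSubscribe(Flow.Subscription s) {
          subscription = s;
          s.request(1);
        }
        @Override public void onNext(Integer item) {
          received.add(item);
          if (item >= 3) {
            subscription.cancel(); // stop here; no further demand is issued
            cancelled.complete(null);
          } else {
            subscription.request(1);
          }
        }
        @Override public void onError(Throwable e) { cancelled.completeExceptionally(e); }
        @Override public void onComplete() { }
      });

      for (int i = 1; i <= 5; i++) sp.submit(i);

      cancelled.orTimeout(5, TimeUnit.SECONDS).join();
      return List.copyOf(received);
    }
  }

  public static void main(String[] args) {
    System.out.println("received before cancel: " + run()); // [1, 2, 3]
  }
}
```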

Flow.Processor Interface

Flow.Processor is a Flow API component that acts as both a Subscriber and a Publisher. This component is mainly used for transforming the messages that we send to subscribers.

This example demonstrates Flow.Processor.
import java.util.concurrent.Flow.*;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.Random;
import java.util.function.Function;

public class SampleClass{

  public static void main(String[] args) 
                throws InterruptedException{
    Random rand = new Random();
    ExecutorService es = 
    Executors.newFixedThreadPool(2);
    
    SubmissionPublisher<Number> sp =
    new SubmissionPublisher<>
    (es, java.util.concurrent.Flow
         .defaultBufferSize());
    
    Transformer t1 = 
    new Transformer("t1", String::valueOf);
    sp.subscribe(t1);
    t1.subscribe(new Receiver("r1"));
    t1.subscribe(new Receiver("r2"));
    
    //submit three values
    sp.submit(rand.nextInt(50));
    sp.submit(rand.nextDouble());
    sp.submit(rand.nextInt(10));
    
    // Pause the main thread briefly so pending
    // deliveries can finish before the publishers
    // and executor are closed. A fixed sleep is
    // a heuristic, not a guarantee.
    Thread.sleep(200);
    
    sp.close();
    t1.close();
    es.shutdown();
  }
}

class Transformer extends SubmissionPublisher<String>
implements Processor<Number, String>{

  private Subscription subscription;
  private String processorName;
  private Function<Number, String> transform;
  
  Transformer(String processorName, 
  Function<Number, String> transform){
    this.processorName = processorName;
    this.transform = transform;
  }
  
  @Override
  public void onSubscribe(Subscription subscription){
    System.out.println(processorName + " has subscribed!");
    this.subscription = subscription;
    this.subscription.request(3);
  }
  
  @Override
  public void onNext(Number item){
    submit(transform.apply(item));
  }
  
  @Override
  public void onError(Throwable e){
    e.printStackTrace();
  }
  
  @Override
  public void onComplete(){
    System.out.println(processorName + 
    " is done!");
  }
}

class Receiver implements Subscriber<String>{
  
  private Subscription subscription;
  private String receiverName;
  
  Receiver(String receiverName){
    this.receiverName = receiverName;
  }
  
  @Override
  public void onSubscribe(Subscription subscription){
    System.out.println(receiverName + " has subscribed!");
    this.subscription = subscription;
    this.subscription.request(3);
  }
  
  @Override
  public void onNext(String item){
    System.out.println(receiverName + " got: " + 
    item);
  }
  
  @Override
  public void onError(Throwable e){
    e.printStackTrace();
  }
  
  @Override
  public void onComplete(){
    System.out.println(receiverName + 
    " is done!");
  }
  
}

Result (may vary)
t1 has subscribed!
r1 has subscribed!
r2 has subscribed!
r1 got: 15
r1 got: 0.5124233779675397
r1 got: 4
r2 got: 15
r2 got: 0.5124233779675397
r2 got: 4
r1 is done!
t1 is done!
r2 is done!
First, let's examine the Transformer class, which extends SubmissionPublisher and implements Processor. A class that implements Processor needs to implement five methods: subscribe from Publisher, and onSubscribe, onNext, onError and onComplete from Subscriber.

I extended SubmissionPublisher so that I didn't need to implement subscribe myself, because SubmissionPublisher already implements it. Moreover, we can use other SubmissionPublisher methods in the Transformer class if we need them. In the example above, I used the submit() method inside onNext().

Now, let's discuss the flow in the example above. Transformer is both a Publisher and a Subscriber, so it can subscribe to a Publisher. I created a SubmissionPublisher sp with Number as its type argument, chosen to match the type parameters of Processor<T, R>.

In the example above, the type arguments of Processor are <Number, String>. Type parameter T is the subscribed item type, so Number is the subscribed item type of the Transformer class. Type parameter R is the published item type, so String is the published item type.

sp has Number as its type argument because its subscriber is the Transformer, whose subscribed item type is Number. Therefore, the Publisher sp needs to publish items of type Number.

Next, Receiver is a Subscriber of Transformer. Receiver has String as its type argument because String is the published item type of Transformer. As you can see in the example above, the Processor transforms a Number into a String by using a Function. The submit() call in Transformer's onNext() method sends the transformed data to the subscribers of Transformer, which are the Receivers.
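As a variation on the Transformer above, here is a sketch of a processor that requests one item at a time and closes itself when its upstream completes, so the main thread does not have to close it or sleep. This is only one possible design under those assumptions; ProcessorSketch, MapProcessor and run() are hypothetical names.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

class ProcessorSketch {

  // Hypothetical generic processor: applies fn to each item and republishes it.
  static class MapProcessor<T, R> extends SubmissionPublisher<R>
      implements Flow.Processor<T, R> {
    private final Function<? super T, ? extends R> fn;
    private Flow.Subscription upstream;

    MapProcessor(Function<? super T, ? extends R> fn) { this.fn = fn; }

    @Override public void onSubscribe(Flow.Subscription s) { upstream = s; s.request(1); }
    @Override public void onNext(T item) { submit(fn.apply(item)); upstream.request(1); }
    @Override public void onError(Throwable e) { closeExceptionally(e); } // pass errors on
    @Override public void onComplete() { close(); } // pass completion on
  }

  static List<String> run() {
    List<String> received = new CopyOnWriteArrayList<>();
    CompletableFuture<Void> done = new CompletableFuture<>();

    SubmissionPublisher<Number> source = new SubmissionPublisher<>();
    MapProcessor<Number, String> toText =
        new MapProcessor<Number, String>(n -> "value=" + n);

    // Wire the chain from the end: the receiver first, then the processor.
    toText.subscribe(new Flow.Subscriber<String>() {
      private Flow.Subscription subscription;
      @Override public void onSubscribe(Flow.Subscription s) { subscription = s; s.request(1); }
      @Override public void onNext(String item) { received.add(item); subscription.request(1); }
      @Override public void onError(Throwable e) { done.completeExceptionally(e); }
      @Override public void onComplete() { done.complete(null); }
    });
    source.subscribe(toText);

    source.submit(1);
    source.submit(2.5);
    source.submit(3);
    source.close(); // completion travels source -> toText -> receiver

    done.orTimeout(5, TimeUnit.SECONDS).join();
    return List.copyOf(received);
  }

  public static void main(String[] args) {
    System.out.println(run()); // [value=1, value=2.5, value=3]
  }
}
```

Closing the processor from its own onComplete() means only the first publisher in the chain has to be closed by hand.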
