Saturday, January 8, 2022

Java Tutorial: CompletableFuture Class

Note: It's recommended to be familiar with the java.util.concurrent package before reading this article.

CompletableFuture is closely related to Future. The difference is that a CompletableFuture can be completed explicitly by setting its value and status, hence the name. CompletableFuture implements both CompletionStage and Future, so it can be used as either one.

CompletionStage is a stage of a possibly asynchronous computation, that performs an action or computes a value when another CompletionStage completes. A stage completes upon termination of its computation, but this may in turn trigger other dependent stages. My explanation here is simplified. Take a look at the documentation for more information.

I will only demonstrate some useful methods of CompletableFuture in this tutorial. CompletableFuture has a lot of methods and you should read the CompletableFuture documentation to know those methods.

This example demonstrates a CompletableFuture that is created and completed by a single thread.
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class SampleClass{

  public static void main(String[] args)
                     throws InterruptedException,
                            ExecutionException{
    CompletableFuture<String> cf =
    new CompletableFuture<String>();
    
    cf.complete("Hello World!");
    System.out.println(cf.get());
  }
}

Result: Hello World!
The complete() method explicitly completes the CompletableFuture with the specified value. get() waits, if necessary, until the CompletableFuture is complete and then returns its value. Most of the time, CompletableFuture is used for asynchronous tasks.
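
Since complete() can be called from any thread, one thread can wait for the value while another thread supplies it. The sketch below illustrates that idea. The class name and the completeLater() helper are made up for this illustration and are not part of the example above.

```java
import java.util.concurrent.CompletableFuture;

class ManualCompletionSketch{

  //hypothetical helper: starts a thread that completes
  //the future with the given value
  static CompletableFuture<String> completeLater(String value){
    CompletableFuture<String> cf = new CompletableFuture<>();
    new Thread(() -> cf.complete(value)).start();
    return cf;
  }

  public static void main(String[] args){
    CompletableFuture<String> cf = completeLater("Hello World!");

    //join() waits like get() but throws unchecked exceptions
    System.out.println(cf.join());

    //complete() returns false if the future is already complete,
    //and the stored value stays the same
    System.out.println(cf.complete("Ignored"));
    System.out.println(cf.join());
  }
}
```

This should print Hello World!, then false, then Hello World! again, because only the first call to complete() has any effect.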

runAsync() Method

Returns a new CompletableFuture that is asynchronously completed by a task running in the ForkJoinPool.commonPool() after it runs the given action.

This method has two forms. In this example I'm gonna demonstrate this form:
public static CompletableFuture<Void> runAsync(Runnable runnable)
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class SampleClass{

  public static void main(String[] args)
                     throws InterruptedException,
                            ExecutionException{
    CompletableFuture<Void> cf =
    CompletableFuture.runAsync(() -> 
    System.out.println(Thread.currentThread().getName()));
    
    cf.get();
    System.out.println("CompletableFuture is complete.");
    System.out.println("Main Thread is unblocked.");
  }
}

Result
ForkJoinPool.commonPool-worker-1
CompletableFuture is complete.
Main Thread is unblocked.
runAsync() returns a CompletableFuture<Void> that completes with a null value once the given action has run.
The Void class is an uninstantiable placeholder class that holds a reference to the Class object representing the Java keyword void. If we want a variable that only accepts null, we can use this class like this: Void v = null;
We can also use the Void class as a method return type:
static Void meth(){
  return null;
}
Next, this example demonstrates this form of runAsync():
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SampleClass{

  public static void main(String[] args)
                     throws InterruptedException,
                            ExecutionException{
    ExecutorService es = Executors.newSingleThreadExecutor();
    
    CompletableFuture<Void> cf =
    CompletableFuture.runAsync(() -> 
    System.out.println(Thread.currentThread().getName()),es);
    
    cf.get();
    es.shutdown();
    System.out.println("CompletableFuture is complete.");
    System.out.println("Main Thread is unblocked.");
  }
}

Result
pool-1-thread-1
CompletableFuture is complete.
Main Thread is unblocked.
We can use the runAsync() form above to pass a custom executor to runAsync() method instead of using ForkJoinPool.commonPool().

supplyAsync() Method

Returns a new CompletableFuture that is asynchronously completed by a task running in the ForkJoinPool.commonPool() with the value obtained by calling the given Supplier.

supplyAsync() is similar to runAsync(). However, the CompletableFuture returned by supplyAsync() completes with the value returned by the Supplier, which can be null or non-null, whereas the one returned by runAsync() always completes with null.

This method has two forms. In this example I'm gonna demonstrate this form:
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class SampleClass{

  public static void main(String[] args)
                     throws InterruptedException,
                            ExecutionException{
    CompletableFuture<String> cf =
    CompletableFuture.supplyAsync(() -> 
    Thread.currentThread().getName());
    
    System.out.println("Return value: "+cf.get());
    System.out.println("CompletableFuture is complete.");
    System.out.println("Main Thread is unblocked.");
  }
}

Result
Return value: ForkJoinPool.commonPool-worker-1
CompletableFuture is complete.
Main Thread is unblocked.
If you don't wanna use ForkJoinPool.commonPool() as your executor, you can use the second form of this method:
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
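
Here is a minimal sketch of that second form, assuming a single-thread executor like the one used in the runAsync() example. The class name and the computeOnCustomExecutor() helper are made up for this illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class SupplyAsyncExecutorSketch{

  //hypothetical helper: runs the supplier on a
  //single-thread executor and returns its result
  static int computeOnCustomExecutor(){
    ExecutorService es = Executors.newSingleThreadExecutor();
    try{
      CompletableFuture<Integer> cf =
      CompletableFuture.supplyAsync(() -> 6 * 7, es);
      return cf.join();
    }
    finally{
      //without shutdown() the executor's thread
      //keeps the JVM alive
      es.shutdown();
    }
  }

  public static void main(String[] args){
    System.out.println("Return value: " +
    computeOnCustomExecutor());
  }
}
```

This should print Return value: 42.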

allOf() Method

Method form: public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
Returns a new CompletableFuture that is completed when all of the given CompletableFutures complete. If any of the given CompletableFutures complete exceptionally, then the returned CompletableFuture also does so, with a CompletionException holding this exception as its cause.

Otherwise, the results, if any, of the given CompletableFutures are not reflected in the returned CompletableFuture, but may be obtained by inspecting them individually. If no CompletableFutures are provided, returns a CompletableFuture completed with the value null.

This example demonstrates allOf().
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class SampleClass{

  public static void main(String[] args)
                     throws InterruptedException,
                            ExecutionException{
    CompletableFuture<Void> cf1 =
    CompletableFuture.runAsync(() -> 
    System.out.println(Thread.currentThread().getName()));
    
    CompletableFuture<Void> cf2 =
    CompletableFuture.runAsync(() -> 
    System.out.println(Thread.currentThread().getName()));
    
    //allOf() doesn't block by itself; get() waits until
    //both cf1 and cf2 are complete
    CompletableFuture.allOf(cf1, cf2).get();
    
    System.out.println("CompletableFutures are complete.");
    System.out.println("Main Thread is unblocked.");
  }
}

Result(may vary)
ForkJoinPool.commonPool-worker-1
ForkJoinPool.commonPool-worker-2
CompletableFutures are complete.
Main Thread is unblocked.
Next, this example demonstrates allOf() completed exceptionally.
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class SampleClass{

  public static void main(String[] args)
                     throws InterruptedException{
    CompletableFuture<Void> cf1 =
    CompletableFuture.runAsync(() -> 
    System.out.println(Thread.currentThread().getName()));
    
    CompletableFuture<Void> cf2 =
    CompletableFuture.runAsync(() -> {
    throw new NullPointerException("Null");
    });
    
    CompletableFuture<Void> cf3 = 
    CompletableFuture.allOf(cf1, cf2);
    
    //allOf() returns immediately; it doesn't block the
    //calling thread. cf3 may still be incomplete here, in
    //which case cf3.isCompletedExceptionally() returns false.
    //To prevent that, wait for cf3 before checking its status.
    //handle() swallows the exception so join() won't throw
    cf3.handle((r, t) -> null).join();
    
    System.out.println("is completed exceptionally?: " +
    cf3.isCompletedExceptionally());
    
    System.out.println("CompletableFutures are complete.");
    System.out.println("Main Thread is unblocked.");
  }
}

Result(may vary)
ForkJoinPool.commonPool-worker-1
is completed exceptionally?: true
CompletableFutures are complete.
Main Thread is unblocked.
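
The allOf() description above says that the results of the given CompletableFutures are not reflected in the returned one and have to be read individually. One common way to do that is sketched below. The class name and the collect() helper are made up for this illustration.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

class AllOfResultsSketch{

  //hypothetical helper: waits for every future, then
  //reads each result individually
  static List<String> collect(
         List<CompletableFuture<String>> cfs){
    CompletableFuture<Void> all =
    CompletableFuture.allOf(
      cfs.toArray(new CompletableFuture[0]));

    //thenApply() runs only after all futures are complete,
    //so join() on each one returns right away
    return all.thenApply(v ->
      cfs.stream()
         .map(CompletableFuture::join)
         .collect(Collectors.toList()))
      .join();
  }

  public static void main(String[] args){
    List<CompletableFuture<String>> cfs = List.of(
      CompletableFuture.supplyAsync(() -> "a"),
      CompletableFuture.supplyAsync(() -> "b"));

    System.out.println(collect(cfs));
  }
}
```

This should print [a, b]. The order follows the list, not the order in which the futures finish.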

anyOf() Method

Method form: public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
Returns a new CompletableFuture that is completed when any of the given CompletableFutures complete, with the same result. Otherwise, if it completed exceptionally, the returned CompletableFuture also does so, with a CompletionException holding this exception as its cause. If no CompletableFutures are provided, returns an incomplete CompletableFuture.

This example demonstrates anyOf().
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class SampleClass{

  static volatile String str = "Thread name: ";
  public static void main(String[] args)
                     throws InterruptedException,
                            ExecutionException{
    CompletableFuture<String> cf1 =
    CompletableFuture.supplyAsync(() -> 
    str + Thread.currentThread().getName());
    
    CompletableFuture<String> cf2 =
    CompletableFuture.supplyAsync(() -> 
    str + Thread.currentThread().getName());
    
    CompletableFuture<Object> cf3 =
    CompletableFuture.anyOf(cf1, cf2);
    
    System.out.println("Return value\n"+cf3.get());
    System.out.println("cf3 is complete.");
    System.out.println("Main Thread is unblocked.");
  }
}

Result(may vary)
Return value
Thread name: ForkJoinPool.commonPool-worker-1
cf3 is complete.
Main Thread is unblocked.
Next, this example demonstrates anyOf() that may be completed exceptionally.
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class SampleClass{

  static volatile String str = "Thread name: ";
  public static void main(String[] args)
                     throws InterruptedException,
                            ExecutionException{
    CompletableFuture<Void> cf1 =
    CompletableFuture.supplyAsync(() -> {
    throw new NullPointerException("null");
    });
    
    CompletableFuture<String> cf2 =
    CompletableFuture.supplyAsync(() -> 
    str + Thread.currentThread().getName());
    
    CompletableFuture<Object> cf3 =
    CompletableFuture.anyOf(cf1, cf2);
    
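    //wait for cf3 to finish before checking its status;
    //handle() swallows the exception so join() won't throw
    cf3.handle((r, t) -> null).join();

    if(cf3.isCompletedExceptionally())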
      System.out.println("Exceptionally Completed.");
    else
      System.out.println("Return value\n"+cf3.get());
    
    System.out.println("cf3 is complete.");
    System.out.println("Main Thread is unblocked.");
  }
}

Result(may vary)
Exceptionally Completed.
cf3 is complete.
Main Thread is unblocked.
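
The anyOf() description above mentions that the original exception is held as the cause of a CompletionException. Here is a small sketch of how that cause could be read. The class name, the causeMessage() helper and the "boom" message are made up for this illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class AnyOfCauseSketch{

  //hypothetical helper: returns the message of the
  //exception that made anyOf() fail
  static String causeMessage(){
    CompletableFuture<String> failed =
    new CompletableFuture<>();
    failed.completeExceptionally(
      new IllegalStateException("boom"));

    CompletableFuture<Object> any =
    CompletableFuture.anyOf(failed);

    try{
      any.join();
      return "no exception";
    }
    catch(CompletionException e){
      //the original exception is the cause
      return e.getCause().getMessage();
    }
  }

  public static void main(String[] args){
    System.out.println(causeMessage());
  }
}
```

This should print boom. With get() instead of join(), the wrapper would be an ExecutionException, and getCause() would work the same way.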

Chaining CompletionStage Methods

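CompletableFuture implements CompletionStage, so CompletionStage methods can be called on it. Most of these methods return a new CompletionStage, which lets us chain them. Each link in the chain is called a stage.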

This example demonstrates Chaining CompletionStage Methods.
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;

public class SampleClass{

  public static void main(String[] args)
                     throws InterruptedException,
                            ExecutionException{
    CompletionStage<String> cs =
    CompletableFuture.completedStage
    (Thread.currentThread().getName()).
    thenApply((x) -> x.concat(" | cf1")).
    thenCombine(CompletableFuture.completedStage("Test"),
    (t,u) -> t.concat(" | " + u));
    
    CompletableFuture<String> cf =
    cs.toCompletableFuture();
    
    System.out.println("Return value: " + cf.get());
  }
}

Result
Return value: main | cf1 | Test
First off, completedStage() returns a new CompletionStage that is already completed with the given value and supports only those methods in interface CompletionStage.

thenApply() applies the function in its argument to the result of the CompletionStage that calls this method. thenCombine() combines the result of the CompletionStage in its first argument with the result of the CompletionStage that calls this method, using the function in its second argument. toCompletableFuture() converts a CompletionStage to a CompletableFuture. Once the CompletionStage is converted to a CompletableFuture, we can use the get() method to get the result.
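
The same two methods also work on stages that are still running. This is a minimal sketch with supplyAsync() as the starting point. The class name and the doubleThenAdd() helper are made up for this illustration.

```java
import java.util.concurrent.CompletableFuture;

class ThenCombineSketch{

  //hypothetical helper: doubles a, then adds b
  static int doubleThenAdd(int a, int b){
    CompletableFuture<Integer> first =
    CompletableFuture.supplyAsync(() -> a)
    .thenApply(x -> x * 2);

    CompletableFuture<Integer> second =
    CompletableFuture.supplyAsync(() -> b);

    //the BiFunction runs once both stages are complete
    return first
    .thenCombine(second, (t, u) -> t + u)
    .join();
  }

  public static void main(String[] args){
    System.out.println("Return value: " +
    doubleThenAdd(5, 3));
  }
}
```

This should print Return value: 13.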

If we want to handle exception during method chaining, we can use exceptionally(), handle() and whenComplete() methods.

This example demonstrates handle(), exceptionally() and whenComplete() methods.
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

public class SampleClass{

  public static void main(String[] args){
    CompletionStage<String> cs =
    CompletableFuture.completedStage
    (Thread.currentThread().getName())
    .handle((t,u) -> t.concat(null))
    .exceptionally((t) -> {
      throw new RuntimeException
      ("failed concatenation",t);
    })
    .whenComplete((t,u) -> {
    
      try{
        if(u == null && t != null){
          System.out.println("Concatenated value");
          System.out.println(t);
        }
        
        if(u != null)
          throw new RuntimeException(u);
      }
      catch(Exception e){
        System.out.println(e.getCause());
      }
      
    });
    
  }
}

Result
java.util.concurrent.CompletionException: java.lang.RuntimeException: failed concatenation
In the example above, t.concat(null) throws a NullPointerException, so the stage returned by handle() completes exceptionally. exceptionally() then throws a RuntimeException, which reaches whenComplete() wrapped in a CompletionException (a subclass of RuntimeException). Inside whenComplete(), that exception is wrapped in another RuntimeException and thrown so that the catch block can print it with getCause().

First off, handle() executes the BiFunction in its argument when the CompletionStage that calls this method completes, whether normally or exceptionally. The result and the exception of that CompletionStage are the arguments of the supplied function. The value returned by the function becomes the result of the new stage.

exceptionally() executes the Function in its argument only if the CompletionStage that calls this method completes exceptionally. Otherwise, its Function won't be executed. The exception of that CompletionStage is the only argument of the supplied function.

whenComplete() executes the BiConsumer in its argument when the CompletionStage that calls this method completes, whether normally or exceptionally. The result and the exception of that CompletionStage are the arguments of the supplied function. Unlike handle(), whenComplete() doesn't change the result.
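
In practice, exceptionally() and handle() are often used to recover from a failure rather than to throw again. Here is a minimal sketch of that usage. The class name and the parseOrDefault() and describe() helpers are made up for this illustration.

```java
import java.util.concurrent.CompletableFuture;

class RecoverySketch{

  //hypothetical helper: parses text, falling back to -1
  static int parseOrDefault(String text){
    return CompletableFuture
    .supplyAsync(() -> Integer.parseInt(text))
    //runs only if parsing threw an exception and
    //replaces the failure with a normal result
    .exceptionally(ex -> -1)
    .join();
  }

  //hypothetical helper: handle() sees both outcomes
  static String describe(String text){
    return CompletableFuture
    .supplyAsync(() -> Integer.parseInt(text))
    .handle((value, ex) ->
      ex == null ? "parsed " + value : "failed")
    .join();
  }

  public static void main(String[] args){
    System.out.println(parseOrDefault("42"));
    System.out.println(parseOrDefault("abc"));
    System.out.println(describe("42"));
    System.out.println(describe("abc"));
  }
}
```

This should print 42, -1, parsed 42 and failed, one per line.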

We just handled one exception in the example above. Multiple exceptions may occur if we construct a more complex method chain. You need to read the CompletionStage documentation if you're planning to handle multiple exceptions while chaining CompletionStage methods.

If we want the chained methods to run asynchronously on other threads, we need to use the "asynchronous" versions of those methods. The asynchronous versions have the suffix "Async" in their names.
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicInteger;

public class SampleClass{
  
  private static AtomicInteger num =
  new AtomicInteger(3);
  public static void main(String[] args){
    
    System.out.println("Initial value: " + num.get());
    
    CompletionStage<Void> cs1 =
    CompletableFuture.runAsync(() -> 
    {num.incrementAndGet();})
    .minimalCompletionStage()
    .thenRunAsync(() -> 
    {num.addAndGet(-3);});
    
    CompletionStage<Void> cs2 =
    CompletableFuture.runAsync(() -> 
    {num.addAndGet(2);})
    .minimalCompletionStage()
    .runAfterBothAsync(cs1, () -> 
    {num.addAndGet(5);});
    
    //allOf() doesn't block by itself; join() waits until
    //both stages are complete
    CompletableFuture.allOf(cs1.toCompletableFuture(),
                            cs2.toCompletableFuture())
                     .join();
    
    System.out.println("Current value: " + num.get());
  }
}

Result
Initial value: 3
Current value: 8
minimalCompletionStage() returns a CompletionStage with the same value as this CompletableFuture and cannot be independently completed or otherwise used in ways not defined by the methods of interface CompletionStage.

thenRunAsync(Runnable action) returns a new CompletionStage that, when this stage completes normally, executes the given action using this stage's default asynchronous execution facility.

runAfterBothAsync(CompletionStage<?> other, Runnable action) returns a new CompletionStage that, when this and the other given stage both complete normally, executes the given action using this stage's default asynchronous execution facility.
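
The "Async" methods also have overloads that take an Executor, for cases where the default asynchronous execution facility isn't wanted. This is a minimal sketch of one of them, thenApplyAsync(Function, Executor). The class name, the stageThreadName() helper and the thread name "stage-worker" are made up for this illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class AsyncExecutorSketch{

  //hypothetical helper: returns the name of the thread
  //that ran the thenApplyAsync() function
  static String stageThreadName(){
    //the thread name is made up for this sketch
    ExecutorService es = Executors.newSingleThreadExecutor(
      r -> new Thread(r, "stage-worker"));
    try{
      return CompletableFuture.completedFuture("ignored")
      .thenApplyAsync(
        x -> Thread.currentThread().getName(), es)
      .join();
    }
    finally{
      es.shutdown();
    }
  }

  public static void main(String[] args){
    System.out.println(stageThreadName());
  }
}
```

This should print stage-worker, since the function is submitted to the given executor instead of running on the main thread.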

defaultExecutor() Method

Returns the default Executor used for async methods that do not specify an Executor. This class uses the ForkJoinPool.commonPool() if it supports more than one parallel thread, or else an Executor using one thread per async task. This method may be overridden in subclasses to return an Executor that provides at least one independent thread.

This example demonstrates defaultExecutor().
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ForkJoinPool;

public class SampleClass{

  public static void main(String[] args){
    CompletableFuture<Void> cf = 
    CompletableFuture.completedFuture(null);
    
    if(cf.defaultExecutor() instanceof ForkJoinPool)
      System.out.println
      ("Default executor is an instance of ForkJoinPool.");
    else
      System.out.println
      ("Default executor is not an instance of ForkJoinPool.");
  }
}
Result(if your system supports more than one parallel thread)
Default executor is an instance of ForkJoinPool.

delayedExecutor() Method

Returns a new Executor that submits a task to the default executor after the given delay (or no delay if non-positive). Each delay commences upon invocation of the returned executor's execute method.

This method has two forms. In this example I'm gonna demonstrate this form:
public static Executor delayedExecutor(long delay, TimeUnit unit)
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;

public class SampleClass{
  
  private static volatile boolean isdone = false;
  public static void main(String[] args){
  
    System.out.println("Initial value: " + isdone);
    CompletableFuture.delayedExecutor(2, TimeUnit.SECONDS)
    .execute(() -> {isdone = true;});
    
    while(!isdone);
    
    System.out.println("Current value: " + isdone);
  }
}
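
The delayed executor can also be passed to supplyAsync() or runAsync(), which avoids the busy-wait loop because join() blocks until the delayed task has run. This is a minimal sketch of that approach. The class name and the elapsedMillis() helper are made up for this illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;

class DelayedSupplySketch{

  //hypothetical helper: returns the elapsed milliseconds
  //until a task submitted through a delayed executor
  //has finished
  static long elapsedMillis(long delayMillis){
    Executor delayed = CompletableFuture.delayedExecutor(
      delayMillis, TimeUnit.MILLISECONDS);

    long start = System.nanoTime();

    //join() blocks until the delayed task has run,
    //so no busy-wait loop is needed
    CompletableFuture.supplyAsync(() -> "done", delayed)
                     .join();

    return TimeUnit.NANOSECONDS.toMillis(
      System.nanoTime() - start);
  }

  public static void main(String[] args){
    System.out.println("Elapsed ms: " + elapsedMillis(500));
  }
}
```

The printed number varies from run to run, but it shouldn't be noticeably lower than the requested delay.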
