Monday, January 10, 2022

Java Tutorial: Executors Class

Chapters

Executors Class

Note: It's recommended to be knowledgeable about java.util.concurrent Package before reading this article.

Executors class contains Factory and utility methods for Executor, ExecutorService, ScheduledExecutorService, ThreadFactory, and Callable classes defined in this package. This class supports the following kinds of methods:
  • Methods that create and return an ExecutorService set up with commonly useful configuration settings.
  • Methods that create and return a ScheduledExecutorService set up with commonly useful configuration settings.
  • Methods that create and return a "wrapped" ExecutorService, that disables reconfiguration by making implementation-specific methods inaccessible.
  • Methods that create and return a ThreadFactory that sets newly created threads to a known state.
  • Methods that create and return a Callable out of other closure-like forms, so they can be used in execution methods requiring Callable.
I'm gonna demonstrate some methods in this class. My explanation here is simplified, more information can be found in the documentation.

newSingleThreadExecutor() Method

Creates an Executor that uses a single worker thread operating off an unbounded queue. Tasks are guaranteed to execute sequentially, and no more than one task will be active at any given time. Unlike the otherwise equivalent newFixedThreadPool(1) the returned executor is guaranteed not to be reconfigurable to use additional threads.

Note however that if this single thread terminates due to a failure during execution prior to shutdown, a new one will take its place if needed to execute subsequent tasks.

This example demonstrates newSingleThreadExecutor().
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;

public class SampleClass{

  public static void main(String[] args){
    ExecutorService es =
    Executors.newSingleThreadExecutor();
    
    es.execute(() -> System.out.println("Task #1"));
    es.execute(() -> System.out.println("Task #2"));
    es.execute(() -> System.out.println("Task #3"));
    es.execute(() -> System.out.println("Task #4"));
    es.shutdown();
  }
}

Result
Task #1
Task #2
Task #3
Task #4

callable() Method

callable() returns a Callable object. This method has four forms. However, I'll only demonstrate two of them. I'll demonstrate these two:
callable(Runnable task)
callable(Runnable task, T result)
I won't demonstrate these two:
callable(PrivilegedAction<?> action)
callable(PrivilegedExceptionAction<?> action)
PrivilegedAction and PrivilegedExceptionAction are connected to AccessController which is deprecated in java 17 and will be removed in future version of java.

This example demonstrates callable(Runnable task).
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ExecutionException;

public class SampleClass{

  public static void main(String[] args)
                     throws InterruptedException,
                            ExecutionException{
    ExecutorService es =
    Executors.newSingleThreadExecutor();
    
    Future<?> future = 
    es.submit(Executors.callable(
    () -> System.out.println("callable task!")));
    es.shutdown();
    
    //used to block main thread
    future.get();
    
    System.out.println("Exiting main thread.");
  }
}
Typically, we use get() to get result. Although, in some situation, we may wanna only use get() to block a thread especially for executors that don't accept Runnable. callable() method executes its Runnable and returns null.

Next, this example demonstrates callable(Runnable task, T result)
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.ExecutionException;

public class SampleClass{

  public static void main(String[] args)
                     throws InterruptedException,
                            ExecutionException{
     CompletableFuture<String> cf =
     CompletableFuture.supplyAsync(() -> "Test");
     
     ExecutorService es =
     Executors.newSingleThreadExecutor();
     
     Future<String> future = 
     es.submit(
     Executors.callable(() -> 
     System.out.println("Executing Future..."),
     cf.get()));
     es.shutdown();
     
     System.out.println("Result: " + future.get());
  }
}

Executing Future...
Result: Test
If you're not knowledgeable about CompletableFuture, you may wanna read my blogpost about CompletableFuture.

defaultThreadFactory() Method

Returns a default thread factory used to create new threads. This factory creates all new threads used by an Executor in the same ThreadGroup. If there is a SecurityManager, it uses the group of System.getSecurityManager(), else the group of the thread invoking this defaultThreadFactory method.

Each new thread is created as a non-daemon thread with priority set to the smaller of Thread.NORM_PRIORITY and the maximum priority permitted in the thread group. New threads have names accessible via Thread.getName() of pool-N-thread-M, where N is the sequence number of this factory, and M is the sequence number of the thread created by this factory.
import java.util.concurrent.Executors;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory;

public class SampleClass{

  public static void main(String[] args){
    
    CustomExecutor ce1 = new CustomExecutor();
    
    CustomExecutor ce2 = 
    new CustomExecutor(
    Executors.defaultThreadFactory());
    
    ce1.execute(() ->
    System.out.println(
    "ce1: "+Thread.currentThread().getName()));
    
    ce2.execute(() ->
    System.out.println(
    "ce2: "+Thread.currentThread().getName()));
  }
}

class CustomExecutor implements Executor{
  private ThreadFactory tFactory;
  
  CustomExecutor(){}
  
  CustomExecutor(ThreadFactory tFactory){
    this.tFactory = tFactory;
  }
  
  @Override
  public void execute(Runnable command){
    if(tFactory == null)
      new Thread(command).start();
    else
      tFactory.newThread(command).start();
  }
}

Result(may vary)
ce2: pool-1-Thread-1
ce1: Thread-0

newCachedThreadPool() Method

Creates a thread pool that creates new threads as needed, but will reuse previously constructed threads when they are available. These pools will typically improve the performance of programs that execute many short-lived asynchronous tasks. Calls to execute will reuse previously constructed threads if available.

If no existing thread is available, a new thread will be created and added to the pool. Threads that have not been used for sixty seconds are terminated and removed from the cache. Thus, a pool that remains idle for long enough will not consume any resources.

If you want to create your own custom cached thread pool, consider using ThreadPoolExecutor. Although, this method is enough for most situations.

This method has two forms. I'll only demonstrate this form:
public static ExecutorService newCachedThreadPool()

This example demonsrates newCachedThreadPool().
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;

public class SampleClass{

  public static void main(String[] args)
                     throws InterruptedException{
  
    ExecutorService es =
    Executors.newCachedThreadPool();
    
    es.execute(() ->
    System.out.println(
    Thread.currentThread()
    .getName()+": task #1"));
    
    es.execute(() ->
    System.out.println(
    Thread.currentThread()
    .getName()+": task #2"));
    
    Thread.sleep(50);
    
    es.execute(() ->
    System.out.println(
    Thread.currentThread()
    .getName()+": task #3"));
    
    es.execute(() ->
    System.out.println(
    Thread.currentThread()
    .getName()+": task #4"));
    
    es.shutdown();
  }
}

Result(may vary)
pool-1-Thread-1: task #1
pool-1-Thread-2: task #2
pool-1-Thread-2: task #3
pool-1-Thread-1: task #4

newFixedThreadPool() Method

Creates a thread pool that reuses a fixed number of threads operating off a shared unbounded queue. At any point, at most nThreads threads will be active processing tasks. If additional tasks are submitted when all threads are active, they will wait in the queue until a thread is available.

If any thread terminates due to a failure during execution prior to shutdown, a new one will take its place if needed to execute subsequent tasks. The threads in the pool will exist until it is explicitly shutdown.

This method has two forms. I'll only demonstrate this form:
public static ExecutorService newFixedThreadPool(int nThreads)

This example demonstrates newFixedThreadPool(int nThreads).
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;

public class SampleClass{

  public static void main(String[] args)
                     throws InterruptedException{
                     
    ExecutorService es =
    Executors.newFixedThreadPool(4);
    
    es.execute(() ->
    System.out.println(
    Thread.currentThread()
    .getName()+": task #1"));
    
    es.execute(() ->
    System.out.println(
    Thread.currentThread()
    .getName()+": task #2"));
    
    Thread.sleep(200);
    
    es.execute(() ->
    System.out.println(
    Thread.currentThread()
    .getName()+": task #3"));
    
    es.execute(() ->
    System.out.println(
    Thread.currentThread()
    .getName()+": task #4"));
    
    es.shutdown();
  }
}

Result(may vary)
pool-1-Thread-1: task #1
pool-1-Thread-2: task #2
pool-1-Thread-3: task #3
pool-1-Thread-4: task #4

newSingleThreadScheduledExecutor() Method

Creates a single-threaded executor that can schedule commands to run after a given delay, or to execute periodically. Tasks are guaranteed to execute sequentially, and no more than one task will be active at any given time. Unlike the otherwise equivalent newScheduledThreadPool(1) the returned executor is guaranteed not to be reconfigurable to use additional threads.

Note however that if this single thread terminates due to a failure during execution prior to shutdown, a new one will take its place if needed to execute subsequent tasks.

This method has two forms. I'll only demonstrate this form:
public static ScheduledExecutorService newSingleThreadScheduledExecutor()

This example demonstrates newSingleThreadScheduledExecutor().
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class SampleClass{

  public static void main(String[] args){
  
    ScheduledExecutorService ses = 
    Executors.newSingleThreadScheduledExecutor();
    
    Runnable command = () -> 
    System.out.println("Do command...");
    
    ScheduledFuture<?> task = 
    ses.scheduleWithFixedDelay(command, 2,
                      2, TimeUnit.SECONDS);
                      
    Runnable canceller = () ->
    task.cancel(false);
    
    //This method doesn't block the main thread
    ses.schedule(canceller, 12, TimeUnit.SECONDS);
    
    while(true){
      if(task.isCancelled()){
        ses.shutdown();
        break;
      }
    }
    
  }
}

Result
Do command...
Do command...
Do command...
Do command...
Do command...
This example is explained in this article. In that article, newScheduledThreadPool(1) is used. Although, in this example, newScheduledThreadPool(1) and newSingleThreadScheduledExecutor() are interchangeable.

newScheduledThreadPool() Method

Creates a thread pool that can schedule commands to run after a given delay, or to execute periodically. This method has two forms. I'll only demonstrate this form:
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)

This example demonstrates newScheduledThreadPool(int corePoolSize).
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class SampleClass{

  public static void main(String[] args){
  
    ScheduledExecutorService ses = 
    Executors.newScheduledThreadPool(2);
    
    Runnable command = () -> 
    System.out.println(Thread.currentThread().getName());
    
    ScheduledFuture<?> task1 = 
    ses.scheduleWithFixedDelay(command, 2,
                      2, TimeUnit.SECONDS);
                      
    ScheduledFuture<?> task2 = 
    ses.scheduleWithFixedDelay(command, 2,
                      2, TimeUnit.SECONDS);
                      
    Runnable canceller = () -> {
    task1.cancel(false);
    task2.cancel(false);
    };
    
    //This method doesn't block the main thread
    ses.schedule(canceller, 6, TimeUnit.SECONDS);
    
    while(true){
      if(task1.isCancelled() &&
         task2.isCancelled()){
        ses.shutdown();
        break;
      }
    }
    
  }
}

Result(may vary)
pool-1-Thread-2
pool-1-Thread-1
pool-1-Thread-2
pool-1-Thread-1
I explained scheduleWithFixedDelay method in this article.

newWorkStealingPool() Method

Creates a work-stealing thread pool using the number of available processors as its target parallelism level.

This method has two forms. I'll only demonstrate this form:
public static ExecutorService newWorkStealingPool()

This example demonstrates newWorkStealingPool().
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;

public class SampleClass{

  public static void main(String[] args){
  
    ExecutorService es =
    Executors.newWorkStealingPool();
    
    int procNum = Runtime.getRuntime()
                         .availableProcessors();
                         
    System.out.print("This Machine's "+
                       "Processor Count: ");
    System.out.print(procNum + "\n");
    
    es.execute(() -> {
    
      System.out.println
      (Thread.currentThread().getName()+
      " | Task #1");
      for(int i = 0; i < 1000; i++){}
    });
    
    es.execute(() -> {
    
      System.out.println
      (Thread.currentThread().getName()+
      " | Task #2");
      for(int i = 0; i < 10000; i++){}
    });
    
    es.execute(() -> {
    
      System.out.println
      (Thread.currentThread().getName()+
      " | Task #3");
      for(int i = 0; i < 100000; i++){}
    });
    
    es.execute(() -> {
    
      System.out.println
      (Thread.currentThread().getName()+
      " | Task #4");
      for(int i = 0; i < 1000000; i++){}
    });
    
    es.execute(() -> {
    
      System.out.println
      (Thread.currentThread().getName()+
      " | Task #5");
      for(int i = 0; i < 10000000; i++){}
    });
    es.shutdown();
    
    while(!es.isTerminated());
  }
}

Result(may vary)
This Machine's Processor Count: 4
ForkJoinPool-1-worker-1 | Task #1
ForkJoinPool-1-worker-1 | Task #3
ForkJoinPool-1-worker-2 | Task #2
ForkJoinPool-1-worker-4 | Task #5
ForkJoinPool-1-worker-3 | Task #4

unconfigurableExecutorService() Method

Returns an object that delegates all defined ExecutorService methods to the given executor, but not any other methods that might otherwise be accessible using casts. This provides a way to safely "freeze" configuration and disallow tuning of a given concrete implementation.

This method has two forms. I'll only demonstrate this form:
public static ExecutorService unconfigurableExecutorService(ExecutorService executor)

This example demonstrates unconfigurableExecutorService().
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;

public class SampleClass{

  public static void main(String[] args){
  
    ExecutorService es = 
    Executors.unconfigurableExecutorService(
    Executors.newFixedThreadPool(1));
    
    ThreadPoolExecutor tpe = null;
    if(es instanceof ThreadPoolExecutor)
      tpe = (ThreadPoolExecutor)es;
    
    if(tpe != null){
      tpe.setMaximumPoolSize(3);
      tpe.setCorePoolSize(3);
    }
    else System.out.println("tpe is null!");
    
    es.execute(() -> 
    System.out.println(
    Thread.currentThread().getName()));
    
    es.execute(() -> 
    System.out.println(
    Thread.currentThread().getName()));
    
    es.execute(() -> 
    System.out.println(
    Thread.currentThread().getName()));
    
    es.shutdown();
  }
}

Result
pool-1-thread-1
pool-1-thread-1
pool-1-thread-1
In the example above, the executor "es" is still using a single thread even I set the core pool size to 3. Now, this next example demonstrates configurable executor.
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;

public class SampleClass{

  public static void main(String[] args){
  
    ExecutorService es = Executors.newFixedThreadPool(1);
    
    ThreadPoolExecutor tpe = null;
    if(es instanceof ThreadPoolExecutor)
      tpe = (ThreadPoolExecutor)es;
    
    if(tpe != null){
      tpe.setMaximumPoolSize(3);
      tpe.setCorePoolSize(3);
    }
    else System.out.println("tpe is null!");
    
    es.execute(() -> 
    System.out.println(
    Thread.currentThread().getName()));
    
    es.execute(() -> 
    System.out.println(
    Thread.currentThread().getName()));
    
    es.execute(() -> 
    System.out.println(
    Thread.currentThread().getName()));
    
    es.shutdown();
  }
}

Result
pool-1-thread-1
pool-1-thread-2
pool-1-thread-3
Note the executors that are returned by newSingleThreadExecutor() and newSingleThreadScheduledExecutor() are unconfigurable by default.

No comments:

Post a Comment