Monday, January 3, 2022

Java Tutorial: Exploring java.util.concurrent Package

java.util.concurrent Package

This package contains classes and interfaces that are commonly useful in concurrent programming. In computer science, concurrency is the ability of different parts or units of a program, algorithm, or problem to be executed out of order or in partial order without affecting the final outcome. This allows for parallel execution of the concurrent units, which can significantly improve overall execution speed on multi-processor and multi-core systems.

Parallel computing (parallelism) is a type of computation in which many calculations or processes are carried out simultaneously. Large problems can often be divided into smaller ones, which can then be solved at the same time.

Readers should be familiar with multithreading before reading this tutorial. In this tutorial, we're going to discuss some classes and interfaces of this package. The explanations are simplified; more information can be found in the documentation.

Locks

In the multithreading topic, we learned the concept of locks (monitors) via synchronized methods and blocks. However, the concurrent package has different types of locks that we can use to build a synchronized system without relying on synchronized methods and blocks.

ReentrantLock

ReentrantLock is a reentrant mutual exclusion Lock with the same basic behavior and semantics as the implicit monitor lock accessed using synchronized methods and statements, but with extended capabilities.

A ReentrantLock is owned by the thread last successfully locking, but not yet unlocking it. A thread invoking lock will return, successfully acquiring the lock, when the lock is not owned by another thread. The method will return immediately if the current thread already owns the lock. This can be checked using methods isHeldByCurrentThread(), and getHoldCount().

The constructor for this class accepts an optional fairness parameter. When set true, under contention, locks favor granting access to the longest-waiting thread. Otherwise this lock does not guarantee any particular access order. Programs using fair locks accessed by many threads may display lower overall throughput (i.e., are slower; often much slower) than those using the default setting, but have smaller variances in times to obtain locks and guarantee lack of starvation.

Note however, that fairness of locks does not guarantee fairness of thread scheduling. Thus, one of many threads using a fair lock may obtain it multiple times in succession while other active threads are not progressing and not currently holding the lock. Also note that the untimed tryLock() method does not honor the fairness setting. It will succeed if the lock is available even if other threads are waiting.
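
The fairness parameter described above can be tried out with a small sketch. The class name FairnessSketch and the describe() helper are made up for illustration; only the ReentrantLock calls (the boolean constructor, isFair(), tryLock(), getHoldCount()) come from the API.

```java
import java.util.concurrent.locks.ReentrantLock;

class FairnessSketch {

  // Hypothetical helper: reports the fairness policy of a lock.
  static String describe(ReentrantLock lock) {
    return lock.isFair() ? "fair" : "non-fair";
  }

  public static void main(String[] args) {
    ReentrantLock defaultLock = new ReentrantLock();   // non-fair by default
    ReentrantLock fairLock = new ReentrantLock(true);  // fair ordering requested

    System.out.println("default: " + describe(defaultLock)); // default: non-fair
    System.out.println("fair: " + describe(fairLock));       // fair: fair

    // The untimed tryLock() ignores the fairness setting. The lock is
    // free here, so the call succeeds immediately.
    if (fairLock.tryLock()) {
      try {
        System.out.println("hold count: " + fairLock.getHoldCount()); // hold count: 1
      } finally {
        fairLock.unlock();
      }
    }
  }
}
```

A single-threaded run like this only shows how the policy is selected and queried; the ordering effect of fairness becomes visible only when several threads contend for the lock.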

It is recommended practice to always immediately follow a call to lock with a try-finally block. Take a look at this example:
import java.util.concurrent.locks.*;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;

public class SampleClass{
  
  public static void main(String[] args){
  
    ArrayList<Integer> ar =
    new ArrayList<>();
    
    Collections.addAll(ar, 2, 4, 6, 8, 10);
                       
    MyCollection mc = new MyCollection(ar);
    
    Thread t1 = new Thread(
    () -> {
      try{
        while(mc.addInEveryElem(3))
          Thread.sleep(100);
      }
      catch(InterruptedException e){
        e.printStackTrace();
      }
      
    });
    
    Thread t2 = new Thread(
    () -> {
      try{
        while(mc.addInEveryElem(2))
          Thread.sleep(100);
      }
      catch(InterruptedException e){
        e.printStackTrace();
      }
    });
    
    t1.start();
    t2.start();
  }
}

class MyCollection{

  private final Lock lock = new ReentrantLock();
  private ArrayList<Integer> ar;
  private Iterator<Integer> it;
  
  MyCollection(ArrayList<Integer> ar){
    //defensive copy; all access is guarded by the lock
    this.ar = new ArrayList<>(ar);
    it = this.ar.iterator();
  }
  
  boolean addInEveryElem(int num){
  
    lock.lock();
    try{
      
      if(it.hasNext()){
      
        String t = Thread.currentThread()
                   .getName();
        int elem = it.next();
        
        System.out.println("Thread Name: " + t);
        System.out.println("Add: " + elem + " + " + num);
        System.out.println("Sum: " + (elem + num));
        
        return true;
      }
      else
        return false;
      
    }
    finally{
      lock.unlock();
    }
  }
  
}

Result(may vary)
Thread Name: Thread-0
Add: 2 + 3
Sum: 5
Thread Name: Thread-1
Add: 4 + 2
Sum: 6
Thread Name: Thread-0
Add: 6 + 3
Sum: 9
Thread Name: Thread-1
Add: 8 + 2
Sum: 10
Thread Name: Thread-0
Add: 10 + 3
Sum: 13
In the example above, we see that we can use this lock mechanism to guard just a section of a method. With a synchronized block, the whole block is synchronized, whereas a Lock can be acquired and released at the points we choose. ReentrantLock has a constructor (ReentrantLock(boolean fair)) that can enable fairness. Fairness grants access to the longest-waiting thread, which prevents starvation, usually at the cost of lower throughput.

If a lock is acquired and other threads try to acquire the lock via lock() method, those threads will be blocked until the lock is released.

Another way of acquiring the lock using ReentrantLock is the tryLock() method. tryLock() returns true if the lock is successfully acquired; otherwise, it returns false. Unlike lock(), this method doesn't block the calling thread.
import java.util.concurrent.locks.*;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;

public class SampleClass{
  
  public static void main(String[] args){
  
    ArrayList<Integer> ar =
    new ArrayList<>();
    
    Collections.addAll(ar, 2, 4, 6, 8, 10);
                       
    MyCollection mc = new MyCollection(ar);
    
    Thread t1 = new Thread(
    () -> {
      try{
        while(mc.addInEveryElem(3))
          Thread.sleep(100);
      }
      catch(InterruptedException e){
        e.printStackTrace();
      }
      
    });
    
    Thread t2 = new Thread(
    () -> {
      try{
        while(mc.addInEveryElem(2))
          Thread.sleep(100);
      }
      catch(InterruptedException e){
        e.printStackTrace();
      }
    });
    
    t1.start();
    t2.start();
  }
}

class MyCollection{

  private final Lock lock = new ReentrantLock();
  private ArrayList<Integer> ar;
  private Iterator<Integer> it;
  
  MyCollection(ArrayList<Integer> ar){
    //defensive copy; all access is guarded by the lock
    this.ar = new ArrayList<>(ar);
    it = this.ar.iterator();
  }
  
  boolean addInEveryElem(int num){
  
    if(!lock.tryLock()){
      System.out.println(
      Thread.currentThread().getName() +
      " can't acquire the lock.");
      
      return true;
    }
    
    try{
      
      if(it.hasNext()){
      
        String t = Thread.currentThread()
                   .getName();
        int elem = it.next();
        
        System.out.println("Thread Name: " + t);
        System.out.println("Add: " + elem + " + " + num);
        System.out.println("Sum: " + (elem + num));
        
        return true;
      }
      else
        return false;
      
    }
    finally{
      lock.unlock();
    }
  }
  
}

Result(may vary)
Thread Name: Thread-0
Add: 2 + 3
Sum: 5
Thread-1 can't acquire the lock.
Thread Name: Thread-0
Add: 4 + 3
Sum: 7
Thread Name: Thread-1
Add: 6 + 2
Sum: 8
Thread Name: Thread-0
Add: 8 + 3
Sum: 11
Thread Name: Thread-1
Add: 10 + 2
Sum: 12
Another way of acquiring the lock using ReentrantLock is the lockInterruptibly() method. This method allows a thread that is blocked waiting for the lock to be interrupted. It also throws InterruptedException right away if the thread's interrupt status is already set when the method is called. Take a look at this example.
import java.util.concurrent.locks.*;

public class SampleClass{

  public static void main(String[] args)
                throws InterruptedException{
  
    Thread mainThread = Thread.currentThread();
    
    ClassA ca = new ClassA("String");
    
    Thread t1 = new Thread(
    () -> {
      try{
        ca.displayString(
        Thread.currentThread(),
        mainThread);
      }
      catch(InterruptedException e){
      
        System.out.println(
        Thread.currentThread().getName()
        + " is interrupted.");
      }
    });
    t1.start();
    
    ca.displayString(mainThread, t1);
    
  }
}

class ClassA{
  private final Lock lock = new ReentrantLock();
  private final String str;
  
  ClassA(String str){
    this.str = str;
  }
  
  void displayString(Thread t, Thread other)
                 throws InterruptedException{
    
    lock.lockInterruptibly();
    try{
      other.interrupt();
      System.out.println(t.getName() + ": " + str);
    }
    finally{
      lock.unlock();
    }
    
  }
}

Result(may vary)
Thread-0 is interrupted.
main: String
If a thread acquires a lock multiple times, ReentrantLock has a method that reports how many times the current thread has acquired it: getHoldCount(). With reentrant synchronization (synchronized methods and blocks), we can't count how many times the thread acquired the monitor. Acquiring a lock that the thread already holds is called reentrant acquisition.
import java.util.concurrent.locks.*;

public class SampleClass{

  public static void main(String[] args){
  
    ClassA ca = new ClassA();
    ca.methC();
  }
}

class ClassA{
  
  private final ReentrantLock lock =
  new ReentrantLock();
  
  private void methA(){
    lock.lock();
    try{
      System.out.println("methA");
      System.out.println("reentrant count: " +
      lock.getHoldCount());
    }
    finally{
      lock.unlock();
    }
  }
  
  private void methB(){
    lock.lock();
    try{
      System.out.println("methB");
      System.out.println("reentrant count: " +
      lock.getHoldCount());
      methA();
    }
    finally{
      lock.unlock();
    }
  }
  
  void methC(){
  
    lock.lock();
    try{
      System.out.println("methC");
      System.out.println("reentrant count: " +
      lock.getHoldCount());
      methB();
    }
    finally{
      lock.unlock();
    }
  }
}

Result
methC
reentrant count: 1
methB
reentrant count: 2
methA
reentrant count: 3
Take note, according to the documentation, the hold count information is typically only used for testing and debugging purposes.

ReentrantReadWriteLock

ReentrantReadWriteLock is an implementation of the ReadWriteLock interface and maintains a pair of associated locks, returned by readLock() and writeLock(). The read lock may be held simultaneously by multiple reader threads, so long as there are no writers. The write lock is exclusive.

According to the documentation, ReentrantReadWriteLock can be used to improve concurrency in some uses of some kinds of Collections. This is typically worthwhile only when the collections are expected to be large, accessed by more reader threads than writer threads, and entail operations with overhead that outweighs synchronization overhead. For example, here is a class using a TreeMap that is expected to be large and concurrently accessed.
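 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;

 // Data stands for the application's value type (not defined here)
 class RWDictionary {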
   private final Map<String, Data> m =
   new TreeMap<>();
   private final ReentrantReadWriteLock rwl =
   new ReentrantReadWriteLock();
   private final Lock r = rwl.readLock();
   private final Lock w = rwl.writeLock();

   public Data get(String key) {
     r.lock();
     try { return m.get(key); }
     finally { r.unlock(); }
   }
   public List<String> allKeys() {
     r.lock();
     try { return new ArrayList<>(m.keySet()); }
     finally { r.unlock(); }
   }
   public Data put(String key, Data value) {
     w.lock();
     try { return m.put(key, value); }
     finally { w.unlock(); }
   }
   public void clear() {
     w.lock();
     try { m.clear(); }
     finally { w.unlock(); }
   }
 }
ReentrantReadWriteLock has useful features like fair mode, lock downgrading and many more. These features are explained in the documentation.
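
Lock downgrading, mentioned above, can be sketched as follows. This is a minimal illustration and not the documentation's own example: the class DowngradeSketch, its value field, and the updateAndRead() method are hypothetical names. The idea is to acquire the read lock while still holding the write lock and only then release the write lock.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

class DowngradeSketch {
  private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
  private String value = "old";

  // Hypothetical method: writes a new value, then downgrades to the
  // read lock so the value can be read back before any other writer
  // gets a chance to change it.
  String updateAndRead(String newValue) {
    rwl.writeLock().lock();
    try {
      value = newValue;
      // Downgrade: take the read lock while the write lock is still held.
      rwl.readLock().lock();
    } finally {
      // The write lock is released; the read lock stays held.
      rwl.writeLock().unlock();
    }
    try {
      return value;
    } finally {
      rwl.readLock().unlock();
    }
  }

  public static void main(String[] args) {
    DowngradeSketch d = new DowngradeSketch();
    System.out.println(d.updateAndRead("new")); // prints "new"
  }
}
```

The reverse direction does not work with this class: a thread that holds only the read lock and calls writeLock().lock() blocks forever, because upgrading is not supported.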

StampedLock

A StampedLock can be used in reading mode and writing mode, similar to the locks of ReentrantReadWriteLock. The readLock() and writeLock() methods of StampedLock may block the calling thread, and they return a stamp that is used to release the lock or to convert or validate its state.

Unlike ReentrantReadWriteLock, StampedLock is not reentrant. A thread that already holds the lock must not try to acquire it again, so locked bodies should not call other methods that may reacquire the lock; doing so can deadlock. Stamps are also not tied to a particular thread: a lock acquired in one thread can be released or converted in another.

This example demonstrates StampedLock.
import java.util.concurrent.locks.*;

public class SampleClass{

  public static void main(String[] args){
  
    StampSample ss = new StampSample("String");
    
    Thread t1 = new Thread(
    () -> ss.readString());
    
    Thread t2 = new Thread(
    () -> ss.readString());
    
    Thread t3 = new Thread(
    () -> ss.setString("MyString"));
    
    Thread t4 = new Thread(
    () -> ss.readString());
    
    t1.start();
    t2.start();
    t3.start();
    t4.start();
    
  }
}

class StampSample{
  
  private final StampedLock lock =
  new StampedLock();
  
  private String str;
  
  StampSample(String str){
  
    this.str = str;
  }
  
  public void setString(String str){
    long stamp = lock.writeLock();
    try{
      System.out.println("Set String...");
      this.str = str;
    }
    finally{
      lock.unlockWrite(stamp);
    }
  }
  
  public void readString(){
    long stamp = lock.readLock();
    try{
      var tName = Thread.currentThread()
                  .getName();
              
      System.out.println(tName + " stamp: "
                         + stamp + "\n"+
                         "readString: " +
                         str);
    }
    finally{
      lock.unlockRead(stamp);
    }
  }
}

Result(may vary)
Thread-1 stamp: 258
readString: String
Thread-0 stamp: 257
readString: String
Thread-3 stamp: 259
readString: String
Set String...
StampedLock has an Optimistic Reading feature. tryOptimisticRead() method returns a non-zero stamp only if the lock is not currently held in write mode. Method validate(long) returns true if the lock has not been acquired in write mode since obtaining a given stamp, in which case all actions prior to the most recent write lock release happen-before actions following the call to tryOptimisticRead.

This mode can be thought of as an extremely weak version of a read-lock, that can be broken by a writer at any time. The use of optimistic read mode for short read-only code segments often reduces contention and improves throughput. However, its use is inherently fragile. Optimistic read sections should only read fields and hold them in local variables for later use after validation.

Fields read while in optimistic read mode may be wildly inconsistent, so usage applies only when you are familiar enough with data representations to check consistency and/or repeatedly invoke method validate(). For example, such steps are typically required when first reading an object or array reference, and then accessing one of its fields, elements or methods.
import java.util.concurrent.locks.*;

public class SampleClass{

  public static void main(String[] args){
  
    OptimisticReading ss =
    new OptimisticReading("String");
    
    Thread t1 = new Thread(
    () -> ss.readString());
    
    Thread t2 = new Thread(
    () -> ss.readString());
    
    Thread t3 = new Thread(
    () -> ss.setString("MyString"));
    
    Thread t4 = new Thread(
    () -> ss.readString());
    
    t1.start();
    t2.start();
    t3.start();
    t4.start();
  }
}

class OptimisticReading{
  
  private final StampedLock lock =
  new StampedLock();
  
  private String str;
  
  OptimisticReading(String str){
  
    this.str = str;
  }
  
  public void setString(String str){
    long stamp = lock.writeLock();
    try{
      System.out.println("Set String...");
      this.str = str;
    }
    finally{
      lock.unlockWrite(stamp);
    }
  }
  
  public void readString(){
    long stamp = lock.tryOptimisticRead();
    try{
       
      if(lock.validate(stamp)){
        stamp = lock.readLock();
        System.out.println(
        Thread.currentThread().getName()
        + " stamp: " + stamp + "\n"+
        "readString: " + str);
      }
    }
    finally{
      if(StampedLock.isReadLockStamp(stamp))
        lock.unlockRead(stamp);
    }
  }
}

Result(may vary)
Thread-3 stamp: 259
readString: String
Thread-1 stamp: 258
readString: String
Thread-0 stamp: 257
readString: String
Set String...
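tryOptimisticRead() doesn't block threads. It returns a stamp that can later be validated, or zero if the lock is currently held in write mode. In the readString() method, we use validate() to check that the lock has not been acquired in write mode since the stamp was obtained.

If it returns true, the lock hasn't been acquired in write mode since tryOptimisticRead() returned the stamp, so we execute the body of the if statement and acquire the read lock. If it returns false, the method reads nothing. In the finally block, we invoke the static isReadLockStamp() method (available since Java 10).

This method returns true if the stamp represents holding the lock in read mode, that is, if it comes from readLock(); otherwise, it returns false. A stamp from tryOptimisticRead() is only an extremely weak form of read lock, and it doesn't need to be unlocked.

As we can see in the example above, we can use tryOptimisticRead() to decide when a thread is going to acquire readLock(). Keep in mind that a writer can still acquire the lock between validate() and readLock(), in which case readLock() simply blocks until the writer is done. The more common idiom is to copy the fields into local variables first, call validate() afterwards, and fall back to readLock() only if validation fails.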
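
For comparison, here is a minimal sketch of the optimistic-read idiom in which the fields are copied into local variables first and validated afterwards. It is modelled loosely on the point example in the StampedLock documentation; the class name OptimisticPoint and its fields are assumptions made for this illustration.

```java
import java.util.concurrent.locks.StampedLock;

class OptimisticPoint {
  private final StampedLock lock = new StampedLock();
  private double x, y;

  // Writers take the exclusive write lock as usual.
  void move(double dx, double dy) {
    long stamp = lock.writeLock();
    try {
      x += dx;
      y += dy;
    } finally {
      lock.unlockWrite(stamp);
    }
  }

  double distanceFromOrigin() {
    long stamp = lock.tryOptimisticRead();
    // Copy the fields into locals; they may be inconsistent at this point.
    double curX = x, curY = y;
    if (!lock.validate(stamp)) {
      // A write may have intervened: fall back to a blocking read lock
      // and read the fields again.
      stamp = lock.readLock();
      try {
        curX = x;
        curY = y;
      } finally {
        lock.unlockRead(stamp);
      }
    }
    // Only the validated local copies are used from here on.
    return Math.hypot(curX, curY);
  }

  public static void main(String[] args) {
    OptimisticPoint p = new OptimisticPoint();
    p.move(3, 4);
    System.out.println(p.distanceFromOrigin()); // prints 5.0
  }
}
```

When no writer is active, the read path never blocks and never writes to the lock state, which is where the throughput benefit of optimistic reading comes from.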

StampedLock has methods that can convert a lock to another type of lock. In ReentrantReadWriteLock, we can only "downgrade" a write lock to read lock.

This example demonstrates conversion from read lock to write lock.
import java.util.concurrent.locks.*;

public class SampleClass{

  public static void main(String[] args){
    
    MyString ms = new MyString("Apple");
    
    Thread t1 = new Thread(
    () -> ms.getAndSet("Orange"));
    
    Thread t2 = new Thread(
    () -> ms.getAndSet("Avocado"));
    
    t1.start();
    t2.start();
  }
}

class MyString{

  private final StampedLock lock =
  new StampedLock();
  
  private String str;
  
  MyString(String str){
  
    this.str = str;
  }
  
  public void getAndSet(String str){
  
    long stamp = lock.readLock();
    try{
      System.out.println(
      "Current string: " + this.str);
      
      while(true){
      
        long stamp2 = lock
                     .tryConvertToWriteLock(stamp);
                     
        if(stamp2 == 0L){
          lock.unlockRead(stamp);
          stamp = lock.writeLock();
          continue;
        }
        
        stamp = stamp2;
        System.out.println("Changing " + this.str +
        " to " +str);
        this.str = str;
        System.out.println("Done!");
        break;
      }
      
    }
    finally{
      lock.unlock(stamp);
    }
  }
}

Result(may vary)
Current string: Apple
Current string: Apple
Changing Apple to Orange
Done!
Changing Orange to Avocado
Done!
In the getAndSet() method, we converted a read lock to a write lock. First, we took the read lock via the readLock() method. Then, in the while loop, we attempted to convert the lock into a write lock. If the conversion succeeds, tryConvertToWriteLock() releases the read lock and returns a non-zero write stamp. Otherwise, it returns 0 and the read lock stays held.

If the conversion succeeds, the value of str is changed. Otherwise, the conversion is not possible at the moment, for example because another thread also holds the read lock. In that case, we manually release the read lock, block until the write lock is available via writeLock(), and invoke tryConvertToWriteLock() again. When the given stamp already represents holding the write lock, tryConvertToWriteLock() simply returns it.

In the finally block, we use the unlock() method. If the lock state matches the given stamp, this method releases the corresponding mode of the lock. IllegalMonitorStateException is thrown if the stamp does not match the current state of the lock.

Because StampedLock supports coordinated usage across multiple lock modes, this class does not directly implement the Lock or ReadWriteLock interfaces. However, a StampedLock may be viewed asReadLock(), asWriteLock(), or asReadWriteLock() in applications requiring only the associated set of functionality.
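
The Lock views mentioned above can be used like this. It is a small sketch; the class StampedViewSketch and its counter field are hypothetical. The views hide the stamps, so the familiar lock()/unlock() pattern applies.

```java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.StampedLock;

class StampedViewSketch {
  private final StampedLock stamped = new StampedLock();
  // Plain Lock views: lock() maps to readLock() or writeLock().
  private final Lock readView = stamped.asReadLock();
  private final Lock writeView = stamped.asWriteLock();
  private int counter;

  void increment() {
    writeView.lock();
    try {
      counter++;
    } finally {
      writeView.unlock();
    }
  }

  int get() {
    readView.lock();
    try {
      return counter;
    } finally {
      readView.unlock();
    }
  }

  public static void main(String[] args) {
    StampedViewSketch s = new StampedViewSketch();
    s.increment();
    s.increment();
    System.out.println(s.get()); // prints 2
  }
}
```

The views are still not reentrant, and they do not support conditions: calling newCondition() on them throws UnsupportedOperationException.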

Condition

Condition factors out the Object monitor methods (wait, notify and notifyAll) into distinct objects to give the effect of having multiple wait-sets per object, by combining them with the use of arbitrary Lock implementations. Where a Lock replaces the use of synchronized methods and statements, a Condition replaces the use of the Object monitor methods.

Conditions (also known as condition queues or condition variables) provide a means for one thread to suspend execution (to "wait") until notified by another thread that some state condition may now be true. Because access to this shared state information occurs in different threads, it must be protected, so a lock of some form is associated with the condition.

The key property that waiting for a condition provides is that it atomically releases the associated lock and suspends the current thread, just like Object.wait. A Condition instance is intrinsically bound to a lock. To obtain a Condition instance for a particular Lock instance use its newCondition() method.
import java.util.concurrent.locks.*;

public class SampleClass{

  public static void main(String[]args){
    Counter c1 = new Counter();
    
    //ThreadA
    new Thread(() ->{
      try{
        c1.consume();
      }
      catch(InterruptedException e){
        e.printStackTrace();
      }
    }).start();
    
    //ThreadB
    new Thread(() ->{
      try{
        c1.produce();
      }
      catch(InterruptedException e){
        e.printStackTrace();
      }
    }).start();
    
  }
}

class Counter{
  private final Lock lock =
  new ReentrantLock();
  private final Condition consCond =
  lock.newCondition();
  private final Condition prodCond =
  lock.newCondition();
  
  private int num = 1;
  private boolean isProduced = false;
  
  void consume() throws InterruptedException{
    lock.lock();
    try{
      //re-check in a loop: await() may wake up spuriously
      while(!isProduced)
        consCond.await();
      
      System.out.println("Consumed number: " + num);
      prodCond.signal();
      isProduced = false;
    }
    finally{
      lock.unlock();
    }
  }
  
  void produce() throws InterruptedException{
    lock.lock();
    try{
      //re-check in a loop: await() may wake up spuriously
      while(isProduced)
        prodCond.await();
    
      num += num;
      System.out.println("Produced number: " + num);
      isProduced = true;
      consCond.signal();
    }
    finally{
      lock.unlock();
    }
  }
}

Result
Produced number: 2
Consumed number: 2
The "wait and notify" version of this example can be found in this blogpost.

Note that Condition instances are just normal objects and can themselves be used as the target in a synchronized statement, and can have their own monitor wait and notify methods invoked. Acquiring the monitor lock of a Condition instance, or using its monitor methods, has no specified relationship with acquiring the Lock associated with that Condition or the use of its waiting and signalling methods.

It is recommended that to avoid confusion you never use Condition instances in this way, except perhaps within their own implementation. Except where noted, passing a null value for any parameter will result in a NullPointerException being thrown.

TimeUnit

A TimeUnit represents time durations at a given unit of granularity and provides utility methods to convert across units, and to perform timing and delay operations in these units. A TimeUnit does not maintain time information, but only helps organize and use time representations that may be maintained separately across various contexts.

A nanosecond is defined as one thousandth of a microsecond, a microsecond as one thousandth of a millisecond, a millisecond as one thousandth of a second, a minute as sixty seconds, an hour as sixty minutes, and a day as twenty four hours. In the concurrent package, there are methods that use TimeUnit for timing purposes. For example, tryLock() and await() have forms that use TimeUnit.
...
private Lock lock = new ReentrantLock();
...
if(lock.tryLock(500L, TimeUnit.MILLISECONDS)){...}
...
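In the pseudo-code above, tryLock() acquires the lock if it is not held by another thread within the given waiting time, which is five hundred milliseconds, and the current thread has not been interrupted. It returns true if the lock was acquired and false if the waiting time elapsed first.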
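
A runnable version of the timed tryLock() call might look like the sketch below. The class TimedTryLockSketch and the helpers acquireWithin() and demo() are invented for this illustration, and the 100-millisecond timeout is an arbitrary choice. A second thread holds the lock during the first attempt, so that attempt times out; the second attempt runs after the lock has been released.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class TimedTryLockSketch {

  // Hypothetical helper: tries to acquire the lock within timeoutMillis,
  // releases it again right away, and reports whether it was acquired.
  static boolean acquireWithin(Lock lock, long timeoutMillis) {
    try {
      if (lock.tryLock(timeoutMillis, TimeUnit.MILLISECONDS)) {
        try {
          return true;
        } finally {
          lock.unlock();
        }
      }
      return false;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // keep the interrupt status
      return false;
    }
  }

  // Returns { result while another thread holds the lock,
  //           result after that thread released it }.
  static boolean[] demo() {
    Lock lock = new ReentrantLock();
    CountDownLatch held = new CountDownLatch(1);
    CountDownLatch release = new CountDownLatch(1);

    Thread holder = new Thread(() -> {
      lock.lock();
      try {
        held.countDown();   // tell the caller the lock is now held
        release.await();    // keep holding until told to release
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      } finally {
        lock.unlock();
      }
    });
    holder.start();

    try {
      held.await();
      boolean whileHeld = acquireWithin(lock, 100);
      release.countDown();
      holder.join();
      boolean afterRelease = acquireWithin(lock, 100);
      return new boolean[] { whileHeld, afterRelease };
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    boolean[] r = demo();
    System.out.println("while held: " + r[0]);    // while held: false
    System.out.println("after release: " + r[1]); // after release: true
  }
}
```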
...
private Lock lock = new ReentrantLock();
Condition cond = lock.newCondition();
...
cond.await(2L, TimeUnit.SECONDS);
...
In the pseudo-code above, await() causes the current thread to wait until it is signalled or interrupted, or the specified waiting time elapses. The waiting time in the pseudo-code above is two seconds. Take note that there is no guarantee that a particular timeout implementation will be able to notice the passage of time at the same granularity as the given TimeUnit.

TimeUnit has utility methods that can be found in the documentation.
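
A few of those utility methods are shown in the sketch below. The class TimeUnitSketch and its helper methods are hypothetical names; toMillis(), toSeconds(), convert() and sleep() are methods of TimeUnit.

```java
import java.util.concurrent.TimeUnit;

class TimeUnitSketch {

  // Hypothetical helpers that wrap the TimeUnit conversions.
  static long secondsToMillis(long seconds) {
    return TimeUnit.SECONDS.toMillis(seconds);
  }

  static long millisToSeconds(long millis) {
    // Conversions to a coarser unit truncate.
    return TimeUnit.MILLISECONDS.toSeconds(millis);
  }

  static long daysToHours(long days) {
    // convert(duration, sourceUnit) converts INTO the receiver's unit.
    return TimeUnit.HOURS.convert(days, TimeUnit.DAYS);
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println(secondsToMillis(2));    // 2000
    System.out.println(millisToSeconds(1500)); // 1
    System.out.println(daysToHours(3));        // 72

    // Same effect as Thread.sleep(10), with the unit spelled out.
    TimeUnit.MILLISECONDS.sleep(10);
  }
}
```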

Executors

Executors execute tasks in Runnable or Callable form. An executor can run tasks sequentially on a single thread or asynchronously on multiple threads. In this topic, we will learn about different types of executors.

Executor

Executor executes submitted Runnable tasks. This interface provides a way of decoupling task submission from the mechanics of how each task will be run, including details of thread use, scheduling, etc. An Executor is normally used instead of explicitly creating threads.
import java.util.concurrent.Executor;

public class SampleClass{

  public static void main(String[] args){
  
    Executor exec = new TaskExecutor();
    
    for(int i = 0; i < 5; i++)
      exec.execute(
      () -> System.out.println(
            Thread.currentThread().getName()
            +": Doing task...")
      );
  }
}

class TaskExecutor implements Executor{

  @Override
  public void execute(Runnable command){
    command.run();
  }
}

Result
main: Doing task...
main: Doing task...
main: Doing task...
main: Doing task...
main: Doing task...
In the example above, we don't need to explicitly create a thread in order to run multiple Runnable commands. Take note, the Executor interface does not strictly require that execution be asynchronous. In the simplest case, an executor can run the submitted task immediately in the caller's thread, which is why every line of the result shows the main thread.

More typically, tasks are executed in some thread other than the caller's thread. The executor below spawns a new thread for each task.
class TaskExecutor implements Executor{

  @Override
  public void execute(Runnable command){
    new Thread(command).start();
  }
}

ExecutorService

ExecutorService provides methods to manage termination and methods that can produce a Future for tracking progress of one or more asynchronous tasks. Future simply represents the result of an asynchronous computation.

An ExecutorService can be shut down, which will cause it to reject new tasks. Two different methods are provided for shutting down an ExecutorService. The shutdown() method will allow previously submitted tasks to execute before terminating, while the shutdownNow() method prevents waiting tasks from starting and attempts to stop currently executing tasks.

Upon termination, an executor has no tasks actively executing, no tasks awaiting execution, and no new tasks can be submitted. An unused ExecutorService should be shut down to allow reclamation of its resources.

This example demonstrates ExecutorService.
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SampleClass{

  public static void main(String[] args){
  
    ExecutorService es = Executors.
                         newSingleThreadExecutor();
                         
    es.execute(
    () -> System.out.println("Task1..."));
    
    es.execute(
    () -> System.out.println("Task2..."));
    
    es.execute(
    () -> System.out.println("Task3..."));
    
    es.execute(
    () -> System.out.println("Task4..."));
    
    es.shutdown();
  }
}
In the example above, we use the Executors class. This class provides factory and utility methods for the Executor, ExecutorService, ScheduledExecutorService, ThreadFactory, and Callable classes defined in this package. newSingleThreadExecutor() creates an executor that uses a single worker thread operating off an unbounded queue of tasks. Tasks are guaranteed to execute sequentially, and no more than one task will be active at any given time.

Typically, we want to execute tasks asynchronously.
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SampleClass{

  public static void main(String[] args){
  
    ExecutorService es = Executors.
                         newFixedThreadPool(4);
                         
    es.execute(
    () -> System.out.println(
    Thread.currentThread().getName()
    + ": Task1..."));
    
    es.execute(
    () -> System.out.println(
    Thread.currentThread().getName()
    + ": Task2..."));
    
    es.execute(
    () -> System.out.println(
    Thread.currentThread().getName()
    + ": Task3..."));
    
    es.execute(
    () -> System.out.println(
    Thread.currentThread().getName()
    + ": Task4..."));
    
    es.shutdown();
  }
}

Result(may vary)
pool-1-thread-3: Task3...
pool-1-thread-1: Task1...
pool-1-thread-4: Task4...
pool-1-thread-2: Task2...
newFixedThreadPool(int nThreads) creates a pool that reuses a fixed number of threads operating off a shared unbounded queue of tasks. nThreads is the number of threads in the pool. At any point, at most nThreads threads will be active processing tasks. If additional tasks are submitted when all threads are active, they will wait in the queue until a thread is available.
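
The examples in this section only use execute(), which returns nothing. The Future mentioned at the start of the section comes into play with submit(). Below is a minimal sketch, assuming a hypothetical class FutureSketch with a sumOfSquares() helper and an arbitrary pool size of 2.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class FutureSketch {

  // Hypothetical helper: squares each number in a pool thread and
  // adds up the results delivered through the Futures.
  static int sumOfSquares(List<Integer> numbers) {
    ExecutorService es = Executors.newFixedThreadPool(2);
    try {
      List<Future<Integer>> futures = new ArrayList<>();
      for (Integer n : numbers) {
        // submit(Callable) returns a Future for the pending result
        futures.add(es.submit(() -> n * n));
      }

      int sum = 0;
      for (Future<Integer> f : futures) {
        sum += f.get(); // blocks until that task has completed
      }
      return sum;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } catch (ExecutionException e) {
      // the task itself threw; the cause is the original exception
      throw new IllegalStateException(e.getCause());
    } finally {
      es.shutdown(); // lets the pool threads end once the tasks are done
    }
  }

  public static void main(String[] args) {
    System.out.println(sumOfSquares(List.of(1, 2, 3))); // prints 14
  }
}
```

Calling get() in submission order means a slow early task delays the results of faster later ones, even though the tasks themselves run concurrently.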

ScheduledExecutorService

ScheduledExecutorService is an ExecutorService that can schedule commands to run after a given delay, or to execute periodically.

The schedule methods create tasks with various delays and return a task object that can be used to cancel or check execution. The scheduleAtFixedRate and scheduleWithFixedDelay methods create and execute tasks that run periodically until cancelled.

Commands submitted using the Executor.execute(Runnable) and ExecutorService submit methods are scheduled with a requested delay of zero. Zero and negative delays (but not periods) are also allowed in schedule methods, and are treated as requests for immediate execution.

This example demonstrates ScheduledExecutorService. This example prints "Do command..." every 2 seconds for 12 seconds.
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class SampleClass{

  public static void main(String[] args){
  
    ScheduledExecutorService ses = 
    Executors.newScheduledThreadPool(1);
    
    Runnable command = () -> 
    System.out.println("Do command...");
    
    ScheduledFuture<?> task = 
    ses.scheduleWithFixedDelay(command, 2,
                      2, TimeUnit.SECONDS);
                      
    //Cancel the periodic task, then shut the scheduler
    //down so the program can exit without busy-waiting
    Runnable canceller = () -> {
      task.cancel(false);
      ses.shutdown();
    };
    
    //This method doesn't block main thread
    ses.schedule(canceller, 12, TimeUnit.SECONDS);
    
  }
}

Result
Do command...
Do command...
Do command...
Do command...
Do command...
scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)
schedules a task that first runs after the initial delay. Each subsequent execution starts the given delay after the previous execution has finished, so the time a task takes to run pushes back all later executions. For example, suppose the delay is two seconds and the first execution starts at 2 seconds and takes 1 second to finish. The next execution then starts at 5 seconds; if that one finishes almost immediately, the following one starts at 7 seconds, and so on.

This figure demonstrates scheduleWithFixedDelay's action in the scenario above.
Task:    |----|#--------#-----#
Seconds: 0--1--2--3--4--5--6--7
|----| is the initial delay time.
# is the executed task.

scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
schedules a task that first runs after the initial delay. Executions are due at initialDelay, then initialDelay + period, then initialDelay + 2 * period, and so on, regardless of how long each execution takes. If an execution overruns its period, the next one starts late but never runs concurrently with it. For example, suppose the period is two seconds and the first execution starts at 2 seconds but runs 1 second past its slot, finishing at 5 seconds. The execution that was due at 4 seconds then starts at 5 seconds, the next one starts on schedule at 6 seconds, and if no further overrun occurs, the one after that starts at 8 seconds.

This figure demonstrates scheduleAtFixedRate's action in the scenario above.
Task:    |----|#--------#--#-----#
Seconds: 0--1--2--3--4--5--6--7--8
|----| is the initial delay time.
# is the executed task.

In the example above, if we had replaced scheduleWithFixedDelay with scheduleAtFixedRate, the result would likely have six "Do command..." prints.
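
The example above polls isCancelled() in a loop and relies on timing to decide how many times the command runs. This is a minimal sketch of another way to stop a periodic task after a known number of runs, by waiting on a CountDownLatch. The class name FixedRateDemo, the method runTimes() and the 50 millisecond period are made up for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class FixedRateDemo{

  //Runs a periodic task the given number of times,
  //then cancels it. Returns the number of counted runs.
  static int runTimes(int times){
    ScheduledExecutorService ses =
    Executors.newScheduledThreadPool(1);
    AtomicInteger runs = new AtomicInteger();
    CountDownLatch done = new CountDownLatch(times);

    Runnable command = () -> {
      //executions of one periodic task never overlap,
      //so this check-then-act is safe here
      if(done.getCount() > 0){
        runs.incrementAndGet();
        done.countDown();
      }
    };

    ScheduledFuture<?> task =
    ses.scheduleAtFixedRate(command, 0, 50,
                            TimeUnit.MILLISECONDS);
    try{
      done.await(); //blocks without spinning
    }
    catch(InterruptedException e){
      Thread.currentThread().interrupt();
    }
    task.cancel(false);
    ses.shutdown();
    return runs.get();
  }

  public static void main(String[] args){
    System.out.println("Runs: " + runTimes(5));
  }
}
```

The latch lets the main thread sleep until the last wanted run has happened, so the number of counted runs doesn't depend on when a canceller task fires.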

schedule(Runnable command, long delay, TimeUnit unit)
Submits a one-shot task that becomes enabled after the given delay. This method returns a ScheduledFuture object. ScheduledFuture is a Future for a delayed task: we can use it to cancel the task or to check whether the task is done.
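
schedule() also has an overload that accepts a Callable, so a one-shot task can return a value through its ScheduledFuture. Here is a small sketch of that overload. OneShotScheduleDemo and runOnce() are names I made up for this illustration.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class OneShotScheduleDemo{

  //Schedules a single computation and waits for its result.
  static int runOnce(long delayMillis){
    ScheduledExecutorService ses =
    Executors.newSingleThreadScheduledExecutor();
    try{
      //the lambda returns a value, so the Callable overload is used
      ScheduledFuture<Integer> future =
      ses.schedule(() -> 6 * 7, delayMillis,
                   TimeUnit.MILLISECONDS);
      //get() blocks until the delay elapses and the task runs
      return future.get();
    }
    catch(InterruptedException | ExecutionException e){
      throw new IllegalStateException(e);
    }
    finally{
      ses.shutdown();
    }
  }

  public static void main(String[] args){
    System.out.println("Result: " + runOnce(200)); //Result: 42
  }
}
```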

ExecutorCompletionService

ExecutorCompletionService is a CompletionService that uses a supplied Executor to execute tasks. This class arranges that submitted tasks are, upon completion, placed on a queue accessible using take(). The class is lightweight enough to be suitable for transient use when processing groups of tasks.
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutionException;
import java.util.ArrayList;

public class SampleClass{

  public static void main(String[] args)
            throws InterruptedException, 
                      ExecutionException{
    
    ArrayList<Callable<Integer>>
    ar1 = new ArrayList<>();
    
    ArrayList<Integer> ar2 =
    new ArrayList<>();
    
    ar2.add(4);
    ar2.add(8);
    ar2.add(12);
    ar2.add(16);
    ar2.add(20);
    
    for(Integer i : ar2)
      ar1.add(() -> i*i);
    
    ExecutorService es = Executors.
                         newFixedThreadPool(3);
    ExecutorCompletionService<Integer> ecs =
    new ExecutorCompletionService<>(es);
    
    ar1.forEach(ecs::submit);
    es.shutdown();
    
    int sum = 0;
    System.out.print("Squared: ");
    for(int i = 0; i < ar1.size(); i++){
      //take() blocks until the next task completes
      int squared = ecs.take().get();
      sum += squared;
      System.out.print(squared + " ");
    }
    
    System.out.println("\nSum: " + sum);
  }
}

Result(may vary)
Squared: 144 16 64 256 400
Sum: 880
In the example above, we create two ArrayLists: ar1 and ar2. ar1 stores the Callables that compute the squares of the numbers in ar2. Then, we create an ExecutorService with three threads. These threads will execute the Callables in ar1.

Then, we create ExecutorCompletionService using this constructor: ExecutorCompletionService(Executor executor)
This constructor creates an ExecutorCompletionService using the supplied executor for base task execution and a LinkedBlockingQueue as a completion queue. In other words, the executor runs the submitted tasks, and the Future of each task is placed in the LinkedBlockingQueue as soon as that task completes.

Then, we use forEach() to submit tasks to the ExecutorCompletionService. Once all tasks are submitted, we can shut down the ExecutorService, because shutdown() still lets previously submitted tasks run to completion. Then, we drain the LinkedBlockingQueue by calling take() once per task. This method retrieves and removes the Future representing the next completed task, waiting if none are yet present.
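
To see that take() hands out results in completion order and not in submission order, here is a minimal sketch. The first submitted task is held back by a CountDownLatch until the main thread has already taken the result of the second one. CompletionOrderDemo, completionOrder() and the "slow"/"fast" labels are hypothetical names for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class CompletionOrderDemo{

  static List<String> completionOrder(){
    ExecutorService es = Executors.newFixedThreadPool(2);
    ExecutorCompletionService<String> ecs =
    new ExecutorCompletionService<>(es);
    CountDownLatch gate = new CountDownLatch(1);
    List<String> order = new ArrayList<>();
    try{
      //submitted first, but it can't finish until the gate opens
      ecs.submit(() -> { gate.await(); return "slow"; });
      //submitted second, finishes right away
      ecs.submit(() -> "fast");

      //only "fast" can be complete at this point
      order.add(ecs.take().get());
      gate.countDown(); //now let the first task finish
      order.add(ecs.take().get());
    }
    catch(InterruptedException | ExecutionException e){
      throw new IllegalStateException(e);
    }
    finally{
      //interrupts a still-blocked task if something went wrong
      es.shutdownNow();
    }
    return order;
  }

  public static void main(String[] args){
    System.out.println(completionOrder()); //[fast, slow]
  }
}
```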

ForkJoinPool

ForkJoinPool is an ExecutorService for running ForkJoinTask instances. This executor employs a work-stealing algorithm: idle threads can steal tasks from the queues of busy threads. Tasks running in a ForkJoinPool can recursively split themselves into subtasks. Each worker thread of this executor has its own container (a double-ended queue) for the tasks that it is going to execute.

ForkJoinTask has two commonly used subclasses that we can extend to create tasks for ForkJoinPool: RecursiveAction and RecursiveTask. RecursiveAction is a task that doesn't return a result, whereas RecursiveTask is a task that returns a result.

This example demonstrates ForkJoinPool.
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;  

public class SampleClass{

  public static void main(String[] args){
  
    int procNum = Runtime.getRuntime()
                  .availableProcessors();
    
    System.out.print("This machine's "+
                       "Processor count: ");
    System.out.print(procNum);
    
    System.out.println("\nCreating "+
                       "ForkJoinPool...");
    ForkJoinPool fjp = new ForkJoinPool();
    
    System.out.println("All worker "+
                       "threads idle?:"+
                       fjp.isQuiescent());
                       
    int result = fjp.invoke(new AddNumbers(2));
    System.out.println("Sum: " + result);
  }
}

class AddNumbers extends RecursiveTask<Integer>{

  private final Integer num;
  
  AddNumbers(Integer num){
    this.num = num;
  }
  
  protected Integer compute(){
    
    Integer result = 0;
    
    if(num > 0){
      if(num == 4)
        return num;
      
      AddNumbers s1 = new AddNumbers(num+1);
      s1.fork();
      AddNumbers s2 = new AddNumbers(num+1);
      result = s2.compute() + s1.join();
      System.out.println(Thread.
      currentThread().getName() + ": " +
      result);
    }
        
    return result;
  }
}

Result
This machine's Processor count: 4
Creating ForkJoinPool...
All worker threads idle?:true
ForkJoinPool-1-worker-1: 8
ForkJoinPool-1-worker-2: 8
ForkJoinPool-1-worker-1: 16
Sum: 16
In the compute() method, I recursively break the task into subtasks. fork() arranges to asynchronously execute the task in the pool that the current task is running in. In other words, fork() pushes the task onto the current worker thread's queue, where the same thread can run it later or an idle thread can steal it.

join() returns the result of the computation when it is done. This method differs from get() in that abnormal completion results in RuntimeException or Error, not ExecutionException, and that interrupts of the calling thread do not cause the method to abruptly return by throwing InterruptedException.

Next, let's discuss the content of the main() method. availableProcessors() returns the number of processors available to the Java virtual machine. At the time of this writing, my processor has two physical cores and four logical cores.

The constructor ForkJoinPool() creates a ForkJoinPool with parallelism equal to availableProcessors(), using defaults for all other parameters. Alternatively, the static commonPool() method returns the common pool instance, which is shared and already created for us. In fact, commonPool() suffices for most applications and normally reduces resource usage (its threads are slowly reclaimed during periods of non-use, and reinstated upon subsequent use).

isQuiescent() returns true if all worker threads are currently idle. An idle worker is one that cannot obtain a task to execute because none are available to steal from other threads, and there are no pending submissions to the pool. This method is conservative; it might not return true immediately upon idleness of all threads, but will eventually become true if threads remain inactive.

invoke() performs the given task, returning its result once the task is finished. Take note that worker threads of ForkJoinPool are daemon threads, so they don't keep the JVM alive once all non-daemon threads are done.
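
A more typical use of RecursiveTask is divide and conquer over a range of data. This is a minimal sketch that sums an int array on commonPool(). The class name RangeSum, the helper sum() and the THRESHOLD value of 1000 are assumptions for illustration, not recommended settings.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class RangeSum extends RecursiveTask<Long>{

  //hypothetical cutoff: smaller ranges are summed directly
  private static final int THRESHOLD = 1_000;

  private final int[] data;
  private final int from; //inclusive
  private final int to;   //exclusive

  RangeSum(int[] data, int from, int to){
    this.data = data;
    this.from = from;
    this.to = to;
  }

  @Override
  protected Long compute(){
    if(to - from <= THRESHOLD){
      long sum = 0;
      for(int i = from; i < to; i++)
        sum += data[i];
      return sum;
    }
    //split the range in half
    int mid = (from + to) >>> 1;
    RangeSum left = new RangeSum(data, from, mid);
    RangeSum right = new RangeSum(data, mid, to);
    left.fork(); //may be stolen by an idle worker
    //compute the right half in this thread, then wait for left
    return right.compute() + left.join();
  }

  static long sum(int[] data){
    return ForkJoinPool.commonPool()
           .invoke(new RangeSum(data, 0, data.length));
  }

  public static void main(String[] args){
    int[] data = new int[10_000];
    for(int i = 0; i < data.length; i++)
      data[i] = i + 1;
    //1 + 2 + ... + 10000 = 50005000
    System.out.println("Sum: " + sum(data));
  }
}
```

Forking one half and computing the other half in the current thread keeps the current worker busy instead of leaving it to wait for both subtasks.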

ThreadFactory

ThreadFactory is an interface for objects that create new threads on demand. Using thread factories removes hardwiring of calls to new Thread, enabling applications to use special thread subclasses, priorities, etc.

This example demonstrates ThreadFactory.
import java.util.concurrent.ThreadFactory;

public class SampleClass{

  public static void main(String[] args){
    DaemonThreadFactory factory =
    new DaemonThreadFactory();
    
    Runnable command = () -> 
    {
      System.out.println(
      Thread.currentThread().getName());
    };
    
    for(int i = 0; i < 4; i++)
      factory.newThread(command).start();
      
    try{Thread.sleep(100);}
    catch(InterruptedException e){
      e.printStackTrace();
    }
    
  }
}

class DaemonThreadFactory implements ThreadFactory{

  @Override
  public Thread newThread(Runnable runnable){
    Thread thread = new Thread(runnable);
    thread.setDaemon(true);
    return thread;
  }

}

Result(may vary)
Thread-0
Thread-3
Thread-2
Thread-1
There are methods in some classes that accept ThreadFactory as argument.
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SampleClass{

  public static void main(String[] args){
    DaemonThreadFactory factory =
    new DaemonThreadFactory();
    
    ExecutorService es = Executors.
                         newFixedThreadPool(4, factory);
    
    Runnable command = () -> 
    {
      System.out.println(
      Thread.currentThread().getName());
    };
    
    for(int i = 0; i < 4; i++)
      es.execute(command);
    es.shutdown();
    
    while(!es.isTerminated());
  }
}

class DaemonThreadFactory implements ThreadFactory{

  @Override
  public Thread newThread(Runnable runnable){
    Thread thread = new Thread(runnable);
    thread.setDaemon(true);
    return thread;
  }

}

Result(may vary)
Thread-1
Thread-3
Thread-2
Thread-0
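
The factories above only set the daemon flag, so the threads keep their default names. Another common use of ThreadFactory is giving threads meaningful names. This is a minimal sketch. NamedThreadFactory and the "worker" prefix are names made up for this illustration.

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

class NamedThreadFactory implements ThreadFactory{

  private final String prefix;
  //AtomicInteger keeps numbering correct even if
  //several threads call newThread() at once
  private final AtomicInteger counter = new AtomicInteger(1);

  NamedThreadFactory(String prefix){
    this.prefix = prefix;
  }

  @Override
  public Thread newThread(Runnable runnable){
    Thread thread = new Thread(runnable,
    prefix + "-" + counter.getAndIncrement());
    thread.setDaemon(true);
    return thread;
  }

  public static void main(String[] args)
                throws InterruptedException{
    NamedThreadFactory factory =
    new NamedThreadFactory("worker");

    Thread thread = factory.newThread(() ->
    System.out.println(Thread.currentThread().getName()));
    thread.start();
    thread.join(); //prints worker-1
  }
}
```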

Future

A Future represents the result of an asynchronous computation. Methods are provided to check if the computation is complete, to wait for its completion, and to retrieve the result of the computation. The result can only be retrieved using method get when the computation has completed, blocking if necessary until it is ready. Cancellation is performed by the cancel method.

Additional methods are provided to determine if the task completed normally or was cancelled. Once a computation has completed, the computation cannot be cancelled. If you would like to use a Future for the sake of cancellability but not provide a usable result, you can declare types of the form Future<?> and return null as a result of the underlying task.

This example demonstrates Future.
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutionException;

public class SampleClass{

  public static void main(String[] args)
                throws InterruptedException,
                       ExecutionException{
  
    ExecutorService es =
    Executors.newSingleThreadExecutor();
    
    Callable<Integer> task = 
    () -> {
    int result = 0;
    
    for(int i = 0; i < 5; i++)
      result += i*i;
      
    return result;
    };
    
    Future<Integer> future = es.submit(task);
    
    System.out.println("Result: " + 
                        future.get());
    es.shutdown();
  }
}

Result
Result: 30
In the example above, we use Callable to create a task. Callable is just like Runnable, but a Callable can return a result and throw a checked exception. The submit() method returns a Future object that can be used to get the result of a task or to cancel it. The get() method blocks the calling thread if the task is not yet complete. Once the task completes, get() returns its result.
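
Since get() may block for as long as the task runs, Future also has a get(long timeout, TimeUnit unit) overload that gives up with a TimeoutException. This is a small sketch of that overload with a task that never finishes on its own. FutureTimeoutDemo and awaitWithTimeout() are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class FutureTimeoutDemo{

  static String awaitWithTimeout(long timeoutMillis){
    ExecutorService es =
    Executors.newSingleThreadExecutor();
    //this latch is never counted down, so the task
    //stays blocked until it is interrupted
    CountDownLatch never = new CountDownLatch(1);

    Future<String> future = es.submit(() -> {
      never.await();
      return "done";
    });

    try{
      return future.get(timeoutMillis,
                        TimeUnit.MILLISECONDS);
    }
    catch(TimeoutException e){
      future.cancel(true); //interrupt the stuck task
      return "timed out";
    }
    catch(InterruptedException | ExecutionException e){
      throw new IllegalStateException(e);
    }
    finally{
      es.shutdown();
    }
  }

  public static void main(String[] args){
    System.out.println(awaitWithTimeout(200)); //timed out
  }
}
```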

This example demonstrates task cancellation using Future.
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.Executors;

public class SampleClass{
 
  static volatile int num = 0; 
 
  public static void main(String[] args){
  
    ExecutorService es =
    Executors.newFixedThreadPool(2);
    
    Runnable task = 
    () -> {
      try{
        while(true){
          num += 2;
          Thread.sleep(250);
        }
      }
      catch(InterruptedException e){
        System.out.println(Thread.
        currentThread().getName() +
        " is interrupted.");
      }
    };
    
    Future<?> future1 = es.submit(task);
    Future<?> future2 = es.submit(task);
    
    while(!future1.isCancelled() ||
          !future2.isCancelled()){
      if(num > 12){
        future1.cancel(true);
        future2.cancel(true);
      }
    }
    
    es.shutdown();
  }
}

Result
pool-1-thread-1 is interrupted.
pool-1-thread-2 is interrupted.
cancel() method attempts to cancel execution of a task. This method has no effect if the task is already completed or cancelled, or could not be cancelled for some other reason. Otherwise, if the task has not started when cancel() is called, the task will never run.

If the task has already started and the argument of cancel() is set to true, cancel() will interrupt the thread that is executing the task. Otherwise, in-progress tasks are allowed to complete.
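
The first case, cancelling a task that has not started yet, can be sketched with a single-thread executor. The first task occupies the only thread, so the second task is still waiting in the queue when we cancel it. CancelBeforeStartDemo and queuedTaskRan() are names made up for this illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class CancelBeforeStartDemo{

  //Returns true if the cancelled task ran anyway.
  static boolean queuedTaskRan(){
    ExecutorService es =
    Executors.newSingleThreadExecutor();
    CountDownLatch gate = new CountDownLatch(1);
    AtomicBoolean ran = new AtomicBoolean(false);

    //keeps the only worker thread busy until the gate opens
    Callable<Void> blockerTask = () -> {
      gate.await();
      return null;
    };

    try{
      Future<Void> blocker = es.submit(blockerTask);
      //this task has to wait in the queue
      Future<?> queued = es.submit(() -> ran.set(true));

      queued.cancel(false); //cancelled before it starts
      gate.countDown();
      blocker.get();
      es.shutdown();
      es.awaitTermination(5, TimeUnit.SECONDS);
    }
    catch(InterruptedException | ExecutionException e){
      throw new IllegalStateException(e);
    }
    finally{
      es.shutdownNow();
    }
    return ran.get();
  }

  public static void main(String[] args){
    System.out.println("Ran: " + queuedTaskRan()); //Ran: false
  }
}
```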

FutureTask

FutureTask is a cancellable asynchronous computation that implements Future. Future is an interface, whereas FutureTask is a class that can be instantiated. This class can wrap a Callable or a Runnable, and the resulting object can be submitted to executors.

This example demonstrates FutureTask.
import java.util.concurrent.FutureTask;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutionException;

public class SampleClass{

  public static void main(String[] args)
                throws InterruptedException,
                       ExecutionException{
    
    ExecutorService es =
    Executors.newSingleThreadExecutor();
    
    FutureTask<?> ft1 =
    new FutureTask<>(() ->
    {
      System.out.println("A Runnable task...");
    },null);
    
    FutureTask<Integer> ft2 =
    new FutureTask<>(() ->
    {
      System.out.println("A Callable task...");
      return 0;
    });
    
    es.execute(ft1);
    es.execute(ft2);
    
    Object obj = ft1.get();
    int result = ft2.get();
    
    System.out.println("ft1 result: " + obj);
    System.out.println("ft2 result: " + result);
    
    es.shutdown();
    
  }
}

Result(may vary)
A Runnable task...
A Callable task...
ft1 result: null
ft2 result: 0
In ft1, we use this constructor:
FutureTask(Runnable runnable, V result)
runnable is a Runnable task and result is the value that get() returns once ft1 completes successfully.

In ft2, we use this constructor:
FutureTask(Callable<V> callable)
callable is a Callable task.

FutureTask implements RunnableFuture<V>, an interface that extends both Runnable and Future<V>. That's why a FutureTask can be passed as an argument to the execute() method.
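
Because a FutureTask is a Runnable, it doesn't need an executor at all. As a small sketch, we can hand it to a plain Thread and still read the result through get(). FutureTaskThreadDemo and compute() are hypothetical names.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

class FutureTaskThreadDemo{

  static int compute(){
    //wraps a Callable, the result type is Integer
    FutureTask<Integer> ft =
    new FutureTask<>(() -> 21 * 2);

    //FutureTask is a Runnable, so Thread accepts it
    new Thread(ft).start();

    try{
      return ft.get(); //blocks until the thread finishes the task
    }
    catch(InterruptedException | ExecutionException e){
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args){
    System.out.println("Result: " + compute()); //Result: 42
  }
}
```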

Synchronizers

Synchronizers provide aid for creating a synchronized system. If you need another layer of synchronization, you may consider using one of the classes in this topic that suits your needs.

Semaphore

Semaphore in the concurrent package is a counting semaphore. Conceptually, a semaphore maintains a set of permits. Each acquire() blocks if necessary until a permit is available, and then takes it. Each release() adds a permit, potentially releasing a blocking acquirer.

However, no actual permit objects are used; the Semaphore just keeps a count of the number available and acts accordingly. Semaphores are often used to restrict the number of threads that can access some (physical or logical) resource.

This example demonstrates Semaphore.
import java.util.concurrent.Semaphore;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SampleClass{

  public static void main(String[] args){
    int threadCount = 3;
    Counter counter = new Counter(2,1);
    
    ExecutorService es = Executors.
    newFixedThreadPool(threadCount);
    
    for(int i = 0; i < threadCount; i++)
      es.execute(
      () -> counter.readAndIncrement());
    es.shutdown();
  }
}

class Counter{
  
  final private Semaphore semaphore;
  private int count = 0;
  
  Counter(int permitNum, int count){
    this.count = count;
    semaphore = new Semaphore(permitNum);
  }
  
  void readAndIncrement(){
    readCount();
    increment();
  }
  
  private void readCount(){
    try{semaphore.acquire();}
    catch(InterruptedException e){
      e.printStackTrace();
    }
    System.out.println(Thread.currentThread().
                       getName() + ": " + count);
  }
  
  private synchronized void increment(){
    count++;
    semaphore.release();
  }
  
}

Result(may vary)
pool-1-thread-1: 1
pool-1-thread-2: 1
pool-1-thread-3: 2
In the example above, thread1 and thread2 access Counter object at the same time. As you can see from the result, thread1 and thread2 have the same read result. Then, thread3 is blocked because there can be only two threads that can access Counter. Once one of the two threads that are accessing Counter releases a permit, thread3 will get that permit and access Counter.

acquire() acquires a permit if there's an available permit. Otherwise, the thread that calls this method will be blocked until a permit has been released or the thread is interrupted.

release() releases a permit, increasing the number of available permits by one. If any threads are trying to acquire a permit, then one is selected and given the permit that was just released. That thread is (re)enabled for thread scheduling purposes.

There is no requirement that a thread that releases a permit must have acquired that permit by calling acquire(). Correct usage of a semaphore is established by programming convention in the application.
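
The permit counting can be seen in a single thread by using tryAcquire(), which returns false right away instead of blocking when no permit is available. This is a minimal sketch. PermitDemo and tryFourTimes() are names made up for this illustration.

```java
import java.util.Arrays;
import java.util.concurrent.Semaphore;

class PermitDemo{

  static boolean[] tryFourTimes(){
    Semaphore semaphore = new Semaphore(2);

    boolean first = semaphore.tryAcquire();  //1 permit left
    boolean second = semaphore.tryAcquire(); //0 permits left
    //no permit available: returns false without blocking
    boolean third = semaphore.tryAcquire();

    semaphore.release(); //1 permit available again
    boolean fourth = semaphore.tryAcquire();

    return new boolean[]{first, second, third, fourth};
  }

  public static void main(String[] args){
    //[true, true, false, true]
    System.out.println(Arrays.toString(tryFourTimes()));
  }
}
```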

CountDownLatch

CountDownLatch is a synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes. A CountDownLatch is initialized with a given count.

The await methods block until the current count reaches zero due to invocations of the countDown() method and then all waiting threads are released and any subsequent invocations of await return immediately.

This example demonstrates CountDownLatch.
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SampleClass{

  public static void main(String[] args)
                throws InterruptedException{
    int threadCount = 3;
    CountDownLatch cdl = new CountDownLatch(threadCount);
    
    ExecutorService es = Executors.
    newFixedThreadPool(threadCount);
    
    for(int i = 0; i < threadCount; i++){
      es.execute(
      () -> {
        System.out.println(Thread.currentThread().
        getName() + " before await()");
        cdl.countDown();
      });
    }
    
    cdl.await();
    
    for(int i = 0; i < threadCount; i++){
      es.execute(
      () -> {
        System.out.println(Thread.currentThread().
        getName() + " after await()");
      });
    }
    es.shutdown();
  }
}

Result(may vary)
pool-1-thread-3 before await()
pool-1-thread-2 before await()
pool-1-thread-1 before await()
pool-1-thread-3 after await()
pool-1-thread-1 after await()
pool-1-thread-2 after await()
If you remove CountDownLatch from the example above, the "before await()" and "after await()" messages may interleave.
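
Another common pattern uses two latches: one as a start gate that holds all workers until the main thread is ready, and one that lets the main thread wait until every worker is done. This is a minimal sketch of that pattern. LatchGateDemo and runWorkers() are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

class LatchGateDemo{

  //Returns how many workers passed the start gate.
  static int runWorkers(int workerCount){
    CountDownLatch start = new CountDownLatch(1);
    CountDownLatch done = new CountDownLatch(workerCount);
    AtomicInteger finished = new AtomicInteger();

    for(int i = 0; i < workerCount; i++)
      new Thread(() -> {
        try{
          start.await(); //wait for the start signal
          finished.incrementAndGet();
        }
        catch(InterruptedException e){
          Thread.currentThread().interrupt();
        }
        finally{
          done.countDown(); //always report completion
        }
      }).start();

    start.countDown(); //release all workers at once
    try{
      done.await(); //wait until every worker has reported
    }
    catch(InterruptedException e){
      Thread.currentThread().interrupt();
    }
    return finished.get();
  }

  public static void main(String[] args){
    System.out.println("Finished: " + runWorkers(3));
  }
}
```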

CyclicBarrier

CyclicBarrier is a synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point. CyclicBarriers are useful in programs involving a fixed sized party of threads that must occasionally wait for each other. The barrier is called cyclic because it can be re-used after the waiting threads are released.

This example demonstrates CyclicBarrier.
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SampleClass{

  public static void main(String[] args){
    int threadCount = 3;
    
    Runnable barrierAction = () ->
    System.out.println("executing barrierAction");
    
    CyclicBarrier cb = 
    new CyclicBarrier(threadCount,barrierAction);
    
    ExecutorService es = Executors.
    newFixedThreadPool(threadCount);
    
    for(int i = 0; i < threadCount; i++){
      es.execute(
      () -> {
        System.out.println(Thread.currentThread().
        getName() + " before await()");
        
        try{cb.await();}
        catch(InterruptedException |
              BrokenBarrierException e){
              e.printStackTrace();
        }
        
        System.out.println(Thread.currentThread().
        getName() + " after await()");
        
      });
    }
    es.shutdown();
  }
}

Result(may vary)
pool-1-thread-1 before await()
pool-1-thread-3 before await()
pool-1-thread-2 before await()
executing barrierAction
pool-1-thread-2 after await()
pool-1-thread-1 after await()
pool-1-thread-3 after await()
First off, await() blocks threads that invoke it. Then, we create a CyclicBarrier instance using this constructor:
CyclicBarrier(int parties, Runnable barrierAction)
parties is the number of threads that need to invoke await() of CyclicBarrier in order to trip the barrier. Once the barrier is tripped, or in other words, once the number of threads waiting in await() is equal to parties, all of the waiting threads are released.

barrierAction is a Runnable that is invoked by the last thread that is going to trip the barrier. In the example above, once the barrier is tripped, barrierAction is invoked first before all threads that invoke await() become active again.

As you can see, CyclicBarrier is similar to CountDownLatch. However, a CyclicBarrier instance is reusable, unlike a CountDownLatch instance, whose count can't be reset.
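
The reuse can be sketched by letting every thread call await() several times on the same barrier and counting how often the barrier action runs. BarrierReuseDemo and countTrips() are names made up for this illustration.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

class BarrierReuseDemo{

  //Returns how many times the barrier was tripped.
  static int countTrips(int parties, int rounds){
    AtomicInteger trips = new AtomicInteger();
    //the barrier action runs once per trip
    CyclicBarrier cb =
    new CyclicBarrier(parties, trips::incrementAndGet);

    Thread[] threads = new Thread[parties];
    for(int i = 0; i < parties; i++){
      threads[i] = new Thread(() -> {
        try{
          //the same barrier is reused in every round
          for(int r = 0; r < rounds; r++)
            cb.await();
        }
        catch(InterruptedException |
              BrokenBarrierException e){
          Thread.currentThread().interrupt();
        }
      });
      threads[i].start();
    }

    for(Thread thread : threads){
      try{ thread.join(); }
      catch(InterruptedException e){
        Thread.currentThread().interrupt();
      }
    }
    return trips.get();
  }

  public static void main(String[] args){
    System.out.println("Trips: " + countTrips(3, 2)); //Trips: 2
  }
}
```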

Phaser

Phaser is a synchronization aid that is similar to CountDownLatch and CyclicBarrier. Unlike those two, the number of registered parties (threads) may vary over time. Phaser breaks execution into phases.

This example demonstrates Phaser.
import java.util.concurrent.Phaser;

public class SampleClass{

  public static void main(String[] args){
  
    Phaser ph = new Phaser();
    int threadCount = 3;
    Counter c = new Counter(threadCount);
    
    for(int i = 0; i < threadCount; i++)
      new Thread(
      () -> {
        ph.register();
        while(true){
          int phase = ph.getPhase();
          if(c.max(phase))
            break;
          ph.arriveAndAwaitAdvance();
          
        }
        ph.arriveAndDeregister();
      }).start();
    
  }
}

class Counter{

  private int count = 0;
  private int maxThread = 0;
  
  Counter(int maxThread){
    this.maxThread = maxThread;
  }
  
  synchronized boolean max(int phase){
    count++;
    
    System.out.println(Thread
    .currentThread()
    .getName() + " | Phase" + phase);
    if(count >= maxThread){
      count = 0;
      maxThread -= 1;
      return true;
    }
    else return false;
  }
  
}

Result(may vary)
Thread-0 | Phase0
Thread-1 | Phase0
Thread-2 | Phase0
Thread-0 | Phase1
Thread-1 | Phase1
Thread-0 | Phase2
In the example above, we create a Phaser instance using this constructor:
Phaser()
This constructor creates a new phaser with no initially registered parties, no parent, and initial phase number 0.

register() adds a new party to the Phaser. Think of it as opening up a slot for a thread that may wanna join in the Phaser. To move on to the next phase, every registered party must arrive. In other words, the number of arrivals must match the number of slots that register() opened. If you want to open multiple slots at once, use the bulkRegister(int parties) method.

arriveAndAwaitAdvance() is equivalent to this combination: awaitAdvance(arrive())
A thread that invokes arrive() uses one slot in the Phaser. This method doesn't block the thread. A thread that invokes arrive() when there's no available slot is considered an unregistered party (thread), and the call may throw IllegalStateException.

A thread that invokes awaitAdvance(int phase) will be blocked until the Phaser moves on to the next phase. This method will return immediately if the current phase is not equal to the given phase value or this phaser is terminated.

A thread that invokes arriveAndDeregister() uses one slot in the Phaser and reduces the total number of slots by 1. Deregistration reduces the number of parties required to advance in future phases. By default, a Phaser terminates once the number of registered parties reaches 0.
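
In the first Phaser example, each thread registers itself after it has started, so a fast thread may advance the phase before the others have registered. This is a minimal sketch of a variant where all parties are registered from the main thread before any worker starts, and the main thread takes part as a party too. PhaserRoundsDemo and phasesSeenByMain() are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Phaser;

class PhaserRoundsDemo{

  //Returns the phase numbers the main thread observes.
  static List<Integer> phasesSeenByMain(int workers,
                                        int rounds){
    //one party for the main thread itself
    Phaser ph = new Phaser(1);

    for(int i = 0; i < workers; i++){
      //register each worker before its thread starts
      ph.register();
      new Thread(() -> {
        for(int r = 0; r < rounds; r++)
          ph.arriveAndAwaitAdvance();
        ph.arriveAndDeregister();
      }).start();
    }

    List<Integer> seen = new ArrayList<>();
    for(int r = 0; r < rounds; r++){
      //the phase can't advance until main arrives too
      seen.add(ph.getPhase());
      ph.arriveAndAwaitAdvance();
    }
    ph.arriveAndDeregister();
    return seen;
  }

  public static void main(String[] args){
    System.out.println(phasesSeenByMain(3, 3)); //[0, 1, 2]
  }
}
```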

We can create a tree of Phasers by using these constructors:
Phaser(Phaser parent)
Phaser(Phaser parent, int parties)

This example demonstrates the constructor Phaser(Phaser parent)
import java.util.concurrent.Phaser;

public class SampleClass{

  public static void main(String[] args){
    Phaser ph = new Phaser(1);
    Phaser child = new Phaser(ph);
    
    for(int i = 0; i < 3; i++)
      new Thread(() -> 
      {
        child.register();
        while(child.getPhase() < 2){
          System.out.println(Thread
          .currentThread()
          .getName() + " | Child | Phase"
          + child.getPhase());
          child.arriveAndAwaitAdvance();
        }
        child.arriveAndDeregister();
      }).start();
      
    new Thread(() -> 
    {
      while(ph.getPhase() < 2){
        System.out.println(Thread
        .currentThread()
        .getName() + " | Parent | Phase"
        + ph.getPhase());
        ph.arriveAndAwaitAdvance();
      }
      ph.arriveAndDeregister();
    }).start();
  }
}

Result(may vary)
Thread-0 | Child | Phase0
Thread-1 | Child | Phase0
Thread-2 | Child | Phase0
Thread-3 | Parent | Phase0
Thread-3 | Parent | Phase1
Thread-2 | Child | Phase1
Thread-0 | Child | Phase1
Thread-1 | Child | Phase1
In the example above, we see that child and ph share the same phase number. That's because once child has at least one registered party, the child Phaser is registered with its parent. This means that child won't go to the next phase if its parent ph can't go to the next phase. If we remove the thread that uses ph's slot, child won't move to the next phase even if all of its slots are occupied.

Exchanger

Exchanger is a synchronization point at which threads can pair and swap elements within pairs. Each thread presents some object on entry to the exchange method, matches with a partner thread, and receives its partner's object on return.
import java.util.concurrent.Exchanger;

public class SampleClass{

  public static void main(String[] args){
    
    Exchanger<Integer> ex =
    new Exchanger<>();
    
    new Thread(() ->
    {
      int num = 1;
      
      for(int i = 1; i < 4; i++)
        num += i;
      
      try{
        System.out.println(Thread
        .currentThread()
        .getName() + " | Original value: " +
        num);
        num = ex.exchange(num);
      }
      catch(InterruptedException e){
        e.printStackTrace();
      }
      
      System.out.println(Thread
      .currentThread()
      .getName() + " | Exchange value: " +
      num);
      
    },"Thread1").start();
    
    new Thread(() ->
    {
      int num = 1;
      
      for(int i = 1; i < 4; i++)
        num *= i;
        
      try{
        System.out.println(Thread
        .currentThread()
        .getName() + " | Original value: " +
        num);
        num = ex.exchange(num);
      }
      catch(InterruptedException e){
        e.printStackTrace();
      }
      
      System.out.println(Thread
      .currentThread()
      .getName() + " | Exchange value: " +
      num);
      
    },"Thread2").start();
    
  }
}

Result(may vary)
Thread1 | Original value: 7
Thread2 | Original value: 6
Thread1 | Exchange value: 6
Thread2 | Exchange value: 7
exchange() method blocks the calling thread and waits until another thread calls exchange() of the same Exchanger instance.
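
If no partner ever shows up, exchange() blocks forever. Exchanger also has an exchange(V x, long timeout, TimeUnit unit) overload that throws TimeoutException instead. Here is a small sketch of that overload. ExchangeTimeoutDemo, exchangeOrGiveUp() and the returned messages are made up for this illustration.

```java
import java.util.concurrent.Exchanger;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class ExchangeTimeoutDemo{

  static String exchangeOrGiveUp(Exchanger<String> ex,
                                 String value,
                                 long timeoutMillis){
    try{
      //waits for a partner, but only for the given time
      return ex.exchange(value, timeoutMillis,
                         TimeUnit.MILLISECONDS);
    }
    catch(TimeoutException e){
      return "no partner";
    }
    catch(InterruptedException e){
      Thread.currentThread().interrupt();
      return "interrupted";
    }
  }

  public static void main(String[] args){
    Exchanger<String> ex = new Exchanger<>();
    //no other thread calls exchange() on ex
    System.out.println(
    exchangeOrGiveUp(ex, "hello", 200)); //no partner
  }
}
```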

Queues

The concurrent package has thread-safe queues. Some of them are non-blocking, like ConcurrentLinkedDeque; some are blocking, like LinkedBlockingQueue. These queues are designed for concurrent access and can be used as an alternative to collections wrapped by the synchronization methods in the Collections class. Moreover, a collection wrapped by those methods is governed by a single exclusion lock, so only one thread at a time can operate on it.

I assume readers are knowledgeable about the Collections Framework before reading this topic.

ConcurrentLinkedQueue

ConcurrentLinkedQueue is an unbounded thread-safe queue based on linked nodes. This queue orders elements FIFO (first-in-first-out). The head of the queue is that element that has been on the queue the longest time.

The tail of the queue is that element that has been on the queue the shortest time. New elements are inserted at the tail of the queue, and the queue retrieval operations obtain elements at the head of the queue. A ConcurrentLinkedQueue is an appropriate choice when many threads will share access to a common collection.

Like most other concurrent collection implementations, this class does not permit the use of null elements. Beware that, unlike in most collections, the size method is NOT a constant-time operation. Because of the asynchronous nature of these queues, determining the current number of elements requires a traversal of the elements, and so may report inaccurate results if this collection is modified during traversal.

Bulk operations that add, remove, or examine multiple elements, such as addAll() or forEach(), are not guaranteed to be performed atomically. For example, a forEach traversal concurrent with an addAll operation might observe only some of the added elements. An atomic operation is one that other threads see as a single indivisible step: they observe either all of its effects or none of them.

Iterators are weakly consistent, returning elements reflecting the state of the queue at some point at or since the creation of the iterator. They do not throw ConcurrentModificationException, and may proceed concurrently with other operations. Elements contained in the queue since the creation of the iterator will be returned exactly once.
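
As a small sketch of what weakly consistent means in practice, we can add an element to the queue while iterating over it. With an ArrayList this would throw ConcurrentModificationException, but here the loop just continues. WeakIteratorDemo and addWhileIterating() are hypothetical names. Whether the iterator also visits the newly added element is not something we should rely on.

```java
import java.util.Arrays;
import java.util.concurrent.ConcurrentLinkedQueue;

class WeakIteratorDemo{

  //Returns the queue size after modifying it mid-iteration.
  static int addWhileIterating(){
    ConcurrentLinkedQueue<Integer> clq =
    new ConcurrentLinkedQueue<>(Arrays.asList(1, 2, 3));

    boolean added = false;
    for(Integer num : clq){
      if(!added){
        //modifying the queue during iteration is allowed
        clq.add(4);
        added = true;
      }
    }
    return clq.size();
  }

  public static void main(String[] args){
    System.out.println("Size: " + addWhileIterating()); //Size: 4
  }
}
```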

Memory consistency effects: As with other concurrent collections, actions in a thread prior to placing an object into a ConcurrentLinkedQueue happen-before actions subsequent to the access or removal of that element from the ConcurrentLinkedQueue in another thread.

This example demonstrates ConcurrentLinkedQueue.
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SampleClass{

  public static void main(String[] args){
    
    ExecutorService es = 
    Executors.newFixedThreadPool(4);
    
    ConcurrentLinkedQueue<Integer> clq =
    new ConcurrentLinkedQueue<>();
    
    clq.add(2);
    clq.add(4);
    clq.add(6);
    clq.add(8);
    
    while(!clq.isEmpty())
      es.execute(() -> 
      {
        Integer num = clq.poll();
        if(num != null)
          System.out.println(Thread
          .currentThread()
          .getName() + " | " +
          num + " * " + num +
          " = " + (num*num));  
      });
      
   es.shutdown(); 
  }
}

Result(may vary)
pool-1-thread-2 | 4 * 4 = 16
pool-1-thread-3 | 6 * 6 = 36
pool-1-thread-1 | 2 * 2 = 4
pool-1-thread-4 | 8 * 8 = 64

ConcurrentLinkedDeque

ConcurrentLinkedDeque is an unbounded concurrent deque based on linked nodes. Concurrent insertion, removal, and access operations execute safely across multiple threads. A ConcurrentLinkedDeque is an appropriate choice when many threads will share access to a common collection. Like most other concurrent collection implementations, this class does not permit the use of null elements. Iterators and spliterators are weakly consistent.

Beware that, unlike in most collections, the size method is NOT a constant-time operation. Because of the asynchronous nature of these deques, determining the current number of elements requires a traversal of the elements, and so may report inaccurate results if this collection is modified during traversal.

Bulk operations that add, remove, or examine multiple elements, such as addAll() or forEach(), are not guaranteed to be performed atomically. For example, a forEach traversal concurrent with an addAll operation might observe only some of the added elements. An atomic operation is one that other threads see as a single indivisible step: they observe either all of its effects or none of them.

Memory consistency effects: As with other concurrent collections, actions in a thread prior to placing an object into a ConcurrentLinkedDeque happen-before actions subsequent to the access or removal of that element from the ConcurrentLinkedDeque in another thread.

This example demonstrates ConcurrentLinkedDeque.
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SampleClass{

  public static void main(String[] args){
    ExecutorService es = 
    Executors.newFixedThreadPool(4);
    
    ConcurrentLinkedDeque<Integer> cld =
    new ConcurrentLinkedDeque<>();
    
    cld.add(2);
    cld.add(4);
    cld.add(6);
    cld.add(8);
    
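    //submit exactly one task per element; looping on
    //isEmpty() would keep submitting tasks until the
    //worker threads have drained the deque
    int tasks = cld.size();
    for(int i = 0; i < tasks; i++)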
      es.execute(() -> 
      {
        Integer num = cld.pollFirst();
        if(num != null)
          System.out.println(Thread
          .currentThread()
          .getName() + " | " +
          num + " * " + num +
          " = " + (num*num));  
      });
      
   es.shutdown(); 
  
  }
}

Result(may vary)
pool-1-thread-1 | 2 * 2 = 4
pool-1-thread-4 | 8 * 8 = 64
pool-1-thread-2 | 4 * 4 = 16
pool-1-thread-3 | 6 * 6 = 36

LinkedBlockingQueue

LinkedBlockingQueue is an optionally-bounded blocking queue based on linked nodes. This queue orders elements FIFO (first-in-first-out). The head of the queue is that element that has been on the queue the longest time. The tail of the queue is that element that has been on the queue the shortest time.

New elements are inserted at the tail of the queue, and the queue retrieval operations obtain elements at the head of the queue. Linked queues typically have higher throughput than array-based queues but less predictable performance in most concurrent applications.

The optional capacity bound constructor argument serves as a way to prevent excessive queue expansion. The capacity, if unspecified, is equal to Integer.MAX_VALUE. Linked nodes are dynamically created upon each insertion unless this would bring the queue above capacity.
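
To make the capacity bound concrete, here is a minimal sketch. The class name CapacityDemo and the helper countAccepted() are made up for this illustration. It shows how the non-blocking insert methods behave once the bound is reached: offer() returns false and add() throws IllegalStateException.

```java
import java.util.concurrent.LinkedBlockingQueue;

class CapacityDemo{

  //hypothetical helper: tries to insert 'attempts' elements
  //into a queue bounded to 'capacity' and returns how many
  //offer() calls were accepted
  static int countAccepted(int capacity, int attempts){
    LinkedBlockingQueue<Integer> lbq =
    new LinkedBlockingQueue<>(capacity);

    int accepted = 0;
    for(int i = 0; i < attempts; i++)
      if(lbq.offer(i)) //returns false when the queue is full
        accepted++;
    return accepted;
  }

  public static void main(String[] args){
    System.out.println("Accepted: " + countAccepted(2, 5));

    LinkedBlockingQueue<Integer> lbq =
    new LinkedBlockingQueue<>(1);
    lbq.add(1);
    System.out.println("Remaining capacity: " +
                       lbq.remainingCapacity());
    try{
      lbq.add(2); //add() throws instead of returning false
    }
    catch(IllegalStateException e){
      System.out.println("add() failed: queue is full");
    }
  }
}
```

Unlike put(), neither offer() nor add() blocks, so they are useful when the caller should decide what to do with an element that doesn't fit.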

This example demonstrates LinkedBlockingQueue.
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SampleClass{

  public static void main(String[] args){
    ExecutorService es = 
    Executors.newFixedThreadPool(2);
    
    LinkedBlockingQueue<Integer> lbq =
    new LinkedBlockingQueue<>(5);
    
    lbq.add(2);
    lbq.add(4);
    lbq.add(6);
    lbq.add(8);
    lbq.add(10);
    
    System.out.println("Size: " + lbq.size());
    es.execute(() -> 
    {
      try{lbq.put(12);}
      catch(InterruptedException e){
        e.printStackTrace();
      }
      System.out.println("12 is added to the queue.");
    });
    
    es.execute(() ->
    System.out.println(lbq.poll() + " is retrieved."));
    
    es.shutdown();
  
    //wait for all tasks to be completed
    while(!es.isTerminated());
  
    System.out.println("Size: " + lbq.size());
  }
  
}

Result(may vary)
Size: 5
2 is retrieved.
12 is added to the queue.
Size: 5

put() blocks the thread that invokes it if the queue is full. The blocked thread is woken up once space becomes available, and its element is then added to the queue. If you call put() on a full LinkedBlockingQueue and no other thread ever removes an element, the calling thread waits forever unless it is interrupted.

Another interesting method in this queue is the take() method. If a LinkedBlockingQueue is empty and take() is called, the thread that calls this method will be blocked until a new element is added to the queue and then the newly added element will be retrieved immediately. Take note that the blocked thread can be interrupted.
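
The blocking behavior of take() can be sketched as follows. TakeDemo and takeFromEmptyQueue() are hypothetical names, and the 100 ms sleep is only an assumption to give the consumer thread time to reach take(); it is not a guarantee.

```java
import java.util.concurrent.LinkedBlockingQueue;

class TakeDemo{

  //hypothetical helper: a consumer thread calls take() on an
  //empty queue, then the calling thread inserts an element
  static String takeFromEmptyQueue(){
    LinkedBlockingQueue<String> lbq =
    new LinkedBlockingQueue<>();
    String[] result = new String[1];

    Thread consumer = new Thread(() -> {
      try{
        //blocks while the queue is empty
        result[0] = lbq.take();
      }
      catch(InterruptedException e){
        Thread.currentThread().interrupt();
      }
    });
    consumer.start();

    try{
      Thread.sleep(100);  //let the consumer block (best effort)
      lbq.put("hello");   //wakes up the consumer
      consumer.join();    //after join(), result[0] is visible
    }
    catch(InterruptedException e){
      Thread.currentThread().interrupt();
    }
    return result[0];
  }

  public static void main(String[] args){
    System.out.println("Taken: " + takeFromEmptyQueue());
  }
}
```

If the element happens to be inserted before the consumer reaches take(), the call simply returns right away, so the outcome is the same either way.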

LinkedBlockingDeque

LinkedBlockingDeque is an optionally-bounded blocking deque based on linked nodes. The optional capacity bound constructor argument serves as a way to prevent excessive expansion. The capacity, if unspecified, is equal to Integer.MAX_VALUE. Linked nodes are dynamically created upon each insertion unless this would bring the deque above capacity.

Most operations run in constant time (ignoring time spent blocking). Exceptions include remove, removeFirstOccurrence, removeLastOccurrence, contains, iterator.remove, and the bulk operations, all of which run in linear time.

This is similar to LinkedBlockingQueue. The main difference is that LinkedBlockingDeque supports insertion and removal at both ends, so it can be used as a FIFO queue or as a LIFO stack, whereas LinkedBlockingQueue only supports FIFO. This example demonstrates LinkedBlockingDeque.
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SampleClass{

  public static void main(String[] args){
    ExecutorService es = 
    Executors.newFixedThreadPool(2);
    
    LinkedBlockingDeque<Integer> lbd =
    new LinkedBlockingDeque<>(5);
    
    lbd.add(2);
    lbd.add(4);
    lbd.add(6);
    lbd.add(8);
    lbd.add(10);
    
    System.out.println("Size: " + lbd.size());
    es.execute(() -> 
    {
      try{lbd.putFirst(12);}
      catch(InterruptedException e){
        e.printStackTrace();
      }
      System.out.println("12 is added to the head of the queue.");
    });
    
    es.execute(() ->
    System.out.println(lbd.pollLast() + " is retrieved."));
    
    es.shutdown();
  
    //wait for all tasks to be completed
    while(!es.isTerminated());
  
    System.out.println("Size: " + lbd.size());
  }
}

Result(may vary)
Size: 5
12 is added to the head of the queue.
10 is retrieved.
Size: 5
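
The FIFO and LIFO usage mentioned earlier can be sketched with the non-blocking deque methods. DequeOrderDemo, fifo() and lifo() are hypothetical names used only for this illustration.

```java
import java.util.concurrent.LinkedBlockingDeque;

class DequeOrderDemo{

  //hypothetical helper: a deque holding 1, 2, 3 (head to tail)
  static LinkedBlockingDeque<Integer> filled(){
    LinkedBlockingDeque<Integer> lbd =
    new LinkedBlockingDeque<>();
    lbd.offerLast(1);
    lbd.offerLast(2);
    lbd.offerLast(3);
    return lbd;
  }

  //insert at the tail, remove from the head: FIFO
  static String fifo(){
    LinkedBlockingDeque<Integer> lbd = filled();
    return "" + lbd.pollFirst() + lbd.pollFirst()
              + lbd.pollFirst();
  }

  //insert at the tail, remove from the tail: LIFO
  static String lifo(){
    LinkedBlockingDeque<Integer> lbd = filled();
    return "" + lbd.pollLast() + lbd.pollLast()
              + lbd.pollLast();
  }

  public static void main(String[] args){
    System.out.println("FIFO: " + fifo()); //FIFO: 123
    System.out.println("LIFO: " + lifo()); //LIFO: 321
  }
}
```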

ArrayBlockingQueue

ArrayBlockingQueue is a bounded blocking queue backed by an array. This queue orders elements FIFO (first-in-first-out). The head of the queue is that element that has been on the queue the longest time. The tail of the queue is that element that has been on the queue the shortest time. New elements are inserted at the tail of the queue, and the queue retrieval operations obtain elements at the head of the queue.

This class supports an optional fairness policy for ordering threads that access this queue. By default, this ordering is not guaranteed. However, a queue constructed with fairness set to true grants threads access in FIFO order. Fairness generally decreases throughput but reduces variability and avoids starvation.

This is a classic "bounded buffer", in which a fixed-sized array holds elements inserted by producers and extracted by consumers. Once created, the capacity cannot be changed. Attempts to put an element into a full queue will result in the operation blocking; attempts to take an element from an empty queue will similarly block.
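
The bounded-buffer idea can be sketched as a small producer/consumer pair. BoundedBufferDemo and produceAndConsume() are hypothetical names; the capacity of 2 and the enabled fairness flag are arbitrary choices for the illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;

class BoundedBufferDemo{

  //hypothetical helper: a producer thread puts 1..count into
  //a buffer of the given capacity while the calling thread
  //consumes them; returns the sum of the consumed values
  static int produceAndConsume(int capacity, int count){
    //the second argument 'true' enables the fairness policy
    ArrayBlockingQueue<Integer> abq =
    new ArrayBlockingQueue<>(capacity, true);

    Thread producer = new Thread(() -> {
      try{
        for(int i = 1; i <= count; i++)
          abq.put(i); //blocks while the buffer is full
      }
      catch(InterruptedException e){
        Thread.currentThread().interrupt();
      }
    });
    producer.start();

    int sum = 0;
    try{
      for(int i = 0; i < count; i++)
        sum += abq.take(); //blocks while the buffer is empty
      producer.join();
    }
    catch(InterruptedException e){
      Thread.currentThread().interrupt();
    }
    return sum;
  }

  public static void main(String[] args){
    //1 + 2 + ... + 10 = 55
    System.out.println("Sum: " + produceAndConsume(2, 10));
  }
}
```

Because the buffer only holds two elements, the producer is repeatedly blocked in put() until the consumer catches up, yet no element is lost.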

This queue is similar to LinkedBlockingQueue. The main differences between the two are:
  • LinkedBlockingQueue stores its elements in linked nodes whereas this queue stores them in an array.
  • This queue supports an optional fairness policy whereas LinkedBlockingQueue does not.
  • This queue is always bounded whereas LinkedBlockingQueue is optionally bounded.

This example demonstrates ArrayBlockingQueue.
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SampleClass{

  public static void main(String[] args){
    ExecutorService es = 
    Executors.newFixedThreadPool(2);
    
    ArrayBlockingQueue<Integer> abq =
    new ArrayBlockingQueue<>(5);
    
    abq.add(2);
    abq.add(4);
    abq.add(6);
    abq.add(8);
    abq.add(10);
    
    System.out.println("Size: " + abq.size());
    es.execute(() -> 
    {
      try{abq.put(12);}
      catch(InterruptedException e){
        e.printStackTrace();
      }
      System.out.println("12 is added to the queue.");
    });
    
    es.execute(() ->
    System.out.println(abq.poll() + " is retrieved."));
    
    es.shutdown();
  
    //wait for all tasks to be completed
    while(!es.isTerminated());
  
    System.out.println("Size: " + abq.size());
  }
  
}

Result(may vary)
Size: 5
12 is added to the queue.
2 is retrieved.
Size: 5

SynchronousQueue

SynchronousQueue is a blocking queue in which each insert operation must wait for a corresponding remove operation by another thread, and vice versa. A synchronous queue does not have any internal capacity, not even a capacity of one.

You cannot peek at a synchronous queue because an element is only present when you try to remove it; you cannot insert an element (using any method) unless another thread is trying to remove it; you cannot iterate as there is nothing to iterate.
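
A small single-threaded sketch of these restrictions follows. NoCapacityDemo and offerWithoutConsumer() are hypothetical names. Since no other thread is waiting to remove an element, the non-blocking offer() is expected to be rejected, and the queue always looks empty.

```java
import java.util.concurrent.SynchronousQueue;

class NoCapacityDemo{

  //hypothetical helper: tries to insert without blocking
  //while no other thread is waiting to remove an element
  static boolean offerWithoutConsumer(){
    SynchronousQueue<String> sq = new SynchronousQueue<>();
    return sq.offer("String");
  }

  public static void main(String[] args){
    SynchronousQueue<String> sq = new SynchronousQueue<>();

    System.out.println("offer: " + offerWithoutConsumer());
    System.out.println("peek: " + sq.peek());
    System.out.println("size: " + sq.size());
    System.out.println("has next: " +
                       sq.iterator().hasNext());
  }
}
```

The program prints false, null, 0 and false, in that order.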

This class supports an optional fairness policy for ordering waiting producer and consumer threads. By default, this ordering is not guaranteed. However, a queue constructed with fairness set to true grants threads access in FIFO order.

This example demonstrates SynchronousQueue.
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SampleClass{

  public static void main(String[] args){
    SynchronousQueue<String> sq = new SynchronousQueue<>();
    
    ExecutorService es = 
    Executors.newFixedThreadPool(2);
    
    es.execute(() -> {
    
    try{
      System.out.println("Retrieved Element: "+sq.take());
    }
    catch(InterruptedException e){
      e.printStackTrace();
    }
    
    });
    
    es.execute(() -> {
    
    try{
      sq.put("String");
    }
    catch(InterruptedException e){
      e.printStackTrace();
    }
    
    });
    
    es.shutdown();
  }
}

Result
Retrieved Element: String

In the example above, if take() is executed first, the thread that called it waits, if necessary, for another thread to insert an element into the SynchronousQueue, for example with put().

If put() is executed first, the thread that called it waits, if necessary, for another thread to retrieve the element from the SynchronousQueue, for example with take().

PriorityBlockingQueue

PriorityBlockingQueue is an unbounded blocking queue that uses the same ordering rules as class PriorityQueue and supplies blocking retrieval operations. While this queue is logically unbounded, attempted additions may fail due to resource exhaustion (causing OutOfMemoryError).

This class does not permit null elements. A priority queue relying on natural ordering also does not permit insertion of non-comparable objects (doing so results in ClassCastException).

The Iterator provided in method iterator() and the Spliterator provided in method spliterator() are not guaranteed to traverse the elements of the PriorityBlockingQueue in any particular order. If you need ordered traversal, consider using Arrays.sort(pq.toArray()). Also, method drainTo can be used to remove some or all elements in priority order and place them in another collection.
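
The two options for ordered access can be sketched like this. OrderedTraversalDemo, queueOf(), sortedSnapshot() and drained() are hypothetical names. The first helper sorts a copy and leaves the queue untouched; the second one empties the queue with drainTo().

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;

class OrderedTraversalDemo{

  //hypothetical helper: builds a queue from the given items
  static PriorityBlockingQueue<String> queueOf(String... items){
    PriorityBlockingQueue<String> pbq =
    new PriorityBlockingQueue<>();
    for(String s : items)
      pbq.offer(s);
    return pbq;
  }

  //sorts a copy of the elements; the queue keeps them
  static String sortedSnapshot(String... items){
    PriorityBlockingQueue<String> pbq = queueOf(items);
    Object[] copy = pbq.toArray();
    Arrays.sort(copy);
    return Arrays.toString(copy) + " size=" + pbq.size();
  }

  //moves all elements, in priority order, to another list
  static String drained(String... items){
    PriorityBlockingQueue<String> pbq = queueOf(items);
    List<String> target = new ArrayList<>();
    pbq.drainTo(target);
    return target + " size=" + pbq.size();
  }

  public static void main(String[] args){
    //[Dog, Ocelot, Zebra] size=3
    System.out.println(
    sortedSnapshot("Ocelot", "Zebra", "Dog"));
    //[Dog, Ocelot, Zebra] size=0
    System.out.println(
    drained("Ocelot", "Zebra", "Dog"));
  }
}
```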

Operations on this class make no guarantees about the ordering of elements with equal priority. If you need to enforce an ordering, you can define custom classes or comparators that use a secondary key to break ties in primary priority values.
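
One possible way to break ties is sketched below. The Task class, its priority and seq fields, and the helper drainNames() are all hypothetical; the idea is simply to use an insertion sequence number as the secondary key so that equal priorities come out in insertion order.

```java
import java.util.Comparator;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

class TieBreakDemo{

  //hypothetical element type: 'priority' is the primary key,
  //'seq' (creation order) is the secondary key
  static class Task{
    static final AtomicLong counter = new AtomicLong();

    final int priority;
    final String name;
    final long seq = counter.getAndIncrement();

    Task(int priority, String name){
      this.priority = priority;
      this.name = name;
    }
  }

  static String drainNames(){
    Comparator<Task> order =
    Comparator.comparingInt((Task t) -> t.priority)
              .thenComparingLong(t -> t.seq);

    //11 is just an initial capacity, not a bound
    PriorityBlockingQueue<Task> pbq =
    new PriorityBlockingQueue<>(11, order);

    pbq.offer(new Task(1, "first"));
    pbq.offer(new Task(0, "urgent"));
    pbq.offer(new Task(1, "second"));
    pbq.offer(new Task(1, "third"));

    StringBuilder sb = new StringBuilder();
    Task t;
    while((t = pbq.poll()) != null)
      sb.append(t.name).append(' ');
    return sb.toString().trim();
  }

  public static void main(String[] args){
    //urgent first second third
    System.out.println(drainNames());
  }
}
```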

This example demonstrates PriorityBlockingQueue.
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SampleClass{

  public static void main(String[] args){
    
    PriorityBlockingQueue<String> pbq = new
    PriorityBlockingQueue<>();
    
    pbq.offer("Ocelot");
    pbq.offer("Zebra");
    pbq.offer("Dog");
    
    ExecutorService es = 
    Executors.newFixedThreadPool(3);
    
    int size = pbq.size();
    
    for(int i = 0; i < size; i++)
      es.execute(() ->{
      
      try{
      System.out.println(Thread.currentThread().
      getName() + ": " + pbq.take());
      }
      catch(InterruptedException e){
        e.printStackTrace();
      }
      
      });
      es.shutdown();
  }
}

Result(may vary)
pool-1-thread-1: Dog
pool-1-thread-2: Ocelot
pool-1-thread-3: Zebra

DelayQueue

DelayQueue is an unbounded blocking queue of Delayed elements, in which an element can only be taken when its delay has expired. The head of the queue is that Delayed element whose delay expired furthest in the past. If no delay has expired there is no head and poll will return null.

Expiration occurs when an element's getDelay(TimeUnit.NANOSECONDS) method returns a value less than or equal to zero. Even though unexpired elements cannot be removed using take or poll, they are otherwise treated as normal elements. For example, the size method returns the count of both expired and unexpired elements. This queue does not permit null elements.
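
The difference between expired and unexpired elements can be sketched as follows. ExpirationDemo, its nested Item class and both helper methods are hypothetical names. The sketch assumes a delay of one minute is long enough for the element to still be unexpired when poll() is called.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

class ExpirationDemo{

  //hypothetical Delayed element that expires 'delayMillis'
  //milliseconds after it is created
  static class Item implements Delayed{
    private final long expiresAt;

    Item(long delayMillis){
      expiresAt = System.nanoTime() +
                  TimeUnit.MILLISECONDS.toNanos(delayMillis);
    }

    @Override
    public long getDelay(TimeUnit unit){
      return unit.convert(expiresAt - System.nanoTime(),
                          TimeUnit.NANOSECONDS);
    }

    @Override
    public int compareTo(Delayed o){
      return Long.compare(getDelay(TimeUnit.NANOSECONDS),
                          o.getDelay(TimeUnit.NANOSECONDS));
    }
  }

  //true if poll() returns null for an unexpired element
  //even though size() counts it
  static boolean unexpiredIsHidden(){
    DelayQueue<Item> dq = new DelayQueue<>();
    dq.put(new Item(60_000)); //expires in one minute
    return dq.poll() == null && dq.size() == 1;
  }

  //true if an already expired element can be polled
  static boolean expiredIsAvailable(){
    DelayQueue<Item> dq = new DelayQueue<>();
    dq.put(new Item(0)); //delay is already <= 0
    return dq.poll() != null;
  }

  public static void main(String[] args){
    System.out.println("Unexpired hidden: " +
                       unexpiredIsHidden());
    System.out.println("Expired available: " +
                       expiredIsAvailable());
  }
}
```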

This example demonstrates DelayQueue.
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SampleClass{

  public static void main(String[] args){
    DelayQueue<DelayedElements> dq =
    new DelayQueue<>();
    
    ExecutorService es = 
    Executors.newFixedThreadPool(3);
    
    es.execute(() -> dq.put(new DelayedElements(1000, "item1")));
    es.execute(() -> dq.put(new DelayedElements(2000, "item2")));
    es.execute(() -> dq.put(new DelayedElements(3000, "item3")));
    
    for(int i = 0; i < 3; i++)
      es.execute(() -> {
        
        try{
          System.out.println(Thread.currentThread().getName()+
          ": "+dq.take());
        }
        catch(InterruptedException e){
          e.printStackTrace();
        }
      });
      es.shutdown();
  }
}

class DelayedElements implements Delayed{

  private long delayTime;
  private String elem;
  
  DelayedElements(long delayMillis, String elem){
    delayTime = System.currentTimeMillis() + delayMillis;
    this.elem = elem;
  }
  
  @Override
  public long getDelay(TimeUnit unit){
    
    long diff = delayTime - System.currentTimeMillis();
    return unit.convert(diff, TimeUnit.MILLISECONDS);
  }
  
  @Override
  public int compareTo(Delayed o){
    DelayedElements de = (DelayedElements)o;
    
    if(this.delayTime < de.delayTime)
      return -1;
    else if(this.delayTime > de.delayTime)
      return 1;
    return 0;
  }
  
  @Override
  public String toString(){
    return elem.toString();
  }
  
}

Result(may vary)
pool-1-thread-1: item1
pool-1-thread-2: item2
pool-1-thread-3: item3

First off, we create a class that implements the Delayed interface. Next, we need to override the getDelay() and compareTo() methods. I overrode toString() so I could print the value of the elem variable. In the getDelay() method, I used the convert(long sourceDuration, TimeUnit sourceUnit) method from TimeUnit. This method takes a duration expressed in sourceUnit (milliseconds here) and converts it to the TimeUnit on which convert() is called.

The Delayed interface extends Comparable, so compareTo() must be implemented, and its ordering must be consistent with getDelay(). DelayQueue uses that ordering to keep the element whose delay expires first at the head of the queue. That's why I overrode compareTo(). See my compareTo() blogpost to learn more about compareTo().

LinkedTransferQueue

LinkedTransferQueue is an unbounded TransferQueue based on linked nodes. This queue orders elements FIFO (first-in-first-out) with respect to any given producer. The head of the queue is that element that has been on the queue the longest time for some producer. The tail of the queue is that element that has been on the queue the shortest time for some producer.

Beware that, unlike in most collections, the size method is NOT a constant-time operation. Because of the asynchronous nature of these queues, determining the current number of elements requires a traversal of the elements, and so may report inaccurate results if this collection is modified during traversal.

Bulk operations that add, remove, or examine multiple elements, such as addAll(), removeIf() or forEach(), are not guaranteed to be performed atomically. For example, a forEach traversal concurrent with an addAll operation might observe only some of the added elements.

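This queue provides the transfer() and tryTransfer() methods, which hand an element directly to a consumer thread. transfer() waits, if necessary, until a consumer receives the element. tryTransfer() without a timeout succeeds only if a consumer is already waiting and returns false otherwise; the timed version waits up to the given timeout. This hand-off is the main functionality of a transfer queue.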
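
As a quick single-threaded sketch of the non-blocking variant, the snippet below calls tryTransfer() while no consumer is waiting. TryTransferDemo and tryWithoutConsumer() are hypothetical names. In this situation the element is expected to be rejected and not enqueued.

```java
import java.util.concurrent.LinkedTransferQueue;

class TryTransferDemo{

  //hypothetical helper: calls tryTransfer() while no consumer
  //is waiting; returns the outcome and the resulting size
  static String tryWithoutConsumer(){
    LinkedTransferQueue<String> ltq =
    new LinkedTransferQueue<>();

    //no thread is blocked in take() or a timed poll()
    boolean transferred = ltq.tryTransfer("String1");
    return transferred + " " + ltq.size();
  }

  public static void main(String[] args){
    LinkedTransferQueue<String> ltq =
    new LinkedTransferQueue<>();
    System.out.println("Waiting consumer: " +
                       ltq.hasWaitingConsumer());
    //false 0
    System.out.println(tryWithoutConsumer());
  }
}
```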

This example demonstrates LinkedTransferQueue.
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SampleClass{

  public static void main(String[] args){
    LinkedTransferQueue<String> ltq =
    new LinkedTransferQueue<>();
    
    ltq.add("String1");
    
    ExecutorService producer = 
    Executors.newFixedThreadPool(1);
    
    ExecutorService consumer = 
    Executors.newFixedThreadPool(1);
    
    producer.execute(() -> {
    try{
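      ltq.transfer("String2");
      //reached only after a consumer has received "String2"
      System.out.println("Producer is unblocked");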
    }
    catch(InterruptedException e){
      e.printStackTrace();
    }
    });
    producer.shutdown();
    
    consumer.execute(() -> {
    try{
      System.out.println("Element: " + ltq.take());
      System.out.println("Element: " + ltq.take());
    }
    catch(InterruptedException e){
      e.printStackTrace();
    }
    });
    consumer.shutdown();
  }
}

Result(may vary)
Element: String1
Element: String2
Producer is unblocked

transfer() is similar to add(), but it blocks the calling thread until some thread retrieves the transferred element. If no consumer is already waiting, the element is placed at the tail of the queue and the caller keeps waiting until a consumer receives it.

take() is similar to poll(), but if the queue is empty, take() blocks the calling thread until an element becomes available.

Concurrent Collections

Concurrent collections are alternatives to the standard collections in some situations. For example, a ConcurrentHashMap is normally preferable to a synchronized HashMap, and a ConcurrentSkipListMap is normally preferable to a synchronized TreeMap.

Collections in the concurrent package, including queues, that have "Concurrent" in their names are thread-safe, but not governed by a single exclusion lock. Synchronized collections that are not part of the concurrent package are governed by a single exclusion lock. More information can be read in the documentation.

ConcurrentHashMap

A hash table supporting full concurrency of retrievals and high expected concurrency for updates. This class obeys the same functional specification as Hashtable, and includes versions of methods corresponding to each method of Hashtable.

However, even though all operations are thread-safe, retrieval operations do not entail locking, and there is not any support for locking the entire table in a way that prevents all access. This class is fully interoperable with Hashtable in programs that rely on its thread safety but not on its synchronization details.

Retrieval operations (including get) generally do not block, so may overlap with update operations (including put and remove). Retrievals reflect the results of the most recently completed update operations holding upon their onset. Like Hashtable but unlike HashMap, this class does not allow null to be used as a key or value. More information can be read in the documentation.
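
Two of the properties above can be sketched in a few lines. MergeDemo, countConcurrently() and rejectsNullKey() are hypothetical names, and the key "hits" is arbitrary. The first helper relies on merge(), which ConcurrentHashMap performs atomically, so concurrent increments of the same key are not lost; the second shows that a null key is rejected.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class MergeDemo{

  //hypothetical helper: 'tasks' tasks each add 1 to the same
  //key 'perTask' times; returns the final count
  static int countConcurrently(int tasks, int perTask){
    ConcurrentHashMap<String,Integer> chm =
    new ConcurrentHashMap<>();
    ExecutorService es = Executors.newFixedThreadPool(4);

    for(int t = 0; t < tasks; t++)
      es.execute(() -> {
        for(int i = 0; i < perTask; i++)
          chm.merge("hits", 1, Integer::sum);
      });
    es.shutdown();

    try{
      es.awaitTermination(10, TimeUnit.SECONDS);
    }
    catch(InterruptedException e){
      Thread.currentThread().interrupt();
    }
    return chm.getOrDefault("hits", 0);
  }

  //true if putting a null key throws NullPointerException
  static boolean rejectsNullKey(){
    try{
      new ConcurrentHashMap<String,Integer>().put(null, 1);
      return false;
    }
    catch(NullPointerException e){
      return true;
    }
  }

  public static void main(String[] args){
    System.out.println("Count: " +
                       countConcurrently(4, 1000));
    System.out.println("Rejects null key: " +
                       rejectsNullKey());
  }
}
```

Writing the same update as get() followed by put() would not be safe, because another thread could modify the entry between the two calls.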

This example demonstrates ConcurrentHashMap.
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.Map;

public class SampleClass{

  public static void main(String[] args){
    ConcurrentHashMap<Integer,String> chm =
    new ConcurrentHashMap<>();
    
    chm.put(100,"String1");
    chm.put(102,"String2");
    
    System.out.println("Loop through ConcurrentHashMap...");
    for(Map.Entry<Integer,String> entry : chm.entrySet())
      System.out.println(entry.getKey() + " | " +
                         entry.getValue());
    
    ExecutorService es = Executors.newFixedThreadPool(3);
    es.execute(() -> 
    chm.compute(100, (k,v) -> v.concat("00")));
    es.execute(() -> 
    chm.computeIfAbsent(101, (k) -> "String3"));
    es.execute(() -> 
    chm.computeIfPresent(102, (k,v) -> v.concat("22")));
    es.shutdown();
    
    //make sure all tasks are completed in order to make
    //the updates visible to the main thread
    while(!es.isTerminated());
    
    System.out.println("Loop through ConcurrentHashMap...");
    for(Map.Entry<Integer,String> entry : chm.entrySet())
      System.out.println(entry.getKey() + " | " +
                         entry.getValue());
  }
}

Result
Loop through ConcurrentHashMap...
100 | String1
102 | String2
Loop through ConcurrentHashMap...
100 | String100
101 | String3
102 | String222

ConcurrentSkipListMap

ConcurrentSkipListMap is a scalable concurrent ConcurrentNavigableMap implementation. The map is sorted according to the natural ordering of its keys, or by a Comparator provided at map creation time, depending on which constructor is used.
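
The effect of the constructor choice can be sketched as follows. OrderingDemo and its helper methods are hypothetical names; Comparator.reverseOrder() is used here only as an example of a comparator supplied at creation time.

```java
import java.util.Comparator;
import java.util.concurrent.ConcurrentSkipListMap;

class OrderingDemo{

  //hypothetical helper: inserts the same keys in the same
  //(unsorted) order into the given map
  static void fill(ConcurrentSkipListMap<Integer,String> map){
    map.put(105, "String5");
    map.put(100, "String0");
    map.put(103, "String3");
    map.put(101, "String1");
  }

  //no-argument constructor: natural ordering of the keys
  static String naturalOrder(){
    ConcurrentSkipListMap<Integer,String> map =
    new ConcurrentSkipListMap<>();
    fill(map);
    return map.keySet().toString();
  }

  //comparator constructor: ordering defined by the comparator
  static String reverseOrder(){
    ConcurrentSkipListMap<Integer,String> map =
    new ConcurrentSkipListMap<>(
    Comparator.<Integer>reverseOrder());
    fill(map);
    return map.keySet().toString();
  }

  public static void main(String[] args){
    //[100, 101, 103, 105]
    System.out.println(naturalOrder());
    //[105, 103, 101, 100]
    System.out.println(reverseOrder());
  }
}
```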

This map is similar to TreeMap. However, TreeMap is based on a red-black tree whereas ConcurrentSkipListMap is based on a skip list, which provides expected average log(n) time cost for the containsKey, get, put and remove operations and their variants.

Insertion, removal, update, and access operations safely execute concurrently by multiple threads. Ascending key ordered views and their iterators are faster than descending ones.

This example demonstrates ConcurrentSkipListMap.
import java.util.concurrent.ConcurrentSkipListMap;

public class SampleClass{

  public static void main(String[] args){
  
    ConcurrentSkipListMap<Integer, String> skiplist
    = new ConcurrentSkipListMap<Integer, String>();
    
    skiplist.put(105, "String5");
    skiplist.put(100, "String0");
    skiplist.put(103, "String3");
    skiplist.put(101, "String1");
    
    //firstEntry() returns a key-value mapping associated
    //with the least key in this map, or null if the map is empty.
    System.out.println("First entry: " + skiplist.firstEntry());
    
    //lastEntry() returns a key-value mapping associated with
    //the greatest key in this map, or null if the map is empty.
    System.out.println("Last entry: " + skiplist.lastEntry());
    
    //higherEntry() returns a key-value mapping associated
    //with the least key strictly greater than the given key,
    //or null if there is no such key.
    System.out.println("Higher entry of key \"102\": "
    + skiplist.higherEntry(102));
    
    //lowerEntry() returns a key-value mapping associated
    //with the greatest key strictly less than the given
    //key, or null if there is no such key.
    System.out.println("Lower entry of key \"102\": "
    + skiplist.lowerEntry(102));
    
    //higherEntry() here returns null because no key in
    //the map is strictly greater than 106; the greatest
    //key in the map is 105
    System.out.println("Higher entry of key \"106\": "
    + skiplist.higherEntry(106));
  }
}

Result
First entry: 100=String0
Last entry: 105=String5
Higher entry of key "102": 103=String3
Lower entry of key "102": 101=String1
Higher entry of key "106": null

ConcurrentSkipListSet

ConcurrentSkipListSet is the Set counterpart of ConcurrentSkipListMap: a scalable concurrent NavigableSet implementation based on a ConcurrentSkipListMap. Beware that, unlike in most collections, the size() method is not a constant-time operation. Because of the asynchronous nature of these sets, determining the current number of elements requires a traversal of the elements, and so may report inaccurate results if this collection is modified during traversal.

This example demonstrates ConcurrentSkipListSet.
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SampleClass{

  public static void main(String[] args){
    ConcurrentSkipListSet<String> skiplist
    = new ConcurrentSkipListSet<String>();
    
    System.out.println("Adding Elements...");
    ExecutorService es = Executors.newFixedThreadPool(3);
    es.execute(() -> skiplist.add("String3"));
    es.execute(() -> skiplist.add("String1"));
    es.execute(() -> skiplist.add("String2"));
    es.shutdown();
    
    //make sure all tasks are completed in order to make
    //the updates visible to the main thread
    while(!es.isTerminated());
    
    System.out.println("Elements...");
    for(String s : skiplist)
      System.out.println(s);
  }
}

Result
Adding Elements...
Elements...
String1
String2
String3

CopyOnWriteArrayList

CopyOnWriteArrayList is a thread-safe variant of ArrayList in which all mutative operations (add, set, and so on) are implemented by making a fresh copy of the underlying array.

This is ordinarily too costly, but may be more efficient than alternatives when traversal operations vastly outnumber mutations, and is useful when you cannot or don't want to synchronize traversals, yet need to preclude interference among concurrent threads. The "snapshot" style iterator method uses a reference to the state of the array at the point that the iterator was created.

This array never changes during the lifetime of the iterator, so interference is impossible and the iterator is guaranteed not to throw ConcurrentModificationException. The iterator will not reflect additions, removals, or changes to the list since the iterator was created.

Element-changing operations on iterators themselves (remove, set, and add) are not supported. These methods throw UnsupportedOperationException. All elements are permitted, including null.
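
The snapshot behavior can be sketched like this. SnapshotDemo and both helper methods are hypothetical names. The first helper modifies the list while iterating over it, which would make an ArrayList iterator throw ConcurrentModificationException; the second shows that the iterator's own remove() is unsupported.

```java
import java.util.Iterator;
import java.util.concurrent.CopyOnWriteArrayList;

class SnapshotDemo{

  //hypothetical helper: adds to the list while iterating and
  //returns "elements seen by the iterator / final size"
  static String iterateWhileAdding(){
    CopyOnWriteArrayList<String> list =
    new CopyOnWriteArrayList<>();
    list.add("String1");
    list.add("String2");

    int seen = 0;
    for(String s : list){
      //each add() copies the array; the iterator keeps
      //reading the old array
      list.add(s + "-copy");
      seen++;
    }
    return seen + " / " + list.size();
  }

  //true if Iterator.remove() is rejected
  static boolean iteratorRemoveUnsupported(){
    CopyOnWriteArrayList<String> list =
    new CopyOnWriteArrayList<>();
    list.add("String1");

    Iterator<String> it = list.iterator();
    it.next();
    try{
      it.remove();
      return false;
    }
    catch(UnsupportedOperationException e){
      return true;
    }
  }

  public static void main(String[] args){
    //2 / 4
    System.out.println(iterateWhileAdding());
    System.out.println("remove() unsupported: " +
                       iteratorRemoveUnsupported());
  }
}
```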

This example demonstrates CopyOnWriteArrayList.
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SampleClass{

  public static void main(String[] args){
    CopyOnWriteArrayList<String> concurrentArrayList
    = new CopyOnWriteArrayList<String>();
    
    concurrentArrayList.add("String1");
    concurrentArrayList.add("String2");
    concurrentArrayList.add("String3");
    concurrentArrayList.add("String2");
    
    ExecutorService es = Executors.newFixedThreadPool(3);
    
    es.execute(() ->
    System.out.println(Thread.currentThread().getName() +": "+
                       concurrentArrayList.get(0)));
    es.execute(() ->
    System.out.println(Thread.currentThread().getName() +": "+
                       concurrentArrayList.get(1)));
    es.execute(() ->
    System.out.println(Thread.currentThread().getName() +": "+
                       concurrentArrayList.get(2)));
    es.execute(() ->
    System.out.println(Thread.currentThread().getName() +": "+
                       concurrentArrayList.get(3)));
    es.shutdown();
    
  }
}

Result(may vary)
pool-1-thread-2: String2
pool-1-thread-1: String1
pool-1-thread-3: String3
pool-1-thread-2: String2

CopyOnWriteArraySet

CopyOnWriteArraySet is a Set that uses an internal CopyOnWriteArrayList for all of its operations, so it shares the same basic properties. This example demonstrates CopyOnWriteArraySet.
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SampleClass{

  public static void main(String[] args){
    CopyOnWriteArraySet<String> concurrentArraySet
    = new CopyOnWriteArraySet<String>();
    
    System.out.println("Adding Elements...");
    ExecutorService es = Executors.newFixedThreadPool(3);
    es.execute(() -> concurrentArraySet.add("String3"));
    es.execute(() -> concurrentArraySet.add("String1"));
    es.execute(() -> concurrentArraySet.add("String2"));
    
    //This element here won't be added because it has
    //duplicate in the Set
    es.execute(() -> concurrentArraySet.add("String2"));
    es.shutdown();
    
    //make sure all tasks are completed in order to make
    //the updates visible to the main thread
    while(!es.isTerminated());
    
    System.out.println("Elements...");
    for(String s : concurrentArraySet)
      System.out.println(s);
    
  }
}

Result(may vary)
Adding Elements...
Elements...
String3
String1
String2

java.util.concurrent.atomic Package

This package consists of classes that support lock-free thread-safe programming on single variables. Instances of Atomic classes maintain values that are accessed and updated using methods otherwise available for fields using associated atomic VarHandle operations. More information can be read in the documentation.

The term atomic means that an operation is indivisible: other threads see either the state before the operation or the state after it, never a partially completed update. To other threads, an atomic operation appears as a single step. For example, the assignment i = 1; to an int variable is an atomic operation. i++, i+=1 and i=i+1 are not atomic operations because they consist of multiple steps (read, modify, write), so another thread can interleave between those steps.
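
The difference between a multi-step increment and an atomic one can be sketched as follows. IncrementDemo and its helpers are hypothetical names. Two threads increment a plain int field and an AtomicInteger the same number of times; the plain counter may lose updates, so its result is not predictable.

```java
import java.util.concurrent.atomic.AtomicInteger;

class IncrementDemo{

  static int plainCounter; //unsynchronized shared field

  //hypothetical helper: runs the task on two threads and
  //waits for both to finish
  static void runOnTwoThreads(Runnable task){
    Thread t1 = new Thread(task);
    Thread t2 = new Thread(task);
    t1.start();
    t2.start();
    try{
      t1.join();
      t2.join();
    }
    catch(InterruptedException e){
      Thread.currentThread().interrupt();
    }
  }

  static int plainCount(int perThread){
    plainCounter = 0;
    runOnTwoThreads(() -> {
      for(int i = 0; i < perThread; i++)
        plainCounter++; //read, add, write: not atomic
    });
    return plainCounter;
  }

  static int atomicCount(int perThread){
    AtomicInteger counter = new AtomicInteger();
    runOnTwoThreads(() -> {
      for(int i = 0; i < perThread; i++)
        counter.incrementAndGet(); //one indivisible step
    });
    return counter.get();
  }

  public static void main(String[] args){
    System.out.println("Plain int: " +
                       plainCount(100000));
    System.out.println("AtomicInteger: " +
                       atomicCount(100000));
  }
}
```

The AtomicInteger result is always 200000. The plain int result is often lower and can differ from run to run.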

Also note that atomicity can depend on the JVM. For example, writes such as long a = 1L and double a = 1.5d to non-volatile variables are not guaranteed to be atomic, because the Java Language Specification allows a JVM to treat a 64-bit write as two separate 32-bit writes. 32-bit JVMs often do this: they write one 32-bit half of the value and then the other half. Declaring the variable volatile makes these reads and writes atomic.

This package has classes that provide atomic operations for increment, decrement, addition and many more. Classes in this package also provide non-blocking synchronization techniques like compare-and-set(a variant of compare-and-swap) and many more. Most modern computers support compare-and-swap or CAS for short.
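
A typical use of compare-and-set is a retry loop, sketched below. CasDemo and its helpers are hypothetical names, and doubling the value is just an arbitrary update that has no ready-made atomic method like getAndIncrement().

```java
import java.util.concurrent.atomic.AtomicInteger;

class CasDemo{

  //hypothetical helper: compareAndSet() only writes when the
  //current value equals the expected value
  static boolean casWithWrongExpectation(){
    AtomicInteger ai = new AtomicInteger(3);
    //expects 5, but the value is 3, so nothing is written
    return ai.compareAndSet(5, 10);
  }

  //hypothetical helper: atomically doubles the value with a
  //compare-and-set retry loop
  static int doubleWithCas(AtomicInteger ai){
    while(true){
      int current = ai.get();
      int next = current * 2;
      //succeeds only if no other thread changed the value
      //since we read it; otherwise read again and retry
      if(ai.compareAndSet(current, next))
        return next;
    }
  }

  //hypothetical helper used to keep the demo self-contained
  static int doubled(int initial){
    return doubleWithCas(new AtomicInteger(initial));
  }

  public static void main(String[] args){
    System.out.println("Wrong expectation: " +
                       casWithWrongExpectation());
    System.out.println("Doubled: " + doubled(3));
  }
}
```

The loop never blocks: a thread that loses the race simply rereads the value and tries again, which is why this style is called non-blocking.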

Non-blocking synchronization techniques like compare-and-set also have weaker versions, such as weakCompareAndSet(), which may fail spuriously. The effect of the weaker versions is platform specific. You can read this discussion or read this java 8 documentation for more information.

AtomicBoolean, AtomicInteger, AtomicLong and AtomicReference

These classes provide access and updates to a single variable of the corresponding type. Each class also provides appropriate utility methods for that type. For example, classes AtomicLong and AtomicInteger provide atomic increment methods.

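AtomicBoolean, AtomicInteger and AtomicLong hold a boolean, int and long value respectively, much like the Boolean, Integer and Long wrapper classes. However, they are not general-purpose replacements for them. Wrapper objects are immutable, so changing a shared wrapper variable means replacing the reference, and a read-modify-write on it is not atomic; the atomic classes are mutable and update their value atomically.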

This example demonstrates atomic operations on a single variable.
import java.util.concurrent.atomic.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SampleClass{

  public static void main(String[] args){
    AtomicReference<ClassA> ar =
    new AtomicReference<>(new ClassA(3));
    
    ExecutorService es = Executors.newFixedThreadPool(2);
    es.execute(() -> 
    System.out.println("getAndIncrement: "+
    ar.get().intNum.getAndIncrement()));
    es.execute(() -> ar.set(new ClassA(0)));
    es.shutdown();
    
    while(!es.isTerminated());
    System.out.println("Current value: " + 
    ar.get().intNum.get());
  }
}

class ClassA{
  AtomicInteger intNum;
  
  ClassA(int initValue){
    intNum = new AtomicInteger(initValue);
  }
}

Result(may vary)
getAndIncrement: 3
Current value: 0

The atomic methods shown above (get(), set(), getAndIncrement()) are the commonly used ones. There are also more advanced methods, such as compareAndSet() and compareAndExchange(), that give finer control over atomic updates.

More information about the methods and classes used above can be read in the documentation.

AtomicIntegerArray, AtomicLongArray, and AtomicReferenceArray

These classes further extend atomic operation support to arrays of these types. These classes are also notable in providing volatile access semantics for their array elements.

This example demonstrates atomic operations on an atomic array.
import java.util.concurrent.atomic.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SampleClass{

  public static void main(String[] args){
    AtomicIntegerArray aia = 
    new AtomicIntegerArray(new int[]{2,4,6,8});
    
    System.out.println("Initial Values");
    for(int i = 0; i < aia.length(); i++)
      System.out.println(aia.get(i));
    
    ExecutorService es = Executors.newFixedThreadPool(2);
    es.execute(() -> 
    {
      for(int i = 0; i < aia.length(); i++)
        aia.getAndIncrement(i);
    
    });
    es.execute(() -> 
    {
      for(int i = 0; i < aia.length(); i++)
        aia.getAndIncrement(i);
    
    });
    es.shutdown();
    
    while(!es.isTerminated());
    
    System.out.println("Current Values");
    for(int i = 0; i < aia.length(); i++)
      System.out.println(aia.get(i));
    
  }
}

Result
Initial Values
2
4
6
8
Current Values
4
6
8
10
More information about the methods and classes used above can be read in the documentation.
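Besides getAndIncrement(), the atomic array classes also provide per-element conditional updates. The following is a small single-threaded sketch (ArrayCasDemo is a made-up class name) that shows compareAndSet() and accumulateAndGet() on individual elements, and that the constructor copies the array passed to it.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicIntegerArray;

class ArrayCasDemo{

  static int[] run(){
    int[] source = {2, 4, 6, 8};
    // The constructor copies the elements; 'source' is not shared
    AtomicIntegerArray aia = new AtomicIntegerArray(source);

    aia.compareAndSet(0, 2, 20);  // element 0 is 2, so it becomes 20
    aia.compareAndSet(1, 99, 40); // element 1 is 4, not 99: no change
    aia.accumulateAndGet(2, 10, Math::max); // element 2 becomes max(6, 10)

    // The last entry shows that the original array is untouched
    return new int[]{aia.get(0), aia.get(1), aia.get(2), source[0]};
  }

  public static void main(String[] args){
    System.out.println(Arrays.toString(run())); // prints [20, 4, 10, 2]
  }
}
```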

Updater Classes

Updater classes like AtomicReferenceFieldUpdater, AtomicIntegerFieldUpdater, and AtomicLongFieldUpdater are reflection-based utilities that provide access to the associated field types. These are mainly of use in atomic data structures in which several volatile fields of the same node (for example, the links of a tree node) are independently subject to atomic updates.

We can use these classes to apply atomic operations and non-blocking synchronization techniques to volatile fields. The volatile keyword provides memory visibility but no atomic compound operations (for example, incrementing a volatile int is not atomic). Why do we need this? you might ask. Classes like AtomicInteger, AtomicBoolean and others can be expensive to instantiate, especially if we want a lot of instances of those classes, because each one is a separate object.

A single updater can be shared by every instance of the target class, so each object only pays for a plain volatile field instead of a separate atomic object. Updaters also enable greater flexibility in how and when to use atomic updates, at the expense of more awkward reflection-based setup, less convenient usage, and weaker guarantees.

This example demonstrates AtomicIntegerFieldUpdater.
import java.util.concurrent.atomic.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SampleClass{

  public static void main(String[] args){
    ClassA c1 = new ClassA();
    
    AtomicIntegerFieldUpdater<ClassA>
    intUpdater = AtomicIntegerFieldUpdater.newUpdater(
                 ClassA.class, "intOne");
                 
    ExecutorService es = Executors.newFixedThreadPool(2);
    es.execute(() -> 
    {
      System.out.println(Thread.currentThread().getName()+
      ": " + intUpdater.addAndGet(c1, 3));
    
    });
    es.execute(() -> 
    {
      System.out.println(Thread.currentThread().getName()+
      ": " + intUpdater.addAndGet(c1, 5));
    
    });
    es.shutdown();
    
    while(!es.isTerminated());
    
    System.out.println("intOne current value: "+c1.intOne);
    
  }
}

class ClassA{

  volatile int intOne = 0;
  
}

Result(may vary)
pool-1-thread-1: 3
pool-1-thread-2: 8
intOne current value: 8
In the example above, we use the newUpdater() method. This method returns an updater object that atomically updates a volatile field of a class. The first argument is the Class object of the class that declares the field. The second argument is the name of the field, which must be declared volatile int and must be accessible to the caller.
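The tree-node use case mentioned earlier can be sketched with AtomicReferenceFieldUpdater. The TreeNode class and its attachLeft()/attachRight() methods below are hypothetical and only meant as an illustration: two static updaters are shared by all nodes, and each link is set with compareAndSet() only if it is still null.

```java
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

class TreeNode{
  final int key;
  // Fields handled by an updater must be volatile
  volatile TreeNode left;
  volatile TreeNode right;

  // One updater per field, shared by every TreeNode instance
  private static final
  AtomicReferenceFieldUpdater<TreeNode, TreeNode> LEFT =
    AtomicReferenceFieldUpdater.newUpdater(
      TreeNode.class, TreeNode.class, "left");
  private static final
  AtomicReferenceFieldUpdater<TreeNode, TreeNode> RIGHT =
    AtomicReferenceFieldUpdater.newUpdater(
      TreeNode.class, TreeNode.class, "right");

  TreeNode(int key){
    this.key = key;
  }

  // Attaches the child only if the link is still empty
  boolean attachLeft(TreeNode child){
    return LEFT.compareAndSet(this, null, child);
  }

  boolean attachRight(TreeNode child){
    return RIGHT.compareAndSet(this, null, child);
  }
}

class TreeNodeDemo{

  static String run(){
    TreeNode root = new TreeNode(10);
    boolean first = root.attachLeft(new TreeNode(5));   // link was empty
    boolean second = root.attachLeft(new TreeNode(3));  // link already set
    boolean right = root.attachRight(new TreeNode(15)); // independent link
    return first + " " + second + " " + right + " "
           + root.left.key + " " + root.right.key;
  }

  public static void main(String[] args){
    System.out.println(run()); // prints "true false true 5 15"
  }
}
```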

AtomicMarkableReference

An AtomicMarkableReference maintains an object reference along with a mark bit that can be updated atomically. We can use this to associate a reference with a boolean flag, which can be useful in some situations. In the example below, the boolean flag indicates that a file is marked for deletion.
import java.util.concurrent.atomic.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import java.io.File;

public class SampleClass{

  public static void main(String[] args) throws Exception{
     AtomicMarkableReference<File> file =
     new AtomicMarkableReference<>
     (new File("C:\\test\\test.txt"), false);
     
     ExecutorService es = Executors.newFixedThreadPool(1);
     
     es.execute(() -> {
       
       try{
       if(!file.attemptMark(file.getReference(), true))
         throw new Exception("Reference has been changed!\n"+
                             "Marking failed!");
       }
       catch(Exception e){
         e.printStackTrace();
       }
     
     });
     es.shutdown();
     
     while(!es.isTerminated());
     
     if(file.getReference().exists() &&
        file.isMarked())
       file.getReference().delete();
  }
}
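attemptMark() has two parameters. The first parameter is the expected reference, which is compared (using ==) with the reference that the AtomicMarkableReference currently holds. The second parameter is the new value of the mark. If the current reference and the expected reference are the same object, the boolean flag can be changed atomically. If this method returns false, the attempt to change the mark failed. According to the documentation, a single call may also fail spuriously, so code that must set the mark usually retries.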

Make sure that the file you put in this example is a throw-away file. The delete() method deletes the specified file permanently without notification.
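When the mark really has to be set, a retry loop is a common pattern. The sketch below is only an illustration: MarkRetryDemo and markForDeletion() are made-up names, and a plain String stands in for the File so that nothing gets deleted. It reads the reference and the mark together with get(boolean[]) and then uses the four-argument compareAndSet().

```java
import java.util.concurrent.atomic.AtomicMarkableReference;

class MarkRetryDemo{

  // Hypothetical helper: sets the mark on whatever reference is current
  static <T> void markForDeletion(AtomicMarkableReference<T> ref){
    boolean[] markHolder = new boolean[1];
    T current;
    do{
      // Reads the reference and the mark in one step
      current = ref.get(markHolder);
      if(markHolder[0]) return; // already marked
      // Keeps the same reference and flips the mark from false to true;
      // fails and retries if another thread changed either of them
    }while(!ref.compareAndSet(current, current, false, true));
  }

  static boolean run(){
    AtomicMarkableReference<String> file =
    new AtomicMarkableReference<>("test.txt", false);
    markForDeletion(file);
    return file.isMarked();
  }

  public static void main(String[] args){
    System.out.println("Marked: " + run()); // prints "Marked: true"
  }
}
```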

AtomicStampedReference

An AtomicStampedReference maintains an object reference along with an integer "stamp", that can be updated atomically. This is similar to AtomicMarkableReference but we use "stamp" here instead of "mark".

This example demonstrates AtomicStampedReference.
import java.util.concurrent.atomic.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SampleClass{

  public static void main(String[] args){
    AtomicStampedReference<Integer> number =
    new AtomicStampedReference<>
    (10, 0);
    
    ExecutorService es = Executors.newFixedThreadPool(3);
    
    // Each task retries until its own update is applied
    es.execute(() -> addAndStamp(number, 10));
    es.execute(() -> addAndStamp(number, 20));
    es.execute(() -> addAndStamp(number, 30));
    es.shutdown();
    
    while(!es.isTerminated());
    
    System.out.println("Current value: " 
                       + number.getReference());
    System.out.println("# of times the value was changed: "+
                       number.getStamp());
    
  }
  
  // Adds delta to the value and increments the stamp.
  // get() reads the reference and the stamp together, and
  // the loop retries when another thread changed either one.
  static void addAndStamp(
    AtomicStampedReference<Integer> ref, int delta){
    int[] stamp = new int[1];
    Integer current;
    do{
      current = ref.get(stamp);
    }while(!ref.compareAndSet(current, current + delta,
                              stamp[0], stamp[0] + 1));
  }
}

Result
Current value: 70
# of times the value was changed: 3
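In the example above, I use the stamp to keep track of the number of changes made to the value held by the number variable. compareAndSet() succeeds only if both the current reference and the current stamp match the expected ones. It's good practice to produce a new, unique stamp every time we change the reference because the stamp works like a version number. The class itself doesn't enforce any scheme, though; the stamp is just an int, so even random values are allowed.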
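The version-number idea is easiest to see in the situation usually called the ABA problem, where a value changes from A to B and back to A between a read and a compareAndSet(). This single-threaded sketch (AbaDemo is a hypothetical class name) simulates that sequence: a plain AtomicReference doesn't notice the intermediate change, while the stamped reference does.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicStampedReference;

class AbaDemo{

  // Plain reference: the A -> B -> A change goes unnoticed
  static boolean plainCasAfterAba(){
    String a = "A", b = "B";
    AtomicReference<String> ref = new AtomicReference<>(a);
    String seen = ref.get(); // an observer reads A
    ref.set(b);              // meanwhile the value becomes B...
    ref.set(a);              // ...and then A again
    return ref.compareAndSet(seen, "C"); // succeeds
  }

  // Stamped reference: the stamp reveals the intermediate changes
  static boolean stampedCasAfterAba(){
    String a = "A", b = "B";
    AtomicStampedReference<String> ref =
    new AtomicStampedReference<>(a, 0);
    int[] stamp = new int[1];
    String seen = ref.get(stamp);  // reads A with stamp 0
    ref.compareAndSet(a, b, 0, 1); // A -> B, stamp 1
    ref.compareAndSet(b, a, 1, 2); // B -> A, stamp 2
    // The reference matches, but the stamp is 2, not 0
    return ref.compareAndSet(seen, "C", stamp[0], stamp[0] + 1);
  }

  public static void main(String[] args){
    System.out.println("Plain CAS: " + plainCasAfterAba());     // true
    System.out.println("Stamped CAS: " + stampedCasAfterAba()); // false
  }
}
```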

Accumulators and Adders

Adders like LongAdder and DoubleAdder maintain a running sum that initially starts at 0. These classes are different from standard atomic classes like AtomicLong. If a large number of threads update the same AtomicLong, they all contend for a single variable and throughput drops.

Unlike standard atomic classes that store the value in one variable, adders spread the updates over a set of variables (an array of cells). In this way, some threads can update one cell (element) while others update other cells. Moreover, the array of cells may grow if necessary to reduce contention. Methods like sum() combine the cells into the total.
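A typical place where adders pay off is a set of frequently updated counters. The following sketch counts word frequencies with a ConcurrentHashMap of LongAdder values, a pattern that the LongAdder documentation also mentions. The class name HitCounterDemo and the sample words are assumptions made for this illustration.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;

class HitCounterDemo{

  // Counts the words concurrently and returns the count for "lock"
  static long run(){
    ConcurrentHashMap<String, LongAdder> freqs =
    new ConcurrentHashMap<>();
    String[] words = {"lock", "atomic", "lock", "adder", "lock"};

    ExecutorService es = Executors.newFixedThreadPool(3);
    for(String word : words)
      // Creates the adder on first use, then increments it
      es.execute(() ->
      freqs.computeIfAbsent(word, k -> new LongAdder()).increment());
    es.shutdown();
    try{
      es.awaitTermination(10, TimeUnit.SECONDS);
    }
    catch(InterruptedException e){
      Thread.currentThread().interrupt();
    }
    return freqs.get("lock").sum();
  }

  public static void main(String[] args){
    System.out.println("lock: " + run()); // prints "lock: 3"
  }
}
```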

Accumulators like LongAccumulator and DoubleAccumulator work like adders but are more general. Their constructors take two arguments: an accumulator function (usually a lambda) and an identity (initial value).

Just like other non-blocking algorithms, the order in which updates are applied in accumulators and adders is not guaranteed. The operations used with these classes should therefore be associative and commutative, so that the result doesn't depend on the order.
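As a small illustration of an order-independent operation, the sketch below keeps a running maximum with Long::max. Whatever order the tasks run in, the result is the same. MaxAccumulatorDemo is a hypothetical class name, and Long.MIN_VALUE is used as the identity because it never wins against any other value.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAccumulator;

class MaxAccumulatorDemo{

  static long run(){
    // max is associative and commutative, so order doesn't matter
    LongAccumulator max =
    new LongAccumulator(Long::max, Long.MIN_VALUE);

    ExecutorService es = Executors.newFixedThreadPool(4);
    for(long value : new long[]{17, 3, 99, 42})
      es.execute(() -> max.accumulate(value));
    es.shutdown();
    try{
      es.awaitTermination(10, TimeUnit.SECONDS);
    }
    catch(InterruptedException e){
      Thread.currentThread().interrupt();
    }
    return max.get();
  }

  public static void main(String[] args){
    System.out.println("Max: " + run()); // prints "Max: 99"
  }
}
```

An operation like subtraction would not be a good fit here, since its result depends on the order in which the values are applied.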

This example demonstrates LongAdder.
import java.util.concurrent.atomic.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SampleClass{

  public static void main(String[] args){
    LongAdder la = new LongAdder();
    System.out.println("Initial value: " + la.intValue());
    
    ExecutorService es = Executors.newFixedThreadPool(4);
    
    es.execute(() -> la.add(2));
    es.execute(() -> la.add(4));
    es.execute(() -> la.add(6));
    es.execute(() -> la.add(8));
    es.shutdown();
    
    while(!es.isTerminated());
    
    System.out.println("Sum: " + la.sum());
  }
}

Result
Initial value: 0
Sum: 20
Next, this example demonstrates LongAccumulator.
import java.util.concurrent.atomic.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SampleClass{

  public static void main(String[] args){
    // 1 is the identity of multiplication
    LongAccumulator la = 
    new LongAccumulator((l,r) -> l*r, 1);
    
    System.out.println("Initial value: " + la.get());
    
    ExecutorService es = Executors.newFixedThreadPool(4);
    
    es.execute(() -> la.accumulate(1));
    es.execute(() -> la.accumulate(2));
    es.execute(() -> la.accumulate(3));
    es.execute(() -> la.accumulate(4));
    es.shutdown();
    
    while(!es.isTerminated());
    
    System.out.println("Product: " + la.get());
  }
}

Result
Initial value: 1
Product: 24
