Sunday, January 23, 2022

Java Tutorial: AsynchronousFileChannel

Chapters

Opening and Reading AsynchronousFileChannel

Note: You should be knowledgeable about FileChannel before reading this article. AsynchronousFileChannel is just a FileChannel but it's built for asynchronous file access.

AsynchronousFileChannel is associated with a thread pool to which tasks are submitted to handle I/O events and dispatch to completion handlers that consume the results of I/O operations on the channel. asynchronous file channel does not have a current position within the file. Instead, the file position is specified to each read and write method that initiates asynchronous operations.

A CompletionHandler is specified as a parameter and is invoked to consume the result of the I/O operation. This class also defines read and write methods that initiate asynchronous operations, returning a Future to represent the pending result of the operation. The Future may be used to check if the operation has completed, wait for its completion, and retrieve the result. More information can be found in the documentation.

This example demonstrates reading a file using AsynchronousFileChannel.
import java.nio.channels.AsynchronousFileChannel;
import java.nio.file.*;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class SampleClass{

  public static void main(String[] args)
                throws IOException,
                       ExecutionException,
                       InterruptedException{
  
    Path source = Paths.get("C:\\test\\fileTest.txt");
    
    if(!Files.exists(source)){
      System.err.println("source doesn't exist!");
      return;
    }
    String content = null;
    
    try(AsynchronousFileChannel afc = 
     AsynchronousFileChannel
     .open(source,StandardOpenOption.READ)){
    
      int buffer = 
      (1024L > afc.size()) ? 
      (int)afc.size() : 1024;
      
      ByteBuffer bb = 
      ByteBuffer.allocate(buffer);
      
      Future<Integer> result = afc.read(bb, 0);
      
      //wait for the Future to be completed
      result.get();
      
      content = 
      new String(bb.array(),
                 StandardCharsets.UTF_8);
    }
    if(content != null)
      System.out.println("Content: " + content);
    else System.err.println("content is null!");
  }
}
In the example above, I use this form of open() method of AsynchronousFileChannel:
open(Path file, OpenOption... options)
This method associates AsynchronousFileChannel with default thread pool that is defined in the AsynchronousChannelGroup class.

Then, Future is used to determine if all threads in the pool are done reading the file in the example ebove. The return value of Future of read() method is the number of bytes read. Note that ByteBuffer is not thread safe. Make sure that you're not using ByteBuffer in multiple threads except for AsynchronousFileChannel operations like read() and write() methods.

Writing AsynchronousFileChannel

We can use AsynchronousFileChannel for writing data to a file. This example demonstrates writing data to a file using AsynchronousFileChannel.
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.CompletionHandler;
import java.nio.file.*;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class SampleClass{

  public static void main(String[] args)
                     throws IOException{
                     
    Path source = Paths.get("C:\\test\\test.txt");
    
    ExecutorService es = Executors.newFixedThreadPool(2);
    
    try(AsynchronousFileChannel afc = 
     AsynchronousFileChannel
     .open(source, 
     java.util.EnumSet.of
     (StandardOpenOption.WRITE,
      StandardOpenOption.CREATE),es)){
           
       ByteBuffer bb = ByteBuffer.allocate(1024);
       bb.put("ABCDEFGHIJK".getBytes());
       bb.flip();
       
       afc.write(bb, 0, bb, new 
       FinalizeTask<Integer, ByteBuffer>());
     }
     es.shutdown();
  }
}

class FinalizeTask<T,A> implements 
CompletionHandler<T,A>{

  @Override
  public void completed(T result, A attachment) {
    
    System.out.println("Writing Complete!");
    if(attachment instanceof ByteBuffer){
      ByteBuffer bb = (ByteBuffer)attachment;
      System.out.println
      ("Buffer has remaining content? " + bb.hasRemaining());
      System.out.println("Number of bytes written: " + result);
    }
  }
  
  @Override
  public void failed(Throwable exc, A attachment) {
    exc.printStackTrace();
  }
}
In the example above, I use this form of open() method:
open(Path file, Set<? extends OpenOption> options, ExecutorService executor, FileAttribute<?>... attrs)
This method requires an ExecutorService. FileAttribute is optional. StandardOpenOption.CREATE create a file if the file in the source doesn't exist.

flip() method flips the buffer. The limit is set to the current position and then the position is set to zero. If the mark is defined then it is discarded. Just like file channel, byte buffer has pointer that is repositioned if byte buffer executes an operation like put(byte[] src) method.

Next, let's examine this form of write() method:
write(ByteBuffer src, long position, A attachment, CompletionHandler<Integer,? super A> handler)
src is a ByteBuffer source. position is the position of the file pointer of file channel. attachment is an attachment that we wanna attach in the CompletionHandler. handler is a CompletionHandler.

CompletionHandler consumes the result of an asynchronous I/O operation. The asynchronous channels defined in this package allow a completion handler to be specified to consume the result of an asynchronous operation.

The completed method is invoked when the I/O operation completes successfully. The failed method is invoked if the I/O operations fails. The implementations of these methods should complete in a timely manner so as to avoid keeping the invoking thread from dispatching to other completion handlers.

This interface has two overridable methods: completed(V result, A attachment) and failed(Throwable exc, A attachment). result is the total number of read/written bytes. attachment is the attached object. In this example, ByteBuffer bb is the attached object. exc is a Throwable exception that occurs if I/O operation fails.

read() method has this version of this form of write():
read(ByteBuffer dst, long position, A attachment, CompletionHandler<Integer,? super A> handler)

No comments:

Post a Comment