java AIO

java AIO

Posted by julyerr on February 22, 2018

紧接着上一篇java nio,这篇blog主要总结aio在java中使用。
jdk1.4引入对io多路复用支持,被称为java nio; jdk1.7中引入对异步非阻塞io的支持,被称为java nio2,也就是aio.

开发人员向内核注册读、写以及异常发生的事件,操作系统在检测到对应事件发生的时候会发起一个信号通知,通常有两种处理方式

  • 将来式:程序轮循判断或者阻塞等待结果, 使用Future保存异步操作的处理结果
  • 回调式:事件发生的时候自动调用函数进行处理,也就是所谓的异步回调。需要实现java.nio.channels.CompletionHandler<V,A>接口,重写其中的completed<V,A>failed<V,A>方法,其中V表示结果类型,A是结果的附着对象。

aio中三种异步通道(asynchonous io)

  • AsynchronousFileChannel: 用于文件异步读写;
  • AsynchronousSocketChannel: 客户端异步socket;
  • AsynchronousServerSocketChannel: 服务器异步socket。

AsynchronousFileChannel

从类提供的方法来看,read()和write()均提供了future和回调式两种处理方式,以回调式为例

public abstract <A> void read(ByteBuffer dst,
                                  long position,
                                  A attachment,
                                  CompletionHandler<Integer,? super A> handler);

public abstract <A> void write(ByteBuffer src,
                                   long position,
                                   A attachment,
                                   CompletionHandler<Integer,? super A> handler);                                  

下面是一个异步读取文件的demo

Path path = Paths.get("/home/julyerr/github/collections/java/src/com/julyerr/interviews/aio/AIOFile.java");
AsynchronousFileChannel channel = AsynchronousFileChannel.open(path);
ByteBuffer buffer = ByteBuffer.allocate(1024);
channel.read(buffer, 0, buffer, new CompletionHandler<Integer,ByteBuffer>() {
    @Override
    public void completed(Integer result, ByteBuffer attachment) {
        System.out.println(Thread.currentThread().getName() + " read success!");
    }

    @Override
    public void failed(Throwable exc, ByteBuffer attachment) {
        System.out.println("read error");
    }
});

while (true){
    System.out.println(Thread.currentThread().getName() + " sleep");
    Thread.sleep(1000);
}

AsynchronousServerSocketChannel

可以指定使用线程池并发响应请求

public static AsynchronousServerSocketChannel open(AsynchronousChannelGroup group)
        throws IOException

类似,accept()同样提供future和回调式两种使用方式

public abstract <A> void accept(A attachment,
                                    CompletionHandler<AsynchronousSocketChannel,? super A> handler);

下面是一个demo

AsynchronousChannelGroup group = AsynchronousChannelGroup.withThreadPool(Executors.newFixedThreadPool(4));
AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open(group).bind(new InetSocketAddress("0.0.0.0", 8013));
server.accept(null, new CompletionHandler<AsynchronousSocketChannel, Void>() {
    @Override
    public void completed(AsynchronousSocketChannel result, Void attachment) {
        server.accept(null, this); // 接受下一个连接
        try {
            String now = new Date().toString();
            ByteBuffer buffer = encoder.encode(CharBuffer.wrap(now + "\r\n"));
            //future 获取保存运行结果
            Future<Integer> f = result.write(buffer);
            f.get();
            System.out.println("sent to client: " + now);
            result.close();
        } catch (IOException | InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }
    @Override
    public void failed(Throwable exc, Void attachment) {
        exc.printStackTrace();
    }
});
group.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);

AsynchronousSocketChannel

虽然提供方法和接口比较多,但是常用的还是open(),connect(),read(),write()等
read

public abstract <A> void read(ByteBuffer dst,
                                  long timeout,
                                  TimeUnit unit,
                                  A attachment,
                                  CompletionHandler<Integer,? super A> handler);

write

public abstract <A> void write(ByteBuffer src,
                                   long timeout,
                                   TimeUnit unit,
                                   A attachment,
                                   CompletionHandler<Integer,? super A> handler);

下面是一个demo

AsynchronousSocketChannel client = AsynchronousSocketChannel.open();
//future保存运行结果
Future<Void> future = client.connect(new InetSocketAddress("127.0.0.1", 8013));
future.get();

ByteBuffer buffer = ByteBuffer.allocate(100);
client.read(buffer, null, new CompletionHandler<Integer, Void>() {
    @Override
    public void completed(Integer result, Void attachment) {
        System.out.println("client received: " + new String(buffer.array()));

    }
    @Override
    public void failed(Throwable exc, Void attachment) {
        exc.printStackTrace();
        try {
            client.close();
        } catch (IOException e) {
            e.printStackTrace();
        }

    }
});

Thread.sleep(10000);

blog所涉及的代码参见


参考资料