java NIO

java NIO

Posted by julyerr on February 20, 2018

上图展示了java多路复用io的工作流程,多个SelectableChannel向Selector注册感兴趣的事件,Selector负责将感兴趣的事件交给对应的Channel进行处理,传输的数据可以从Channel读取到Buffer,也可以从Buffer读取到Channel。

channel

  • FileChannel 用于文件中读写数据
  • DatagramChannel 用于UDP网络中读写数据
  • SocketChannel 用于TCP网络中读写数据
  • ServerSocketChannel 监听TCP连接,对每一个新进来的连接都会创建一个SocketChannel

buffer

对应有ByteBuffer,CharBuffer,DoubleBuffer等,较为常用的是ByteBuffer

三属性

  • capacity buffer的固定大小
  • position 表示当前位置,写模式下,每写入一个数据都会向下移动一个位置;从写模式转为读模式,position被置0。
  • limit 限制量,顾名思义,写模式下等于capacity,读模式下表示最多可以读取到的位置。

提供的方法

  • flip() 将Buffer从写模式切换到读模式,将position置0,并将limit设置成之前position的值;
  • rewind() 将position置0,limit保持不变,可以重复读取buffer中的数据;
  • clear() position置0,limit设置成capacity的值,清空buffer,用于数据写入;
  • compact() 将未读取的数据copy到buffer起始位置,position设置到未读元素后面
  • limit设置capacity,腾出足够的空间用于数据写入。
  • mark()和reset()不常用,通常mark()用于标记一个position,下次通过reset()返回到这个position。

通用使用步骤

  • 写入数据到Buffer
  • 调用flip()方法,准备数据读取
  • 从Buffer中读取数据
  • 调用clear()或者compact()准备数据的写入

notes
数据从channel写入到buffer,同时也能从buffer写入到channel:

  • channel.write(buf)操作对应的是buffer.get()
  • channel.read(buf)操作对应的是buffer.put()

使用方法参见后文的demo

Selector

使用单通道可以用于处理多个Channels

  • 首先创建一个Selector
    Selector selector = Selector.open();
    
  • 然后向Selector注册通道
    channel.configureBlocking(false);
    SelectionKey key = channel.register(selector,
      SelectionKey.OP_READ);
    

    与Selector一起使用时,Channel必须处于非阻塞模式下(FileChannel不支持异步io);

  • 对感兴趣的事件进行注册

    • SelectionKey.OP_CONNECT 新建立的连接
    • SelectionKey.OP_ACCEPT 接受就绪(server socket channel)
    • SelectionKey.OP_READ 读取数据
    • SelectionKey.OP_WRITE 写入数据 感兴趣的事件可以位或操作
      SelectionKey.OP_READ | SelectionKey.OP_WRITE
      

SelectionKey

注册Selector之后返回一个SelectionKey对象,属性包含

  • interest集合
    int interestSet = selectionKey.interestOps();
    boolean isInterestedInAccept  = (interestSet & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT
    
  • ready集合
    selectionKey.isAcceptable();
    
  • Channel
    Channel  channel  = selectionKey.channel();
    
  • Selector
    Selector selector = selectionKey.selector();
    
  • 附加的对象(可选)
    selectionKey.attach(theObject);
    Object attachedObj = selectionKey.attachment();
    

调用select()
阻塞到至少有一个channel在注册的事件上就绪,功能类似的其他函数

  • int select(long timeout) 有时间限制
  • int selectNow() 立即返回 select()发生阻塞之后,其他线程可以调用selector.wakeup()从阻塞状态恢复。

以下是一个示例demo

Selector selector = Selector.open();
channel.configureBlocking(false);
SelectionKey key = channel.register(selector, SelectionKey.OP_READ);
while(true) {
  int readyChannels = selector.select();
  if(readyChannels == 0) continue;
  Set selectedKeys = selector.selectedKeys();
  Iterator keyIterator = selectedKeys.iterator();
  while(keyIterator.hasNext()) {
    SelectionKey key = keyIterator.next();
    if(key.isAcceptable()) {
        // a connection was accepted by a ServerSocketChannel.
    } else if (key.isConnectable()) {
        // a connection was established with a remote server.
    } else if (key.isReadable()) {
        // a channel is ready for reading
    } else if (key.isWritable()) {
        // a channel is ready for writing
    }
    //处理完通道需手动移除ready位,下次该通道变成就绪时,Selector会再次将其放入已选择键集中。
    keyIterator.remove();
  }
}

SocketChannel

创建SocketChannel的两种方式

  • 打开连接到某台服务器上的SocketChannel
    SocketChannel socketChannel = SocketChannel.open();
    socketChannel.connect(new InetSocketAddress("http://jenkov.com", 80));
    
  • 新连接到达ServerSocketChannel
    SocketChannel socketChannel =
              serverSocketChannel.accept();
    

非阻塞模式
SocketChannel设置成非阻塞模式之后,connect()、read()和write()均可以在异步模式下进行

  • connect() 在异步模式下调用会直接返回,确定连接成功需要循环判断
    socketChannel.configureBlocking(false);
    socketChannel.connect(new InetSocketAddress("http://jenkov.com", 80));
    while(! socketChannel.finishConnect() ){
      //wait, or do something else...
    }
    
  • write操作
    String newData = "New String to write to file..." + System.currentTimeMillis();
    ByteBuffer buf = ByteBuffer.allocate(48);
    buf.clear();
    buf.put(newData.getBytes());
    //准备数据读取
    buf.flip();
    //channel的写入对应的是buf的读取,循环判断所有数据是否写入完成
    while(buf.hasRemaining()) {
      channel.write(buf);
    }
    
  • read操作
    ByteBuffer buf = ByteBuffer.allocate(48);
    //返回读取数据的数量
    int bytesRead = socketChannel.read(buf);
    

ServerSocketChannel

监听新进来的TCP连接,使用方式和SocketChannel类似,需要bind端口等待新的连接

ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.socket().bind(new InetSocketAddress(9999));
serverSocketChannel.configureBlocking(false);

while(true){
    SocketChannel socketChannel =
            serverSocketChannel.accept();
    //设置好非阻塞模式之后,调用accept()会直接返回
    if(socketChannel != null){
        //do something with socketChannel...
    }
}

io 和 nio 使用场景

  • IO是面向流的阻塞IO,NIO是面向缓冲的非阻塞IO;
  • io通常用于一个线程处理一个连接的情况,每次传输的数据量相对较大; nio则通常用于单线程处理多连接的情况,每次传输的数据量不是很大。

一个完整的nio的demo


参考资料