Posts Java NIO 一览
Post
Cancel

Java NIO 一览

NIO 概述

Java 层面:new io OS 层面:non-blocking io

核心组成

  • Channels
    • FileChannel: 从文件中读写数据
    • DatagramChannel: 能通过UDP读写网络中的数据
    • SocketChannel: 能通过TCP读写网络中的数据
    • ServerSocketChannel: 可以监听新进来的TCP连接,像Web服务器那样。对每一个新进来的连接都会创建一个SocketChannel
  • Buffers
    • ByteBuffer
    • CharBuffer
    • DoubleBuffer
    • FloatBuffer
    • IntBuffer
    • LongBuffer
    • ShortBuffer
    • MappedByteBuffer: 内存映射文件
  • Selectors
    • 允许在单线程中处理多个 channel
    • 要使用 selector 需要先向其注册 channel,然后调用其 select() 方法
  • 其他组件均是与这三个核心组件搭配使用,比如 PipeFileLock

Channel 和 Buffer

基本上,所有的 IO 在NIO 中都从一个Channel 开始。Channel 有点象流。 数据可以从Channel读到Buffer中,也可以从Buffer 写到Channel中。

Channel

NIO 中的 channel 类似 stream,但又有些不同:

  1. 既可以从通道中读取数据,又可以写数据到通道。但流的读写通常是单向的
  2. 通道可以异步地读写
  3. 通道中的数据总是要先读到一个Buffer,或者总是要从一个Buffer中写入

实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public void run() throws IOException {
   final String path = System.getProperty("user.home")
            + "/code/github/java/JavaCore/lib-nio/src/main/resources/nio-data.txt";
   final RandomAccessFile raf = new RandomAccessFile(path, "rw");
   final FileChannel channel = raf.getChannel();
   // 分配缓冲区
   final ByteBuffer buffer = ByteBuffer.allocate(48);
   // 1. 不停地将数据读取到缓冲区中
   while (channel.read(buffer) != -1) {
      // 2. 反转缓冲区,开始读取数据
      buffer.flip();
      // 3. 缓冲区有数据,就不停地取出 buffer.get()
      while (buffer.hasRemaining()) {
         // 注意此处不能直接用 buffer.getChar()
         System.out.print(((char) buffer.get()));
      }
      // 4. 清空 buffer,再读
      buffer.clear();
   }
   raf.close();
}

channel 中三个重要的属性

  1. capacity
    • 缓冲区容量,一旦缓冲区满了,需要清除数据才能重新写入
  2. position
    • 读模式
    • 从某个指定的位置读取,表示当前可读取数据的位置
    • 当缓冲区切换到写模式时,position 会被重置为 0 - 写模式
    • 表示当前可写入数据的位置,最大值为 capacity - 1
  3. limit
    • 读模式
    • 表示能从缓冲区读取多少数据;当切换Buffer到读模式时,limit 会被设置成写模式下的 position 值 - 写模式
    • 表示能向缓冲区中写入多少数据,此时 limit = capacity

channel 中的一些方法

  1. 读取
    • 从 channel 读取数据到 buffer:channel.read(buffer)
    • 从 buffer 中读取数据:buffer.get()
    • 从 buffer 中读取数据到 channel: channel.write(buffer)
  2. 写入
    • 向缓冲区写入数据buffer.put("data")
  3. flip()
    • 将缓冲取从写模式切换到读模式
  4. rewind()
    • 将position设回0,所以你可以重读Buffer中的所有数据。limit保持不变,仍然表示能从Buffer中读取多少个元素(byte、char等)
  5. clear() 与 compact()
    • 一旦读完Buffer中的数据,需要让Buffer准备好再次被写入。可以通过clear()或compact()方法来完成
    • clear:
    • position将被设回0,limit被设置成 capacity的值,即:换句话说,Buffer 被清空了
    • Buffer中的数据并未清除,只是这些标记告诉我们可以从哪里开始往Buffer里写数据 - compact:
    • 将所有未读的数据拷贝到Buffer起始处,然后将position设到最后一个未读元素正后面
    • limit属性依然像clear()方法一样,设置成capacity。现在Buffer准备好写数据了,但是不会覆盖未读的数据
  6. mark() 与 reset()
    • mark: 可以标记Buffer中的一个特定position
    • reset: 恢复到这个 position

channel 之间传递数据

两个通道中有一个是FileChannel,那你可以直接将数据从一个channel(译者注:channel中文常译作通道)传输到另外一个channel

  1. transferFrom() 方法
    1
    2
    3
    4
    5
    6
    7
    
    public void write() throws IOException {
     final FileChannel sourceChannel = new RandomAccessFile(source, "rw").getChannel();
     final FileChannel sinkChannel = new RandomAccessFile(sink, "rw").getChannel();
     sinkChannel.transferFrom(sourceChannel, 0, sourceChannel.size());
     sourceChannel.close();
     sinkChannel.close();
    }
    
  2. transferTo() 方法
    1
    2
    3
    4
    5
    6
    7
    
    public void write2() throws IOException {
     final FileChannel sourceChannel = new RandomAccessFile(source, "rw").getChannel();
     final FileChannel sinkChannel = new RandomAccessFile(sink, "rw").getChannel();
     sourceChannel.transferTo(0, sourceChannel.size(), sinkChannel);
     sourceChannel.close();
     sinkChannel.close();
    }
    

Scatter & Gather

scatter/gather用于描述从Channel(译者注:Channel在中文经常翻译为通道)中读取或者写入到Channel的操作

  1. scatter: 分散,将一个 channel 中的数据读取到多个 buffer 中

  2. gather:聚集,将多个 buffer 中的数据写入到一个 channel 中

应用场景

  • scatter / gather经常用于需要将传输的数据分开处理的场合
  • 例如:传输一个由消息头和消息体组成的消息,你可能会将消息体和消息头分散到不同的buffer中,这样你可以方便的处理消息头和消息体

scatter 示例

1
2
3
4
5
ByteBuffer header = ByteBuffer.allocate(128);
ByteBuffer body   = ByteBuffer.allocate(1024);
// 读取时会先把 header 缓冲区填满然后才填充 body 缓冲区
// 使用于读取固定的数据
channel.read(new ByteBuffer[] {header, body});

gather 示例

1
2
3
4
5
ByteBuffer header = ByteBuffer.allocate(128);
ByteBuffer body   = ByteBuffer.allocate(1024);
// 只有 position 和 limit 之间的数据才会被写入
// 固定与非固定数据均适用
channel.write(new ByteBuffer[] {header, body});

Selector

Selector(选择器)是Java NIO中能够检测一到多个NIO通道,并能够知晓通道是否为诸如读写事件做好准备的组件。 这样一个单独的线程可以管理多个channel,从而管理多个网络连接。

为什么使用 Selector

  • 仅用单个线程来处理多个Channels的好处是,只需要更少的线程来处理通道 事实上,可以只用一个线程处理所有的通道
  • 对于操作系统来说,线程之间上下文切换的开销很大,而且每个线程都要占用系统的一些资源(如内存)因此,使用的线程越少越好

如何创建 Selector

  • Selector s = Selector.open();

向 Selector 注册 channel

1
2
3
// 关键位置,设置 nonblock,底层调用操作系统 fcntl(fd, F_SETFL, O_NONBLOCK) 设置为非阻塞
channel.configureBlocking(false);
SlectionKey key = channel.register(s, SelectionKey.OP_READ);
  • 与Selector一起使用时,Channel必须处于非阻塞模式下。
  • 不能将FileChannel与Selector一起使用,因为FileChannel不能切换到非阻塞模式。而套接字通道都可以
  • 有四种不同的事件可以监听
    • connect -> SelectionKey.OP_CONNECT
    • accept -> SelectionKey.OP_ACCEPT
    • read -> SelectionKey.OP_READ
    • write -> SelectionKey.OP_WRITE
  • 通道触发了一个事件意思是该事件已经就绪。所以:
    • 某个channel成功连接到另一个服务器称为“连接就绪”
    • 一个server socket channel准备好接收新进入的连接称为“接收就绪”
    • 一个有数据可读的通道可以说是“读就绪”。等待写数据的通道可以说是“写就绪”
  • 注册多个事件:channel.register(s, SelectionKey.OP_READ | SelectionKey.OP_WRITE)

SelectionKey

  • interest set:
    • 通过 & 操作可以确定该 set 中有哪些事件:set & SelectionKey.OP_CONNECT == SelectionKey.OP_CONNECT
  • ready set:
    • 检测方式同 interest set
    • key.isAcceptable() or key.isReadable() or key.isWritable() or key.isConnectable()
  • channel: channel = key.channel()
  • selector: selector = key.selector()
  • optional: 附加对象
    1
    2
    
    key.attach(obj);
    Object obj = key.attachment();
    

使用 select() 选择 channel

  1. select()
    • select():一直阻塞到至少有一个通道在你注册的事件上就绪了
    • select(long timeout):阻塞 (timeout 毫秒) 到至少有一个通道在你注册的事件上就绪了
    • selectNow():不会阻塞,不管什么通道就绪都立刻返回
    • select() 方法返回的int值表示有多少通道已经就绪
  2. selectedKeys()
    • 一旦调用了select()方法,并且返回值表明有一个或更多个通道就绪了,然后可以通过调用selector的selectedKeys()方法
    • 访问“已选择键集(selected key set)”中的就绪通道 Set keys = s.selectedKeys()
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      
      Set selectedKeys = selector.selectedKeys();
      Iterator keyIterator = selectedKeys.iterator();
      while(keyIterator.hasNext()) {
       SelectionKey key = keyIterator.next();
       if(key.isAcceptable()) {
         // a connection was accepted by a ServerSocketChannel.
       } else if (key.isConnectable()) {
         // a connection was established with a remote server.
       } else if (key.isReadable()) {
         // a channel is ready for reading
       } else if (key.isWritable()) {
         // a channel is ready for writing
       }
       keyIterator.remove();
      }
      
  3. wakeUp()
    • 某个线程调用select()方法后阻塞了,即使没有通道已经就绪,也有办法让其从select()方法返回
    • 只要让其它线程在第一个线程调用select()方法的那个对象上调用Selector.wakeup()方法即可。阻塞在select()方法上的线程会立马返回
    • 如果有其它线程调用了wakeup()方法,但当前没有线程阻塞在select()方法上,下个调用select()方法的线程会立即“醒来(wake up)”
  4. close()
    • 用完Selector后调用其close()方法会关闭该Selector,且使注册到该Selector上的所有SelectionKey实例无效。通道本身并不会关闭

完整示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
Selector selector = Selector.open();
channel.configureBlocking(false);
// 本质上是向底层注册 POOLIN/POOLOUT/POOLCONN 等 epoll 事件
SelectionKey key = channel.register(selector, SelectionKey.OP_READ);
while(true) {
  int readyChannels = selector.select();
  if(readyChannels == 0) continue;
  Set selectedKeys = selector.selectedKeys();
  Iterator keyIterator = selectedKeys.iterator();
  while(keyIterator.hasNext()) {
    SelectionKey key = keyIterator.next();
    if(key.isAcceptable()) {
        // a connection was accepted by a ServerSocketChannel.
    } else if (key.isConnectable()) {
        // a connection was established with a remote server.
    } else if (key.isReadable()) {
        // a channel is ready for reading
    } else if (key.isWritable()) {
        // a channel is ready for writing
    }
    keyIterator.remove();
  }
}
This post is licensed under CC BY 4.0 by the author.