NIO 概述
Java 层面:new io OS 层面:non-blocking io
核心组成
- Channels
- FileChannel: 从文件中读写数据
- DatagramChannel: 能通过UDP读写网络中的数据
- SocketChannel: 能通过TCP读写网络中的数据
- ServerSocketChannel: 可以监听新进来的TCP连接,像Web服务器那样。对每一个新进来的连接都会创建一个SocketChannel
- Buffers
- ByteBuffer
- CharBuffer
- DoubleBuffer
- FloatBuffer
- IntBuffer
- LongBuffer
- ShortBuffer
- MappedByteBuffer: 内存映射文件
- Selectors
- 允许在单线程中处理多个 channel
- 要使用 selector 需要先向其注册 channel,然后调用其 select() 方法
- 其他组件均是与这三个核心组件搭配使用,比如
Pipe和FileLock
Channel 和 Buffer
基本上,所有的 IO 在NIO 中都从一个Channel 开始。Channel 有点象流。 数据可以从Channel读到Buffer中,也可以从Buffer 写到Channel中。
Channel
NIO 中的 channel 类似 stream,但又有些不同:
- 既可以从通道中读取数据,又可以写数据到通道。但流的读写通常是单向的
- 通道可以异步地读写
- 通道中的数据总是要先读到一个Buffer,或者总是要从一个Buffer中写入
实例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public void run() throws IOException {
final String path = System.getProperty("user.home")
+ "/code/github/java/JavaCore/lib-nio/src/main/resources/nio-data.txt";
final RandomAccessFile raf = new RandomAccessFile(path, "rw");
final FileChannel channel = raf.getChannel();
// 分配缓冲区
final ByteBuffer buffer = ByteBuffer.allocate(48);
// 1. 不停地将数据读取到缓冲区中
while (channel.read(buffer) != -1) {
// 2. 反转缓冲区,开始读取数据
buffer.flip();
// 3. 缓冲区有数据,就不停地取出 buffer.get()
while (buffer.hasRemaining()) {
// 注意此处不能直接用 buffer.getChar()
System.out.print(((char) buffer.get()));
}
// 4. 清空 buffer,再读
buffer.clear();
}
raf.close();
}
channel 中三个重要的属性
- capacity
- 缓冲区容量,一旦缓冲区满了,需要清除数据才能重新写入
- position
- 读模式
- 从某个指定的位置读取,表示当前可读取数据的位置
- 当缓冲区切换到写模式时,position 会被重置为 0 - 写模式
- 表示当前可写入数据的位置,最大值为 capacity - 1
- limit
- 读模式
- 表示能从缓冲区读取多少数据;当切换Buffer到读模式时,limit 会被设置成写模式下的 position 值 - 写模式
- 表示能向缓冲区中写入多少数据,此时 limit = capacity
channel 中的一些方法
- 读取
- 从 channel 读取数据到 buffer:
channel.read(buffer) - 从 buffer 中读取数据:
buffer.get() - 从 buffer 中读取数据到 channel:
channel.write(buffer)
- 从 channel 读取数据到 buffer:
- 写入
- 向缓冲区写入数据
buffer.put("data")
- 向缓冲区写入数据
- flip()
- 将缓冲取从写模式切换到读模式
- rewind()
- 将position设回0,所以你可以重读Buffer中的所有数据。limit保持不变,仍然表示能从Buffer中读取多少个元素(byte、char等)
- clear() 与 compact()
- 一旦读完Buffer中的数据,需要让Buffer准备好再次被写入。可以通过clear()或compact()方法来完成
- clear:
- position将被设回0,limit被设置成 capacity的值,即:换句话说,Buffer 被清空了
- Buffer中的数据并未清除,只是这些标记告诉我们可以从哪里开始往Buffer里写数据 - compact:
- 将所有未读的数据拷贝到Buffer起始处,然后将position设到最后一个未读元素正后面
- limit属性依然像clear()方法一样,设置成capacity。现在Buffer准备好写数据了,但是不会覆盖未读的数据
- mark() 与 reset()
- mark: 可以标记Buffer中的一个特定position
- reset: 恢复到这个 position
channel 之间传递数据
两个通道中有一个是FileChannel,那你可以直接将数据从一个channel(译者注:channel中文常译作通道)传输到另外一个channel
- transferFrom() 方法
1 2 3 4 5 6 7
public void write() throws IOException { final FileChannel sourceChannel = new RandomAccessFile(source, "rw").getChannel(); final FileChannel sinkChannel = new RandomAccessFile(sink, "rw").getChannel(); sinkChannel.transferFrom(sourceChannel, 0, sourceChannel.size()); sourceChannel.close(); sinkChannel.close(); }
- transferTo() 方法
1 2 3 4 5 6 7
public void write2() throws IOException { final FileChannel sourceChannel = new RandomAccessFile(source, "rw").getChannel(); final FileChannel sinkChannel = new RandomAccessFile(sink, "rw").getChannel(); sourceChannel.transferTo(0, sourceChannel.size(), sinkChannel); sourceChannel.close(); sinkChannel.close(); }
Scatter & Gather
scatter/gather用于描述从Channel(译者注:Channel在中文经常翻译为通道)中读取或者写入到Channel的操作
scatter: 分散,将一个 channel 中的数据读取到多个 buffer 中
gather:聚集,将多个 buffer 中的数据写入到一个 channel 中
应用场景
- scatter / gather经常用于需要将传输的数据分开处理的场合
- 例如:传输一个由消息头和消息体组成的消息,你可能会将消息体和消息头分散到不同的buffer中,这样你可以方便的处理消息头和消息体
scatter 示例
1
2
3
4
5
ByteBuffer header = ByteBuffer.allocate(128);
ByteBuffer body = ByteBuffer.allocate(1024);
// 读取时会先把 header 缓冲区填满然后才填充 body 缓冲区
// 使用于读取固定的数据
channel.read(new ByteBuffer[] {header, body});
gather 示例
1
2
3
4
5
ByteBuffer header = ByteBuffer.allocate(128);
ByteBuffer body = ByteBuffer.allocate(1024);
// 只有 position 和 limit 之间的数据才会被写入
// 固定与非固定数据均适用
channel.write(new ByteBuffer[] {header, body});
Selector
Selector(选择器)是Java NIO中能够检测一到多个NIO通道,并能够知晓通道是否为诸如读写事件做好准备的组件。 这样一个单独的线程可以管理多个channel,从而管理多个网络连接。
为什么使用 Selector
- 仅用单个线程来处理多个Channels的好处是,只需要更少的线程来处理通道 事实上,可以只用一个线程处理所有的通道
- 对于操作系统来说,线程之间上下文切换的开销很大,而且每个线程都要占用系统的一些资源(如内存)因此,使用的线程越少越好
如何创建 Selector
Selector s = Selector.open();
向 Selector 注册 channel
1
2
3
// 关键位置,设置 nonblock,底层调用操作系统 fcntl(fd, F_SETFL, O_NONBLOCK) 设置为非阻塞
channel.configureBlocking(false);
SlectionKey key = channel.register(s, SelectionKey.OP_READ);
- 与Selector一起使用时,Channel必须处于非阻塞模式下。
- 不能将FileChannel与Selector一起使用,因为FileChannel不能切换到非阻塞模式。而套接字通道都可以
- 有四种不同的事件可以监听
- connect -> SelectionKey.OP_CONNECT
- accept -> SelectionKey.OP_ACCEPT
- read -> SelectionKey.OP_READ
- write -> SelectionKey.OP_WRITE
- 通道触发了一个事件意思是该事件已经就绪。所以:
- 某个channel成功连接到另一个服务器称为“连接就绪”
- 一个server socket channel准备好接收新进入的连接称为“接收就绪”
- 一个有数据可读的通道可以说是“读就绪”。等待写数据的通道可以说是“写就绪”
- 注册多个事件:
channel.register(s, SelectionKey.OP_READ | SelectionKey.OP_WRITE)
SelectionKey
- interest set:
- 通过 & 操作可以确定该 set 中有哪些事件:
set & SelectionKey.OP_CONNECT == SelectionKey.OP_CONNECT
- 通过 & 操作可以确定该 set 中有哪些事件:
- ready set:
- 检测方式同 interest set
- key.isAcceptable() or key.isReadable() or key.isWritable() or key.isConnectable()
- channel:
channel = key.channel() - selector:
selector = key.selector() - optional: 附加对象
1 2
key.attach(obj); Object obj = key.attachment();
使用 select() 选择 channel
- select()
- select():一直阻塞到至少有一个通道在你注册的事件上就绪了
- select(long timeout):阻塞 (timeout 毫秒) 到至少有一个通道在你注册的事件上就绪了
- selectNow():不会阻塞,不管什么通道就绪都立刻返回
- select() 方法返回的int值表示有多少通道已经就绪
- selectedKeys()
- 一旦调用了select()方法,并且返回值表明有一个或更多个通道就绪了,然后可以通过调用selector的selectedKeys()方法
- 访问“已选择键集(selected key set)”中的就绪通道
Set keys = s.selectedKeys()1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
Set selectedKeys = selector.selectedKeys(); Iterator keyIterator = selectedKeys.iterator(); while(keyIterator.hasNext()) { SelectionKey key = keyIterator.next(); if(key.isAcceptable()) { // a connection was accepted by a ServerSocketChannel. } else if (key.isConnectable()) { // a connection was established with a remote server. } else if (key.isReadable()) { // a channel is ready for reading } else if (key.isWritable()) { // a channel is ready for writing } keyIterator.remove(); }
- wakeUp()
- 某个线程调用select()方法后阻塞了,即使没有通道已经就绪,也有办法让其从select()方法返回
- 只要让其它线程在第一个线程调用select()方法的那个对象上调用Selector.wakeup()方法即可。阻塞在select()方法上的线程会立马返回
- 如果有其它线程调用了wakeup()方法,但当前没有线程阻塞在select()方法上,下个调用select()方法的线程会立即“醒来(wake up)”
- close()
- 用完Selector后调用其close()方法会关闭该Selector,且使注册到该Selector上的所有SelectionKey实例无效。通道本身并不会关闭
完整示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
Selector selector = Selector.open();
channel.configureBlocking(false);
// 本质上是向底层注册 POOLIN/POOLOUT/POOLCONN 等 epoll 事件
SelectionKey key = channel.register(selector, SelectionKey.OP_READ);
while(true) {
int readyChannels = selector.select();
if(readyChannels == 0) continue;
Set selectedKeys = selector.selectedKeys();
Iterator keyIterator = selectedKeys.iterator();
while(keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
if(key.isAcceptable()) {
// a connection was accepted by a ServerSocketChannel.
} else if (key.isConnectable()) {
// a connection was established with a remote server.
} else if (key.isReadable()) {
// a channel is ready for reading
} else if (key.isWritable()) {
// a channel is ready for writing
}
keyIterator.remove();
}
}