当前位置:网站首页>NIO多路复用之Selector的使用
NIO多路复用之Selector的使用
2022-06-24 19:34:00 【Nice2cu_Code】
Selector的使用
文章目录
一、阻塞 & 非阻塞
通过多个客户端向服务器发送数据的例子理解阻塞和非阻塞。
1. 阻塞
服务器端:
// 1. 定义ByteBuffer,存放客户端发来的数据
ByteBuffer buffer = ByteBuffer.allocate(16);
// 2. 创建服务器channel
ServerSocketChannel ssc = ServerSocketChannel.open();
// 3. 服务器绑定监听端口
ssc.bind(new InetSocketAddress(8080));
// 4. 客户端channel的连接集合
List<SocketChannel> channels = new ArrayList<>();
//服务器不停的接收客户端的请求
while (true) {
// 5. 服务器调用accept建立与客户端的连接
// 如果客户端发起了连接请求,且服务器端调用了accept方法,则双方会通过三次握手建立连接
// SocketChannel用来与客户端之间通信
SocketChannel sc = ssc.accept();
// 阻塞方法,如果没有客户端发来请求,线程停止运行,客户端连接建立之后,继续运行
channels.add(sc); //将客户端channel添加到channel集合中
for (SocketChannel channel : channels) {
// 6. 接收客户端发送的数据
//读取的数据存放在bytebuffer中
channel.read(buffer);
// 阻塞方法,如果客户端仅连接,但是没有发送数据,则阻塞,客户端发送数据之后,线程继续运行
}
}
客户端:
SocketChannel sc = SocketChannel.open();
sc.connect(new InetSocketAddress("localhost", 8080)); //建立连接
sc.write(StandardCharsets.UTF_8.encode("你好");) //发送数据
如果某个客户端成功向服务器发送了信息,这个客户端再次发送数据会失败,因为服务器会等待客户端连接,连接之后的客户端再次发送数据并没有建立新连接,所以发送失败。也就是说,如果想要接收新的数据,必须建立新的连接。
缺点:
单线程模式下,一个方法的执行会影响另外一个方法,比如上述,一个服务器线程处理多个客户端连接,等待连接时无法读取另一个客户端的数据,读取数据时无法连接另一个客户端,一个客户端处理完毕后才可以处理另一个客户端。
2. 非阻塞
服务器端:
所有的 channel 都要设置为非阻塞模式。
ByteBuffer buffer = ByteBuffer.allocate(16);
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false); // 切换成非阻塞模式,默认为阻塞模式
ssc.bind(new InetSocketAddress(8080));
List<SocketChannel> channels = new ArrayList<>();
//持续等待客户端的连接
while (true) {
SocketChannel sc = ssc.accept();
// 非阻塞,线程还会继续运行,如果没有连接建立,则sc是null(如果没有连接,会不断的生成null)
//客户端有连接
if (sc != null) {
sc.configureBlocking(false); // 设置为非阻塞模式
channels.add(sc);
}
for (SocketChannel channel : channels) {
int read = channel.read(buffer);
// 非阻塞,如果没有读到数据,线程仍然会继续运行,read值为0
}
}
即使连接成功一个客户端,服务器还是会不停的尝试连接(生成null),连接好的客户端可以执行读取数据的操作,也就是说,读取和连接两个操作互不影响。
缺点:
即使没有连接建立和可读数据,线程仍然在不断运行,白白浪费了 cpu。
二、selector 介绍及常用API
1. 多路复用

一个线程配合 selector 就可以监控多个 channel 的事件,事件发生时线程才去处理,事件没有发生则线程处于阻塞状态(多路复用),避免非阻塞模式下CPU空转的问题。
好处:
- 让这个线程能够被充分利用
- 节约了线程的数量
- 减少了线程上下文切换的次数
2. 常用API
创建selector
//调用静态方法
Selector selector = Selector.open();
绑定channel事件
建立 selector 与 channel 之间的联系,也称之为注册事件,只有绑定之后的事件 selector 才会关心 。
channel.configureBlocking(false); //设置channel为非阻塞模式
//将channel注册到selector上,并且关注某一事件类型
SelectionKey key = channel.register(selector, 绑定事件类型);
//返回值表示事件发生后,通过SelectionKey可以得知是什么事件,并且得知是哪个channel发生的事件
- channel 必须工作在非阻塞模式
- FileChannel 没有非阻塞模式,因此不能配合 selector 一起使用,一般用于网络编程,而不是文件编程
- 绑定的事件类型可以有
- connect - 客户端连接成功时触发
- accept - 客户端发起连接请求时触发
- read - 客户端数据可读入时触发,存在因为接收能力弱,数据暂不能读入的情况
- write - 数据可写出至客户端时触发,存在因为发送能力弱,数据暂不能写出的情况
监听channel事件
可以通过下面三种方法来监听是否有事件发生,方法的返回值代表有多少 channel 发生了事件:
方法1,阻塞直到绑定事件发生
int count = selector.select();
//没有事件发生,就会让线程阻塞,事件发生了才会让线程继续向下运行
//解决了cpu空转的问题
方法2,阻塞直到绑定事件发生,或是超时(时间单位为 ms)
int count = selector.select(long timeout);
方法3,不会阻塞,也就是不管有没有事件,立刻返回,自己根据返回值检查是否有事件发生
int count = selector.selectNow();
三、处理 accept 事件
客户端代码(对以下各种事件都通用):
public class Client {
public static void main(String[] args) {
try (Socket socket = new Socket("localhost", 8080)) {
//客户端发送的数据
socket.getOutputStream().write("world".getBytes());
} catch (IOException e) {
e.printStackTrace();
}
}
}
服务器端代码:
@Slf4j
public class ChannelDemo6 {
public static void main(String[] args) {
try (ServerSocketChannel channel = ServerSocketChannel.open()) {
channel.bind(new InetSocketAddress(8080));
//创建Selector
Selector selector = Selector.open();
channel.configureBlocking(false);
//注册事件
SelectionKey sscKey = channel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
//阻塞直到事件发生
int count = selector.select();
// 获取所有可用的事件,set集合
Set<SelectionKey> keys = selector.selectedKeys();
// 遍历所有事件,逐一处理(使用迭代器)
Iterator<SelectionKey> iter = keys.iterator();
while (iter.hasNext()) {
//得到某一个SelectionKey
SelectionKey key = iter.next();
// 判断事件类型
if (key.isAcceptable()) {
//获取发生这个事件的channel(强转,默认的返回类型为SelectableChannel)
ServerSocketChannel c = (ServerSocketChannel) key.channel();
// 与客户端建立连接
SocketChannel sc = c.accept();
// 获取事件以后必须处理,如果没有处理,selector认为有事件没有处理,就不会阻塞,cpu持续被占用
//如果不想处理这个事件,可以调用key.cancel方法,取消事件后,会自动的将事件移除,selector依然会阻塞
// 处理完毕,必须将事件移除
iter.remove();
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
四、处理 read 事件
服务器端代码:
@Slf4j
public class ChannelDemo6 {
public static void main(String[] args) {
try (ServerSocketChannel channel = ServerSocketChannel.open()) {
channel.bind(new InetSocketAddress(8080));
Selector selector = Selector.open();
channel.configureBlocking(false);
SelectionKey sscKey = channel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
int count = selector.select();
// 获取所有事件
Set<SelectionKey> keys = selector.selectedKeys();
// 遍历所有事件,逐一处理
Iterator<SelectionKey> iter = keys.iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
// 判断事件类型
//accept事件
if (key.isAcceptable()) {
ServerSocketChannel c = (ServerSocketChannel) key.channel();
// 必须处理
SocketChannel sc = c.accept();
sc.configureBlocking(false); //连接建立之后,必须设置为非阻塞模式
//将所要管理的channel注册到selector上
SelectionKey scKey = sc.register(selector, SelectionKey.OP_READ);
//可读事件
} else if (key.isReadable()) {
SocketChannel sc = (SocketChannel) key.channel();
//读取channel中的数据,写入到buffer中
ByteBuffer buffer = ByteBuffer.allocate(128);
int read = sc.read(buffer);
//如果客户端正常断开,或者读取完毕,将key删除
if(read == -1) {
key.cancel();
} else {
buffer.flip();
//读取buffer中的数据
}
}
// 处理完毕,必须将事件移除
iter.remove();
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
1. 为什么事件必须删除
因为 selector 在事件发生后,就会将相关的 key 放入 selectedKeys
集合,但不会在处理完后从 selectedKeys
集合中移除,需要自己编码删除。比如上述代码:
- 第一次触发了 accept 事件,假如没有移除事件,还存在于
selectedKeys
集合 - 第二次触发了 read 事件,但这时
selectedKeys
中还有上次的 accept 事件 ,在遍历时,首次遍历到的元素依然是上次的 accept 事件,进入 if 分支,但此时已经建立了连接,所以 accept 方法返回的是 null,继续向下执行会出现空指针异常
2. 处理客户端断开问题
2.1 客户端强制断开
客户端强制断开后,如果服务器正在从中读取数据,服务器会抛出 IOException
,如果不处理异常,会导致服务器停止运行,此时应该捕获异常,处理异常时将此 key 删除:

2.2 客户端正常断开
连接通道调用 close()
方法,会让客户端正常断开,调用 read()
读取数据时,返回值为-1,所以应该判断,如果值为 -1,将 key 删除。
3. 处理消息边界问题
如果给buffer申请的空间较小,比如4个字节,客户端发送 “中国” 两个汉字,共占用6个字节(utf-8),需要读取两次才可将消息读入,第一次读取4个字节,获取的是 “中” 以及 “国” 的前三分之一,第二次获取 “国” 的后三分之二,“国” 字会被截断(黏包、半包问题),出现乱码,如下:

三种情况:

- 时刻1表示消息的长度大于buffer的大小,buffer需扩容
- 时刻2对应半包现象
- 时刻3对应黏包现象
三种处理方式:
- 一种思路是客户端和服务器约定消息长度,所有数据包大小一样,服务器按预定长度读取,客户端的消息也按照预定长度发送,缺点是浪费带宽(不够的长度会填充至约定的长度)。
- 另一种思路是按分隔符拆分,服务器根据分隔符来确定一条完整的消息,缺点是效率低,在服务器端需要一个字节一个字节对比。
- 第三种思路是数据使用 LTV 格式,即 Length 长度、Type 类型、Value 数据。发送的数据分成三部分,第一部分表示此数据的长度,第二部分是数据的类型,第三部分是实际的数据,服务器根据第一部分在长度已知的情况下,就可以很方便的确定消息大小,分配合适的 buffer,缺点是 buffer 需要提前分配,如果内容过大,则影响 server 吞吐量。
- Http 1.1 是 TLV 格式
- Http 2.0 是 LTV 格式
第二种思路的使用:
假设服务器设置的buffer大小为16个字节,客户端发送的数据为20个字节,则第一次读事件只能传输16个字节,所以需要对buffer进行扩容,扩容之后,第二次读事件把剩余的消息读入,生成一个完整的消息,如下图:

服务器端代码:
private static void split(ByteBuffer source) {
//切换到读模式
source.flip();
for (int i = 0; i < source.limit(); i++) {
// 找到一条完整消息
if (source.get(i) == '\n') {
int length = i + 1 - source.position();
// 把这条完整消息存入新的ByteBuffer
ByteBuffer target = ByteBuffer.allocate(length);
// 从source读,向target写
for (int j = 0; j < length; j++) {
target.put(source.get());
}
}
}
//只有找到分隔符,才会读取,如果没有找到分割符,则不会读取,也就是压缩之后,position位于limit处,都是16
source.compact();
}
public static void main(String[] args) throws IOException {
Selector selector = Selector.open();
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
//关注accept事件
SelectionKey sscKey = ssc.register(selector, SelectionKey.OP_ACCEPT);
ssc.bind(new InetSocketAddress(8080));
while (true) {
selector.select();
Iterator<SelectionKey> iter = selector.selectedKeys().iterator(); // accept, read
while (iter.hasNext()) {
SelectionKey key = iter.next();
// 如果是accept事件
if (key.isAcceptable()) {
ServerSocketChannel channel = (ServerSocketChannel) key.channel();
SocketChannel sc = channel.accept();
sc.configureBlocking(false);
ByteBuffer buffer = ByteBuffer.allocate(16);
//将channel数据通道注册到selector上
//第三个参数表示将一个 byteBuffer 作为附件关联到 selectionKey 上
//表示这个buffer只能由这个channel使用,此buffer与SelectionKey的生命周期一致
SelectionKey scKey = sc.register(selector, SelectionKey.OP_READ, buffer);
// 如果是read事件
} else if (key.isReadable()) {
try {
// 拿到触发事件的channel
SocketChannel channel = (SocketChannel) key.channel();
// 获取 selectionKey 上关联的附件(buffer)
ByteBuffer buffer = (ByteBuffer) key.attachment();
int read = channel.read(buffer); // 如果是正常断开,read方法返回值是 -1
if(read == -1) {
key.cancel();
} else {
split(buffer);
// 如果position和limit相等,则表示不是一条完整的消息,需要扩容
if (buffer.position() == buffer.limit()) {
//设定扩容机制为2倍
ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity() * 2);
//将旧buffer内容写入到新扩容后的buffer中
buffer.flip();
newBuffer.put(buffer);
//将新的buffer作为selectionKey的附加条件,替换旧的buffer
key.attach(newBuffer);
}
}
} catch (IOException e) {
e.printStackTrace();
key.cancel();
}
}
//移除事件
iter.remove();
}
}
}
五、selector 何时不阻塞
事件发生时
- 客户端发起连接请求时,会触发 accept 事件
- 客户端发送数据、客户端正常 / 异常关闭时,都会触发 read 事件,另外如果发送的数据大于 buffer 缓冲区,会触发多次 read 事件
- channel 可写时,会触发 write 事件
- 在 Linux 下发生
nio bug
时
调用
selector.wakeup()
调用
selector.close()
selector 所在线程 interrupt
六、使用多线程优化
上述过程只有一个 selector,没有充分利用多核 cpu,现利用多线程进行优化。
分两组 selector:
- 单线程配一个 selector,专门处理 accept 事件(起名为boss)
- 创建 CPU 核心数的线程,每个线程配一个 selector,轮流处理 read/write 事件(起名为worker)
服务器端代码:
/** * 流程: * 1. 客户端发起连接,触发accept事件,在accept事件中给worker绑定read事件 * 2. 需要保证绑定read事件在worker调用select方法之前,阻塞之后必须得发生一个别的事件才能绑定,耗费时间 */
public class MultiThreadServer {
//worker用来处理读写事件
static class Worker implements Runnable{
private Thread thread; // 不同的worker有不同的线程
private Selector selector; // 不同的worker有不同的selector
private String name; // 不同的worker有不同的名字
private volatile boolean start = false; // worker对应的thread和selector是否已经初始化
//队列用来保证绑定read事件在worker调用select方法之前
//并发队列用来在多个线程之间传递数据
private ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<>();
// 构造器,定义worker的名字
public Worker(String name) {
this.name = name;
}
// 初始化Thread和selector
public void register(SocketChannel sc) throws IOException {
//如果已经初始化过了,就不需要再初始化
//要保证worker对应一个thread、一个selector,不能每次调用register方法都创建新的
if(!start) {
selector = Selector.open();
thread = new Thread(this, name); // 一个线程对应一个worker
thread.start();
start = true;
}
//将绑定操作添加到队列中,此时还没有执行
queue.add(() -> {
try {
sc.register(selector, SelectionKey.OP_READ, null); //绑定read事件
} catch (ClosedChannelException e) {
e.printStackTrace();
}
});
selector.wakeup();
//main线程唤醒selector,防止worker线程中selector阻塞导致read事件无法绑定
}
//worker的职责,监听读写事件
@Override
public void run() {
while(true) {
try {
selector.select();
//从队列中取出绑定操作并执行
Runnable task = queue.poll();
if (task != null) {
task.run(); //执行队列中的任务,即绑定操作
}
/** * 这样做的原因: * main线程调用worker的register方法绑定read事件, * 但是select方法的执行不在main线程中,而在worker自己的线程中, * 由于是两个线程,所以并不能保证先后顺序, * 所以利用队列,将绑定read事件和worker调用select方法放在一个线程中,保证先后顺序, */
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
if (key.isReadable()) {
ByteBuffer buffer = ByteBuffer.allocate(16);
SocketChannel channel = (SocketChannel) key.channel();
channel.read(buffer);
}
iter.remove();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
//main线程当作仅处理accept事件的线程
public static void main(String[] args) throws IOException {
//boss线程(main线程)专门用来监听accept事件
Thread.currentThread().setName("boss");
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
Selector boss = Selector.open();
//绑定accept事件
SelectionKey bossKey = ssc.register(boss, SelectionKey.OP_ACCEPT);
ssc.bind(new InetSocketAddress(8080));
//创建cpu核心数的worker并初始化
Worker[] workers = new Worker[Runtime.getRuntime().availableProcessors()];
for (int i = 0; i < workers.length; i++) {
workers[i] = new Worker("worker-" + i);
}
//用来保证多个客户端平均的连接到worker上
AtomicInteger index = new AtomicInteger();
while(true) {
boss.select();
Iterator<SelectionKey> iter = boss.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
if (key.isAcceptable()) {
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
//在accept事件中给worker绑定read事件,调用worker的register方法绑定
//保证平均的访问到worker
workers[index.getAndIncrement() % workers.length].register(sc);
}
}
}
}
}
边栏推荐
猜你喜欢
leetcode-201_ 2021_ 10_ seventeen
虚拟人的产业发展现状
Description of software version selection of kt6368a Bluetooth dual-mode transparent chip
leetcode:515. 在每个树行中找最大值【无脑bfs】
Flutter 库冲突问题解决
leetcode:55. Jumping game [classic greed]
socket done
性能测试工具wrk安装使用详解
Development trend and path of SaaS industry in China
[notes of wuenda] fundamentals of machine learning
随机推荐
leetcode:55. Jumping game [classic greed]
CV2 package guide times could not find a version that satisfies the requirement CV2 (from versions: none)
[notes of Wu Enda] convolutional neural network
磁盤的結構
How to extract dates from web pages?
Practice of hierarchical management based on kubesphere
Xinlou: Huawei's seven-year building journey of sports health
Raspberry pie preliminary use
Embedded development: tips and tricks -- clean jump from boot loader to application code
socket(2)
解决dataframe报错ValueError: Cannot take a larger sample than population when ‘replace=False‘
Flutter-使用 typedef的注意事项
Junior college background, 2 years in Suning, 5 years in Ali. How can I get promoted quickly?
如何抓手机的包进行分析,Fiddler神器或许能帮到您!
Balanced binary search tree
leetcode_ 191_ 2021-10-15
[notes of Wu Enda] multivariable linear regression
DP problem set
DAO 中常见的投票治理方式
Zero code can apply data visualization to enterprise management