admin管理员组

文章数量:1122852

Java NIO Selector , SelectionKey , SocketChannel , ServerSocketChannel

一    NIO介绍

1. NIO是非阻塞的

    NIO非堵塞应用通常适用用在I/O读写等方面,我们知道,系统运行的性能瓶颈通常在I/O读写,包括对端口和文件的操作上,过去,在打开一个I/O通道后,read()将一直等待在端口一边读取字节内容,假如没有内容进来,read()也是傻傻的等,这会影响我们程序继续做其他事情,那么改进做法就是开设线程,让线程去等待,但是这样做也是相当耗费资源的。

2. 实现原理

    Java NIO非堵塞技术实际是采取Reactor模式,或者说是Observer模式为我们监察I/O端口,假如有内容进来,会自动通知我们,这样,我们就不必开启多个线程死等。

    Selector就是观察者,观察 Server 端的ServerSocketChannel  和 Client 端的 SocketChannel ;前提是它们需要先注册到 同一个Selector,即观察者中;

----详细说明

    NIO 有一个主要的类Selector,这个类似一个观察者,只要我们把需要探知的socketchannel告诉Selector,我们接着做别的事情,当有事件发生时,他会通知我们,传回一组SelectionKey,我们读取这些Key,就会获得我们刚刚注册过的socketchannel,然后,我们从这个Channel中读取数据,放心,包准能够读到,接着我们可以处理这些数据。Selector内部原理实际是在做一个对所注册的Channel(SocketChannel)的轮询访问,不断的轮询(目前就这一个算法),一旦轮询到一个channel有所注册的事情发生,比如数据来了,它就会站起来报告,交出一把钥匙,让我们通过这把钥匙来读取这个channel的内容。

while(!stop) {try {selector.select(1000);  // 等待客户端发请求  1000msselector.selectedKeys().stream().forEach( key -> handleSelectionKey(key) );}catch(Exception e) {e.printStackTrace();}}

 

3. 小结

 

    从外界看,实现了流畅的I/O读写,不堵塞了。Java NIO出现不只是一个技术性能的提高,你会发现网络上到处在介绍它,因为它具有里程碑意义,从JDK1.4开始,Java开始提高性能相关的功能,从而使得Java在底层或者并行分布式计算等操作上已经可以和C或Perl等语言并驾齐驱。

 

下文部分转载自  

二    Selector , SelectionKey  , SocketChannel , ServerSocketChannel 具体应用和功能

     NIO的通讯过程

1.    Selector  (选择器)是Java NIO中能够检测一到多个NIO通道,并能够知晓通道是否为诸如读写事件做好准备的组件。这样,一个单独的线程可以管理多个channel,从而管理多个网络连接

    仅用单个线程来处理多个Channels的好处是,只需要更少的线程来处理通道。事实上,可以只用一个线程处理所有的通道。

 

  1. java.nio.channels

  2. public abstract class Selector extends Object implements Closeable

 

1.1    Selector 的创建

通过调用Selector.open()方法创建一个Selector;Selector selector = Selector.open();

    isOpen() —— 判断Selector是否处于打开状态。Selector对象创建后就处于打开状态了。

    close() —— 当调用了Selector对象的close()方法,就进入关闭状态.。用完Selector后调用其close()方法会关闭该Selector,且使注册到该Selector上的所有SelectionKey实例无效。通道本身并不会关闭

 

1.2    ServerChanel  向 Selector 中注册

为了将Channel和Selector配合使用,必须将channel注册到selector上。

通过SelectableChannel。register()方法来实现。

 
  1. channel.configureBlocking(false);

  2. SelectionKey key = channel.register(selector, SelectionKey.OP_READ);

    与Selector一起使用时,Channel必须处于非阻塞模式下。这意味着FIleChannel与Selector不能一起使用。

    注意register()方法的第二个参数,这是一个”interest集合“,意思是在通过Selector监听Channel时对什么事件感兴趣。

可以监听四种不同类型的事件:

  • Connect
  • Accept
  • Read
  • Write

    通道触发了一个事件意思是该事件已经就绪。所以,某个channel成功连接到另一个服务器称为”连接就绪“。一个server socket channel准备号接收新进入的连接称为”接收就绪“。一个有数据可读的通道可以说是”读就绪“。等代写数据的通道可以说是”写就绪“。

    这四种事件用SelectionKey的四个常量来表示:

  • SelectionKey.OP_CONNECT
  • SelectionKey.OP_ACCEPT
  • SelectionKey.OP_READ
  • SelectionKey.OP_WRITE

 

 

2. register()返回值 —— SelectionKey,  Selector中的SelectionKey集合

    只要ServerSocketChannel及SocketChannel向Selector注册了特定的事件,Selector就会监控这些事件是否发生

    SelectableChannel的register()方法返回一个SelectionKey对象,该对象是用于跟踪这些被注册事件的句柄

一个Selector对象会包含3种类型的SelectionKey集合:

  • all-keys集合 —— 当前所有向Selector注册的SelectionKey的集合,Selector的keys()方法返回该集合
  • selected-keys集合 —— 相关事件已经被Selector捕获的SelectionKey的集合,Selector的selectedKeys()方法返回该集合
  • cancelled-keys集合 —— 已经被取消的SelectionKey的集合,Selector没有提供访问这种集合的方法
  •  

    当register()方法执行时,新建一个SelectioKey,并把它加入Selector的all-keys集合中。

----selectionKey手动关闭 remove() 或cancel()

如果关闭了与SelectionKey对象关联的Channel对象,或者调用了SelectionKey对象的cancel方法,这个SelectionKey对象就会被加入到cancelled-keys集合中,表示这个SelectionKey对象已经被取消。

    在执行Selector的select()方法时,如果与SelectionKey相关的事件发生了,这个SelectionKey就被加入到selected-keys集合中,程序直接调用selected-keys集合的remove()方法,或者调用它的iterator的remove()方法,都可以从selected-keys集合中删除一个SelectionKey对象。

 

3.    SelectionKey——SelectableChannel 在 Selector 中的注册的标记/句柄。

register()方法返回一个SelectinKey对象,这个对象包含一些你感兴趣的属性:

  • interest集合
  • ready集合
  • Channel
  • Selector
  • 附加的对象

通过调用某个SelectionKey的cancel()方法,关闭其通道,或者通过关闭其选择器来取消该Key之前,它一直保持有效。

取消某个Key之后不会立即从Selector中移除它,相反,会将该Key添加到Selector的已取消key set,以便在下一次进行选择操作的时候移除它。

  • interest集合 —— 感兴趣的事件集合,可以通过SelectionKey读写interest集合,
 
  1. int interestSet = selectionKey.interestOps();

  2. boolean isInterestedInAccept = (interestSet & Selection.OP_ACCEPT) == SelectionKey.OP_ACCEPT;

  3. boolean isInterestedInConnect = interestSet & SelectioKey.OP_CONNECT;

  4. boolean isInterestedInRead = interestSet & SelectionKey.OP_READ;

  5. boolean isInterestedInWrite = interestSet & SelectionKey.OP_WRITE;

  • ready集合 —— 是通道已经准备就绪的操作的集合,在一个选择后,你会是首先访问这个ready set,
int readySet = selectionKey.readyOps();

可以向检测interet集合那样的方法,来检测channel中什么事件或操作已经就绪,也可以使用一下四个方法,

selectionKey.isAcceptable();selectionKey.isConnectable();selectionKey.isReadable();selectionKey.isWritable();

​​----Selector 内容

  • 从SelectionKey中获取Channel和Selector:
  1. Channel channel = selectionKey.channel();

  2. Selector selector = selectionKey.selector();

  • 附加的对象 —— 可以将一个对象或者更多的信息附着到SelectionKey上,这样就能方便的识别某个给定的通道。例如,可以附加与通道一起使用的Buffer,或是包含聚集数据的某个对象,
  1. selectionKey.attach(theObject);

  2. Object attachedObj = selectionKey.attachment();

 

4.    通过Selector选择就绪的通道

一旦向Selector注册了一个或多个通道,就可以调用几个重载的select()方法。

这些方法返回你所感兴趣的事件(连接,接受,读或写)已经准备就绪的那些通道。换句话说,如果你对”读就绪“的通道感兴趣,select()方法会返回读事件已经就绪的那些通道。

  • select() —— 阻塞到至少有一个通道在你注册的事件上就绪了
  • select(long timeout) —— 和select()一样,除了最长会阻塞timeout毫秒
  • selectNow() —— 不会阻塞,不管什么通道就绪都立刻返回;此方法执行非阻塞的选择操作,如果自从上一次选择操作后,没有通道变成可选择的,则此方法直接返回0
  • select()方法返回的Int值表示多少通道就绪。

一旦调用了select()方法,并且返回值表明有一个或更多个通道就绪了,然后可以通过调用selector的selectorKeys()方法,访问”已选择键集“中的就绪通道

Set selectedKeys = selector.selectedKeys();

可以遍历这个已选择的集合来访问就绪的通道

 
  1. Set selectedKeys = selector.selectedKeys();
    Iterator keyIterator = selectedKeys.iterator();
    while(keyIterator.hasNext()){
    SelectionKey key = keyIterator.next();
    if (key.isAcceptable()){ // a connection was accepted by a ServerSocketChannel
    }else
    if (key.isConnectable()){ // a connection was eatablished with a remote server
    }else
    if (key.isReadable()){ // a channel is ready for reading
    }else
    if (key.isWritable()){ // a channel is ready for writing
    }
    keyIterator.remove();
    }

     

这个循环遍历已选择集中的每个键,并检测各个键所对象的通道的就绪事件。

注意每次迭代末尾的remove()调用,Selector不会自己从已选择集中移除SelectioKey实例,必须在处理完通道时自己移除。

 

5.    Selector的wakeUp()方法

    某个线程调用select()方法后阻塞了,即使没有通道已经就绪,也有办法让其从select()方法返回。只要让其他线程在第一个线程调用select()方法的那个对象上调用Selector.wakeup()方法即可。阻塞在select()方法上的线程会立马返回。

 

-------个人实现的一个 案例代码

 

import org.apache.commons.lang.StringUtils;import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Date;
import java.util.logging.Logger;class MyTimeServer implements Runnable {private static Logger logger = Logger.getLogger("MyTimeServer");private int port;private volatile boolean stop;private Selector selector;private ServerSocketChannel servChannel;public MyTimeServer(int port) {this.port = port;}private void startTimeServer(){try{selector = Selector.open();servChannel = ServerSocketChannel.open();servChannel.configureBlocking(false);servChannel.socket().bind(new InetSocketAddress(this.port),1024);servChannel.register(selector, SelectionKey.OP_ACCEPT);this.stop = false;logger.info("TimeServer starts in port : " + port);}catch(IOException e){e.printStackTrace();System.exit(1);}}public void stopTimeServer() {this.stop = true;try{selector.close();}catch (IOException e){e.printStackTrace();}}public void run() {startTimeServer();while(!stop) {try {selector.selectedKeys().stream().forEach( selectKey -> handleSelectionKey(selectKey) );}catch(Exception e) {e.printStackTrace();}}//多路复用器关闭以后,所有注册在上面的Channel和Pipe等资源都会自动去注册并关闭,所以不需要重复释放资源if(selector != null) {stopTimeServer();}}private void handleSelectionKey(SelectionKey selectKey){try{handleRequest(selectKey);if(selectKey != null) {selectKey.cancel();if(selectKey.channel() != null) {selectKey.channel().close();  //处理client的key后 关闭该连接}}}catch(Exception e) {e.printStackTrace();}}private void handleRequest(SelectionKey selectKey) throws Exception {if(selectKey.isValid()) {if(selectKey.isAcceptable()) {ServerSocketChannel serverSocketChannel = (ServerSocketChannel)selectKey.channel();SocketChannel socketChannel = serverSocketChannel.accept();socketChannel.configureBlocking(false);socketChannel.register(selector , SelectionKey.OP_READ);}if(selectKey.isReadable()) {SocketChannel socketChannel = (SocketChannel)selectKey.channel();ByteBuffer readBuffer = ByteBuffer.allocate(1024); //预分配1024int readBytes = socketChannel.read(readBuffer);if(readBytes > 0) {byte[] readbytes = new byte[readBytes];readBuffer.position(0); // 重置 byteBuffer 的位置 position 变量readBuffer.get(readbytes , 0 , readBytes);handleReadContext(socketChannel,new String(readbytes , "UTF-8"));} else {selectKey.cancel();socketChannel.close();}}}}private void handleReadContext(SocketChannel socketChannel , String requestBody) {if(StringUtils.isNotBlank(requestBody) && requestBody.equals("time")) {logger.info("The timeServer receive request :  " + requestBody);String response = "[success] msg : " + new Date(System.currentTimeMillis()).toString();try {reponse(socketChannel, response);} catch (Exception e) {e.printStackTrace();}}else {try {reponse(socketChannel, "[success] msg : nothing to say");} catch (Exception e) {e.printStackTrace();}}}private void reponse(SocketChannel socketChannel, String response) throws Exception {if(response != null && response.length() > 0) {byte[] reponesBytes = response.getBytes();ByteBuffer writeBuffer  = ByteBuffer.allocate(reponesBytes.length);writeBuffer.put(reponesBytes);writeBuffer.flip();socketChannel.write(writeBuffer);writeBuffer.clear();}}
}

 

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Date;
import java.util.logging.Logger;class MyTimeServer implements Runnable {private static Logger logger = Logger.getLogger("MyTimeServer");private int port;private volatile boolean stop;private Selector selector;private ServerSocketChannel servChannel;public MyTimeServer(int port) {this.port = port;}private void startTimeServer(){try{selector = Selector.open();servChannel = ServerSocketChannel.open();servChannel.configureBlocking(false);servChannel.socket().bind(new InetSocketAddress(this.port),1024);servChannel.register(selector, SelectionKey.OP_ACCEPT);this.stop = false;logger.info("TimeServer starts in port : " + port);}catch(IOException e){e.printStackTrace();System.exit(1);}}public void stopTimeServer() {this.stop = true;try{selector.close();}catch (IOException e){e.printStackTrace();}}public void run() {startTimeServer();while(!stop) {try {selector.select(1000);  // 等待客户端发请求  1000msselector.selectedKeys().stream().forEach( key -> handleSelectionKey(key) );}catch(Exception e) {e.printStackTrace();}}//多路复用器关闭以后,所有注册在上面的Channel和Pipe等资源都会自动去注册并关闭,所以不需要重复释放资源if(selector != null) {stopTimeServer();}}private void handleSelectionKey(SelectionKey key){try{handleRequest(key);if(key != null) {key.cancel();if(key.channel() != null) {key.channel().close();  //处理client的key后 关闭该连接}}}catch(Exception e) {e.printStackTrace();}}private void handleRequest(SelectionKey key) throws Exception {if(key.isValid()) {if(key.isAcceptable()) {ServerSocketChannel serverSocketChannel = (ServerSocketChannel)key.channel();SocketChannel socketChannel = serverSocketChannel.accept();socketChannel.configureBlocking(false);socketChannel.register(selector , SelectionKey.OP_READ);}if(key.isReadable()) {SocketChannel socketChannel = (SocketChannel)key.channel();ByteBuffer readBuffer = ByteBuffer.allocate(1024);int readBytes = socketChannel.read(readBuffer);if(readBytes > 0) {byte[] readbytes = new byte[readBytes];readBuffer.position(0); // 重置 byteBuffer 的位置 position 变量readBuffer.get(readbytes , 0 , readBytes);String body = new String(readbytes, "UTF-8");logger.info("The timeServer receive request :  " + body);String response = "[success] msg : " + new Date(System.currentTimeMillis()).toString();reponse(socketChannel, response);} else {key.cancel();socketChannel.close();}}}}private void reponse(SocketChannel socketChannel, String response) throws Exception {if(response != null && response.length() > 0) {byte[] reponesBytes = response.getBytes();ByteBuffer writeBuffer  = ByteBuffer.allocate(reponesBytes.length);writeBuffer.put(reponesBytes);writeBuffer.flip();socketChannel.write(writeBuffer);writeBuffer.clear();}}
}

 

本文标签: Java NIOSelectorSelectionKeySocketChannelServerSocketChannel