小言_互联网的博客

Java NIO 注册事件的正确姿势 以及对attach()的理解

829人阅读  评论(0)

Selector与SelectionKey与Channel之间的关系

看完Selector的api文档,相信大家已经有了认识。

  • 当你注册channel给Selector的时候,会把一个key与channel关联起来。
  • key包含了每一个channel所感兴趣的事件,即图中的key就是一个SelectionKey实例。所以api文档里的key set包含了每一个channel关联的key。
  • 而api文档里的selected-key set则包含了上一次调用select后,有就绪事件的channel的key。所以selected-key set是key set的子集。
  • 而SelectionKey这个类有一个attachment的成员。

使用interestOps修改注册事件

服务端代码:

package NonBlocking;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

public class NonBlockingServer {
    public static void main(String[] args) throws IOException {
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false);
        serverSocketChannel.bind(new InetSocketAddress(8888));

        Selector selector = Selector.open();
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);  //首先注册ACCEPT事件

        int result = 0; int i = 1;
        while(true) {  //遍历获得就绪事件
            result = selector.select();
            System.out.println(String.format("selector %dth loop, ready event number is %d", i++, result));
            if (result == 0) {
                continue;
            }
            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
            while(iterator.hasNext()){  //就绪事件可能不止一个
                SelectionKey sk=iterator.next();

                if(sk.isAcceptable()){  //如果是ACCEPT,那么与之关联的channel肯定是个ServerSocketChannel
                    System.out.println("服务端有ACCEPT事件就绪");
                    ServerSocketChannel ss = (ServerSocketChannel)sk.channel();
                    SocketChannel socketChannel = ss.accept();
                    socketChannel.configureBlocking(false);  //也切换非阻塞
                    socketChannel.register(selector, SelectionKey.OP_READ);  //注册read事件
                }
                else if(sk.isReadable()){    //如果是READ,那么与之关联的channel肯定是个SocketChannel
                    System.out.println("服务端有READ事件就绪");
                    SocketChannel socketChannel = (SocketChannel)sk.channel();
                    ByteBuffer buf=ByteBuffer.allocate(1024);
                    int len=0;
                    StringBuilder sb = new StringBuilder();
                    while((len=socketChannel.read(buf))>0){
                        buf.flip();
                        String s  = new String(buf.array(),0,len);
                        sb.append(s);
                        buf.clear();
                    }
                    //服务端开始响应消息
                    ByteBuffer readAtta = (ByteBuffer)sk.attachment();
                    if (readAtta != null) {
                        System.out.println("lasttime readAtta string is: "+new String(readAtta.array()));
                    } else {
                        System.out.println("lasttime readAtta is null ");
                    }
                    sk.attach(ByteBuffer.wrap(sb.toString().getBytes()));
                    sk.interestOps(sk.interestOps() | SelectionKey.OP_WRITE);
                    String sendStr = "您的消息'"+sb.toString()+"'我已经收到了";
                    System.out.println("接下来attach的是:"+sendStr);
                    sk.attach(ByteBuffer.wrap(sendStr.getBytes()));
                }
                else if(sk.isWritable()){
                    System.out.println("服务端有WRITE事件就绪");
                    SocketChannel socketChannel = (SocketChannel)sk.channel();

                    ByteBuffer writeAtta = (ByteBuffer) sk.attachment();
                    if (writeAtta != null) {
                        System.out.println("lasttime writeAtta string is: "+new String(writeAtta.array()));
                    } else {
                        System.out.println("lasttime writeAtta is null ");
                    }

                    sk.interestOps(sk.interestOps() & ~SelectionKey.OP_WRITE);
                }
                iterator.remove();
                System.out.println("after remove key");
            }
        }
    }
}

客户端:

package NonBlocking;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Scanner;

public class NonBlockingClient {
    static SocketChannel socketChannel = null;
    static Selector selector = null;

    public static void main(String[] args) throws IOException {
        socketChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1",8888));
        socketChannel.configureBlocking(false);
        selector = Selector.open();

        new Thread(new Runnable() {
            @Override
            public void run () {
                ByteBuffer buf=ByteBuffer.allocate(1024);
                Scanner scanner =new Scanner(System.in);
                while(scanner.hasNext()){
                    String inputStr=scanner.next();
                    buf.put(inputStr.getBytes());
                    buf.flip();
                    try {
                        socketChannel.write(buf);
                        while (buf.remaining() > 0) {
                            socketChannel.write(buf);
                        }
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                    buf.clear();
                }
                scanner.close();
            }
        }).start();

        socketChannel.register(selector, SelectionKey.OP_READ);
        int result = 0; int i = 1;
        while((result = selector.select()) > 0) {
            System.out.println(String.format("selector %dth loop, ready event number is %d", i++, result));
            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
            while(iterator.hasNext()){
                SelectionKey sk=iterator.next();

                if (sk.isReadable()) {
                    SocketChannel socketChannel = (SocketChannel)sk.channel();
                    ByteBuffer buf=ByteBuffer.allocate(1024);
                    int len=0;
                    while((len=socketChannel.read(buf))>0){
                        buf.flip();
                        System.out.println(new String(buf.array(), 0 ,len));
                        buf.clear();
                    }
                }
                iterator.remove();
            }
        }
        socketChannel.close();
    }
}

使用方法:客户端在控制台每输入一句话,再去服务端看执行效果。

服务端效果:

selector 1th loop, ready event number is 1
服务端有ACCEPT事件就绪
after remove key
selector 2th loop, ready event number is 1
服务端有READ事件就绪
lasttime readAtta is null 
接下来attach的是:您的消息'你好'我已经收到了
after remove key
selector 3th loop, ready event number is 1
服务端有WRITE事件就绪
lasttime writeAtta string is: 您的消息'你好'我已经收到了
after remove key

selector 4th loop, ready event number is 1
服务端有READ事件就绪
lasttime readAtta string is: 您的消息'你好'我已经收到了
接下来attach的是:您的消息'我也好'我已经收到了
after remove key
selector 5th loop, ready event number is 1
服务端有WRITE事件就绪
lasttime writeAtta string is: 您的消息'我也好'我已经收到了
after remove key
  • 可以发现,上一次attach放置的对象,下一次attachment()后,一定会取出。

使用register修改注册事件

修改服务端两行代码:

//sk.interestOps(sk.interestOps() | SelectionKey.OP_WRITE);
socketChannel.register(selector, SelectionKey.OP_READ);  //改用这句

//sk.interestOps(sk.interestOps() & ~SelectionKey.OP_WRITE);
socketChannel.register(selector, SelectionKey.OP_READ);  //改用这句

服务端效果:

selector 1th loop, ready event number is 1
服务端有 ACCEPT 事件就绪
after remove key
selector 2th loop, ready event number is 1
服务端有 READ 事件就绪
lasttime readAtta is null 
接下来 attach 的是:您的消息'你好'我已经收到了
after remove key
selector 3th loop, ready event number is 1
服务端有 WRITE 事件就绪
lasttime writeAtta string is: 您的消息'你好'我已经收到了
after remove key


selector 4th loop, ready event number is 1
服务端有 READ 事件就绪
lasttime readAtta is null 
接下来 attach 的是:您的消息'他好吗'我已经收到了
after remove key
selector 5th loop, ready event number is 1
服务端有 WRITE 事件就绪
lasttime writeAtta string is: 您的消息'他好吗'我已经收到了
after remove key

之所以第4次循环里,lasttime readAtta is null,是因为register两个参数的版本会调用三个参数重载版本的register,这使得三个参数attachment为null。所以attachment被清空了。

之所以第5次循环里,lasttime writeAtta string is: 您的消息’他好吗’我已经收到了,是因为在socketChannel.register(selector, SelectionKey.OP_WRITE);之后,虽然attachment被清空了,但马上又执行了sk.attach(ByteBuffer.wrap(sendStr.getBytes())),所以附件是清空后马上又附上了。

总结

  • 如果想直接清空附件,那么使用这种socketChannel.register(selector, SelectionKey.OP_WRITE);即可。
  • 如果不想清空附件,那么使用这种sk.interestOps(sk.interestOps() | SelectionKey.OP_WRITE);进行注册即可。使用sk.interestOps(sk.interestOps() & ~SelectionKey.OP_WRITE)进行反注册即可。

转载:https://blog.csdn.net/anlian523/article/details/105020210
查看评论
* 以上用户言论只代表其个人观点,不代表本网站的观点或立场