NIO使用Reactor模式遇到的问题

关于Reactor模式,不再多做介绍,推荐Doug Lea大神的教程:Java 可扩展的IO

本来在Reactor的构造方法中完成一系列操作是没有问题的:

public class Reactor implements Runnable {

    private final Selector selector;

    public Reactor() throws IOException {
        selector = Selector.open();
        String host = "127.0.0.1";
        int port = 12345;
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        serverChannel.socket().bind(new InetSocketAddress(host, port));
        serverChannel.configureBlocking(false);
        serverChannel.register(selector, SelectionKey.OP_ACCEPT, new Acceptor(serverChannel, selector));
    }

    @Override
    public void run() {
        while (!Thread.interrupted()) {
            try {
                selector.select();
                Set<SelectionKey> selected = selector.selectedKeys();
                Iterator<SelectionKey> it = selected.iterator();
                while (it.hasNext()) {
                    dispatch(it.next());
                }
                selected.clear();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    private void dispatch(SelectionKey next) {
        Runnable r = (Runnable) next.attachment();
        if (r != null) {
            r.run();
        }
    }

    public void registerChannel(SelectableChannel channel, int ops) throws IOException {
        if (channel instanceof ServerSocketChannel) {
            ServerSocketChannel socketChannel = (ServerSocketChannel) channel;
            channel.register(selector, ops, new Acceptor(socketChannel, selector));
        }
    }
}

然而有些参数需要在外层操作,我想这样弄:

public Reactor() throws IOException {
    selector = Selector.open();
}

在主线程中启动server [reactorManager.start()来启动Reactor线程]

private void startReactorManager() throws IOException {
    reactorManager = new ReactorManager();
    reactorManager.start();
}

private void startNIOServer(ServerConfig serverConfig) throws IOException {
    String host = serverConfig.getHost();
    int port = serverConfig.getTcpPort();
    ServerSocketChannel serverChannel = ServerSocketChannel.open();
    serverChannel.socket().bind(new InetSocketAddress(host, port));
    serverChannel.configureBlocking(false);
    reactorManager.registerChannel(serverChannel, SelectionKey.OP_ACCEPT);
}

也就是先启动一个线程来Selector.open();然后在主线程中注册通道和事件。

结果一直无法监听到客户端的连接,跟踪才发现服务端的注册方法阻塞了,原因是锁的问题,具体还不清楚。

这里留下一个疑问

想一想,既然在构造函数中可以注册,放到main线程中却不行,那么是否我们可以在注册时检查,如果this是当前Reactor线程,就直接注册,这跟在构造函数中没有区别。

如果不是,就放到队列中,当然你这时放了,selector.select()一直阻塞着,你也无法取出来注册,那么我们可以利用selector.wakeup()唤醒它。

新的方案如下:

public class Reactor extends Thread {

    private final Selector selector;
    private LinkedBlockingQueue<Object[]> register = new LinkedBlockingQueue<>() ;//channel、ops、attach
    private final AtomicBoolean wakeup = new AtomicBoolean() ;

    public Reactor() throws IOException {
        selector = Selector.open();
    }

    @Override
    public void run() {
        while (!Thread.interrupted()) {
            try {
                wakeup.set(false);
                processRegister();
                selector.select();
                Set<SelectionKey> selected = selector.selectedKeys();
                Iterator<SelectionKey> it = selected.iterator();
                while (it.hasNext()) {
                    dispatch(it.next());
                }
                selected.clear();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    private void processRegister() {
        Object[] object;
        while ((object = this.register.poll()) != null) {
            try {
                SelectableChannel channel = (SelectableChannel) object[0];
                if (!channel.isOpen())
                    continue;
                int ops = ((Integer) object[1]).intValue();
                Object attachment = object[2];
                channel.register(this.selector, ops, attachment);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    private void dispatch(SelectionKey next) {
        Runnable r = (Runnable) next.attachment();
        if (r != null) {
            r.run();
        }
    }

    public void registerChannel(SelectableChannel channel, int ops) throws IOException {
        ServerSocketChannel serverChannel = null;
        if (channel instanceof ServerSocketChannel) {
            serverChannel = (ServerSocketChannel) channel;
        }
        Object attachment = new Acceptor(serverChannel, selector);
        if (this == Thread.currentThread()) {
            serverChannel.register(selector, ops, attachment);
        } else {
            this.register.offer(new Object[]{ channel, ops, attachment });
            if (wakeup.compareAndSet(false, true)) {
                this.selector.wakeup();
            }
        }
    }
}

这样就解决了问题,在此记录下!

================================== 赵客缦胡缨,吴钩霜雪明。 银鞍照白马,飒沓如流星。 ==================================
原文地址:https://www.cnblogs.com/lucare/p/9312656.html