Java `NIO` 之选择通道 `SelectableChannel` 示例 -- ISS(Ideas Should Spread)

本文是笔者 Java 学习笔记之一,旨在总结个人学习 Java 过程中的心得体会,现将该笔记发表,希望能够帮助在这方面有疑惑的同行解决一点疑惑,我的目的也就达到了。欢迎分享和转载,转载请注明出处,谢谢合作。由于笔者水平有限,文中难免有所错误,希望读者朋友不吝赐教,发现错漏我也会及时更新修改,欢迎斧正。(可在文末评论区说明或索要联系方式进一步沟通。)
本文发表地址是 我的 CSDN 博客:使用 markdown 语法,如果在其它博客看到排版效果不好,请点击此处查看原文。

SelectableChannel

SelectableChannel 是 JDK 1.4 引入的 nio 包中另一个非常有用的抽象类,SelectableChannel 能够让程序员避免编写大量阻塞式的代码,特别是在高并发的交互式网络编程中,作用尤其明显。

SelectableChannel 相关的类

SelectableChannel

SelectableChannel 是实现了 Channel 接口的一个抽象类,在 JDK 源码中对 SelectableChannel 的注释中有以下解释:

/**
 * A channel that can be multiplexed via a {@link Selector}.
 *
 * <p> In order to be used with a selector, an instance of this class must
 * first be <i>registered</i> via the {@link #register(Selector,int,Object)
 * register} method.  This method returns a new {@link SelectionKey} object
 * that represents the channel's registration with the selector.
 *
 * <p> Once registered with a selector, a channel remains registered until it
 * is <i>deregistered</i>.  This involves deallocating whatever resources were
 * allocated to the channel by the selector.
 * ...

multiplexed 是多路复用的意思。也就是说多个 SelectableChannel 能够被集合起来,在某个情况下只选择其中的某一个或某几个来使用,那么选择的条件是什么,又是由谁来决定这些条件呢?这就是 Selector 要做的事。

Selector

/**
 * A multiplexor of {@link SelectableChannel} objects.
 *
 * <p> A selector may be created by invoking the {@link #open open} method of
 * this class, which will use the system's default {@link
 * java.nio.channels.spi.SelectorProvider selector provider} to
 * create a new selector.  A selector may also be created by invoking the
 * {@link java.nio.channels.spi.SelectorProvider#openSelector openSelector}
 * method of a custom selector provider.  A selector remains open until it is
 * closed via its {@link #close close} method.
 * ...

Selector 是一个多路复用器,也就是说一个 Selector 能够将多个 SelectableChannel 集中到一起管理,在某种情况下,它能够决定哪些 SelectableChannel 可以被选中使用。

SelectionKey

/**
 * A token representing the registration of a {@link SelectableChannel} with a
 * {@link Selector}.
 *
 * <p> A selection key is created each time a channel is registered with a
 * selector.  A key remains valid until it is <i>cancelled</i> by invoking its
 * {@link #cancel cancel} method, by closing its channel, or by closing its
 * selector.  Cancelling a key does not immediately remove it from its
 * selector; it is instead added to the selector's <a
 * href="Selector.html#ks"><i>cancelled-key set</i></a> for removal during the
 * next selection operation.  The validity of a key may be tested by invoking
 * its {@link #isValid isValid} method.
 * ...

SelectionKey 表示了一个 SelectableChannel 和一个 Selector 之间的注册的关系,如果一个 SelectableChannel 没有 “决定” 加入到 Selector 受该选择器的选择,那么该选择器无法决定该 SelectableChannel 何时被启用,这时该 SelectableChannel 与传统的用法没有什么差别。SelectionKey 可以当作是数据库中 SelectableChannel 表和 Selector 表的中间表,它一边连接 SelectableChannel 一边连接 Selector

例子

以下例子实现了一个简单的 echo 服务,也就是无论客户端发来什么,服务器都将原样返回客户端发来的内容。由于使用了 SelectableChannel ,因此该例子中服务器并没有为每一个客户端请求创建一个线程,读者朋友可以将以前编写 Java 网络程序的代码和以下代码进行比较。

SelectableChannel.java 文件

package ml.kezhenxu.demo.blog.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

/**
 * Created by kezhenxu on 3/16/16.
 */
public class SelectableChannelDemo {

    private ByteBuffer buffer = ByteBuffer.allocate (4096);

    public static void main (String[] args) throws IOException, InterruptedException {
        new SelectableChannelDemo ().go ();
    }

    public void go () throws IOException {
        // 使用静态方法获得 "ServerSocketChannel" , 该类是 "SelectableChannel" 的子类
        final ServerSocketChannel serverSocketChannel = ServerSocketChannel.open ();
        // 如果想要使用选择器, 就要配置成非阻塞模式
        serverSocketChannel.configureBlocking (false);
        // 将创建 ServerSocketChannel 时自动创建的 ServerSocket 取出
        // 用来设置监听端口
        final ServerSocket serverSocket = serverSocketChannel.socket ();
        serverSocket.bind (new InetSocketAddress (9000));

        // 使用静态方法获取一个选择器
        final Selector selector = Selector.open ();
        // 将 serverSocketChannel 注册到选择器上
        // 选择的条件是 OP_ACCEPT 能够进行 accept 操作的条件
        serverSocketChannel.register (selector, SelectionKey.OP_ACCEPT);

        while (true) {
            // 开始选择
            final int selectedNumber = selector.select ();
            // 如果没有选到合适的, 也就是说如果调用该 serverSocket 的 accept 方法,
            // 会阻塞, 那我们就先不调用
            if (selectedNumber == 0) {
                continue;
            }
            // 如果选择的数量不是 0, 表示选到了一些可以操作的了
            // 将选中的 SelectionKey 集合取出
            final Set<SelectionKey> selectedKeys = selector.selectedKeys ();
            // 进行迭代
            for (Iterator<SelectionKey> iterator = selectedKeys.iterator (); iterator.hasNext (); ) {
                final SelectionKey selectedKey = iterator.next ();
                // 判断是否是 acceptable 的, 如果是, 表示我们的服务器
                // 有连接进来了
                if (selectedKey.isAcceptable ()) {
                    // 因为 SelectionKey 关联了注册的 SelectableChannel 和 Selector,
                    // 因此可以获得关联的 Channel, 我们知道只有 ServeSocketChannel 才可能
                    // 有 OP_ACCEPT 操作, 因此可以安全的进行类型转换
                    final ServerSocketChannel channel       = (ServerSocketChannel) selectedKey.channel ();
                    // 以上都表明调用 accept 方法不会阻塞了(由 Selector 选择出来的)
                    // 那么就调用它
                    final SocketChannel       socketChannel = channel.accept ();
                    if (socketChannel != null) {
                        // 将连接进来的客户端 socket 也注册到选择器里面去, 不过操作是读取(OP_READ)
                        // 因为我们想在不阻塞的情况下马上读取客户端的请求, 因此也需要selector来帮助我们选择
                        socketChannel.configureBlocking (false);
                        socketChannel.register (selector, SelectionKey.OP_READ);
                    }
                }
                // 如果selector选择出来的是可以读的, 表明这个selectedKey关联的channel是
                // 上一次服务器接受的客户端连接, 被上一条语句
                // socketChannel.register (selector, SelectionKey.OP_READ);
                // 注册到selector中, 这时候由于不阻塞被选择出来, 因此可以安全的转换类型
                if (selectedKey.isReadable ()) {
                    final SocketChannel socketChannel = (SocketChannel) selectedKey.channel ();
                    // 使用 echo 方法将数据回显
                    echo (socketChannel);
                    socketChannel.close ();
                }
                iterator.remove ();
            }
        }
    }

    private void echo (final SocketChannel socketChannel) throws IOException {
        while (socketChannel.read (buffer) > 0) {
            buffer.flip ();
            socketChannel.write (buffer);
            buffer.clear ();
        }
    }
}

客户端测试代码文件 Client.java

package ml.kezhenxu.demo.blog.nio;

import java.io.*;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;

/**
 * Created by kezhenxu on 3/16/16.
 */
public class Client {

    public static void main (String[] args) {

        // 创建 20 个线程, 跟服务器发起连接
        for (int i = 0; i < 20; i++) {
            new Thread (() -> {
                try {
                    final Socket socket = new Socket ();
                    socket.connect (new InetSocketAddress (InetAddress.getLocalHost (), 9000));
                    final BufferedWriter writer = new BufferedWriter (new OutputStreamWriter (socket.getOutputStream ()));
                    writer.write ("Hello this is from thread " + Thread.currentThread ().getName ());
                    writer.flush ();
                    final BufferedReader reader = new BufferedReader (new InputStreamReader (socket.getInputStream ()));
                    for (String line = reader.readLine (); line != null; line = reader.readLine ()) {
                        System.out.println (Thread.currentThread ().getName () + " Server reply: " + line);
                    }
                    socket.close ();
                } catch (IOException e) {
                    e.printStackTrace ();
                }
            }).start ();
        }
    }
}

测试结果

先启动服务器程序,后启动客户端程序,可以看到以下结果(每次运行结果可能都不相同):

Thread-17 Server reply: Hello this is from thread Thread-17
Thread-5 Server reply: Hello this is from thread Thread-5
Thread-1 Server reply: Hello this is from thread Thread-1
Thread-11 Server reply: Hello this is from thread Thread-11
Thread-0 Server reply: Hello this is from thread Thread-0
Thread-7 Server reply: Hello this is from thread Thread-7
Thread-4 Server reply: Hello this is from thread Thread-4
Thread-12 Server reply: Hello this is from thread Thread-12
Thread-6 Server reply: Hello this is from thread Thread-6
Thread-14 Server reply: Hello this is from thread Thread-14
Thread-16 Server reply: Hello this is from thread Thread-16
Thread-2 Server reply: Hello this is from thread Thread-2
Thread-9 Server reply: Hello this is from thread Thread-9
Thread-19 Server reply: Hello this is from thread Thread-19
Thread-15 Server reply: Hello this is from thread Thread-15
Thread-3 Server reply: Hello this is from thread Thread-3
Thread-13 Server reply: Hello this is from thread Thread-13
Thread-18 Server reply: Hello this is from thread Thread-18
Thread-8 Server reply: Hello this is from thread Thread-8
Thread-10 Server reply: Hello this is from thread Thread-10
原文地址:https://www.cnblogs.com/keZhenxu94/p/5288474.html