深入理解Java AIO(一)—— Java AIO的简单使用

深入理解Java AIO(一)—— Java AIO的简单使用

深入理解AIO系列分为三个部分

  • 第一部分也就是本节的Java AIO的简单使用
  • 第二部分是AIO源码解析(只解析关键部分)(待更新)
  • 第三部分是Linux中的AIO实现

Future和CompletionHandler 

Java 异步 IO 提供了两种使用方式,分别是返回 Future 实例和使用回调函数

Future 实例

  • future.isDone();

    判断操作是否已经完成,包括了正常完成、异常抛出、取消

  • future.cancel(true);

    取消操作,方式是中断。参数 true 说的是,即使这个任务正在执行,也会进行中断。

  • future.isCancelled();

    是否被取消,只有在任务正常结束之前被取消,这个方法才会返回 true

  • future.get();

    这是我们的老朋友,获取执行结果,阻塞。

  • future.get(10, TimeUnit.SECONDS);

    如果上面的 get() 方法的阻塞你不满意,那就设置个超时时间。

Futrue这个东西和在线程池里用的应该差不多,不过我暂时没看明白怎么用,也没查到什么资料,之后看一下线程池再过来更新这块。

CompletionHandler 回调函数

java.nio.channels.CompletionHandler 接口定义:

public interface CompletionHandler<V,A> {
// attachment是用来传递参数进去的 void completed(V result, A attachment); void failed(Throwable exc, A attachment); }

用法:

AsynchronousServerSocketChannel listener = AsynchronousServerSocketChannel.open().bind(null);

// accept 方法的第一个参数可以传递 attachment
listener.accept(attachment, new CompletionHandler<AsynchronousSocketChannel, Object>() {
@Override
public void completed( AsynchronousSocketChannel client, Object attachment) { // }
@Override
public void failed(Throwable exc, Object attachment) { // } });

简单实例

老样子只贴服务端部分的:

Server

public class Server {

    public static void main(String[] args) throws IOException {

          // 实例化,并监听端口
        AsynchronousServerSocketChannel server =
                AsynchronousServerSocketChannel.open().bind(new InetSocketAddress(8080));

        // 自己定义一个 Attachment 类,用于传递一些信息
        Attachment att = new Attachment();
        att.setServer(server);

        server.accept(att, new CompletionHandler<AsynchronousSocketChannel, Attachment>() {
            @Override
            public void completed(AsynchronousSocketChannel client, Attachment att) {
                try {
                    SocketAddress clientAddr = client.getRemoteAddress();
                    System.out.println("收到新的连接:" + clientAddr);

                    // 收到新的连接后,server 应该重新调用 accept 方法等待新的连接进来
                    att.getServer().accept(att, this);

                    Attachment newAtt = new Attachment();
                    newAtt.setServer(server);
                    newAtt.setClient(client);
                    newAtt.setReadMode(true);
                    newAtt.setBuffer(ByteBuffer.allocate(2048));

                    // 这里也可以继续使用匿名实现类,不过代码不好看,所以这里专门定义一个类
                    client.read(newAtt.getBuffer(), newAtt, new ChannelHandler());
                } catch (IOException ex) {
                    ex.printStackTrace();
                }
            }

            @Override
            public void failed(Throwable t, Attachment att) {
                System.out.println("accept failed");
            }
        });
        // 为了防止 main 线程退出
        try {
            Thread.currentThread().join();
        } catch (InterruptedException e) {
        }
    }
}

ChannelHandler

public class ChannelHandler implements CompletionHandler<Integer, Attachment> {

    @Override
    public void completed(Integer result, Attachment att) {
        if (att.isReadMode()) {
            // 读取来自客户端的数据
            ByteBuffer buffer = att.getBuffer();
            buffer.flip();
            byte bytes[] = new byte[buffer.limit()];
            buffer.get(bytes);
            String msg = new String(buffer.array()).toString().trim();
            System.out.println("收到来自客户端的数据: " + msg);

            // 响应客户端请求,返回数据
            buffer.clear();
            buffer.put("Response from server!".getBytes(Charset.forName("UTF-8")));
            att.setReadMode(false);
            buffer.flip();
            // 写数据到客户端也是异步
            att.getClient().write(buffer, att, this);
        } else {
            // 到这里,说明往客户端写数据也结束了,有以下两种选择:
            // 1. 继续等待客户端发送新的数据过来
//            att.setReadMode(true);
//            att.getBuffer().clear();
//            att.getClient().read(att.getBuffer(), att, this);
            // 2. 既然服务端已经返回数据给客户端,断开这次的连接
            try {
                att.getClient().close();
            } catch (IOException e) {
            }
        }
    }

    @Override
    public void failed(Throwable t, Attachment att) {
        System.out.println("连接断开");
    }
}

Attachment

public class Attachment {
    private AsynchronousServerSocketChannel server;
    private AsynchronousSocketChannel client;
    private boolean isReadMode;
    private ByteBuffer buffer;
    // getter & setter
}

因为网上资料较少,只找到了


参考资料:Java 非阻塞 IO 和异步 IO

原文地址:https://www.cnblogs.com/fatmanhappycode/p/12388683.html