NIO学习(1)

1、NIO(New IO)非阻塞式IO
2、IO和NIO区别:
IO:面向流(Stream oriented)
阻塞IO(Blocking IO)
NIO:面向缓冲区(Buffer Oriented)
非阻塞IO
*选择器(Selectors)
通道负责传输,buffer负责存储。
3、(1)、缓存区(Buffer):在java NIO中负责数据的存取,缓冲区就是数组,用户存储不同类型的数据。
根据数据类型不同(boolean)除外,提供了相应的缓冲区。
ByteBuffer
CharBuffer
shortBuffer
IntBuffer
LongBuffer
Floatbuffer
(2)缓冲区存取数据的两个核心方法:
put():存
get():取
(3)缓冲区四个核心属性:
capacity:容量,表示缓冲区中最大存储数据的容量,一旦声明不可改变。
limit:界限,表示缓冲区可以操作数据的大小。
position:位置,表示缓冲区正在操作的数据位置。
flip()切换到读取数据的模式
rewind()可重复读数据
clear()清空缓冲区
mark:标记,表示记录当前position的位置,可以通过
4、直接缓冲区和非直接缓冲区。
非直接缓冲区通过allocate()方法分配缓冲区,将缓冲区建立在jvm的内存中。
直接缓冲区:通过allocateDirect()方法分配直接缓冲区。
通道(Channel):用于源节点与目标节点的连接,在java NIO中负责缓冲区中数据的传输。通道
本身不存数据。
5、通道的主要实现类。
java nio channel.channel接口。
fileChannel本地书籍传输
socketChannel  网络数据传输
ServersocketChannel
datagramChannel
6、获取通道
java针对支持通道的类提供了getChannel()方法
本地IO
FileInputStream/FileOupputStream
RandomAccessFile
网络ID:
Socket
serverSocket
DatagramSocket
2、在jdk1.7中的NIO.2正对各个通道提供了静态方法open()
3、Files工具类的newByteChannel()也可以获取通道
7、通道之间数据传输。
transferFrom()
transferTo()
8、分散(Scatter)与聚集(Gather)
分散读取(Scattering Reads):将通道得数据分散到多个缓冲区中
聚集写入(Gathering Writes) :将多个缓冲区的数据聚集到通道中
9、NIO的非阻塞式网络通信。
主要式选择器:Selector,客户端所有的通道都会注册到选择器上。直到请求准备就绪才会分配到服务器上。
使用NIO完成网络通信的三个核心:
通道(Channel):负责连接
缓冲区(Buffer):负责数据存取
选择器(Selector):是SelectableChannel的多路复用器。用于监控SelectableChannel的IO状况。
public class TestNonBlockingNIO{
   //客户端
  @Test
  public void client(){
       //1、获取通道   open()
      //2、切换成非阻塞模式 
        channel.configureBlocking(false);
      
   } 
}
package com.lxkj.nio.client;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
//客户端的实现有3个步骤:1.请求连接。2.当连接成功,写数据。3.读取服务端结果。
public class NIOClient implements Runnable {
    private BlockingQueue<String> words;
    private Random random;
    public static void main(String[] args) {
        // 多个线程发起Socket客户端连接请求
        for (int i = 0; i < 5; i++) {
            NIOClient c = new NIOClient();
            c.init();
            new Thread(c).start();
        }
    }
    //1. 初始化要发送的数据
    private void init() {
        words = new ArrayBlockingQueue<String>(5);
        random = new Random();
        try {
            words.put("hi");
            words.put("who");
            words.put("what");
            words.put("where");
            words.put("bye");
        } catch (Exception e) {
            // TODO: handle exception
        }
    }
    //2. 启动子线程代码
    @Override
    public void run() {
        SocketChannel channel = null;
        Selector selector = null;
        try {
            //3. 创建连接服务端的通道 并设置为阻塞方法,这里需要指定服务端的ip和端口号
            channel = SocketChannel.open();
            channel.configureBlocking(false);//切换成非阻塞模式
            channel.connect(new InetSocketAddress("localhost", 8383));
            selector = Selector.open();
            //4. 请求关心连接事件
            channel.register(selector, SelectionKey.OP_CONNECT);
            boolean isOver = false;
            while (!isOver) {
                selector.select();
                //获取到所有的key
                Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
                while (keys.hasNext()) {
                    SelectionKey key = keys.next();
                    keys.remove();
                    if (key.isConnectable()) { //5. 当通道连接准备完毕,发送请求并指定接收允许获取服务端返回信息
                        if (channel.isConnectionPending()) {
                            if (channel.finishConnect()) {
                                key.interestOps(SelectionKey.OP_READ);
                                channel.write(CharsetHelper.encode(CharBuffer.wrap(getWord())));
                                sleep();
                            } else {
                                key.cancel();
                            }
                        }
                    } else if (key.isReadable()) {//6. 开始读取服务端返回数据
                        ByteBuffer byteBuffer = ByteBuffer.allocate(512);
                        channel.read(byteBuffer);
                        byteBuffer.flip();
                        String answer = CharsetHelper.decode(byteBuffer).toString();
                        System.out.println("client get the answer:" + answer);
                        String word = getWord();
                        if (word != null) {
                            channel.write(CharsetHelper.encode(CharBuffer.wrap(getWord())));
                        } else {
                            isOver = true;
                        }
                        sleep();
                    }
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            //7. 关闭通道
            if (channel != null) {
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    public String getWord() {
        return words.poll();
    }
    private void sleep() {
        try {
            TimeUnit.SECONDS.sleep(random.nextInt(3));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
package com.lxkj.nio.server;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import com.lxkj.nio.util.CharsetHelper;
//服务端的事件有2个,一是接受连接事件,二是读取数据:
public class NIOServer {
 private ByteBuffer readBuffer;
 private Selector selector;//选择器(轮询器)
 private ServerSocket serverSocket;
 public void init(){//启动
  //1、创建临时缓冲区(非直接缓冲区)
  readBuffer=ByteBuffer.allocate(1024);
  //2、创建服务端socket非阻塞通道
  ServerSocketChannel serverSocketChannel;
  try{
   //打开服务器端通道
   serverSocketChannel=ServerSocketChannel.open();
   serverSocketChannel.configureBlocking(false);
   //3、指定内部socket绑定服务器端地址 并支持重用端口,因为有可能多个客户端同时访问同一端口
    serverSocket=serverSocketChannel.socket();
    serverSocket.setReuseAddress(true);
    serverSocket.bind(new InetSocketAddress(8383));
    //4、创建轮询器并绑定到管道上,开始监听客户端请求
    selector=Selector.open();
    //绑定到管道上,也就是注册
     serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
  }catch(Exception e){
   e.printStackTrace();
  }
 }
 
  private void listener(){//监听时间
   while(true){
    try{
      //5、开始监听事件,不断取出key,假如存在事件,则直接处理
     selector.select();
              Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
              while (keys.hasNext()) {
                  SelectionKey key = keys.next();
                  keys.remove();
                  handleKey(key);
              }
    }catch(Exception e){
     e.printStackTrace();
    }
   }
   
  }
  
  private void handleKey(SelectionKey key) throws IOException {
      SocketChannel channel = null;
      try {
          //6. 如果有客户端要连接 这里则处理是否接受连接事件
          if (key.isAcceptable()) {
              ServerSocketChannel severChannel = (ServerSocketChannel) key.channel();
              channel = severChannel.accept();
              channel.configureBlocking(false);
              // 告诉轮询器 接下来关心的是读取客户端数据这件事
              channel.register(selector, SelectionKey.OP_READ);
          } else if (key.isReadable()) { //7. 如果客户端发送数据,则这里读取数据。
              channel = (SocketChannel) key.channel();
              // 清空缓冲区
              readBuffer.clear();
              // 当客户端关闭channel后,会不断收到read事件,此刻read方法返回-1 所以对应的服务器端也需要关闭channel
              int readCount = channel.read(readBuffer);
              if (readCount > 0) {
                  readBuffer.flip();
                  String question = CharsetHelper.decode(readBuffer).toString();
                  System.out.println("server get the question:" + question);
                  String answer = getAnswer(question);
                  channel.write(CharsetHelper.encode(CharBuffer.wrap(answer)));
              } else {
                  channel.close();
              }
          } 
      } catch (Exception e) {
          e.printStackTrace();
      }finally {
          //8. 断开连接通道
          if (channel!=null) {
              channel.close();
          }
      }
  }
  public static String getAnswer(String question) {
      String answer = null;
      switch (question) {
      case "who":
          answer = "我是小娜 ";
          break;
      case "what":
          answer = "我是来帮你解闷的 ";
          break;
      case "where":
          answer = "我来自外太空 ";
          break;
      case "hi":
          answer = "hello ";
          break;
      case "bye":
          answer = "88 ";
          break;
      default:
          answer = "请输入 who, 或者what, 或者where";
      }
      return answer;
  }
  public static void main(String[] args) {
      NIOServer server = new NIOServer();
      server.init();
      System.out.println("server started:8383");
      server.listener();
  }
}
原文地址:https://www.cnblogs.com/llaq/p/9446885.html