java NIO编程(转)

一、概念

在传统的java网络编程中,都是在服务端创建一个ServerSocket,然后为每一个客户端单独创建一个线程Thread分别处理各自的请求,由于对于CPU而言,线程的开销是很大的,无限创建线程会让操作系统崩溃,因此,比较好的方法是在系统启动的时候创建一个动态的线程池,例如鼎鼎大名的服务器Tomcat,就是采用这种解决方案,然而,这种解决方案在高并发的情况下,情况就不太乐观了,当线程池大小超过CPU瓶颈的时候,相应速度,就极其低下了。

传统的java网络编程的结构图如下

NIO单线程编写高性能、高并发服务器 - Seans Blog - Seans Blog
 
 
 
       在JDK1.4后,java引入的NIO的概念,即非阻塞的IO,服务端无需创建多个线程,仅仅只需要1个线程(将读写分别创建线程有利于提高性能)即可以处理全部客户端,解决了在性能和并发的2大问题。

       NIO采用了通道Channel和选择器Selector的核心对象,Select 机制,不用为每一个客户端连接新启线程处理,而是将其注册到特定的Selector 对象上,这就可以在单线程中利用Selector 对象管理大量并发的网络连接,更好的利用了系统资源;采用非阻塞I/O 的通信方式,不要求阻塞等待I/O 操作完成即可返回,从而减少了管理I/O 连接导致的系统开销,大幅度提高了系统性能。

当有读或写等任何注册的事件发生时,可以从Selector 中获得相应的SelectionKey , 从SelectionKey 中可以找到发生的事件和该事件所发生的具体的SelectableChannel,以获得客户端发送过来的数据。由于在非阻塞网络I/O 中采用了事件触发机制,处理程序可以得到系统的主动通知,从而可以实现底层网络I/O 无阻塞、流畅地读写,而不像在原来的阻塞模式下处理程序需要不断循环等待。使用NIO,可以编写出性能更好、更易扩展的并发型服务器程序。

     NIO的结构如下

NIO单线程编写高性能、高并发服务器 - Seans Blog - Seans Blog

 由此可见,服务端最少只需要一个线程,既可以处理所有客户端Socket

NIO的设计原理

       设计原理有点像设计模式中的观察者模式,由Selector去轮流咨询各个SocketChannel通道是否有事件发生,如果有,则选择出所有的Key集合,然后传递给处理程序。我们通过每个key就可以获取客户端的SocketChannel,从而进行通信。

       如果Selector发现所有通道都没有事件发生,则线程进入睡眠状态Sleep,阻塞。等到客户端有事件发生,会自动唤醒wakeup选择器selector,是不是有点类似观察者模式!!!!

下面以两个例子来说明,工程目录如下:

虽然是两个例子,但是代码都放在了一个工程里面,下面将分开介绍

二、例子1

1、DataPacket类

    该类是服务端和客户端传输的数据包

package com.nio;

import java.io.Serializable;
import java.util.Date;

/**
 * 数据包
 * @author Administrator
 *
 */
public class DataPacket implements Serializable{
    private long id;
    private String content;
    private Date sendTime;
    public long getId() {
        return id;
    }
    public void setId(long id) {
        this.id = id;
    }
    public String getContent() {
        return content;
    }
    public void setContent(String content) {
        this.content = content;
    }
    public Date getSendTime() {
        return sendTime;
    }
    public void setSendTime(Date sendTime) {
        this.sendTime = sendTime;
    }

}

2、服务端

  NIOServer,服务端,接收客户端发送过来的数据,并将接受到的数据再发送到客户端

package com.nio;

import java.io.ByteArrayInputStream;
import java.io.ObjectInputStream;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

public class NIOServer {
    
    private Selector selector;
    private ServerSocketChannel serverSocketChannel;
    private ServerSocket serverSocket;
    private static int PORT;
    private static int BUFFER_SIZE;
    private ByteBuffer buf;
    
    /**
     * 服务器构造
     * @param port
     * @param buffersize
     */
    public NIOServer(int port,int buffersize){
        this.PORT=port;
        this.BUFFER_SIZE=buffersize;
        buf=ByteBuffer.allocate(BUFFER_SIZE);
    }
    /**
     * 启动监听服务
     * @throws Exception
     */
    public void startListen() throws Exception{
        //打开选择器
        selector=Selector.open();
        //打开服务通道
        serverSocketChannel=ServerSocketChannel.open();
        //将服务通道设置为非阻塞
        serverSocketChannel.configureBlocking(false);
        //创建服务端Socket
        serverSocket=serverSocketChannel.socket();
        //服务端socket绑定端口
        serverSocket.bind(new InetSocketAddress(PORT));
        //服务端通道注册链接事件
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        System.out.println("端口注册完毕");
        Iterator<SelectionKey> iterator=null;
        SelectionKey selectionKey=null;
        while(true){
            //选择一批选择键(线程在此阻塞)
            selector.select();
            iterator=selector.selectedKeys().iterator();
            while(iterator.hasNext()){
                //selectionKey里包含了客户端发送过来的信息
                selectionKey=iterator.next();
                this.handleKey(selectionKey);
                iterator.remove();
            }
        }
    }
    
    /**
     * 处理选择的键
     * @param selectionKey
     * @throws Exception
     */
    @SuppressWarnings("unused")
    private void handleKey(SelectionKey selectionKey)throws Exception{
        //如果是链接事件
        if(selectionKey.isAcceptable()){
            //链接客户端通道(非阻塞)
            SocketChannel socketChannel=this.serverSocketChannel.accept();
            //设置客户端通道(非阻塞)
            socketChannel.configureBlocking(false);
            //注册读事件
            socketChannel.register(selector, SelectionKey.OP_READ);
            System.out.println("有新链接");
        }
        //如果是读信息事件
        else if(selectionKey.isReadable()){
            //获取客户端socket通道
            SocketChannel socketChannel=(SocketChannel)selectionKey.channel();
            //清空缓冲区
            buf.clear();
            //读取数据到缓冲区,并返回读取的字节数
            int a=socketChannel.read(buf);
            if(a>0){
                //将开始指针指向0;把结束指针指向实际有效位置
                buf.flip();
                //得到的b数据组大小
                byte[] b=new byte[buf.limit()];
                //取的时实际有效的数据
                buf.get(b,buf.position(),buf.limit());
                //ObjectInputStream 不能直接接受byte数组,所以先转换成ByteArrayInputStream
                ByteArrayInputStream byteIn=new ByteArrayInputStream(b);
                ObjectInputStream objIn=new ObjectInputStream(byteIn);
                DataPacket dataPacket=(DataPacket) objIn.readObject();
                objIn.close();
                byteIn.close();
                
                System.out.println("从客户端发送到服务端:"+dataPacket.getContent());
                System.out.println("接收时间:"+dataPacket.getSendTime().toLocaleString());
                
                buf.flip();
                //将发过来的数据再发送到客户端
                socketChannel.write(buf);                
            }
            else{
                //关闭客户端socket通道
                socketChannel.close();
            }
        }
    }

}

3、客户端

  NIOClient,客户端输入提示字符按回车,只要不是null都将信息发送到服务端,并监听客户端传过来的数据

package com.nio;

import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.Date;

/**
 * 
 * @author Administrator
 *
 */
public class NIOClient {
    
    public static void main(String[] args){
        
        try {
            SocketAddress address=new InetSocketAddress("127.0.0.1",9999);
            //客户端通道打开的时候要指向一个地址和端口
            SocketChannel clientChannel=SocketChannel.open(address);
            clientChannel.configureBlocking(false);
            ByteBuffer buf=ByteBuffer.allocate(1024);
            while(true){
                buf.clear();
                System.out.println("请输入发送数据包:");
                //把输入的字节流转换成字符串
                String msg=new BufferedReader(new InputStreamReader(System.in)).readLine();
                if(msg.equals("null")){
                    break;
                }
                DataPacket dataPacket=new DataPacket();
                dataPacket.setContent("I am hzb");
                dataPacket.setSendTime(new Date());
                dataPacket.setId(1);
                ByteArrayOutputStream baos=new ByteArrayOutputStream();
                ObjectOutputStream oos=new ObjectOutputStream(baos);
                //把对象写入了oos流里面,但是没有到缓冲
                oos.writeObject(dataPacket);
                //把流的数据写入到缓冲区
                buf.put(baos.toByteArray());
                buf.flip();
                //把缓冲区里面的数据写到通道里面
                clientChannel.write(buf);
                System.out.println("客户端发送数据:"+msg);
                while(true){
                    int len=clientChannel.read(buf);
                    if(len>0){
                        buf.flip();
                        byte[] b=new byte[buf.limit()];
                        buf.get(b,buf.position(),buf.limit());
                        //注意:如果想要把服务端传过来的数据还原成对像,需要用
                        //ByteArrayInputStream byteIn=new ByteArrayInputStream(b);
                        //ObjectInputStream objIn=new ObjectInputStream(byteIn);
                        //DataPacket dataPacket=(DataPacket) objIn.readObject();
                        System.out.println("服务端传来数据:"+new String(b,"utf-8"));
                        break;
                    }
                }
            }
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}

 三、例子2

   功能是,客户端将F:/work/nioSample/fileTest/client/client_send.txt发送给服务端,服务端接收到后存成F:/work/nioSample/fileTest/server/server_receive.txt

然后,服务端将F:/work/nioSample/fileTest/server/server_send.txt发送给客户端,客户端接收到后存成F:/work/nioSample/fileTest/client/client_receive.txt

1、服务端

package com.nio;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.logging.Level;
import java.util.logging.Logger;

public class NIOFileServer {
    private final static Logger logger = Logger.getLogger(NIOFileServer.class.getName());  
    /**
     * 主方法
     * @param args
     */
     public static void main(String[] args){
         Selector selector=null;
         ServerSocketChannel serverSocketChannel=null;
         try {
            selector=Selector.open();
            serverSocketChannel=ServerSocketChannel.open();
            serverSocketChannel.configureBlocking(false);
            serverSocketChannel.socket().setReuseAddress(true);
            serverSocketChannel.socket().bind(new InetSocketAddress(10000));
            //注册链接事件
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            while(selector.select()>0){
                Iterator<SelectionKey> iterator=selector.selectedKeys().iterator();
                while(iterator.hasNext()){
                    SelectionKey key=iterator.next();
                    iterator.remove();
                    doiterator((ServerSocketChannel) key.channel());
                }
            }
            
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
            logger.log(Level.SEVERE, e.getMessage(),e);
        }finally{
             try {
                selector.close();
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            try {
                serverSocketChannel.close();
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
     }
     /**
      * 
      * @param serverSocketChannel,定义成final保证在方法内部serverSocketChannel的内容不会被改变
      */
     private static  void doiterator(final ServerSocketChannel serverSocketChannel){
         SocketChannel socketChannel=null;
         try {
            socketChannel=serverSocketChannel.accept();
            receiveFile(socketChannel, new File("F:/work/nioSample/fileTest/server/server_receive.txt"));
            sendFile(socketChannel, new File("F:/work/nioSample/fileTest/server/server_send.txt"));
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
     }
     
     /**
      * 接收文件
      * @param socketChannel
      * @param file
      * @throws IOException
      */
     private static void receiveFile(SocketChannel socketChannel,File file) throws IOException{
         FileOutputStream fos=null;
         FileChannel fileChannel=null;
         try{
             //保存文件要保存的路径
             fos=new FileOutputStream(file);
             fileChannel=fos.getChannel();
             ByteBuffer buffer=ByteBuffer.allocateDirect(1024);
             int len=0;
             //把客户端通道socketChannel的文件读到缓冲区,再从缓冲区写到本地文件通道channel的路径下
             while((len=socketChannel.read(buffer))!=-1){
                 buffer.flip();
                 if(len>0){
                     buffer.limit(len);
                     fileChannel.write(buffer);
                     buffer.clear();
                 }
             }
         }catch(Exception ex){
             ex.printStackTrace();
         }finally{
             fos.close();
             fileChannel.close();
         }     
     }
     
     /**
      * 发送文件
      * @param socketChannel
      * @param file
      * @throws IOException
      */
     private static void sendFile(SocketChannel socketChannel,File file) throws IOException{
         FileInputStream fis=null;
         FileChannel fileChannel=null;
         try{
             fis=new FileInputStream(file);
             fileChannel=fis.getChannel();
             ByteBuffer buffer=ByteBuffer.allocateDirect(1024);
             int len=0;
             while((len=fileChannel.read(buffer))!=-1){
                 //将buffer的游标position指向0
                 buffer.rewind();
                 buffer.limit(len);
                 socketChannel.write(buffer);
                 buffer.clear();
             }
             //防止正在发送的过程中又发送一个文件
             socketChannel.socket().shutdownOutput(); 
         }catch(Exception ex){
             ex.printStackTrace();
         }finally{
             fis.close();
             fileChannel.close();
         }
     }
}

2、客户端

package com.nio;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;
import java.util.logging.Level;
import java.util.logging.Logger;

public class NIOFileClient {

    private final static Logger logger = Logger.getLogger(NIOFileClient.class.getName());
    
    public static void main(String[] args) throws Exception {
        new Thread(new MyRunnable()).start();
    }
    
    private static final class MyRunnable implements Runnable {
        public void run() {
            SocketChannel socketChannel = null;
            try {
                socketChannel = SocketChannel.open();
                SocketAddress socketAddress = new InetSocketAddress("127.0.0.1", 10000);
                socketChannel.connect(socketAddress);

                sendFile(socketChannel, new File("F:/work/nioSample/fileTest/client/client_send.txt"));
                receiveFile(socketChannel, new File("F:/work/nioSample/fileTest/client/client_receive.txt"));
                
            } catch (Exception ex) {
                logger.log(Level.SEVERE, null, ex);
            } finally {
                try {
                    socketChannel.close();
                } catch(Exception ex) {}
            }
        }

        private void sendFile(SocketChannel socketChannel, File file) throws IOException {
            FileInputStream fis = null;
            FileChannel channel = null;
            try {
                fis = new FileInputStream(file);
                channel = fis.getChannel();
                ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
                int size = 0;
                while ((size = channel.read(buffer)) != -1) {
                    buffer.rewind();
                    buffer.limit(size);
                    socketChannel.write(buffer);
                    buffer.clear();
                }
                socketChannel.socket().shutdownOutput();
            } finally {
                try {
                    channel.close();
                } catch(Exception ex) {}
                try {
                    fis.close();
                } catch(Exception ex) {}
            }
        }

        private void receiveFile(SocketChannel socketChannel, File file) throws IOException {
            FileOutputStream fos = null;
            FileChannel channel = null;
            
            try {
                fos = new FileOutputStream(file);
                channel = fos.getChannel();
                ByteBuffer buffer = ByteBuffer.allocateDirect(1024);

                int size = 0;
                while ((size = socketChannel.read(buffer)) != -1) {
                    buffer.flip();
                    if (size > 0) {
                        buffer.limit(size);
                        channel.write(buffer);
                        buffer.clear();
                    }
                }
            } finally {
                try {
                    channel.close();
                } catch(Exception ex) {}
                try {
                    fos.close();
                } catch(Exception ex) {}
            }
        }
    }
}
原文地址:https://www.cnblogs.com/boshen-hzb/p/5897230.html