NIO Socket编程实例

1.阻塞模式实例  

  NIOUtil类,用来通过SOcket获取BufferedReader和PrintWriter。

package IO;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.net.Socket;

public class NIOUtil {
    public static PrintWriter getPrintWriter(Socket socket) throws IOException {
        OutputStream outputStream = socket.getOutputStream();
        return new PrintWriter(outputStream, true);
    }

    public static BufferedReader getBufferedReader(Socket socket)
            throws IOException {
        InputStream inputStream = socket.getInputStream();
        return new BufferedReader(new InputStreamReader(inputStream));
    }
}
View Code

  使用ServerSocketChannel创建阻塞服务器端程序:

package IO;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class BlockingNIOServer {
    private int port = 8000;
    private ServerSocketChannel serverSocketChannel = null;
    private ExecutorService executorService = null;
    private static int DEFAULT_POOI_SIZE = 4;

    public BlockingNIOServer() throws IOException {
        super();
        this.executorService = Executors.newFixedThreadPool(DEFAULT_POOI_SIZE
                * Runtime.getRuntime().availableProcessors());
        this.serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.socket().setReuseAddress(true);
        serverSocketChannel.socket().bind(new InetSocketAddress(port));
        this.executorService = executorService;
    }

    public void service() {
        while (true) {
            SocketChannel channel = null;
            try {
                channel = serverSocketChannel.accept();
                executorService.execute(new Handler(channel));
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws Exception {
        new BlockingNIOServer().service();
    }

    private class Handler implements Runnable {
        private SocketChannel channel;
        public Handler(SocketChannel channel) {
            super();
            this.channel = channel;
        }

        @Override
        public void run() {
            handler(channel);
        }

        public void handler(SocketChannel channel) {
            Socket socket = null;
            try {
                socket = channel.socket();
                System.out.println("接收到来自:" + socket.getInetAddress() + " 端口:"
                        + socket.getPort() + "的请求");
                BufferedReader bufferedReader = NIOUtil
                        .getBufferedReader(socket);
                PrintWriter printWriter = NIOUtil.getPrintWriter(socket);
                String msg = null;

                while ((msg = bufferedReader.readLine()) != null) {
                    System.out.println(msg);
                    printWriter.println(Echo(msg));
                    if ("bye".equalsIgnoreCase(msg))
                        break;
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                if (channel != null) {
                    try {
                        channel.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }

        }

        private String Echo(String msg) {
            return "ECHO:" + msg;
        }
    }
}
View Code

  使用SocketChannel创建阻塞Socket客户端:

package IO;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class BlockingNIOServer {
    private int port = 8000;
    private ServerSocketChannel serverSocketChannel = null;
    private ExecutorService executorService = null;
    private static int DEFAULT_POOI_SIZE = 4;

    public BlockingNIOServer() throws IOException {
        super();
        this.executorService = Executors.newFixedThreadPool(DEFAULT_POOI_SIZE
                * Runtime.getRuntime().availableProcessors());
        this.serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.socket().setReuseAddress(true);
        serverSocketChannel.socket().bind(new InetSocketAddress(port));
        this.executorService = executorService;
    }

    public void service() {
        while (true) {
            SocketChannel channel = null;
            try {
                channel = serverSocketChannel.accept();
                executorService.execute(new Handler(channel));
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws Exception {
        new BlockingNIOServer().service();
    }

    private class Handler implements Runnable {
        private SocketChannel channel;
        public Handler(SocketChannel channel) {
            super();
            this.channel = channel;
        }

        @Override
        public void run() {
            handler(channel);
        }

        public void handler(SocketChannel channel) {
            Socket socket = null;
            try {
                socket = channel.socket();
                System.out.println("接收到来自:" + socket.getInetAddress() + " 端口:"
                        + socket.getPort() + "的请求");
                BufferedReader bufferedReader = NIOUtil
                        .getBufferedReader(socket);
                PrintWriter printWriter = NIOUtil.getPrintWriter(socket);
                String msg = null;

                while ((msg = bufferedReader.readLine()) != null) {
                    System.out.println(msg);
                    printWriter.println(Echo(msg));
                    if ("bye".equalsIgnoreCase(msg))
                        break;
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                if (channel != null) {
                    try {
                        channel.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }

        }

        private String Echo(String msg) {
            return "ECHO:" + msg;
        }
    }
}
View Code

 2.非阻塞模式实例

  Charset类,主要用于decode()和encode()

package IO;

import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.Charset;

public class CharSetUtil {
    public static String decode(ByteBuffer buffer, String charsetName) {
        Charset charset = Charset.forName(charsetName);
        CharBuffer msg = charset.decode(buffer);
        return msg.toString();
    }
    
    public static ByteBuffer encode(String msg, String charsetName) {
        Charset charset = Charset.forName(charsetName);
        ByteBuffer byteBuffer = charset.encode(msg);
        return byteBuffer;
    }
}
View Code

  非阻塞的服务器端

package IO;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class NoBlockingServer {
    private int port = 8000;
    private ServerSocketChannel serverSocketChannel = null;
    private Selector selector = null;

    public NoBlockingServer() throws IOException {
        super();
        selector = Selector.open();
        this.serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.socket().setReuseAddress(true);
        serverSocketChannel.configureBlocking(false);//设置为非阻塞
        serverSocketChannel.socket().bind(new InetSocketAddress(port));
        System.out.println("服务器启动成功");
    }

    public void service() throws IOException {
        //给serverSocketChannel注册OP_ACCEPT事件
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        //注意selector.select()将会阻塞
        while (selector.select() > 0) {
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            Iterator iterator = selectionKeys.iterator();
            while (iterator.hasNext()) {
                SelectionKey selectionKey = null;
                try {
                    selectionKey = (SelectionKey) iterator.next();
                    iterator.remove();

                    if (selectionKey.isAcceptable()) {
                        dealWithAcceptable(selectionKey);
                    }
                    if (selectionKey.isReadable()) {
                        dealWithReadable(selectionKey);
                    }
                    if (selectionKey.isWritable()) {
                        dealWithWritable(selectionKey);
                    }
                } catch (Exception e) {
                    if (selectionKey != null) {
                        selectionKey.cancel();
                        selectionKey.channel().close();
                    }
                }
            }
        }
    }

    private void dealWithAcceptable(SelectionKey selectionKey)
            throws IOException {
        ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectionKey
                .channel();
        SocketChannel socketChannel = serverSocketChannel.accept();
        System.out.println("接收到来自:" + socketChannel.socket().getInetAddress()
                + " 端口" + socketChannel.socket().getPort() + "的请求");
        socketChannel.configureBlocking(false);
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        socketChannel.register(selectionKey.selector(), SelectionKey.OP_READ
                | SelectionKey.OP_WRITE, buffer);
    }

    private void dealWithReadable(SelectionKey selectionKey) throws IOException{
        ByteBuffer buffer=(ByteBuffer) selectionKey.attachment();
        SocketChannel channel=(SocketChannel) selectionKey.channel();
        ByteBuffer readBuffer=ByteBuffer.allocate(32);
        channel.read(readBuffer);
        readBuffer.flip();
        
        buffer.limit(buffer.capacity());
        buffer.put(readBuffer);    
    }
    
    private void dealWithWritable(SelectionKey selectionKey) throws IOException{
        ByteBuffer buffer=(ByteBuffer) selectionKey.attachment();
        SocketChannel channel=(SocketChannel) selectionKey.channel();
        buffer.flip();
        
        String msg=CharSetUtil.decode(buffer, "UTF-8");
        
        if(msg.indexOf("
")==-1){
            return;
        }
        
        String outPutData=msg.substring(0, msg.indexOf("
")+1);
        System.out.println("接收来自客户端的数据:"+outPutData);
        
        ByteBuffer outbyteBuffer=CharSetUtil.encode("echo:"+outPutData, "UTF-8");
        while (outbyteBuffer.hasRemaining()) {
            channel.write(outbyteBuffer);    
        }
        
        ByteBuffer tmp=CharSetUtil.encode(outPutData, "UTF-8");
        buffer.position(tmp.limit());
        buffer.compact();
        if("bye
".equalsIgnoreCase(outPutData)){
            selectionKey.cancel();
            channel.close();
            System.out.println("关闭与客户端的连接");
        }
    }
    
    public static void main(String[] args) throws Exception {
        new NoBlockingServer().service();
    }
}
View Code

   非阻塞的客户端

package IO;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

public class NoBlockingClient {
    private SocketChannel channel = null;
    private ByteBuffer send = ByteBuffer.allocate(1024);
    private ByteBuffer rece = ByteBuffer.allocate(1024);
    private Selector selector;

    public NoBlockingClient() throws IOException {
        super();
        channel = SocketChannel.open();
        channel.socket().connect(new InetSocketAddress("localhost", 8000));
        channel.configureBlocking(false);
        
        System.out.println("与服务器建立连接成功");
        selector = Selector.open();
    }

    public void talk() throws IOException {
        channel.register(selector, SelectionKey.OP_READ|SelectionKey.OP_WRITE);
        while (selector.select() > 0) {
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            Iterator iterator = selectionKeys.iterator();
            while (iterator.hasNext()) {
                SelectionKey selectionKey = null;
                try {
                    selectionKey = (SelectionKey) iterator.next();
                    iterator.remove();

                    if (selectionKey.isReadable()) {
                        dealWithReadable(selectionKey);
                    } 
                    if (selectionKey.isWritable()) {
                        dealWithWritable(selectionKey);
                    }
                } catch (Exception e) {
                    if (selectionKey != null) {
                        selectionKey.cancel();
                        try {
                            selectionKey.channel().close();
                        } catch (IOException e1) {
                            e1.printStackTrace();
                        }
                    }
                }
            }
        }
    }
    
    private void receFromUser() throws IOException{
        BufferedReader bufferedReader=new BufferedReader(new InputStreamReader(System.in));
        String msg=null;
        while ((msg=bufferedReader.readLine())!=null) {
            synchronized (send) {
                send.put(CharSetUtil.encode(msg+"
", "UTF-8"));
            }
            if ("bye".equalsIgnoreCase(msg)) {
                break;
            }
        }
    }
    
    private void dealWithWritable(SelectionKey selectionKey) throws IOException{
        SocketChannel channel=(SocketChannel) selectionKey.channel();
        synchronized (send) {
            send.flip();
            channel.write(send);
            send.compact();
        }
    }
    
    private void dealWithReadable(SelectionKey selectionKey) throws IOException{
        SocketChannel channel=(SocketChannel) selectionKey.channel();
        channel.read(rece);
        rece.flip();
        String msg=CharSetUtil.decode(rece, "UTF-8");
        
        if(msg.indexOf("
")==-1){
            return;
        }
        
        String outPutData=msg.substring(0, msg.indexOf("
")+1);
        System.out.println(outPutData);
        
        if("echo:bye
".equalsIgnoreCase(outPutData)){
            selectionKey.cancel();
            channel.close();
            selector.close();
            System.out.println("关闭与客户端的连接");
        }
            
        ByteBuffer tmp=CharSetUtil.encode(outPutData, "UTF-8");
        rece.position(tmp.limit());
        rece.compact();    
    }
    
    public static void main(String[] args) throws IOException {
           System.out.println(System.getProperty("file.encoding"));   
        final NoBlockingClient noBlockingClient=new NoBlockingClient();
        Thread thread=new Thread(){
            public void run() {
                try {
                    noBlockingClient.receFromUser();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            };
        };
         
        thread.start();
        noBlockingClient.talk();
    }
}
View Code

 3.阻塞和非阻塞编程实例

  服务器端使用阻塞和非阻塞模式,f负责接收客户端连接的线程按照阻塞模式工作,负责接收和发送数据的线程按照非阻塞模式工作。

package IO;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

public class NOBlockingAndBolckingServer {
    private int port = 8000;
    private ServerSocketChannel serverSocketChannel = null;
    private Selector selector = null;
    private Object gate = new Object();

    public NOBlockingAndBolckingServer() throws IOException {
        super();
        selector = Selector.open();
        this.serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.socket().bind(new InetSocketAddress(port));
        System.out.println("服务器启动成功");
    }

    public void accept() {
        for (;;) {
            try {
                SocketChannel socketChannel = serverSocketChannel.accept();
                System.out.println("接收到来自:"
                        + socketChannel.socket().getInetAddress() + " 端口"
                        + socketChannel.socket().getPort() + "的请求");
                socketChannel.configureBlocking(false);

                synchronized (gate) {
                    selector.wakeup();
                    socketChannel.register(selector, SelectionKey.OP_READ
                            | SelectionKey.OP_WRITE);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public void service() throws IOException {
        // 给serverSocketChannel注册OP_ACCEPT事件
        for (;;) {
            synchronized (gate) {
                int n = selector.select();
                if (n == 0)
                    continue;
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator iterator = selectionKeys.iterator();
                while (iterator.hasNext()) {
                    SelectionKey selectionKey = null;
                    try {
                        selectionKey = (SelectionKey) iterator.next();
                        iterator.remove();

                        if (selectionKey.isReadable()) {
                            dealWithReadable(selectionKey);
                        }
                        if (selectionKey.isWritable()) {
                            dealWithWritable(selectionKey);
                        }
                    } catch (Exception e) {
                        if (selectionKey != null) {
                            selectionKey.cancel();
                            selectionKey.channel().close();
                        }
                    }
                }
            }
        }
    }
    
    private void dealWithReadable(SelectionKey selectionKey) throws IOException {
        ByteBuffer buffer = (ByteBuffer) selectionKey.attachment();
        SocketChannel channel = (SocketChannel) selectionKey.channel();
        ByteBuffer readBuffer = ByteBuffer.allocate(32);
        channel.read(readBuffer);
        readBuffer.flip();

        buffer.limit(buffer.capacity());
        buffer.put(readBuffer);
    }

    private void dealWithWritable(SelectionKey selectionKey) throws IOException {
        ByteBuffer buffer = (ByteBuffer) selectionKey.attachment();
        SocketChannel channel = (SocketChannel) selectionKey.channel();
        buffer.flip();

        String msg = CharSetUtil.decode(buffer, "UTF-8");

        if (msg.indexOf("
") == -1) {
            return;
        }

        String outPutData = msg.substring(0, msg.indexOf("
") + 1);
        System.out.println("接收来自客户端的数据:" + outPutData);

        ByteBuffer outbyteBuffer = CharSetUtil.encode("echo:" + outPutData,
                "UTF-8");
        while (outbyteBuffer.hasRemaining()) {
            channel.write(outbyteBuffer);
        }

        ByteBuffer tmp = CharSetUtil.encode(outPutData, "UTF-8");
        buffer.position(tmp.limit());
        buffer.compact();
        if ("bye
".equalsIgnoreCase(outPutData)) {
            selectionKey.cancel();
            channel.close();
            System.out.println("关闭与客户端的连接");
        }
    }

    public static void main(String[] args) throws Exception {
        final NOBlockingAndBolckingServer server=new NOBlockingAndBolckingServer();
        Thread thread=new Thread(){
            public void run() {
                server.accept();
            };
        };
        thread.start();
        server.service();
    }
}
View Code

  客户端和服务器端创建多个连接。

package IO;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.LinkedList;

class Target { // 表示一项任务
    InetSocketAddress address;
    SocketChannel channel;
    Exception failure;
    long connectStart; // 开始连接时的时间
    long connectFinish = 0; // 连接成功时的时间
    boolean shown = false; // 该任务是否已经打印

    Target(String host) {
        try {
            address = new InetSocketAddress(InetAddress.getByName(host), 80);
        } catch (IOException x) {
            failure = x;
        }
    }

    void show() { // 打印任务执行的结果
        String result;
        if (connectFinish != 0)
            result = Long.toString(connectFinish - connectStart) + "ms";
        else if (failure != null)
            result = failure.toString();
        else
            result = "Timed out";
        System.out.println(address + " : " + result);
        shown = true;
    }
}

public class PingClient {
    private Selector selector;
    // 存放用户新提交的任务
    private LinkedList targets = new LinkedList();
    // 存放已经完成的需要打印的任务
    private LinkedList finishedTargets = new LinkedList();

    public PingClient() throws IOException {
        selector = Selector.open();
        Connector connector = new Connector();
        Printer printer = new Printer();
        connector.start();
        printer.start();
        receiveTarget();
    }

    public static void main(String args[]) throws IOException {
        new PingClient();
    }

    public void addTarget(Target target) {
        // 向targets队列中加入一个任务
        SocketChannel socketChannel = null;
        try {
            socketChannel = SocketChannel.open();
            socketChannel.configureBlocking(false);
            socketChannel.connect(target.address);

            target.channel = socketChannel;
            target.connectStart = System.currentTimeMillis();

            synchronized (targets) {
                targets.add(target);
            }
            selector.wakeup();
        } catch (Exception x) {
            if (socketChannel != null) {
                try {
                    socketChannel.close();
                } catch (IOException xx) {
                }
            }
            target.failure = x;
            addFinishedTarget(target);
        }
    }

    public void addFinishedTarget(Target target) {
        // 向finishedTargets队列中加入一个任务
        synchronized (finishedTargets) {
            finishedTargets.notify();
            finishedTargets.add(target);
        }
    }

    public void printFinishedTargets() {
        // 打印finisedTargets队列中的任务
        try {
            for (;;) {
                Target target = null;
                synchronized (finishedTargets) {
                    while (finishedTargets.size() == 0)
                        finishedTargets.wait();
                    target = (Target) finishedTargets.removeFirst();
                }
                target.show();
            }
        } catch (InterruptedException x) {
            return;
        }
    }

    public void registerTargets() {
        // 取出targets队列中的任务,向Selector注册连接就绪事件
        synchronized (targets) {
            while (targets.size() > 0) {
                Target target = (Target) targets.removeFirst();

                try {
                    target.channel.register(selector, SelectionKey.OP_CONNECT,
                            target);
                } catch (IOException x) {
                    try {
                        target.channel.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                    target.failure = x;
                    addFinishedTarget(target);
                }
            }
        }
    }

    public void processSelectedKeys() throws IOException {
        // 处理连接就绪事件
        for (Iterator it = selector.selectedKeys().iterator(); it.hasNext();) {
            SelectionKey selectionKey = (SelectionKey) it.next();
            it.remove();

            Target target = (Target) selectionKey.attachment();
            SocketChannel socketChannel = (SocketChannel) selectionKey
                    .channel();

            try {
                if (socketChannel.finishConnect()) {
                    selectionKey.cancel();
                    target.connectFinish = System.currentTimeMillis();
                    socketChannel.close();
                    addFinishedTarget(target);
                }
            } catch (IOException x) {
                socketChannel.close();
                target.failure = x;
                addFinishedTarget(target);
            }
        }
    }

    //接收用户输入的地址,向targets队列中加入任务
    public void receiveTarget() {
        try {
            BufferedReader localReader = new BufferedReader(
                    new InputStreamReader(System.in));
            String msg = null;
            while ((msg = localReader.readLine()) != null) {
                if (!msg.equals("bye")) {
                    Target target = new Target(msg);
                    addTarget(target);
                } else {
                    shutdown = true;
                    selector.wakeup();
                    break;
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    boolean shutdown = false;

    public class Printer extends Thread {
        public Printer() {
            setDaemon(true);
        }

        public void run() {
            printFinishedTargets();
        }
    }

    public class Connector extends Thread {
        public void run() {
            while (!shutdown) {
                try {
                    registerTargets();
                    if (selector.select() > 0) {
                        processSelectedKeys();
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            try {
                selector.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
View Code
原文地址:https://www.cnblogs.com/wxgblogs/p/5676052.html