Java网络编程--echo服务器

客户端使用Java的阻塞IO

服务端使用Java的非阻塞NIO

package com.nio.echo;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.Scanner;

/**
 * Console client for the echo server, built on blocking {@link Socket} I/O.
 *
 * <p>It connects to {@link #REMOT_IP}:{@link #REMOTE_PORT}, starts a background thread that
 * prints everything the server sends back, and forwards each line typed on standard input to
 * the server until the user enters {@code quit} or standard input ends.
 */
public class EchoClient
{
    /** Host the client connects to. */
    public static final String REMOT_IP = "127.0.0.1";

    /** TCP port the client connects to. */
    public static final int REMOTE_PORT = 8080;

    /**
     * Connects to the server and relays console input to it.
     *
     * <p>Each input line is sent as UTF-8 bytes without a line terminator. The socket is always
     * closed before this method returns, whether the loop ends because of {@code quit}, end of
     * input, or an I/O failure.
     *
     * @throws IOException if connecting to or writing to the server fails
     */
    public void connectServer() throws IOException
    {
        final Socket socket = new Socket();
        try
        {
            socket.connect(new InetSocketAddress(REMOT_IP, REMOTE_PORT));
            // connect() either succeeds or throws, so reaching this line means we are connected
            System.out.println("connect remote address success");

            // listen for server messages on a separate thread
            new Thread(new client2server(socket)).start();

            // Deliberately not closed: closing the Scanner would also close System.in.
            final Scanner scanner = new Scanner(System.in);
            final OutputStream output = socket.getOutputStream();

            // hasNextLine() turns end of input into a normal loop exit instead of letting
            // nextLine() throw NoSuchElementException
            while (scanner.hasNextLine())
            {
                final String line = scanner.nextLine();
                if (line.equals("quit"))
                {
                    break;
                }
                output.write(line.getBytes("UTF-8"));
            }
        }
        finally
        {
            // closing the socket also makes the reader thread's blocking read() return
            socket.close();
        }
    }

    /**
     * Program entry point: runs a single client session.
     *
     * @param args ignored
     * @throws IOException if the session fails with an I/O error
     */
    public static void main(String[] args) throws IOException
    {
        new EchoClient().connectServer();
    }
}

/**
 * Background task that prints everything received on the client socket.
 *
 * <p>For every successful read it prints the byte count followed by the bytes decoded as UTF-8.
 * The loop ends when the server closes the connection or when the socket is closed locally.
 */
class client2server implements Runnable
{
    private Socket socket = null;

    /**
     * @param socket the socket whose input stream is read by {@link #run()}
     */
    public client2server(Socket socket)
    {
        this.socket = socket;
    }

    /**
     * Reads from the socket until end of stream or failure.
     *
     * <p>An {@link IOException} raised after the socket has been closed (for example because the
     * user typed {@code quit} in the main thread) is an expected shutdown and is not reported;
     * any other I/O failure is still printed.
     */
    @Override
    public void run()
    {
        try
        {
            final InputStream inputStream = socket.getInputStream();
            final byte[] bytes = new byte[1024];
            while (true)
            {
                final int num = inputStream.read(bytes);
                if (num == -1)
                {
                    // end of stream: the peer closed its side of the connection
                    System.out.println("server is shutup");
                    break;
                }
                System.out.print(num + " ");

                // NOTE: each chunk is decoded on its own, so a multi-byte character split
                // across two reads is not reassembled.
                final String str = new String(bytes, 0, num, "UTF-8");
                System.out.println("get data: " + str);
            }
        }
        catch(IOException e)
        {
            // A closed socket means we were shut down on purpose; only report real failures.
            if (!socket.isClosed())
            {
                e.printStackTrace();
            }
        }
    }
}
package com.nio.echo;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.util.Iterator;
import java.util.Set;

/**
 * Single-threaded echo server built on a non-blocking {@link ServerSocketChannel} and a
 * {@link Selector}.
 *
 * <p>Every registered key carries a {@link Handle} as its attachment; the select loop simply
 * hands each selected key to its handler.
 */
public class NIOEchoServer
{
    private static ServerSocketChannel ssc = null;

    private static Selector selector = null;

    private static final int PORT = 8080;

    /**
     * Opens the listening channel on {@code PORT} and runs the select loop forever.
     *
     * @throws IOException if the channel or selector cannot be opened or bound, or if
     *             {@code select()} itself fails
     */
    public static void startServer() throws IOException
    {
        ssc = ServerSocketChannel.open();
        selector = Selector.open();
        ssc.configureBlocking(false);

        final ServerSocket serverSocket = ssc.socket();

        // SO_REUSEADDR has to be configured before bind(); its effect afterwards is undefined.
        serverSocket.setReuseAddress(true);
        serverSocket.bind(new InetSocketAddress(PORT));

        ssc.register(selector, SelectionKey.OP_ACCEPT, new AcceptHandler());
        while (true)
        {
            if (selector.select() == 0)
            {
                continue;
            }

            final Iterator<SelectionKey> it = selector.selectedKeys().iterator();
            while (it.hasNext())
            {
                final SelectionKey key = it.next();
                // remove first so the key leaves the selected set even if its handler fails
                it.remove();
                dispatch(key);
            }
        }
    }

    /**
     * Runs the handler attached to {@code key}, containing any failure to that one key so a
     * single broken connection cannot terminate the server.
     */
    private static void dispatch(SelectionKey key)
    {
        if (!key.isValid())
        {
            return;
        }
        final Handle handler = (Handle) key.attachment();
        try
        {
            handler.doHandle(key);
        }
        catch(IOException e)
        {
            dropConnection(key, e);
        }
        catch(RuntimeException e)
        {
            dropConnection(key, e);
        }
    }

    /**
     * Reports a handler failure and closes the affected client channel. The listening channel
     * is never closed here, so the server keeps accepting new connections.
     */
    private static void dropConnection(SelectionKey key, Exception cause)
    {
        System.err.println("server: handler failed: " + cause);
        if (key.channel() instanceof ServerSocketChannel)
        {
            return;
        }
        key.cancel();
        try
        {
            key.channel().close();
        }
        catch(IOException ignored)
        {
            // the connection is being discarded anyway; nothing more can be done
        }
    }

    /**
     * Program entry point: starts the server.
     *
     * @param args ignored
     * @throws IOException if the server cannot start or the select loop fails
     */
    public static void main(String[] args) throws IOException
    {
        NIOEchoServer.startServer();
    }
}

/**
 * Callback stored as the attachment of a {@link SelectionKey}.
 *
 * <p>The server's select loop casts each selected key's attachment to this type and calls
 * {@link #doHandle(SelectionKey)} with that key.
 */
interface Handle
{
    /**
     * Processes the readiness event reported for {@code key}.
     *
     * @param key the selected key this handler is attached to
     * @throws IOException if the underlying channel operation fails
     */
    void doHandle(SelectionKey key) throws IOException;
}

/**
 * Handler attached to the listening channel's key: accepts a pending connection and hands it
 * to a new {@link IOHandler}, which registers itself with the same selector.
 */
class AcceptHandler implements Handle
{

    /**
     * Accepts one connection from the server channel behind {@code key}.
     *
     * @param key the key of the listening {@link ServerSocketChannel}
     * @throws IOException if accepting the connection or registering it fails
     */
    @Override
    public void doHandle(SelectionKey key) throws IOException
    {
        final ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
        final SocketChannel sc = ssc.accept();
        if (sc == null)
        {
            // a non-blocking accept() returns null when no connection is pending
            return;
        }

        try
        {
            // the IOHandler registers itself with the selector, so no reference is kept here
            new IOHandler(key.selector(), sc);
        }
        catch(IOException e)
        {
            // registration failed: do not leak the freshly accepted channel
            try
            {
                sc.close();
            }
            catch(IOException ignored)
            {
                // the original failure is the one worth reporting
            }
            throw e;
        }
        System.out.println("server: connect success");
    }

}

/**
 * Per-connection handler of the echo server: bytes read from the client are copied into the
 * connection's {@link OutputBuffer} and written back when the channel becomes writable.
 *
 * <p>Buffer discipline: between events both {@code readBuffer} and
 * {@code outputBuffer.writeBuffer} are kept in fill mode (position = number of buffered bytes).
 * A buffer is flipped only for the duration of the operation that drains it and is compacted
 * right afterwards, so reads and writes may interleave in any order.
 *
 * <p>Back-pressure: when the output buffer cannot take everything that was read, the remainder
 * stays in {@code readBuffer} and OP_READ is switched off until a write event has made room.
 */
class IOHandler implements Handle
{
    private final ByteBuffer readBuffer = ByteBuffer.allocate(1024);

    private OutputBuffer outputBuffer = new OutputBuffer();

    private SocketChannel socketChannel = null;

    private SelectionKey key = null;

    /**
     * Switches {@code sc} to non-blocking mode and registers it for read events with this
     * handler as the key attachment.
     *
     * @param selector the selector to register with
     * @param sc the accepted client channel
     * @throws IOException if the channel cannot be configured or registered
     */
    public IOHandler(Selector selector, SocketChannel sc) throws IOException
    {
        this.socketChannel = sc;
        socketChannel.configureBlocking(false);
        key = socketChannel.register(selector, SelectionKey.OP_READ, this);
    }

    /**
     * Appends as much of {@code bytebuffer} as fits to the output buffer and makes sure
     * OP_WRITE is armed while output is pending.
     *
     * @param bytebuffer source in read mode; its position is advanced by the bytes consumed
     * @return the number of bytes moved into the output buffer
     */
    private int addWriteBuffer(ByteBuffer bytebuffer)
    {
        final ByteBuffer out = outputBuffer.writeBuffer;
        final int count = Math.min(bytebuffer.remaining(), out.remaining());
        if (count > 0)
        {
            // copy through a bounded view so put() can never overflow the output buffer
            final ByteBuffer chunk = bytebuffer.duplicate();
            chunk.limit(chunk.position() + count);
            out.put(chunk);
            bytebuffer.position(bytebuffer.position() + count);
            outputBuffer.size += count;
        }

        if (outputBuffer.size > 0)
        {
            this.interestOps(0, SelectionKey.OP_WRITE);
        }
        return count;
    }

    /**
     * Removes and adds interest operations on this connection's key.
     *
     * @param remove operation bits to clear
     * @param add operation bits to set
     */
    private void interestOps(int remove, int add)
    {
        int cur = key.interestOps();
        int ops = (cur & ~remove) | add;
        if (cur != ops)
        {
            key.interestOps(ops);
            key.selector().wakeup();
        }
    }

    /**
     * Decodes the remaining bytes of {@code buffer} as UTF-8 without changing the buffer's
     * position or limit.
     *
     * <p>Malformed input (for example a multi-byte character cut in half by a read boundary)
     * is replaced by the charset's replacement character instead of raising an exception.
     *
     * @param buffer the buffer to decode; may be {@code null}
     * @return the decoded text, or an empty string for a {@code null} buffer
     */
    public static String getString(ByteBuffer buffer)
    {
        if (buffer == null)
        {
            return "";
        }
        // decode a read-only view so the caller's position and limit stay untouched
        return Charset.forName("UTF-8").decode(buffer.asReadOnlyBuffer()).toString();
    }

    /**
     * Handles the readiness reported for this connection: reads and queues incoming data,
     * then flushes queued output if the channel is writable.
     *
     * <p>An I/O failure on this connection closes the connection instead of propagating, so
     * one broken client cannot stop the caller's event loop.
     *
     * @param key the selected key of this connection
     * @throws IOException declared by {@link Handle}; connection failures are handled here
     */
    @Override
    public void doHandle(SelectionKey key) throws IOException
    {
        if (!key.isValid())
        {
            return;
        }
        try
        {
            if (key.isReadable() && !handleRead())
            {
                // the peer closed the connection; the key is no longer usable
                return;
            }
            if (key.isWritable())
            {
                handleWrite();
            }
        }
        catch(IOException e)
        {
            System.err.println("server: connection error, closing channel: " + e);
            closeChannel();
        }
    }

    /**
     * Reads available bytes and queues them for echoing.
     *
     * @return {@code false} if end of stream was reached and the channel has been closed
     */
    private boolean handleRead() throws IOException
    {
        System.out.print("server:  meet read event ,before read position = " + readBuffer.position());

        int num = socketChannel.read(readBuffer);

        if (num == -1)
        {
            System.out.println("close the channel ");
            closeChannel();
            return false;
        }

        // switch to read mode: position = 0, limit = number of buffered bytes
        readBuffer.flip();

        System.out.print(" reveive data " + getString(readBuffer));

        int dealsize = addWriteBuffer(readBuffer);

        System.out.println(" write to writeBuffer size = " + dealsize + " nowPostion = " + readBuffer.position());

        boolean leftover = readBuffer.hasRemaining();

        // drop the consumed bytes and return to fill mode, keeping anything that did not fit
        readBuffer.compact();

        if (leftover)
        {
            // output buffer is full: stop reading until a write event has made room
            interestOps(SelectionKey.OP_READ, 0);
        }
        return true;
    }

    /**
     * Writes queued output to the channel, then moves any input that was held back into the
     * space that has just been freed.
     */
    private void handleWrite() throws IOException
    {
        System.out.print("meet write event");

        final ByteBuffer out = outputBuffer.writeBuffer;
        out.flip();
        int num;
        try
        {
            num = socketChannel.write(out);
        }
        finally
        {
            // keep unwritten bytes and return to fill mode
            out.compact();
        }
        outputBuffer.size -= num;

        System.out.print("deal size = " + num + "left buffer size = " + outputBuffer.size);

        if (readBuffer.position() > 0)
        {
            // input was held back earlier because the output buffer was full
            readBuffer.flip();
            addWriteBuffer(readBuffer);
            boolean leftover = readBuffer.hasRemaining();
            readBuffer.compact();
            if (!leftover)
            {
                interestOps(0, SelectionKey.OP_READ);
            }
        }

        if (outputBuffer.size == 0)
        {
            System.out.println(" deal over,cancel write event");
            interestOps(SelectionKey.OP_WRITE, 0);
        }
    }

    /** Cancels the key and closes the channel; a failure while closing is ignored. */
    private void closeChannel()
    {
        key.cancel();
        try
        {
            socketChannel.close();
        }
        catch(IOException ignored)
        {
            // the connection is being discarded anyway
        }
    }
}

/**
 * Plain holder for one connection's pending output: a 1024-byte staging buffer plus a counter
 * of the bytes waiting in it. Both members are public and are maintained by the code that
 * uses this class.
 */
class OutputBuffer
{
    /** Staging buffer for bytes that still have to be written to the socket. */
    public final ByteBuffer writeBuffer = ByteBuffer.allocate(1024);

    /** Number of bytes currently waiting to be written. */
    public int size = 0;
}

ByteBuffer没有直接提供“当前缓存了多少待发送的有效数据”的查询方法(remaining()的含义取决于缓冲区当前处于读模式还是写模式),只能自己写一个OutputBuffer来辅助处理

之前OutputBuffer只是封装了一个ByteBuffer以及一个size变量,size用于标示可用数据量

下面对OutputBuffer进行了重构,将size变量的修改以及数据的写入和写出操作都封装到方法中。其中output(SocketChannel socketChannel)方法利用回调的思想,将socketChannel对象传入,在OutputBuffer当中实现数据的write输出

package com.nio.echo;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.util.Iterator;
import java.util.Set;

/**
 * Single-threaded echo server built on a non-blocking {@link ServerSocketChannel} and a
 * {@link Selector}.
 *
 * <p>Every registered key carries a {@link Handle} as its attachment; the select loop simply
 * hands each selected key to its handler.
 */
public class NIOEchoServer
{
    private static ServerSocketChannel ssc = null;

    private static Selector selector = null;

    private static final int PORT = 8080;

    /**
     * Opens the listening channel on {@code PORT} and runs the select loop forever.
     *
     * @throws IOException if the channel or selector cannot be opened or bound, or if
     *             {@code select()} itself fails
     */
    public static void startServer() throws IOException
    {
        ssc = ServerSocketChannel.open();
        selector = Selector.open();
        ssc.configureBlocking(false);

        final ServerSocket serverSocket = ssc.socket();

        // SO_REUSEADDR has to be configured before bind(); its effect afterwards is undefined.
        serverSocket.setReuseAddress(true);
        serverSocket.bind(new InetSocketAddress(PORT));

        ssc.register(selector, SelectionKey.OP_ACCEPT, new AcceptHandler());
        while (true)
        {
            if (selector.select() == 0)
            {
                continue;
            }

            final Iterator<SelectionKey> it = selector.selectedKeys().iterator();
            while (it.hasNext())
            {
                final SelectionKey key = it.next();
                // remove first so the key leaves the selected set even if its handler fails
                it.remove();
                dispatch(key);
            }
        }
    }

    /**
     * Runs the handler attached to {@code key}, containing any failure to that one key so a
     * single broken connection cannot terminate the server.
     */
    private static void dispatch(SelectionKey key)
    {
        if (!key.isValid())
        {
            return;
        }
        final Handle handler = (Handle) key.attachment();
        try
        {
            handler.doHandle(key);
        }
        catch(IOException e)
        {
            dropConnection(key, e);
        }
        catch(RuntimeException e)
        {
            dropConnection(key, e);
        }
    }

    /**
     * Reports a handler failure and closes the affected client channel. The listening channel
     * is never closed here, so the server keeps accepting new connections.
     */
    private static void dropConnection(SelectionKey key, Exception cause)
    {
        System.err.println("server: handler failed: " + cause);
        if (key.channel() instanceof ServerSocketChannel)
        {
            return;
        }
        key.cancel();
        try
        {
            key.channel().close();
        }
        catch(IOException ignored)
        {
            // the connection is being discarded anyway; nothing more can be done
        }
    }

    /**
     * Program entry point: starts the server.
     *
     * @param args ignored
     * @throws IOException if the server cannot start or the select loop fails
     */
    public static void main(String[] args) throws IOException
    {
        NIOEchoServer.startServer();
    }
}

/**
 * Callback stored as the attachment of a {@link SelectionKey}.
 *
 * <p>The server's select loop casts each selected key's attachment to this type and calls
 * {@link #doHandle(SelectionKey)} with that key.
 */
interface Handle
{
    /**
     * Processes the readiness event reported for {@code key}.
     *
     * @param key the selected key this handler is attached to
     * @throws IOException if the underlying channel operation fails
     */
    void doHandle(SelectionKey key) throws IOException;
}

/**
 * Handler attached to the listening channel's key: accepts a pending connection and hands it
 * to a new {@link IOHandler}, which registers itself with the same selector.
 */
class AcceptHandler implements Handle
{

    /**
     * Accepts one connection from the server channel behind {@code key}.
     *
     * @param key the key of the listening {@link ServerSocketChannel}
     * @throws IOException if accepting the connection or registering it fails
     */
    @Override
    public void doHandle(SelectionKey key) throws IOException
    {
        final ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
        final SocketChannel sc = ssc.accept();
        if (sc == null)
        {
            // a non-blocking accept() returns null when no connection is pending
            return;
        }

        try
        {
            // the IOHandler registers itself with the selector, so no reference is kept here
            new IOHandler(key.selector(), sc);
        }
        catch(IOException e)
        {
            // registration failed: do not leak the freshly accepted channel
            try
            {
                sc.close();
            }
            catch(IOException ignored)
            {
                // the original failure is the one worth reporting
            }
            throw e;
        }
        System.out.println("server: connect success");
    }

}

/**
 * Per-connection handler of the echo server: bytes read from the client are handed to the
 * connection's {@link OutputBuffer} and written back when the channel becomes writable.
 *
 * <p>Flow control: exactly one of OP_READ / OP_WRITE is armed at a time. While the output
 * buffer still holds data only OP_WRITE is of interest, so no new chunk is put into the
 * output buffer before the previous one has been written out. Reading resumes once the output
 * buffer reports a size of zero.
 */
class IOHandler implements Handle
{
    private final ByteBuffer readBuffer = ByteBuffer.allocate(1024);

    private OutputBuffer outputBuffer = new OutputBuffer();

    private SocketChannel socketChannel = null;

    private SelectionKey key = null;

    /**
     * Switches {@code sc} to non-blocking mode and registers it for read events with this
     * handler as the key attachment.
     *
     * @param selector the selector to register with
     * @param sc the accepted client channel
     * @throws IOException if the channel cannot be configured or registered
     */
    public IOHandler(Selector selector, SocketChannel sc) throws IOException
    {
        this.socketChannel = sc;
        socketChannel.configureBlocking(false);
        key = socketChannel.register(selector, SelectionKey.OP_READ, this);
    }

    /**
     * Hands the readable content of {@code bytebuffer} to the output buffer.
     *
     * @param bytebuffer source in read mode
     * @param num number of bytes being handed over
     * @return how far the source position advanced, i.e. the bytes actually consumed
     * @throws IOException declared for callers; not raised by the current implementation
     */
    private int addWriteBuffer(ByteBuffer bytebuffer, int num) throws IOException
    {
        final int before = bytebuffer.position();
        outputBuffer.put(bytebuffer, num);
        return bytebuffer.position() - before;
    }

    /**
     * Removes and adds interest operations on this connection's key.
     *
     * @param remove operation bits to clear
     * @param add operation bits to set
     */
    private void interestOps(int remove, int add)
    {
        int cur = key.interestOps();
        int ops = (cur & ~remove) | add;
        if (cur != ops)
        {
            key.interestOps(ops);
            key.selector().wakeup();
        }
    }

    /**
     * Decodes the remaining bytes of {@code buffer} as UTF-8 without changing the buffer's
     * position or limit.
     *
     * <p>Malformed input (for example a multi-byte character cut in half by a read boundary)
     * is replaced by the charset's replacement character instead of raising an exception.
     *
     * @param buffer the buffer to decode; may be {@code null}
     * @return the decoded text, or an empty string for a {@code null} buffer
     */
    public static String getString(ByteBuffer buffer)
    {
        if (buffer == null)
        {
            return "";
        }
        // decode a read-only view so the caller's position and limit stay untouched
        return Charset.forName("UTF-8").decode(buffer.asReadOnlyBuffer()).toString();
    }

    /**
     * Handles the readiness reported for this connection.
     *
     * <p>An I/O failure on this connection closes the connection instead of propagating, so
     * one broken client cannot stop the caller's event loop.
     *
     * @param key the selected key of this connection
     * @throws IOException declared by {@link Handle}; connection failures are handled here
     */
    @Override
    public void doHandle(SelectionKey key) throws IOException
    {
        if (!key.isValid())
        {
            return;
        }
        try
        {
            if (key.isReadable())
            {
                handleRead();
            }
            else if (key.isWritable())
            {
                handleWrite();
            }
        }
        catch(IOException e)
        {
            System.err.println("server: connection error, closing channel: " + e);
            closeChannel();
        }
    }

    /** Reads available bytes and queues them for echoing, or closes on end of stream. */
    private void handleRead() throws IOException
    {
        System.out.print("server:  meet read event ,before read position = " + readBuffer.position());

        int num = socketChannel.read(readBuffer);

        if (num == -1)
        {
            System.out.println("close the channel ");
            closeChannel();
            return;
        }

        // switch to read mode: position = 0, limit = number of buffered bytes
        readBuffer.flip();

        System.out.println(" reveive data " + getString(readBuffer));

        forwardPendingInput();
    }

    /** Writes queued output; once it is fully drained, resumes accepting input. */
    private void handleWrite() throws IOException
    {
        System.out.print("meet write event");

        outputBuffer.output(socketChannel);

        if (outputBuffer.size() == 0)
        {
            System.out.println(" deal over,cancel write event");
            // forward input that may have been held back, then re-arm the matching interest
            readBuffer.flip();
            forwardPendingInput();
        }
    }

    /**
     * Moves the content of {@code readBuffer} (which must be in read mode) to the output
     * buffer, returns {@code readBuffer} to fill mode, and arms OP_WRITE while output is
     * pending or OP_READ otherwise.
     */
    private void forwardPendingInput() throws IOException
    {
        if (readBuffer.hasRemaining())
        {
            addWriteBuffer(readBuffer, readBuffer.remaining());
        }

        // drop the consumed bytes; anything not taken is kept for the next attempt
        readBuffer.compact();

        if (outputBuffer.size() > 0)
        {
            interestOps(SelectionKey.OP_READ, SelectionKey.OP_WRITE);
        }
        else
        {
            interestOps(SelectionKey.OP_WRITE, SelectionKey.OP_READ);
        }
    }

    /** Cancels the key and closes the channel; a failure while closing is ignored. */
    private void closeChannel()
    {
        key.cancel();
        try
        {
            socketChannel.close();
        }
        catch(IOException ignored)
        {
            // the connection is being discarded anyway
        }
    }
}

/**
 * Bounded queue of bytes waiting to be written to a socket.
 *
 * <p>Invariant: between calls the internal buffer is always in fill mode, so its position is
 * the number of pending bytes. {@link #output(SocketChannel)} flips it only for the duration
 * of the write and compacts it afterwards, which keeps consecutive {@code put} calls and
 * partial writes safe in any order.
 */
class OutputBuffer
{
    private int size;

    private final ByteBuffer writeBuffer = ByteBuffer.allocate(1024);

    /**
     * Writes as many pending bytes as the channel accepts; unwritten bytes stay queued.
     *
     * @param socketChannel the channel to write to
     * @throws IOException if the write fails
     */
    public void output(SocketChannel socketChannel) throws IOException
    {
        // drain mode: position = 0, limit = number of pending bytes
        writeBuffer.flip();
        try
        {
            int num = socketChannel.write(writeBuffer);
            size -= num;
        }
        finally
        {
            // move unwritten bytes to the front and return to fill mode
            writeBuffer.compact();
        }
    }

    /**
     * Appends bytes from {@code b} behind the data already queued.
     *
     * <p>At most as many bytes as there is free space are taken; {@code b}'s position advances
     * by exactly the number of bytes consumed, so the caller can detect and retain a remainder.
     *
     * @param b source buffer in read mode
     * @param num number of bytes the caller is handing over; kept for compatibility, the
     *            amount actually queued is derived from the buffers
     */
    public void put(ByteBuffer b, int num)
    {
        final int count = Math.min(b.remaining(), writeBuffer.remaining());
        if (count == 0)
        {
            return;
        }
        // copy through a bounded view so put() can never overflow the internal buffer
        final ByteBuffer chunk = b.duplicate();
        chunk.limit(chunk.position() + count);
        writeBuffer.put(chunk);
        b.position(b.position() + count);
        this.size += count;
    }

    /**
     * @return the number of bytes currently waiting to be written
     */
    public int size()
    {
        return this.size;
    }
}

  

事实上在NIO网络编程中,写出数据的操作需要加入缓存才能保证效率,目的是在写操作发生的时候不影响业务继续send消息:首先将业务send过来的数据缓存到A中,在写事件发生的时候再将A中的数据写出(此时仅短暂锁住A,将A中的引用取出,并给A重新赋值一个新的引用),这样写事件的处理过程和业务消息的send就可以高并发地进行。

原文地址:https://www.cnblogs.com/wuxinliulei/p/4923148.html