本篇包含了入门小栗子以及一些问题的思考
BIO
package com.demo.bio; import java.io.*; import java.net.ServerSocket; import java.net.Socket; import java.util.Scanner; /** * 问题:开启多个客户端,只有服务端发送足够条数的消息,客户端才会收到 */ public class Server { public static void main(String[] args) throws Exception { new Server().startServer(); } public void startServer() throws IOException { ServerSocket serverSocket = new ServerSocket(9999); while (true){ Socket client = serverSocket.accept(); System.err.println("Client:" + client.getInetAddress().getHostAddress()); OutputStream out = client.getOutputStream(); PrintWriter writer = new PrintWriter(new OutputStreamWriter(out, "UTF-8"), true); writer.println("Hello!We are already connected!say 'bye' to close"); new Thread(new SocketReadThread(client)).start(); new Thread(new SocketWriteThread(client)).start(); } } } /** * 读线程 */ class SocketReadThread implements Runnable{ private Socket socket; public SocketReadThread(Socket socket) { this.socket = socket; } @Override public void run() { try { InputStream in = socket.getInputStream(); Scanner scanner = new Scanner(in, "UTF-8"); boolean bye = false; while (!bye && scanner.hasNextLine()){ String line = scanner.nextLine(); System.out.println("Client Msg[" + socket + "]:" + line); if(line.trim().equals("bye")){ bye = true; } } in.close(); socket.close(); } catch (IOException e) { e.printStackTrace(); } } } /** * 写线程 */ class SocketWriteThread implements Runnable{ private Socket socket; public SocketWriteThread(Socket socket) { this.socket = socket; } @Override public void run() { try { OutputStream out = socket.getOutputStream(); PrintWriter writer = new PrintWriter(new OutputStreamWriter(out, "UTF-8"), true); Scanner scanIn = new Scanner(System.in); while (true){ String line = scanIn.nextLine(); writer.println(line); if (socket.isClosed()){ break; } } } catch (IOException e) { e.printStackTrace(); } } }
package com.demo.bio; import java.io.*; import java.net.Socket; import java.util.Scanner; /** * 客户端 */ public class Client { public static void main(String[] args) throws Exception { Socket socket = new Socket("127.0.0.1", 9999); OutputStream out = socket.getOutputStream(); PrintWriter writer = new PrintWriter(new OutputStreamWriter(out, "UTF-8"), true); new Thread(new SocketReceiveThread(socket)).start(); Scanner scanIn = new Scanner(System.in); while (!socket.isClosed()){ String line = scanIn.nextLine(); writer.println(line); if(line.trim().equals("bye")){ socket.close(); } } } } class SocketReceiveThread implements Runnable{ private Socket socket; public SocketReceiveThread(Socket socket) { this.socket = socket; } @Override public void run() { try { InputStream in = socket.getInputStream(); Scanner scanner = new Scanner(in, "UTF-8"); boolean bye = false; while (!bye && scanner.hasNextLine()){ String line = scanner.nextLine(); System.out.println("Server Msg:" + line); if(line.trim().equals("bye")){ bye = true; } } scanner.close(); } catch (IOException e) { e.printStackTrace(); } } }
BIO没什么难的,同步阻塞。上面实现的主要就是服务器和客户端你一句我一句,巴拉巴拉巴拉
NIO
我要实现一个客户端服务器通信的例子,我的第一个版本
package com.demo.nio; import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.text.SimpleDateFormat; import java.util.Date; import java.util.Iterator; import java.util.Scanner; import java.util.Set; /** * 问题:启动服务器,没有启动客户端的时候,阻塞在selector.select();直到有客户端连接才会向下走。 * 启动客户端:获取到客户端的消息,并读取显示;然后写一条数据给客户端;然后进入了写操作模块,等待写入,阻塞。 * 这个时候,客户端已经经过了读取操作,并且没有读到数据,也进入了写操作模块,等待写入,阻塞。这就解释了为什么客户端收不到服务器的第一条消息。 * 客户端写入:客户端输入数据,发送给服务器,离开写操作模块,进入下一轮循环,然后进入读操作模块,读取到服务器的第一条消息并显示。 * 服务器接收:此时服务器并没有收到客户端的消息,因为此时还在写操作模块阻塞,所以想要读取到数据,就要向客户端发送数据,以离开写操作模块,进入下一轮循环。 * 这就解释了:为什么要先写入才能读取的数据。 */ public class Server { private boolean isFirst = true; private ServerSocketChannel ssc = null; private Selector selector = null; public Server(int port) throws IOException { ssc = ServerSocketChannel.open(); selector = Selector.open(); InetSocketAddress inetAddress = new InetSocketAddress(InetAddress.getLocalHost(), port); ssc.socket().bind(inetAddress); ssc.configureBlocking(false); ssc.register(selector, SelectionKey.OP_ACCEPT); listener(selector); } private void listener(Selector selector) throws IOException{ while(true){ System.out.println("等待客户端连接..."); selector.select(); System.out.println("捕获客户端连接..."); Set<SelectionKey> selectedKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = selectedKeys.iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); //连接事件 if(key.isAcceptable()){ ServerSocketChannel channel = (ServerSocketChannel) key.channel(); channel.accept().configureBlocking(false).register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE); //System.out.println(channel.toString() + "-已连接"); } //读数据 if(key.isReadable()){ SocketChannel channel = (SocketChannel) key.channel(); ByteBuffer bf = ByteBuffer.allocate(1024); channel.read(bf); System.out.println("来自客户端数据:" + new String(bf.array())); // 只有第一次通信返回消息 if(isFirst){ isFirst = false; ByteBuffer bst = ByteBuffer.wrap("Hi!".getBytes()); channel.write(bst); } } //写数据 if(key.isWritable()){ Scanner scanner = new Scanner(System.in); SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); String msg = sdf.format(new Date()) + " " + scanner.nextLine(); SocketChannel channel = (SocketChannel) key.channel(); ByteBuffer bst = ByteBuffer.wrap(msg.getBytes()); channel.write(bst); // key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);// 取消写就绪,否则会一直触发写就绪 } iterator.remove(); } } } public static void main(String[] args) { try { Server server = new Server(9999); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
package com.demo.nio; import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.text.SimpleDateFormat; import java.util.Date; import java.util.Iterator; import java.util.Scanner; import java.util.Set; public class Client { private SocketChannel sc = null; private Selector selector = null; public Client(int port) throws IOException { sc = SocketChannel.open(); selector = Selector.open(); sc.connect(new InetSocketAddress(InetAddress.getLocalHost(), port)); sc.configureBlocking(false); sc.register(selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ | SelectionKey.OP_WRITE); ByteBuffer bf = ByteBuffer.wrap("Hello".getBytes()); sc.write(bf); listener(selector); } private void listener(Selector selector) throws IOException{ while(true){ selector.select(); Set<SelectionKey> selectedKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = selectedKeys.iterator(); while(iterator.hasNext()){ SelectionKey key = iterator.next(); if(key.isReadable()){ SocketChannel channel = (SocketChannel) key.channel(); ByteBuffer dst = ByteBuffer.allocate(1024); channel.read(dst); System.out.println("来自服务器:" + new String(dst.array())); } if(key.isWritable()){ Scanner scanner = new Scanner(System.in); SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); String msg = sdf.format(new Date()) + " " + scanner.nextLine(); SocketChannel channel = (SocketChannel) key.channel(); ByteBuffer bst = ByteBuffer.wrap(msg.getBytes()); channel.write(bst); } iterator.remove(); } } } public static void main(String[] args) { try { Client client = new Client(9999); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
上面例子的问题在注释里已经详细描述了,不信可以运行一下,下面是修正版,把写操作放在一个独立的线程里
package com.demo.nio; import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.text.SimpleDateFormat; import java.util.Date; import java.util.Iterator; import java.util.Scanner; import java.util.Set; /** * 修正版 */ public class ServerRevision { private boolean isFirst = true; private ServerSocketChannel ssc = null; private Selector selector = null; public ServerRevision(int port) throws IOException { ssc = ServerSocketChannel.open(); selector = Selector.open(); InetSocketAddress inetAddress = new InetSocketAddress(InetAddress.getLocalHost(), port); ssc.socket().bind(inetAddress); ssc.configureBlocking(false); ssc.register(selector, SelectionKey.OP_ACCEPT); listener(selector); } private void listener(Selector selector) throws IOException{ while(true){ System.out.println("等待客户端连接..."); selector.select(); System.out.println("捕获客户端连接..."); Set<SelectionKey> selectedKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = selectedKeys.iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); //连接事件 if(key.isAcceptable()){ ServerSocketChannel channel = (ServerSocketChannel) key.channel(); channel.accept().configureBlocking(false).register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE); //System.out.println(channel.toString() + "-已连接"); } //读数据 if(key.isReadable()){ SocketChannel channel = (SocketChannel) key.channel(); ByteBuffer bf = ByteBuffer.allocate(1024); channel.read(bf); System.out.println("来自客户端数据:" + new String(bf.array())); // 只有第一次通信返回消息 if(isFirst){ isFirst = false; ByteBuffer bst = ByteBuffer.wrap("Hi!".getBytes()); channel.write(bst); } } //写数据 if(key.isWritable()){ System.out.println("[服务器]写就绪..."); new Thread(new DealWrite(key)).start(); key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);// 取消写就绪,否则会一直触发写就绪 } iterator.remove(); } } } public static void main(String[] args) { try { ServerRevision server = new ServerRevision(9999); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } class DealWrite implements Runnable{ private SelectionKey key; public DealWrite(SelectionKey key) { this.key = key; } @Override public void run() { while (true){ Scanner scanner = new Scanner(System.in); SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); String msg = sdf.format(new Date()) + " " + scanner.nextLine(); SocketChannel channel = (SocketChannel) key.channel(); ByteBuffer bst = ByteBuffer.wrap(msg.getBytes()); try { channel.write(bst); } catch (IOException e) { e.printStackTrace(); } } } }
package com.demo.nio; import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.text.SimpleDateFormat; import java.util.Date; import java.util.Iterator; import java.util.Scanner; import java.util.Set; /** * 修正版 */ public class ClientRevision { private SocketChannel sc = null; private Selector selector = null; public ClientRevision(int port) throws IOException { sc = SocketChannel.open(); selector = Selector.open(); sc.connect(new InetSocketAddress(InetAddress.getLocalHost(), port)); sc.configureBlocking(false); sc.register(selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ | SelectionKey.OP_WRITE); ByteBuffer bf = ByteBuffer.wrap("Hello".getBytes()); sc.write(bf); listener(selector); } private void listener(Selector selector) throws IOException{ while(true){ selector.select(); Set<SelectionKey> selectedKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = selectedKeys.iterator(); while(iterator.hasNext()){ SelectionKey key = iterator.next(); if(key.isReadable()){ SocketChannel channel = (SocketChannel) key.channel(); ByteBuffer dst = ByteBuffer.allocate(1024); channel.read(dst); System.out.println("来自服务器:" + new String(dst.array())); } if(key.isWritable()){ System.out.println("[客户端]写就绪..."); new Thread(new DealWrite(key)).start(); key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);// 取消写就绪,否则会一直触发写就绪 } iterator.remove(); } } } public static void main(String[] args) { try { ClientRevision client = new ClientRevision(9999); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
目前只是测试了服务器-客户端一对一的通信,不知道一个服务器对多个客户端会出什么bug
NIO稍微有些复杂吧,不过核心的就三个Selector、Channel、Buffer,NIO是同步非阻塞的。