什么是Netty
Netty由JBOSS提供的基于Java NIO的开源框架,Netty提供异步非阻塞、事件驱动、高性能、高可靠、高可定制性的网络应用程序和工具,
可用于开发服务端和客户端。
配置服务端
public class SimpleChatServer { private int port; public SimpleChatServer(int port) { this.port = port; } public void run() throws Exception { //EventLoopGroup 处理I/O操作的多线程事件循环器 bossGroup 用来接收进来的连接 workerGroup 用来处理已经被接收的连接 EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { // 一个启动 NIO 服务的辅助启动类 ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new SimpleChatServerInitializer()) .option(ChannelOption.SO_BACKLOG, 128) .childOption(ChannelOption.SO_KEEPALIVE, true); System.out.println("服务端 启动"); // 绑定端口,开始接收进来的连接 ChannelFuture f = b.bind(port).sync(); // 等待服务器 socket 关闭 。 // 在这个例子中,这不会发生,但你可以优雅地关闭你的服务器。 f.channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); System.out.println("SimpleChatServer 关闭了"); } } public static void main(String[] args) throws Exception { int port; if (args.length > 0) { port = Integer.parseInt(args[0]); } else { port = 8080; } new SimpleChatServer(port).run(); } }
配置服务端消息处理
public class SimpleChatServerHandler extends SimpleChannelInboundHandler<String> { public static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception { Channel incoming = channelHandlerContext.channel(); for (Channel channel : channels) { if (channel != incoming){ channel.writeAndFlush("[" + incoming.remoteAddress() + "]" + s + " "); } else { channel.writeAndFlush("[you]" + s + " "); } } } /** * 服务端监听到客户端活动 * @param ctx * @throws Exception */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { Channel incoming = ctx.channel(); System.out.println("SimpleChatClient:"+incoming.remoteAddress()+"在线"); } /** * 服务端监听到客户端不活动 * @param ctx * @throws Exception */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { Channel incoming = ctx.channel(); System.out.println("SimpleChatClient:"+incoming.remoteAddress()+"掉线"); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { Channel incoming = ctx.channel(); System.out.println("SimpleChatClient:"+incoming.remoteAddress()+"异常"); // 当出现异常就关闭连接 cause.printStackTrace(); ctx.close(); } /** * 每当从服务端读到客户端写入信息时,将信息转发给其他客户端的 Channel * @param ctx * @throws Exception */ @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { Channel incoming = ctx.channel(); for (Channel channel : channels) { channel.writeAndFlush("[SERVER] - " + incoming.remoteAddress() + " 加入 "); } channels.add(ctx.channel()); } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { Channel incoming = ctx.channel(); for (Channel channel : channels) { channel.writeAndFlush("[SERVER] - " + incoming.remoteAddress() + " 离开 "); } channels.remove(ctx.channel()); }
配置服务端初始化设置
public class SimpleChatServerInitializer extends ChannelInitializer<SocketChannel> { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter())); pipeline.addLast("decoder", new StringDecoder()); pipeline.addLast("encoder", new StringEncoder()); pipeline.addLast("handler", new SimpleChatServerHandler()); System.out.println("SimpleChatClient:"+ch.remoteAddress() +"连接上"); } }
配置用户端
public class SimpleChatClient { public static void main(String[] args) throws Exception{ new SimpleChatClient("localhost", 8080).run(); } private final String host; private final int port; public SimpleChatClient(String host, int port){ this.host = host; this.port = port; } public void run() throws Exception{ EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap() .group(group) .channel(NioSocketChannel.class) .handler(new SimpleChatClientInitializer()); Channel channel = bootstrap.connect(host, port).sync().channel(); BufferedReader in = new BufferedReader(new InputStreamReader(System.in)); while(true){ channel.writeAndFlush(in.readLine() + " "); } } catch (Exception e) { e.printStackTrace(); } finally { group.shutdownGracefully(); } } }
用来启动服务
配置客户端初始化文件
public class SimpleChatClientInitializer extends ChannelInitializer<SocketChannel> { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter())); pipeline.addLast("decoder", new StringDecoder()); pipeline.addLast("encoder", new StringEncoder()); pipeline.addLast("handler", new SimpleChatClientHandler()); } }
处理得到消息的配置
public class SimpleChatClientHandler extends SimpleChannelInboundHandler<String> { @Override protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception { System.out.println(s); } }