Java Netty (2)

通过一个实例来说明Netty的使用。服务器监听5个端口,5个客户端线程分别连接其中一个端口。客户端连接上服务器以后就向服务器发送消息;服务器接收到消息后,等待随机的时间,再向客户端返回消息;客户端接收到消息以后,同样等待随机的时间,再向服务器发送消息,这样一直循环下去。

项目结构:

NettyServer.java:

package Server;

import java.net.InetSocketAddress;
import java.util.concurrent.Executors;

import org.jboss.netty.bootstrap.ServerBootstrap;  
import org.jboss.netty.channel.*;  
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;  
import org.jboss.netty.handler.execution.ExecutionHandler;
import org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor;

import Util.Constant;


/**
 * Entry point of the demo server: configures one {@link ServerBootstrap} and
 * binds it to the five ports declared in {@code Util.Constant}.
 */
public class NettyServer {

    public static String host = "127.0.0.1";

    // Execution handler handed to the pipeline factory so that every pipeline
    // created by this server uses the same instance.
    // NOTE(review): per the Netty 3 API the arguments should be the core pool
    // size (16) and the per-channel / total memory limits in bytes - confirm.
    static ExecutionHandler executionHandler = new ExecutionHandler(
            new OrderedMemoryAwareThreadPoolExecutor(16, 1048576, 1048576));

    /**
     * Builds the server bootstrap, applies the socket options and starts
     * listening on each configured port, printing one line per port.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        // First executor: boss threads, which handle incoming socket requests.
        // Second executor: worker threads; with NIO a single worker can manage
        // several channels.
        ServerBootstrap bootstrap = new ServerBootstrap(
                new NioServerSocketChannelFactory(
                        Executors.newCachedThreadPool(),
                        Executors.newCachedThreadPool()));

        bootstrap.setPipelineFactory(new ServerPipelineFactory(executionHandler));

        // Disable Nagle's algorithm on accepted connections.
        bootstrap.setOption("child.tcpNoDelay", true);
        // Enable TCP keep-alive probing on accepted connections.
        bootstrap.setOption("child.keepAlive", true);

        // Listen on the five ports, in declaration order.
        final int[] ports = {
            Constant.p1, Constant.p2, Constant.p3, Constant.p4, Constant.p5
        };
        for (int port : ports) {
            bootstrap.bind(new InetSocketAddress(port));
            System.out.println("Listening port " + port + "...");
        }
    }

}

ServerPipelineFactory.java:

package Server;

import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.handler.codec.string.StringDecoder;
import org.jboss.netty.handler.codec.string.StringEncoder;
import org.jboss.netty.handler.execution.ExecutionHandler;

/**
 * Creates the channel pipeline used by the server: string encoder, string
 * decoder, the execution handler supplied at construction time, and finally
 * the business-logic handler.
 */
public class ServerPipelineFactory implements ChannelPipelineFactory {

    private final ExecutionHandler executionHandler;

    /**
     * @param executionHandler handler instance inserted into every pipeline
     *                         this factory creates
     */
    public ServerPipelineFactory(ExecutionHandler executionHandler){
        this.executionHandler = executionHandler;
    }

    /**
     * Builds a new pipeline with fresh codec and business handlers.
     *
     * @return the assembled pipeline
     */
    @Override
    public ChannelPipeline getPipeline() throws Exception {
        final StringEncoder encoder = new StringEncoder();
        final StringDecoder decoder = new StringDecoder();
        // Business-logic handler; a new instance per pipeline.
        final MyServerHandler businessHandler = new MyServerHandler();

        // All pipelines must share the same ExecutionHandler, and it has to
        // sit in front of the business-logic handler.
        return Channels.pipeline(encoder, decoder, executionHandler, businessHandler);
    }

}

MyServerHandler.java:

package Server;

import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
import Util.Tool;

/**
 * Server-side business handler: logs every received message and answers it
 * after a random delay.
 */
public class MyServerHandler extends SimpleChannelHandler{

    /**
     * Prints the received message, sleeps for a random number of seconds
     * (the value returned by {@code Tool.getInterval(100)}) and then writes
     * the reply to the channel the message arrived on.
     *
     * @throws Exception if the sleep is interrupted or the superclass
     *                   implementation throws
     */
    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        System.out.println("Server received:" + e.getMessage());
        // Wait a random number of seconds before replying. sleep() is a static
        // method, so it is called on the Thread class rather than through an
        // instance reference.
        int interval = Tool.getInterval(100);
        Thread.sleep(interval * 1000L);
        e.getChannel().write("from Server: Hello!");
        super.messageReceived(ctx, e);
    }

    /**
     * Prints the stack trace of the failure and closes the affected channel.
     *
     * <p>The event is deliberately not passed on to
     * {@code super.exceptionCaught}: the exception has been handled here, and
     * for the last handler of a pipeline the superclass implementation would
     * additionally log a warning asking for {@code exceptionCaught()} to be
     * implemented.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
        e.getCause().printStackTrace();
        Channel ch = e.getChannel();
        ch.close();
    }

    /**
     * Logs that a client has connected and forwards the event.
     */
    @Override
    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        System.out.println("A client connected!");
        super.channelConnected(ctx, e);
    }

}

NettyClient.java:

package Client;

import java.net.InetSocketAddress;
import java.util.concurrent.Executors;

import org.jboss.netty.bootstrap.ClientBootstrap;  
import org.jboss.netty.channel.*;  
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;  
import org.jboss.netty.handler.execution.ExecutionHandler;
import org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor;

import Util.Constant;

/**
 * Demo client thread: connects to one server port and stays alive until that
 * connection is closed.
 */
public class NettyClient extends Thread{

    public static String host = "127.0.0.1";
    ClientBootstrap bootstrap;
    int port;

    // Execution handler handed to the pipeline factory so that every client
    // pipeline uses the same instance.
    // NOTE(review): per the Netty 3 API the arguments should be the core pool
    // size (16) and the per-channel / total memory limits in bytes - confirm.
    static ExecutionHandler executionHandler = new ExecutionHandler(new OrderedMemoryAwareThreadPoolExecutor(16, 1048576, 1048576));

    /**
     * Prepares the bootstrap for a client that will connect to {@code port}.
     * No connection is opened here; the single connection of this client is
     * established in {@link #run()}. (Connecting in the constructor as well
     * would open a second connection whose future nobody tracks or closes.)
     *
     * @param port server port this client connects to
     */
    public NettyClient(int port) {
        this.port = port;
        // ChannelFactory backed by a boss thread pool (first argument) and a
        // worker thread pool (second argument).
        final ChannelFactory channelFactory = new NioClientSocketChannelFactory(
                Executors.newCachedThreadPool(),
                Executors.newCachedThreadPool());
        // ClientBootstrap
        bootstrap = new ClientBootstrap(channelFactory);

        bootstrap.setPipelineFactory(new ClientPipelineFactory(executionHandler));
        bootstrap.setOption("tcpNoDelay", true);
        bootstrap.setOption("keepAlive", true);
    }

    /**
     * Connects to {@code host:port}, blocks until the channel is closed (or
     * the connection attempt fails) and then releases the bootstrap's
     * external resources.
     */
    @Override
    public void run(){
        System.out.println("Connecting to port " + port + "...");
        // Start the connection attempt.
        ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port));
        // Wait until the connection is closed or has failed.
        future.getChannel().getCloseFuture().awaitUninterruptibly();
        // Shut down the thread pools before the thread exits.
        bootstrap.releaseExternalResources();
    }

    /**
     * Starts one client thread per configured server port.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        final int[] ports = {
            Constant.p1, Constant.p2, Constant.p3, Constant.p4, Constant.p5
        };
        // Create every client first, then start them, as separate steps.
        final NettyClient[] clients = new NettyClient[ports.length];
        for (int i = 0; i < ports.length; i++) {
            clients[i] = new NettyClient(ports[i]);
        }
        for (NettyClient client : clients) {
            client.start();
        }
    }

}

ClientPipelineFactory.java:

package Client;

import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.handler.codec.string.StringDecoder;
import org.jboss.netty.handler.codec.string.StringEncoder;
import org.jboss.netty.handler.execution.ExecutionHandler;

/**
 * Creates the channel pipeline used by each client: string encoder, string
 * decoder, the execution handler supplied at construction time, and finally
 * the business-logic handler.
 */
public class ClientPipelineFactory implements ChannelPipelineFactory {

    private final ExecutionHandler executionHandler;

    /**
     * @param executionHandler handler instance inserted into every pipeline
     *                         this factory creates
     */
    public ClientPipelineFactory(ExecutionHandler executionHandler){
        this.executionHandler = executionHandler;
    }

    /**
     * Builds a new pipeline with fresh codec and business handlers.
     *
     * @return the assembled pipeline
     */
    @Override
    public ChannelPipeline getPipeline() throws Exception {
        final StringEncoder encoder = new StringEncoder();
        final StringDecoder decoder = new StringDecoder();
        // Business-logic handler; a new instance per pipeline.
        final MyClientHandler businessHandler = new MyClientHandler();

        // All pipelines must share the same ExecutionHandler, and it has to
        // sit in front of the business-logic handler.
        return Channels.pipeline(encoder, decoder, executionHandler, businessHandler);
    }
}

MyClientHandler.java:

package Client;

import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;

import Util.Tool;

/**
 * Client-side business handler: sends a greeting once connected and answers
 * every server message after a random delay.
 */
public class MyClientHandler extends SimpleChannelHandler{

    /**
     * Sends the first message as soon as the connection to the server is
     * established, then forwards the event.
     */
    @Override
    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        System.out.println("Connected to Server!");
        e.getChannel().write("from Client: Hello! " + System.currentTimeMillis());
        super.channelConnected(ctx, e);
    }

    /**
     * Prints the received message, sleeps for a random number of seconds
     * (the value returned by {@code Tool.getInterval(5)}) and then writes the
     * next message to the channel.
     *
     * @throws Exception if the sleep is interrupted or the superclass
     *                   implementation throws
     */
    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        System.out.println("Client Received:" + e.getMessage());
        // Wait a random number of seconds before sending again. sleep() is a
        // static method, so it is called on the Thread class rather than
        // through an instance reference.
        int interval = Tool.getInterval(5);
        Thread.sleep(interval * 1000L);
        e.getChannel().write("from Client: Hello! "  + System.currentTimeMillis());
        super.messageReceived(ctx, e);
    }

    /**
     * Prints the stack trace of the failure and closes the affected channel.
     *
     * <p>The event is deliberately not passed on to
     * {@code super.exceptionCaught}: the exception has been handled here, and
     * for the last handler of a pipeline the superclass implementation would
     * additionally log a warning asking for {@code exceptionCaught()} to be
     * implemented.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
        e.getCause().printStackTrace();
        Channel ch = e.getChannel();
        ch.close();
    }

}

Constant.java:

package Util;

/**
 * Port numbers shared by the demo server and clients. All values are
 * {@code final} so that neither side can accidentally reassign a port at
 * runtime.
 */
public class Constant {
    /** Base value the five ports are derived from. */
    static final int start = 10000;
    /** First port: {@code start + 1}. */
    public static final int p1 = start + 1;
    /** Second port: {@code start + 2}. */
    public static final int p2 = start + 2;
    /** Third port: {@code start + 3}. */
    public static final int p3 = start + 3;
    /** Fourth port: {@code start + 4}. */
    public static final int p4 = start + 4;
    /** Fifth port: {@code start + 5}. */
    public static final int p5 = start + 5;
}

Tool.java:

package Util;

import java.util.Random;

/**
 * Small helper for producing random intervals.
 */
public class Tool {

    // Single generator reused by every call; final so it cannot be replaced.
    static final Random rand = new Random();

    /**
     * Returns a pseudo-random integer in the range {@code [0, max)}.
     *
     * @param max exclusive upper bound; must be positive
     * @return a value between 0 (inclusive) and {@code max} (exclusive)
     * @throws IllegalArgumentException if {@code max} is not positive
     *         (raised by {@link Random#nextInt(int)})
     */
    public static int getInterval(int max){
        return rand.nextInt(max);
    }
}
原文地址:https://www.cnblogs.com/mstk/p/6791675.html