netty 转发服务

NettyServer

package com.youxiong.netty.server;

import com.youxiong.netty.handler.MyChannelHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.IdleStateHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.concurrent.TimeUnit;

@Component
public class NettyServer {

    private static final Logger LOGGER = LoggerFactory.getLogger(NettyServer.class);

    @Value("${netty.server.port}")
    public Integer port;


    public Integer getPort() {
        return port;
    }

    public void setPort(Integer port) {
        this.port = port;
    }

    private void startServer(){
        //服务端需要2个线程组  boss处理客户端连接  work进行客服端连接之后的处理
        EventLoopGroup boss = new NioEventLoopGroup();
        EventLoopGroup work = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            //服务器 配置
            bootstrap.group(boss,work).channel(NioServerSocketChannel.class)
            .childHandler(new ChannelInitializer<SocketChannel>() {
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                    // HttpServerCodec:将请求和应答消息解码为HTTP消息
                    socketChannel.pipeline().addLast("http-codec",new HttpServerCodec());
                    // HttpObjectAggregator:将HTTP消息的多个部分合成一条完整的HTTP消息
                    socketChannel.pipeline().addLast("aggregator",new HttpObjectAggregator(65536));
                    // ChunkedWriteHandler:向客户端发送HTML5文件
                    socketChannel.pipeline().addLast("http-chunked",new ChunkedWriteHandler());
                    // 进行设置心跳检测
                    socketChannel.pipeline().addLast(new IdleStateHandler(60,30,60*30, TimeUnit.SECONDS));
                    // 配置通道处理  来进行业务处理
                    socketChannel.pipeline().addLast(new MyChannelHandler());
                }
            }).option(ChannelOption.SO_BACKLOG,1024).childOption(ChannelOption.SO_KEEPALIVE,true);
            //绑定端口  开启事件驱动
            LOGGER.info("【服务器启动成功========端口:"+port+"");
            Channel channel = bootstrap.bind(port).sync().channel();
            channel.closeFuture().sync();
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            //关闭资源
            boss.shutdownGracefully();
            work.shutdownGracefully();
        }
    }

    @PostConstruct()
    public void init(){
        //需要开启一个新的线程来执行netty server 服务器
        new Thread(new Runnable() {
            public void run() {
                startServer();
            }
        }).start();
    }
}


handler

package com.youxiong.netty.handler;

import com.youxiong.netty.util.GlobalUserUtil;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.handler.codec.http.*;
import io.netty.handler.codec.http.websocketx.*;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.AttributeKey;
import io.netty.util.CharsetUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Inbound handler that upgrades HTTP requests for {@code /websocket} to a
 * WebSocket connection and broadcasts every received text frame to all channels
 * in {@link GlobalUserUtil#channels}.
 *
 * <p>The handler keeps the per-connection {@link WebSocketServerHandshaker} in a
 * field, so an instance must not be shared between channels.
 */
public class MyChannelHandler extends SimpleChannelInboundHandler<Object> {


    private static final Logger LOGGER = LoggerFactory.getLogger(MyChannelHandler.class);

    /** Path (without the leading slash) on which WebSocket upgrades are accepted. */
    private static final String URI = "websocket";

    /** Handshaker of this connection; {@code null} until the handshake has been started. */
    private WebSocketServerHandshaker handshaker ;

    /**
     * Registers the channel in the global channel group when this handler is
     * added to its pipeline.
     *
     * @param ctx the handler context
     * @throws Exception never thrown by this implementation
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        LOGGER.info("【handlerAdded】====>"+ctx.channel().id());
        GlobalUserUtil.channels.add(ctx.channel());
    }

    /**
     * Removes the channel from the global channel group when this handler is
     * removed from its pipeline.
     *
     * @param ctx the handler context
     * @throws Exception never thrown by this implementation
     */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        LOGGER.info("【handlerRemoved】====>"+ctx.channel().id());
        // The group stores channels, so the channel (not the context) must be removed.
        GlobalUserUtil.channels.remove(ctx.channel());
    }

    /**
     * Logs the failure, including its stack trace, and closes the connection.
     *
     * @param ctx   the handler context
     * @param cause the exception that was raised
     * @throws Exception never thrown by this implementation
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        LOGGER.error("【系统异常】======>" + cause, cause);
        ctx.close();
    }

    /**
     * Logs that the channel became active (the client connected).
     *
     * @param ctx the handler context
     * @throws Exception never thrown by this implementation
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        LOGGER.info("【channelActive】=====>"+ctx.channel());
    }

    /**
     * Called when the channel became inactive (the client disconnected);
     * intentionally does nothing.
     *
     * @param ctx the handler context
     * @throws Exception never thrown by this implementation
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    }

    /**
     * Flushes pending writes once the current read batch is complete.
     *
     * @param ctx the handler context
     * @throws Exception never thrown by this implementation
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    /**
     * Heartbeat handling: on reader or writer idle a WebSocket ping is sent to
     * the peer; all-idle is only logged. Events other than
     * {@link IdleStateEvent} are forwarded to the next handler.
     *
     * @param ctx the handler context
     * @param evt the user event
     * @throws Exception never thrown by this implementation
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (!(evt instanceof IdleStateEvent)) {
            ctx.fireUserEventTriggered(evt);
            return;
        }
        IdleStateEvent stateEvent = (IdleStateEvent) evt;
        switch (stateEvent.state()) {
            // Reader idle (server side).
            case READER_IDLE:
                LOGGER.info("【"+ctx.channel().remoteAddress()+"】读空闲(服务器端)");
                sendPing(ctx);
                break;
            // Writer idle (client side).
            case WRITER_IDLE:
                LOGGER.info("【"+ctx.channel().remoteAddress()+"】写空闲(客户端)");
                sendPing(ctx);
                break;
            case ALL_IDLE:
                LOGGER.info("【"+ctx.channel().remoteAddress()+"】读写空闲");
                break;
            default:
                break;
        }
    }

    /**
     * Sends a WebSocket ping, but only once a handshake has been started on this
     * connection; before that the pipeline has no WebSocket encoder. The frame
     * is created here so that no buffer is allocated when nothing is written.
     */
    private void sendPing(ChannelHandlerContext ctx) {
        if (handshaker == null) {
            return;
        }
        ctx.writeAndFlush(new PingWebSocketFrame());
    }

    /**
     * Dispatches an inbound message: full HTTP requests go to the WebSocket
     * handshake, WebSocket frames to the frame handler. Other messages are
     * ignored.
     *
     * @param ctx the handler context
     * @param msg the inbound message
     * @throws Exception if handling the message fails
     */
    @Override
    protected void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof FullHttpRequest) {
            doHandlerHttpRequest(ctx, (FullHttpRequest) msg);
        } else if (msg instanceof WebSocketFrame) {
            doHandlerWebSocketFrame(ctx, (WebSocketFrame) msg);
        }
    }

    /**
     * Handles a single WebSocket frame: completes the close handshake, answers
     * pings, ignores pongs, rejects non-text frames and broadcasts text frames.
     *
     * @param ctx the handler context
     * @param msg the received frame
     * @throws UnsupportedOperationException if the frame is not a text frame
     */
    private void doHandlerWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame msg) {
        if (msg instanceof CloseWebSocketFrame) {
            LOGGER.info("【关闭】");
            // The frame is written back to the peer (which releases it) and is also
            // auto-released by SimpleChannelInboundHandler, so retain it once.
            handshaker.close(ctx.channel(), (CloseWebSocketFrame) msg.retain());
            return ;
        }
        if (msg instanceof PingWebSocketFrame) {
            LOGGER.info("【ping】");
            PongWebSocketFrame pong = new PongWebSocketFrame(msg.content().retain());
            ctx.channel().writeAndFlush(pong);
            return ;
        }
        if (msg instanceof PongWebSocketFrame) {
            // A pong is the answer to our own ping; replying with another ping
            // would create an endless ping/pong loop.
            LOGGER.info("【pong】");
            return ;
        }
        if (!(msg instanceof TextWebSocketFrame)) {
            LOGGER.info("【不支持二进制】");
            throw new UnsupportedOperationException("不支持二进制");
        }
        // Broadcast the text to every registered channel; each write needs its
        // own frame because a frame is released once written.
        String text = ((TextWebSocketFrame) msg).text();
        for (Channel channel : GlobalUserUtil.channels) {
            channel.writeAndFlush(new TextWebSocketFrame(text));
        }

    }


    /**
     * Performs the WebSocket opening handshake for the first HTTP request of a
     * connection. Requests that failed to decode or are not WebSocket upgrades
     * get a 400 response; requests for a path other than {@code /websocket}
     * cause the connection to be closed; unsupported WebSocket versions get the
     * unsupported-version response.
     *
     * @param ctx the handler context
     * @param msg the aggregated HTTP request
     */
    private void doHandlerHttpRequest(ChannelHandlerContext ctx, FullHttpRequest msg) {
        // Decoding failed or this is not a WebSocket upgrade request.
        // Header values are compared case-insensitively.
        if (!msg.getDecoderResult().isSuccess()
                || !"websocket".equalsIgnoreCase(msg.headers().get("Upgrade"))) {
            sendHttpResponse(ctx, msg, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
            return;
        }
        // Only the configured path is accepted.
        String uri = msg.getUri();
        if (uri == null || !("/" + URI).equals(uri)) {
            ctx.close();
            return;
        }
        ctx.attr(AttributeKey.valueOf("type")).set(uri);
        WebSocketServerHandshakerFactory factory = new WebSocketServerHandshakerFactory(
                "ws://" + msg.headers().get("Host") + "/" + URI, null, false
        );
        handshaker = factory.newHandshaker(msg);
        if (handshaker == null) {
            // No handshaker exists for the requested WebSocket version.
            WebSocketServerHandshakerFactory.sendUnsupportedWebSocketVersionResponse(ctx.channel());
            return;
        }
        // Complete the handshake.
        handshaker.handshake(ctx.channel(), msg);
    }

    /**
     * Writes an HTTP response to the client. For non-200 responses the status
     * line is used as the body. The connection is closed afterwards unless the
     * request is keep-alive and the status is 200.
     *
     * @param ctx the handler context
     * @param req the request being answered
     * @param res the response to send
     */
    private static void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, DefaultFullHttpResponse res) {
        if (res.getStatus().code() != 200) {
            ByteBuf buf = Unpooled.copiedBuffer(res.getStatus().toString(), CharsetUtil.UTF_8);
            res.content().writeBytes(buf);
            buf.release();
            HttpHeaders.setContentLength(res, res.content().readableBytes());
        }
        ChannelFuture f = ctx.channel().writeAndFlush(res);
        if (!HttpHeaders.isKeepAlive(req) || res.getStatus().code() != 200) {
            f.addListener(ChannelFutureListener.CLOSE);
        }
    }
}
package com.youxiong.netty.util;

import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;

/**
 * Holder for the server-wide registry of client channels.
 */
public class GlobalUserUtil {

    /**
     * Global group of the clients connected to the server. Declared
     * {@code final} so the shared registry cannot be swapped out at runtime.
     */
    public static final ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor
            .INSTANCE);
}

NettyServer
package com.youxiong.netty.server;
import com.youxiong.netty.handler.MyChannelHandler;import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.*;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioServerSocketChannel;import io.netty.handler.codec.http.HttpObjectAggregator;import io.netty.handler.codec.http.HttpServerCodec;import io.netty.handler.stream.ChunkedWriteHandler;import io.netty.handler.timeout.IdleStateHandler;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.beans.factory.annotation.Value;import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;import java.util.concurrent.TimeUnit;
@Componentpublic class NettyServer {
    private static final Logger LOGGER = LoggerFactory.getLogger(NettyServer.class);
    @Value("${netty.server.port}")    public Integer port;

    public Integer getPort() {        return port;    }
    public void setPort(Integer port) {        this.port = port;    }
    private void startServer(){        //服务端需要2个线程组  boss处理客户端连接  work进行客服端连接之后的处理        EventLoopGroup boss = new NioEventLoopGroup();        EventLoopGroup work = new NioEventLoopGroup();        try {            ServerBootstrap bootstrap = new ServerBootstrap();            //服务器 配置            bootstrap.group(boss,work).channel(NioServerSocketChannel.class)            .childHandler(new ChannelInitializer<SocketChannel>() {                protected void initChannel(SocketChannel socketChannel) throws Exception {                    // HttpServerCodec:将请求和应答消息解码为HTTP消息                    socketChannel.pipeline().addLast("http-codec",new HttpServerCodec());                    // HttpObjectAggregator:将HTTP消息的多个部分合成一条完整的HTTP消息                    socketChannel.pipeline().addLast("aggregator",new HttpObjectAggregator(65536));                    // ChunkedWriteHandler:向客户端发送HTML5文件                    socketChannel.pipeline().addLast("http-chunked",new ChunkedWriteHandler());                    // 进行设置心跳检测                    socketChannel.pipeline().addLast(new IdleStateHandler(60,30,60*30, TimeUnit.SECONDS));                    // 配置通道处理  来进行业务处理                    socketChannel.pipeline().addLast(new MyChannelHandler());                }            }).option(ChannelOption.SO_BACKLOG,1024).childOption(ChannelOption.SO_KEEPALIVE,true);            //绑定端口  开启事件驱动            LOGGER.info("【服务器启动成功========端口:"+port+"】");            Channel channel = bootstrap.bind(port).sync().channel();            channel.closeFuture().sync();        }catch (Exception e){            e.printStackTrace();        }finally {            //关闭资源            boss.shutdownGracefully();            work.shutdownGracefully();        }    }
    @PostConstruct()    public void init(){        //需要开启一个新的线程来执行netty server 服务器        new Thread(new Runnable() {            public void run() {                startServer();            }        }).start();    }}handler
package com.youxiong.netty.handler;
import com.youxiong.netty.util.GlobalUserUtil;import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.*;import io.netty.handler.codec.http.*;import io.netty.handler.codec.http.websocketx.*;import io.netty.handler.timeout.IdleStateEvent;import io.netty.util.AttributeKey;import io.netty.util.CharsetUtil;import org.slf4j.Logger;import org.slf4j.LoggerFactory;
public class MyChannelHandler extends SimpleChannelInboundHandler<Object> {

    private static final Logger LOGGER = LoggerFactory.getLogger(MyChannelHandler.class);
    private static final String URI = "websocket";
    private WebSocketServerHandshaker handshaker ;
    /**     * 连接上服务器     * @param ctx     * @throws Exception     */    @Override    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {        LOGGER.info("【handlerAdded】====>"+ctx.channel().id());        GlobalUserUtil.channels.add(ctx.channel());    }
    /**     * 断开连接     * @param ctx     * @throws Exception     */    @Override    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {        LOGGER.info("【handlerRemoved】====>"+ctx.channel().id());        GlobalUserUtil.channels.remove(ctx);    }
    /**     * 连接异常   需要关闭相关资源     * @param ctx     * @param cause     * @throws Exception     */    @Override    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {        LOGGER.error("【系统异常】======>"+cause.toString());        ctx.close();        ctx.channel().close();    }
    /**     * 活跃的通道  也可以当作用户连接上客户端进行使用     * @param ctx     * @throws Exception     */    @Override    public void channelActive(ChannelHandlerContext ctx) throws Exception {        LOGGER.info("【channelActive】=====>"+ctx.channel());    }
    /**     * 不活跃的通道  就说明用户失去连接     * @param ctx     * @throws Exception     */    @Override    public void channelInactive(ChannelHandlerContext ctx) throws Exception {    }
    /**     * 这里只要完成 flush     * @param ctx     * @throws Exception     */    @Override    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {        ctx.flush();    }
    /**     * 这里是保持服务器与客户端长连接  进行心跳检测 避免连接断开     * @param ctx     * @param evt     * @throws Exception     */    @Override    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {        if(evt instanceof IdleStateEvent){            IdleStateEvent stateEvent = (IdleStateEvent) evt;            PingWebSocketFrame ping = new PingWebSocketFrame();            switch (stateEvent.state()){                //读空闲(服务器端)                case READER_IDLE:                    LOGGER.info("【"+ctx.channel().remoteAddress()+"】读空闲(服务器端)");                    ctx.writeAndFlush(ping);                    break;                    //写空闲(客户端)                case WRITER_IDLE:                    LOGGER.info("【"+ctx.channel().remoteAddress()+"】写空闲(客户端)");                    ctx.writeAndFlush(ping);                    break;                case ALL_IDLE:                    LOGGER.info("【"+ctx.channel().remoteAddress()+"】读写空闲");                    break;            }        }    }
    /**     * 收发消息处理     * @param ctx     * @param msg     * @throws Exception     */    protected void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {        if(msg instanceof HttpRequest){            doHandlerHttpRequest(ctx,(HttpRequest) msg);        }else if(msg instanceof WebSocketFrame){            doHandlerWebSocketFrame(ctx,(WebSocketFrame) msg);        }    }
    /**     * websocket消息处理     * @param ctx     * @param msg     */    private void doHandlerWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame msg) {        //判断msg 是哪一种类型  分别做出不同的反应        if(msg instanceof CloseWebSocketFrame){            LOGGER.info("【关闭】");            handshaker.close(ctx.channel(), (CloseWebSocketFrame) msg);            return ;        }        if(msg instanceof PingWebSocketFrame){            LOGGER.info("【ping】");            PongWebSocketFrame pong = new PongWebSocketFrame(msg.content().retain());            ctx.channel().writeAndFlush(pong);            return ;        }        if(msg instanceof PongWebSocketFrame){            LOGGER.info("【pong】");            PingWebSocketFrame ping = new PingWebSocketFrame(msg.content().retain());            ctx.channel().writeAndFlush(ping);            return ;        }        if(!(msg instanceof TextWebSocketFrame)){            LOGGER.info("【不支持二进制】");            throw new UnsupportedOperationException("不支持二进制");        }        //可以对消息进行处理        //群发        for (Channel channel : GlobalUserUtil.channels) {            channel.writeAndFlush(new TextWebSocketFrame(((TextWebSocketFrame) msg).text()));        }
    }

    /**     * wetsocket第一次连接握手     * @param ctx     * @param msg     */    private void doHandlerHttpRequest(ChannelHandlerContext ctx, HttpRequest msg) {        // http 解码失败        if(!msg.getDecoderResult().isSuccess() || (!"websocket".equals(msg.headers().get("Upgrade")))){            sendHttpResponse(ctx, (FullHttpRequest) msg,new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,HttpResponseStatus.BAD_REQUEST));        }        //可以获取msg的uri来判断        String uri = msg.getUri();        if(!uri.substring(1).equals(URI)){            ctx.close();        }        ctx.attr(AttributeKey.valueOf("type")).set(uri);        //可以通过url获取其他参数        WebSocketServerHandshakerFactory factory = new WebSocketServerHandshakerFactory(                "ws://"+msg.headers().get("Host")+"/"+URI+"",null,false        );        handshaker = factory.newHandshaker(msg);        if(handshaker == null){            WebSocketServerHandshakerFactory.sendUnsupportedWebSocketVersionResponse(ctx.channel());        }        //进行连接        handshaker.handshake(ctx.channel(), (FullHttpRequest) msg);        //可以做其他处理    }
    private static void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, DefaultFullHttpResponse res) {        // 返回应答给客户端        if (res.getStatus().code() != 200) {            ByteBuf buf = Unpooled.copiedBuffer(res.getStatus().toString(), CharsetUtil.UTF_8);            res.content().writeBytes(buf);            buf.release();        }        // 如果是非Keep-Alive,关闭连接        ChannelFuture f = ctx.channel().writeAndFlush(res);        if (!HttpHeaders.isKeepAlive(req) || res.getStatus().code() != 200) {            f.addListener(ChannelFutureListener.CLOSE);        }    }}package com.youxiong.netty.util;
import io.netty.channel.group.ChannelGroup;import io.netty.channel.group.DefaultChannelGroup;import io.netty.util.concurrent.GlobalEventExecutor;
public class GlobalUserUtil {
    //保存全局的  连接上服务器的客户    public static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor            .INSTANCE);}--------------------- 作者:yx726843014 来源:CSDN 原文:https://blog.csdn.net/xieliaowa9231/article/details/80151446 版权声明:本文为博主原创文章,转载请附上博文链接!

原文地址:https://www.cnblogs.com/hejunnuo/p/10333432.html