netty webSocket

import com.hc.hc.learnPlan.netty.nettyClientAndServerToStudy.ServerClientTOTalk;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.util.CharsetUtil;

import java.net.SocketImpl;

/**

  • http 协议是无状态的请求,浏览器和服务器的请求,每请求一次就会从新生成一次
  • 通过webSocket方式来实现 页面和服务器的长连接 并能检测到状态
    */

public class NettyAboutWebSocket {

private final static Integer port = 9999;

/** 1 http在传输过程中是分段的 httpObjectAggreator 将多个分段聚合起来
 *  当数据量较大的时候就需要这个来把数据聚合起来
 *  2 因为需要http的编解码器 httpServeCodec()
 *  3 而且整个过程是以块的方式进行编写的ChunkedWriteHandler编写的
 *  4 webSocketServerProtocolHandler 主要处理url,将http协议升级为ws协议 ws即保持长连接
 *  5 websocket是通过帧的形式来进行的
 * @throws InterruptedException
 */

private static void init() throws InterruptedException {
// 创建一个启动器
ServerBootstrap serverBootstrap = new ServerBootstrap();
// boss 线程 group 线程
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workGroup = new NioEventLoopGroup();
serverBootstrap.group(bossGroup,workGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();

                    pipeline.addLast(new HttpServerCodec());
                    pipeline.addLast(new HttpObjectAggregator(9856));
                    pipeline.addLast(new ChunkedWriteHandler());

                    pipeline.addLast(new WebSocketServerProtocolHandler("/hai"));
                    pipeline.addLast(new MyselfSocket());
                }
            });
  ChannelFuture bind = serverBootstrap.bind(port);
  bind.channel().closeFuture().sync();
  bossGroup.shutdownGracefully();
  workGroup.shutdownGracefully();

}

public static void main(String[] args) throws InterruptedException {

   NettyAboutWebSocket.init();

}

}

自定义业务逻辑

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;

import java.time.LocalDateTime;

/**

  • 有数据通过SimpleChannelInboundHandler

  • 通过TextWebSocketFrame来进行数据交互
    */
    public class MyselfSocket extends SimpleChannelInboundHandler {

    /**

    • 连接后 就会触发
    • @param ctx
    • @throws Exception
      */
      @Override
      public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
      System.out.println("服务器连接id为"+ctx.channel().id().asLongText());
      }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
    String text = msg.text();
    System.out.println("服务器收到了消息"+text);
    ctx.channel().writeAndFlush(new TextWebSocketFrame("服务器时间"+ LocalDateTime.now()+text));

    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
    System.out.println("服务器关闭"+ctx.channel().id().asLongText());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    System.out.println("服务器异常发生"+cause.getMessage());
    ctx.close();
    }
    }

原文地址:https://www.cnblogs.com/chianw877466657/p/13088465.html