原创 netty初探及业务具体实现

1、实现ApplicationRunner,完成生命周期绑定
@Component
public class NettyBootsrapRunner implements ApplicationRunner, ApplicationListener {

private static final Logger LOGGER = LoggerFactory.getLogger(NettyBootsrapRunner.class);

@Value("${netty.websocket.port}")
private int port;
@Value("${netty.websocket.ip}")
private String ip;
@Value("${netty.websocket.path}")
private String path;
@Value("${netty.websocket.max-frame-size}")
private long maxFrameSize;

private ServerSocketChannel serverChannel;


public void run(ApplicationArguments args) throws Exception {
	EventLoopGroup bossGroup = new NioEventLoopGroup();
	EventLoopGroup workerGroup = new NioEventLoopGroup();
	ServerBootstrap serverBootstrap = new ServerBootstrap();
	serverBootstrap.group(bossGroup, workerGroup)
	.channel(NioServerSocketChannel.class)
	//暂时不能处理的客户端连接请求队列长度
	.option(ChannelOption.SO_BACKLOG, 300)
	//有数据立即发送
	.childOption(ChannelOption.TCP_NODELAY, true)
	//保持连接
	.childOption(ChannelOption.SO_KEEPALIVE, true)
	.childHandler(new ChannelInitializer<SocketChannel>() {
		@Override
		protected void initChannel(SocketChannel socketChannel) throws Exception {
			socketChannel.pipeline().addLast(new SocketChooseHandler());
		}
	});
	//绑定端口,同步等待成功
	try {
		ChannelFuture future = serverBootstrap.bind(port).sync();
		if (future.isSuccess()) {
			serverChannel = (ServerSocketChannel) future.channel();
			LOGGER.info("netty 服务启动,ip={},port={}", this.ip, this.port);
		} else {
			LOGGER.info("服务端启动失败:{}", future.cause().getMessage());
		}
		//等待服务监听端口关闭,主线程阻塞
		future.channel().closeFuture().sync();
	} finally {
		bossGroup.shutdownGracefully();
		workerGroup.shutdownGracefully();
	}
}

@Override
public void onApplicationEvent(ContextClosedEvent event) {
	if (this.serverChannel != null) {
		this.serverChannel.close();
	}
	LOGGER.info("websocket 服务停止");
}

}

2、兼容socket,选择处理器
@Component
public class SocketChooseHandler extends ByteToMessageDecoder {
// WebSocket握手的协议前缀
private static final String WEBSOCKET_PREFIX = "GET /";

@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
    String protocol = getBufStart(in);
    if (protocol.startsWith(WEBSOCKET_PREFIX)) {
        //  websocket连接时,执行以下处理
        // HttpServerCodec:将请求和应答消息解码为HTTP消息
        ctx.pipeline().addLast(new HttpServerCodec());
        // HttpObjectAggregator:将HTTP消息的多个部分合成一条完整的HTTP消息
        ctx.pipeline().addLast(new HttpObjectAggregator(65535));
        // ChunkedWriteHandler:向客户端发送HTML5文件,文件过大会将内存撑爆
        ctx.pipeline().addLast(new ChunkedWriteHandler());
        ctx.pipeline().addLast(new WebSocketFrameAggregator(65535));
        //  若超过80秒未收到约定心跳,则主动断开channel释放资源
        ctx.pipeline().addLast(new IdleStateHandler(80, 0, 0));
        //用于处理websocket, /ws为访问websocket时的uri
        ctx.pipeline().addLast(new WebSocketServerProtocolHandler("/channel"));
        ctx.pipeline().addLast(new WebsocketMessageHandler());
    } else {
        //  常规TCP连接时,执行以下处理
        ctx.pipeline().addLast(new IdleStateHandler(0, 0, 60));
	// ctx.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024 * 1024, 2, 2, -4, 4, true));
	// ctx.pipeline().addLast(new DelimiterBasedFrameDecoder(8192, Unpooled.wrappedBuffer(new byte[] { 'k' })));
        ctx.pipeline().addLast(new LineBasedFrameDecoder(1024));
        ctx.pipeline().addLast(new StringDecoder(Charset.forName("GBK")));
        ctx.pipeline().addLast(new StringEncoder(Charset.forName("GBK")));
        ctx.pipeline().addLast(new TcpMessageHandler());
    }
    in.resetReaderIndex();
    ctx.pipeline().remove(this.getClass());
}

private String getBufStart(ByteBuf in) {
    int length = in.readableBytes();
    // 标记读位置
    in.markReaderIndex();
    byte[] content = new byte[length > 10? 10 : length];
    in.readBytes(content);
    return new String(content);
}

}
3、处理请求流
socket处理器
@Slf4j
@Sharable
public class TcpMessageHandler extends ChannelInboundHandlerAdapter {
// 用于记录和管理所有客户端的channel
public static ChannelGroup users = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

DeviceUseRecordService deviceUseRecordService = SpringUtil.getBean(DeviceUseRecordService.class);
/**
 * @Description:客户端与服务端创建连接的时候调用
 * @author TangYan
 * @date 2020/5/12 10:45
 * @param ctx
 * @return void
 */
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    log.info("netty:TCP客户端与服务端连接开始");
}

/**
 * @Description:客户端与服务端断开连接时调用
 * @author TangYan
 * @date 2020/5/12 10:45
 * @param ctx
 * @return void
 */
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    log.info("netty:TCP客户端与服务端连接关闭");
}

/**
 * @Description: 服务端接收客户端发送过来的数据结束之后调用
 * @author TangYan
 * @date 2020/5/12 10:44
 * @param ctx
 * @return void
 */
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    ctx.flush();
    log.info("netty:TCP信息接收完毕");
}

/**
 * @Description:程出现异常的时候调用
 * @author TangYan
 * @date 2020/5/12 10:44
 * @param ctx
 * @param cause
 * @return void
 */
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    log.error("", cause);
    ctx.channel().writeAndFlush(0 + "
");
    ctx.close();
    users.remove(ctx.channel());
    UserChannelRel.removeByValue(ctx.channel().id());
    log.info("netty:当前连接出现异常");
}

/**
 * @Description:服务端处理客户端websocket请求的核心方法,这里接收了客户端发来的信息
 * @author TangYan
 * @date 2020/5/12 10:44
 * @param channelHandlerContext
 * @param info
 * @return void
 */
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    log.info("来自TCP客户端" + ctx.channel().remoteAddress() + "的信息: " + msg);
    long k = deviceUseRecordService.uploadApp(ctx.channel().remoteAddress().toString().substring(1).split(":")[0], msg.toString());
    ctx.channel().writeAndFlush(k + "
");
}

@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    super.handlerAdded(ctx);
    users.add(ctx.channel());
    log.info("netty:TCP客户端与服务端连接开始");
}

@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
	String channelId = ctx.channel().id().asLongText();
    // 当触发handlerRemoved,ChannelGroup会自动移除对应的客户端channel
    users.remove(ctx.channel());
	// UserChannelRel.removeByValue(ctx.channel().id());
    log.info("客户端被移除,channelId为:" + channelId);
}

@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    // 判断evt是否是IdleStateEvent(用于触发用户事件,包含 读空闲/写空闲/读写空闲)
    if (evt instanceof IdleStateEvent) {
        IdleStateEvent event = (IdleStateEvent) evt; // 强制类型转换
        if (event.state() == IdleState.READER_IDLE) {
            log.info("进入读空闲...");
        } else if (event.state() == IdleState.WRITER_IDLE) {
        	log.info("进入写空闲...");
        } else if (event.state() == IdleState.ALL_IDLE) {
        	log.info("channel关闭前users数量为:"+ users.size());
        	log.info("进入读写空闲...");
            Channel channel = ctx.channel();
            //关闭无用的channel,以防资源浪费
            channel.close();
            log.info("channel关闭后users数量为:"+ users.size());
        }
    }
}
}

netty处理器
@Sharable
public class WebsocketMessageHandler extends SimpleChannelInboundHandler {

private static final Logger LOGGER = LoggerFactory.getLogger(WebsocketMessageHandler.class);


@Override
protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame msg) throws Exception {
	if (msg instanceof TextWebSocketFrame) {
		TextWebSocketFrame textWebSocketFrame = (TextWebSocketFrame) msg;
		// 业务层处理数据
		try {
			CommonUserService commonUserService = SpringUtil.getBean(CommonUserService.class);
			commonUserService.loadUserByUsername(textWebSocketFrame.text());
		} catch (Exception e) {
			LOGGER.error(e.getMessage());
		}
		// 响应客户端
		ctx.channel().writeAndFlush(new TextWebSocketFrame("我收到了你的消息:" + System.currentTimeMillis()));
	} else {
		// 不接受文本以外的数据帧类型
		ctx.channel().writeAndFlush(WebSocketCloseStatus.INVALID_MESSAGE_TYPE).addListener(ChannelFutureListener.CLOSE);
	}
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
	super.channelInactive(ctx);
	LOGGER.info("链接断开:{}", ctx.channel().remoteAddress());
}

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
	super.channelActive(ctx);
	LOGGER.info("链接创建:{}", ctx.channel().remoteAddress());
}
}

一个简单的netty处理业务的功能实现了。

化繁为简,极致高效。 所有代码为本人原创,转载请联系本人。
原文地址:https://www.cnblogs.com/crissblog/p/14861462.html