netty websocket

0. Maven 依赖

<!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
<!--
  Netty all-in-one artifact used by the Java classes in this document.
  NOTE(review): 5.0.0.Alpha1 is an alpha release. The handler in this document
  overrides SimpleChannelInboundHandler#messageReceived, so confirm API
  compatibility before switching to a different Netty version.
-->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>5.0.0.Alpha1</version>
</dependency>

1. CaptureGlobalUserUtil.java(保存所有已连接客户端的全局 ChannelGroup)

package cn.cloudwalk.isc.util.netty;

import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;


/**
 * Holder for the process-wide group of client channels connected to the
 * WebSocket server.
 */
public class CaptureGlobalUserUtil {

    /**
     * Every channel that is currently connected to the server.
     *
     * <p>Declared {@code final} so the shared registry cannot be swapped out
     * by accident; members are still added and removed through the
     * {@link ChannelGroup} API. The group is backed by
     * {@link GlobalEventExecutor#INSTANCE}, which it uses to notify the
     * futures returned by group operations.
     */
    public static final ChannelGroup channels = new DefaultChannelGroup(
            GlobalEventExecutor.INSTANCE);
}

2. CaptureMyChannelHandler.java(WebSocket 握手与消息处理)

package cn.cloudwalk.isc.util.netty;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.handler.codec.http.*;
import io.netty.handler.codec.http.websocketx.*;
import io.netty.util.AttributeKey;
import io.netty.util.CharsetUtil;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import cn.cloudwalk.isc.util.em.Enums.NettyUri;


public class CaptureMyChannelHandler
extends SimpleChannelInboundHandler<Object> {

private static final Logger LOGGER = LoggerFactory
.getLogger(CaptureMyChannelHandler.class);

private static WebSocketServerHandshaker handshaker;
private static final String HOME_ADD = "/home";
private static final int STATUS_CODE = 200;
private static Map<String, List<ChannelHandlerContext>> CTX_MAP = new HashMap<String, List<ChannelHandlerContext>>();

/**
* 连接上服务器
* <p>Title: handlerAdded</p>
* <p>Description: </p>
* @param ctx
* @throws Exception
* @see io.netty.channel.ChannelHandlerAdapter#handlerAdded(io.netty.channel.ChannelHandlerContext)
* @date: 2019年2月15日 下午3:07:23
*/
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
// LOGGER.info("【handlerAdded】====>" + ctx.channel().id());
CaptureGlobalUserUtil.channels.add(ctx.channel());
}

/**
* 断开连接
* <p>Title: handlerRemoved</p>
* <p>Description: </p>
* @param ctx
* @throws Exception
* @see io.netty.channel.ChannelHandlerAdapter#handlerRemoved(io.netty.channel.ChannelHandlerContext)
* @date: 2019年2月15日 下午3:07:16
*/
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
// LOGGER.info("【handlerRemoved】====>" + ctx.channel().id());
CaptureGlobalUserUtil.channels.remove(ctx);
}

/**
* 连接异常 需要关闭相关资源
* <p>Title: exceptionCaught</p>
* <p>Description: </p>
* @param ctx
* @param cause
* @throws Exception
* @see io.netty.channel.ChannelHandlerAdapter#exceptionCaught(io.netty.channel.ChannelHandlerContext, java.lang.Throwable)
* @date: 2019年2月15日 下午3:07:05
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
// LOGGER.error("【系统异常】======>" + cause.toString());
ctx.close();
ctx.channel().close();
}

/**
* 活跃的通道 也可以当作用户连接上客户端进行使用
* <p>Title: channelActive</p>
* <p>Description: </p>
* @param ctx
* @throws Exception
* @see io.netty.channel.ChannelHandlerAdapter#channelActive(io.netty.channel.ChannelHandlerContext)
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
LOGGER.info("【channelActive】=====>" + ctx.channel());
}

/**
* 不活跃的通道 就说明用户失去连接
* <p>Title: channelInactive</p>
* <p>Description: </p>
* @param ctx
* @throws Exception
* @see io.netty.channel.ChannelHandlerAdapter#channelInactive(io.netty.channel.ChannelHandlerContext)
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
}

/**
* 这里只要完成 flush
* <p>Title: channelReadComplete</p>
* <p>Description: </p>
* @param ctx
* @throws Exception
* @see io.netty.channel.ChannelHandlerAdapter#channelReadComplete(io.netty.channel.ChannelHandlerContext)
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx)
throws Exception {
ctx.flush();
}

/**
* 这里是保持服务器与客户端长连接 进行心跳检测 避免连接断开
* @param ctx
* @param evt
* @throws Exception
*/
/*
* @Override public void userEventTriggered(ChannelHandlerContext ctx,
* Object evt) throws Exception { if(evt instanceof IdleStateEvent){
* IdleStateEvent stateEvent = (IdleStateEvent) evt; PingWebSocketFrame ping
* = new PingWebSocketFrame(); switch (stateEvent.state()){ //读空闲(服务器端) case
* READER_IDLE: LOGGER.info("【"+ctx.channel().remoteAddress()+"】读空闲(服务器端)");
* ctx.writeAndFlush(ping); break; //写空闲(客户端) case WRITER_IDLE:
* LOGGER.info("【"+ctx.channel().remoteAddress()+"】写空闲(客户端)");
* ctx.writeAndFlush(ping); break; case ALL_IDLE:
* LOGGER.info("【"+ctx.channel().remoteAddress()+"】读写空闲"); break; } } }
*/

/**
* 收发消息处理
* <p>Title: messageReceived</p>
* <p>Description: </p>
* @param ctx
* @param msg
* @throws Exception
* @see io.netty.channel.SimpleChannelInboundHandler#messageReceived(io.netty.channel.ChannelHandlerContext, java.lang.Object)
*/
@Override
protected void messageReceived(ChannelHandlerContext ctx, Object msg)
throws Exception {
if (msg instanceof HttpRequest) {
doHandlerHttpRequest(ctx, (HttpRequest) msg);
// if(null == pubctx) {
// pubctx = ctx;
// }
}
else if (msg instanceof WebSocketFrame) {
doHandlerWebSocketFrame(ctx, (WebSocketFrame) msg);
}
}

/**
* websocket消息处理
* @Title: doHandlerWebSocketFrame
* @Description: TODO(这里用一句话描述这个方法的作用)
* @param ctx
* @param msg
* @return: void
*/
public static void doHandlerWebSocketFrame(ChannelHandlerContext ctx,
WebSocketFrame msg) {
//判断msg 是哪一种类型 分别做出不同的反应
if (msg instanceof CloseWebSocketFrame) {
// LOGGER.info("【关闭】");
handshaker.close(ctx.channel(), (CloseWebSocketFrame) msg);
return;
}
if (msg instanceof PingWebSocketFrame) {
// LOGGER.info("【ping】");
PongWebSocketFrame pong = new PongWebSocketFrame(
msg.content().retain());
ctx.channel().writeAndFlush(pong);
return;
}
if (msg instanceof PongWebSocketFrame) {
// LOGGER.info("【pong】");
PingWebSocketFrame ping = new PingWebSocketFrame(
msg.content().retain());
ctx.channel().writeAndFlush(ping);
return;
}
if (!(msg instanceof TextWebSocketFrame)) {
// LOGGER.info("【不支持二进制】");
throw new UnsupportedOperationException("不支持二进制");
}
//可以对消息进行处理
//群发
if (ctx == null) {
// ctx = CTX_MAP.get()
}
for (Channel channel : CaptureGlobalUserUtil.channels) {
// TextWebSocketFrame tw = new TextWebSocketFrame("123123");
// channel.writeAndFlush(new TextWebSocketFrame(tw.text()));
ctx.channel().writeAndFlush(
new TextWebSocketFrame(((TextWebSocketFrame) msg).text()));
// channel.writeAndFlush(new TextWebSocketFrame(((TextWebSocketFrame) msg).text()));
}

}

/**
* wetsocket第一次连接握手
* @Title: doHandlerHttpRequest
* @Description: TODO(这里用一句话描述这个方法的作用)
* @param ctx
* @param msg
* @return: void
*/
private void doHandlerHttpRequest(ChannelHandlerContext ctx,
HttpRequest msg) {
// http 解码失败
System.out.println(
"-------------------------msg.headers().get("Upgrade"))--------------"
+ msg.headers().get("Validator"));
if (!msg.getDecoderResult().isSuccess()
|| (!"websocket".equalsIgnoreCase(msg.headers().get("Upgrade")))) {
sendHttpResponse(ctx, (FullHttpRequest) msg,
new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,
HttpResponseStatus.BAD_REQUEST));
}
//可以获取msg的uri来判断
String uri = msg.getUri();
if (!uri.substring(1).equals(NettyUri.URI_HOME.getCode())
&& !uri.substring(1).equals(NettyUri.URI_OTHER.getCode())) {
ctx.close();
}
ctx.attr(AttributeKey.valueOf("type")).set(uri);
//可以通过url获取其他参数
String flag = null;
if (HOME_ADD.equals(uri)) {
flag = NettyUri.URI_HOME.getCode();
}
else {
flag = NettyUri.URI_OTHER.getCode();
}

WebSocketServerHandshakerFactory factory = new WebSocketServerHandshakerFactory(
"ws://" + msg.headers().get("Host") + "/" + flag + "", null,
false);
handshaker = factory.newHandshaker(msg);
if (handshaker == null) {
WebSocketServerHandshakerFactory
.sendUnsupportedWebSocketVersionResponse(ctx.channel());
}
//进行连接
handshaker.handshake(ctx.channel(), (FullHttpRequest) msg);
//可以做其他处理
if (CTX_MAP.containsKey(flag)) {
CTX_MAP.get(flag).add(ctx);
}
else {
List<ChannelHandlerContext> ctxList = new ArrayList<ChannelHandlerContext>();
ctxList.add(ctx);
CTX_MAP.put(flag, ctxList);
}

}

/**
* 应答给客户端
* @Title: sendHttpResponse
* @Description: TODO(这里用一句话描述这个方法的作用)
* @param ctx
* @param req
* @param res  
* @return: void
*/
private static void sendHttpResponse(ChannelHandlerContext ctx,
FullHttpRequest req, DefaultFullHttpResponse res) {
// 返回应答给客户端
if (res.getStatus().code() != STATUS_CODE) {
ByteBuf buf = Unpooled.copiedBuffer(res.getStatus().toString(),
CharsetUtil.UTF_8);
res.content().writeBytes(buf);
buf.release();
}
// 如果是非Keep-Alive,关闭连接
ChannelFuture f = ctx.channel().writeAndFlush(res);
if (!HttpHeaders.isKeepAlive(req) || res.getStatus().code() != STATUS_CODE) {
f.addListener(ChannelFutureListener.CLOSE);
}
}

/**
* 发送消息
*/
public static void send(String msg, String upgrade) {
if (CTX_MAP.containsKey(upgrade)) {
List<ChannelHandlerContext> ctxList = CTX_MAP.get(upgrade);
TextWebSocketFrame tw = new TextWebSocketFrame(msg);
for (ChannelHandlerContext ctx : ctxList) {
ctx.channel().writeAndFlush(new TextWebSocketFrame(tw.text()));
}
}
// else {
// LOGGER.info("消息推送失败,未找到[" + upgrade + "]channel");
// }
}
}

3. CaptureNettyServer.java(Netty 服务启动类)

package cn.cloudwalk.isc.util.netty;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.IdleStateHandler;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import cn.cloudwalk.isc.util.thread.SysThreadPool;

import javax.annotation.PostConstruct;

import java.util.concurrent.TimeUnit;


/**
 * Spring component that starts the Netty WebSocket server after the bean has
 * been constructed. The server runs in a task handed to
 * {@code SysThreadPool}, so bean initialisation does not block on it.
 */
@Component
public class CaptureNettyServer {

    private static final Logger LOGGER = LoggerFactory.getLogger(CaptureNettyServer.class);

    /** Port to listen on, injected from the {@code netty.server.port} property. */
    @Value("${netty.server.port}")
    public Integer port;

    public Integer getPort() {
        return port;
    }

    public void setPort(Integer port) {
        this.port = port;
    }

    /**
     * Submits the server start task to the shared thread pool.
     *
     * @throws IllegalStateException if no port has been configured
     */
    @PostConstruct()
    public void init() {
        if (port == null) {
            // Fail with a clear message instead of an NPE from unboxing.
            throw new IllegalStateException("netty.server.port is not configured");
        }
        StartTask task = new StartTask(port);
        SysThreadPool.getThread().execute(task);
    }

    /**
     * Task that binds the server socket and then blocks until the server
     * channel is closed.
     */
    static class StartTask implements Runnable {
        private int taskPport;

        public StartTask(int taskPport) {
            this.taskPport = taskPport;
        }

        public int getTaskPport() {
            return taskPport;
        }

        public void setTaskPport(int taskPport) {
            this.taskPport = taskPport;
        }

        /**
         * Configures the pipeline, binds the port and waits for the server
         * channel to close. Both event loop groups are always shut down when
         * the method exits.
         */
        @Override
        public void run() {
            // Two groups: boss accepts connections, work serves accepted connections.
            EventLoopGroup boss = new NioEventLoopGroup();
            EventLoopGroup work = new NioEventLoopGroup();
            try {
                ServerBootstrap bootstrap = new ServerBootstrap();
                bootstrap.group(boss, work)
                        .channel(NioServerSocketChannel.class)
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel socketChannel) throws Exception {
                                ChannelPipeline pipeline = socketChannel.pipeline();
                                // Decodes requests and encodes responses as HTTP messages.
                                pipeline.addLast("http-codec", new HttpServerCodec());
                                // Merges the parts of an HTTP message into one full message (max 65536 bytes).
                                pipeline.addLast("aggregator", new HttpObjectAggregator(65536));
                                // Supports writing chunked content to the client.
                                pipeline.addLast("http-chunked", new ChunkedWriteHandler());
                                // Idle detection: 60 s read idle, 30 s write idle, 30 min read+write idle.
                                pipeline.addLast(new IdleStateHandler(60, 30, 60 * 30, TimeUnit.SECONDS));
                                // Business handler: handshake and frame processing.
                                pipeline.addLast(new CaptureMyChannelHandler());
                            }
                        })
                        .option(ChannelOption.SO_BACKLOG, 1024)
                        .childOption(ChannelOption.SO_KEEPALIVE, true);

                Channel channel = bootstrap.bind(taskPport).sync().channel();
                // Report success only after the bind has actually completed.
                LOGGER.info("【服务器启动成功========端口:{}】", taskPport);
                channel.closeFuture().sync();
            } catch (InterruptedException e) {
                // Restore the interrupt flag so the owning thread pool can observe it.
                Thread.currentThread().interrupt();
                LOGGER.error(e.toString(), e);
            } catch (Exception e) {
                LOGGER.error(e.toString(), e);
            } finally {
                boss.shutdownGracefully();
                work.shutdownGracefully();
            }
        }
    }
}

4. HTML 测试页面

<html>
<head>
<meta http-equiv="Content-Type" content="text/html; charset=utf-8"/>
<title>WebSocket客户端</title>
<script type="text/javascript">
    // Manual test client: connects to the /home endpoint and shows every
    // message pushed by the server in the textarea below.
    var socket;

    // Fall back to the prefixed implementation used by old Firefox versions.
    if (!window.WebSocket) {
        window.WebSocket = window.MozWebSocket;
    }

    if (window.WebSocket) {
        socket = new WebSocket("ws://localhost:9005/home");

        // Append each received message to the output area.
        socket.onmessage = function (event) {
            var ta = document.getElementById('responseContent');
            ta.value += event.data + " ";
        };

        socket.onopen = function (event) {
            var ta = document.getElementById('responseContent');
            ta.value = "你当前的浏览器支持WebSocket,请进行后续操作 ";
        };

        // Replace the output with a notice once the connection is closed.
        socket.onclose = function (event) {
            var ta = document.getElementById('responseContent');
            ta.value = "WebSocket连接已经关闭 ";
        };
    } else {
        alert("您的浏览器不支持WebSocket");
    }

    // Sends the message only while the connection is open.
    function send(message) {
        if (!window.WebSocket || !socket) {
            return;
        }
        if (socket.readyState === WebSocket.OPEN) {
            socket.send(message);
        } else {
            alert("WebSocket连接没有建立成功!!");
        }
    }
</script>
</head>
<body>
<form onSubmit="return false;">
    <input type="text" name="message" value=""/>
    <br/><br/>
    <input type="button" value="发送WebSocket请求消息" onClick="send(this.form.message.value)"/>
    <hr color="red"/>
    <h2>客户端接收到服务端返回的应答消息</h2>
    <!-- The width declaration needs its property name; a bare "1024px" is ignored by browsers. -->
    <textarea id="responseContent" style="width:1024px; height:300px"></textarea>
</form>
</body>
</html>

原文地址:https://www.cnblogs.com/jinnian18sui/p/11023924.html