Spring+Netty+WebSocket实例

比较贴近生产,详见注释

一、pom.xml

具体太长,详见源码

    </dependency>
        <dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-all</artifactId>
        <version>4.1.2.Final</version> 
    </dependency>

二、目录结构

三、AfterSpringBegin

继承了AfterSpringBegin的子类在spring加载成功后,会自动启动

package com.netty.init;

import java.util.Timer;
import java.util.TimerTask;



import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
/**
 * 
 * spring加载后改方法的子类
 * */
public abstract class AfterSpringBegin extends TimerTask  implements ApplicationListener<ContextRefreshedEvent>{

    public void onApplicationEvent(ContextRefreshedEvent event) {
        // TODO Auto-generated method stub
        if(event.getApplicationContext().getParent() ==null){
            
            Timer timer = new Timer();
            timer.schedule(this, 0);
        }
    }

}
View Code

四、Constant

存放了websocket相关信道

package com.netty.constant;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
 * 常量
 * */
public class Constant {
    //存放所有的ChannelHandlerContext
    public static Map<String, ChannelHandlerContext> pushCtxMap = new ConcurrentHashMap<String, ChannelHandlerContext>() ;
    
    //存放某一类的channel
    public static ChannelGroup aaChannelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

}
View Code

五、WebSocketServer

启动服务

package com.netty.server;

import javax.annotation.PreDestroy;

import org.springframework.beans.factory.annotation.Autowired;

import com.netty.init.AfterSpringBegin;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.FixedRecvByteBufAllocator;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
 * 启动服务
 * */

public class WebSocketServer extends AfterSpringBegin{

    //用于客户端连接请求
    @Autowired
    private EventLoopGroup bossGroup;
    
    //用于处理客户端I/O操作
    @Autowired
    private EventLoopGroup workerGroup;
    
    //服务器的辅助启动类
    @Autowired
    private ServerBootstrap serverBootstrap;
    
    //BS的I/O处理类
    private ChannelHandler childChannelHandler;
    
    private ChannelFuture channelFuture;
    
    //服务端口
    private int port;
    
    public WebSocketServer(){
        
        System.out.println("初始化");
    }

    public EventLoopGroup getBossGroup() {
        return bossGroup;
    }

    public void setBossGroup(EventLoopGroup bossGroup) {
        this.bossGroup = bossGroup;
    }

    public EventLoopGroup getWorkerGroup() {
        return workerGroup;
    }

    public void setWorkerGroup(EventLoopGroup workerGroup) {
        this.workerGroup = workerGroup;
    }

    public ServerBootstrap getServerBootstrap() {
        return serverBootstrap;
    }

    public void setServerBootstrap(ServerBootstrap serverBootstrap) {
        this.serverBootstrap = serverBootstrap;
    }

    public ChannelHandler getChildChannelHandler() {
        return childChannelHandler;
    }

    public void setChildChannelHandler(ChannelHandler childChannelHandler) {
        this.childChannelHandler = childChannelHandler;
    }

    public ChannelFuture getChannelFuture() {
        return channelFuture;
    }

    public void setChannelFuture(ChannelFuture channelFuture) {
        this.channelFuture = channelFuture;
    }

    public int getPort() {
        return port;
    }

    public void setPort(int port) {
        this.port = port;
    }

    @Override
    public void run() {
        // TODO Auto-generated method stub
        try {
            bulid(port);
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
    
    public void bulid(int port) throws Exception{
        
        try {
            
            //(1)boss辅助客户端的tcp连接请求  worker负责与客户端之前的读写操作
            //(2)配置客户端的channel类型
            //(3)配置TCP参数,握手字符串长度设置
            //(4)TCP_NODELAY是一种算法,为了充分利用带宽,尽可能发送大块数据,减少充斥的小块数据,true是关闭,可以保持高实时性,若开启,减少交互次数,但是时效性相对无法保证
            //(5)开启心跳包活机制,就是客户端、服务端建立连接处于ESTABLISHED状态,超过2小时没有交流,机制会被启动
            //(6)netty提供了2种接受缓存区分配器,FixedRecvByteBufAllocator是固定长度,但是拓展,AdaptiveRecvByteBufAllocator动态长度
            //(7)绑定I/O事件的处理类,WebSocketChildChannelHandler中定义
            serverBootstrap.group(bossGroup,workerGroup)
                           .channel(NioServerSocketChannel.class)
                           .option(ChannelOption.SO_BACKLOG, 1024)
                           .option(ChannelOption.TCP_NODELAY, true)
                           .childOption(ChannelOption.SO_KEEPALIVE, true)
                           .childOption(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(592048))
                           .childHandler(childChannelHandler);
            
            System.out.println("成功");
            channelFuture = serverBootstrap.bind(port).sync();
            channelFuture.channel().closeFuture().sync();
        } catch (Exception e) {
            // TODO: handle exception
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
            
        }

    }
    
    //执行之后关闭
    @PreDestroy
    public void close(){
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
        
        
    }
}
View Code

六、WebSocketChildChannelHandler

五里面的private ChannelHandler childChannelHandler; 注入的就是这个类,注入配置在后面的xml中,用处在五代码里注解了

package com.netty.server;

import javax.annotation.Resource;

import org.springframework.stereotype.Component;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.stream.ChunkedWriteHandler;

@Component
public class WebSocketChildChannelHandler extends ChannelInitializer<SocketChannel>{

    @Resource(name = "webSocketServerHandler")
    private ChannelHandler webSocketServerHandler;
    




    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        // TODO Auto-generated method stub
        ch.pipeline().addLast("http-codec", new HttpServerCodec());
        ch.pipeline().addLast("aggregator", new HttpObjectAggregator(65536));
        ch.pipeline().addLast("http-chunked", new ChunkedWriteHandler());
        ch.pipeline().addLast("handler",webSocketServerHandler);
    }

}
View Code

七、WebSocketServerHandler

websocket具体的业务处理,六中的private ChannelHandler webSocketServerHandler;,注入的就是这个类

package com.netty.server;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.springframework.stereotype.Component;

import com.alibaba.fastjson.JSONObject;
import com.netty.constant.Constant;
import com.netty.manage.ManageMessage;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;
import io.netty.util.CharsetUtil;

/**
 * websocket 具体业务处理方法
 * 
 * */

@Component
@Sharable
public class WebSocketServerHandler extends BaseWebSocketServerHandler{

    
    private WebSocketServerHandshaker handshaker;
    
    
    /**
     * 当客户端连接成功,返回个成功信息
     * */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // TODO Auto-generated method stub
        push(ctx, "连接成功");
    }

    /**
     * 当客户端断开连接
     * */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        // TODO Auto-generated method stub
        for(String key:Constant.pushCtxMap.keySet()){
            
            if(ctx.equals(Constant.pushCtxMap.get(key))){
                //从连接池内剔除
                System.out.println(Constant.pushCtxMap.size());
                System.out.println("剔除"+key);
                Constant.pushCtxMap.remove(key);
                System.out.println(Constant.pushCtxMap.size());
            }
            
        }
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        // TODO Auto-generated method stub
        ctx.flush();
    }


    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg)
            throws Exception {
        // TODO Auto-generated method stub
        
        //http://xxxx
        if(msg instanceof FullHttpRequest){
            
            handleHttpRequest(ctx,(FullHttpRequest)msg);
        }else if(msg instanceof WebSocketFrame){
        //ws://xxxx    
            handlerWebSocketFrame(ctx,(WebSocketFrame)msg);
        }
        
        
        
    }
    
    
    public void handlerWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception{
        
        //关闭请求
        if(frame instanceof CloseWebSocketFrame){
            
            handshaker.close(ctx.channel(), (CloseWebSocketFrame)frame.retain());
            
            return;
        }
        //ping请求
        if(frame instanceof PingWebSocketFrame){
            
            ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
            
            return;
        }
        //只支持文本格式,不支持二进制消息
        if(!(frame instanceof TextWebSocketFrame)){
            
            throw new Exception("仅支持文本格式");
        }
        
        //客服端发送过来的消息
        
        
        
        
         String request = ((TextWebSocketFrame) frame).text();
         System.out.println("服务端收到:" + request);
         
         JSONObject jsonObject = null;
         
            try
            {
                jsonObject = JSONObject.parseObject(request);
                System.out.println(jsonObject.toJSONString());
            }
            catch (Exception e)
            {
            }
         if (jsonObject == null){
             
             return;
         }
            
          String id = (String) jsonObject.get("id");
          String type = (String) jsonObject.get("type");   
         
          //根据id判断是否登陆或者是否有权限等
          
          if(id!=null && !"".equals("id")  &&  type!=null && !"".equals("type")){
              
              //用户是否有权限
              boolean idAccess = true;  
              //类型是否符合定义
              boolean typeAccess = true; 
              
              if(idAccess && typeAccess){
                  System.out.println("添加到连接池:"+request);
                  Constant.pushCtxMap.put(request,ctx);
                  Constant.aaChannelGroup.add(ctx.channel());
              }
              
              
              //根据type 存放进对于的channel池,这里就简单实现,直接放进aaChannelGroup,方便群发
              
              
              
          }


        
        
    }
    //第一次请求是http请求,请求头包括ws的信息
    public void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req){
        
        
        if(!req.decoderResult().isSuccess()){
            
            sendHttpResponse(ctx,req, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
            return;
        }
        
        WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory("ws:/"+ctx.channel()+ "/websocket",null,false);
        handshaker = wsFactory.newHandshaker(req);
        
        
        if(handshaker == null){
            //不支持
            WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
        }else{
            
            handshaker.handshake(ctx.channel(), req);
        }
        
    }
    
    
    public static void sendHttpResponse(ChannelHandlerContext ctx,FullHttpRequest req,DefaultFullHttpResponse res){
        
        
        // 返回应答给客户端
        if (res.status().code() != 200)
        {
            ByteBuf buf = Unpooled.copiedBuffer(res.status().toString(), CharsetUtil.UTF_8);
            res.content().writeBytes(buf);
            buf.release();
        }

        // 如果是非Keep-Alive,关闭连接
        ChannelFuture f = ctx.channel().writeAndFlush(res);
        if (!isKeepAlive(req) || res.status().code() != 200)
        {
            f.addListener(ChannelFutureListener.CLOSE);
        }
        
    }
    
    private static boolean isKeepAlive(FullHttpRequest req)
    {
        return false;
    }

    
    //异常处理,netty默认是关闭channel
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        // TODO Auto-generated method stub
        //输出日志
         cause.printStackTrace();
         ctx.close();
    }
    
    
    
}
View Code

八、BaseWebSocketServerHandler

把推送方法单独抽象出来,方便调用

package com.netty.server;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;


/**
 * 发消息方式 抽象出来
 * 
 * */
public abstract class BaseWebSocketServerHandler extends SimpleChannelInboundHandler<Object>{

    
    /**
     * 推送单个
     * 
     * */
    public static final void push(final ChannelHandlerContext ctx,final String message){
        
        TextWebSocketFrame tws = new TextWebSocketFrame(message);
        ctx.channel().writeAndFlush(tws);
        
    }
    /**
     * 群发
     * 
     * */
    public static final void push(final ChannelGroup ctxGroup,final String message){
        
        TextWebSocketFrame tws = new TextWebSocketFrame(message);
        ctxGroup.writeAndFlush(tws);
        
    }
}
View Code

九、配置

application-netty.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
    xmlns:cache="http://www.springframework.org/schema/cache"
    xsi:schemaLocation="http://www.springframework.org/schema/beans   
        http://www.springframework.org/schema/beans/spring-beans-4.1.xsd  
        http://www.springframework.org/schema/context   
        http://www.springframework.org/schema/context/spring-context-4.1.xsd">


    <bean id="bossGroup" class="io.netty.channel.nio.NioEventLoopGroup"></bean>
    <bean id="workerGroup" class="io.netty.channel.nio.NioEventLoopGroup"></bean>
    <bean id="serverBootstrap" class="io.netty.bootstrap.ServerBootstrap" scope="prototype"></bean>
    <bean id="webSocketServer" class="com.netty.server.WebSocketServer">

        <property name="port" value="${websocket.server.port}"></property>
        <property name="childChannelHandler" ref="webSocketChildChannelHandler" />
    </bean>
</beans>
View Code

application-beans.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:context="http://www.springframework.org/schema/context"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
           http://www.springframework.org/schema/context
           http://www.springframework.org/schema/context/spring-context-3.0.xsd">
           
    
           
     <bean class="org.springframework.beans.factory.annotation.AutowiredAnnotationBeanPostProcessor"/>
     
     <context:annotation-config />
     
     <context:component-scan base-package="com.netty">  
          <!-- 排除vst.back目录下Controller的service注入 -->         
         <context:exclude-filter type="annotation" expression="org.springframework.stereotype.Controller"/>
    </context:component-scan>
    
    <bean id="configProperties"
        class="org.springframework.beans.factory.config.PropertiesFactoryBean">
        <property name="locations">
            <list>
                <value>classpath*:conf/websocket.properties</value>
            </list>
        </property>
    </bean>
    
   <bean id="propertyConfigurer" class="org.springframework.beans.factory.config.PreferencesPlaceholderConfigurer">
        <property name="properties" ref="configProperties" />
   </bean> 
     

     
     
     
     
     
</beans>
View Code

springmvc.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:aop="http://www.springframework.org/schema/aop" xmlns:context="http://www.springframework.org/schema/context"
    xmlns:p="http://www.springframework.org/schema/p" xmlns:tx="http://www.springframework.org/schema/tx"
    xmlns:mvc="http://www.springframework.org/schema/mvc" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="  
http://www.springframework.org/schema/beans 
http://www.springframework.org/schema/beans/spring-beans-4.1.xsd 
http://www.springframework.org/schema/context 
http://www.springframework.org/schema/context/spring-context-4.1.xsd 
http://www.springframework.org/schema/aop 
http://www.springframework.org/schema/aop/spring-aop-4.1.xsd 
http://www.springframework.org/schema/tx 
http://www.springframework.org/schema/tx/spring-tx-4.1.xsd
http://www.springframework.org/schema/mvc
http://www.springframework.org/schema/mvc/spring-mvc-4.1.xsd">

    <description>Spring-web MVC配置</description>
    
    
    
    <bean
        class="org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter">
        <property name="messageConverters">
            <list>
                <bean
                    class="org.springframework.http.converter.StringHttpMessageConverter">
                    <property name="supportedMediaTypes">
                        <list>
                            <value>text/html;charset=UTF-8</value>
                        </list>
                    </property>
                </bean>

            </list>
        </property>
    </bean>
    
    <mvc:annotation-driven />
    
    
    <context:component-scan base-package="com.netty.controller">
        <context:include-filter type="annotation"
            expression="org.springframework.stereotype.Controller" />
        <context:exclude-filter type="annotation"
            expression="org.springframework.stereotype.Service" />
    </context:component-scan>


    
    
</beans>
View Code

websocket.properties

websocket.server.port=7397

十、客户端

用的jsp页面,具体连接逻辑什么的看需要写

<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">  
<html xmlns="http://www.w3.org/1999/xhtml">  
<head>  
<meta http-equiv="Content-Type" content="text/html; charset=utf-8" />  
<title></title>  
</head>  
  </head>  
  <script type="text/javascript">  
  var socket;   
  //实际生产中,id可以从session里面拿用户id
  var id  = Math.random().toString(36).substr(2);
  if(!window.WebSocket){  
      window.WebSocket = window.MozWebSocket;  
  }  
   
  if(window.WebSocket){  
      socket = new WebSocket("ws://localhost:7397");  
        
      socket.onmessage = function(event){             
            appendln("receive:" + event.data);  
      };  
   
      socket.onopen = function(event){  
            appendln("WebSocket is opened");  
            login(); 
      };  
   
      socket.onclose = function(event){  
            appendln("WebSocket is closed");  
      };  
  }else{  
        alert("WebSocket is not support");  
  }  

    
  function appendln(text) {  
    var ta = document.getElementById('responseText');  
    ta.value += text + "
";  
  }  
    
  function login(){
      console.log("aaaaaa");
      var date={"id":id,"type":"aa"};
      var login = JSON.stringify(date);
      socket.send(login);
      
  
  }  
        
  </script>  
  <body>  
    <form onSubmit="return false;">  
        <input type = "text" name="message" value="hello"/>  
        <br/><br/>  
 
        <textarea id="responseText" style=" 800px;height: 300px;"></textarea>  
    </form>  
  </body>  
</html> 
View Code

十一、源码

源码

原文地址:https://www.cnblogs.com/ggwow/p/7994213.html