Springboot + Netty + WebSocket 实现简单的聊天

  简单的实现聊天,发送至服务器端之后由服务器转发给其他在线的用户。

1. pom

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.1.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>cn.qz</groupId>
    <artifactId>xm</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>blog</name>
    <description>blog-server</description>
    <packaging>jar</packaging>

    <properties>
        <java.version>1.8</java.version>
        <!--<maven-jar-plugin.version>3.2.0</maven-jar-plugin.version> -->
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-tomcat</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.13</version>
            <scope>test</scope>
        </dependency>

        <!-- spring-boot整合mybatis-plus -->
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
            <version>2.3</version>
        </dependency>
        <dependency>
            <groupId>com.github.pagehelper</groupId>
            <artifactId>pagehelper</artifactId>
            <version>5.1.2</version>
        </dependency>

        <!-- spring-boot整合mysql -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.49</version>
        </dependency>

        <!-- 引入 redis 依赖 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>

        <!-- spring-boot整合druid -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
            <version>1.1.22</version>
        </dependency>

        <!-- 使用事务需要引入这个包 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jdbc</artifactId>
        </dependency>

        <!-- 引入 spring aop 依赖 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-aop</artifactId>
        </dependency>

        <!-- commons工具包 -->

        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-collections4</artifactId>
            <version>4.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
        </dependency>
        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
            <version>2.4</version>
        </dependency>
        <dependency>
            <groupId>commons-collections</groupId>
            <artifactId>commons-collections</artifactId>
            <version>3.2</version>
        </dependency>
        <!-- 阿里的fastjson用于手动转JSON -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.56</version>
        </dependency>
        <!--httpclient相关包 -->
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>4.3.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpmime</artifactId>
            <version>4.3.1</version>
        </dependency>

        <!--tika解析文本内容 -->
        <dependency>
            <groupId>org.apache.tika</groupId>
            <artifactId>tika-parsers</artifactId>
            <version>1.17</version>
        </dependency>

        <!--POI -->
        <dependency>
            <groupId>org.apache.poi</groupId>
            <artifactId>poi</artifactId>
            <version>3.16</version>
        </dependency>
        <dependency>
            <groupId>org.apache.poi</groupId>
            <artifactId>poi-ooxml</artifactId>
            <version>3.16</version>
        </dependency>

        <!-- springdata jpa依赖 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.jsoup/jsoup -->
        <dependency>
            <groupId>org.jsoup</groupId>
            <artifactId>jsoup</artifactId>
            <version>1.12.2</version>
        </dependency>

        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger2</artifactId>
            <version>2.9.2</version>
        </dependency>
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger-ui</artifactId>
            <version>2.9.2</version>
        </dependency>

        <dependency>
            <groupId>org.redisson</groupId>
            <artifactId>redisson</artifactId>
            <version>3.15.0</version>
        </dependency>

        <!--netty-->
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.11.Final</version>
        </dependency>
        <!-- poi依赖 <dependency> <groupId>org.apache.poi</groupId> <artifactId>poi</artifactId>
            <version>RELEASE</version> </dependency> <dependency> <groupId>org.apache.poi</groupId>
            <artifactId>poi-ooxml</artifactId> <version>RELEASE</version> </dependency> -->
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>

            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <dependencies>
                    <!-- spring热部署 -->
                    <!-- 该依赖在此处下载不下来,可以放置在build标签外部下载完成后再粘贴进plugin中 -->
                    <dependency>
                        <groupId>org.springframework</groupId>
                        <artifactId>springloaded</artifactId>
                        <version>1.2.6.RELEASE</version>
                    </dependency>
                </dependencies>
                <configuration>
                    <fork>true</fork>
                </configuration>
            </plugin>

            <!-- 要将源码放上去,需要加入这个插件 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-source-plugin</artifactId>
                <executions>
                    <execution>
                        <id>attach-sources</id>
                        <goals>
                            <goal>jar</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <!-- 执行Junit测试(测试所有类) -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.10</version>
                <configuration>
                    <includes>
                        <!--<include>****Test.java</include>-->
                        <include>***</include>
                    </includes>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>

核心是netty-all, 其他依赖按需引入即可

2. 主要类信息

1. 服务端程序

package com.xm.ggn.netty;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class NettyServer {

    private final int port;

    public NettyServer(int port) {
        this.port = port;
    }

    public void start() throws Exception {
        // 修改bossGroup的数量,2线程足够用
        EventLoopGroup bossGroup = new NioEventLoopGroup(2);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap sb = new ServerBootstrap();
            sb.option(ChannelOption.SO_BACKLOG, 1024);
            sb.group(workerGroup, bossGroup) // 绑定线程池
                    .channel(NioServerSocketChannel.class) // 指定使用的channel
                    .localAddress(this.port)// 绑定监听端口
                    .childHandler(new MyChannelInitializer());
            ChannelFuture cf = sb.bind().sync(); // 服务器异步创建绑定
            log.info(NettyServer.class + " 启动正在监听: " + cf.channel().localAddress());
            cf.channel().closeFuture().sync(); // 关闭服务器通道
        } finally {
            workerGroup.shutdownGracefully().sync(); // 释放线程池资源
            bossGroup.shutdownGracefully().sync();
        }
    }
}

2. Initializer

package com.xm.ggn.netty;


import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class MyChannelInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        log.info("收到新的客户端连接: {}", socketChannel.toString());
        // websocket协议本身是基于http协议的,所以这边也要使用http解编码器
        socketChannel.pipeline().addLast(new HttpServerCodec());
        // 以块的方式来写的处理器(添加对于读写大数据流的支持)
        socketChannel.pipeline().addLast(new ChunkedWriteHandler());
        // 对httpMessage进行聚合
        socketChannel.pipeline().addLast(new HttpObjectAggregator(8192));

        // ================= 上述是用于支持http协议的 =============

        // websocket 服务器处理的协议,用于给指定的客户端进行连接访问的路由地址
        socketChannel.pipeline().addLast(new WebSocketServerProtocolHandler("/ws", "WebSocket", true, 65536 * 10));

        // 添加自己的handler
        socketChannel.pipeline().addLast(new MyWebSocketHandler());
    }
}

3.handler

package com.xm.ggn.netty;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.concurrent.GlobalEventExecutor;
import lombok.extern.slf4j.Slf4j;

import java.text.SimpleDateFormat;
import java.util.Collection;
import java.util.Date;
import java.util.concurrent.ConcurrentHashMap;

/**
 * 自定义服务器端处理handler,继承SimpleChannelInboundHandler,处理WebSocket 连接数据
 */
@Slf4j
public class MyWebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

    public static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    // 用户id=>channel示例
    // 可以通过用户的唯一标识保存用户的channel
    // 这样就可以发送给指定的用户
    public static ConcurrentHashMap<String, Channel> channelMap = new ConcurrentHashMap<>();

    /**
     * 每当服务端收到新的客户端连接时,客户端的channel存入ChannelGroup列表中,并通知列表中其他客户端channel
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        // 获取连接的channel
        Channel incomming = ctx.channel();
        //通知所有已经连接到服务器的客户端,有一个新的通道加入
        /*for(Channel channel:channelGroup){
            channel.writeAndFlush("[SERVER]-"+incomming.remoteAddress()+"加入
");
        }*/
        channelGroup.add(incomming);
    }

    /**
     * 每当服务端断开客户端连接时,客户端的channel从ChannelGroup中移除,并通知列表中其他客户端channel
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        //获取连接的channel
        /*Channel incomming = ctx.channel();
        for(Channel channel:channelGroup){
            channel.writeAndFlush("[SERVER]-"+incomming.remoteAddress()+"离开
");
        }*/
        //从服务端的channelGroup中移除当前离开的客户端
        channelGroup.remove(ctx.channel());

        //从服务端的channelMap中移除当前离开的客户端
        Collection<Channel> col = channelMap.values();
        while (true == col.contains(ctx.channel())) {
            col.remove(ctx.channel());
            log.info("netty客户端连接删除成功!");
        }

    }


    /**
     * 每当从服务端读到客户端写入信息时,将信息转发给其他客户端的Channel.
     *
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
        log.info("netty客户端收到服务器数据, 客户端地址: {}, msg: {}", ctx.channel().remoteAddress(), msg.text());
        String date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
        //消息处理类
        message(ctx, msg.text(), date);

        //channelGroup.writeAndFlush( new TextWebSocketFrame(msg.text()));
    }

    /**
     * 当服务端的IO 抛出异常时被调用
     *
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        Channel incoming = ctx.channel();
        log.error("SimpleChatClient:" + incoming.remoteAddress() + "异常", cause);
        //异常出现就关闭连接
        ctx.close();
    }

    //消息处理类
    public void message(ChannelHandlerContext ctx, String msg, String date) {
        try {
            // 消息转发给在线的其他用户
            Channel channel = ctx.channel();
            for (Channel tmpChannel : channelGroup) {
                if (!tmpChannel.equals(channel)) {
                    String sendedMsg = date + ":" + msg;
                    log.info("服务器转发消息,客户端地址: {}, msg: {}", ctx.channel().remoteAddress(), sendedMsg);
                    tmpChannel.writeAndFlush(new TextWebSocketFrame(sendedMsg));
                }
            }
        } catch (Exception e) {
            log.error("message 处理异常, msg: {}, date: {}", msg, date, e);
        }
    }
}

4. Springboot主启动类: 也可以将启动nettyServer代码移动至监听Spring容器启动事件类中

package com.xm.ggn;

import com.xm.ggn.netty.NettyServer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.domain.EntityScan;
import org.springframework.boot.web.servlet.ServletComponentScan;
import org.springframework.context.annotation.EnableAspectJAutoProxy;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;

@SpringBootApplication
@ServletComponentScan("com")
@EntityScan(basePackages = {"com"})
@EnableScheduling
// 允许通过AopContext.currentProxy() 获取代理类
@EnableAspectJAutoProxy(proxyTargetClass = true, exposeProxy = true)
@EnableAsync
public class BlogApplication {

    public static void main(String[] args) {
        SpringApplication.run(BlogApplication.class, args);

        // 启动netty服务器
        try {
            new NettyServer(8091).start();
        } catch (Exception e) {
            System.out.println("NettyServerError:" + e.getMessage());
        }
    }
}

5. 前端就用HTML界面简单的测试

<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1transitional.dtd">
<html xmlns="http://www.w3.org/1999/xhtml">
<head>
    <meta http-equiv="Content-Type" content="text/html; charset=utf-8" />
    <title>Netty-Websocket</title>
    <script type="text/javascript">
        var socket;
        if(!window.WebSocket){
            window.WebSocket = window.MozWebSocket;
        }
        if(window.WebSocket){
            socket = new WebSocket("ws://127.0.0.1:8091/ws");
            socket.onmessage = function(event){
                var ta = document.getElementById('responseText');
                ta.value += event.data+"
";
            };
            socket.onopen = function(event){
                var ta = document.getElementById('responseText');
                ta.value = "Netty-WebSocket服务器。。。。。。连接  
";
            };
            socket.onclose = function(event){
                var ta = document.getElementById('responseText');
                ta.value = "Netty-WebSocket服务器。。。。。。关闭 
";
            };
        }else{
            alert("您的浏览器不支持WebSocket协议!");
        }
        function send(message){
            if(!window.WebSocket){return;}
            if(socket.readyState == WebSocket.OPEN){
                socket.send(message);
            }else{
                alert("WebSocket 连接没有建立成功!");
            }
 
        }
 
    </script>
</head>
<body>
<form onSubmit="return false;">
    <label>TEXT</label><input type="text" name="message" value="这里输入消息" 
    style=" 1024px;height: 100px;"/> <br />
    <br /> <input type="button" value="发送ws消息"
                  onClick="send(this.form.message.value)" />
    <hr color="black" />
    <h3>服务端返回的应答消息</h3>
    <textarea id="responseText" style=" 1024px;height: 300px;"></textarea>
</form>
</body>
</html>

3. 测试

1. 启动boot应用

2. 前端用两个浏览器打开

3. 查看服务器端控制台:

2021-03-02 18:14:00.644 | cmdb - INFO | main | com.xm.ggn.netty.NettyServer | line:32 - class com.xm.ggn.netty.NettyServer 启动正在监听: /0:0:0:0:0:0:0:0:8091
2021-03-02 18:14:07.304 | cmdb - INFO | nioEventLoopGroup-4-1 | com.xm.ggn.netty.MyChannelInitializer | line:17 - 收到新的客户端连接: [id: 0x94e8a9ec, L:/127.0.0.1:8091 - R:/127.0.0.1:65288]
2021-03-02 18:14:18.861 | cmdb - INFO | nioEventLoopGroup-4-2 | com.xm.ggn.netty.MyChannelInitializer | line:17 - 收到新的客户端连接: [id: 0xaf71586a, L:/127.0.0.1:8091 - R:/127.0.0.1:65369]

3. 两个控制台分别发几条信息

查看两个界面的服务器端消息:

(1) 第一个

 (2) 第二个:

 4. 查看服务器端日志

2021-03-02 18:14:00.644 | cmdb - INFO | main | com.xm.ggn.netty.NettyServer | line:32 - class com.xm.ggn.netty.NettyServer 启动正在监听: /0:0:0:0:0:0:0:0:8091
2021-03-02 18:14:07.304 | cmdb - INFO | nioEventLoopGroup-4-1 | com.xm.ggn.netty.MyChannelInitializer | line:17 - 收到新的客户端连接: [id: 0x94e8a9ec, L:/127.0.0.1:8091 - R:/127.0.0.1:65288]
2021-03-02 18:14:18.861 | cmdb - INFO | nioEventLoopGroup-4-2 | com.xm.ggn.netty.MyChannelInitializer | line:17 - 收到新的客户端连接: [id: 0xaf71586a, L:/127.0.0.1:8091 - R:/127.0.0.1:65369]
2021-03-02 18:15:20.947 | cmdb - INFO | nioEventLoopGroup-4-1 | com.xm.ggn.netty.MyWebSocketHandler | line:82 - netty客户端收到服务器数据, 客户端地址: /127.0.0.1:65288, msg: 我说是什么
2021-03-02 18:15:20.948 | cmdb - INFO | nioEventLoopGroup-4-1 | com.xm.ggn.netty.MyWebSocketHandler | line:113 - 服务器转发消息,客户端地址: /127.0.0.1:65288, msg: 2021-03-02 18:15:20:我说是什么
2021-03-02 18:15:29.342 | cmdb - INFO | nioEventLoopGroup-4-2 | com.xm.ggn.netty.MyWebSocketHandler | line:82 - netty客户端收到服务器数据, 客户端地址: /127.0.0.1:65369, msg: 我说不知道
2021-03-02 18:15:29.342 | cmdb - INFO | nioEventLoopGroup-4-2 | com.xm.ggn.netty.MyWebSocketHandler | line:113 - 服务器转发消息,客户端地址: /127.0.0.1:65369, msg: 2021-03-02 18:15:29:我说不知道
2021-03-02 18:15:34.745 | cmdb - INFO | nioEventLoopGroup-4-2 | com.xm.ggn.netty.MyWebSocketHandler | line:82 - netty客户端收到服务器数据, 客户端地址: /127.0.0.1:65369, msg: 我说不知道个鬼
2021-03-02 18:15:34.746 | cmdb - INFO | nioEventLoopGroup-4-2 | com.xm.ggn.netty.MyWebSocketHandler | line:113 - 服务器转发消息,客户端地址: /127.0.0.1:65369, msg: 2021-03-02 18:15:34:我说不知道个鬼
2021-03-02 18:15:44.819 | cmdb - INFO | nioEventLoopGroup-4-1 | com.xm.ggn.netty.MyWebSocketHandler | line:82 - netty客户端收到服务器数据, 客户端地址: /127.0.0.1:65288, msg: 你说身子
2021-03-02 18:15:44.820 | cmdb - INFO | nioEventLoopGroup-4-1 | com.xm.ggn.netty.MyWebSocketHandler | line:113 - 服务器转发消息,客户端地址: /127.0.0.1:65288, msg: 2021-03-02 18:15:44:你说身子

  接下来就是基于上面的代码简单的实现基于vue的聊天设计。

补充: 关于WebSocketServerProtocolHandler 这个处理器用于处理WebSocket请求套路

  验证URL是否是WebSocke的URL,主要就是判断创建时候传进去的这个"/ws"。默认是根据equals来匹配,也可以通过参数来设置进行startWith 匹配,如下方法:

io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandshakeHandler#isNotWebSocketPath

    private boolean isNotWebSocketPath(FullHttpRequest req) {
        return checkStartsWith ? !req.uri().startsWith(websocketPath) : !req.uri().equals(websocketPath);
    }

(1) 第一种:

socketChannel.pipeline().addLast(new WebSocketServerProtocolHandler("/ws", "WebSocket", true, 65536 * 10));

  上面这中实际调了重载构造方法传递的checkStartsWith  为false

(2) 第二种: 也可以直接调用参数设置checkStartsWith  为true

socketChannel.pipeline().addLast(new WebSocketServerProtocolHandler("/ws", "WebSocket", true, 65536 * 10, true, true));

对应的构造方法是:

    public WebSocketServerProtocolHandler(String websocketPath, String subprotocols,
            boolean allowExtensions, int maxFrameSize, boolean allowMaskMismatch, boolean checkStartsWith) {
        this.websocketPath = websocketPath;
        this.subprotocols = subprotocols;
        this.allowExtensions = allowExtensions;
        maxFramePayloadLength = maxFrameSize;
        this.allowMaskMismatch = allowMaskMismatch;
        this.checkStartsWith = checkStartsWith;
    }

 补充: socket建立连接的时候我们希望获取到用户的标识信息,然后将用户信息和channel维护起来

1. 调整MyChannelInitializer 中的handler

package com.xm.ggn.netty;


import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class MyChannelInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        log.info("收到新的客户端连接: {}", socketChannel.toString());
        // websocket协议本身是基于http协议的,所以这边也要使用http解编码器
        socketChannel.pipeline().addLast(new HttpServerCodec());
        // 以块的方式来写的处理器(添加对于读写大数据流的支持)
        socketChannel.pipeline().addLast(new ChunkedWriteHandler());
        // 对httpMessage进行聚合
        socketChannel.pipeline().addLast(new HttpObjectAggregator(8192));

        // ================= 上述是用于支持http协议的 =============

        // 添加自己的handler
        socketChannel.pipeline().addLast(new MyWebSocketHandler());

        // websocket 服务器处理的协议,用于给指定的客户端进行连接访问的路由地址
        // 这个主要就是验证URL是否是WebSocke的URL,主要就是判断创建时候传进去的这个"/ws"。 下面四个参数的是比较路径相等io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandshakeHandler.isNotWebSocketPath
//        socketChannel.pipeline().addLast(new WebSocketServerProtocolHandler("/ws", "WebSocket", true, 65536 * 10));
        // 也可以用下面参数用于比较startWith
        socketChannel.pipeline().addLast(new WebSocketServerProtocolHandler("/ws", "WebSocket", true, 65536 * 10, true, true));
    }
}

2. 修改MyWebSocketHandler 重写channelRead 方法,注意不是channelRead0 方法

    /**
     * 处理建立连接时候请求(用于拿参数)
     *
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (null != msg && msg instanceof FullHttpRequest) {
            log.info("连接请求,准备提取参数");
            //转化为http请求
            FullHttpRequest request = (FullHttpRequest) msg;
            //拿到请求地址
            String uri = request.uri();
            log.info("uri: " + uri);
            if (StringUtils.isNotBlank(uri)) {
                String path = StringUtils.substringBefore(uri, "?");
                log.info("path: {}", path);
                String username = StringUtils.substringAfterLast(path, "/");
                log.info(username);
                channelMap.put(username, ctx.channel());
                log.info("channelMap: {}", channelMap);
            }

            //重新设置请求地址为WebSocketServerProtocolHandler 匹配的地址(如果WebSocketServerProtocolHandler 的时候checkStartsWith   为true则不需要设置,会根据前缀匹配)
//            request.setUri("/ws");
        }

        //接着建立请求
        super.channelRead(ctx, msg);
    }

3. 调整前端请求连接地址增加用户姓名

<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1transitional.dtd">
<html xmlns="http://www.w3.org/1999/xhtml">
<head>
    <meta http-equiv="Content-Type" content="text/html; charset=utf-8" />
    <title>Netty-Websocket</title>
    <script type="text/javascript">
        var socket;
        if(!window.WebSocket){
            window.WebSocket = window.MozWebSocket;
        }
        if(window.WebSocket){
            socket = new WebSocket("ws://127.0.0.1:8091/ws/admin?username=admin");
            socket.onmessage = function(event){
                var ta = document.getElementById('responseText');
                ta.value += event.data+"
";
            };
            socket.onopen = function(event){
                var ta = document.getElementById('responseText');
                ta.value = "Netty-WebSocket服务器。。。。。。连接  
";
            };
            socket.onclose = function(event){
                var ta = document.getElementById('responseText');
                ta.value = "Netty-WebSocket服务器。。。。。。关闭 
";
            };
        }else{
            alert("您的浏览器不支持WebSocket协议!");
        }
        function send(message){
            if(!window.WebSocket){return;}
            if(socket.readyState == WebSocket.OPEN){
                socket.send(message);
            }else{
                alert("WebSocket 连接没有建立成功!");
            }
 
        }
 
    </script>
</head>
<body>
<form onSubmit="return false;">
    <label>TEXT</label><input type="text" name="message" value="这里输入消息" 
    style=" 1024px;height: 100px;"/> <br />
    <br /> <input type="button" value="发送ws消息"
                  onClick="send(this.form.message.value)" />
    <hr color="black" />
    <h3>服务端返回的应答消息</h3>
    <textarea id="responseText" style=" 1024px;height: 300px;"></textarea>
</form>
</body>
</html>

4. 测试服务器端日志:(可以看到正确的拿到参数信息并且建立连接,也可以通过?传递参数)

2021-03-02 22:43:07.196 | cmdb - INFO | main | com.xm.ggn.netty.NettyServer | line:32 - class com.xm.ggn.netty.NettyServer 启动正在监听: /0:0:0:0:0:0:0:0:8091
2021-03-02 22:43:11.026 | cmdb - INFO | nioEventLoopGroup-4-1 | com.xm.ggn.netty.MyChannelInitializer | line:17 - 收到新的客户端连接: [id: 0xa9fd1310, L:/127.0.0.1:8091 - R:/127.0.0.1:50269]
2021-03-02 22:43:12.702 | cmdb - INFO | nioEventLoopGroup-4-1 | com.xm.ggn.netty.MyWebSocketHandler | line:47 - 添加新的channel, incomming: [id: 0xa9fd1310, L:/127.0.0.1:8091 - R:/127.0.0.1:50269]
2021-03-02 22:43:12.928 | cmdb - INFO | nioEventLoopGroup-4-1 | com.xm.ggn.netty.MyWebSocketHandler | line:85 - 连接请求,准备提取参数
2021-03-02 22:43:12.928 | cmdb - INFO | nioEventLoopGroup-4-1 | com.xm.ggn.netty.MyWebSocketHandler | line:90 - uri: /ws/admin?username=admin
2021-03-02 22:43:12.934 | cmdb - INFO | nioEventLoopGroup-4-1 | com.xm.ggn.netty.MyWebSocketHandler | line:93 - path: /ws/admin
2021-03-02 22:43:12.935 | cmdb - INFO | nioEventLoopGroup-4-1 | com.xm.ggn.netty.MyWebSocketHandler | line:95 - admin
2021-03-02 22:43:12.935 | cmdb - INFO | nioEventLoopGroup-4-1 | com.xm.ggn.netty.MyWebSocketHandler | line:97 - channelMap: {admin=[id: 0xa9fd1310, L:/127.0.0.1:8091 - R:/127.0.0.1:50269]}

补充:整合Vue的jwchat实现聊天 ,jwchat是基于Elementui封装的聊天插件

jwchat官网: https://codegi.gitee.io/jwchatdoc/

这里用了jwchat的两个组件: JwChat-rightbox 展示在线用户、JwChat-index 展示聊天窗口

0. 界面截图如下:

1. 前端核心vue:

<template>
  <div class="dashboard-container">
    <div class="dashboard-text">
      <el-row>
        <el-col :span="6"
          ><div class="grid-content bg-purple">
            <JwChat-rightbox :config="onlineUsers" @click="rightClick" /></div
        ></el-col>

        <!-- 如果选择了在线用户显示聊天窗口 -->
        <el-col :span="18" v-if="chatUserConfig.name != ''"
          ><div class="grid-content bg-purple-light">
            <JwChat-index
              :config="chatUserConfig"
              :showRightBox="true"
              :taleList="chatlogTaleList"
              @enter="bindEnter"
              v-model="inputMsg"
              :toolConfig="toolConfig"
              scrollType="scroll"
              @clickTalk="clickTalk"
            >
              <!-- 右边插槽 -->
              <template>
                <h3>聊天愉快</h3>
              </template>
            </JwChat-index>
          </div></el-col
        >
      </el-row>
    </div>
  </div>
</template>

<script>
// other.png 表示对方头像; myself.png 表示我自己
import { MessageBox } from "element-ui";
import {findCurrentUsername} from "@/utils/auth"

export default {
  data() {
    return {
      // 在线用户相关信息
      onlineUsers: {
        tip: "选择在线人开始聊天",
        listTip: "当前在线",
        list: [],
      },

      // 输入框内默认的消息
      inputMsg: "",
      // 聊天记录
      chatlogTaleList: [
        // {
        //   date: "2020/04/25 21:19:07",
        //   text: { text: "起床不" },
        //   mine: false,
        //   name: "留恋人间不羡仙",
        //   img: "/images/other.png",
        // }
      ],
      // 展示的工具栏配置
      toolConfig: {
        // show: ['file', 'history', 'img', ['文件1', '', '美图']],
        show: null, // 关闭所有其他组件
        showEmoji: true,
        callback: this.toolEvent,
      },

      // 正在聊天的用户的信息
      chatUserConfig: {
        img: "/images/other.png",
        name: "",
        username: "",
        fullname: "",
        dept: "大部门",
        callback: this.bindCover,
        historyConfig: {
          show: true,
          tip: "加载更多",
          callback: this.bindLoadHistory,
        },
      },

      // 当前用户信息
      currentUser: {
        username: "",
        fullname: "",
      },

      socket: new Object(),
    };
  },

  created() {
    this.listOnlineUsers();
    this.findCurrentUserInfo();
    this.webSocket();
  },

  methods: {
    webSocket() {
      // 先记录this对象
      const that = this;
      if (typeof WebSocket == "undefined") {
        MessageBox.alert("浏览器暂不支持聊天", "提示信息");
      } else {
        // 实例化socket,这里我把用户名传给了后台,使后台能判断要把消息发给哪个用户,其实也可以后台直接获取用户IP来判断并推送
        const socketUrl = "ws://127.0.0.1:8091/ws/" + findCurrentUsername();
        this.socket = new WebSocket(socketUrl);
        // 监听socket打开
        this.socket.onopen = function () {
          console.log("浏览器WebSocket已打开");
        };
        // 监听socket消息接收
        this.socket.onmessage = function (messageEvent) {
          // 转换为json对象然后添加到chatlogTaleList
          let receivedLog = JSON.parse(messageEvent.data);
          console.log(receivedLog);
          let receivedLogs = new Array();
          receivedLogs.push(receivedLog);
          receivedLogs = that.rehandleChatLogs(receivedLogs);
          if (!that.chatlogTaleList) {
            that.chatlogTaleList = new Array();  
          }
          that.chatlogTaleList = that.chatlogTaleList.concat(receivedLogs);
        };
        // 监听socket错误
        this.socket.onerror = function () {};
        // 监听socket关闭
        this.socket.onclose = function () {
          MessageBox.alert("WebSocket已关闭");
        };
      }
    },
    // 查询当前用户信息
    findCurrentUserInfo() {
      let url = "/user/getInfo";
      this.$http.post(url).then((res) => {
        this.currentUser = res.data;
      });
    },
    // 发送websocket 消息
    send(message) {
      if (!window.WebSocket) {
        return;
      }

      // 封装消息,然后发送消息
      const chatLog = {
        sendUsername: this.currentUser.username,
        sendFullname: this.currentUser.fullname,
        receiveUsername: this.chatUserConfig.username,
        receiveFullname: this.chatUserConfig.fullname,
        content: message,
        readed: false,
      };
      let socket = this.socket;
      if (socket.readyState == WebSocket.OPEN) {
        socket.send(JSON.stringify(chatLog));
      } else {
        MessageBox.alert("WebSocket 连接没有建立成功!");
      }
    },

    // 获取在线用户(有在线用户的情况下赋值到右边窗口)
    listOnlineUsers() {
      let url = "/user/listOnlineUser";
      this.$http.get(url).then((res) => {
        var onlineUsers = res.data;
        if (!onlineUsers || onlineUsers.length < 1) {
          return;
        }

        onlineUsers.forEach((element) => {
          element.name = element.username;
          element.img = "/images/cover.png";
        });
        this.onlineUsers.list = onlineUsers;
      });
    },
    // 点击在线人事件
    rightClick(type) {
      // 1.赋值给聊天人信息
      let chatUser = type.value;
      this.chatUserConfig.name = chatUser.fullname;
      this.chatUserConfig.username = chatUser.username;
      this.chatUserConfig.fullname = chatUser.fullname;
      // 2. 查询聊天记录
      let listChatlogurl = "/chat/log/list";
      let requestVO = {
        sendUsername: this.currentUser.username,
        receiveUsername: this.chatUserConfig.username,
        queryChangeRole: true,
      };
      this.$http.post(listChatlogurl, requestVO).then((res) => {
        this.chatlogTaleList = this.rehandleChatLogs(res.data);
      });
    },
    // 重新处理聊天记录, 主要是做特殊标记以及设置图像等操作
    rehandleChatLogs(chatlogs) {
      if (!chatlogs || chatlogs.length < 1) {
        return new Array();
      }

      chatlogs.forEach((element) => {
        element.date = element.createtimeStr;
        element.name = element.sendFullname;
        // 聊天内容(如下为设置文本,也可以设置其他video、图片等)
        element.text = new Object();
        element.text.text = element.content;
        if (element.sendUsername == this.currentUser.username) {
          element.mine = true;
          element.img = "/images/myself.png";
        } else {
          element.mine = false;
          element.img = "/images/other.png";
        }
      });
      return chatlogs;
    },

    // 点击左上角用户名称事件
    clickTalk(obj) {
      console.log(obj);
    },
    // 点击发送或者回车事件
    bindEnter(obj) {
      const msg = this.inputMsg;
      if (!msg) {
        MessageBox.alert("您不能发送空消息");
        return;
      }
      // 发送消息
      this.send(msg);
    },
    /**
     * @description:
     * @param {*} type 当前点击的按钮
     * @param {*} plyload 附加文件或者需要处理的数据
     * @return {*}
     */
    toolEvent(type, plyload) {
      console.log("tools", type, plyload);
    },
    /**
     * @description: 点击加载更多的回调函数
     * @param {*}
     * @return {*}
     */
    bindLoadHistory() {
      const history = new Array(3).fill().map((i, j) => {
        return {
          date: "2020/05/20 23:19:07",
          text: { text: j + new Date() },
          mine: false,
          name: "JwChat",
          img: "image/three.jpeg",
        };
      });
      let list = history.concat(this.list);
      this.list = list;
    },
    bindCover(type) {
      console.log("header", type);
    },
  },
};
</script>

涉及到主要逻辑:

(1)  点击页面进行如下操作:

1》创建WebSocket连接,创建WebSocket的时候将当前的用户名传到后端,后端记录当前用户名与连接到的netty的channel

2》 查询当前在线用户,并且展示到JwChat-rightbox 列表内。(也可以展示所有用户、如果有群聊体系展示所有的群)

(2) 点击在线用户的时候获取到在线用户的信息并记录下来,然后用ajax异步获取聊天记录(后台根据发布者和接收者按时间升序排序),然后前台根据聊天记录做对应的转换。这里是ajax获取,也可以用websocket拿,对发送的消息做处理,后端接收到消息处理对应的业务即可

(3) 聊天的时候判断是否输入有信息,有信息的时候将信息包装一下(增加发送者、接收者信息)发到后端,后端存入数据库之后再发送到对应的channel返回给前端,前端接收到后做处理完加入聊天记录数组展示在界面

2. 后端主要文件:

(1) 聊天记录表

package com.xm.ggn.bean.chat;

import com.xm.ggn.bean.AbstractSequenceEntity;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;

import javax.persistence.Entity;

/**
 * 聊天记录
 */
@Entity
@Getter
@Setter
@EqualsAndHashCode(callSuper = true)
@ToString(callSuper = true)
public class ChatLog extends AbstractSequenceEntity {

    private String sendUsername;

    private String sendFullname;

    private String receiveUsername;

    private String receiveFullname;

    private String content;

    private String remark;

    /**
     * 是否已读
     */
    private boolean readed;
}

包含继承的通用字段:

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    @TableId(type = IdType.AUTO) // 增加该注解,mybatis plus insert之后会给bean设上Id
    protected long id;

    /**
     * 创建者
     */
    @Index(name = "creator")
    @TableField(update = "%s")
    protected String creator;

    /**
     * 唯一编号
     */
    @Index(name = "uniqueCode")
    @TableField(update = "%s")
    protected String uniqueCode;

    /**
     * 创建时间
     */
    @Index(name = "createtime")
    @TableField(update = "%s")
    protected Date createtime;

(2) MyWebSocketHandler消息处理者类:

package com.xm.ggn.netty;

import com.alibaba.fastjson.JSONObject;
import com.xm.ggn.bean.chat.ChatLog;
import com.xm.ggn.service.chat.ChatLogService;
import com.xm.ggn.utils.system.SpringBootUtils;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.concurrent.GlobalEventExecutor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;

import java.text.SimpleDateFormat;
import java.util.Collection;
import java.util.Date;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/**
 * 自定义服务器端处理handler,继承SimpleChannelInboundHandler,处理WebSocket 连接数据
 */
@Slf4j
public class MyWebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

    public static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    // 用户id=>channel示例
    // 可以通过用户的唯一标识保存用户的channel
    // 这样就可以发送给指定的用户
    public static ConcurrentHashMap<String, Channel> channelMap = new ConcurrentHashMap<>();

    /**
     * 每当服务端收到新的客户端连接时,客户端的channel存入ChannelGroup列表中,并通知列表中其他客户端channel
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        // 获取连接的channel
        Channel incomming = ctx.channel();
        //通知所有已经连接到服务器的客户端,有一个新的通道加入
        /*for(Channel channel:channelGroup){
            channel.writeAndFlush("[SERVER]-"+incomming.remoteAddress()+"加入
");
        }*/
        channelGroup.add(incomming);
        log.info("添加新的channel, incomming: {}", incomming);
    }

    /**
     * 每当服务端断开客户端连接时,客户端的channel从ChannelGroup中移除,并通知列表中其他客户端channel
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        //获取连接的channel
        /*Channel incomming = ctx.channel();
        for(Channel channel:channelGroup){
            channel.writeAndFlush("[SERVER]-"+incomming.remoteAddress()+"离开
");
        }*/
        //从服务端的channelGroup中移除当前离开的客户端
        channelGroup.remove(ctx.channel());

        //从服务端的channelMap中移除当前离开的客户端
        Collection<Channel> col = channelMap.values();
        while (true == col.contains(ctx.channel())) {
            col.remove(ctx.channel());
            log.info("netty客户端连接删除成功!");
        }

    }

    /**
     * 处理建立连接时候请求(用于拿参数)
     *
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (null != msg && msg instanceof FullHttpRequest) {
            log.info("连接请求,准备提取参数");
            //转化为http请求
            FullHttpRequest request = (FullHttpRequest) msg;
            //拿到请求地址
            String uri = request.uri();
            log.info("uri: " + uri);
            if (StringUtils.isNotBlank(uri)) {
                String path = StringUtils.substringBefore(uri, "?");
                log.info("path: {}", path);
                String username = StringUtils.substringAfterLast(path, "/");
                log.info(username);
                channelMap.put(username, ctx.channel());
                log.info("channelMap: {}", channelMap);
            }

            //重新设置请求地址为WebSocketServerProtocolHandler 匹配的地址(如果WebSocketServerProtocolHandler 的时候checkStartsWith   为true则不需要设置,会根据前缀匹配)
//            request.setUri("/ws");
        }

        //接着建立请求
        super.channelRead(ctx, msg);
    }

    /**
     * 每当从服务端读到客户端写入信息时,将信息转发给其他客户端的Channel.
     *
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
        log.info("netty客户端收到服务器数据, 客户端地址: {}, msg: {}", ctx.channel().remoteAddress(), msg.text());
        String date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
        //消息处理类
        handleMessage(ctx, msg.text(), date);
        //channelGroup.writeAndFlush( new TextWebSocketFrame(msg.text()));
    }

    /**
     * 当服务端的IO 抛出异常时被调用
     *
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        Channel incoming = ctx.channel();
        log.error("SimpleChatClient:" + incoming.remoteAddress() + "异常", cause);
        //异常出现就关闭连接
        ctx.close();
    }

    /**
     * 处理读取到的消息
     *
     * @param ctx
     * @param msg
     * @param date
     */
    private void handleMessage(ChannelHandlerContext ctx, String msg, String date) {
        try {
            // 消息入库
            ChatLog chatLog = JSONObject.parseObject(msg, ChatLog.class);
            log.info("chatLog: {}", chatLog);
            ChatLogService chatLogService = SpringBootUtils.getBean(ChatLogService.class);
            chatLogService.insert(chatLog);

            // 消息转发给对应用户(发给发送者和接收者)
            String receiveUsername = chatLog.getReceiveUsername();
            String sendUsername = chatLog.getSendUsername();
            Set<Map.Entry<String, Channel>> entries = channelMap.entrySet();
            String key = null;
            for (Map.Entry<String, Channel> entry : entries) {
                key = entry.getKey();
                if (key.equals(receiveUsername) || key.equals(sendUsername)) {
                    log.info("服务器转发消息, key: {}, msg: {}", key, JSONObject.toJSONString(chatLog));
                    entry.getValue().writeAndFlush(new TextWebSocketFrame(JSONObject.toJSONString(chatLog)));
                }
            }
        } catch (Exception e) {
            log.error("message 处理异常, msg: {}, date: {}", msg, date, e);
        }
    }
}

   这个只是简单的实现了在线用户的单聊,如果要做的好可以添加通讯录功能、群聊,其实这个就是发送消息的时候接受者是群号等。待有这方面需求的时候会继续完善。

【当你用心写完每一篇博客之后,你会发现它比你用代码实现功能更有成就感!】
原文地址:https://www.cnblogs.com/qlqwjy/p/14470517.html