DotNetty实现WebSocket的简单使用

 工作中项目是物联网项目的,管理平台又是bs架构。

如果用 Socket 的话,Web 端还需要转发,就全部统一采用了 WebSocket 。

DotNet 平台上的 WebSocket 实现有很多种,这里介绍一下用 DotNetty 来实现的方式。

只完成基本使用功能:

  管理连接、

  服务端接收消息、

  服务端主动向指定连接发送消息、

  服务端主动端口某连接、

  客户端连接断开响应。

本地环境 .net core 2.2

1.创建控制台应用

2.安装NuGet包

DotNetty.Buffers

DotNetty.Codecs

DotNetty.Codecs.Http

DotNetty.Common

DotNetty.Handlers

DotNetty.Transport

DotNetty.Transport.Libuv

3.创建辅助解析的工具类

 新建类库 :Examples.Common

同步引用 NuGet 包。并安装以下几个。

Microsoft.Extensions.Configuration

Microsoft.Extensions.Configuration.FileExtensions

Microsoft.Extensions.Configuration.Json

Microsoft.Extensions.Logging.Console

 Examples.Common.csproj

<Project Sdk="Microsoft.NET.Sdk">

  <PropertyGroup>
    <TargetFramework>netcoreapp2.2</TargetFramework>
  </PropertyGroup>

  <ItemGroup>
    <PackageReference Include="DotNetty.Buffers" Version="0.6.0" />
    <PackageReference Include="DotNetty.Codecs" Version="0.6.0" />
    <PackageReference Include="DotNetty.Codecs.Http" Version="0.6.0" />
    <PackageReference Include="DotNetty.Common" Version="0.6.0" />
    <PackageReference Include="DotNetty.Handlers" Version="0.6.0" />
    <PackageReference Include="DotNetty.Transport" Version="0.6.0" />
    <PackageReference Include="DotNetty.Transport.Libuv" Version="0.6.0" />
    <PackageReference Include="Microsoft.Extensions.Configuration" Version="2.2.0" />
    <PackageReference Include="Microsoft.Extensions.Configuration.FileExtensions" Version="2.2.0" />
    <PackageReference Include="Microsoft.Extensions.Configuration.Json" Version="2.2.0" />
    <PackageReference Include="Microsoft.Extensions.Logging.Console" Version="1.1.1" />
  </ItemGroup>

</Project>
View Code

安装完了,记得在主控制台程序里面添加对该类库的引用。

4.添加解析辅助类

创建 ExampleHelper.cs

namespace Examples.Common
{
    using System;
    using DotNetty.Common.Internal.Logging;
    using Microsoft.Extensions.Configuration;
    using Microsoft.Extensions.Logging.Console;

    public static class ExampleHelper
    {
        static ExampleHelper()
        {
            Configuration = new ConfigurationBuilder()
                .SetBasePath(ProcessDirectory)
                .AddJsonFile("appsettings.json")
                .Build();
        }

        public static string ProcessDirectory
        {
            get
            {
#if NETSTANDARD1_3
                return AppContext.BaseDirectory;
#else
                return AppDomain.CurrentDomain.BaseDirectory;
#endif
            }
        }

        public static IConfigurationRoot Configuration { get; }

        public static void SetConsoleLogger() => InternalLoggerFactory.DefaultFactory.AddProvider(new ConsoleLoggerProvider((s, level) => true, false));
    }
}
View Code

创建 ServerSettings.cs

namespace Examples.Common
{
    public static class ServerSettings
    {
        public static bool IsSsl
        {
            get
            {
                string ssl = ExampleHelper.Configuration["ssl"];
                return !string.IsNullOrEmpty(ssl) && bool.Parse(ssl);
            }
        }

        public static int Port => int.Parse(ExampleHelper.Configuration["port"]);

        public static bool UseLibuv
        {
            get
            {
                string libuv = ExampleHelper.Configuration["libuv"];
                return !string.IsNullOrEmpty(libuv) && bool.Parse(libuv);
            }
        }
    }
}
View Code

创建 ClientSettings.cs

namespace Examples.Common
{
    using System.Net;

    public class ClientSettings
    {
        public static bool IsSsl
        {
            get
            {
                string ssl = ExampleHelper.Configuration["ssl"];
                return !string.IsNullOrEmpty(ssl) && bool.Parse(ssl);
            }
        }

        public static IPAddress Host => IPAddress.Parse(ExampleHelper.Configuration["host"]);

        public static int Port => int.Parse(ExampleHelper.Configuration["port"]);

        public static int Size => int.Parse(ExampleHelper.Configuration["size"]);

        public static bool UseLibuv
        {
            get
            {
                string libuv = ExampleHelper.Configuration["libuv"];
                return !string.IsNullOrEmpty(libuv) && bool.Parse(libuv);
            }
        }
    }
}
View Code

 5.完成WebSocket的服务端代码

 JSON 配置文件 appsettings.json

设置文件属性,始终复制。

{
  "port": "8080",
  "libuv": "true",
  "ssl": "false"
}

程序启动 Program.cs

namespace DotNettyWebSocket
{
    using System;
    using System.IO;
    using System.Net;
    using System.Runtime;
    using System.Runtime.InteropServices;
    using System.Security.Cryptography.X509Certificates;
    using System.Threading.Tasks;
    using DotNetty.Codecs.Http;
    using DotNetty.Common;
    using DotNetty.Handlers.Tls;
    using DotNetty.Transport.Bootstrapping;
    using DotNetty.Transport.Channels;
    using DotNetty.Transport.Channels.Sockets;
    using DotNetty.Transport.Libuv;
    using Examples.Common;

    class Program
    {
        static Program()
        {
            ResourceLeakDetector.Level = ResourceLeakDetector.DetectionLevel.Disabled;
        }

        static async Task RunServerAsync()
        {
            Console.WriteLine(
                $"
{RuntimeInformation.OSArchitecture} {RuntimeInformation.OSDescription}"
                + $"
{RuntimeInformation.ProcessArchitecture} {RuntimeInformation.FrameworkDescription}"
                + $"
Processor Count : {Environment.ProcessorCount}
");

            bool useLibuv = ServerSettings.UseLibuv;
            Console.WriteLine("Transport type : " + (useLibuv ? "Libuv" : "Socket"));

            if (!RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
            {
                GCSettings.LatencyMode = GCLatencyMode.SustainedLowLatency;
            }

            Console.WriteLine($"Server garbage collection : {(GCSettings.IsServerGC ? "Enabled" : "Disabled")}");
            Console.WriteLine($"Current latency mode for garbage collection: {GCSettings.LatencyMode}");
            Console.WriteLine("
");

            /*
             Netty 提供了许多不同的 EventLoopGroup 的实现用来处理不同的传输。
             在这个例子中我们实现了一个服务端的应用,因此会有2个 NioEventLoopGroup 会被使用。
             第一个经常被叫做‘boss’,用来接收进来的连接。第二个经常被叫做‘worker’,用来处理已经被接收的连接,一旦‘boss’接收到连接,就会把连接信息注册到‘worker’上。
             如何知道多少个线程已经被使用,如何映射到已经创建的 Channel上都需要依赖于 IEventLoopGroup 的实现,并且可以通过构造函数来配置他们的关系。
             */

            // 主工作线程组,设置为1个线程
            // Boss线程:由这个线程池提供的线程是boss种类的,用于创建、连接、绑定socket, (有点像门卫)然后把这些socket传给worker线程池。
            // 在服务器端每个监听的socket都有一个boss线程来处理。在客户端,只有一个boss线程来处理所有的socket。
            IEventLoopGroup bossGroup;

            // 子工作线程组,----默认为内核数*2的线程数
            // Worker线程:Worker线程执行所有的异步I/O,即处理操作
            IEventLoopGroup workGroup;
            if (useLibuv)
            {
                var dispatcher = new DispatcherEventLoopGroup();
                bossGroup = dispatcher;
                workGroup = new WorkerEventLoopGroup(dispatcher);
            }
            else
            {
                bossGroup = new MultithreadEventLoopGroup(1);
                workGroup = new MultithreadEventLoopGroup();
            }

            X509Certificate2 tlsCertificate = null;
            if (ServerSettings.IsSsl)
            {
                tlsCertificate = new X509Certificate2(Path.Combine(ExampleHelper.ProcessDirectory, "dotnetty.com.pfx"), "password");
            }
            try
            {
                // 声明一个服务端Bootstrap,每个Netty服务端程序,都由ServerBootstrap控制,通过链式的方式组装需要的参数
                // ServerBootstrap 启动NIO服务的辅助启动类,负责初始话netty服务器,并且开始监听端口的socket请求
                var bootstrap = new ServerBootstrap();

                // 设置主和工作线程组
                bootstrap.Group(bossGroup, workGroup);

                if (useLibuv)
                {
                    // 申明服务端通信通道为TcpServerChannel
                    // 设置非阻塞,用它来建立新accept的连接,用于构造serversocketchannel的工厂类
                    bootstrap.Channel<TcpServerChannel>();
                    if (RuntimeInformation.IsOSPlatform(OSPlatform.Linux)
                        || RuntimeInformation.IsOSPlatform(OSPlatform.OSX))
                    {
                        bootstrap
                            .Option(ChannelOption.SoReuseport, true)
                            .ChildOption(ChannelOption.SoReuseaddr, true);
                    }
                }
                else
                {
                    bootstrap.Channel<TcpServerSocketChannel>();
                }

                // ChildChannelHandler 对出入的数据进行的业务操作,其继承ChannelInitializer
                bootstrap
                    // 设置网络IO参数等
                    .Option(ChannelOption.SoBacklog, 8192)
                    /*
                    * ChannelInitializer 是一个特殊的处理类,他的目的是帮助使用者配置一个新的 Channel。
                    * 也许你想通过增加一些处理类比如DiscardServerHandler 来配置一个新的 Channel 或者其对应的ChannelPipeline 来实现你的网络程序。
                    * 当你的程序变的复杂时,可能你会增加更多的处理类到 pipline 上,然后提取这些匿名类到最顶层的类上。
                    */
                    // 设置工作线程参数
                    .ChildHandler(new ActionChannelInitializer<IChannel>(channel =>
                    {
                        /*
                        * 工作线程连接器是设置了一个管道,服务端主线程所有接收到的信息都会通过这个管道一层层往下传输,
                        * 同时所有出栈的消息 也要这个管道的所有处理器进行一步步处理。
                        */
                        IChannelPipeline pipeline = channel.Pipeline;
                        if (tlsCertificate != null)
                        {
                            pipeline.AddLast(TlsHandler.Server(tlsCertificate));
                        }
                        pipeline.AddLast(new HttpServerCodec());
                        pipeline.AddLast(new HttpObjectAggregator(65536));

                        //业务handler ,这里是实际处理业务的Handler
                        //pipeline.AddLast(new WebSocketServerHandler());
                        //自己写的业务类
                        pipeline.AddLast(new SendFunction());
                    }));

                // bootstrap绑定到指定端口的行为 就是服务端启动服务,同样的Serverbootstrap可以bind到多个端口
                int port = ServerSettings.Port;
                IChannel bootstrapChannel = await bootstrap.BindAsync(IPAddress.Loopback, port);
                // 似乎没有成功阻塞 而是连接服务端后 就马上执行下一句了 导致连接一次就关闭   (是成功进入 ChannelActive 判断的)也就是无法保持长连接
                // 添加长连接即可,参考EchoClient

                Console.WriteLine("Open your web browser and navigate to "
                    + $"{(ServerSettings.IsSsl ? "https" : "http")}"
                    + $"://127.0.0.1:{port}/");
                Console.WriteLine("Listening on "
                    + $"{(ServerSettings.IsSsl ? "wss" : "ws")}"
                    + $"://127.0.0.1:{port}/websocket");
                Console.ReadLine();

                // 关闭服务
                await bootstrapChannel.CloseAsync();
            }
            finally
            {
                // 释放工作组线程
                workGroup.ShutdownGracefullyAsync().Wait();
                bossGroup.ShutdownGracefullyAsync().Wait();
            }
        }

        static void Main() => RunServerAsync().Wait();
    }
}
View Code

业务处理类 SendFunction.cs

using DotNetty.Buffers;
using DotNetty.Codecs.Http;
using DotNetty.Codecs.Http.WebSockets;
using DotNetty.Common.Utilities;
using DotNetty.Transport.Channels;
using DotNetty.Transport.Channels.Groups;
using Examples.Common;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Text;
using System.Threading.Tasks;
using static DotNetty.Codecs.Http.HttpResponseStatus;
using static DotNetty.Codecs.Http.HttpVersion;

namespace DotNettyWebSocket
{
    public class SendFunction : SimpleChannelInboundHandler<object> 
    {
        const string WebsocketPath = "/websocket";

        WebSocketServerHandshaker handshaker;

        static volatile IChannelGroup groups;

        public override void ChannelReadComplete(IChannelHandlerContext context) => context.Flush();

        //客户端连接异常
        public override void ExceptionCaught(IChannelHandlerContext context, Exception exception)
        {
            WebSocketClose(context);
            Console.WriteLine(" SendFunction Exception: " + exception);
            context.CloseAsync();
        }

        protected override void ChannelRead0(IChannelHandlerContext ctx, object msg)
        {
            if (msg is IFullHttpRequest request)
            {
                this.HandleHttpRequest(ctx, request);
            }
            else if (msg is WebSocketFrame frame)
            {
                this.HandleWebSocketFrame(ctx, frame);
            }
        }

        void HandleHttpRequest(IChannelHandlerContext ctx, IFullHttpRequest req)
        {
            if (!req.Result.IsSuccess)
            {
                SendHttpResponse(ctx, req, new DefaultFullHttpResponse(Http11, BadRequest));
                return;
            }

            if (!Equals(req.Method, HttpMethod.Get))
            {
                SendHttpResponse(ctx, req, new DefaultFullHttpResponse(Http11, Forbidden));
                return;
            }

            var wsFactory = new WebSocketServerHandshakerFactory(
                GetWebSocketLocation(req), null, true, 5 * 1024 * 1024);
            this.handshaker = wsFactory.NewHandshaker(req);
            if (this.handshaker == null)
            {
                WebSocketServerHandshakerFactory.SendUnsupportedVersionResponse(ctx.Channel);
            }
            else
            {
                this.handshaker.HandshakeAsync(ctx.Channel, req);
            }

            base.HandlerAdded(ctx);
            IChannelGroup g = groups;
            if (g == null)
            {
                lock (this)
                {
                    if (groups == null)
                    {
                        g = groups = new DefaultChannelGroup(ctx.Executor);
                    }
                }
            }
            g.Add(ctx.Channel);

            //主动向当前连接的客户端发送信息
            TextWebSocketFrame tst = new TextWebSocketFrame($"欢迎{ctx.Channel.RemoteAddress}加入111.");
            TextWebSocketFrame tstId = new TextWebSocketFrame($"欢迎{ctx.Channel.Id}加入222.");
            groups.WriteAndFlushAsync(tst);
            groups.WriteAndFlushAsync(tstId);

            //保存连接对象
            lock (ConnChannelList)
            {
                if (ConnChannelList.Count > 0)
                {
                    if (ConnChannelList.ContainsKey(ctx.Channel.Id.ToString()))
                    {
                        ConnChannelList.Remove(ctx.Channel.Id.ToString());
                    }
                }
                ConnChannelList.Add(ctx.Channel.Id.ToString(), ctx.Channel.Id);
                Console.WriteLine($"当前在线数:{ConnChannelList.Count}");
            }


            Console.WriteLine("---------首次到达----------");
            Console.WriteLine("连接成功");
            Console.WriteLine($"欢迎{ctx.Channel.RemoteAddress}加入");
            Console.WriteLine("---------首次到达----------");
        }

        public static volatile Dictionary<string, IChannelId> ConnChannelList = new Dictionary<string, IChannelId>();
        void HandleWebSocketFrame(IChannelHandlerContext ctx, WebSocketFrame frame)
        {
            //客户端关闭连接
            if (frame is CloseWebSocketFrame)
            {
                WebSocketClose(ctx);

                Console.WriteLine($"连接关闭     {ctx.Channel.RemoteAddress}");
                this.handshaker.CloseAsync(ctx.Channel, (CloseWebSocketFrame)frame.Retain());

                return;
            }

            if (frame is PingWebSocketFrame)
            {
                ctx.WriteAsync(new PongWebSocketFrame((IByteBuffer)frame.Content.Retain()));
                return;
            }

            if (frame is TextWebSocketFrame textFrame)
            {
                Console.WriteLine("---------消息到达----------");
                Console.WriteLine("Received from client: " + frame.Content.ToString(Encoding.UTF8));
                Console.WriteLine("---------消息到达----------");


                //发送信息到指定连接
                string[] strArg = textFrame.Text().Split(',');
                if (strArg.Length > 1)
                {
                    lock (ConnChannelList)
                    {
                        if (ConnChannelList.ContainsKey(strArg[0]))
                        {
                            var connChannel = groups.Find(ConnChannelList[strArg[0]]);//null

                            if (connChannel != null)
                            {
                                //主动向当前连接的客户端发送信息
                                TextWebSocketFrame tst = new TextWebSocketFrame(strArg[1]);
                                connChannel.WriteAndFlushAsync(tst);

                                //服务端断开指定客户端连接
                                if (strArg[1] == "close")
                                {
                                    connChannel.CloseAsync();
                                }
                            }
                        }
                    }
                }

                ctx.WriteAsync(frame.Retain());

                return;
            }

            if (frame is BinaryWebSocketFrame)
            {
                ctx.WriteAsync(frame.Retain());
            }
        }

        static void SendHttpResponse(IChannelHandlerContext ctx, IFullHttpRequest req, IFullHttpResponse res)
        {
            if (res.Status.Code != 200)
            {
                IByteBuffer buf = Unpooled.CopiedBuffer(Encoding.UTF8.GetBytes(res.Status.ToString()));
                res.Content.WriteBytes(buf);
                buf.Release();
                HttpUtil.SetContentLength(res, res.Content.ReadableBytes);
            }

            Task task = ctx.Channel.WriteAndFlushAsync(res);
            if (!HttpUtil.IsKeepAlive(req) || res.Status.Code != 200)
            {
                task.ContinueWith((t, c) => ((IChannelHandlerContext)c).CloseAsync(),
                    ctx, TaskContinuationOptions.ExecuteSynchronously);
            }
        }

        static string GetWebSocketLocation(IFullHttpRequest req)
        {
            bool result = req.Headers.TryGet(HttpHeaderNames.Host, out ICharSequence value);
            Debug.Assert(result, "Host header does not exist.");
            string location = value.ToString() + WebsocketPath;

            if (ServerSettings.IsSsl)
            {
                return "wss://" + location;
            }
            else
            {
                return "ws://" + location;
            }
        }


        /// <summary>
        /// 关闭ws连接
        /// </summary>
        /// <param name="ctx"></param>
        static void WebSocketClose(IChannelHandlerContext ctx)
        {
            lock (ConnChannelList)
            {
                string channelId = ctx.Channel.Id.ToString();
                if (ConnChannelList.ContainsKey(channelId))
                {
                    ConnChannelList.Remove(channelId);
                }
                Console.WriteLine($"当前在线数:{ConnChannelList.Count}");
            }
        }

    }
}
View Code

6.测试HTML脚本

<!DOCTYPE html>
<html>

<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <meta http-equiv="X-UA-Compatible" content="ie=edge">
    <title>Document</title>
</head>

<body>

    <input type="text" id="message">
    <div id="msgBox"></div>

    <input type="button" onclick="sendText()" value="发送信息">

    <script>

        var ws = '';

        window.onload = function () {
            connect();
        }

        function connect() {
            var address = "ws://127.0.0.1:8080/websocket";
            // address = "wss://127.0.0.1:8080/websocket";
            ws = new WebSocket(address);

            ws.onopen = function (e) {

            };

            //收到信息时
            ws.onmessage = function (e) {
                var Html = '<p>' + e.data + '</p>';
                document.getElementById("msgBox").innerHTML += Html;
            };

            //发生错误时
            ws.onerror = function (e) {

            };

            //连接关闭时
            ws.onclose = function (e) {
                document.getElementById("msgBox").innerHTML += "<p>与服务器的连接已断开。</p>";
            };

        }

        function sendText() {
            ws.send(document.getElementById("message").value);
        }

    </script>

</body>

</html>
View Code

7.运行测试

 运行第一个HTML页面

运行第二个

发送消息

给第二个HTML发送消息,要拿一些特征。

关闭第二个页面

基础功能都已经完成。

在 Ubuntu 上测试也 OK。

从 SuperWebSocket 换到 DotNetty 主要原因就是想上 Linux 。

 

原文地址:https://www.cnblogs.com/miaowacao/p/11060464.html