【aspnetcore】用ConcurrentQueue实现一个简单的队列系统

第一步:定义队列服务接口

public interface ISimpleQueueServer
{
    /// <summary>
    /// Adds a message to the queue.
    /// </summary>
    /// <param name="message">The message text to enqueue.</param>
    /// <param name="clientName">The name of the client that sends the message.</param>
    /// <returns>
    /// A status string. The <c>SimpleQueueServer</c> implementation returns
    /// <c>"OK"</c> on success and the exception message on failure.
    /// </returns>
    string Add(string message, string clientName);
}

第二步:添加队列服务接口的实现

/// <summary>
/// In-process message queue backed by a <see cref="ConcurrentQueue{T}"/>.
/// The queue, the logger and the single background consumer are static, so
/// they are shared by every instance of this class in the process.
/// </summary>
public class SimpleQueueServer : ISimpleQueueServer
{
    /// <summary>
    /// The message queue shared by all instances.
    /// </summary>
    private static readonly ConcurrentQueue<string> _queue = new ConcurrentQueue<string>();

    /// <summary>
    /// Guards the one-time initialization of <see cref="_log"/> and <see cref="_task"/>.
    /// </summary>
    private static readonly object _initLock = new object();

    /// <summary>
    /// Logger used by the producer side and by the background consumer.
    /// </summary>
    private static ILogger _log;

    /// <summary>
    /// The background consumer task; created and started at most once.
    /// </summary>
    private static Task _task;

    /// <summary>
    /// Creates the service and makes sure the background consumer is running.
    /// </summary>
    /// <param name="factory">Factory used to create the logger on first construction.</param>
    public SimpleQueueServer(ILoggerFactory factory)
    {
        // The state is static, so several instances (or concurrent constructions)
        // must not start the consumer twice: Task.Start() throws
        // InvalidOperationException when called on an already started task.
        lock (_initLock)
        {
            if (_log == null)
            {
                _log = factory.CreateLogger("SimpleQueueServer");
            }

            if (_task == null)
            {
                // LongRunning: the handler loops forever and blocks in Thread.Sleep,
                // so it should not occupy a regular thread-pool worker.
                _task = Task.Factory.StartNew(
                    MessageHandler,
                    CancellationToken.None,
                    TaskCreationOptions.LongRunning,
                    TaskScheduler.Default);
            }
        }
    }

    /// <summary>
    /// Adds a message to the queue.
    /// </summary>
    /// <param name="message">The message text.</param>
    /// <param name="clientName">
    /// Name of the sending client; when not blank it is prepended to the message
    /// wrapped in 【】 brackets.
    /// </param>
    /// <returns><c>"OK"</c> on success, otherwise the message of the caught exception.</returns>
    public string Add(string message, string clientName = "")
    {
        try
        {
            string prefix = string.IsNullOrWhiteSpace(clientName) ? "" : $"【{clientName}】";
            _queue.Enqueue($"{prefix}{message}");
            return "OK";
        }
        catch (Exception ex)
        {
            _log.LogError(ex, "向队列添加信息失败");
            return ex.Message;
        }
    }

    /// <summary>
    /// Body of the background consumer: polls the queue forever, logging each
    /// dequeued message. Sleeps 3 seconds when the queue is empty and 500 ms
    /// after each dequeue attempt.
    /// </summary>
    private static void MessageHandler()
    {
        while (true)
        {
            try
            {
                if (_queue.IsEmpty)
                {
                    _log.LogDebug($"{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")}    队列为空");
                    Thread.Sleep(3000);
                }
                else
                {
                    if (_queue.TryDequeue(out string result))
                    {
                        _log.LogDebug($"{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")}    获取到数据:{result}");
                    }
                    else
                    {
                        _log.LogDebug($"{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")}    尝试从队列获取消息失败");
                    }
                    Thread.Sleep(500);
                }
            }
            catch (Exception ex)
            {
                // Keep the consumer alive: log and continue with the next iteration.
                _log.LogError(ex, "系统错误");
            }
        }
    }
}

第三步:定义一个IServiceCollection扩展方法,把服务注册封装起来,这样在Startup中注册时代码更简洁

/// <summary>
/// <see cref="IServiceCollection"/> extensions for registering the simple queue service.
/// </summary>
public static class ServiceCollectionExtension
{
    /// <summary>
    /// Registers <see cref="SimpleQueueServer"/> as the singleton implementation
    /// of <see cref="ISimpleQueueServer"/>.
    /// </summary>
    /// <param name="services">The service collection to add the registration to.</param>
    public static void AddSimpleQueueServer(this IServiceCollection services)
    {
        // GetRequiredService throws a descriptive InvalidOperationException when
        // logging is not registered, instead of passing null into the constructor.
        services.AddSingleton<ISimpleQueueServer, SimpleQueueServer>(
            provider => new SimpleQueueServer(provider.GetRequiredService<ILoggerFactory>()));
    }
}

第四步:在startup的ConfigureServices中添加服务

// Register the queue service with the dependency-injection container.
services.AddSimpleQueueServer();

第五步:修改appsettings.Development.json文件

{
  "Logging": {
    "LogLevel": {
      "Default": "Debug",
      "System": "Warning",
      "Microsoft": "Warning"
    }
  }
}

将Default设为Debug,以便看到队列通过LogDebug输出的消息;同时把System和Microsoft的日志级别调高到Warning,防止调试时队列显示的消息淹没在无穷无尽的info中。

=======无聊的分割线======

新建一个QueueController

namespace AspnetCoreMvcStudy.Controllers
{
    /// <summary>
    /// Test controller that pushes messages into the injected queue service.
    /// </summary>
    public class QueueController : Controller
    {
        /// <summary>
        /// The queue service supplied through constructor injection.
        /// </summary>
        private readonly ISimpleQueueServer _server;

        /// <summary>
        /// Creates the controller.
        /// </summary>
        /// <param name="server">The queue service to send messages to.</param>
        public QueueController(ISimpleQueueServer server)
        {
            _server = server;
        }

        /// <summary>
        /// Renders the test page.
        /// </summary>
        public IActionResult Index()
        {
            return View();
        }

        /// <summary>
        /// Enqueues 100 messages of the form <c>{msg}-{i}</c> for the given client.
        /// </summary>
        /// <param name="msg">The message text; an index suffix is appended per message.</param>
        /// <param name="client">The client name passed on to the queue service.</param>
        /// <returns>
        /// JSON with <c>Code = 200</c> when every message was accepted; otherwise
        /// <c>Code = 500</c> and a <c>Message</c> holding the status returned by the
        /// queue service for the first message that failed.
        /// </returns>
        [HttpPost]
        public JsonResult Send(string msg, string client)
        {
            for (int i = 0; i < 100; i++)
            {
                // Add reports success as "OK"; anything else is an error description.
                // The view displays response.message for non-200 codes, so pass it on.
                string result = _server.Add($"{msg}-{i}", client);
                if (result != "OK")
                {
                    return Json(new { Code = 500, Message = result });
                }
            }
            return Json(new { Code = 200 });
        }
    }
}

创建视图 Index.cshtml

@* Queue test page: the #client and #message inputs and the #btnSubmit button are wired up by the scripts section of this view. *@
@{
    ViewData["Title"] = "Index";
}

<h2>队列测试</h2>

<form id="form1" onsubmit="return false;">
    <div class="form-group">
        @* Each label's "for" must match the id of the input it describes. *@
        <label for="client">客户端名称</label>
        <input type="text" id="client" value="" />
        <label for="message">发送内容</label>
        <input type="text" id="message" value="" />
        <hr />
        <button id="btnSubmit" class="btn btn-success">发送</button>
        <span class="text-danger" id="info"></span>
    </div>
</form>

@section scripts 
{
    <script>
        $('#btnSubmit').on('click', function () {
            var that = $(this);
            that.attr('disabled', 'disabled');
            $.post('/Queue/Send', { msg: $('#message').val(), client: $('#client').val() }, function (response) {
                if (response.code == 200) {
                    $('#info').text('发送成功');
                } else {
                    $('#info').text(response.message);
                }
                that.removeAttr('disabled');
            });
        });
    </script>
}

运行程序,开整

原文地址:https://www.cnblogs.com/diwu0510/p/10059145.html