C# 使用 Topshelf 构建基于 Windows 服务的 RabbitMQ 消费端

using ServiceStack;
using System;
using System.Collections.Generic;
using System.Configuration;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Topshelf;

namespace By56.RabbitMQService
{
    /// <summary>
    /// Entry point that hosts <see cref="RabbitMQClient"/> through Topshelf.
    /// Install:   By56.RabbitMQService.exe install
    /// Start:     By56.RabbitMQService.exe start
    /// Uninstall: By56.RabbitMQService.exe uninstall
    /// </summary>
    class Program
    {
        /// <summary>
        /// Configures log4net, loads GlobalConfig.json into <c>AppConfigInfo.GlobalConfig</c>,
        /// and then hands control to the Topshelf host.
        /// </summary>
        /// <param name="args">Command-line arguments (not read directly by this method).</param>
        static void Main(string[] args)
        {
            string baseDirectory = AppDomain.CurrentDomain.BaseDirectory;

            // Logging is configured first so that everything afterwards can log.
            var log4NetConfigFile = new FileInfo(baseDirectory + @"Config\Log4Net.config");
            log4net.Config.XmlConfigurator.Configure(log4NetConfigFile);

            // The global configuration must be loaded before the host is built,
            // because the service metadata below is read from it.
            string globalConfigJson = System.IO.File.ReadAllText(baseDirectory + @"GlobalConfig.json");
            AppConfigInfo.GlobalConfig = globalConfigJson.FromJson<GlobalConfigModel>();

            HostFactory.Run(hostConfigurator =>
            {
                hostConfigurator.Service<RabbitMQClient>(serviceConfigurator =>
                {
                    serviceConfigurator.ConstructUsing(hostSettings => new RabbitMQClient());
                    serviceConfigurator.WhenStarted(client => client.Start());
                    serviceConfigurator.WhenStopped(client => client.Stop());
                });
                hostConfigurator.RunAsLocalSystem();

                // Service description, display name and service name all come from configuration.
                var serviceConfig = AppConfigInfo.GlobalConfig.ServiceConfig;
                hostConfigurator.SetDescription(serviceConfig.ServiceDesc);
                hostConfigurator.SetDisplayName(serviceConfig.DisplayName);
                hostConfigurator.SetServiceName(serviceConfig.ServiceName);
            });
        }
    }
}

  客户端代码:

using By56.Tools.Common;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using RestSharp;
using ServiceStack;
using ServiceStack.Caching;
using ServiceStack.Redis;
using System;
using System.Collections.Generic;
using System.Configuration;
using System.Linq;
using System.Text;
using System.Threading;

namespace By56.RabbitMQService
{
    /// <summary>
    /// RabbitMQ consumer that is started and stopped by the service host.
    /// Each received message is decoded as UTF-8 text, forwarded to the HTTP interface
    /// described by <c>AppConfigInfo.GlobalConfig.UrlConfig</c>, and then acknowledged.
    /// </summary>
    public class RabbitMQClient
    {
        ConnectionFactory factory;
        IConnection connection;
        IModel channel;


        ICacheClient cacheClient;


        /// <summary>
        /// Creates the client and selects the cache implementation (Redis or in-memory)
        /// according to <c>AppConfigInfo.GlobalConfig.CacheConfig</c>.
        /// </summary>
        public RabbitMQClient()
        {
            // The connection factory is intentionally NOT touched here: it does not exist
            // until Start() runs. The configured port is applied where the factory is built
            // (see CreateConnectionFactory).
            if (AppConfigInfo.GlobalConfig.CacheConfig.IsRedis)
            {
                cacheClient = new ServiceStack.Redis.RedisClient(AppConfigInfo.GlobalConfig.CacheConfig.Host, AppConfigInfo.GlobalConfig.CacheConfig.Port, AppConfigInfo.GlobalConfig.CacheConfig.Password);
            }
            else
            {
                cacheClient = new MemoryCacheClient();
            }
        }

        /// <summary>
        /// Declares the configured queue and registers the consumer that processes messages.
        /// </summary>
        /// <exception cref="InvalidOperationException">
        /// Thrown when the channel has not been created yet (i.e. <see cref="Start"/> was not called).
        /// </exception>
        public void RabbitMQRecieve()
        {
            // Capture the channel in a local so the message handler never dereferences
            // the field after Stop() has cleared it.
            IModel consumerChannel = this.channel;
            if (consumerChannel == null)
            {
                throw new InvalidOperationException("The RabbitMQ channel has not been created; call Start() first.");
            }

            string queueName = AppConfigInfo.GlobalConfig.RabbitMQConfig.QueueName;

            consumerChannel.QueueDeclare(queue: queueName,
                                 durable: true,
                                 exclusive: false,
                                 autoDelete: false,
                                 arguments: null);

            // Deliver at most one unacknowledged message to this consumer at a time.
            consumerChannel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

            Console.WriteLine(" [*] Waiting for messages.");

            var consumer = new EventingBasicConsumer(consumerChannel);
            consumer.Received += (model, ea) =>
            {
                string message = string.Empty;

                try
                {
                    var body = ea.Body;
                    message = Encoding.UTF8.GetString(body);

                    LogHelper.Info("处理消息内容:" + message);

                    RequestModel requestModel = new RequestModel()
                    {
                        Data = message
                    };

                    BaseResponse<bool> res = CallInterfaceInfo(requestModel);

                    if (res == null)
                    {
                        LogHelper.Info("处理失败。接口返回null值。内容:" + message);
                    }
                    else if (res.IsOK)
                    {
                        LogHelper.Info("处理成功。内容:" + message);
                    }
                    else
                    {
                        LogHelper.Error("处理失败。内容:" + message + " 返回消息:" + res.ToJson());
                    }
                }
                catch (Exception ex)
                {
                    LogHelper.Error("处理异常。内容:" + message + Environment.NewLine +
                        ex.Message + Environment.NewLine + ex.Source + Environment.NewLine + ex.StackTrace);
                }

                // The message is acknowledged exactly once whatever the outcome
                // (success, failed response, or exception), matching the original policy.
                AcknowledgeMessage(consumerChannel, ea.DeliveryTag);
            };
            consumerChannel.BasicConsume(queue: queueName,
                                 autoAck: false,
                                 consumer: consumer);
        }

        /// <summary>
        /// Acknowledges a single delivery and logs, rather than propagates, any failure,
        /// so an ack error cannot escape the consumer callback.
        /// </summary>
        /// <param name="consumerChannel">Channel the message was delivered on.</param>
        /// <param name="deliveryTag">Delivery tag of the message to acknowledge.</param>
        private static void AcknowledgeMessage(IModel consumerChannel, ulong deliveryTag)
        {
            try
            {
                consumerChannel.BasicAck(deliveryTag: deliveryTag, multiple: false);
            }
            catch (Exception ex)
            {
                LogHelper.Error("消息确认失败。DeliveryTag:" + deliveryTag + Environment.NewLine +
                    ex.Message + Environment.NewLine + ex.Source + Environment.NewLine + ex.StackTrace);
            }
        }

        /// <summary>
        /// Calls the remote interface configured in <c>UrlConfig</c> to process a message.
        /// </summary>
        /// <param name="requestModel">Request payload wrapping the message text.</param>
        /// <returns>
        /// The response returned by the client helper, or <c>null</c> when the configured
        /// method is neither GET nor POST.
        /// </returns>
        protected BaseResponse<bool> CallInterfaceInfo(RequestModel requestModel)
        {
            BaseResponse<bool> res = null;

            BaseClientHelper clientHelper = new BaseClientHelper(AppConfigInfo.GlobalConfig.UrlConfig.BaseUrl);
            if (AppConfigInfo.GlobalConfig.UrlConfig.Timeout != 0)
            {
                clientHelper.Client.Timeout = AppConfigInfo.GlobalConfig.UrlConfig.Timeout;
            }

            IDictionary<string, string> headerObj = new Dictionary<string, string>
            {
                { "X-USER-LOGINNAME", "0" }
            };

            // Invariant casing: the HTTP method is a machine identifier, not user-facing text.
            switch (AppConfigInfo.GlobalConfig.UrlConfig.Method.ToUpperInvariant())
            {
                case "GET":
                    res = clientHelper.ExecuteGetSingle<bool>(AppConfigInfo.GlobalConfig.UrlConfig.Resource, requestModel, headerObj);
                    break;
                case "POST":
                    res = clientHelper.ExecutePostSingle<bool>(AppConfigInfo.GlobalConfig.UrlConfig.Resource, requestModel, headerObj);
                    break;
                default:
                    // Still returns null as before, but the misconfiguration is now visible in the log.
                    LogHelper.Error("不支持的请求方式:" + AppConfigInfo.GlobalConfig.UrlConfig.Method);
                    break;
            }

            return res;
        }

        /// <summary>
        /// Builds a connection factory from <c>RabbitMQConfig</c>, applying the configured
        /// port only when it is non-zero (otherwise the factory's own default port is kept).
        /// </summary>
        /// <returns>A configured <see cref="ConnectionFactory"/>.</returns>
        private static ConnectionFactory CreateConnectionFactory()
        {
            var connectionFactory = new ConnectionFactory()
            {
                HostName = AppConfigInfo.GlobalConfig.RabbitMQConfig.HostName,
                UserName = AppConfigInfo.GlobalConfig.RabbitMQConfig.UserName,
                Password = AppConfigInfo.GlobalConfig.RabbitMQConfig.Password
            };

            if (AppConfigInfo.GlobalConfig.RabbitMQConfig.Port != 0)
            {
                connectionFactory.Port = AppConfigInfo.GlobalConfig.RabbitMQConfig.Port;
            }

            return connectionFactory;
        }

        /// <summary>
        /// Opens the connection and channel, then starts consuming messages.
        /// </summary>
        public void Start()
        {
            this.factory = CreateConnectionFactory();
            this.connection = factory.CreateConnection();
            this.channel = connection.CreateModel();

            RabbitMQRecieve();

            LogHelper.Info("服务启动");
        }

        /// <summary>
        /// Disposes the channel and connection (in that order) and clears the references.
        /// </summary>
        public void Stop()
        {
            if (this.channel != null)
            {
                this.channel.Dispose();
                this.channel = null;
            }
            if (this.connection != null)
            {
                this.connection.Dispose();
                // Clear the field so a disposed connection is never reused or disposed twice.
                this.connection = null;
            }
            this.factory = null;

            LogHelper.Info("服务停止");
        }
    }
}
原文地址:https://www.cnblogs.com/TallkingIsEasying/p/15181389.html