using ServiceStack; using System; using System.Collections.Generic; using System.Configuration; using System.IO; using System.Linq; using System.Text; using System.Threading.Tasks; using Topshelf; namespace By56.RabbitMQService { /// <summary> ///安装:By56.RabbitMQService.exe install ///启动:By56.RabbitMQService.exe start ///卸载:By56.RabbitMQService.exe uninstall /// </summary> class Program { static void Main(string[] args) { log4net.Config.XmlConfigurator.Configure(new FileInfo(AppDomain.CurrentDomain.BaseDirectory + @"Config\Log4Net.config")); var globalConfig = System.IO.File.ReadAllText(AppDomain.CurrentDomain.BaseDirectory + @"GlobalConfig.json"); AppConfigInfo.GlobalConfig = globalConfig.FromJson<GlobalConfigModel>(); HostFactory.Run(x => { x.Service<RabbitMQClient>(s => { s.ConstructUsing(name => new RabbitMQClient()); s.WhenStarted(sv => sv.Start()); s.WhenStopped(sv => sv.Stop()); }); x.RunAsLocalSystem(); x.SetDescription(AppConfigInfo.GlobalConfig.ServiceConfig.ServiceDesc); x.SetDisplayName(AppConfigInfo.GlobalConfig.ServiceConfig.DisplayName); x.SetServiceName(AppConfigInfo.GlobalConfig.ServiceConfig.ServiceName); }); } } }
客户端代码:
using By56.Tools.Common; using RabbitMQ.Client; using RabbitMQ.Client.Events; using RestSharp; using ServiceStack; using ServiceStack.Caching; using ServiceStack.Redis; using System; using System.Collections.Generic; using System.Configuration; using System.Linq; using System.Text; using System.Threading; namespace By56.RabbitMQService { public class RabbitMQClient { ConnectionFactory factory; IConnection connection; IModel channel; ICacheClient cacheClient; public RabbitMQClient() { if (AppConfigInfo.GlobalConfig.RabbitMQConfig.Port != 0) { factory.Port = AppConfigInfo.GlobalConfig.RabbitMQConfig.Port; } if (AppConfigInfo.GlobalConfig.CacheConfig.IsRedis) { cacheClient = new ServiceStack.Redis.RedisClient(AppConfigInfo.GlobalConfig.CacheConfig.Host, AppConfigInfo.GlobalConfig.CacheConfig.Port, AppConfigInfo.GlobalConfig.CacheConfig.Password); } else { cacheClient = new MemoryCacheClient(); } } /// <summary> /// 消息处理 /// </summary> public void RabbitMQRecieve() { this.channel.QueueDeclare(queue: AppConfigInfo.GlobalConfig.RabbitMQConfig.QueueName, durable: true, exclusive: false, autoDelete: false, arguments: null); this.channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false); Console.WriteLine(" [*] Waiting for messages."); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { string message = string.Empty; try { var body = ea.Body; message = Encoding.UTF8.GetString(body); LogHelper.Info("处理消息内容:" + message); //to do RequestModel requestModel = new RequestModel() { Data = message }; //BaseClientHelper clientHelper = new BaseClientHelper(AppConfigInfo.GlobalConfig.UrlConfig.BaseUrl); //if (AppConfigInfo.GlobalConfig.UrlConfig.Timeout != 0) //{ // clientHelper.Client.Timeout = AppConfigInfo.GlobalConfig.UrlConfig.Timeout; //} //BaseResponse<bool> res = null; //switch (AppConfigInfo.GlobalConfig.UrlConfig.Method.ToUpper()) //{ // case "GET": // res = clientHelper.ExecuteGetSingle<bool>(AppConfigInfo.GlobalConfig.UrlConfig.Resource, requestModel); // break; // case "POST": // res = clientHelper.ExecutePostSingle<bool>(AppConfigInfo.GlobalConfig.UrlConfig.Resource, requestModel); // break; //} BaseResponse<bool> res = CallInterfaceInfo(requestModel); if (res == null) { LogHelper.Info("处理失败。接口返回null值。内容:" + message); channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); } else { if (res.IsOK) { LogHelper.Info("处理成功。内容:" + message); channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); } else { LogHelper.Error("处理失败。内容:" + message + " 返回消息:" + res.ToJson()); channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); } } } catch (Exception ex) { LogHelper.Error("处理异常。内容:" + message + Environment.NewLine + ex.Message + Environment.NewLine + ex.Source + Environment.NewLine + ex.StackTrace); channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); } }; channel.BasicConsume(queue: AppConfigInfo.GlobalConfig.RabbitMQConfig.QueueName, autoAck: false, consumer: consumer); } /// <summary> /// 调用远程接口处理消息 /// </summary> /// <param name="requestModel"></param> /// <returns></returns> protected BaseResponse<bool> CallInterfaceInfo(RequestModel requestModel) { BaseResponse<bool> res = null; BaseClientHelper clientHelper = new BaseClientHelper(AppConfigInfo.GlobalConfig.UrlConfig.BaseUrl); if (AppConfigInfo.GlobalConfig.UrlConfig.Timeout != 0) { clientHelper.Client.Timeout = AppConfigInfo.GlobalConfig.UrlConfig.Timeout; } IDictionary<string, string> headerObj = new Dictionary<string, string>(); headerObj.Add("X-USER-LOGINNAME", "0"); switch (AppConfigInfo.GlobalConfig.UrlConfig.Method.ToUpper()) { case "GET": res = clientHelper.ExecuteGetSingle<bool>(AppConfigInfo.GlobalConfig.UrlConfig.Resource, requestModel, headerObj); break; case "POST": res = clientHelper.ExecutePostSingle<bool>(AppConfigInfo.GlobalConfig.UrlConfig.Resource, requestModel, headerObj); break; } return res; } public void Start() { //to do Start this.factory = new ConnectionFactory() { HostName = AppConfigInfo.GlobalConfig.RabbitMQConfig.HostName, UserName = AppConfigInfo.GlobalConfig.RabbitMQConfig.UserName, Password = AppConfigInfo.GlobalConfig.RabbitMQConfig.Password }; this.connection = factory.CreateConnection(); this.channel = connection.CreateModel(); RabbitMQRecieve(); LogHelper.Info("服务启动"); } public void Stop() { //to do Stop if (this.channel != null) { this.channel.Dispose(); this.channel = null; } if (this.connection != null) { this.connection.Dispose(); } if (this.factory != null) { this.factory = null; } LogHelper.Info("服务停止"); } } }