ActiveMQ 复杂类型的发布与订阅

很久没po文章了,但是看到.Net里关于ActiveMQ发送复杂类型的文章确实太少了,所以贴出来和大家分享

发布:

    //消息发布
    public class Publisher
    {
        private IConnection _connection;
        private ISession _session;
        private IMessageProducer _producer;

        /// <summary>
        /// 初始化
        /// </summary>
        /// <param name="brokerUrl">广播地址</param>
        /// <param name="queueDestination">队列目标</param>
        public void Init(string brokerUrl = "tcp://localhost:61616", string queueDestination = "nms.msg.topic")
        {
            try
            {
                IConnectionFactory connectionFactory = new ConnectionFactory(brokerUrl);
                _connection = connectionFactory.CreateConnection();
                _connection.Start();
                _session = _connection.CreateSession();
                IDestination destination = _session.GetTopic(queueDestination);
                _producer = _session.CreateProducer(destination);
            }
            catch (Exception e)
            {
                Log.Error($"activemq初始化异常:{e.InnerException.ToString()}");
            }
        }

        public void Close()
        {
            _session.Close();
            _connection.Close();
        }

        /// <summary>
        /// 发送普通字符串消息
        /// </summary>
        /// <param name="text">字符串</param>
        public void SendText(string text)
        {
            ITextMessage objecto = _producer.CreateTextMessage(text);
            _producer.Send(objecto);
        }

        /// <summary>
        /// 发送对象消息
        /// </summary>
        /// <param name="mapMessages">MapMessage对象</param>
        /// <returns></returns>
        public bool SendObject(List<MapMessage> mapMessages)
        {
            bool result = true;
            if (mapMessages == null || mapMessages.Count < 0) return false;
            foreach (var mapMessage in mapMessages)
            {
                var message = _producer.CreateMapMessage();
                ActiveCommon.SetMapMessage<MapMessage>(message, mapMessage);
                try
                {
                    _producer.Send(message);
                    result = true;
                }
                catch (Exception e)
                {
                    Log.Error($"activemq发送美好异常:{e.InnerException.ToString()}");
                    result = false;
                }
            }
            return result;
        }

        /// <summary>
        /// 获取对象XML结果
        /// </summary>
        /// <param name="m">对象</param>
        /// <returns></returns>
        public string GetXmlStr(object m)
        {
            return _producer.CreateXmlMessage(m).Text;
        }
    }

  

订阅:

    //消息订阅
    class Subsriber
    {
        private IConnection _connection;
        private ISession _session;
        private IMessageConsumer _consumer;

        /// <summary>
        /// 初始化
        /// </summary>
        /// <param name="brokerUrl">广播地址</param>
        /// <param name="queueDestination">队列目标</param>
        public void Init(string brokerUrl = "tcp://localhost:61616", string queueDestination = "nms.msg.topic")
        {
            try
            {
                IConnectionFactory connectionFactory = new ConnectionFactory(brokerUrl);
                _connection = connectionFactory.CreateConnection();
                _connection.Start();
                _session = _connection.CreateSession();
                IDestination destination = _session.GetTopic(queueDestination);
                _consumer = _session.CreateConsumer(destination);
                _consumer.Listener += _consumer_Listener;

            }
            catch (Exception e)
            {
                Log.Error($"activemq初始化异常:{e.InnerException.ToString()}");
            }

        }

        private void _consumer_Listener(IMessage message)
        {
            var model = ActiveCommon.GetMapMessageByIMapMessage((IMapMessage)message);
            Log.Infor($"订阅接收:{_session.CreateXmlMessage(model).Text}");
        }
    }

复杂类型处理:

    public class ActiveCommon
    {
        //设置Message的Body信息
        public static void SetMapMessage<T>(IMapMessage mapMessage, T messages)
        {
            if (mapMessage == null || object.Equals(messages, null))
            {
                return;
            }

            foreach (var propertyInfo in messages.GetType().GetProperties())
            {
                if (propertyInfo.PropertyType.Name == "String")
                    mapMessage.Body.SetString(propertyInfo.Name, Convert.ToString(propertyInfo.GetValue(messages, null)));
                else
                    mapMessage.Body.SetInt(propertyInfo.Name, Convert.ToInt16(propertyInfo.GetValue(messages, null)));
            }
        }

        public static MapMessage GetMapMessageByIMapMessage(IMapMessage mapMessage)
        {
            if (mapMessage == null)
            {
                return null;
            }

            var MapMessage = new MapMessage();
            foreach (var propertyInfo in MapMessage.GetType().GetProperties())
            {
                propertyInfo.SetValue(MapMessage, mapMessage.Body[propertyInfo.Name], null);
            }

            return MapMessage;
        }

        public static T GetMapMessageByIMapMessage<T>(IMapMessage mapMessage, T MapMessage)
        {
            if (mapMessage == null || object.Equals(MapMessage, null))
            {
                return default(T);
            }

            foreach (var propertyInfo in MapMessage.GetType().GetProperties())
            {
                propertyInfo.SetValue(MapMessage, mapMessage.Body[propertyInfo.Name.ToUpper()], null);
            }

            return MapMessage;
        }
    }

重点是跨站点和跨服务器传输的时候,需要通过Message的Body去设置传输参数

原文地址:https://www.cnblogs.com/leeolevis/p/5796232.html