ActiveMQ c# 系列——进阶实例(三)

前言

前面介绍了基本的消费者和生产者,那么看下他们之间有什么其他的api。

正文

消费者设置等待时间

生产者生产了5条消息

改一下消费者。

static void Main(string[] args)
{

	Uri connecturl = new Uri("activemq:tcp://106.15.250.57:61616");
	IConnectionFactory factory = new ConnectionFactory(connecturl);
  
	using (IConnection connection = factory.CreateConnection())
	{
		using (ISession session = connection.CreateSession())
		{
			IDestination destination = SessionUtil.GetDestination(session, "queue://test");
			using (IMessageConsumer consumer=session.CreateConsumer(destination))
			{
				connection.Start();
				//consumer.Listener += new MessageListener(onMessage);
				while (true)
				{
					var message = (ITextMessage)consumer.Receive(TimeSpan.FromSeconds(4));
					if (message != null)
					{
						Console.WriteLine(message.NMSMessageId);
						Console.WriteLine(message.Text);
					}
				}
			}
		}
	}
}

Receive(TimeSpan.FromSeconds(4)) 表示如果4秒没有消息将不再等待。通过断点调式可以很快的展示出来。

多个消费者的情况

我们知道队列是每个生产的东西都只能消费一次,这个就不做试验了,因为这个是队列的基本原理。

那么我启动两个消费者,那么消费情况是什么样的呢?

生产者:

class Program
{
	static void Main(string[] args)
	{
		Uri connecturl = new Uri("activemq:tcp://106.15.250.57:61616");
		IConnectionFactory factory = new ConnectionFactory(connecturl);
		using (IConnection connection = factory.CreateConnection())
		{
			using (ISession session = connection.CreateSession())
			{
				IDestination destination = SessionUtil.GetDestination(session, "queue://test");
				using (IMessageProducer producer = session.CreateProducer(destination))
				{
					//producer.DeliveryMode = MsgDeliveryMode.Persistent;
					//producer.RequestTimeout = TimeSpan.FromSeconds(2);
					for (int i = 1; i < 7; i++)
					{
						ITextMessage request = session.CreateTextMessage("oh,my friend"+i);
						producer.Send(request);
					}
				}
			}
		}
	}
}

两个消费者:

class Program
{
	protected static TimeSpan receiveTimeout = TimeSpan.FromSeconds(10);
	static void Main(string[] args)
	{

		Uri connecturl = new Uri("activemq:tcp://106.15.250.57:61616");
		IConnectionFactory factory = new ConnectionFactory(connecturl);

		using (IConnection connection = factory.CreateConnection())
		{
			using (ISession session = connection.CreateSession())
			{
				IDestination destination = SessionUtil.GetDestination(session, "queue://test");
				using (IMessageConsumer consumer = session.CreateConsumer(destination))
				{
					connection.Start();
					consumer.Listener += new MessageListener(onMessage);
					Console.Read();
					//while (true)
					//{
					//    var message = (ITextMessage)consumer.Receive(TimeSpan.FromSeconds(4));
					//    if (message != null)
					//    {
					//        Console.WriteLine(message.NMSMessageId);
					//        Console.WriteLine(message.Text);
					//    }
					//}
				}
			}
		}
	}

	protected static void onMessage(IMessage receivedMsg)
	{
		ITextMessage message = receivedMsg as ITextMessage;
		if (message != null)
		{
			//查询出消息
			Console.WriteLine(message.Text);
		}
	}
}

消费情况:

平均分配

原文地址:https://www.cnblogs.com/aoximin/p/13383745.html