NetMQ学习--发布订阅(一)

基于NetMQ 4.0.0-rc5版本

发布端:
// Publisher side: binds a PUB socket on port 5556 and, every 3 seconds,
// reloads the messages and publishes one two-frame message per ward:
//   frame 1 (envelope) = the ward name, used by subscribers for filtering
//   frame 2 (body)     = that ward's messages joined with ';'
using (var publisher = new PublisherSocket())
{
    publisher.Bind("tcp://*:5556");

    while (true)
    {
        // Time one full load-and-publish cycle.
        Stopwatch sw = Stopwatch.StartNew();
        Console.WriteLine();
        Console.WriteLine();

        var list = GetList<Messages>(sql);

        // Group once instead of re-scanning the whole list for every ward.
        // GroupBy yields keys in first-seen order and keeps element order
        // within each group, so the published output is unchanged.
        foreach (var ward in list.GroupBy(s => s.所属病区))
        {
            // Envelope frame: identifies the ward so subscribers can filter on it.
            publisher.SendMoreFrame(ward.Key);

            // Body frame: all messages of this ward in a single string.
            publisher.SendFrame(string.Join(";", ward.Select(s => s.Message)));
        }

        sw.Stop();
        Console.WriteLine("用时:" + sw.ElapsedMilliseconds);

        // Pause between publishing rounds.
        Thread.Sleep(3000);
    }
}
订阅端:
  // Subscriber side: connects to the publisher and subscribes to three
  // randomly chosen wards (the same ward may be picked more than once).
  // Notes:
  //   - To subscribe to several envelopes, call Subscribe once per envelope.
  //   - To receive everything, call SubscribeToAnyTopic() or Subscribe("").
  //   - Assumes `wards` is non-empty; indexing an empty collection throws.
  using (var subscriber = new SubscriberSocket())
  {
      subscriber.Connect("tcp://172.16.131.222:5556");

      for (int i = 0; i < 3; i++)
      {
          // Random.Next(max) excludes max, so passing Count (not Count - 1)
          // makes every ward, including the last one, selectable.
          subscriber.Subscribe(wards[r.Next(wards.Count)]);
      }

      while (true)
      {
          // Each call reads a single frame, so the envelope and the body are
          // printed on separate lines. Call subscriber.SkipFrame() before the
          // receive to drop the envelope and print only the body.
          string results = subscriber.ReceiveFrameString();
          Console.WriteLine(results);
      }
  }
原文地址:https://www.cnblogs.com/liyangLife/p/6179081.html