Kafka.net使用编程入门

最近研究分布式消息队列,分享下!

首先zookeeper  和 kafka 压缩包 解压 并配置好!

我本机zookeeper环境配置如下:

D:WorksoftwareApacheZookeeper3confzoo.cfg

以下是kafka的配置

D:WorksoftwareApachekafka2.11configserver.properties

我已经加了path环境变量,没加的话需要到zookeeper对应bin目录下执行zkServer

然后执行cmd命令:

 

结果:

 然后打开第二个dos窗口,我没加环境变量path,执行kafka命令如下:

 

 

重头戏来了,开始kafka C#客户端处理:

首先引用kafka-net.dll,可以用vs2013的nuget下载,

以下是Prorame.cs

 1 class Program
 2     {
 3         static void Main(string[] args)
 4         {
 5             const string topicName = "test";
 6             var options = new KafkaOptions(new Uri("http://localhost:9092"))
 7             {
 8                 Log = new ConsoleLog()
 9             };
10             
11             Task.Run(() =>
12             {
13                 var consumer = new Consumer(new ConsumerOptions(topicName, new BrokerRouter(options)) { Log = new ConsoleLog() });
14                 foreach (var data in consumer.Consume())
15                 {
16                     Console.WriteLine("Response: PartitionId={0},Offset={1} :Value={2}", data.Meta.PartitionId, data.Meta.Offset, data.Value.ToUtf8String());
17                 }
18             });
19 
20             //创建一个生产者发消息
21             var producer = new Producer(new BrokerRouter(options))
22             {
23                 BatchSize = 100,
24                 BatchDelayTime = TimeSpan.FromMilliseconds(2000)
25             };
26 
27             Console.WriteLine("打出一条消息按 enter...");
28             while (true)
29             {
30                 var message = Console.ReadLine();
31                 if (message == "quit") break;
32 
33                 if (string.IsNullOrEmpty(message))
34                 {
35                     //
36                     SendRandomBatch(producer, topicName, 200);
37                 }
38                 else
39                 {
40                     producer.SendMessageAsync(topicName, new[] { new Message(message) });
41                 }
42             }
43 
44             //释放资源
45             using (producer)
46             {
47 
48             }
49         }
50         private static async void SendRandomBatch(Producer producer, string topicName, int count)
51         {
52             //发送多个消息
53             var sendTask = producer.SendMessageAsync(topicName, Enumerable.Range(0, count).Select(x => new Message(x.ToString())));
54 
55             Console.WriteLine("传送了 #{0} messages.  Buffered:{1} AsyncCount:{2}", count, producer.BufferCount, producer.AsyncCount);
56 
57             var response = await sendTask;
58 
59             Console.WriteLine("已完成批量发送: {0}. Buffered:{1} AsyncCount:{2}", count, producer.BufferCount, producer.AsyncCount);
60             foreach (var result in response.OrderBy(x => x.PartitionId))
61             {
62                 Console.WriteLine("主题:{0} PartitionId:{1} Offset:{2}", result.Topic, result.PartitionId, result.Offset);
63             }
64 
65         }
66     }

结果:

闲的蛋疼,随便研究一些好东西,.net环境太封闭,每个.net程序员都要扩展视野,技术交流,本人QQ827937686

原文地址:https://www.cnblogs.com/Wulex/p/5578339.html