Azure Event Hub 技术研究系列3-Event Hub接收事件

上篇博文中,我们通过编程的方式介绍了如何将事件消息发送到Azure Event Hub:

Azure Event Hub 技术研究系列2-发送事件到Event Hub

本篇文章中,我们继续:从Event Hub中接收事件。

1. 新建控制台工程 EventHubReceiver

2. 添加Nuget引用

Microsoft.Azure.EventHubs

Microsoft.Azure.EventHubs.Processor

3. 实现IEventProcessor接口

MyEventProcessor

 1     using Microsoft.Azure.EventHubs;
 2     using Microsoft.Azure.EventHubs.Processor;
 3     using System.Threading.Tasks;
 4 
 5     public class MyEventProcessor : IEventProcessor
 6     {
 7         public Task CloseAsync(PartitionContext context, CloseReason reason)
 8         {
 9             Console.WriteLine($"Processor Shutting Down. Partition '{context.PartitionId}', Reason: '{reason}'.");
10             return Task.CompletedTask;
11         }
12 
13         public Task OpenAsync(PartitionContext context)
14         {
15             Console.WriteLine($"MyEventProcessor initialized. Partition: '{context.PartitionId}'");
16             return Task.CompletedTask;
17         }
18 
19         public Task ProcessErrorAsync(PartitionContext context, Exception error)
20         {
21             Console.WriteLine($"Error on Partition: {context.PartitionId}, Error: {error.Message}");
22             return Task.CompletedTask;
23         }
24 
25         public Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
26         {
27             foreach (var eventData in messages)
28             {
29                 var data = Encoding.UTF8.GetString(eventData.Body.Array, eventData.Body.Offset, eventData.Body.Count);
30                 Console.WriteLine($"Event message received. Partition: '{context.PartitionId}', Data: '{data}'");
31             }
32 
33             return context.CheckpointAsync();
34         }
35     }

4. Program程序

添加常量作为事件中心连接字符串、事件中心名称、存储帐户容器名称、存储帐户名称和存储帐户密钥。 添加以下代码,并将占位符替换为其对应的值。

        private const string EhConnectionString = "{Event Hubs connection string}";
        private const string EhEntityPath = "{Event Hub path/name}"; //MyEventHub
        private const string StorageContainerName = "{Storage account container name}"; //eventhubcontainer
        private const string StorageAccountName = "{Storage account name}"; //linux1
        private const string StorageAccountKey = "{Storage account key}";
private static readonly string StorageConnectionString = string.Format("DefaultEndpointsProtocol=https;AccountName={0};AccountKey={1}", StorageAccountName, StorageAccountKey);

这里涉及到Azure Storage Account,必须为上篇博文中创建的事件中心MyEventHub指定一个存储账户和存储容器

增加MainAysnc方法:注册事件处理器,处理事件消息

 1         /// <summary>
 2         /// 注册事件处理器
 3         /// </summary>
 4         /// <param name="args"></param>
 5         /// <returns></returns>
 6         private static async Task MainAsync(string[] args)
 7         {
 8             Console.WriteLine("Registering EventProcessor...");
 9 
10             var eventProcessorHost = new EventProcessorHost(
11                 EhEntityPath,
12                 PartitionReceiver.DefaultConsumerGroupName,
13                 EhConnectionString,
14                 StorageConnectionString,
15                 StorageContainerName);
16 
17             // Registers the Event Processor Host and starts receiving messages
18             await eventProcessorHost.RegisterEventProcessorAsync<MyEventProcessor>();
19 
20             Console.WriteLine("Receiving. Press ENTER to stop worker.");
21             Console.ReadLine();
22 
23             // Disposes of the Event Processor Host
24             await eventProcessorHost.UnregisterEventProcessorAsync();
25         }

Main函数

1         static void Main(string[] args)
2         {
3             MainAsync(args).GetAwaiter().GetResult();
4         }

Run

至此,我们实现了事件消息发送到Event Hub,同时从Event Hub接收处理事件消息。

周国庆

2017/5/18

原文地址:https://www.cnblogs.com/tianqing/p/6865149.html