Kafka Tutorial (Part 4): Integrating the Kafka Message Queue in .NET Core

  .NET Core can use Kafka through the wrapper introduced in the previous post (Kafka Tutorial (Part 3): Using the Kafka Message Queue from C#), but I think it is worth adding one more layer of abstraction on top, which also makes it easy to build a log-collection feature.

  Because there is quite a lot of code, it all lives on Gitee: https://gitee.com/shanfeng1000/dotnetcore-demo/tree/master/Kafka (why not GitHub? Because GitHub is too slow for me -_-!!)

  If you are interested, clone it and adapt it to your own needs. Below is a quick tour of the demo (it targets .NET Core 3.1; other versions may need testing on your own).

  Producer (AspNetCore.WebApi.Producer)

  First, register the required services in ConfigureServices:  

    public void ConfigureServices(IServiceCollection services)
    {
        var hosts = new string[] { "192.168.209.133:9092", "192.168.209.134:9092", "192.168.209.135:9092" };

        #region Logging

        services.AddLogging(builder =>
        {
            builder.SetMinimumLevel(LogLevel.Trace);
        });
        services.AddKafkaLogger(options =>
        {
            options.BootstrapServers = hosts;
            options.Category = "Home";
            options.InitializeCount = 10;
            options.Key = "log";
            options.MinLevel = LogLevel.Trace;
            options.Topic = "topic.logger";
            options.ApplicationName = "AspNetCore.WebApi.Producer";
        });

        #endregion

        #region Kafka

        services.AddKafkaProducer(options =>
        {
            options.BootstrapServers = hosts;
            options.InitializeCount = 3;
            options.Key = "kafka";
            options.Topic = "topic.kafka";
        });

        #endregion

        // ......
    }

  AddKafkaLogger registers the logging-related services; once it is configured, any message recorded through .NET Core's ILogger is published straight to Kafka.
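  The wiring behind AddKafkaLogger is not shown here, but the underlying idea is the standard ILoggerProvider extension point: a provider whose loggers forward each formatted entry to a Kafka topic. The sketch below illustrates only that pattern; all names in it (ForwardingLoggerProvider, the publish callback) are hypothetical and are not the actual types from the Gitee repository, which would plug a real Kafka producer in where the callback sits:

    ```csharp
    using System;
    using Microsoft.Extensions.Logging;

    // Hypothetical sketch: a provider whose loggers hand each formatted
    // entry to a callback; a real implementation would publish to Kafka there.
    public class ForwardingLoggerProvider : ILoggerProvider
    {
        private readonly Action<string> publish;
        public ForwardingLoggerProvider(Action<string> publish) => this.publish = publish;

        public ILogger CreateLogger(string categoryName) => new ForwardingLogger(categoryName, publish);
        public void Dispose() { }

        private class ForwardingLogger : ILogger
        {
            private readonly string category;
            private readonly Action<string> publish;

            public ForwardingLogger(string category, Action<string> publish)
            {
                this.category = category;
                this.publish = publish;
            }

            public IDisposable BeginScope<TState>(TState state) => null;
            public bool IsEnabled(LogLevel logLevel) => logLevel >= LogLevel.Trace;

            public void Log<TState>(LogLevel logLevel, EventId eventId, TState state,
                Exception exception, Func<TState, Exception, string> formatter)
            {
                if (!IsEnabled(logLevel)) return;
                // The real implementation would call the Kafka producer here.
                publish($"[{logLevel}] {category}: {formatter(state, exception)}");
            }
        }
    }
    ```

  In the real project the callback's role is played by the producer built from the options passed to AddKafkaLogger, and the provider is registered into the logging pipeline so that every ILogger created by the framework flows through it.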

  AddKafkaProducer registers the Kafka producer configuration. You can give it a name, and then use it by injecting the IKafkaProducerFactory interface, for example in the Home controller:  

    [ApiController]
    [Route("[controller]")]
    public class HomeController : ControllerBase
    {
        IKafkaProducerFactory kafkaProducerFactory;
        ILoggerFactory loggerFactory;

        public HomeController(IKafkaProducerFactory kafkaProducerFactory, ILoggerFactory loggerFactory)
        {
            this.kafkaProducerFactory = kafkaProducerFactory;
            this.loggerFactory = loggerFactory;
        }

        /// <summary>
        /// 发布消息
        /// </summary>
        /// <param name="message">消息</param>
        /// <returns>success</returns>
        [HttpGet("Kafka")]
        public string Kafka(string message)
        {
            message = message ?? "";
            var producer = kafkaProducerFactory.Create();
            producer.Publish(message);

            return "success";
        }
        /// <summary>
        /// 日志
        /// </summary>
        /// <param name="message">消息</param>
        /// <returns>success</returns>
        [HttpGet("Logger")]
        public string Logger(string message)
        {
            var logger1 = loggerFactory.CreateLogger("logger");
            logger1.LogTrace($"logger1(LogTrace):{message}");
            logger1.LogDebug($"logger1(LogDebug):{message}");
            logger1.LogInformation($"logger1(LogInformation):{message}");
            logger1.LogWarning($"logger1(LogWarning):{message}");
            logger1.LogError($"logger1(LogError):{message}");
            logger1.LogCritical($"logger1(LogCritical):{message}");

            var logger2 = loggerFactory.CreateLogger("123456");
            logger2.LogTrace($"logger2(LogTrace):{message}");
            logger2.LogDebug($"logger2(LogDebug):{message}");
            logger2.LogInformation($"logger2(LogInformation):{message}");
            logger2.LogWarning($"logger2(LogWarning):{message}");
            logger2.LogError($"logger2(LogError):{message}");
            logger2.LogCritical($"logger2(LogCritical):{message}");

            return "success";
        }
    }

  Consumer (AspNetCore.WebApi.Consumer)

   First, register the required services in ConfigureServices: 

    public void ConfigureServices(IServiceCollection services)
    {
        var hosts = new string[] { "192.168.209.133:9092", "192.168.209.134:9092", "192.168.209.135:9092" };

        #region Logging

        services.AddKafkaConsumer(options =>
        {
            options.BootstrapServers = hosts;
            options.EnableAutoCommit = true; // auto-commit
            options.GroupId = "group.1";
            options.Subscribers = KafkaSubscriber.From("topic.logger");

        }).AddListener(result =>
        {
            Console.WriteLine("Message From topic.logger:" + result.Message);
        });

        #endregion

        #region Kafka

        services.AddKafkaConsumer(options =>
        {
            options.BootstrapServers = hosts;
            options.EnableAutoCommit = false;
            options.GroupId = "group.2";
            options.Subscribers = KafkaSubscriber.From("topic.kafka");

        }).AddListener(result => // consume logic written directly in a lambda
        {
            Console.WriteLine("Message From topic.kafka:" + result.Message);
            result.Commit();
        }).AddListener<KafkaConsumerListener>(); // consume logic via a class implementing IKafkaConsumerListener

        #endregion

        // ......
    }

  Whether it is log-message consumption or custom message consumption, the pattern is the same: first declare the Kafka consumer configuration with AddKafkaConsumer, then attach handlers with AddListener. AddListener has several overloads: it can accept a lambda expression, or a class that implements the IKafkaConsumerListener interface, such as the KafkaConsumerListener class used above:  

    public class KafkaConsumerListener : IKafkaConsumerListener
    {
        public Task ConsumeAsync(RecieveResult recieveResult)
        {
            Console.WriteLine("KafkaConsumerListener:" + recieveResult.Message);
            recieveResult.Commit();
            return Task.CompletedTask;
        }
    }
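  For comparison, what the wrapper's listener loop does under the hood is essentially a plain Confluent.Kafka consume loop with a manual offset commit. A rough, broker-dependent sketch (assuming the Confluent.Kafka NuGet package, and reusing the broker addresses, group, and topic from the configuration above; this is my reconstruction of the pattern, not code from the repository):

    ```csharp
    using System;
    using System.Threading;
    using Confluent.Kafka;

    class RawConsumerDemo
    {
        static void Main()
        {
            var config = new ConsumerConfig
            {
                BootstrapServers = "192.168.209.133:9092,192.168.209.134:9092,192.168.209.135:9092",
                GroupId = "group.2",
                EnableAutoCommit = false, // commit manually, as the wrapper's result.Commit() does
                AutoOffsetReset = AutoOffsetReset.Earliest
            };

            using var consumer = new ConsumerBuilder<Ignore, string>(config).Build();
            consumer.Subscribe("topic.kafka");

            var cts = new CancellationTokenSource();
            Console.CancelKeyPress += (_, e) => { e.Cancel = true; cts.Cancel(); };

            try
            {
                while (true)
                {
                    // Blocks until a message arrives or the token is cancelled.
                    var result = consumer.Consume(cts.Token);
                    Console.WriteLine("Message From topic.kafka:" + result.Message.Value);
                    consumer.Commit(result); // counterpart of result.Commit() in the wrapper
                }
            }
            catch (OperationCanceledException) { }
            finally
            {
                consumer.Close(); // leave the consumer group cleanly
            }
        }
    }
    ```

  The EnableAutoCommit = false plus Commit pair above corresponds to the wrapper's manual-commit listener for topic.kafka; with EnableAutoCommit = true, as in the topic.logger listener, the explicit Commit call is unnecessary.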
Original post: https://www.cnblogs.com/shanfeng1000/p/13035726.html