一、准备
1、启动zookeeper
2、启动kafka
3、kafka创建主题。主题名称为:couponTopic
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic couponTopic
二、生产者工程
1、增加引用
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>1.1.1.RELEASE</version> </dependency>
2、增加配置
spring:
  kafka:
    bootstrap-servers: 47.xx.xx.120:9092
    consumer:
      group-id: mygroup
    listener:
      concurrency: 4
3、服务层增加kafka调用
注入kafkaTemplate
/**
 * Builds the service from its two collaborators, supplied through this
 * {@code @Autowired} constructor.
 *
 * @param merchantsDao  DAO stored in the {@code merchantsDao} field
 * @param kafkaTemplate Kafka template (String key, String value) stored in the
 *                      {@code kafkaTemplate} field
 */
@Autowired
public MerchantsServImpl(
        MerchantsDao merchantsDao,
        KafkaTemplate<String, String> kafkaTemplate
) {
    this.kafkaTemplate = kafkaTemplate;
    this.merchantsDao = merchantsDao;
}
通过kafkaTemplate发送消息。Constants.TEMPLATE_TOPIC为couponTopic
// Serialize the template to a JSON string and publish it to Constants.TEMPLATE_TOPIC; the topic name is passed a second time as the message key.
String passTemplate = JSON.toJSONString(template); kafkaTemplate.send( Constants.TEMPLATE_TOPIC, Constants.TEMPLATE_TOPIC, passTemplate );
三、消费者工程
1、增加Kafka依赖
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>1.1.1.RELEASE</version> </dependency>
2、配置连接kafka
spring:
  application:
    name: kafkaConsume
  kafka:
    bootstrap-servers: 4x.xx.xx.xx:9092
    consumer:
      group-id: mygroup
    listener:
      concurrency: 4
server:
  port: 9527
3、接收Kafka消息
主题名称:
/** Name of the Kafka topic ("couponTopic") that the listener subscribes to. */
public static final String TEMPLATE_TOPIC = "couponTopic";
/**
 * Kafka consumer component that listens on {@code Constants.TEMPLATE_TOPIC}
 * and parses each received message into a {@code PassTemplate}.
 */
@Slf4j
@Component
public class ConsumeTemplate {

    /**
     * Handles one message from the template topic.
     *
     * <p>The payload is logged and then parsed from JSON into a
     * {@code PassTemplate}. If parsing fails, the error is logged together
     * with the exception and the message is dropped (the method returns).
     *
     * @param passTemplate message payload, expected to be a JSON string
     * @param key          value of the received-message-key header
     * @param partition    value of the received-partition-id header
     * @param topic        value of the received-topic header
     */
    @KafkaListener(topics = {Constants.TEMPLATE_TOPIC})
    public void receive(@Payload String passTemplate,
                        @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key,
                        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                        @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        log.info("Consumer Receive PassTemplate: {}", passTemplate);

        PassTemplate pt;
        try {
            pt = JSON.parseObject(passTemplate, PassTemplate.class);
        } catch (Exception ex) {
            // Pass the exception itself as the last argument so the logger
            // records the stack trace and cause, not just the message text.
            log.error("Parse PassTemplate Error: {}", ex.getMessage(), ex);
            return;
        }

        // Further handling of pt is elided in the original snippet.
        ...
    }
}