String整合kafka的步骤

Java代码作为消息的生产者和消费者

1.导入依赖

 <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.2.0.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.10</artifactId>
            <version>0.8.2.1</version>
            <exclusions>
                <exclusion>
                    <artifactId>jmxri</artifactId>
                    <groupId>com.sun.jmx</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>jms</artifactId>
                    <groupId>javax.jms</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>jmxtools</artifactId>
                    <groupId>com.sun.jdmk</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-test</artifactId>
            <version>5.1.7.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>

2.编写配置文件

consumer的XML

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/beans 
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context.xsd">

    <bean id="consumerProperties" class="java.util.HashMap">
        <constructor-arg>
            <map>
                <!--Kafka服务地址 -->
                <entry key="bootstrap.servers" value="192.168.21.128:9092" />
                <!--Consumer的组ID,相同group.id的consumer属于同一个组。一个组中的不同成员,相同的消息只能有一个人收到 -->
                <entry key="group.id" value="test-consumer-group" />
                <!--如果此值设置为true,consumer会周期性的把当前消费的offset值保存到zookeeper。当consumer失败重启之后将会使用此值作为新开始消费的值。 -->
                <entry key="enable.auto.commit" value="true" />
                <!--网络请求的socket超时时间。实际超时时间由max.fetch.wait + socket.timeout.ms 确定 -->
                <entry key="session.timeout.ms" value="15000 " />

                <entry key="key.deserializer"
                    value="org.apache.kafka.common.serialization.StringDeserializer" />

                <entry key="value.deserializer"
                    value="org.apache.kafka.common.serialization.StringDeserializer" />
            </map>
        </constructor-arg>
    </bean>


    <!-- 创建consumerFactory bean -->
    <bean id="consumerFactory"
        class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
        <constructor-arg>
            <ref bean="consumerProperties" />
        </constructor-arg>
    </bean>


    <bean id="messageListenerContainer"
        class="org.springframework.kafka.listener.KafkaMessageListenerContainer"
        init-method="doStart">
        <constructor-arg ref="consumerFactory" />
        <constructor-arg ref="containerProperties" />
    </bean>



    <!-- 记得修改主题 -->
    <bean id="containerProperties" class="org.springframework.kafka.listener.ContainerProperties">
        <!-- 构造函数 就是 主题的参数值 -->
        <!-- kafka消费者监听的主题 -->
        <constructor-arg value="liujin" />
        <property name="messageListener" ref="messageListernerConsumerService" />
    </bean>



    <!--指定具体监听类的bean 这个类需要我们手工写-->
    <bean id="messageListernerConsumerService" class="com.bawei.MsgListener" />
    
    
</beans>

producer的XML(要确保IP和端口号是正确的)(发消息的配置文件)

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/beans 
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context.xsd">
        
        <!--参数配置 -->
    <bean id="producerProperties" class="java.util.HashMap">
        <constructor-arg>
            <map>
                <!-- kafka服务地址,可能是集群 value="localhost:9092,localhost:9093,localhost:9094"-->
                <entry key="bootstrap.servers" value="192.168.21.128:9092" />
                
                <!-- 有可能导致broker接收到重复的消息-->
                <entry key="retries" value="0" />
                <!-- 每次批量发送消息的数量 -->
                <entry key="batch.size" value="1638" />
                <!-- 默认0ms,在异步IO线程被触发后(任何一个topic,partition满都可以触发) -->
                <entry key="linger.ms" value="1" />
                
                <!--producer可以用来缓存数据的内存大小。如果数据产生速度大于向broker发送的速度,producer会阻塞或者抛出异常 -->
                <entry key="buffer.memory" value="33554432 " />
                
                <entry key="key.serializer"
                    value="org.apache.kafka.common.serialization.StringSerializer" />
                    
                <entry key="value.serializer"
                    value="org.apache.kafka.common.serialization.StringSerializer" />
            </map>
        </constructor-arg>
    </bean>

    <!-- 创建kafkatemplate需要使用的producerfactory bean -->
    <bean id="producerFactory" class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
    <!-- 通过构造的方式,指定 了一个生产者的配置集合 -->
        <constructor-arg>
            <ref bean="producerProperties" />
        </constructor-arg>
    </bean>

    <!-- 创建kafkatemplate bean,使用的时候,只需要注入这个bean,即可使用template的send消息方法 -->
    <bean id="kafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">
    <!-- 生产者的工厂,这里是声明了ip地址,端口号,和一些其他的基础配置 -->
        <constructor-arg ref="producerFactory" />
        <!--设置对应topic 如果服务节点里没有这个主题,就会自动创建-->
        <property name="defaultTopic" value="liujin" />
    </bean>
</beans>

3.编写发消息的测试类

首先注入依赖

@Autowired
    KafkaTemplate<String, String> kafkaTemplate;

调用方法来发送

@Test
    public void testSend() {
        //send方法第一个方法:指定发送的主题;第二个方法:指定要发送的内容
        kafkaTemplate.send("liujin", "你好,这个是JAVA代码发来的消息");
    }

最重要的,不能忘记spring整合junit单元测试、加载配置文件

//spring整合junit单元测试
@RunWith(SpringJUnit4ClassRunner.class)
//指定配置文件的加载路径
@ContextConfiguration("classpath:producer.xml")

4.在测试是否能接收到消息时,不能忘记开启kafka

首先开启zookeeper

/opt/zookeeper/bin/zkServer.sh start(开启)
/opt/zookeeper/bin/zkServer.sh status(查看状态)

然后开启kafka

/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties 

创建主题

/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.26.130:9092 --topic liujin(这里的主题与配置文件与测试类的主题名要一致)

假如接收不到消息可以关闭防火墙,释放端口号

[root@localhost ~]# service iptables stop
[root@localhost ~]# chkconfig iptables off

5.消息监听类,在这个类中实现一个接口此时这个类就具备了监听消息的功能

实现了MessageListener接口

//这个方法就是收消息的方法
    public void onMessage(ConsumerRecord<String, String> data) {
        // TODO Auto-generated method stub
        String value = data.value();
        System.out.println("java代码来的消息是:"+value);
    }

再写一个启动消费者的类,先启动消费者的类(启动后一直在监听消息),因为只有消费者先启动,kafka生产者发来的消息,才能时刻被接收到

//先启动消费者,因为只有消费者先启动,kafka生产者发来的消息,才能时刻被接收到
    public static void main(String[] args) {
        //加载消费者的配置文件
        new ClassPathXmlApplicationContext("classpath:consumer.xml");
    }

然后启动生产者的测试类,通过生产者的配置文件就会发送消息至主题,因为消息监听类时刻监听者消息就会打印出消息

原文地址:https://www.cnblogs.com/liujinqq7/p/12404744.html