Kafka入门教程

最近准备看一下大数据相关的工具,hadoop, spark,zookeeper,kafka

hadoop:大数据库处理框架,主要是map和reduce,通过map分解任务成一个个小问题,并行处理,reduce,将并行处理结果整合起来

spark:也是一个大数据库处理框架,基于RDD,这个RDD在内存中进行操作,速度较快,spark是用scala实现的。scala是一种类似java的语言,编译的字节码可以在jvm上运行。spark同时支持python,java,Scala等,通过这几种语言感觉python确实抽象程度更高,相同问题,python可能几行就解决了,java需要更多代码

zookeeper:可以在上面创建节点,存放值,同时,watch这些节点,节点改变会通知客户端

kafka:消息中间件,生产者将消息放到kafka里面,消费者从kafka里面消费消息

运行环境用的是centos,以前喜欢用ubuntu,但是ubuntu不是很稳定,总是提示内部错误。centos相对稳定一点,公司服务器装的也是centos。

1. kafka环境

linux : centos

java:java8

zookeeper:自带的

2.安装

2.1 安装jdk(谷歌 or 百度)

2.2 安装kafka

下载kafka:https://kafka.apache.org/downloads,这里选择编译好的,.tgz的。这里我下载的版本是

放到centos的/opt/soft里面,解压,重命名为kafk211,路径/opt/soft/kafka211

ok,到这里安装完成。其实,就是下载,解压就可以了

3. 启动kafka

kafka启动需要先启动zookeeper

3.1 启动zookeeper,控制台执行

bin/zookeeper-server-start.sh config/zookeeper.properties& 

zookeeper启动成功后,可以使用zk客户端连接去检查是否启动成功。或者用ps命令,jps命令等

3.2 启动kafka

bin/kafka-server-start.sh config/server.properties &

同样可以用ps命令或jps命令,查看是否启动成功

这样kafak已经启动完成

kafka作为一个消息中间件,把消息分为了不同的topic,生产者把不同的消息放到不同的topic里面,消费者从不同的topic里面取消息

4. 控制台启动生产者和消费者

4.1 创建topic test

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

可以通过命令查看创建好的topic

bin/kafka-topics.sh --list --zookeeper localhost:2181

4.2 启动生产者,往topic test里面写消息

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

kafka监听的9092端口,zookeeper监听的是2181端口

4.3 启动消费者,从topic test里面消费消息

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

ok,到这儿,消费者和生产者已经启动完成

5. 通过java客户端,写一个生产者和消费者的demo

需要修改一下kafka的配置文件config/server.properties

advertised.host.name=192.168.211.129
advertised.port=9092

这里改成自己网卡的ip,这里是个坑,不配置这个,会通过

java.net.InetAddress.getCanonicalHostName()

获取host.name,造成不能回复生成者消息

5.1 kafka maven依赖

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.10</artifactId>
  <version>2.1.1</version>
</dependency>
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>0.10.1.0</version>
</dependency>

工具类KafkaUtil.java

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;

import java.util.Properties;

/**
 * Created by gxf on 2017/6/23.
 */
public class KafkaUtil {
    private static Producer<String, String> kp;
    private static KafkaConsumer<String, String> kc;

    public static Producer<String, String> getProducer() {
        if (kp == null) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "192.168.211.129:9092");
            props.put("acks", "1");
            props.put("retries", 0);
//            props.put("batch.size", 16384);
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            kp = new KafkaProducer<String, String>(props);
        }
        return kp;
    }

    public static KafkaConsumer<String, String> getConsumer() {
        if(kc == null) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "192.168.211.129:9092");
            props.put("group.id", "1");
            props.put("enable.auto.commit", "true");
            props.put("auto.commit.interval.ms", "1000");
            props.put("session.timeout.ms", "30000");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            kc = new KafkaConsumer<String, String>(props);
        }
        return kc;
    }
}

生产者 ProducerTest.java

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

/**
 * Created by gxf on 2017/6/23.
 */
public class ProducerTest {
    public static void main(String[] args) throws Exception{
        Producer<String, String> producer = KafkaUtil.getProducer();
        int i = 0;
        while(true) {
            ProducerRecord<String, String> record = new ProducerRecord<String, String>("test", String.valueOf(i), "this is message"+i);
            producer.send(record, new Callback() {
                public void onCompletion(RecordMetadata metadata, Exception e) {
                    if (e != null)
                        e.printStackTrace();
                    System.out.println("message send to partition " + metadata.partition() + ", offset: " + metadata.offset());
                }
            });
            i++;
            Thread.sleep(1000);
        }
    }
}

每个1秒钟,向kafka发送消息,并注册回调函数,完成消息发送,调用回调函数。kafka控制台消费者也可以消费这里发送的消息

消费者ConsumerTest.java

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;

/**
 * Created by gxf on 2017/6/23.
 */
public class ConsumerTest {
    public static void main(String[] args) throws Exception{
        KafkaConsumer<String, String> consumer = KafkaUtil.getConsumer();
        consumer.subscribe(Arrays.asList("test"));
        while(true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for(ConsumerRecord<String, String> record : records) {
                System.out.println("fetched from partition " + record.partition() + ", offset: " + record.offset() + ", message: " + record.value());
            }
        }
    }
}

同样,这里的消费者,也可以消费控制台生产者放的消息

ok至此,kafak安装以及java客户端的使用基本了解了

原文地址:https://www.cnblogs.com/luckygxf/p/7072691.html