SparkStreaming+Kafka+HBase

1. Summarizing some concepts:

Install ZooKeeper 3.4.6

cp zoo_sample.cfg zoo.cfg
vim zoo.cfg

tickTime=2000
initLimit=10
syncLimit=5
dataDir=/opt/software/zookeeper-3.4.6/data
dataLogDir=/opt/software/zookeeper-3.4.6/logs
clientPort=2181
server.1=pc.apache:2888:3888

Add ZooKeeper to the environment variables;
create the data directory (mkdir data), write 1 into it with vim myid, and create the logs directory (mkdir logs)

zkServer.sh start
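Once started, the server can be checked with `zkServer.sh status`, or programmatically with ZooKeeper's built-in four-letter `ruok` command over a plain socket (a healthy server replies `imok`). A minimal sketch, assuming the host and port from the config above; no ZooKeeper client library is needed:

```java
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

public class ZkCheck {

    // Send a ZooKeeper "four letter word" command and return the raw reply.
    static String fourLetter(String host, int port, String cmd) throws Exception {
        try (Socket s = new Socket(host, port)) {
            OutputStream out = s.getOutputStream();
            out.write(cmd.getBytes(StandardCharsets.US_ASCII));
            out.flush();
            s.shutdownOutput();
            InputStream in = s.getInputStream();
            StringBuilder sb = new StringBuilder();
            int c;
            while ((c = in.read()) != -1) {
                sb.append((char) c);
            }
            return sb.toString();
        }
    }

    // "ruok" is answered with the literal string "imok" when the server is up.
    static boolean isHealthy(String reply) {
        return "imok".equals(reply.trim());
    }

    public static void main(String[] args) throws Exception {
        String reply = fourLetter("pc.apache", 2181, "ruok");
        System.out.println(isHealthy(reply) ? "ZooKeeper is up" : "unexpected reply: " + reply);
    }
}
```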


Install Kafka 0.10.2

vim server.properties

broker.id=0
port=9092
host.name=pc.apache
log.dirs=/opt/software/kafka_2.11-0.10.2.0/data

num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600

zookeeper.connect=pc.apache:2181
num.partitions=3
num.recovery.threads.per.data.dir=1

log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connection.timeout.ms=6000


Add Kafka to the environment variables, then start the service:
nohup kafka-server-start.sh /opt/software/kafka_2.11-0.10.2.0/config/server.properties > /opt/software/kfk.out &

Create a topic (3 partitions):
kafka-topics.sh --create --zookeeper pc.apache:2181 --replication-factor 1 --partitions 3 --topic k1


Configure a producer for testing:
vim producer.properties
bootstrap.servers=pc.apache:9092

Configure a consumer for testing:
vim consumer.properties
zookeeper.connect=pc.apache:2181


kafka-console-producer.sh --broker-list pc.apache:9092 --topic k1
kafka-console-consumer.sh --bootstrap-server pc.apache:9092 --topic k1 --from-beginning

  

Use IDEA as a producer to send data to Kafka;

Add the dependency:
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>0.10.2.0</version>
        </dependency>


import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class Producer implements Runnable {

    private final KafkaProducer<String, String> producer;
    private final String topic;
    private final Properties props = new Properties();


    public static void main(String[] args) {
        Thread t = new Thread(new Producer("k1"));
        t.start();
    }

    public Producer(String topic) {
        props.put("bootstrap.servers", "pc.apache:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        // serializers must match the producer's type parameters
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producer = new KafkaProducer<String, String>(props);
        this.topic = topic;
    }

    @Override
    public void run() {
        int messageNo = 1;
        while (true) {
            String messageStr = "Message_" + messageNo;
            producer.send(new ProducerRecord<String, String>(topic, messageStr));
            messageNo++;
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                System.out.println(e.toString());
            }
        }
    }
}
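For the other end, a matching consumer can be sketched with the new consumer API from the same kafka_2.11 0.10.2.0 dependency. The group id `g1` is an arbitrary choice (not from the original setup); broker and topic match the configuration above, and in 0.10.2 `poll` takes a timeout in milliseconds:

```java
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ConsumerDemo {

    // Build the consumer configuration; "g1" is a hypothetical group id.
    static Properties consumerProps(String servers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", servers);
        props.put("group.id", "g1");
        props.put("auto.offset.reset", "earliest");
        // deserializers mirror the StringSerializer used by the producer
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        KafkaConsumer<String, String> consumer =
                new KafkaConsumer<String, String>(consumerProps("pc.apache:9092"));
        consumer.subscribe(Collections.singletonList("k1"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.offset() + ": " + record.value());
            }
        }
    }
}
```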

  

3. Install HBase

To decouple HBase from the standalone ZooKeeper, use the ZooKeeper that ships with HBase;

After configuring hbase-site.xml, just run start-hbase.sh to start it;

<property>
  <name>hbase.rootdir</name>
  <value>hdfs://pc.apache:8020/hbase</value>
</property>

<property>
  <name>hbase.cluster.distributed</name>
  <value>true</value>
</property>

<property>
  <name>hbase.zookeeper.quorum</name>
  <value>pc.apache</value>
</property>

<property>
  <name>hbase.master</name>
  <value>pc.apache:60000</value>
</property>

<property>
  <name>hbase.tmp.dir</name>
  <value>/opt/software/hbase-1.2.5/tmp</value>
</property>

<property>
  <name>hbase.zookeeper.property.dataDir</name>
  <value>/opt/software/hbase-1.2.5/zooData</value>
</property>

  

Connect to HBase from IDEA

Put hbase-site.xml on the classpath.
Add the dependencies:

        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>${hbase.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>${hbase.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase</artifactId>
            <version>${hbase.version}</version>
            <type>pom</type>
        </dependency>


private static Configuration config;

static {
    config = HBaseConfiguration.create();
}

  

A connection can then be created;
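A minimal sketch of creating the connection and writing one cell with the hbase-client API, assuming hbase-site.xml is on the classpath as described above. The table name `t1` and column family `cf` are hypothetical and must already exist in HBase:

```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class HBaseDemo {

    public static void main(String[] args) throws IOException {
        // picks up hbase-site.xml from the classpath automatically
        Configuration config = HBaseConfiguration.create();
        try (Connection conn = ConnectionFactory.createConnection(config);
             Table table = conn.getTable(TableName.valueOf("t1"))) {
            // write one cell: row "row1", family "cf", qualifier "q", value "v"
            Put put = new Put(Bytes.toBytes("row1"));
            put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("v"));
            table.put(put);
        }
    }
}
```

`Connection` is heavyweight and thread-safe, so in a real application it should be created once and shared, while `Table` instances are cheap and created per use.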

 

Original post: https://www.cnblogs.com/ruili07/p/10050281.html