大数据学习——kafka+storm+hdfs整合

1 需求

kafka,storm,hdfs整合是流式数据常用的一套框架组合,现在

根据需求使用代码实现该需求

需求:应用所学技术实现,kafka接收随机句子,对接到storm中;使用storm集群统计句子中每个单词重复出现的次数(wordcount),将统计结果存入hdfs中。

1 pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>bigdata</groupId>
    <artifactId>homework</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <!--<scope>provided</scope>-->
            <version>1.2.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-kafka-client</artifactId>
            <!--<scope>provided</scope>-->
            <version>1.2.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-hdfs</artifactId>
            <version>1.0.2</version>
            <exclusions>
                <exclusion>
                    <groupId>io.confluent</groupId>
                    <artifactId>kafka-avro-serializer</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.1</version>
                <configuration>
                    <createDependencyReducedPom>true</createDependencyReducedPom>
                </configuration>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>storm.StormTopologyDriver</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.7</source>
                    <target>1.7</target>
                    <skip>true</skip>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>
2 PullWords.java
package kafka;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * @Description
 * kafka消费者
 */
public class PullWords {

    private KafkaConsumer<String, String> consumer;
    private AtomicBoolean isAutoCommit;

    // kafka topic
    private final static String TOPIC = "wordCount";

    public PullWords() {
        isAutoCommit = new AtomicBoolean(false); // 默认非自动提交
        Properties props = new Properties();
        props.put("bootstrap.servers", "mini1:2181,mini2:2181,mini3:2181");
        props.put("group.id", "wordCount"); // 设置消费者组,组内的所有消费者协调在一起来消费订阅主题
        if (isAutoCommit.get()) {
            props.put("enable.auto.commit", "true"); // 设置自动提交
            props.put("auto.commit.interval.ms", "1000"); //配置自动提交消费进度的时间
        } else {
            props.put("enable.auto.commit", "false");
        }
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(TOPIC));
        this.isAutoCommit = isAutoCommit;
    }

    public void subscribe(String... topic) {
        consumer.subscribe(Arrays.asList(topic));
    }

    public ConsumerRecords<String, String> pull() {
        ConsumerRecords<String, String> records = consumer.poll(100);
        consumer.commitSync();
        return records;
    }

    public ConsumerRecords<String, String> pullOneOrMore() {
        ConsumerRecords<String, String> records = null;
        List<String> values = new ArrayList<>();
        while (true) {
            records = consumer.poll(10);
            if (records != null) {
                records.forEach(e -> values.add(e.value()));
                if (values.size() >= 1) {
                    consumer.commitSync();
                    values.clear();
                    break;
                }
            }
        }
        return records;
    }

    public void close() {
        consumer.close();
    }

}

3 PushWords.java

package kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;
import java.util.concurrent.Future;

/**a
 * @Description
 * kafka生产者
 * @Author hongzw@citycloud.com.cn
 * @Date 2019-02-16 下午 7:08
 */
public class PushWords {

    private Producer<String, String> producer;

    // kafka topic
    private final static String TOPIC = "words";


    public PushWords() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "storm01:9092,storm02:9092,storm03:9092");
        props.put("acks", "all");
        props.put("retries", 0); // 请求失败不再重试
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producer = new KafkaProducer<>(props);
    }

    // 发送句子到kafka集群
    public Future<RecordMetadata> push(String key, String words) {
        return producer.send(new ProducerRecord<>(TOPIC, key, words)); // send方法为异步调用
    }

    public void close() {
        producer.close();
    }

}

4 WordCount.java

package storm;

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;


public class WordCount extends BaseBasicBolt {

    Map<String, Integer> wordCountMap = new HashMap<>();

    @Override
    public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
        String word = tuple.getValueByField("word").toString();
        Integer count = Integer.valueOf(tuple.getValueByField("count").toString());
        Integer integer = wordCountMap.get(word);
        if (integer == null) {
            wordCountMap.put(word, count);
        } else {
            wordCountMap.put(word, wordCountMap.get(word) + 1);
        }
        if (wordCountMap.size() > 20) { // map里面有超过20个单词则发送hfdsBolt
            List<Object> list = new ArrayList<>();

//            wordCountMap.forEach((k, v) -> {
//                String result = new String(k + ":" + v);
//                list.add(result);
//            });

            for (Map.Entry<String, Integer> entry : wordCountMap.entrySet()) {
                String result = new String(entry.getKey() + ":" + entry.getValue());
                list.add(result);
            }

            wordCountMap.clear();
            if (list.size() > 0) {
                basicOutputCollector.emit(new Values(list));
            }
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("total"));
    }
}

5 WordCountSplit.java

package storm;

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;


public class WordCountSplit extends BaseBasicBolt {

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String[] words = tuple.getValue(tuple.fieldIndex("value")).toString().split(" ");
        for (String word : words) {
            collector.emit(new Values(word, 1));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}

6 StormTopologyDriver.java

package storm;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.hdfs.bolt.HdfsBolt;
import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat;
import org.apache.storm.hdfs.bolt.format.FileNameFormat;
import org.apache.storm.hdfs.bolt.format.RecordFormat;
import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy;
import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.topology.TopologyBuilder;

public class StormTopologyDriver {

    public static void main(String[] args) throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        KafkaSpoutConfig.Builder builder = new KafkaSpoutConfig.Builder("mini1:2181", "wordCount");
        builder.setProp("group.id", "wordCount");
        builder.setProp("enable.auto.commit", "true");
        builder.setProp("auto.commit.interval.ms", "1000");
        builder.setProp("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        builder.setProp("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        topologyBuilder.setSpout("kafkaSpout", new KafkaSpout<>(builder.build()));

        topologyBuilder.setBolt("wordCountSplit", new WordCountSplit()).shuffleGrouping("kafkaSpout");
        topologyBuilder.setBolt("wordCount", new WordCount()).shuffleGrouping("wordCountSplit");

        // 将文件保存到hdfs
        // 设置输出目录
        // 输出字段分隔符
        RecordFormat format = new DelimitedRecordFormat().withFieldDelimiter(",");
        // 每100个tuple同步到HDFS一次
        SyncPolicy syncPolicy = new CountSyncPolicy(5);
        // 每个写出文件的大小为5MB
        FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(1.0f, FileSizeRotationPolicy.Units.MB);
        FileNameFormat fileNameFormat = new DefaultFileNameFormat().withPath("/storm");
        HdfsBolt hdfsBolt = new HdfsBolt().withFsUrl("hdfs://mini1:9000").withFileNameFormat(fileNameFormat)
        .withRecordFormat(format).withRotationPolicy(rotationPolicy).withSyncPolicy(syncPolicy);
        topologyBuilder.setBolt("hdfsBolt", hdfsBolt).shuffleGrouping("wordCount");

        Config config = new Config();
        config.setNumWorkers(2);
        // 本地模式
//        LocalCluster localCluster = new LocalCluster();
//        localCluster.submitTopology("countWords", config, topologyBuilder.createTopology());

        // 集群模式
        StormSubmitter.submitTopology("countWords", config, topologyBuilder.createTopology());
    }
}
原文地址:https://www.cnblogs.com/feifeicui/p/10441785.html