Big Data Learning: Storm + Kafka + Redis Integration

1 pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.cyf</groupId>
    <artifactId>TestStorm</artifactId>
    <version>1.0-SNAPSHOT</version>

    <repositories>
        <repository>
            <id>alimaven</id>
            <name>aliyun maven</name>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
            <releases>
                <enabled>true</enabled>
            </releases>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
    </repositories>


    <dependencies>

        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <!--<scope>provided</scope>-->
            <version>0.9.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-kafka</artifactId>
            <version>0.9.5</version>
        </dependency>
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.7.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.8.2</artifactId>
            <version>0.8.1</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.zookeeper</groupId>
                    <artifactId>zookeeper</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <version>2.4</version>
                <configuration>
                    <archive>
                        <manifest>
                            <addClasspath>true</addClasspath>
                            <classpathPrefix>lib/</classpathPrefix>
<mainClass>kfk.StormTopologyDriver</mainClass>
                        </manifest>
                    </archive>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
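
Once the POM is in place, the project can be packaged and the jar submitted to a cluster. A minimal sketch, assuming Maven and the Storm client are installed (the jar path follows Maven's default target/ layout):

mvn clean package
storm jar target/TestStorm-1.0-SNAPSHOT.jar kfk.StormTopologyDriver

For the local-mode run used in this tutorial, simply running StormTopologyDriver's main() from the IDE is enough.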

2 MyLocalFileSpout.java

package kfk;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import org.apache.commons.lang.StringUtils;

import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * Created by Administrator on 2019/2/19.
 */
public class MyLocalFileSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;
    private BufferedReader bufferedReader;

    // Initialization method, called once when the spout starts
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.collector = spoutOutputCollector;
        try {
            this.bufferedReader = new BufferedReader(new FileReader("/root/1.log"));
//            this.bufferedReader = new BufferedReader(new FileReader("D:\\1.log")); // Windows path: backslashes must be escaped
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        }

    }

    // Called by Storm in a loop
    // Storm's real-time model processes data one record at a time

    public void nextTuple() {
        // Each invocation emits at most one line as a tuple
        try {
            String line = bufferedReader.readLine(); // readLine() returns null at EOF; isNotBlank() below filters that out

            if (StringUtils.isNotBlank(line)) {
                List<Object> arrayList = new ArrayList<Object>();
                arrayList.add(line);
                collector.emit(arrayList);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }

    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("juzi")); // "juzi" (句子) means "sentence"
    }
}
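
Note that MyLocalFileSpout is a standalone, file-based alternative to the KafkaSpout actually used in the driver below; it is not wired into the submitted topology. It also emits a String under the field "juzi", while MySplitBolt reads a byte[] under the field "bytes" (KafkaSpout's default output), so connecting the two directly would fail with a ClassCastException. A minimal sketch of the execute() body a String-based split bolt would need instead (hypothetical, not part of the original code):

    public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
        // Read the field this spout actually declares
        String juzi = tuple.getStringByField("juzi");
        for (String word : juzi.split(" ")) {
            basicOutputCollector.emit(new Values(word, 1));
        }
    }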

3 MySplitBolt.java

package kfk;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

/**
 * Created by Administrator on 2019/2/19.
 */
public class MySplitBolt extends BaseBasicBolt {
    public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {

        // 1. Read the data: KafkaSpout's default scheme emits each message as a byte[] under the field "bytes"
        byte[] juzi = (byte[]) tuple.getValueByField("bytes");
        // 2. Split the sentence on spaces
        String[] strings = new String(juzi).split(" ");
        // 3. Emit each word with a count of 1
        for (String word : strings) {
            basicOutputCollector.emit(new Values(word, 1));
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("word", "num"));
    }
}
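
If you would rather receive Strings than raw bytes from Kafka, storm-kafka ships a StringScheme that can be set on the SpoutConfig. A minimal sketch (not used in the original driver; requires importing backtype.storm.spout.SchemeAsMultiScheme and storm.kafka.StringScheme):

    SpoutConfig spoutConfig = new SpoutConfig(new ZkHosts("mini1:2181"), "wordCount", "/wc", "wc");
    // Decode each Kafka message as a UTF-8 String; the spout then emits a field named "str"
    spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
    // The bolt would then read: String juzi = tuple.getStringByField("str");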

4 MyWordCountAndPrintBolt.java

package kfk;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;
import redis.clients.jedis.Jedis;

import java.util.HashMap;
import java.util.Map;

/**
 * Created by Administrator on 2019/2/19.
 */
public class MyWordCountAndPrintBolt extends BaseBasicBolt {

    private Map<String, String> wordCountMap = new HashMap<String, String>();
    private Jedis jedis;



    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        // Connect to Redis; prepare() is the place to open any external connection
        jedis = new Jedis("127.0.0.1", 6379);
        super.prepare(stormConf, context);
    }

    public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
        String word = (String) tuple.getValueByField("word");
        Integer num = (Integer) tuple.getValueByField("num");

        // 1. Look up the current count for this word (counts are kept as Strings for jedis.hmset)
        Integer integer = wordCountMap.get(word) == null ? 0 : Integer.parseInt(wordCountMap.get(word));
        // The ternary above already defaults to 0, so the count can be updated unconditionally
        wordCountMap.put(word, (integer.intValue() + num) + "");
        // 2. Optionally print the running counts
//        System.out.println(wordCountMap);
        // Persist the counts to Redis as a hash under the key "wordcount"
        jedis.hmset("wordcount", wordCountMap);
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

    }
}
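
Calling hmset with the whole map on every tuple works for a demo, but the payload grows with the vocabulary. Redis can maintain the count itself with an atomic hash increment, removing the need for the local HashMap entirely; an alternative sketch (my variation, not the original author's code):

    // Alternative: let Redis keep the count atomically; no local map required
    jedis.hincrBy("wordcount", word, num);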

5 StormTopologyDriver.java

package kfk;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.generated.StormTopology;
import backtype.storm.topology.TopologyBuilder;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.ZkHosts;

/**
 * Created by Administrator on 2019/2/21.
 */
public class StormTopologyDriver {
    public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
        // 1. Build the topology
        TopologyBuilder topologyBuilder = new TopologyBuilder();

        // SpoutConfig(zkHosts, topic, zkRoot, consumerId): consume topic "wordCount" via ZooKeeper on mini1:2181
        topologyBuilder.setSpout("KafkaSpout", new KafkaSpout(new SpoutConfig(new ZkHosts("mini1:2181"), "wordCount", "/wc", "wc")));
        topologyBuilder.setBolt("bolt1", new MySplitBolt()).shuffleGrouping("KafkaSpout");
        topologyBuilder.setBolt("bolt2", new MyWordCountAndPrintBolt()).shuffleGrouping("bolt1");

        // 2. Submit the topology
        // Decide where to submit (local or cluster) and what to submit
        Config config = new Config();
        StormTopology stormTopology = topologyBuilder.createTopology();

        // Local mode: runs the topology in this JVM, useful for testing
        LocalCluster localCluster = new LocalCluster();
        localCluster.submitTopology("wordcount", config, stormTopology);

        // Cluster mode: uncomment to submit to a real cluster (and remove the LocalCluster lines above)
//        StormSubmitter.submitTopology("wordcount",config,stormTopology);
    }
}
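
In local mode the topology keeps running inside the JVM until the process is killed. For a self-terminating test run, a common pattern is to sleep and then shut the local cluster down; a minimal sketch (an addition of mine, not in the original; main() would also need to declare InterruptedException):

        // Let the topology process data for a while, then stop the in-process cluster
        Thread.sleep(60 * 1000);
        localCluster.shutdown();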

6 TestRedis.java

package kfk;

import redis.clients.jedis.Jedis;

import java.util.Map;

/**
 * Created by Administrator on 2019/2/25.
 */
public class TestRedis {
    public static void main(String[] args) {
        Jedis jedis = new Jedis("127.0.0.1", 6379);

        Map<String, String> wordcount = jedis.hgetAll("wordcount");
        System.out.println(wordcount);
    }
}

In the /root/apps/kafka directory on mini1:

Create the topic:

bin/kafka-topics.sh --create --zookeeper mini1:2181 --replication-factor 1 --partitions 1 --topic wordCount
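
To confirm the topic exists, the same CLI can list topics (an optional check):

bin/kafka-topics.sh --list --zookeeper mini1:2181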

Produce data:

bin/kafka-console-producer.sh --broker-list mini1:9092 --topic wordCount
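
Type a few space-separated lines into the producer, for example (an illustrative line, not from the original):

hello storm hello kafka

Once the topology has consumed it, the Redis hash should hold counts like hello=2, storm=1, kafka=1.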

Run StormTopologyDriver.java

Run redis-cli.exe
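
Inside redis-cli, the hash can also be inspected directly with the standard Redis command:

HGETALL wordcount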


Run TestRedis.java

Original article: https://www.cnblogs.com/feifeicui/p/10415780.html