flume_kafka_hdfs_hive数据的处理

     使用flume收集数据,将数据传递给kafka和hdfs,kafka上的数据可以使用storm构建实时计算,而hdfs上的数据,经过MR处理之后可以导入hive中进行处理。

     环境:hadoop1.2.1,hive 0.13.1,maven 3.2.5,flume 1.4,kafka 0.7.2,eclipse luna,jdk 1.7_75;mysql-connector-java-5.1.26.bin.jar,flume-kafka-master.zip。

     说明:所有服务都架设在一台机器上。

     1:安装hadoop:这篇文章写得比较完整,可以看看:Ubuntu 12.10 安装JDK、Hadoop全过程

我在安装过程中出现:Does not contain a valid host:port authority: file:/// ,看了一遍自己的core-site.xml,hdfs-site.xml,mapred-site.xml没有发现错误,还特地看了些hosts配置,最后网上找到,fs.default.name中default写错了,启动hadoop。

hadoopnew

     2:安装hive:下载解压之后,设置HIVE_HOME,将HIVE_HOME/bin加入到PATH变量中,直接输入hive即可启动。默认hive是使用嵌入模式的Derby数据库,它的特点是小巧,而且老爹也是apache,但存在单session,无法多用户共享,这里参考网上的资料将元数据存储到mysql中去:Hive集成Mysql作为元数据,这里我出现了点问题,只能使用localhost进行连接,无法使用root@myggg,试着按照文章中查找my.conf,但没有找到相关配置,在实验环境下这样也可以用:

hive

     3:安装flume:

Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data. It has a simple and flexible architecture based on streaming data flows. It is robust and fault tolerant with tunable reliability mechanisms and many failover and recovery mechanisms. It uses a simple extensible data model that allows for online analytic application.

Flume是由Cloudera推出的一个高效收集,处理,移动大量日志数据的分布式,可靠地,高可用的服务。它有简单并且灵活的架构基于数据流之上。它使用可靠地协调性,容错转移,恢复机制使它强健性并且容错。它使用简单可伸缩的数据模型能实现在线数据分析。

Flume安agent划分,一个agent包括Source,Channel,Sink三个部分,Source从Web Server中取数据,push交给Channel,Sink将pull Channel得到数据,一个agent可以有多个Channel, Sink。

     配置FLUME_HOME,将PATH中加入Flume下的执行路径,将conf下的flume-conf.properties.template重命名为flume-conf.properties,然后进行配置,在单机情况下:

agent.sources = r1                //agent中添加source,命名为r1
agent.sinks = s1                  //agent中添加sink,命名为s1
agent.channels = c1               //agent中添加channel,命名为c1
agent.sinks.s1.channel = c1       //s1从c1中取数据
agent.sources.r1.channels = c1    //r1将数据交给c1

#describe the source
agent.sources.r1.type = exec      //定义r1的类型为exec
agent.sources.r1.command = tail -F /root/input/loginfo    //r1执行的命令

#use a channel which buffers events in memory
agent.channels.c1.type = memory    //定义c1的类型memory
agent.channels.c1.capacity = 1000  //c1的容量
agent.channels.c1.transactionCapacity = 100  //channel获取或者sink获得一次最大的数据量 

#sinks type
agent.sinks.s1.type = logger       //定义s1的类型为logger

完成后启动flume,测试:

bin/flume-ng agent --conf ./conf/ -f conf/flume-conf.properties -Dflume.root.logger=DEBUG,console -n agent

flume

4:安装kafka:关于kafka的介绍:Kafka快速入门,简单来说,kafka集群中的一台服务器就是一个broker,消息按名字分类,叫做topic,消息的产生是producer,消息的获取方为customer。kafka的安装方法同上。由于使用默认配置,kafka中的config下不需要配置了,直接启动即可进行模拟:

bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
bin/kafka-console-producer.sh --zookeeper localhost:2181 localhost:9092 --topic test 
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

相关可以看apache-kafka

5:进行整合:数据的处理过程包括数据收集,数据清理,数据存储,数据分析,数据展现。在这里数据的收集由flume负责,定期从web server中收集log相关信息,对于实时数据的处理,将数据直接发送到kafka,然后交给后面的storm处理(这个没有做),对于离线部分,经过简单的mr处理后存储到hdfs上,然后使用hive操作。

总的架构图:

arch

Flume的设计:

flume_ng

在搭建之前,先安装maven:安装步骤同上Flume与Kafka

安装完后echo $PATH:

/usr/lib/qt-3.3/bin:/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin:/usr/java/jdk1.7.0_75/bin:/root/hadoop-1.2.1/bin:/root/apache-hive-0.13.1/bin:/root/apache-flume-1.4.0/bin:/root/kafka-0.7.2/bin:/root/bin:/usr/java/jdk1.7.0_75/bin:/root/hadoop-1.2.1/bin:/root/apache-hive-0.13.1/bin:/root/apache-flume-1.4.0/bin:/root/kafka-0.7.2/bin:/root/Downloads/apache-maven-3.2.5/bin

Flume与Kafka之间整合需要一个插件:这里介绍个flume-kafka插件,flume1.4,kafka0.7.2的基础上,将代码下载下来,进入目录,使用maven打包成jar文件,将生成的jar包放到flume的lib或相关目录下,依次将hadoop.1.2.1-*.jar,kafka0.7.2.jar, scala-compiler.jar(2.8),scala-library.jar(2.8),zkclient-0.1.jar导入,mvn package过程中可能会报错,找不到kafka0.7.2.jar,你需要将额外的extra-dependencies下的包放到‘~/.m2/repository/com/linkedin/kafka/kafka/0.7.2/’下,再进行package。

对于myggg开启6个终端:

console

对于发送到kafka中的数据以后在处理,现在主要是针对hadoop中的数据,首先使用MR处理,格式化文本。

6:后续:解压eclipse,将之前准备的hadoop-eclipse-plugin-1.2.1.jar放到eclipse下的plugins目录下,使用vnc连接到机器,编写MR程序。

使用’hadoop fs -cat /myFlume/FlumeData.1426320728464’查看文件:

1,b
2,c
3,d
4,e
5,f
6,g
7,z
0,o

编写MR,将一行记录拆分为key,value:

public static class MyMapper
         extends Mapper<Object, Text, IntWritable, Text>{
      private IntWritable hello = new IntWritable();
      private Text world = new Text();
  
      public void map(Object key, Text value, Context context
                      ) throws IOException, InterruptedException {
        String[] array = value.toString().split(",");
        if (array.length == 2) {
          hello.set(Integer.parseInt(array[0]));
          world.set(array[1]);
  
          context.write(hello, world);
        }
      }
    }

查看结果:

[root@myggg eclipse]# hadoop fs -cat /myoutput/part-r-00000
0    o
1    b
2    c
3    d
4    e
5    f
6    g
7    z

使用hive建立外部表查看数据:

create external table employee(id int, name string)
row format delimited fields terminated by '	'
lines terminated by '
'
stored as textfile
location '/myoutput';

然后就可以进行相关查询与处理了。

原文地址:https://www.cnblogs.com/dslover/p/4337849.html