Setting Up and Using Flume and Sqoop

Flume is a big-data framework for collecting data in real time.

Sqoop is a big-data framework for moving data: it imports data from a relational database such as MySQL into HDFS or Hive, and can export it back in the other direction as well.

I. Flume Setup

  1. Extract the Flume package from /opt/software into /opt/app, for example:
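    A minimal sketch of that step; the tarball name is an assumption, so use whatever Flume package actually sits in /opt/software:

# tarball name is an assumption -- adjust to the package you actually have
tar -zxf /opt/software/flume-ng-1.5.0-cdh5.3.6.tar.gz -C /opt/app/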

  2. Go into the Flume directory and edit the configuration

    1> Rename flume-env.sh.template to flume-env.sh and set JAVA_HOME inside it

    2> Copy the HDFS-related jars into Flume's lib directory (a sketch of both steps follows)
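    A sketch of both steps. The Flume, JDK, and Hadoop install paths below are assumptions based on the paths used elsewhere in this post; adjust them to your environment, and copy whichever HDFS-related jars your Hadoop version needs:

cd /opt/app/flume-1.5.0-cdh5.3.6/conf
mv flume-env.sh.template flume-env.sh
# set JAVA_HOME inside flume-env.sh, e.g.
#   export JAVA_HOME=/opt/app/jdk1.7.0_67

# copy the HDFS-related jars (hadoop-common, hadoop-hdfs, hadoop-auth,
# commons-configuration, etc.) from the Hadoop install into Flume's lib/
cp /opt/app/hadoop-2.5.0-cdh5.3.6/share/hadoop/common/hadoop-common-*.jar \
   /opt/app/hadoop-2.5.0-cdh5.3.6/share/hadoop/hdfs/hadoop-hdfs-*.jar \
   /opt/app/flume-1.5.0-cdh5.3.6/lib/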

    

  3. Usage

    1> Collect data in real time (listen on a port and receive whatever arrives on it)

      a. Install telnet

        Upload the telnet rpm packages to /opt/software, cd into the directory that holds them, and install them all with sudo rpm -ivh ./*, for example:
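        A sketch, assuming the rpm files were uploaded into a telnet-rpms directory under /opt/software:

cd /opt/software/telnet-rpms
sudo rpm -ivh ./*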

      b. Create a configuration file. The file name is arbitrary; here it is a1.conf, with the following contents:

             
# The configuration file needs to define the sources, 
# the channels and the sinks.

### define agent
a1.sources = r1
a1.channels = c1
a1.sinks = k1

### define sources
a1.sources.r1.type = netcat
# replace with your own hostname and port
a1.sources.r1.bind = hadoop.spark.com
a1.sources.r1.port = 44444

### define channels
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

### define sink
a1.sinks.k1.type = logger
a1.sinks.k1.maxBytesToLog = 2014

### bind the sources and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

      c. From the Flume directory, run:

             
bin/flume-ng agent \
  -c conf \
  -n a1 \
  -f conf/a1.conf \
  -Dflume.root.logger=DEBUG,console

      d. Connect with telnet: telnet <hostname> <port>. The hostname and port must match the ones in your a1.conf; once connected, you can type data and send it.
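        With the values used in a1.conf above, that is:

telnet hadoop.spark.com 44444

        Anything typed into the telnet session should then show up as events in the Flume console (the logger sink prints them).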

    2> Tail a log file in real time and ship it to HDFS (Hive's log file is used as the example)

      a. Create a configuration file, here named flume-tail.conf:

        
# The configuration file needs to define the sources, 
# the channels and the sinks.

### define agent
a2.sources = r2
a2.channels = c2
a2.sinks = k2

### define sources
a2.sources.r2.type = exec
a2.sources.r2.command = tail -f /opt/app/hive-0.13.1-cdh5.3.6/logs/hive.log
a2.sources.r2.shell = /bin/bash -c

### define channels
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100

### define sink
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://hadoop.spark.com:8020/user/flume/hive-logs/

a2.sinks.k2.hdfs.fileType = DataStream 
a2.sinks.k2.hdfs.writeFormat = Text
a2.sinks.k2.hdfs.batchSize = 10


### bind the sources and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2

      b. From the Flume directory, run:

            
bin/flume-ng agent \
  -c conf \
  -n a2 \
  -f conf/flume-tail.conf \
  -Dflume.root.logger=DEBUG,console

      c. Open another terminal, start Hive, and check whether data arrives on the Flume side.
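        For example, running any Hive statement produces log lines for the exec source to tail; the Hive path here is taken from the tail command above, and the HDFS check just lists the sink's output directory:

# in a second terminal: produce some Hive log output
/opt/app/hive-0.13.1-cdh5.3.6/bin/hive -e "show databases;"

# then verify that the events landed in HDFS
bin/hdfs dfs -ls /user/flume/hive-logs/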

    3> Collect files dropped into a directory in real time, filtered by file name (again using Hive as the example)

      a. Create a configuration file, here named flume-app.conf:

           
# The configuration file needs to define the sources, 
# the channels and the sinks.

### define agent
a3.sources = r3
a3.channels = c3
a3.sinks = k3


### define sources
a3.sources.r3.type = spooldir
a3.sources.r3.spoolDir = /opt/app/flume-1.5.0-cdh5.3.6/spoollogs
a3.sources.r3.ignorePattern = ^(.)*\.log$
a3.sources.r3.fileSuffix = .delete

### define channels
a3.channels.c3.type = file
a3.channels.c3.checkpointDir = /opt/app/flume-1.5.0-cdh5.3.6/filechannel/checkpoint
a3.channels.c3.dataDirs = /opt/app/flume-1.5.0-cdh5.3.6/filechannel/data

### define sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://hadoop.spark.com:8020/user/flume/splogs/%Y%m%d
a3.sinks.k3.hdfs.fileType = DataStream 
a3.sinks.k3.hdfs.writeFormat = Text
a3.sinks.k3.hdfs.batchSize = 10
a3.sinks.k3.hdfs.useLocalTimeStamp = true


### bind the sources and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3
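      b. By analogy with the two agents above, this agent can be started the same way; a sketch (create the spool directory first):

mkdir -p /opt/app/flume-1.5.0-cdh5.3.6/spoollogs

bin/flume-ng agent \
  -c conf \
  -n a3 \
  -f conf/flume-app.conf \
  -Dflume.root.logger=DEBUG,console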

   4. For more usage, see the official website:

     http://flume.apache.org/

II. Sqoop Setup

  1. Extract the Sqoop package from /opt/software into /opt/app, for example:
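    A minimal sketch; the tarball name is an assumption matching the CDH 5.3.6 versions used elsewhere in this post:

# tarball name is an assumption -- adjust to the package you actually have
tar -zxf /opt/software/sqoop-1.4.5-cdh5.3.6.tar.gz -C /opt/app/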

  2. Rename the sqoop-env.sh template file to sqoop-env.sh and set the required paths inside it, as sketched below
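    A sketch of that step; the Sqoop directory, the exact template file name, and the Hadoop path are assumptions, while the Hive path matches the one used in the Flume section:

cd /opt/app/sqoop-1.4.5-cdh5.3.6/conf
mv sqoop-env-template.sh sqoop-env.sh   # template name may differ slightly per distribution

# then edit sqoop-env.sh and point it at your installs, e.g.
#   export HADOOP_COMMON_HOME=/opt/app/hadoop-2.5.0-cdh5.3.6
#   export HADOOP_MAPRED_HOME=/opt/app/hadoop-2.5.0-cdh5.3.6
#   export HIVE_HOME=/opt/app/hive-0.13.1-cdh5.3.6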

    

  3. Copy the MySQL driver jar

    Copy the driver jar from /opt/software/mysql into Sqoop's lib directory, for example:
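    A sketch; the connector jar name and the Sqoop directory are assumptions:

cp /opt/software/mysql/mysql-connector-java-*.jar /opt/app/sqoop-1.4.5-cdh5.3.6/lib/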

  4. Usage

    1> Import the my_user table from MySQL's test database into HDFS, using the default storage format

      
bin/sqoop import \
  --connect jdbc:mysql://hadoop.spark.com:3306/test \
  --username root \
  --password centos \
  --table my_user \
  --target-dir /user/sqoop/imp_my_user \
  --num-mappers 1

    2> Import the my_user table from the test database into HDFS, stored as Parquet; besides Parquet, Sqoop can also write textfile (the default), sequencefile, and avro formats

      
bin/sqoop import \
  --connect jdbc:mysql://hadoop.spark.com:3306/test \
  --username root \
  --password 123456 \
  --table my_user \
  --target-dir /user/beifeng/sqoop/imp_my_user_parquet \
  --fields-terminated-by ',' \
  --num-mappers 1 \
  --as-parquetfile

    3> Import only selected columns of the my_user table into HDFS (using a query)

      
bin/sqoop import \
  --connect jdbc:mysql://hadoop.spark.com:3306/test \
  --username root \
  --password 123456 \
  --query 'select id, account from my_user where $CONDITIONS' \
  --target-dir /user/beifeng/sqoop/imp_my_user_query \
  --num-mappers 1

    4> Import the my_user table into HDFS with compressed storage (Snappy as the example)

      
bin/sqoop import \
  --connect jdbc:mysql://hadoop.spark.com:3306/test \
  --username root \
  --password 123456 \
  --table my_user \
  --target-dir /user/sqoop/imp_my_sannpy \
  --delete-target-dir \
  --num-mappers 1 \
  --compress \
  --compression-codec org.apache.hadoop.io.compress.SnappyCodec \
  --fields-terminated-by ','

      This approach is usually combined with the Hive statements below (note that the field delimiter matches the one used in the import).

      
drop table if exists default.hive_user_snappy ;
create table default.hive_user_snappy(
id int,
username string,
password string
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' ;

load data inpath '/user/sqoop/imp_my_sannpy' into table default.hive_user_snappy ;

      That is: first import the MySQL data into HDFS with compression, then load the compressed data into the Hive table.

    5> Incremental import (with --incremental append, only rows whose --check-column value is greater than --last-value are imported)

      
bin/sqoop import \
  --connect jdbc:mysql://hadoop.spark.com:3306/test \
  --username root \
  --password 123456 \
  --table my_user \
  --target-dir /user/sqoop/imp_my_incr \
  --num-mappers 1 \
  --incremental append \
  --check-column id \
  --last-value 4

    6> Direct import with --direct (because of --delete-target-dir, a second run overwrites the first)

      
bin/sqoop import \
  --connect jdbc:mysql://hadoop.spark.com:3306/test \
  --username root \
  --password 123456 \
  --table my_user \
  --target-dir /user/beifeng/sqoop/imp_my_incr \
  --num-mappers 1 \
  --delete-target-dir \
  --direct

    7> Export data from HDFS into MySQL

      
# create a sample data file locally containing these two lines:
#   12,zhangsan,zhangsan
#   13,lisi,lisi
touch /opt/datas/user.txt
vi /opt/datas/user.txt

# upload it to HDFS
bin/hdfs dfs -mkdir -p /user/sqoop/exp/user/
bin/hdfs dfs -put /opt/datas/user.txt /user/sqoop/exp/user/

# export the HDFS directory into the my_user table in MySQL
bin/sqoop export \
  --connect jdbc:mysql://hadoop.spark.com:3306/test \
  --username root \
  --password 123456 \
  --table my_user \
  --export-dir /user/sqoop/exp/user/ \
  --num-mappers 1

    8> Import MySQL data into a Hive table

      
-- run in Hive first: create the target table
use default ;
drop table if exists user_hive ;
create table user_hive(
id int,
account string,
password string
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' ;

# then run Sqoop to import into that Hive table
bin/sqoop import \
  --connect jdbc:mysql://hadoop.spark.com:3306/test \
  --username root \
  --password 123456 \
  --table my_user \
  --fields-terminated-by '\t' \
  --delete-target-dir \
  --num-mappers 1 \
  --hive-import \
  --hive-database default \
  --hive-table user_hive

    9> Export data from a Hive table into MySQL

      
-- run in MySQL first: create the target table
CREATE TABLE `my_user2` (
  `id` tinyint(4) NOT NULL AUTO_INCREMENT,
  `account` varchar(255) DEFAULT NULL,
  `passwd` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`)
);

# then export the Hive table's warehouse directory into it
bin/sqoop export \
  --connect jdbc:mysql://hadoop.spark.com:3306/test \
  --username root \
  --password 123456 \
  --table my_user2 \
  --export-dir /user/hive/warehouse/user_hive \
  --num-mappers 1 \
  --input-fields-terminated-by '\t'

    10> You can also write the options in a file and pass it with --options-file.

      Command:

      bin/sqoop --options-file /opt/datas/sqoop-import-hdfs.txt 
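      In an options file, each option and each value goes on its own line. As a sketch, /opt/datas/sqoop-import-hdfs.txt could contain something like the following, mirroring the first import command above (the file contents are an assumption):

import
--connect
jdbc:mysql://hadoop.spark.com:3306/test
--username
root
--password
123456
--table
my_user
--target-dir
/user/sqoop/imp_my_user
--num-mappers
1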

    11> For more usage, see the official website:

      http://sqoop.apache.org/

Original post: https://www.cnblogs.com/medal-li/p/7657069.html