flume(1.9.0)把数据导入hive(文件方式)

1. 配置表支持事务

  • (1)改配置文件hive-site.xml 或者 临时设置参数 命令行
<property>
    <name>hive.support.concurrency</name>
    <value>true</value>
</property>
<property>
    <name>hive.exec.dynamic.partition.mode</name>
    <value>nonstrict</value>
</property>
<property>
    <name>hive.txn.manager</name>
    <value>org.apache.hadoop.hive.ql.lockmgr.DbTxnManager</value>
</property>
<property>
    <name>hive.compactor.initiator.on</name>
    <value>true</value>
</property>
<property>
    <name>hive.compactor.worker.threads</name>
    <value>1</value>
    <!--这里的线程数必须大于0 :理想状态和分桶数一致-->
</property>
<property>
    <name>hive.enforce.bucketing</name>
    <value>true</value>
</property>
  • (2)建表时 分区 分桶 stored as orc tblproperties('transactional'='true')

2. 版本问题导jar包

把${HIVE_HOME}/hcatalog/share/hcatalog下的所有包,拷贝入${FLUME_HOME}/lib

3. copyhive文件(这步好像不必要的??)

将hive.xml和hive-env.sh放到${HIVE_HOME}/conf下

4. 修改hdfs目录权限(这步不知道是不是必要的)

hadoop fs -chmod 777/tmp/hive chmod 777 /tmp/hive

5.建表

正确的建表实例

create table flume_hive.flume_hive(nid int,name string,phone string)
partitioned by(time string)
clustered by(nid) into 3 buckets
row format delimited fields terminated by ','
stored as orc tblproperties('transactional'='true');

6.配置flume的配置文件

配置文件flume_hive.cnof

#定义agent名, source、channel、sink的名称
agent3.sources = source3
agent3.channels = channel3
agent3.sinks = sink3
#具体定义source
agent3.sources.source3.type = spooldir
agent3.sources.source3.spoolDir = /soft/flume/logstohive
agent3.sources.source3.fileHeader=false
#定义拦截器,为消息添加时间戳
agent3.sources.source3.interceptors = i1
agent3.sources.source3.interceptors.i1.type=timestamp

#设置channel类型为磁盘
agent3.channels.channel3.type = file
#file channle checkpoint文件的路径
agent3.channels.channel3.checkpointDir=/soft/flume/tmp/point
# file channel data文件的路径
agent3.channels.channel3.dataDirs=/soft/flume/tmp

#具体定义sink
agent3.sinks.sink3.type = hive
agent3.sinks.sink3.hive.metastore = thrift://hadoop1:9083
agent3.sinks.sink3.hive.database = flume_hive
agent3.sinks.sink3.hive.table = flume_hive
agent3.sinks.sink3.hive.partition = %y-%m-%d-%H-%M
agent3.sinks.sink3.useLocalTimeStamp = false
agent3.sinks.sink3.round = true
agent3.sinks.sink3.roundValue = 10
agent3.sinks.sink3.roundUnit = minute
agent3.sinks.sink3.serializer = DELIMITED
agent3.sinks.sink3.serializer.delimiter = ","
agent3.sinks.sink3.serializer.serdeSeparator = ','
agent3.sinks.sink3.serializer.fieldnames = nid,name,phone
agent3.sinks.sink3.batchSize = 90

#组装source、channel、sink
agent3.sources.source3.channels = channel3
agent3.sinks.sink3.channel = channel3

7.启动

  • 先启动hive hive hive --service metastore -p 9083(这个端口号要配置到flume文件中,可用netstat -tulpn | grep 9083查看端口是否监听)
  • 然后启动flume
  • 拷贝文件

数据内容

[root@hadoop1 flume]# cat flume_hive.csv 
1001,aaa,12312453359,
1002,bbb,12678723873,
1003,ccc,12736732989,
1004,ddd,12327836839,
1005,eee,23728179728,
1006,fff,12387623878,
[root@hadoop1 flume]# cat flume_hive1.csv 
1007,aaa,12312453359,
1008,bbb,12678723873,
1009,ccc,12736732989,
1010,ddd,12327836839,
1011,eee,23728179728,
1012,fff,12387623878,

人生之事岂能尽如人意,生活如戏,哭笑皆由人,悲喜自己定
原文地址:https://www.cnblogs.com/Hephaestus/p/12633094.html