通过hive向写elasticsearch的写如数据

通过hive向写elasticsearch的写如数据

hive 和 elasticsearch 的整合可以参考官方的文档:
ES-hadoop的hive整合 : https://www.elastic.co/guide/en/elasticsearch/hadoop/current/hive.html#hive
ES-hadoop的配置说明 : https://www.elastic.co/guide/en/elasticsearch/hadoop/current/configuration.html

1. 新建一个hive表es_goods_order

将该hive表的数据存储指定到ES上,指定索引的ID列是goods_order_id('es.mapping.id' = 'goods_order_id',);
指定数据写入的方式是upsert('es.write.operation'='upsert'),如果id不存在就插入,如果存在就执行更新操作。

add jar file:///home/hadoop/lib/elasticsearch-hadoop-5.1.1.jar;
set username=fxin.zhao
use temp;
CREATE EXTERNAL TABLE es_goods_order(
	goods_order_id string, 
	sale_place string,
	station_place string,
	multi_channel_id string,
	business_date string,
	discount  string,
	discount_type string,
	payment_amouunt string,
	refun_amount string
)
STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES(
'es.resource' = 'test_crm/es_goods_order', 
'es.nodes'='10.10.110.125',
'es.port'='9200',
'es.mapping.id' = 'goods_order_id',
'es.write.operation'='upsert'
);

向es_goods_order表中插入数据:3分钟启用1个maper写入80万数据。Es中的index是在导入数据的时候检查的,如果不存在,则会创建。

add jar file:///home/hadoop/lib/elasticsearch-hadoop-5.1.1.jar;
use temp;
insert into table es_goods_order 
select goods_order_id,
       sale_place,
       station_place,
       multi_channel_id,
       business_date,
       discount,
       discount_type,
       payment_amouunt,
       refun_amount
  from ods.goods_order
 where dt >= '2016-10-01'
   and dt <= '2016-10-04';
  • 验证upsert功能是否有效:再重新写入部分相同的数据。
insert into table es_goods_order
select goods_order_id,
       sale_place,
       station_place,
       multi_channel_id,
       business_date,
       discount,
       discount_type,
       payment_amouunt,
       refun_amount
  from ods.goods_order
 where dt = '2016-10-01'
 limit 1000;

结论:
指定ID问题: 通过'es.mapping.id' = 'goods_order_id' 指定id。
数据更新问题: 通过'es.write.operation'='upsert' 来执行插入或者更新操作(如果id存在)。

  • ES 的hive表基于json存储。

hadoop fs -put 20170111202237 /tmp/fuxin.zhao/es_json

add jar file:///home/hadoop/lib/elasticsearch-hadoop-5.1.1.jar;
use temp;
##创建一个临时表
CREATE EXTERNAL TABLE es_json_tmp (
    json    STRING
 );
##给临时表添加数据
load data  inpath '/tmp/fuxin.zhao/es_json/20170116185548' into table es_json_tmp;


drop table es_json;
##创建json格式的hive表
CREATE EXTERNAL TABLE es_json (
    json    STRING
 )
STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES(
'es.resource' = 'test_crm/es_json', 
'es.nodes'='10.10.110.125',
'es.port'='9200',
'es.input.json' = 'yes',
'es.mapping.id' = 'uid'
);

##执行插入数据操作
insert into table es_json
select json
  from es_json_tmp;

报出如下错误:

Caused by: org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest: Found unrecoverable error [10.10.110.125:9200] returned Bad Request(400) - Field [_id] is a metadata field and cannot be added inside a document. Use the index API request parameters.; Bailing out..

原因是json文档中的_id 字段是ES的元数据字段,属于ES的关键字,解决方法:
vi 20170116185548
将文档中的_id 替换成uid。
1,$s/_id/uid/g

原文地址:https://www.cnblogs.com/honeybee/p/6293634.html