Elasticsearch（ES）增量自定义更新脚本

安装所需软件

sudo apt-get install python-pip
sudo pip install elasticsearch
sudo apt-get install python-dev libmysqlclient-dev
sudo pip install MySQL-python

导入脚本 import.sh（放在 /usr/local/elasticsearch-jdbc-1.5.2.0/bin/ 目录下，并赋予可执行权限，下面的 Python 脚本会直接按该路径调用它）：

#!/bin/bash
# Import the rows of one MySQL table whose dtTime is newer than a given
# timestamp into Elasticsearch, using the elasticsearch-jdbc 1.5.2.0
# JDBCImporter.
#
# Usage: import.sh TABLE TIME
#   TABLE  table in the db_1 MySQL database; also used as the Elasticsearch
#          type name. It is spliced into the SQL unescaped, so only pass
#          trusted table names.
#   TIME   exclusive lower bound for the table's dtTime column,
#          formatted "YYYY-mm-dd HH:MM:SS".
#
# Exits with the importer's exit status.
set -e

if [ "$#" -ne 2 ]; then
    echo "usage: $0 TABLE 'YYYY-mm-dd HH:MM:SS'" >&2
    exit 2
fi

table=$1
since=$2

bin=/usr/local/elasticsearch-jdbc-1.5.2.0/bin
lib=/usr/local/elasticsearch-jdbc-1.5.2.0/lib

# The importer reads its JSON definition from stdin. A heredoc expands the
# variables in place without quote juggling, and quoting the timestamp with
# SQL single quotes means nothing needs escaping inside the JSON string.
# The trailing backslashes keep the whole java invocation one command.
java \
    -cp "${lib}/*" \
    -Dlog4j.configurationFile="${bin}/log4j2.xml" \
    org.xbib.tools.Runner \
    org.xbib.tools.JDBCImporter <<EOF
{
"type" : "jdbc",
"jdbc" : {
"url" : "jdbc:mysql://192.168.10.29:3306/db_1",
"user" : "root",
"password" : "root",
"sql" : "select * from ${table} where dtTime > '${since}'",
"index": "db_1",
"type": "${table}"
}
}
EOF

Python 脚本调用 import.sh 实现增量导入：

#!/usr/bin/env python

from datetime import datetime
from elasticsearch import Elasticsearch
import MySQLdb
import time
import os
import subprocess

# Module-wide Elasticsearch client used by getLastTime() and insert().
es = Elasticsearch(hosts="192.168.10.29")

def now():
    """Return the current local time formatted as 'YYYY-mm-dd HH:MM:SS'."""
    local_now = time.localtime()
    return time.strftime("%Y-%m-%d %H:%M:%S", local_now)

def getLastTime(tableName, default='2015-01-01 00:00:00'):
    """Return the newest dtTime already indexed for *tableName*.

    Runs a ``max`` aggregation over the ``dtTime`` field of the documents of
    type *tableName* in the ``db_1`` index. The result is formatted in local
    time as ``"%Y-%m-%d %H:%M:%S"``, the form import.sh puts into its
    ``dtTime > '...'`` SQL filter.

    :param tableName: MySQL table name, also used as the Elasticsearch type.
    :param default: value returned when nothing has been indexed yet, i.e. no
        dtTime value exists or the ``db_1`` index does not exist.
    :returns: the timestamp string.
    """
    from elasticsearch.exceptions import NotFoundError

    q = {
        # Only the aggregation is used; don't transfer any hits.
        "size": 0,
        "aggs": {
            "max": {
                "max": {"field": "dtTime"}
            }
        }
    }
    try:
        result = es.search(index="db_1", doc_type=tableName, body=q)
    except NotFoundError:
        # Presumably the first run: the importer has not created db_1 yet.
        return default

    dt = result['aggregations']['max']['value']
    if dt is None:
        return default
    # The max of a date field comes back in epoch milliseconds.
    return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(dt / 1000))

def insert(tableName, dtLastTime):
    """Run import.sh to copy rows of *tableName* newer than *dtLastTime* into ES.

    On success the ``db_1`` index is refreshed so that the imported documents
    are visible to the next getLastTime() query.

    :param tableName: MySQL table to import (also the Elasticsearch type).
    :param dtLastTime: exclusive lower bound on dtTime, passed to import.sh
        as its second argument.
    :returns: True if import.sh exited with status 0, False otherwise.
    """
    script = '/usr/local/elasticsearch-jdbc-1.5.2.0/bin/import.sh'
    startTime = str(dtLastTime)
    print(tableName + " startTime:" + startTime)
    print('%s %s "%s"' % (script, tableName, startTime))

    # Pass an argv list instead of a shell string. The timestamp contains a
    # space and the table name comes from the database, so neither should be
    # re-parsed by /bin/sh.
    try:
        retCode = subprocess.call([script, tableName, startTime])
    except OSError as err:
        # A missing or non-executable script is treated as a failed import
        # instead of aborting the whole run.
        print("Import failed: %s" % err)
        return False

    if retCode != 0:
        print("Import failed")
        return False
    print("%s Import finished" % now())
    es.indices.refresh(index="db_1")
    return True

def increment():
    """Run one incremental import for every table listed in ``importinfo``.

    For each ``vTableName`` row, getLastTime() finds the newest dtTime that
    is already in Elasticsearch, and insert() imports the newer rows.
    """
    conn = MySQLdb.connect(host='192.168.10.29', port=3306, user='root',
                           passwd='root', db='db_1')
    try:
        cur = conn.cursor()
        try:
            # Only the table name is needed: the resume point comes from what
            # is actually indexed in Elasticsearch, not from a stored column.
            cur.execute('select vTableName from importinfo')
            rows = cur.fetchall()
        finally:
            cur.close()

        for row in rows:
            tableName = row[0]
            insert(tableName, getLastTime(tableName))
    finally:
        conn.close()

if __name__ == "__main__":
    # Run one incremental pass over every table registered in importinfo.
    increment()
原文地址:https://www.cnblogs.com/ggzone/p/10121197.html