ELASTIC的备份与恢复

前言

elasticsearch官方并没有提供合适的备份工具,然而生产场景中备份却是的确需要的。

本文介绍了使用自己写的php脚本以及第三方工具来进行索引的备份,恢复以及删除等操作。

全量备份

elasticdump --input  http://127.0.0.1:9200/logstash-postback --output  logstash-postback_2017.01.17.json --limit 1000
 
elasticdump安装方法在下面

恢复

命令例子
elasticdump --input logstash-postback_2017.01.14.json --output http://127.0.0.1:9200/logstash-postback --limit 1000
详解
elasticdump命令需要单独安装。
--input 是指定输入,可指定文件,或需要备份的es集群。
--output是指定输出,可指定文件,或需要备份的es集群。
logstash-postback_2017.01.14.json  是指定的已备份的文件。
 http://127.0.0.1:9200/logstash-postback  是指定的要恢复数据的elastictic集群,IP加端口加索引名。注意,索引名是不带时间的。
--limit 1000 一次导入一千条数据,加快进度。
 
elasticdump命令安装
yum install npm
npm install elasticdump -g
命令安装完毕,可以测试。
可能会报出nodejs的版本之类的错误,你需要升级一下版本。
npm install -g n
n stable
至此可以使用。
 

删除索引

 php delete.php --index http://127.0.0.1:9200/<index> --start 2017-01-01 --end 2017-01-02  
--start 选择删除这个索引中指定日期的内容
--end 不传则默认删除一天,也就是start的那天
<index> 要删除的索引名
 
cat delete.php
 
<?php
date_default_timezone_set('Asia/Shanghai');

$longopts  = array(
    'index:',
    'query:',
    'start:',
    'end:',
);
$options = getopt('a', $longopts);

if(!isset($options['index'])) {
        fwrite(STDERR, "index必须设置索引地址
");
        exit(1);
}

$components = parse_url($options['index']);
if(!isset($components['path'])) {
        fwrite(STDERR, 'index不可为空');
        exit(1);
}

$host  = "{$components['scheme']}://{$components['host']}:{$components['port']}";
$index = basename($components['path']);

$query = isset($options['query']) ? $options['query'] : '{"query":{"match_all":{}}}';
if(isset($options['start'])) {
        $start_time = strtotime($options['start']);
        $start = date('Y-m-d', $start_time).'T00:00:00+0800';
        if(isset($options['end'])) {
                $end_time = strtotime($options['end']);
                $end = date('Y-m-d', $end_time).'T00:00:00+0800';
        } else {
                $end = date('Y-m-d', $start_time+86400).'T00:00:00+0800';
        }

        $field = strpos($index, 'analysis')!==false ? 'create_time' : 'date';
        $query = '{"size":1000,"_source":false,"query":{"filtered":{"filter":{"range":{"'.$field.'":{"gte":"'.$start.'","lt":"'.$end.'"}}}}}}';
}

$scroll_id = null;
$retry     = 0;
$num       = 0;
while(true) {
        if(is_null($scroll_id)) {
                $result = post("{$host}/{$index}/_search?scroll=2m", $query);
        } else {
                $result = get("{$host}/_search/scroll?scroll=2m&scroll_id={$scroll_id}");
        }

        $json = json_decode($result, true);
        if(!isset($json['_scroll_id'])) {
                fwrite(STDERR, "查询失败:索引-{$index} 起始-{$start}  截止-{$end}
");
                sleep(5);
                $retry++;
                if($retry>10) {
                        exit(4);
                }
        }

        $scroll_id = $json['_scroll_id'];
        $bulk = [];
        foreach($json['hits']['hits'] as $row) {
                unset($row['_score']);
                $bulk[] = json_encode(['delete'=>$row]);
                $num++;
        }

        if(count($json['hits']['hits'])==0)
        {
                break;
        }

        $check = post("{$host}/_bulk", implode("
", $bulk)."
");
        fwrite(STDOUT, "{$num}
");
        usleep(100000);
}

echo "deleted:{$num}
";

function get($url)
{
        $handle = curl_init();
        //curl_setopt($handle, CURLOPT_POST, 1);
        curl_setopt($handle, CURLOPT_HEADER, 0);
        curl_setopt($handle, CURLOPT_RETURNTRANSFER, 1);
        curl_setopt($handle, CURLOPT_URL, $url);
        //curl_setopt($handle, CURLOPT_POSTFIELDS, $data);
        return curl_exec($handle);
}

function post($url, $data)
{
        $handle = curl_init();
        curl_setopt($handle, CURLOPT_POST, 1);
        curl_setopt($handle, CURLOPT_HEADER, 0);
        curl_setopt($handle, CURLOPT_RETURNTRANSFER, 1);
        curl_setopt($handle, CURLOPT_URL, $url);
        curl_setopt($handle, CURLOPT_POSTFIELDS, $data);
        return curl_exec($handle);
}
 
 

索引增量备份

elasticdump增量备份有BUG,无规律的丢数据。
于是我们自己写了脚本来进行增量备份。
备份脚本分为两个部分,PHP脚本沟通elasticsearch来进行数据读取与备份。SHELL脚本来写作传参与PHP脚本
 
SHELL脚本如下
 
#!/bin/bash

#如果有参数,则第一个参数为备份日期,否则默认备份昨天数据
if [ -z "$1" ];then
        start=$(date +%Y-%m-%d --date '-1 day 00:00:00')
        end=$(date +%Y-%m-%d --date 'today 00:00:00')
else
        start=$(date +%Y-%m-%d --date $1)
        end=$(date +%Y-%m-%d --date "$1 +1 day 00:00:00")
fi

#如果是每月1号则加一天,解决时区导致的数据跨索引问题
if [ $(date +%d --date $end) -eq "01" ]
then
        end=$(date +%Y-%m-%d --date "$end +1 day 00:00:00")
fi

php esdump.php --input=http://127.0.0.1:9200/logstash-event-$(date +%Y.%m --date $start) --start $start --end $end  2>> backup_error_$start.log | gzip > event/logstash-event-$(date +%Y.%m_%d --date $start).json.gz  2>> backup_error_$start.log &
 
注:代码第18行的logstash-event替换为你要备份的索引名
 
PHP脚本如下(无需修改)
<?php
date_default_timezone_set('Asia/Shanghai');

$longopts  = array(
    'input:',
    'output:',
    'query:',
    'start:',
    'end:',
);
$options = getopt('a', $longopts);

if(!isset($options['input'])) {
    fwrite(STDERR, "input必须设置索引地址
");
    exit(1);
}

$check = get($options['input'].'/_count');
if($check===false) {
    fwrite(STDERR, "input索引地址无效:{$options['input']}
");
    exit(2);
}

$check = json_decode($check, true);
if(!isset($check['count'])) {
    fwrite(STDERR, "input索引地址无效:{$options['input']}
");
    exit(3);
}

$components = parse_url($options['input']);
$host  = "{$components['scheme']}://{$components['host']}:{$components['port']}";
$index = basename($components['path']);

$query = isset($options['query']) ? $options['query'] : '{"query":{"match_all":{}}}';
if(isset($options['start'])) {
    $start_time = strtotime($options['start']);
    $start = date('Y-m-d', $start_time).'T00:00:00+0800';
    if(isset($options['end'])) {
        $end_time = strtotime($options['end']);
        $end = date('Y-m-d', $end_time).'T00:00:00+0800';
    } else {
        $end = date('Y-m-d', $start_time+86400).'T00:00:00+0800';
    }

    $field = strpos($index, 'analysis')!==false ? 'create_time' : 'date';
    $query = '{"size":1000,"sort":{"'.$field.'":{"order":"asc"}},"query":{"filtered":{"filter":{"range":{"'.$field.'":{"gte":"'.$start.'","lt":"'.$end.'"}}}}}}';

    if(strpos($index, 'eventlogs')!==false) {
        $query = '{"size":1000,"sort":{"'.$field.'":{"order":"asc"}},"query":{"filtered":{"filter":{"bool":{'.
            '"must":[{"range":{"date":{"gte":"'.$start.'","lte":"'.$end.'"}}}],'.
                        '"must_not":[{"exists": {"field":"nsp3hq"}},{"exists": {"field":"q0i8u1"}},{"exists": {"field":"eyn916"}},{"exists": {"field":"20mqd8"}},'.
                        '{"exists": {"field":"wwbkux"}},{"exists": {"field":"r5ua96"}},{"exists": {"field":"easiz"}},{"exists": {"field":"dexusu"}},{"exists": {"field":"earts"}},'.
                        '{"exists": {"field":"ealu"}},{"exists": {"field":"ealf"}},{"exists": {"field":"eal"}},{"exists": {"field":"ears"}},{"exists": {"field":"ealuf"}},'.
                        '{"exists": {"field":"ealus"}},{"exists": {"field":"eaatf"}},{"exists": {"field":"enail"}},{"exists": {"field":"enuail"}},{"exists": {"field":"test"}}]'.
                        '}}}}}';
         }
}

$scroll_id = null;
$retry     = 0;
$num       = 0;
while(true) {
    if(is_null($scroll_id)) {
        $result = post("{$host}/{$index}/_search?scroll=2m", $query);
    } else {
        $result = get("{$host}/_search/scroll?scroll=2m&scroll_id={$scroll_id}");
    }

    $json = json_decode($result, true);
    if(!isset($json['_scroll_id'])) {
        fwrite(STDERR, "查询失败:索引-{$index} 起始-{$start}  截止-{$end}
");
        sleep(5);
        $retry++;
        if($retry>10) {
            exit(4);
        }
    }

    $scroll_id = $json['_scroll_id'];

    foreach($json['hits']['hits'] as $row) {
        fwrite(STDOUT, json_encode($row)."
");
        $num++;
    }

    if(count($json['hits']['hits'])==0)
    {
        break;
    }

    usleep(100000);
}

//校验条数是否一致
$query = json_decode($query, true);
unset($query['size'], $query['sort']);
$result = post("{$host}/{$index}/_count", json_encode($query));
$json = json_decode($result, true);
if(!isset($json['count']) or intval($json['count'])!==$num) {
    fwrite(STDERR, "校验失败:索引-{$index} 起始-{$start} 截止-{$end} 记录条数-{$json['count']} 导出条数-{$num}
");
}

function get($url)
{
    $handle = curl_init();
    //curl_setopt($handle, CURLOPT_POST, 1);
    curl_setopt($handle, CURLOPT_HEADER, 0);
    curl_setopt($handle, CURLOPT_RETURNTRANSFER, 1);
    curl_setopt($handle, CURLOPT_URL, $url);
    //curl_setopt($handle, CURLOPT_POSTFIELDS, $data);
    return curl_exec($handle);
}

function post($url, $data)
{
    $handle = curl_init();
    curl_setopt($handle, CURLOPT_POST, 1);
    curl_setopt($handle, CURLOPT_HEADER, 0);
    curl_setopt($handle, CURLOPT_RETURNTRANSFER, 1);
    curl_setopt($handle, CURLOPT_URL, $url);
    curl_setopt($handle, CURLOPT_POSTFIELDS, $data);
    return curl_exec($handle);
}

备份时执行shell脚本即可备份昨天的增量数据

谢土豪

如果有帮到你的话,请赞赏我吧!

本文为kerwin原创,转载请注明出处。

 http://www.cnblogs.com/kerwinC/p/6296675.html

 

原文地址:https://www.cnblogs.com/kerwinC/p/6296675.html