ELK数据批量导入

                                                                        数据批量导入

• 使用 _bulk 批量导入数据

– 批量导入数据使用 POST 方式,数据格式为 json,url编码使用 data-binary

– 导入含有 index 配置的 json 文件

gzip d logs.jsonl.gz

curl -XPOST 'http://192.168.4.14:9200/_bulk' --data-binary

@logs.jsonl

gzip d shakespeare.json.gz

curl -XPOST 'http://192.168.4.14:9200/_bulk' --data-binary

@shakespeare.json

练习

1)下载解压

[root@esk06 ~]# gzip -d accounts.json.gz

[root@esk06 ~]# gzip -d logs.jsonl.gz

[root@esk06 ~]# gzip -d shakespeare.json.gz

2)•使用 _bulk 批量导入数据

批量导入数据使用 POST 方式,数据格式为 json,url

编码使用 data-binary

导入含有 index 配置的 json 文件

curl -X POST 'http://192.168.1.35:9200/_bulk' --data-binary @accounts.json

[curl -X POST 'http://192.168.1.35:9200/_bulk' --data-binary @shakespeare.json

 curl -X POST 'http://192.168.1.35:9200/_bulk' --data-binary @logs.jsonl

logstash 部分

logstash 是什么

logstash是一个数据采集、加工处理以及传输的工具

logstash 特点:

– 所有类型的数据集中处理

– 丌同模式和格式数据的正常化

– 自定义日志格式的迅速扩展

– 为自定义数据源轻松添加插件

logstash

Logstash 依赖 java 环境,需要安装 java-1.8.0-openjdk

Logstash 没有默认的配置文件,需要手劢配置

logstash 安装在 /opt/logstash 目录下

rpm -ivh logstash-2.3.4-1.noarch.rpm

logstash 部分

codec 类插件

input{

stdin{ codec => "json" }

}

filter{ }

output{

stdout{ codec => "rubydebug" }

}

– 我们输入普通数据和 json 对比

{"a": 1, "c": 3, "b": 2}logstash 部分

codec 类插件

– 练习 output input 配置

– 练习 在 input 丌指定类型 json 输出结果

– 练习 在 output 丌指定 rubydebug 的输出结果

– 同时指定以后的输出结果logstash 部分

• 练习 input file 插件

file{

start_position => "beginning"

sincedb_path => "/var/lib/logstash/sincedb-access"

path => [/tmp/alog, /tmp/blog]

type => 'filelog'

}

sincedb_path 记录读取文件的位置

start_position 配置第一次读取文件从什么地方开始logstash 部分

• 练习 input tcp udp 插件

tcp{

host => "0.0.0.0"

port => 8888

type => "tcplog"

}

udp{

host => "192.168.4.16"

port => 9999

type => "udplog"

}logstash 部分

tcp & udp 练习

– 使用 shell 脚本,tcp 指定端口发送数据

function sendmsg(){

if (( $# == 4 )) && [ $1 == "tcp" -o $1 == "udp" ];then

exec 9<>/dev/$1/$2/$3

echo "$4" >&9

exec 9<&-

else

echo "$0 (tcp|udp) ipaddr port msg"

fi

}logstash 部分

tcp & udp 练习

– 发送 tcp 数据

sendmsg tcp 192.168.4.10 8888 tcp msg

– 发送 udp 数据

sendmsg udp 192.168.4.10 9999 udp msglogstash 部分

syslog 插件练习

syslog{

host => "192.168.4.10"

port => 514

type => "syslog"

}

rsyslog.conf 配置向进程发送数据

local0.info

@@192.168.4.10:514

– 写 syslog ,查看状态

logger -p local0.info -t test_logstash 'test message'logstash 部分

filter grok插件

– 解析各种非结构化的日志数据插件

grok 使用正则表达式把飞结构化的数据结构化

– 在分组匹配,正则表达式需要根据具体数据结构编写

– 虽然编写困难,但适用性极广

– 几乎可以应用于各类数据

grok{

match => [message,%{IP:ip}, (?<key>reg)]

}logstash 部分

grok 正则分组匹配

– 匹配 ip 时间戳 和 请求方法

"(?<ip>(d+.){3}d+) S+ S+

(?<time>.*])s+"(?<method>[A-Z]+)"]

– 使用正则宏

%{IPORHOST:clientip} %{HTTPDUSER:ident} %{USER:auth}

[%{HTTPDATE:timestamp}] "%{WORD:verb}

– 最终版本

%{COMMONAPACHELOG} "(?<referer>[^"]+)"

"(?<UA>[^"]+)"logstash 部分

input redis 插件

redis{

host => 'redis-server'

port => '6379'

data_type => 'list'

key => 'lb'

codec => 'json'

}

– 生产环境往往理由 redis 来做缓冲,这里给出配置logstash 部分

output ES 插件

if [type] == "filelog"{

elasticsearch {

hosts => ["192.168.4.15:9200"]

index => "weblog"

flush_size => 2000

idle_flush_time => 10

}}

– 调试成功后,把数据写入 ES 集群

案例1

1)改写配置文件  //注意防火墙

[root@localhost logstash]# vim logstash.conf

input {

  tcp {

    port => 8888

    mode => "server"

    type => "tcplog"

  }

  udp {

    port => 8888

    type => "udplog"

  }

}

filter {}

output {

    stdout { codec => "rubydebug" }

}

 [root@localhost logstash]# /opt/logstash/bin/logstash -f logstash.conf  //执行    

2)真机写入脚本                                            验证是否有xibhjhkj

  [root@redhat ~]# cat aa

function sendmsg(){

  if [ "$1" == "tcp" -o "$1" == "udp" ];then

    exec 9<>/dev/$1/192.168.1.117/8888

    echo "$2"  >&9

    exec 9<&-

  else

    echo "$0 tcp|udp msg"

  fi

}

 [root@redhat ~]#. aa

 [root@redhat ~]# sendmsg udp xibhjhkj   //发送

 

案例2

1)监听日志,收集信息

input{

        file {

        path => ["/var/log/secure"]  //ssh的登入日志

        sincedb_path => "/dev/null"

        start_position => "beginning"

        type => "filelog"

        }

        tcp{

        port => 8888

        mode => "server"

        type => "tcplog"

        }

        udp {

        port => 8888

        type => "udplog"

        }

        syslog {

        port => 514

        type => "syslog"

        }

}

filter{          //收集内容的正则表达式

        grok {

        match => { "message" =>

"(?<rip>[0-9.]+).*[(?<time>.+)].*"(?<method>[A-Z]+) (?<url>S+)

(?<PROTO>.+)" (?<res>d+) (?<size>d+) "(?<ref>[^"]+)"

"(?<agent>.+)"" }

        }

}

output{

  stdout{codec => "rubydebug" }

}

原文地址:https://www.cnblogs.com/qingbai/p/11957593.html