在 Python 中使用 PySpark 读取并整理日志数据，然后将数据写入 Elasticsearch

代码:

import re
import datetime
from pyspark.sql import SparkSession
from pyspark import SparkContext
from elasticsearch import Elasticsearch
# Spark entry points used by the rest of the script.
spark = SparkSession.builder.appName("lz").getOrCreate()
sc = SparkContext.getOrCreate()

# Elasticsearch client constructed with no arguments (library defaults).
es = Elasticsearch()

# Syslog month abbreviation -> month number as an unpadded string.
month_map = {
    'Jan': '1',
    'Feb': '2',
    'Mar': '3',
    'Apr': '4',
    'May': '5',
    'Jun': '6',
    'Jul': '7',
    'Aug': '8',
    'Sep': '9',
    'Oct': '10',
    'Nov': '11',
    'Dec': '12',
}

# Use Spark to read the local log file as an RDD of text lines.
log_data = sc.textFile("/Desktop/data_doc/data_Log/sshlogin/03.txt")


# Parse every sshd syslog line and index it into Elasticsearch as one document.
#
# Expected line layout, for example:
#   Ambari:Mar  2 02:14:16 ambari sshd[16716]: Accepted password for root from 172.21.202.174 port 59886 ssh2
# Capture groups: 1 source tag, 2 month abbreviation, 3 day of month,
# 4 HH:MM:SS, 5 host, 6 program, 7 pid, 8 free-form message.
# Raw strings with explicit backslashes are required: without them the
# pattern matches the literal letters "S", "w", "s", "d" and never matches
# a real log line.
LOG_LINE_RE = re.compile(
    r'^(\S+):(\w{3})\s+(\d{1,2})\s(\d{2}:\d{2}:\d{2})\s(\S+)\s(\S+)\[(\d+)\]:\s(.+)'
)

# Extracts user, IPv4 address and port from an "Accepted <method> for ..." message.
# Octets are 1-3 digits so addresses such as 10.0.0.5 are accepted as well.
ACCEPTED_LOGIN_RE = re.compile(
    r'Accepted\s\w+\sfor\s(\S+)\sfrom\s(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})\sport\s(\d+)'
)

# Message prefixes that mark a successful login.
ACCEPTED_PREFIXES = ('Accepted password for', 'Accepted publickey for')

# The log lines carry no year, so one is supplied here.
LOG_YEAR = '2019'

# Iterate the RDD row by row on the driver; each matching line becomes a dict
# that is sent to Elasticsearch as the document body.
for line in log_data.toLocalIterator():
    log_group = LOG_LINE_RE.search(line)
    if not log_group:
        # Skip lines in another format instead of aborting the whole import:
        # one blank or malformed line must not drop everything after it.
        continue

    try:
        # Assemble "YYYY-M-D HH:MM:SS" from the captured fields and parse it.
        logtime = datetime.datetime.strptime(
            '{}-{}-{} {}'.format(
                LOG_YEAR,
                month_map[log_group.group(2)],
                log_group.group(3),
                log_group.group(4),
            ),
            '%Y-%m-%d %H:%M:%S',
        )
    except (KeyError, ValueError):
        # Unknown month abbreviation or impossible date/time. Keep the record
        # but with an explicit null timestamp, rather than hitting a NameError
        # or silently reusing the previous line's timestamp.
        logtime = None

    message = log_group.group(8)
    row = dict(_hostname=log_group.group(1),
               syslog_timestamp=logtime,
               hostname=log_group.group(5),
               program=log_group.group(6),
               pid=log_group.group(7),
               msg=message)

    if message.startswith(ACCEPTED_PREFIXES):
        row['login_success'] = True
        login = ACCEPTED_LOGIN_RE.search(message)
        # The detail pattern only understands IPv4 sources; when it does not
        # match, still record the success flag but omit the detail object.
        if login:
            row['login_success_msg'] = {'username': login.group(1),
                                        'user_ip': login.group(2),
                                        'user_port': login.group(3)}

    # Write the document to Elasticsearch.
    es.index(index='data_log02', doc_type='test02', body=row)

转自:https://www.cnblogs.com/wangkun122/articles/10936938.html

原文地址:https://www.cnblogs.com/tjp40922/p/13125182.html