用swoole实现nginx日志解析

1.原技术路线解析

在nging配置中将日志信息交给syslog处理,rsyslog配置中将数据传递给了514端口解析,然后将解析好的数据传入elasticsearch中。

nginx配置

    server {
        listen 80; 
        listen [::]:80;
        server_name test.86dev.wrddns.com;
        
        # 以下两行将日志写入syslog
        access_log syslog:server=unix:/dev/log,facility=local5,tag=web_1,severity=info main;
        error_log syslog:server=unix:/dev/log,facility=local5,tag=web_1,severity=error warn;
        
        # ....其他配置
    }   

/etc/rsyslog.conf

# 配置文件,存放解析规则xxx.conf和ruleBase文件xx.rb
$IncludeConfig /etc/rsyslog.d/*.conf

# 配置将日志放到哪个端口解析
local5.* @10.3.19.86:514

在实际应用过程中有一些问题,不能和php上面的一些配置进行配合记录,解析规则不好配置,有些内容解析不好,所以探索使用新的技术路线。

2.新技术路线

尝试使用新技术路线,通过swoole起一个服务,然后监听9502端口,rsyslog将日志推向该端口,对日志进行解析后推入elasticsearch,此时可以获取到php端的一些配置。以下是大体思路

2.1 选择swoole的server类型。

这里不再赘述swoole的安装,首要考虑的问题是原推向514的协议类型。

先查看端口的协议类型

# root @ WENGINE in ~ [9:48:51] 
$ netstat -antup | grep 514 
udp        0      0 0.0.0.0:514                 0.0.0.0:*                               23560/rsyslogd      
udp        0      0 :::514                      :::*                                    23560/rsyslogd   

可以看到是udp协议,所以选用swoole的 upd服务

结合laravel的commands来编写服务端

<?php

namespace AppConsoleSwoole;

use IlluminateConsoleCommand;
use swoole_websocket_server;
use swoole_server;
use swoole_process;
use swoole_sock_udp;
use UAParserParser;
use GeoIp2DatabaseReader;
use WrdFrameworkModelsSysConfig;
use ElasticsearchClientBuilder;

class SwooleServer extends Command
{

    protected $signature = 'swoole-server start
                            {cmd=start : can use start}
                            {--daemon : set to run in daemonize mode}
                            ';

    protected $description = 'swoole server control';

    public $access_buffer = [];


    public function __construct()
    {
        parent::__construct();
    }

    public function handle()
    {
        $command = $this->argument('cmd');
        $option = $this->option('daemon');
        switch ($command) {
            case 'start':
                $this->initWs($option);
                break;
            default:
                $this->info('请按照下面格式输入命令:php artisan swoole-server {start}');
                break;
        }

    }
        
    public function initWs($daemonize = false) 
    {
        if ($daemonize) {
            $this->info('Starting Websocket server in daemon mode...');
        } else {
            $this->info('Starting Websocket server in interactive mode...');
        }

        $server = new swoole_server('0.0.0.0', 9502, SWOOLE_PROCESS, SWOOLE_SOCK_UDP);
        $server->set([
            'daemonize' => $daemonize,
            'log_file' => '/var/www/html/storage/logs/websocket.log',
            'worker_num' => 1,
            'task_worker_num' => 1,
        ]);

        $server->on('Packet', function($serv, $data, $clientInfo) 
        {
            $serv->task($data);
        });
        
        $server->on('Task', function ($serv, $task_id, $from_id, $data) {

            //通过正则表达式提取出需要的信息,不同的日志格式需要不同的正则,这里只写一种情况
            $rule = '/<d*>.*d{2}:d{2}:d{2}s[^s]*s[^s]*s(w*\_d*):s[Customize-format]/';

            preg_match($rule, $data, $matches);

            if (empty($matches)) {
                $this->writeLog($data); //记录下无法解析的日志,更正正则
                return false;
            }
            
            $vhost = $matches[1];
            $ip = $matches[2];
            //...更多参数
            
            $ua = $matches[12];
            //解析UA,这里使用的解析库https://github.com/ua-parser/uap-php
            $parser = Parser::create();
            $parser_ua = $parser->parse($ua);
            $browser = $parser_ua->ua->family;
            $os = $parser_ua->os->family;
            $device = $parser_ua->device->family; 

            //解析IP,这里使用的解析库https://github.com/maxmind/GeoIP2-php
            $reader = new Reader(public_path().'/geoip2/GeoLite2-City.mmdb');

            try{
                $record = $reader->city($ip);
                $country = $record->country->isoCode;
                $continent = $record->continent->names['zh-CN'];
                $subdivisions = $record->mostSpecificSubdivision->names['zh-CN'];
                $city = $record->city->names['zh-CN'];
                $geoip = array(
                    'location' => array($record->location->longitude, $record->location->latitude)
                );

            } catch (Exception $e) {
               //如果ip没有被收录(项目有很多内网ip),则拿数据库中的提前配置项,进行解析
            }
            
            $res = array(
                'vhost' => $vhost,
                'ip' => $ip,
                 // ...其它项
                'token' => $token,
                'browser' => $browser,
                'os' => $os,
                'device' => $device,
                'continent' => $continent,
                'country' => $country,
                'subdivisions' => $subdivisions,
                'city' => $city,
                'geoip' => $geoip,
            );

            $this->access_buffer[] = $res;

            //每隔一段时间,写入到elasticsearch
            if (count($this->access_buffer) > 0 && time() - strtotime($this->access_buffer[0]['@timestamp']) > 10) {
                $insert_data = $this->access_buffer;
                $this->access_buffer = [];

                $this->insertElasticsearch('access', $insert_data);
            }

            //return 数据 给 Finish
            return "Task {$task_id}'s result";
        });
        
        $server->on('Finish', function ($serv,$task_id, $data) {
            echo "Task {$task_id} finish
";
        });

        $server->start();
                
    }
    
    public function insertElasticsearch($type='access', $data){
        foreach($data as $item){
            $params['body'][] = [
                'index' => [
                    '_index' => $type.'-'.date('Y.m.d', time()),
                    '_type' => 'events',
                ]
            ];
            $params['body'][] = $item;
        }

        extract(Config::get('app.elastic', [
            'host' => '127.0.0.1',
            'port' => '9200'
        ]));

        //往elasticsearch写数据,这里使用的库https://github.com/elastic/elasticsearch-php
        $helper = ClientBuilder::create()
            ->setHosts([$host.":".$port])
            ->build();

        if (!empty($params['body'])) {
            $response = $helper->bulk($params);
            //var_dump($response);
        }

    }
    
    public function writeLog($info){
        $alert_message = array(
            'error' => '此条信息未能命中日志格式,未写入elasticsearch',
            'info' => $info
        );
        Log::alert($alert_message);
    }

2.2 通过更改nginx的main日志格式简化正则表达式

nginx的配置中的

log_format main [$proxy_add_x_forwarded_for]-[$remote_user]-[$time_local]-[$request]-[$status]-[$bytes_sent]-[$http_host]-[$http_referer]-[$http_user_agent]-[$cookie_wengine_ticket]-[archer-main];

加了特殊符号,并且最后给了一个标识,这样能提高命中准确度

2.3 更改rsyslog的端口

local5.* @10.3.19.86:9502

2.4 其它一些问题

可以通过nc来检测swoole的udp服务是否通

yum install -y nc
# root @ WENGINE in ~ [10:17:07] C:130
$ nc -u 127.0.0.1 9502
ceshi

可以写supervisor的脚本来使swoole服务器一直启动

[program:swooleserver]
directory = /var/www/html
command=php artisan swoole-server
user=apache
autostart=true
startsecs=2
autorestart=true
redirect_stderr=true
stopsignal=INT
stderr_logfile_maxbytes=1MB
stderr_logfile_backups=10
stderr_capture_maxbytes=1MB
stderr_events_enabled=false

原文地址:https://www.cnblogs.com/redirect/p/10066616.html