ELK获取用户真实IP

原理:在filebeat所在服务器的nginx中获取客户端真实IP($clientRealIp),然后在访问日志格式中添加"$clientRealIp"字段。
1. 通过map获取到用户真实IP,并调整日志格式,增加$clientRealIp字段
http {
    # Derive $clientRealIp from the X-Forwarded-For request header.
    # NOTE(review): X-Forwarded-For is taken as-is from the request; unless a
    # trusted front proxy overwrites it, a client can put any value here - confirm.
    map $http_x_forwarded_for $clientRealIp {
        # Without an explicit default, a header value that matches none of the
        # entries below (e.g. an IPv6 address or "unknown") would leave
        # $clientRealIp as an empty string; fall back to the peer address instead.
        default $remote_addr;
        # No X-Forwarded-For header: the directly connected peer is the client.
        ""      $remote_addr;
        # Otherwise take the first (left-most) dotted-numeric entry of the list.
        ~^(?P<firstAddr>[0-9.]+),?.*$  $firstAddr;
    }

    # Access log format: the usual request fields, then the resolved client IP,
    # then the upstream address / response time / status.
    log_format  main  '$remote_addr - $remote_user [$time_local] "$request" '
                      '$status $body_bytes_sent "$http_referer" '
                      '"$http_user_agent" "$clientRealIp" '
                      '$upstream_addr $upstream_response_time $upstream_status';
}
    
2. 在logstash的filter里,对日志进行过滤。
    2.1 自定义日志过滤匹配规则
    vi /data/elk/logstash/vendor/bundle/jruby/1.9/gems/logstash-patterns-core-4.1.1/patterns/grok-patterns
    #NGINXPHONEACCESS
    NGINXPHONEACCESS %{IPORHOST:remote_addr} - %{USER:remote_user} [%{HTTPDATE:time_local}] "%{WORD:request_method} %{URIPATHPARAM:request_path} HTTP/%{NUMBER:http_version}" %{INT:http_status} %{INT:body_bytes_sent} "%{DATA:http_referer}" "%{DATA:http_user_agent}" "%{DATA:user_real_ip}" %{IPPORT:upstream_addr_port} %{NUMBER:upstream_response_time} %{INT:upstream_status}
    2.2 在filter中过滤
        if[type]== "phone-proxy-nginx-access" {
                grok {  
                        match => { "message" => "%{NGINXPHONEACCESS}" }
                }

                 mutate {
                        remove_field => ["message"]
                }

                 mutate {
                        convert => ["upstream_response_time", "float"]
                         convert => ["body_bytes_sent", "integer"]
                }

                geoip {
                        source => "user_real_ip"
                        target => "geoip"
                        database => "/data/elk/logstash/vendor/bundle/jruby/1.9/gems/logstash-filter-geoip-4.2.1-java/vendor/GeoLite2-City.mmdb"
                        add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
                        add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}"  ]
                }

                mutate {
                        convert => [ "[geoip][coordinates]", "float"]
                }

        }

    2.3 经过上面的过滤后,将会在日志中添加geoip信息。下面的示例输出来自另一类日志(type为external_noise),仅用于展示geoip字段的结构:
    {
      "@version" => "1",
    "@timestamp" => "2015-01-01T22:15:13.000Z",
          "host" => "iMac-de-Consulthys.local",
          "path" => "/home/kibana/Documents/external_noise.log",
          "type" => "external_noise",
     "timestamp" => "Jan 1 23:15:13",
        "action" => "drop",
     "logsource" => "%LOGSOURCE%",
     "interface" => ">eth1",
          "rule" => "7",
      "rule_uid" => "{C1336766-9489-4049-9817-50584D83A245}",
           "src" => "218.8.245.123",
           "dst" => "%DSTIP%",
         "proto" => "tcp",
       "product" => "VPN-1&FireWall-1",
       "service" => "2967",
        "s_port" => "6000",
         "geoip" => {
                      "ip" => "218.8.245.123",
           "country_code2" => "CN",
           "country_code3" => "CHN",
            "country_name" => "China",
          "continent_code" => "AS",
             "region_name" => "08",
               "city_name" => "Harbin",
                "latitude" => 45.75,
               "longitude" => 126.64999999999998,
                "timezone" => "Asia/Harbin",
        "real_region_name" => "Heilongjiang",
                "location" => [
            [0] 126.64999999999998,
            [1] 45.75
        ],
             "coordinates" => [
            [0] 126.64999999999998,
             [1] 45.75
        ]
    }
    }
    
   

原文地址:https://www.cnblogs.com/zhaojonjon/p/7294115.html