日志采集(logback+kafka+elasticsearch+kibana)

这是基于  logback=>kafka=>elasticsearch=>kibana的一套完整的日志采集系统,这里做java配置部分的记录

一.pom.xml

     <!--kafka依赖 -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.1.6.RELEASE</version>
        </dependency>
        <!--logback-kafka-appender依赖 -->
        <dependency>
            <groupId>com.github.danielwegener</groupId>
            <artifactId>logback-kafka-appender</artifactId>
            <version>0.2.0-RC2</version>
        </dependency>
        <dependency>
            <groupId>net.logstash.logback</groupId>
            <artifactId>logstash-logback-encoder</artifactId>
            <version>4.9</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.2.3</version>
            <exclusions>
                <exclusion>
                    <groupId>ch.qos.logback</groupId>
                    <artifactId>logback-core</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-core</artifactId>
            <version>1.2.3</version>
        </dependency>

二.logback-spring.xml

<?xml version="1.0" encoding="utf-8"?>
<configuration>
    <appender name="consoleLog"
        class="ch.qos.logback.core.ConsoleAppender">
        <layout class="ch.qos.logback.classic.PatternLayout">
            <!-- <pattern>%d - %msg%n</pattern> -->
            <pattern>[%-5p] %d{yyy MMM dd HH:mm:ss} -->[%F:%L] %m%n</pattern>
        </layout>
    </appender>

    <appender name="fileInfoLog"
        class="ch.qos.logback.core.rolling.RollingFileAppender">
        <filter class="ch.qos.logback.classic.filter.LevelFilter">
            <level>ERROR</level>
            <onMatch>DENY</onMatch>
            <onMismatch>ACCEPT</onMismatch>
        </filter>
        <encoder>
            <pattern>[%-5p] %d{yyy MMM dd HH:mm:ss} -->[%F:%L] %m%n</pattern>
        </encoder>
        <!--滚动策略 -->
        <rollingPolicy
            class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <!--路径 -->
            <fileNamePattern>logs/info.%d.log</fileNamePattern>
            <maxHistory>30</maxHistory>
            <cleanHistoryOnStart>true</cleanHistoryOnStart>
        </rollingPolicy>
    </appender>

    <appender name="fileErrorLog"
        class="ch.qos.logback.core.rolling.RollingFileAppender">
        <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
            <level>ERROR</level>
        </filter>
        <encoder>
            <pattern>[%-5p] %d{yyy MMM dd HH:mm:ss} -->[%F:%L] %m%n</pattern>
        </encoder>
        <!--滚动策略 -->
        <rollingPolicy
            class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <!--路径 -->
            <fileNamePattern>logs/error.%d.log</fileNamePattern>
            <maxHistory>30</maxHistory>
            <cleanHistoryOnStart>true</cleanHistoryOnStart>
        </rollingPolicy>
    </appender>
    <appender name="kafkaAppender" class="com.github.danielwegener.logback.kafka.KafkaAppender">
        <encoder class="net.logstash.logback.encoder.LoggingEventCompositeJsonEncoder">
            <providers>
                <pattern>
                    <pattern>
                        {
                        "tags": ["product_logs_kafka","product","weichat"],
                        "project": "weichat",
                        "logger": "%logger",
                        "timestamp": "%date{"yyyy-MM-dd'T'HH:mm:ss,SSSZ"}",
                        "class": "%class",
                        "contextName": "%cn",
                        "file": "%file",
                        "line": "%line",
                        "msg": "%msg",
                        "method": "%method",
                        "level": "%level",
                        "thread": "%thread"
                        }
                    </pattern>
                </pattern>
            </providers>
        </encoder>
        <topic>product_logs_kafka</topic>
        <!-- we don't care how the log messages will be partitioned -->
        <keyingStrategy
            class="com.github.danielwegener.logback.kafka.keying.NoKeyKeyingStrategy" />
        <!-- use async delivery. the application threads are not blocked by logging -->
        <deliveryStrategy
            class="com.github.danielwegener.logback.kafka.delivery.AsynchronousDeliveryStrategy" />
        <!-- each <producerConfig> translates to regular kafka-client config (format: 
            key=value) -->
        <!-- producer configs are documented here: https://kafka.apache.org/documentation.html#newproducerconfigs -->
        <!-- bootstrap.servers is the only mandatory producerConfig -->
        <producerConfig>bootstrap.servers=node-str-corePBOn:9092,node-str-coremrKo:9092,node-str-corejIQc:9092
        </producerConfig>
        <!-- don't wait for a broker to ack the reception of a batch. -->
        <producerConfig>acks=0</producerConfig>
        <!-- wait up to 1000ms and collect log messages before sending them as 
            a batch -->
        <producerConfig>linger.ms=1000</producerConfig>
        <!-- even if the producer buffer runs full, do not block the application 
            but start to drop messages -->
        <producerConfig>max.block.ms=0</producerConfig>
        <!-- define a client-id that you use to identify yourself against the kafka 
            broker -->
        <producerConfig>client.id=weichat-logback-relaxed</producerConfig>
    </appender>

    <root level="info">
        <!-- <appender-ref ref="consoleLog" />  -->
        <appender-ref ref="fileInfoLog" />
        <appender-ref ref="fileErrorLog" />
        <appender-ref ref="kafkaAppender" />
    </root>
</configuration>
本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。
原文地址:https://www.cnblogs.com/KdeS/p/13539270.html