离线数据仓库项目(电商)--启动/事件日志采集

需求

日志文件 => Flume => HDFS => ODS

步骤

总体思路

1.taildir source监控多个目录;

2.编写自定义拦截器,不同来源的数据加上不同个标志

3.hdfs sink根据标志写文件到hdfs

Agent配置(source 、channel 、sink)

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# taildir source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /data/lagoudw/conf/startlog_position.json
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /data/lagoudw/logs/start/.*log
a1.sources.r1.headers.f1.logtype = start
a1.sources.r1.filegroups.f2 = /data/lagoudw/logs/event/.*log
a1.sources.r1.headers.f2.logtype = event

# 自定义拦截器
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = cn.lagou.dw.flume.interceptor.LogTypeInterceptor$Builder

# memory channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 2000

# hdfs sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /user/data/logs/%{logtype}/dt=%{logtime}/
a1.sinks.k1.hdfs.filePrefix = startlog.
a1.sinks.k1.hdfs.fileType = DataStream

# 配置文件滚动方式(文件大小32M)
a1.sinks.k1.hdfs.rollSize = 33554432
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.idleTimeout = 0
a1.sinks.k1.hdfs.minBlockReplicas = 1

# 向hdfs上刷新的event的个数
a1.sinks.k1.hdfs.batchSize = 1000

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

自定义拦截器

编码完成后,打包上传服务器,放置在$FLUME_HOME/lib下

package cn.lagou.dw.flume.interceptor;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.compress.utils.Charsets;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.interceptor.Interceptor;
import org.junit.Test;

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class LogTypeInterceptor implements Interceptor {
    @Override
    public void initialize() {
    }

    @Override
// 逐条处理event
    public Event intercept(Event event) {
// 获取 event 的 body
        String eventBody = new String(event.getBody(), Charsets.UTF_8);
// 获取 event 的 header
        Map<String, String> headersMap = event.getHeaders();
// 解析body获取json串
        String[] bodyArr = eventBody.split("\s+");
        try {
            String jsonStr = bodyArr[6];
// 解析json串获取时间戳
            String timestampStr = "";
            JSONObject jsonObject =
                    JSON.parseObject(jsonStr);
            if (headersMap.getOrDefault("logtype", "").equals("start")) {
// 取启动日志的时间戳
                timestampStr = jsonObject.getJSONObject("app_active").getString("time");

            } else if (headersMap.getOrDefault("logtype", "").equals("event")) {
// 取事件日志第一条记录的时间戳
                JSONArray jsonArray = jsonObject.getJSONArray("lagou_event");
                if (jsonArray.size() > 0) {
                    timestampStr = jsonArray.getJSONObject(0).getString("time");
                }
            } // 将时间戳转换为字符串 "yyyy-MM-dd"
// 将字符串转换为Long
            long timestamp = Long.parseLong(timestampStr);
            DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");
            Instant instant = Instant.ofEpochMilli(timestamp);
            LocalDateTime localDateTime = LocalDateTime.ofInstant(instant, ZoneId.systemDefault());
            String date = formatter.format(localDateTime);
// 将转换后的字符串放置header中
            headersMap.put("logtime", date);
            event.setHeaders(headersMap);
        } catch (Exception e) {
            headersMap.put("logtime", "Unknown");
            event.setHeaders(headersMap);
        }
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        List<Event> lstEvent = new ArrayList<>();
        for (Event event : events) {
            Event outEvent = intercept(event);
            if (outEvent != null) {
                lstEvent.add(outEvent);
            }
        }
        return lstEvent;
    }

    @Override
    public void close() {
    }


    public static class Builder implements
            Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new LogTypeInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }
}

测试

启动Agent,拷贝日志,检查HDFS文件

# 清理环境
rm -f /data/lagoudw/conf/startlog_position.json
rm -f /data/lagoudw/logs/start/*.log
rm -f /data/lagoudw/logs/event/*.log

# 启动 Agent
flume-ng agent --conf /opt/apps/flume-1.9/conf --conf-file /data/lagoudw/conf/flume-log2hdfs4.conf -name a1 -Dflume.root.logger=INFO,console

# 拷贝日志
cd /data/lagoudw/logs/source
cp event0802.log ../event/
cp start0802.log ../start/

# 检查HDFS文件
hdfs dfs -ls /user/data/logs/event
hdfs dfs -ls /user/data/logs/start

生产中,常用以下方式启动

# 生产环境中用以下方式启动Agent
nohup flume-ng agent --conf /opt/apps/flume-1.9/conf --conffile /data/lagoudw/conf/flume-log2hdfs3.conf -name a1 -
Dflume.root.logger=INFO,LOGFILE > /dev/null 2>&1 &
  • nohup,该命令允许用户退出帐户/关闭终端之后继续运行相应的进程
  • /dev/null,代表linux的空设备文件,所有往这个文件里面写入的内容都会丢失,俗称黑洞
  • 标准输入0,从键盘获得输入 /proc/self/fd/0
  • 标准输出1,输出到屏幕(控制台) /proc/self/fd/1
  • 错误输出2,输出到屏幕(控制台) /proc/self/fd/2
  • >/dev/null 标准输出1重定向到 /dev/null 中,此时标准输出不存在,没有任何地方能够找到输出的内容
  • 2>&1 错误输出将会和标准输出输出到同一个地方
  • >/dev/null 2>&1 不会输出任何信息到控制台,也不会有任何信息输出到文件中

日志数据采集小结

  • 使用taildir source监控指定的多个目录,可以给不同的目录日志加上不同的header
  • 在每个目录中可以使用正则匹配多个文件
  • 使用自定义拦截器,主要功能是从json串中获取时间戳,加到event的header中
  • hdfs sink使用event header中的信息写数据(控制写文件的位置)
  • hdfs文件的滚动方式:基于文件大小、event数量、时间
  • 调节flume jvm内存的分配
原文地址:https://www.cnblogs.com/aloneme/p/15080615.html