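Real-time Analytics System: Collection Module

Chapter 1 Requirements Overview

1.1 Real-time requirements vs. offline requirements

  Offline requirements (T+1): reports and similar data are generally produced from the previous day's data. There are many metrics and reports, but they are not sensitive to latency.

  Real-time requirements (T+0): these focus on monitoring the current day's data in real time. The business logic is usually simpler than for offline requirements and there are fewer metrics, but data freshness and user interactivity matter more.

1.2 Requirement details

1.2.1 Hourly trend chart of daily active users (each user's first login of the day), compared with yesterday

1.2.2 Transaction amount and its hourly trend chart, compared with yesterday

1.2.3 Coupon feature risk alerts

1.2.4 Flexible analysis of user purchase details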
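  Requirement 1.2.1 counts each device only once per day, at its first startup, and then buckets those first startups by hour. A minimal plain-Java sketch of that counting step follows. It assumes startup events are reduced to (mid, ts) pairs with ts in epoch milliseconds, and that days are bucketed in the Asia/Shanghai time zone. HourlyDau and its members are hypothetical names, not part of this project.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, not part of the project: hourly count of first startups (DAU) for one day.
class HourlyDau {
    // Assumption: days and hours are bucketed in China Standard Time
    static final ZoneId ZONE = ZoneId.of("Asia/Shanghai");

    // One startup log reduced to the two fields this calculation needs
    static final class Startup {
        final String mid;
        final long ts; // epoch milliseconds, like the "ts" field the logger adds
        Startup(String mid, long ts) { this.mid = mid; this.ts = ts; }
    }

    // Returns hour (0-23) -> number of devices whose first startup on `day` fell in that hour
    static Map<Integer, Integer> hourlyFirstStartups(List<Startup> events, LocalDate day) {
        Map<String, Long> firstTs = new HashMap<>();
        for (Startup e : events) {
            ZonedDateTime t = Instant.ofEpochMilli(e.ts).atZone(ZONE);
            if (!t.toLocalDate().equals(day)) continue; // ignore other days
            firstTs.merge(e.mid, e.ts, Math::min);      // keep the earliest startup per device
        }
        Map<Integer, Integer> byHour = new TreeMap<>(); // sorted by hour for charting
        for (long ts : firstTs.values()) {
            byHour.merge(Instant.ofEpochMilli(ts).atZone(ZONE).getHour(), 1, Integer::sum);
        }
        return byHour;
    }
}
```

  For the comparison with yesterday, the same method can be called a second time with day.minusDays(1).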

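Chapter 2 Architecture Analysis

2.1 Architecture diagram 1

2.2 Architecture diagram 2

Chapter 3 Base Project Setup

3.1 Parent project

3.1.1 Create the Maven project gmall_sparkstream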

3.1.2 pom.xml

<properties>
        <spark.version>3.0.0</spark.version>
        <scala.version>2.12.12</scala.version>
        <log4j.version>1.2.17</log4j.version>
        <slf4j.version>1.7.22</slf4j.version>

        <fastjson.version>1.2.47</fastjson.version>
        <httpclient.version>4.5.5</httpclient.version>
        <httpmime.version>4.3.6</httpmime.version>

        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <!-- Logging dependencies go here; every module uses them -->
        <!-- Logging framework shared by all submodules -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>jcl-over-slf4j</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <!-- Concrete logging implementation -->
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>${log4j.version}</version>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <!-- https://mvnrepository.com/artifact/org.apache.httpcomponents/httpclient -->
            <dependency>
                <groupId>org.apache.httpcomponents</groupId>
                <artifactId>httpclient</artifactId>
                <version>${httpclient.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.httpcomponents</groupId>
                <artifactId>httpmime</artifactId>
                <version>${httpmime.version}</version>
            </dependency>

            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>${fastjson.version}</version>
            </dependency>

            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_2.12</artifactId>
                <version>${spark.version}</version>
            </dependency>

            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-hive_2.12</artifactId>
                <version>${spark.version}</version>
            </dependency>

            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-sql_2.12</artifactId>
                <version>${spark.version}</version>
            </dependency>

            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming_2.12</artifactId>
                <version>${spark.version}</version>
            </dependency>

            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
                <version>${spark.version}</version>
            </dependency>
        </dependencies>
    </dependencyManagement>

3.2 Submodule: common module

3.2.1 Create the module gmall_common

3.2.2 pom.xml

<dependencies>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpmime</artifactId>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
        </dependency>
    </dependencies>

3.3 Submodule: mock data module

3.3.1 Create the module gmall_mocker

3.3.2 pom.xml

<dependencies>
        <dependency>
            <groupId>com.yuange</groupId>
            <artifactId>gmall_common</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>
    </dependencies>

3.3.3 Utility classes

  1) RandomDate

package com.yuange.gmall.utils;

import java.util.Date;
import java.util.Random;

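/**
 * @author 袁哥
 * Created: 2021/6/30 12:50
 * Generates random dates
 */
public class RandomDate {

    //Timestamp (ms) of the most recently generated date
    Long logDateTime =0L;
    //Exclusive upper bound of one random step: twice the average step
    int maxTimeStep=0 ;

    public RandomDate (Date startDate , Date  endDate,int num) {
        //Average step so that num steps span roughly startDate..endDate
        Long avgStepTime = (endDate.getTime()- startDate.getTime())/num;
        this.maxTimeStep=avgStepTime.intValue()*2;
        this.logDateTime=startDate.getTime();
    }

    //Advances the timestamp by a random step in [0, maxTimeStep) and returns it as a Date
    public  Date  getRandomDate() {
        int  timeStep = new Random().nextInt(maxTimeStep);
        logDateTime = logDateTime+timeStep;
        return new Date( logDateTime);
    }
}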
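  Each getRandomDate() call moves forward by a uniform step in [0, 2 × average), so the dates never go backwards and, on average, num calls end near endDate. RandomDate has two latent edge cases. First, avgStepTime.intValue()*2 overflows once the average step exceeds about 2^30 ms (roughly 12.4 days). Second, nextInt throws IllegalArgumentException when its bound is 0 or negative. JsonMocker currently leaves the ts lines commented out, so neither case is hit. A minimal sketch of the same idea follows, using long arithmetic and an injectable Random so runs can be reproduced. SteppedRandomDate is a hypothetical name.

```java
import java.util.Date;
import java.util.Random;

// Hypothetical variant of RandomDate: same stepping idea, long arithmetic, seedable Random.
class SteppedRandomDate {
    private long current;        // last generated timestamp (ms)
    private final long maxStep;  // steps are drawn from [0, maxStep)
    private final Random random;

    SteppedRandomDate(Date start, Date end, int num, Random random) {
        if (num <= 0) throw new IllegalArgumentException("num must be positive");
        long avgStep = (end.getTime() - start.getTime()) / num;
        // Twice the average so the mean step equals avgStep; at least 1 ms to keep the bound valid
        this.maxStep = Math.max(1L, avgStep * 2);
        this.current = start.getTime();
        this.random = random;
    }

    Date next() {
        // nextDouble() is in [0, 1), so the step is in [0, maxStep)
        current += (long) (random.nextDouble() * maxStep);
        return new Date(current);
    }
}
```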

  2) RanOpt

package com.yuange.gmall.utils;

/**
 * @author 袁哥
 * Created: 2021/6/30 12:51
 * Pairs a value with a weight
 */
public class RanOpt<T>{
    T value ;
    int weight;

    public RanOpt ( T value, int weight ){
        this.value=value ;
        this.weight=weight;
    }

    public T getValue() {
        return value;
    }

    public int getWeight() {
        return weight;
    }
}

  3) RandomOptionGroup

package com.yuange.gmall.utils;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;

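/**
 * @author 袁哥
 * Created: 2021/6/30 12:52
 * Holds a list of RanOpt and returns the value of a randomly chosen RanOpt; the larger its weight, the more likely that value is returned.
 * Each option is added to optList weight times, so a uniform random index picks it with probability weight / totalWeight.
 */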
public class RandomOptionGroup<T> {

    int totalWeight=0;

    List<RanOpt> optList=new ArrayList();

    public   RandomOptionGroup(RanOpt<T>... opts) {
        for (RanOpt opt : opts) {
            totalWeight += opt.getWeight();
            for (int i = 0; i <opt.getWeight() ; i++) {
                optList.add(opt);
            }
        }
    }

    public RanOpt<T> getRandomOpt() {
        int i = new Random().nextInt(totalWeight);
        return optList.get(i);
    }


    public static void main(String[] args) {
        RanOpt[] opts= {new RanOpt("zhang3",20),new RanOpt("li4",30),new RanOpt("wang5",50)};
        RandomOptionGroup randomOptionGroup = new RandomOptionGroup(opts);
        for (int i = 0; i <10 ; i++) {
            System.out.println(randomOptionGroup.getRandomOpt().getValue());
        }
    }
}
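  RandomOptionGroup expands every option into weight copies, so memory grows with the total weight. Each pick is O(1). A common alternative stores one running total per option and binary-searches a random draw, using O(n) memory and O(log n) per pick. This is a swapped-in technique, not what the project uses. The sketch below assumes positive integer weights, and CumulativeWeightPicker is a hypothetical name.

```java
import java.util.Arrays;
import java.util.Random;

// Hypothetical alternative to RandomOptionGroup using cumulative weights + binary search.
class CumulativeWeightPicker<T> {
    private final T[] values;
    private final int[] cumulative; // cumulative[i] = weight[0] + ... + weight[i], strictly increasing
    private final Random random;

    CumulativeWeightPicker(T[] values, int[] weights, Random random) {
        if (values.length == 0 || values.length != weights.length)
            throw new IllegalArgumentException("need one weight per value");
        this.values = values.clone();
        this.cumulative = new int[weights.length];
        int total = 0;
        for (int i = 0; i < weights.length; i++) {
            if (weights[i] <= 0) throw new IllegalArgumentException("weights must be positive");
            total += weights[i];
            cumulative[i] = total;
        }
        this.random = random;
    }

    T pick() {
        int r = random.nextInt(cumulative[cumulative.length - 1]); // 0 .. total-1
        // Smallest index whose cumulative weight is greater than r (i.e. >= r + 1)
        int idx = Arrays.binarySearch(cumulative, r + 1);
        if (idx < 0) idx = -idx - 1; // insertion point when r + 1 is not an exact total
        return values[idx];
    }
}
```

  With the weights from main() above (20, 30, 50), the totals are 20, 50 and 100. Draws 0-19 map to zhang3, 20-49 to li4 and 50-99 to wang5. For the small weights in this project, either approach works fine.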

  4) RandomNum

package com.yuange.gmall.utils;

import java.util.Random;

/**
 * @author 袁哥
 * Created: 2021/6/30 12:52
 * Returns a random integer in [fromNum, toNum], both ends inclusive
 */
public class RandomNum {
    public static int getRandInt(int fromNum,int toNum){
        return fromNum + new Random().nextInt(toNum-fromNum+1);
    }
}
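  The + 1 in toNum-fromNum+1 is what makes toNum reachable, because nextInt(bound) excludes its bound. On Java 7 and later the same range can be expressed with ThreadLocalRandom.nextInt(origin, bound), which also avoids allocating a new Random per call. A small sketch follows; InclusiveRandom is a hypothetical name.

```java
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical equivalent of RandomNum.getRandInt: uniform int in [fromNum, toNum], both inclusive.
class InclusiveRandom {
    static int getRandInt(int fromNum, int toNum) {
        if (fromNum > toNum) throw new IllegalArgumentException("fromNum > toNum");
        // nextInt(origin, bound) excludes bound, hence toNum + 1 (as in the original, toNum must be < Integer.MAX_VALUE)
        return ThreadLocalRandom.current().nextInt(fromNum, toNum + 1);
    }
}
```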

3.3.4 Log upload utility class

  LogUploader: sends each log over HTTP to the web port of the collection system

package com.yuange.gmall.main;

import java.io.OutputStream;

import java.net.HttpURLConnection;
import java.net.URL;

/**
 * @author 袁哥
 * Created: 2021/6/30 12:53
 * Uploads data to the web app built with Spring Boot
 * Startup log:  "type":"startup"
 *         {"area":"beijing","uid":"392","os":"ios","ch":"appstore","appid":"gmall2019","mid":"mid_23","type":"startup","vs":"1.1.1"}
 * Event log:  "type":"event"   "evid":"coupon" means claiming a coupon
 *         {"area":"beijing","uid":"392","itemid":37,"npgid":32,"evid":"coupon","os":"ios","pgid":7,"appid":"gmall2019","mid":"mid_23","type":"event"}
 */
public class LogUploader {

    public static void sendLogStream(String log){
        try{
            //Local test: post directly to the logger service on port 8089
            //In the cluster setup (4.3.5) the URL points at nginx instead, which reverse-proxies the request to
            //hadoop102:8089/gmall_logger/log, hadoop103:8089/gmall_logger/log or hadoop104:8089/gmall_logger/log
            URL url  =new URL("http://localhost:8089/gmall_logger/log");

            HttpURLConnection conn = (HttpURLConnection) url.openConnection();

            //Use POST
            conn.setRequestMethod("POST");

            //Time header the server can use for clock calibration
            conn.setRequestProperty("clientTime",System.currentTimeMillis() + "");

            //Allow writing a request body
            conn.setDoOutput(true);

            //Set the Content-Type header: the body is sent as a form (application/x-www-form-urlencoded), not as JSON
            conn.setRequestProperty("Content-Type", "application/x-www-form-urlencoded");

            System.out.println("upload " + log);

            //Write the request body
            OutputStream out = conn.getOutputStream();

            //logString is the form parameter name; LogController reads the parameter with the same name
            out.write(("logString="+log).getBytes());
            out.flush();
            out.close();
            //Read the response code: 200 means OK, anything else means the upload failed
            int code = conn.getResponseCode();
            System.out.println(code);
        }
        catch (Exception e){
            e.printStackTrace();
        }
    }
}
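  LogUploader writes "logString=" + log without percent-encoding the value. That works for the mock data, which contains no &, + or %. A log containing them, however, would be split at & or have + turned into a space when the server decodes the form. A minimal sketch of encoding the value first follows, using only java.net.URLEncoder/URLDecoder with the Java 8 compatible String-charset overloads. FormBody is a hypothetical helper name.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.net.URLEncoder;

// Hypothetical helper: builds and reads back an application/x-www-form-urlencoded body
// such as "logString=<percent-encoded JSON>".
class FormBody {
    static String build(String paramName, String value) {
        return paramName + "=" + encode(value);
    }

    // Roughly what the server does when it reads a single parameter back
    static String decodeValue(String body) {
        try {
            return URLDecoder.decode(body.substring(body.indexOf('=') + 1), "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException("UTF-8 is always supported", e);
        }
    }

    private static String encode(String value) {
        try {
            return URLEncoder.encode(value, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException("UTF-8 is always supported", e);
        }
    }
}
```

  In LogUploader this would become out.write(FormBody.build("logString", log).getBytes(StandardCharsets.UTF_8)), with java.nio.charset.StandardCharsets imported.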

3.3.5 Log generator class

  JsonMocker

package com.yuange.gmall.main;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.yuange.gmall.utils.RanOpt;
import com.yuange.gmall.utils.RandomDate;
import com.yuange.gmall.utils.RandomNum;
import com.yuange.gmall.utils.RandomOptionGroup;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random;

/**
 * @author 袁哥
 * Created: 2021/6/30 12:55
 * Generates mock data
 */
public class JsonMocker {

    private int startupNum = 100000;
    int eventNum = 200000;

    private RandomDate logDateUtil = null;


    private RanOpt[] osOpts = {new RanOpt("ios", 3), new RanOpt("andriod", 7)};
    private RandomOptionGroup<String> osOptionGroup = new RandomOptionGroup(osOpts);
    private Date startTime = null;
    private Date endTime = null;

    private RanOpt[] areaOpts = {new RanOpt("beijing", 10),
            new RanOpt("shanghai", 10), new RanOpt("guangdong", 20), new RanOpt("hebei", 5),
            new RanOpt("heilongjiang", 5), new RanOpt("shandong", 5), new RanOpt("tianjin", 5),
            new RanOpt("shan3xi", 5), new RanOpt("shan1xi", 5), new RanOpt("sichuan", 5)
    };
    private RandomOptionGroup<String> areaOptionGroup = new RandomOptionGroup(areaOpts);

    private String appId = "gmall2019";

    private RanOpt[] vsOpts = {new RanOpt("1.2.0", 50), new RanOpt("1.1.2", 15),
            new RanOpt("1.1.3", 30),
            new RanOpt("1.1.1", 5)
    };

    private RandomOptionGroup<String> vsOptionGroup = new RandomOptionGroup(vsOpts);

    private RanOpt[] eventOpts = {new RanOpt("addFavor", 10), new RanOpt("addComment", 30),new RanOpt("addCart", 20), new RanOpt("clickItem", 40), new RanOpt("coupon", 45)};

    private RandomOptionGroup<String> eventOptionGroup = new RandomOptionGroup(eventOpts);

    private RanOpt[] channelOpts = {new RanOpt("xiaomi", 10), new RanOpt("huawei", 20),
            new RanOpt("wandoujia", 30), new RanOpt("360", 20), new RanOpt("tencent", 20)
            , new RanOpt("baidu", 10), new RanOpt("website", 10)
    };

    private RandomOptionGroup<String> channelOptionGroup = new RandomOptionGroup(channelOpts);

    private RanOpt[] quitOpts = {new RanOpt(true, 20), new RanOpt(false, 80)};

    private RandomOptionGroup<Boolean> isQuitGroup = new RandomOptionGroup(quitOpts);

    private JsonMocker() {

    }

    public JsonMocker(String startTimeString, String endTimeString, int startupNum, int eventNum) {
        try {
            startTime = new SimpleDateFormat("yyyy-MM-dd").parse(startTimeString);
            endTime = new SimpleDateFormat("yyyy-MM-dd").parse(endTimeString);
        } catch (ParseException e) {
            e.printStackTrace();
        }
        logDateUtil = new RandomDate(startTime, endTime, startupNum + eventNum);
    }

    private String initEventLog(String startLogJson) {
            /*`type` string   COMMENT 'log type',
             `mid` string COMMENT 'unique device ID',
            `uid` string COMMENT 'user ID',
            `os` string COMMENT 'operating system',
            `appid` string COMMENT 'application ID',
            `area` string COMMENT 'region' ,
            `evid` string COMMENT 'event ID',
            `pgid` string COMMENT 'current page',
            `npgid` string COMMENT 'next page',
            `itemid` string COMMENT 'item ID',
            `ts` bigint COMMENT 'timestamp',*/

        JSONObject startLog = JSON.parseObject(startLogJson);
        String mid = startLog.getString("mid");
        String uid = startLog.getString("uid");
        String os = startLog.getString("os");
        String appid = this.appId;
        String area = startLog.getString("area");
        String evid = eventOptionGroup.getRandomOpt().getValue();
        int pgid = new Random().nextInt(50) + 1;
        int npgid = new Random().nextInt(50) + 1;
        int itemid = new Random().nextInt(50);
        //  long ts= logDateUtil.getRandomDate().getTime();

        JSONObject jsonObject = new JSONObject();
        jsonObject.put("type", "event");
        jsonObject.put("mid", mid);
        jsonObject.put("uid", uid);
        jsonObject.put("os", os);
        jsonObject.put("appid", appid);
        jsonObject.put("area", area);
        jsonObject.put("evid", evid);
        jsonObject.put("pgid", pgid);
        jsonObject.put("npgid", npgid);
        jsonObject.put("itemid", itemid);
        return jsonObject.toJSONString();
    }

    private String initStartsupLog() {
            /*`type` string   COMMENT 'log type',
             `mid` string COMMENT 'unique device ID',
             `uid` string COMMENT 'user ID',
             `os` string COMMENT 'operating system',
             `appId` string COMMENT 'application ID',
             `vs` string COMMENT 'version',
             `ts` bigint COMMENT 'startup time',
             `area` string COMMENT 'city' */

        //500 devices are simulated by default (mid_1 .. mid_500)
        String mid = "mid_" + RandomNum.getRandInt(1, 500);
        String uid = "" + RandomNum.getRandInt(1, 500);
        String os = osOptionGroup.getRandomOpt().getValue();
        String appid = this.appId;
        String area = areaOptionGroup.getRandomOpt().getValue();
        String vs = vsOptionGroup.getRandomOpt().getValue();
        //long ts= logDateUtil.getRandomDate().getTime();
        String ch = os.equals("ios") ? "appstore" : channelOptionGroup.getRandomOpt().getValue();

        JSONObject jsonObject = new JSONObject();
        jsonObject.put("type", "startup");
        jsonObject.put("mid", mid);
        jsonObject.put("uid", uid);
        jsonObject.put("os", os);
        jsonObject.put("appid", appid);
        jsonObject.put("area", area);
        jsonObject.put("ch", ch);
        jsonObject.put("vs", vs);
        return jsonObject.toJSONString();
    }

    //Generate logs
    private static void genLog() {
        JsonMocker jsonMocker = new JsonMocker();
        jsonMocker.startupNum = 1000000;

        //Generate 1,000,000 startup logs in a row
        for (int i = 0; i < jsonMocker.startupNum; i++) {
            //Build a startup log
            String startupLog = jsonMocker.initStartsupLog();
            // Send it
            jsonMocker.sendLog(startupLog);
            while (!jsonMocker.isQuitGroup.getRandomOpt().getValue()) {
                //Each pass through this loop also builds one event log for the same session
                String eventLog = jsonMocker.initEventLog(startupLog);
                // Send it
                jsonMocker.sendLog(eventLog);
            }
            try {
                //Pause between startup logs; increase this value on a slow machine
                Thread.sleep(200);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    //Send one log record
    private void sendLog(String log) {
        //Upload the data to the web app built with Spring Boot
        LogUploader.sendLogStream(log);
    }

    public static void main(String[] args) {
        genLog();
    }
}
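  In genLog() the while loop keeps emitting event logs until the quit option is drawn. The quit option has weight 20 out of 100 in quitOpts, so p = 0.2. The number of events per startup is therefore geometric, with mean (1 - p) / p = 0.8 / 0.2 = 4. That means about 4,000,000 event logs for 1,000,000 startups on average. The 200 ms sleep alone stretches a full run to about 1,000,000 × 0.2 s = 200,000 s, roughly 55.6 hours. A small simulation of that loop follows, with SessionSimulation as a hypothetical name.

```java
import java.util.Random;

// Hypothetical simulation of the event loop in JsonMocker.genLog().
class SessionSimulation {
    // Each startup emits events until "quit" is drawn with probability quitProbability
    static double averageEventsPerStartup(int startups, double quitProbability, Random random) {
        long events = 0;
        for (int i = 0; i < startups; i++) {
            while (random.nextDouble() >= quitProbability) { // "not quit", probability 1 - p
                events++;
            }
        }
        return (double) events / startups;
    }

    // Closed form: mean number of failures before the first success of a geometric distribution
    static double expectedEventsPerStartup(double quitProbability) {
        return (1 - quitProbability) / quitProbability;
    }
}
```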

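Chapter 4 Building the Log Collection System

4.1 Submodule: log collection module (local test)

4.1.1 Introduction to Spring Boot

  Spring Boot is a framework from the Pivotal team designed to simplify the initial setup and development of new Spring applications. It takes an opinionated approach to configuration, so developers no longer need to write boilerplate configuration.

  1) With Spring Boot we can:

    (1) Drop the repetitive, tedious XML files.

    (2) Use an embedded Tomcat instead of deploying to an external Tomcat.

    (3) Integrate more easily with third-party tools (MySQL, Redis, Elasticsearch, Dubbo, Kafka, etc.) while maintaining only one configuration file.

  2) How Spring Boot relates to SSM (Spring + Spring MVC + MyBatis):

    Spring Boot integrates core features such as Spring MVC and Spring. In other words, the functionality is still provided by the original Spring and Spring MVC packages. Spring Boot wraps them in an extra layer so that users no longer have to configure Spring MVC, Spring and so on directly in XML.

  3) Without XML, where does configuration go?

    (1) Spring Boot turns everything users previously configured by hand into defaults, so nothing needs configuring unless a default must change. This is known as "convention over configuration".

    (2) When special configuration is needed, edit application.properties.

4.1.2 Quick setup

  1) Create a new module: add a Module under the project, choose Spring Initializr, and name it gmall_logger

  2) Adjust the project inheritance (Important: cut and copy these snippets from your own project files, not from this page)

    (1) Cut the following content from gmall_logger's pom.xml and paste it into the pom.xml of the parent project gmall_sparkstream. If your Spring Boot version is newer, change it to the version shown here!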

<parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.10.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

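    (2) Copy the following content from the common module's pom.xml into gmall_logger's pom.xml, so that gmall_logger inherits from the parent project gmall_sparkstream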

<parent>
        <artifactId>gmall_sparkstream</artifactId>
        <groupId>com.yuange</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>

    (3) Add the following to the pom.xml of the parent project gmall_sparkstream

<modules>
        <module>gmall_common</module>
        <module>gmall_mocker</module>
        <module>gmall_logger</module>
    </modules>

    (4) Add the following to gmall_logger's pom.xml:

<properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-starter-logging</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>

        <dependency>
            <groupId>com.yuange</groupId>
            <artifactId>gmall_common</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>

4.1.3 Integrating Kafka with Spring Boot

  1) Add log4j.properties

#Defines a ConsoleAppender that writes to the console; it targets System.err, so IDEs show the output in red
log4j.appender.yuange.MyConsole=org.apache.log4j.ConsoleAppender
log4j.appender.yuange.MyConsole.target=System.err
log4j.appender.yuange.MyConsole.layout=org.apache.log4j.PatternLayout
# Pattern: yyyy-MM-dd HH:mm:ss, log level right-aligned in 10 characters, (fully qualified class name:method name) - message, newline
log4j.appender.yuange.MyConsole.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %10p (%c:%M) - %m%n

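#Defines a DailyRollingFileAppender that writes to a file rolled by date. The current day's logs go to app.log;
# once the day is over, app.log is rolled: the old file is renamed app.log.<that day's date> and new logs go to a fresh app.log
log4j.appender.yuange.File=org.apache.log4j.DailyRollingFileAppender
# Use forward slashes: in a .properties file a backslash starts an escape sequence
log4j.appender.yuange.File.file=D:/applog/app.log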

log4j.appender.yuange.File.DatePattern='.'yyyy-MM-dd
log4j.appender.yuange.File.layout=org.apache.log4j.PatternLayout
log4j.appender.yuange.File.layout.ConversionPattern=%m%n

#Sets the level and appenders for the logger of this class; adjust the fully qualified class name if your package differs
log4j.logger.com.yuange.gmall_logger.controller.LogController=info,yuange.File,yuange.MyConsole
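  Two details of the configuration above can be checked with plain JDK code. log4j 1.x's PropertyConfigurator loads this file through java.util.Properties, where a backslash starts an escape sequence. An unknown escape such as \a silently loses its backslash, so D:\applog\app.log would be read as D:applogapp.log; forward slashes avoid that. The rolled file's name is the base file name plus the finished period's date, formatted with DatePattern. A minimal sketch follows. Log4jConfigChecks is a hypothetical name, and passing an explicit time zone is an assumption made for reproducibility.

```java
import java.io.IOException;
import java.io.StringReader;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;
import java.util.TimeZone;

// Hypothetical checks on the log4j.properties above, using only the JDK.
class Log4jConfigChecks {
    // Reads one value the way java.util.Properties does (backslash escapes included)
    static String loadedValue(String propertiesText, String key) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new IllegalStateException(e); // not expected for an in-memory reader
        }
        return props.getProperty(key);
    }

    // Rolled file name: base name + period start formatted with DatePattern ('.' is a quoted literal)
    static String rolledName(String file, String datePattern, Date periodStart, TimeZone zone) {
        SimpleDateFormat fmt = new SimpleDateFormat(datePattern);
        fmt.setTimeZone(zone);
        return file + fmt.format(periodStart);
    }
}
```

  For example, logs written on 2021-06-30 end up in app.log.2021-06-30 once the next day's first log event triggers the rollover.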

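  2) application.properties

#Context path prefix for all requests (Spring Boot 1.5.x property name; Spring Boot 2.x uses server.servlet.context-path)
server.context-path=/gmall_logger
#Port
server.port=8089

#============== kafka ===================
# Kafka broker addresses; several can be listed
spring.kafka.bootstrap-servers=hadoop102:9092,hadoop103:9092,hadoop104:9092

# Serializers for message keys and values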
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

  3) Define constants in the common module

package com.yuange.constants;

/**
 * @author 袁哥
 * Created: 2021/6/30 13:09
 */
public class Constants {

    public static final String GMALL_STARTUP_LOG = "GMALL_STARTUP_LOG"; //gmall_startup_log
    public static final String GMALL_EVENT_LOG = "GMALL_EVENT_LOG";     //gmall_event_log

}

  4) LogController

package com.yuange.gmall_logger.controller;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.yuange.constants.Constants;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author 袁哥
 * Created: 2021/6/30 10:44
 */
@Slf4j          //Lombok generates: private static final org.slf4j.Logger log = org.slf4j.LoggerFactory.getLogger(LogController.class);
@RestController //@Controller + @ResponseBody
public class LogController {

    @Autowired
    private KafkaTemplate<String,String> producer;

    @RequestMapping(value = "/log")     //Must match the path in the URL used by LogUploader in gmall_mocker
    public String handleLog(String logString){  //The parameter name must match the form parameter LogUploader sends: out.write(("logString="+log).getBytes());
        //Stamp the log with a server-side timestamp
        JSONObject jsonObject = JSON.parseObject(logString);
        jsonObject.put("ts",System.currentTimeMillis());
        //Write the log to disk through Log4j
        log.info(jsonObject.toJSONString());

        //Send the record to Kafka
        if ("event".equals(jsonObject.getString("type"))){
            producer.send(Constants.GMALL_EVENT_LOG,jsonObject.toString());
        }else {
            producer.send(Constants.GMALL_STARTUP_LOG,jsonObject.toString());
        }
        return "success";
    }
}
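  The routing rule in handleLog() can be pulled out into a pure method, so it can be tested without Spring or Kafka. A minimal sketch follows. TopicRouter is a hypothetical name, and the two constants are copied from Constants rather than imported. Anything whose type is not "event", including a log with no type at all, lands in GMALL_STARTUP_LOG. A stricter variant could reject unknown types instead.

```java
// Hypothetical extraction of the topic choice made in LogController.handleLog().
class TopicRouter {
    // Same values as com.yuange.constants.Constants
    static final String GMALL_STARTUP_LOG = "GMALL_STARTUP_LOG";
    static final String GMALL_EVENT_LOG = "GMALL_EVENT_LOG";

    static String topicFor(String type) {
        // Constant-first equals() is null-safe: a missing "type" falls through to the startup topic
        return "event".equals(type) ? GMALL_EVENT_LOG : GMALL_STARTUP_LOG;
    }
}
```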

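  5) Testing

    (1) Start GmallLoggerApplication

    (2) Start the ZooKeeper and Kafka clusters

    (3) Connect to Kafka with Kafka Tool and create two topics: GMALL_EVENT_LOG and GMALL_STARTUP_LOG

    (4) Start the mock log generator JsonMocker

    (5) Check that the log file has been written to disk

    (6) Verify with a Kafka console consumer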

bin/kafka-console-consumer.sh --bootstrap-server  hadoop102:9092 --topic GMALL_EVENT_LOG

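4.2 Packaging and deploying the log collection module (single node)

4.2.1 Change the log file path in the log4j configuration

log4j.appender.yuange.File.file=/opt/module/gmall_realtime/app.log

4.2.2 Copy the packaged gmall_logger-1.0-SNAPSHOT.jar to /opt/software on the Linux host

4.2.3 Start gmall_logger-1.0-SNAPSHOT.jar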

java -jar /opt/software/gmall_logger-1.0-SNAPSHOT.jar

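  Note: Linux does not let non-root accounts bind ports below 1024 by default. If you hit a permission error, run the service on a port above 1024 (8089 here) in the background:

java -jar /opt/software/gmall_logger-1.0-SNAPSHOT.jar --server.port=8089  >/dev/null  2>&1  &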
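  Spring Boot gives command-line arguments of the form --key=value precedence over application.properties, which is why --server.port=8089 takes effect. An argument that starts with an en dash (–) instead of two hyphens is not treated as an option, so it is ignored. A simplified JDK-only model of that precedence rule follows. It is not Spring Boot's actual implementation, and PropertyOverrides is a hypothetical name.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

// Hypothetical, simplified model: application.properties supplies defaults and
// "--key=value" command-line arguments override them.
class PropertyOverrides {
    static Properties resolve(String applicationProperties, String... args) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(applicationProperties));
        } catch (IOException e) {
            throw new IllegalStateException(e); // not expected for an in-memory reader
        }
        for (String arg : args) {
            // Only arguments starting with two ASCII hyphens count as options
            if (arg.startsWith("--") && arg.contains("=")) {
                int eq = arg.indexOf('=');
                props.setProperty(arg.substring(2, eq), arg.substring(eq + 1));
            }
        }
        return props;
    }
}
```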

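4.2.4 Test Kafka consumption again

bin/kafka-console-consumer.sh --bootstrap-server  hadoop102:9092 --topic GMALL_EVENT_LOG

4.3 Building the log collection cluster (cluster deployment)

4.3.1 Introduction to Nginx

  Nginx ("engine x") is a high-performance HTTP and reverse proxy server known for its small memory footprint and strong concurrency; in practice its concurrency holds up well among web servers of its kind. Websites in mainland China that use nginx include Baidu, JD.com, Sina, NetEase, Tencent and Taobao.

4.3.2 Nginx features

  1) What is a reverse proxy? First, look at a forward proxy: it sits in front of the clients and sends requests to servers on their behalf, and the client knows it is going through a proxy.

    Then look at a reverse proxy: it sits in front of the servers. Clients send requests to the proxy's address, and the proxy forwards them to backend servers that the client never addresses directly.

  2) Load balancing

    Load-balancing strategies: round-robin, weighted, and backup servers

  3) Separation of static and dynamic content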
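  Load balancing over upstream servers (such as the upstream logserver block in 4.3.4) defaults to round-robin, and weight= skews the rotation. A sketch of smooth weighted round-robin follows. This is the variant nginx is generally described as using for weighted upstreams, and it spreads a heavy server's turns out instead of clustering them. It is a model for intuition, not nginx source, and backup servers are not modeled. SmoothWeightedRoundRobin is a hypothetical name.

```java
// Hypothetical model of smooth weighted round-robin.
class SmoothWeightedRoundRobin {
    private final String[] servers;
    private final int[] weights;
    private final int[] current; // running "current weight" per server
    private final int total;

    SmoothWeightedRoundRobin(String[] servers, int[] weights) {
        this.servers = servers.clone();
        this.weights = weights.clone();
        this.current = new int[weights.length];
        int sum = 0;
        for (int w : weights) sum += w;
        this.total = sum;
    }

    String next() {
        int best = 0;
        for (int i = 0; i < servers.length; i++) {
            current[i] += weights[i];                   // every server gains its weight
            if (current[i] > current[best]) best = i;   // ties go to the earlier server
        }
        current[best] -= total;                         // the chosen server pays back the total
        return servers[best];
    }
}
```

  With weights 5, 1 and 1 for servers a, b and c, one cycle of seven picks is a, a, b, a, c, a, a. With equal weights, as in 4.3.4, it reduces to plain rotation: hadoop102, hadoop103, hadoop104, hadoop102, and so on.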

4.3.3 Installation

  1) Install the dependency packages with yum

sudo yum -y install openssl openssl-devel pcre pcre-devel    zlib zlib-devel gcc gcc-c++

  2) Download link: https://pan.baidu.com/s/1-1VBJsADn0GUf00XLIAu0g  extraction code: 84df

  3) Upload nginx-1.12.2.tar.gz to /opt/software

  4) Extract it to /opt/module/

tar -zxvf /opt/software/nginx-1.12.2.tar.gz -C /opt/module/

  5) Enter the extracted directory and run

cd /opt/module/nginx-1.12.2
./configure --prefix=/opt/module/nginx
make && make install

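  6) Start, stop and reload commands (nginx must be run as root, because it listens on port 81, below 1024!)

    (1) Start: run in /opt/module/nginx/sbin

sudo ./nginx

    (2) Stop: run in /opt/module/nginx/sbin

sudo ./nginx -s stop

    (3) Reload: run in /opt/module/nginx/sbin

sudo ./nginx -s reload

    Note: if nginx fails to start (for example, because libpcre.so.1 cannot be loaded), run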

sudo ln -s /usr/local/lib/libpcre.so.1 /lib64

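4.3.4 Edit the configuration file

vim /opt/module/nginx/conf/nginx.conf
#Change the following (both the upstream and the server block go inside the http { } block)
upstream logserver{
      server    hadoop102:8089 weight=1;
      server    hadoop103:8089 weight=1;
      server    hadoop104:8089 weight=1;
    }

    server {
    #If port 80 is already in use, you can pick another port (81 here)
        listen       81;
        server_name  logserver;

        location / {
            root   html;
            index  index.html index.htm;
            #Enable the reverse proxy
            proxy_pass http://logserver;
            proxy_connect_timeout 10;
        }
    }

  Restart Nginx and open it in a browser to check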

4.3.5 Cluster script

  1) In LogUploader of gmall_mocker, change the URL to the following

            //Go through nginx, which reverse-proxies the request to hadoop102:8089/gmall_logger/log, hadoop103:8089/gmall_logger/log or hadoop104:8089/gmall_logger/log
            //That is, a request to hadoop102:81 is forwarded to hadoop102:8089, hadoop103:8089 or hadoop104:8089
            URL url  =new URL("http://hadoop102:81/gmall_logger/log");

  2) Upload the packaged jar to /opt/module/gmall/ on hadoop102 and distribute it to hadoop103 and hadoop104

mkdir /opt/module/gmall
xsync /opt/module/gmall/gmall_logger-1.0-SNAPSHOT.jar

  3) Write the script

vim /home/atguigu/bin/startgmall.sh
#!/bin/bash
JAVA_BIN=/opt/module/java/bin/java
PROJECT=gmall
APPNAME=gmall_logger-1.0-SNAPSHOT.jar
SERVER_PORT=8089

case $1 in
 "start")
   {
    for i in hadoop102 hadoop103 hadoop104
    do
     echo "===============$i==============="
    ssh $i  "$JAVA_BIN -Xms32m -Xmx64m  -jar /opt/module/$PROJECT/$APPNAME --server.port=$SERVER_PORT >/dev/null 2>&1  &"
    done
  };;

  "stop")
  {
    for i in hadoop102 hadoop103 hadoop104
    do
     echo "===============$i==============="
     ssh $i "ps -ef | grep $APPNAME | grep -v grep | awk '{print \$2}' | xargs kill" >/dev/null 2>&1
    done
  };;
   esac
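  The stop branch finds the logger's PID with the pipeline ps -ef | grep $APPNAME | grep -v grep | awk '{print $2}'. Because the pipeline sits inside a double-quoted ssh command, $2 has to be written as \$2. Otherwise the local shell expands it before ssh runs (to the script's second argument, normally empty), and awk prints whole lines instead of PIDs. The filtering itself is mirrored in Java in the sketch below. PidFilter is a hypothetical name, and the test feeds it made-up ps lines.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical mirror of the stop pipeline in startgmall.sh.
class PidFilter {
    static List<String> pidsFor(List<String> psLines, String appName) {
        List<String> pids = new ArrayList<>();
        for (String line : psLines) {
            // grep $APPNAME | grep -v grep: keep lines naming the jar, drop any line containing "grep"
            if (!line.contains(appName) || line.contains("grep")) continue;
            String[] cols = line.trim().split("\\s+");
            if (cols.length > 1) pids.add(cols[1]); // awk '{print $2}': the PID column of ps -ef
        }
        return pids;
    }
}
```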

  4) Make the script executable

chmod +x /home/atguigu/bin/startgmall.sh

4.3.6 Testing

  1) Start ZooKeeper and Kafka

  2) Start two consumers, one for the GMALL_EVENT_LOG topic and one for the GMALL_STARTUP_LOG topic:

bin/kafka-console-consumer.sh --bootstrap-server  hadoop102:9092 --topic GMALL_EVENT_LOG
bin/kafka-console-consumer.sh --bootstrap-server  hadoop102:9092 --topic GMALL_STARTUP_LOG

  3) Start the log servers on all three nodes:

startgmall.sh start

  4) As root, run /opt/module/nginx/sbin/nginx to start the nginx proxy server

sudo /opt/module/nginx/sbin/nginx

  5) Monitor /opt/module/gmall_realtime/app.log on hadoop102, hadoop103 and hadoop104:

tail -f /opt/module/gmall_realtime/app.log

  6) Start the mock module in IDEA to generate data; make sure the request URL is http://hadoop102:81/gmall_logger/log

  7) Check whether data appears in steps 2 and 5

Original article: https://www.cnblogs.com/LzMingYueShanPao/p/14954134.html