在线教育 (实时需求)

1项目需求架构

1.1 项目需求概览

  一、数据采集平台搭建

  二、Kafka中间件准备

  三、下游Spark Streaming对接Kafka接收数据,实现vip个数统计、页面之间的跳转率、做题正确率与掌握度、播放时长统计及历史区间统计的实时计算功能。

1.2 项目框架设计

1.2.1 技术选型

  一、数据存储:KafkaMySQL

  二、数据处理:Spark

  三、其他组件:Zookeeper

1.2.2 流程设计

1.2.3 代码框架

2章 需求

2.1环境准备

  在本机三台虚拟机上分别搭建好zookeeperkafka。(注:CDH6.3.2kafka的版本为2.2.1),创建所需topic(如果使用--bootstrap-server hadoop102:9092创建,则消费者的offset保存在kafka中,如果使用--zookeeper hadoop:2181创建,则消费者的offset保存在zk中)

kafka-topics --bootstrap-server hadoop102:9092 --create --replication-factor 2 --partitions 10  --topic register_topic
kafka-topics --bootstrap-server hadoop102:9092 --create --replication-factor 2 --partitions 10  --topic qz_log
kafka-topics --bootstrap-server hadoop102:9092 --create --replication-factor 2 --partitions 10  --topic page_topic
kafka-topics --bootstrap-server hadoop102:9092 --create --replication-factor 2 --partitions 10  --topic course_learn

2.2原始数据格式及kafka对应topic

2.2.1实时统计注册人数 - register.log

  kafka对应topic:register_topic

  数据格式:

字段

字段说明

1

用户id

 

2

平台id

1:PC

2:APP

3:Other

3

创建时间

 

  数据示例:

# 数据使用/t作为分隔符
7188    2    2019-07-16 16:01:55
7189    1    2019-07-16 16:01:55
7190    1    2019-07-16 16:01:55
7191    1    2019-07-16 16:01:55
7192    1    2019-07-16 16:01:55
7193    3    2019-07-16 16:01:55
7194    1    2019-07-16 16:01:55
7195    3    2019-07-16 16:01:55

2.2.2做题正确率数与知识点掌握度数据格式 - qz.log

  kafka对应topicqz_log

  数据格式:

字段

字段说明

1

用户id

 

2

课程id

 

3

知识点id

 

4

题目id

 

5

是否正确

0错误

1正确

6

创建时间

 

  数据示例:

# 数据使用/t作为分隔符
1006    504    8    7    0    2019-07-12 11:17:45
1007    505    16    9    1    2019-07-12 11:17:45
1002    505    29    3    0    2019-07-12 11:17:45
1006    504    10    5    0    2019-07-12 11:17:45
1001    502    28    8    0    2019-07-12 11:17:45
1006    505    27    0    1    2019-07-12 11:17:45
1004    503    25    3    0    2019-07-12 11:17:45
1007    504    12    1    0    2019-07-12 11:17:45
1006    501    7    6    0    2019-07-12 11:17:45

2.2.3商品页面到订单页,订单页到支付页数据格式 - page.log

  kafka对应topicpage_topic

  数据格式:

序号

字段

字段说明

1

app_id

平台id

1:PC

2:APP

3:Other

2

device_id

平台id

3

distinct_id

唯一标识

4

ip

用户ip地址

5

last_event_name

上一事件名称

6

last_page_id

上一页面id

7

next_event_name

下一事件名称

8

next_page_id

下一页面id

9

page_id

当前页面id

1:商品课程页

2:订单页面

3:支付页面

10

server_time

服务器时间

11

uid

用户id

  数据示例:

# 数据为json格式
{"app_id":"2","device_id":"100","distinct_id":"23a6d4a7-6903-46a4-bce2-a8317693da45","event_name":"-","ip":"123.235.113.225","last_event_name":"-","last_page_id":"0","next_event_name":"-","next_page_id":"2","page_id":"1","server_time":"-","uid":"0"}
# json格式化之后
{
  "app_id": "2",
  "device_id": "100",
  "distinct_id": "23a6d4a7-6903-46a4-bce2-a8317693da45",
  "event_name": "-",
  "ip": "123.235.113.225",
  "last_event_name": "-",
  "last_page_id": "0",
  "next_event_name": "-",
  "next_page_id": "2",
  "page_id": "1",
  "server_time": "-",
  "uid": "0"
}

2.2.4实时统计学员播放视频各时长 - course_learn.log

  Kafka对应topiccourse_learn

  数据格式:

序号

字段

字段说明

1

biz

唯一标识

2

chapterid

章节id

3

cwareid

课件id

4

edutypeid

辅导id

5

pe

视频播放结束区间

6

ps

视频播放开始区间

7

sourceType

播放平台

8

speed

播放倍速

9

subjectid

主题id

10

te

视频播放结束时间(时间戳)

11

ts

视频播放开始时间(时间戳)

12

uid

用户id

13

videoid

视频id

  数据示例:

# 数据为json格式
{"biz":"34e4b8fe-476c-4c7e-8e2c-274ec00bb5c9","chapterid":"2","cwareid":"2","edutypeid":"1","pe":"56","ps":"24","sourceType":"PC","speed":"2","subjectid":"1","te":"1563352144131","ts":"1563352128131","uid":"219","videoid":"6"}
# json格式化之后
{
  "biz": "34e4b8fe-476c-4c7e-8e2c-274ec00bb5c9",
  "chapterid": "2",
  "cwareid": "2",
  "edutypeid": "1",
  "pe": "56",
  "ps": "24",
  "sourceType": "PC",
  "speed": "2",
  "subjectid": "1",
  "te": "1563352144131",
  "ts": "1563352128131",
  "uid": "219",
  "videoid": "6"
}

2.3模拟数据采集

  将准备好的log文件使用kafka生产者代码发送信息到对应的topic中。log文件均在资料包的.2.资料1日志数据4_实时-kafka主题数据中)

数据说明

日志文件

Kafka topic

代码文件

注册日志数据

register.log

register_topic

 

做题数据

qz.log

qz_log

 

商品页面数据

page.log

page_topic

 

视频播放时长数据

course_learn.log

course_learn

 

  注:如果windows下没有安装hadoop环境,先windows配置环境变量。(代码运行时候会寻找环境变量中的HADOOP_HOME,然后找%HADOOP_HOME%/bin/winutils.exe,所以我们不需要下载全部的代码,只需要把bin包配置好,能让系统找到%HADOOP_HOME%/bin/winutils.exe即可)

  该文件为hadoop-3.0.0bin目录压缩包,之前在讲HDFS时也有说过,具体请看:https://www.cnblogs.com/LzMingYueShanPao/p/14649564.html

2.4 ip解析工具测试

  1)ip解析本地库文件,网盘地址:

  2)测试ip解析工具代码:

import org.lionsoul.ip2region.{DbConfig, DbSearcher}

object IpTest {
  def main(args: Array[String]): Unit = {
    val ipSearch = new DbSearcher(new DbConfig(), this.getClass.getResource("/ip2region.db").getPath)
    val region = ipSearch.binarySearch("113.88.85.106").getRegion
    println(region)
    val city = region.split("\|")(2)
    println(city)
  }
}

  3)测试结果:

2.实时统计注册人员信息

2.5.1 MySQL建表语句

CREATE TABLE `offset_manager` (
  `groupid` varchar(50) DEFAULT NULL,
  `topic` varchar(50) DEFAULT NULL,
  `partition` int(11) DEFAULT NULL,
  `untiloffset` mediumtext,
  UNIQUE KEY `offset_unique` (`groupid`,`topic`,`partition`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

2.5.2表结构说明

表名:

offset_manager

主键:

字段名

字段说明

1

groupid

Kafka consumergroupid

2

topic

Kafka consumertopic

3

partition

Kafka consumerpartition

4

untiloffset

最新的消费offset(由上面的GTP进行定位)

2.5.3业务流程说明

  用户使用网站或APP进行注册,后台实时收集数据传输KafkaSpark Streaming进行对接统计,实时统计注册人数。

2.5.4需求说明

  需求1:实时统计注册人数,批次为3秒一批,使用updateStateBykey算子计算历史数据和当前批次的数据总数,仅此需求使用updateStateBykey,后续需求不使用updateStateBykey

  需求2:每6秒统统计一次1分钟内的注册数据,不需要历史数据(提示:reduceByKeyAndWindow算子)

  需求3:观察对接数据,尝试进行调优。

2.6实时统计学员做题正确率与知识点掌握度

2.6.1 MySQL建表语句

CREATE TABLE `qz_point_detail` (
  `userid` int(11) DEFAULT NULL,
  `courseid` int(11) DEFAULT NULL,
  `pointid` int(11) DEFAULT NULL,
  `qz_sum` int(11) DEFAULT NULL,
  `qz_count` int(11) DEFAULT NULL,
  `qz_istrue` int(11) DEFAULT NULL,
  `correct_rate` double(4,2) DEFAULT NULL,
  `mastery_rate` double(4,2) DEFAULT NULL,
  `createtime` datetime DEFAULT NULL,
  `updatetime` datetime DEFAULT NULL,
  UNIQUE KEY `qz_point_detail_unique` (`userid`,`courseid`,`pointid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
CREATE TABLE `qz_point_history` (
  `userid` int(11) DEFAULT NULL,
  `courseid` int(11) DEFAULT NULL,
  `pointid` int(11) DEFAULT NULL,
  `questionids` text,
  `createtime` datetime DEFAULT NULL,
  `updatetime` datetime DEFAULT NULL,
  UNIQUE KEY `qz_point_set_unique` (`userid`,`courseid`,`pointid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

2.6.2表结构说明

表名:

qz_point_detail

主键:

`qz_point_detail_unique` (`userid`,`courseid`,`pointid`)

字段名

字段说明

1

userid

用户id

2

courseid

课程id

3

pointid

知识点id

4

qz_sum

做题总数(与历史进行累加)

5

qz_count

当前批次做题个数(去重)

6

qz_istrue

做题正确题目总数(与历史进行累加)

7

correct_rate

正确率

题目正确率=qz_istrue/qz_count

8

mastery_rate

知识点掌握程度=(题目正确率*题目完成度)

题目完成度=当前知识点去重完成题目数/当前知识点题目总数10

9

createtime

创建时间

10

updatetime

更新时间

表名:

qz_point_history

主键:

`qz_point_set_unique` (`userid`,`courseid`,`pointid`)

字段名

字段说明

1

userid

用户id

2

courseid

课程id

3

pointid

知识点id

4

questionids

题目id(使用,”作为拼接)

5

createtime

创建时间

6

updatetime

更新时间

2.6.3业务流程说明

  用户在网站或APP进行做题,做完题点击交卷按钮,程序将做题记录提交,传输到Kafka中,下游Spark Streaming对接kafka实现实时计算做题正确率和掌握度的计算,将正确率和掌握度存入MySQL中,用户点击交卷后刷新页面能立马(思考:这个更新的速度取决于什么?)看到自己做题的详情。

2.6.4需求说明

  需求1:要求Spark Streaming保证数据不丢失,每秒100条处理速度,需要手动维护偏移量

  需求2:同一个用户做在同一门课程同一知识点下做题需要去重,需要根据历史数据进行去重并且记录去重后的做题id与个数

  需求3:计算知识点正确率正确率计算公式:做题正确总个数/做题总数)保留两位小数

  需求4:计算知识点掌握度,(知识点掌握度=去重后的做题个数/当前知识点总题数(已知10题)*当前知识点的正确率

2.7实时统计商品页到订单页,订单页到支付页转换率

2.7.1 MySQL建表语句

CREATE TABLE `page_jump_rate` (
  `id` INT(11) NOT NULL AUTO_INCREMENT,
  `last_page_id` INT(11) DEFAULT NULL,
  `page_id` INT(11) DEFAULT NULL,
  `next_page_id` INT(11) DEFAULT NULL,
  `num` BIGINT(20) DEFAULT NULL,
  `jump_rate` VARCHAR(10) DEFAULT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `page_jum_rate_unique` (`page_id`)
) ENGINE=INNODB AUTO_INCREMENT=394 DEFAULT CHARSET=utf8;
CREATE TABLE `tmp_city_num_detail` (
  `id` INT(11) NOT NULL AUTO_INCREMENT,
  `province` VARCHAR(10) DEFAULT NULL,
  `num` BIGINT(20) DEFAULT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `tmp_cityp_num_index_province` (`province`)
) ENGINE=INNODB AUTO_INCREMENT=4191 DEFAULT CHARSET=utf8;
CREATE TABLE `top_city_num` (
  `id` INT(11) NOT NULL AUTO_INCREMENT,
  `province` VARCHAR(10) DEFAULT NULL,
  `num` BIGINT(20) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=INNODB AUTO_INCREMENT=4 DEFAULT CHARSET=utf8;

2.7.2表结构说明

表名:

page_jump_rate

主键:

`id`

唯一键:

`page_jum_rate_unique` (`page_id`)

字段名

字段说明

1

id

用户id

2

last_page_id

上一页面id 1:商品课程页 2:订单页面 3:支付页面

3

page_id

当前页面id 1:商品课程页 2:订单页面 3:支付页面

4

next_page_id

下一页面id 1:商品课程页 2:订单页面 3:支付页面

5

num

 

6

jump_rate

页面跳转率

表名:

tmp_city_num_detail

主键:

`id`

唯一键:

`tmp_cityp_num_index_province` (`province`)

字段名

字段说明

1

id

自增id

2

province

省份

3

num

各省数据统计结果

表名:

top_city_num

主键:

`id`

唯一键:

`page_jum_rate_unique` (`page_id`)

字段名

字段说明

1

id

自增id

2

province

省份

3

num

各省数据统计结果(只取前3

2.7.3业务流程说明

  用户浏览课程首页点击下订单,跳转到订单页面,再点击支付跳转到支付页面进行支付,收集各页面跳转json数据,解析json数据计算各页面点击数和转换率,计算top3点击量按地区排名(ip字段,需要根据历史数据累计)

2.7.4需求说明

  需求1:计算首页(商品详情页)总浏览数、订单页总浏览数、支付页面总浏览数

  需求2:计算商品课程页面到订单页的跳转转换率、订单页面到支付页面的跳转转换率

  需求3:根据ip得出相应省份,展示出top3省份的点击数,需要根据历史数据累加

  注:此处默认首页为商品页,如果当前页为商品页则无需计算转化率,记为100%,为了简化需求,该页面跳转逻辑默认为1号页面跳转至2号页面,2号页面才能跳转3号页面。3号不能跳转回2号和1号。即页面是按序号顺序前进。

2.8实时统计学员播放视频各时长

2.8.1 MySQL建表语句

CREATE TABLE `video_learn_detail` (
  `userid` INT(11) NOT NULL DEFAULT '0',
  `cwareid` INT(11) NOT NULL DEFAULT '0',
  `videoid` INT(11) NOT NULL DEFAULT '0',
  `totaltime` BIGINT(20) DEFAULT NULL,
  `effecttime` BIGINT(20) DEFAULT NULL,
  `completetime` BIGINT(20) DEFAULT NULL,
  PRIMARY KEY (`userid`,`cwareid`,`videoid`)
) ENGINE=INNODB DEFAULT CHARSET=utf8;

CREATE TABLE `chapter_learn_detail` (
  `chapterid` INT(11) NOT NULL DEFAULT '0',
  `totaltime` BIGINT(20) DEFAULT NULL,
  PRIMARY KEY (`chapterid`)
) ENGINE=INNODB DEFAULT CHARSET=utf8;

CREATE TABLE `cwareid_learn_detail` (
  `cwareid` INT(11) NOT NULL DEFAULT '0',
  `totaltime` BIGINT(20) DEFAULT NULL,
  PRIMARY KEY (`cwareid`)
) ENGINE=INNODB DEFAULT CHARSET=utf8;


CREATE TABLE `edutype_learn_detail` (
  `edutypeid` INT(11) NOT NULL DEFAULT '0',
  `totaltime` BIGINT(20) DEFAULT NULL,
  PRIMARY KEY (`edutypeid`)
) ENGINE=INNODB DEFAULT CHARSET=utf8;

CREATE TABLE `sourcetype_learn_detail` (
  `sourcetype_learn` VARCHAR(10) NOT NULL DEFAULT '',
  `totaltime` BIGINT(20) DEFAULT NULL,
  PRIMARY KEY (`sourcetype_learn`)
) ENGINE=INNODB DEFAULT CHARSET=utf8;

CREATE TABLE `subject_learn_detail` (
  `subjectid` INT(11) NOT NULL DEFAULT '0',
  `totaltime` BIGINT(20) DEFAULT NULL,
  PRIMARY KEY (`subjectid`)
) ENGINE=INNODB DEFAULT CHARSET=utf8;

CREATE TABLE `video_interval` (
  `userid` INT(11) NOT NULL DEFAULT '0',
  `cwareid` INT(11) NOT NULL DEFAULT '0',
  `videoid` INT(11) NOT NULL DEFAULT '0',
  `play_interval` TEXT,
  PRIMARY KEY (`userid`,`cwareid`,`videoid`)
) ENGINE=INNODB DEFAULT CHARSET=utf8;

2.8.2表结构说明

表名:

video_learn_detail

主键:

`userid`,`cwareid`,`videoid`

字段名

字段说明

1

userid

用户id

2

cwareid

课件id

3

videoid

视频id

4

totaltime

播放总时长:(te-ts)/1000

5

effecttime

有效总时长:[((te-ts)/1000)/(pe-ps)] * complete_duration

6

completetime

完成总时长:(pe-ps)需要对历史数据进行对比并去重

表名:

chapter_learn_detail

主键:

chapterid

字段名

字段说明

1

chapterid

章节id

2

totaltime

统计总时长

表名:

cwareid_learn_detail

主键:

cwareid

字段名

字段说明

1

cwareid

课件id

2

totaltime

统计总时长

表名:

edutype_learn_detail

主键:

edutypeid

字段名

字段说明

1

edutypeid

辅导id

2

totaltime

统计总时长

表名:

sourcetype_learn_detail

主键:

sourcetype

字段名

字段说明

1

sourcetype

播放设备来源类型

2

totaltime

统计总时长

表名:

subject_learn_detail

主键:

subjectid

字段名

字段说明

1

subjectid

主题id

2

totaltime

统计总时长

表名:

video_interval

主键:

`userid`,`cwareid`,`videoid`

字段名

字段说明

1

userid

用户id

2

cwareid

课件id

3

videoid

视频id

4

play_interval

播放历史区间

2.8.3业务流程说明

  用户在线播放视频进行学习课程,后台记录视频播放开始区间和结束区间,及播放开始时间和播放结束时间,后台手机数据传输kafka需要计算用户播放视频总时长、有效时长、完成时长,及各维度总播放时长。

2.8.4需求说明

  需求1:计算各章节下的播放总时长(chapterid聚合统计播放总时长)

  需求2:计算各课件下的播放总时长(cwareid聚合统计播放总时长)

  需求3:计算各辅导下的播放总时长(edutypeid聚合统计播放总时长)

  需求4:计算各播放平台下的播放总时长(sourcetype聚合统计播放总时长)

  需求5:计算各科目下的播放总时长(subjectid聚合统计播放总时长)

  需求6:计算用户学习视频的播放总时长、有效时长、完成时长,需求记录视频播历史区间,对于用户多次学习的播放区间不累计有效时长和完成时长。

  播放总时长计算:(te-ts)/1000  向下取整  单位:秒

  完成时长计算:根据pe-ps 计算 需要对历史数据进行去重处理

  有效时长计算:根据te-ts 除以pe-ps 先计算出播放每一区间需要的实际时长 * 完成时长

3章 思考

  (1Spark Streaming下每个stage的耗时由什么决定

  (2Spark Streaming task发生数据倾斜如何解决

  (3Spark Streaming操作MySQL时,相同维度的数据如何保证线程安全问题

  (4)如何保证kill Spark Streaming任务的时候不丢失数据

  (5)如何保证Spark Streaming的第一次启动和kill后第二次启动时据不丢失数据

  (6Spark Streaming下如何正确操作MySQL(如何正确使用连接)

  (7MySQL建表时 注意索引问题

  (8)代码地址:https://gitee.com/LzMingYueShanPao/online-education.git

原文地址:https://www.cnblogs.com/LzMingYueShanPao/p/15156211.html