第一章 双流合并
除了事实表与维表进行合并形成宽表,还需要事实表与事实表进行合并形成更大的宽表。
1 双流合并的问题
由于两个流的数据是独立保存,独立消费,很有可能同一业务的数据,分布在不同的批次。因为join算子只join同一批次的数据。如果只用简单的join流方式,会丢失掉不同批次的数据。
2 解决策略
第一种 利用滑动窗口进行join,然后再进行去重。
第二种 把数据存入缓存 ,关联时进行join后 再去查询缓存中的数据,来弥补不同批次的问题。
3 代码实现
case class
/**
 * Wide row combining one order-detail line with the attributes of its parent order.
 *
 * Every field has a default, so an empty instance can be created with `OrderDetailWide()`
 * and then populated through [[mergeOrderInfo]] / [[mergeOrderDetail]].
 * Fields are declared as `var` because the merge methods (and later processing steps such as
 * the amount split that fills `final_detail_amount`) assign them in place.
 */
case class OrderDetailWide(
  // order-detail identity and order header
  var order_detail_id: Long = 0L,
  var order_id: Long = 0L,
  var order_status: String = null,
  var create_time: String = null,
  var user_id: Long = 0L,
  // purchased sku
  var sku_id: Long = 0L,
  var sku_price: Double = 0D,
  var sku_num: Long = 0L,
  var sku_name: String = null,
  // order-level amounts, plus the per-detail share (final_detail_amount)
  var benefit_reduce_amount: Double = 0D,
  var original_total_amount: Double = 0D,
  var feight_fee: Double = 0D,
  var final_total_amount: Double = 0D,
  var final_detail_amount: Double = 0D,
  var if_first_order: String = null,
  var province_name: String = null,
  var province_area_code: String = null,
  var user_age_group: String = null,
  var user_gender: String = null,
  var dt: String = null,
  // product dimensions copied from the order detail
  var spu_id: Long = 0L,
  var tm_id: Long = 0L,
  var category3_id: Long = 0L,
  var spu_name: String = null,
  var tm_name: String = null,
  var category3_name: String = null
) {

  /**
   * Builds a wide row from a joined (order, order detail) pair.
   *
   * @param orderInfo   parent order; ignored when null
   * @param orderDetail order detail line; ignored when null
   */
  def this(orderInfo: OrderInfo, orderDetail: OrderDetail) {
    this() // start from the all-defaults instance, then copy both sides in
    mergeOrderInfo(orderInfo)
    mergeOrderDetail(orderDetail)
  }

  /**
   * Copies the order-level fields from `orderInfo` into this row.
   * Does nothing when `orderInfo` is null.
   */
  def mergeOrderInfo(orderInfo: OrderInfo): Unit = {
    if (orderInfo != null) {
      this.order_id = orderInfo.id
      this.order_status = orderInfo.order_status
      this.create_time = orderInfo.create_time
      this.dt = orderInfo.create_date

      this.benefit_reduce_amount = orderInfo.benefit_reduce_amount
      this.original_total_amount = orderInfo.original_total_amount
      this.feight_fee = orderInfo.feight_fee
      this.final_total_amount = orderInfo.final_total_amount

      this.province_name = orderInfo.province_name
      this.province_area_code = orderInfo.province_area_code

      this.user_age_group = orderInfo.user_age_group
      this.user_gender = orderInfo.user_gender

      this.if_first_order = orderInfo.if_first_order

      this.user_id = orderInfo.user_id
    }
  }

  /**
   * Copies the detail-level fields (sku and product dimensions) from `orderDetail` into this row.
   * Does nothing when `orderDetail` is null.
   */
  def mergeOrderDetail(orderDetail: OrderDetail): Unit = {
    if (orderDetail != null) {
      this.order_detail_id = orderDetail.id
      this.sku_id = orderDetail.sku_id
      this.sku_name = orderDetail.sku_name
      this.sku_price = orderDetail.order_price
      this.sku_num = orderDetail.sku_num

      this.spu_id = orderDetail.spu_id
      this.tm_id = orderDetail.tm_id
      this.category3_id = orderDetail.category3_id
      this.spu_name = orderDetail.spu_name
      this.tm_name = orderDetail.tm_name
      this.category3_name = orderDetail.category3_name
    }
  }
}
|
实时计算代码
/**
 * Entry point of the order-detail wide-table streaming job.
 *
 * Reads the order and order-detail topics from Kafka, joins the two streams over a
 * sliding window (so records of the same order that arrive in different batches can
 * still meet), removes the duplicates the overlapping windows produce by using a Redis
 * set, and prints a few amount columns of the resulting wide rows.
 *
 * @param args command-line arguments (unused)
 */
def main(args: Array[String]): Unit = {
  val sparkConf: SparkConf = new SparkConf().setAppName("dws_order_detail_wide_app").setMaster("local[*]")
  val ssc = new StreamingContext(sparkConf, Seconds(5))
  val orderInfoTopic = "DW_ORDER_INFO"
  val orderDetailTopic = "DW_ORDER_DETAIL"
  val groupId = "dws_order_detail_wide_consumer"

  // Starting offsets for both topics, as stored for this consumer group.
  val orderInfoOffsets: Map[TopicPartition, Long] = OffsetManager.getOffset(groupId, orderInfoTopic)
  val orderDetailOffsets: Map[TopicPartition, Long] = OffsetManager.getOffset(groupId, orderDetailTopic)

  val orderInputDstream: InputDStream[ConsumerRecord[String, String]] = MyKafkaUtil.getKafkaStream(orderInfoTopic, ssc, orderInfoOffsets, groupId)
  val orderDetailInputDstream: InputDStream[ConsumerRecord[String, String]] = MyKafkaUtil.getKafkaStream(orderDetailTopic, ssc, orderDetailOffsets, groupId)

  // Capture the offset ranges of every batch before any other transformation is applied.
  // NOTE(review): the captured ranges are never written back in this snippet — confirm
  // where the offsets are supposed to be saved.
  var orderInfoOffsetRanges: Array[OffsetRange] = Array.empty[OffsetRange]
  val orderInputNDstream: DStream[ConsumerRecord[String, String]] = orderInputDstream.transform { rdd =>
    orderInfoOffsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    rdd
  }
  var orderDetailOffsetRanges: Array[OffsetRange] = Array.empty[OffsetRange]
  val orderDetailInputNDstream: DStream[ConsumerRecord[String, String]] = orderDetailInputDstream.transform { rdd =>
    orderDetailOffsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    rdd
  }

  // Convert the raw order / order-detail records into case-class streams.
  val orderInfoDstream: DStream[OrderInfo] = orderInputNDstream.map { record =>
    val jsonString: String = record.value()
    val orderInfo: OrderInfo = JSON.parseObject(jsonString, classOf[OrderInfo])
    orderInfo
  }
  val orderDetailDstream: DStream[OrderDetail] = orderDetailInputNDstream.map(record => JSON.parseObject(record.value, classOf[OrderDetail]))

  // 15-second window sliding every 5 seconds: each record takes part in several
  // consecutive joins, which is why the join output has to be de-duplicated below.
  val orderInfoWinDstream: DStream[OrderInfo] = orderInfoDstream.window(Seconds(15), Seconds(5))
  val orderDetailWinDstream: DStream[OrderDetail] = orderDetailDstream.window(Seconds(15), Seconds(5))
  orderInfoWinDstream.cache()
  orderDetailWinDstream.cache()
  // orderInfoWinDstream.print(1000)
  // orderDetailWinDstream.print(1000)

  // Dual-stream join of orderInfo and orderDetail, keyed by the order id.
  val orderInfoWithKeyDstream: DStream[(Long, OrderInfo)] = orderInfoWinDstream.map(orderInfo => (orderInfo.id, orderInfo))
  val orderDetailWithKeyDstream: DStream[(Long, OrderDetail)] = orderDetailWinDstream.map(orderDetail => (orderDetail.order_id, orderDetail))

  val orderJoinDstream: DStream[(Long, (OrderInfo, OrderDetail))] = orderInfoWithKeyDstream.join(orderDetailWithKeyDstream)

  val orderDetailWideDstream: DStream[OrderDetailWide] = orderJoinDstream.map { case (id, (orderInfo, orderDetail)) => new OrderDetailWide(orderInfo, orderDetail) }

  // De-duplicate: a wide row is kept only the first time its order_detail_id is added
  // to the per-day Redis set (SADD returns 1 for a new member).
  // NOTE(review): the set key has no expiry here — consider adding a TTL.
  val orderDetailWideFilteredDstream: DStream[OrderDetailWide] = orderDetailWideDstream.transform { rdd =>
    println("前:" + rdd.count())
    val logInfoRdd: RDD[OrderDetailWide] = rdd.mapPartitions { orderDetailWideItr =>
      val jedis: Jedis = RedisUtil.getJedisClient
      try {
        // Materialize the partition so all Redis calls finish before the connection is closed.
        val orderDetailWideList: List[OrderDetailWide] = orderDetailWideItr.toList
        println(orderDetailWideList.map(orderDetailWide => orderDetailWide.order_id).mkString(","))

        val orderDetailWideFilteredList: List[OrderDetailWide] = orderDetailWideList.filter { orderDetailWide =>
          val orderDetailWideKey = "order_detail_wide:" + orderDetailWide.dt
          val ifFirst: lang.Long = jedis.sadd(orderDetailWideKey, orderDetailWide.order_detail_id.toString)
          ifFirst == 1L
        }
        orderDetailWideFilteredList.toIterator
      } finally {
        // Always return the connection, even when a Redis call or the partition read fails.
        jedis.close()
      }
    }
    // Cached so the count below and the downstream output do not run the Redis filter twice.
    logInfoRdd.cache()
    println("后:" + logInfoRdd.count())
    logInfoRdd
  }

  orderDetailWideFilteredDstream.map(orderwide => (orderwide.order_id, orderwide.final_total_amount, orderwide.original_total_amount, orderwide.sku_price, orderwide.sku_num, orderwide.final_detail_amount)).print(1000)

  ssc.start()
  ssc.awaitTermination()
}
|
注意点:join时尽量不要出现shuffle。
如何解决:
在join前的数据保证分区是一对一的关系,利用kafka发送时的分区键,两张表的分区键和分区数保持一致。
第二章 订单明细实付金额分摊
1 需求
主订单的应付金额【original_total_amount】一般是由所有订单明细的商品单价*数量汇总【sku_price*sku_num】组成。
但是由于优惠、运费等都是以订单为单位进行计算的,所以减掉优惠、加上运费会得到一个最终实付金额【final_total_amount】。
但问题在于如果是以商品进行交易额分析,也要把优惠、运费的效果分摊到购买的每个商品中。
2 如何分摊呢?
一般是由订单明细每种商品的消费占总订单的比重进行分摊,比如总价1000元的商品,
分别由600元和400元的A、B两种商品组成,但是经过打折和加运费后,实际付款金额变为810元,那么A的分摊实付金额为486元,B的分摊实付金额为324元。
3 麻烦的情况:
由于明细的分摊是由占比而得,那就会进行除法,除法就有可能出现除不尽的情况。
比如:原价90元 ,三种商品每件30元。没有优惠但有10元运费,总实付金额为100元。按占比分摊各三分之一,就会出现三个33.33元。加起来就会出现99.99元。就会出现差一分钱的情况。
而我们要求所有订单明细的实付分摊加总必须和订单的总实付相等。
所以我们要的是100=33.33+33.33+33.34
4 解决思路:
核心思路:就是需要用两种算法来计算金额
1) 算法一:
如果 计算时该明细不是最后一笔
使用乘除法公式: 实付分摊金额/实付总金额= (数量*单价)/原始总金额
调整移项可得 实付分摊金额=(数量*单价)*实付总金额 / 原始总金额
2) 算法二:
如果 计算时该明细是最后一笔
使用减法公式:
实付分摊金额= 实付总金额 - (其他明细已经计算好的【实付分摊金额】的合计)
3) 判断是否是最后一笔
判断公式: 如果 该条明细 (数量*单价)== 原始总金额 -(其他明细 【数量*单价】的合计)
4) 整个计算中需要的两个合计值:
- 其他明细已经计算好的【实付分摊金额】的合计
- 订单的已经计算完的明细的【数量*单价】的合计
如何保存这两个合计?保存在redis中。
type
|
hash
|
key
|
order_split_amount:[order_id]
|
field
|
split_amount_sum , origin_amount_sum
|
value
|
合计值
|
5、实现代码
// Splits each order's paid amount (final_total_amount) across its detail rows.
//
// Two running totals per order are kept in a Redis hash:
//   key   : order_split_amount:[order_id]
//   field : split_amount_sum  - sum of the split amounts already assigned to other details
//           origin_amount_sum - sum of (sku_num * sku_price) of the details already processed
//
// A detail that is not the last one of its order gets its proportional share, rounded to cents.
// The last detail gets "paid total minus everything already assigned", so the shares of one
// order always add up exactly to final_total_amount.
val orderWideWithSplitDstream: DStream[OrderDetailWide] = orderDetailWideDStream.mapPartitions { orderWideItr =>
  val jedis: Jedis = RedisUtil.getJedisClient
  try {
    // Materialize the partition so every Redis call completes before the connection is closed.
    val orderWideList: List[OrderDetailWide] = orderWideItr.toList
    for (orderWide <- orderWideList) {
      // 1. Load the two running totals of this order (both default to 0 when absent).
      val key = "order_split_amount:" + orderWide.order_id
      val orderSumMap: util.Map[String, String] = jedis.hgetAll(key)
      def storedSum(field: String): Double =
        Option(orderSumMap).flatMap(m => Option(m.get(field))).map(_.toDouble).getOrElse(0D)
      val previousSplitAmountSum: Double = storedSum("split_amount_sum")
      val previousOriginAmountSum: Double = storedSum("origin_amount_sum")

      // 2. Last detail?  (sku_num * sku_price) == ORIGINAL total - sum of the other details' (sku_num * sku_price).
      //    The remainder must be taken from original_total_amount, not from the paid total,
      //    otherwise the test can only succeed when the order has neither discount nor freight.
      val detailOrginAmount: Double = orderWide.sku_num * orderWide.sku_price
      val restOriginAmount: Double = orderWide.original_total_amount - previousOriginAmountSum
      // Compare in whole cents: the totals are accumulated doubles, so exact == can miss by 1e-12.
      val isLastDetail: Boolean = Math.round(detailOrginAmount * 100D) == Math.round(restOriginAmount * 100D)

      // 3. Last detail: subtraction. Otherwise: (sku_num * sku_price) * paid total / original total.
      val unroundedSplitAmount: Double =
        if (isLastDetail) orderWide.final_total_amount - previousSplitAmountSum
        else detailOrginAmount * orderWide.final_total_amount / orderWide.original_total_amount
      orderWide.final_detail_amount = Math.round(unroundedSplitAmount * 100D) / 100D

      // 4. Store the updated running totals for the next detail of the same order.
      val updatedSumMap = new util.HashMap[String, String]()
      updatedSumMap.put("split_amount_sum", (previousSplitAmountSum + orderWide.final_detail_amount).toString)
      updatedSumMap.put("origin_amount_sum", (previousOriginAmountSum + detailOrginAmount).toString)
      jedis.hmset(key, updatedSumMap)
    }
    orderWideList.toIterator
  } finally {
    // Always return the connection, even when a Redis call fails.
    jedis.close()
  }
}
|
第三章 保存到clickhouse
1 clickhouse 安装及入门,参见《尚硅谷clickhouse》课件
2 在clickhouse中建立表
-- Wide table with one row per order detail (order_detail_id), carrying the order-level
-- attributes alongside the detail-level ones.
create table order_wide (
order_detail_id UInt64,
order_id UInt64,
order_status String,
create_time DateTime,
user_id UInt64,
sku_id UInt64,
sku_price Decimal64(2),
sku_num UInt64,
sku_name String,
-- Order-level amounts: the same value is repeated on every detail row of an order.
benefit_reduce_amount Decimal64(2),
original_total_amount Decimal64(2),
feight_fee Decimal64(2),
final_total_amount Decimal64(2),
-- Detail-level share of final_total_amount.
final_detail_amount Decimal64(2),
if_first_order String,
province_name String,
province_area_code String,
user_age_group String,
user_gender String,
dt Date,
spu_id UInt64,
tm_id UInt64,
category3_id UInt64,
spu_name String,
tm_name String,
category3_name String
-- ReplacingMergeTree collapses rows that share the sorting key (order_detail_id), using
-- create_time as the version column. Per the ClickHouse documentation this happens only
-- during background merges within a partition, so duplicates can be visible for a while.
)engine =ReplacingMergeTree(create_time)
partition by dt
primary key (order_detail_id)
order by (order_detail_id )
|
3 在sparkstreaming中增加写入clickhouse部分
3.1 pom.xml
添加
<dependency> <groupId>ru.yandex.clickhouse</groupId> <artifactId>clickhouse-jdbc</artifactId> <version>0.1.55</version> </dependency>
|
3.2 sparkstreaming 写入clickhouse
// Writes every batch of wide rows to the ClickHouse table order_wide over JDBC.
val sparkSession = SparkSession.builder()
  .appName("order_detail_wide_spark_app")
  .getOrCreate()

// Needed for rdd.toDF() below.
import sparkSession.implicits._

orderDetailWideDStream.foreachRDD { rdd =>
  val df: DataFrame = rdd.toDF()
  df.write.mode(SaveMode.Append)
    .option("batchsize", "100")
    .option("isolationLevel", "NONE") // transaction setting: no isolation level
    .option("numPartitions", "4") // write concurrency
    .option("driver", "ru.yandex.clickhouse.ClickHouseDriver")
    .jdbc("jdbc:clickhouse://hdp1:8123/test1", "order_wide", new Properties())
}
|
第四章 发布数据接口
1 代码清单
控制层
|
PublisherController
|
实现接口的web发布
|
服务层
|
ClickHouseService
|
数据业务查询interface
|
ClickHouseServiceImpl
|
业务查询的实现类
|
数据层
|
OrderMapper
|
数据层查询的interface
|
OrderMapper.xml
|
数据层查询的实现配置
|
2 接口
2.1 访问路径
总数
|
http://publisher:8070/realtime-total?date=2019-02-01
|
分时统计
|
http://publisher:8070/realtime-hour?id=order_amount&date=2019-02-01
|
2.2 要求数据格式
总数
|
[{"id":"dau","name":"新增日活","value":1200},
{"id":"new_mid","name":"新增设备","value":233 },
{"id":"order_amount","name":"新增交易额","value":1000.2 }]
|
分时统计
|
{"yesterday":{"11":383,"12":123,"17":88,"19":200 },
"today":{"12":38,"13":1233,"17":123,"19":688 }}
|
3 代码开发
3.1 pom.xml
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-jdbc</artifactId> </dependency>
<dependency> <groupId>org.mybatis.spring.boot</groupId> <artifactId>mybatis-spring-boot-starter</artifactId> <version>1.3.4</version> </dependency>
<dependency> <groupId>ru.yandex.clickhouse</groupId> <artifactId>clickhouse-jdbc</artifactId> <version>0.1.55</version> </dependency>
|
3.2 OrderMapper
import java.util.List; import java.util.Map;
public interface OrderMapper {
//1 查询当日交易额总数 public BigDecimal selectOrderAmountTotal(String date);
//2 查询当日交易额分时明细 public List<Map> selectOrderAmountHourMap(String date);
}
|
3.3 OrderMapper.xml
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper SYSTEM "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="com.atguigu.gmall0105.publisher.mapper.OrderMapper">

    <!--
        order_wide holds one row per order detail, and final_total_amount is the order-level
        total repeated on each of those rows. Summing it would count an order once per item,
        so both queries sum the per-detail share final_detail_amount instead.
    -->

    <!-- Total order amount of one day. -->
    <select id="selectOrderAmountTotal" resultType="java.math.BigDecimal">
        select sum(final_detail_amount) sum_amount
        from order_wide
        where dt=#{date}
    </select>

    <!-- Order amount of one day grouped by hour: hr = hour of create_time, am = amount. -->
    <select id="selectOrderAmountHourMap" resultMap="orderAmountHour" >
        select toHour(create_time) hr, sum(final_detail_amount) am
        from order_wide
        where dt=#{date}
        group by toHour(create_time)
    </select>

    <resultMap id="orderAmountHour" type="java.util.Map" autoMapping="true">
    </resultMap>

</mapper>
|
3.4 application.properties
添加:
spring.datasource.driver-class-name=ru.yandex.clickhouse.ClickHouseDriver spring.datasource.url=jdbc:clickhouse://hdp1:8123/test1 mybatis.mapperLocations=classpath:mapper/*.xml mybatis.configuration.map-underscore-to-camel-case=true
|
3.5 增加扫描包路径
/**
 * Spring Boot entry point of the publisher service.
 * The mapper scan registers the MyBatis mapper interfaces found under the given base package.
 */
@SpringBootApplication
@MapperScan(basePackages = "com.atguigu.gmallXXXXXXX.publisher.mapper")
public class Gmall2019PublisherApplication {

    /**
     * Starts the Spring application.
     *
     * @param args command-line arguments, passed through to Spring
     */
    public static void main(String[] args) {
        SpringApplication.run(Gmall2019PublisherApplication.class, args);
    }

}
|
3.6 ClickHouseService
/**
 * Returns the total order amount of the given day.
 *
 * @param date the day to query
 * @return the total amount for that day
 */
public BigDecimal getOrderAmount(String date);
/**
 * Returns the order amount of the given day broken down by hour.
 *
 * @param date the day to query
 * @return a map from hour to the amount of that hour
 */
public Map getOrderAmountHour(String date);
|
3.7 ClickHouseServiceImpl
/**
 * ClickHouse-backed implementation of {@link ClickHouseService}; every query is delegated
 * to {@link OrderMapper}.
 */
@Service
public class ClickHouseServiceImpl implements ClickHouseService {

    @Autowired
    OrderMapper orderMapper;

    /**
     * Returns the total order amount of the given day, exactly as the mapper reports it.
     */
    @Override
    public BigDecimal getOrderAmount(String date) {
        return orderMapper.selectOrderAmountTotal(date);
    }

    /**
     * Flattens the per-hour rows returned by the mapper into one map:
     * the key is each row's "hr" value and the value is its "am" value.
     */
    @Override
    public Map getOrderAmountHour(String date) {
        Map amountByHour = new HashMap();
        List<Map> hourRows = orderMapper.selectOrderAmountHourMap(date);
        hourRows.forEach(row -> amountByHour.put(row.get("hr"), row.get("am")));
        return amountByHour;
    }

}
|
3.8 PublisherController
/**
 * Web endpoints that publish the realtime statistics as JSON strings.
 */
@RestController
public class PublisherController {

    @Autowired
    EsService esService;

    @Autowired
    ClickHouseService clickHouseService;

    /**
     * Returns the daily totals as a JSON array of {id, name, value} entries:
     * daily active users, new devices and order amount.
     *
     * @param dt the day to query (request parameter "date")
     */
    @RequestMapping(value = "realtime-total", method = RequestMethod.GET)
    public String realtimeTotal(@RequestParam("date") String dt) {
        List<Map<String, Object>> totals = new ArrayList<>();

        // Daily active users: a failed lookup is logged and reported as 0.
        Long dauTotal = 0L;
        try {
            dauTotal = esService.getDauTotal(dt);
        } catch (Exception e) {
            e.printStackTrace();
        }
        Object dauValue = (dauTotal != null) ? dauTotal : Long.valueOf(0L);
        totals.add(totalEntry("dau", "新增日活", dauValue));

        // New devices: fixed value.
        totals.add(totalEntry("new_mid", "新增设备", 233));

        // Order amount, rounded half-up to two decimals.
        BigDecimal orderAmount = clickHouseService.getOrderAmount(dt).setScale(2, RoundingMode.HALF_UP);
        totals.add(totalEntry("order_amount", "新增交易额", orderAmount));

        return JSON.toJSONString(totals);
    }

    /**
     * Returns the hourly breakdown of one metric for the given day and the day before it,
     * as a JSON object with the keys "yesterday" and "today".
     *
     * @param id the metric: "dau" or "order_amount"
     * @param dt the day to query (request parameter "date")
     * @return the JSON string, or null when {@code id} is neither of the two metrics
     */
    @GetMapping("realtime-hour")
    public String realtimeHour(@RequestParam("id") String id, @RequestParam("date") String dt) {
        switch (id) {
            case "dau": {
                Map todayDau = esService.getDauHour(dt);
                String yesterday = getYd(dt);
                Map yesterdayDau = esService.getDauHour(yesterday);

                Map<String, Map<String, Long>> dauByDay = new HashMap<>();
                dauByDay.put("yesterday", yesterdayDau);
                dauByDay.put("today", todayDau);
                return JSON.toJSONString(dauByDay);
            }
            case "order_amount": {
                Map todayAmount = clickHouseService.getOrderAmountHour(dt);
                String yesterday = getYd(dt);
                Map yesterdayAmount = clickHouseService.getOrderAmountHour(yesterday);

                Map<String, Map<String, BigDecimal>> amountByDay = new HashMap<>();
                amountByDay.put("yesterday", yesterdayAmount);
                amountByDay.put("today", todayAmount);
                return JSON.toJSONString(amountByDay);
            }
            default:
                return null;
        }
    }

    /** Builds one {id, name, value} entry of the realtime-total response. */
    private static Map<String, Object> totalEntry(String id, String name, Object value) {
        Map<String, Object> entry = new HashMap<>();
        entry.put("id", id);
        entry.put("name", name);
        entry.put("value", value);
        return entry;
    }

    /**
     * Returns the day before {@code today}, in the same yyyy-MM-dd format.
     *
     * @throws RuntimeException when {@code today} cannot be parsed as yyyy-MM-dd
     */
    private String getYd(String today) {
        SimpleDateFormat dayFormat = new SimpleDateFormat("yyyy-MM-dd");
        Date todayDate;
        try {
            todayDate = dayFormat.parse(today);
        } catch (ParseException e) {
            e.printStackTrace();
            throw new RuntimeException("日期格式不正确");
        }
        return dayFormat.format(DateUtils.addDays(todayDate, -1));
    }

}
|