spark双流join

https://blog.csdn.net/dinghua_xuexi/article/details/107943242

背景

在构建实时数仓过程中,有时需要将两个实时数据源进行关联,生成大宽表数据,这时就不得不用到双流join。

场景

比如有这样的场景,订单实时数据源,和订单物品实时数据源。订单数据源有订单id,下单时间,订单金额,收货人,收货地址等信息,订单商品数据源有订。单号id,商品名称,商品品牌,商品价格,商品成交价格,商品所属商家。订单和订单商品一般是一对多的关系。生成订单大宽表数据,需要将订单的每一件商品信息都关联上订单详细信息。

面临问题

我们使用流数据join,不像静态数据那么简单。我们知道静态数据join,两份数据都是确定的,没关联上就是没关联上,关联上了就是关联上了。此刻关联上的,将来再运行,也必然是关联上的,此刻没关联上的,将来运行,也是关联不上的。而在流数据join时,由于数据是动态的,两个原本能关联的数据进入程序的时机未必一致,此时就有可能造成关联数据失去关联,当然没关联上的也有可能是真的关联不上(join的一方数据缺漏)。

解决方案

由于流数据join的特殊性,我们须采取必要的处理方案,使得两份join的流数据尽量都能够关联上,真正完成流数据join的使命。

订单数据和订单商品数据是一对多的关系,也就是一条订单数据对应着一条或者多条订单商品数据,而一条订单商品数据只能对应着一条订单数据。所以在join的时候,订单数据无论是否关联上订单商品数据,都应当保存下来,留作下次继续join使用。而一条订单商品数据,一旦关联上订单数据,即可被废弃,否则就应当将失配的订单商品数据保存下来,直至下次关联成功。

详细过程

为方便说明,假定订单数据A, B, 订单商品数据a1,a2, a3. A可关联a1, a2, a3. B则没有订单商品数据与之关联。下面讨论某一时刻join的各种情况及其应对办法。

1、(A, None), 该情况说明某一时刻,订单数据A到了,能与之关联的a系列数据一个也没到, 此时有2种情况,
1) a系列数据从未进入过程序, 对于这种情况,我们将A数据保存到redis中,key的结构为 order_info:<order_id>. 使得下次a系列的订单商品数据来了之后,有机会进行关联;
2) 还有一种情况是a系列数据先到了,已经保存到了redis中,如情况3的第 2) 种情况所述, 去redis中将a系列数据查出(可能存在多个订单商品数据), 完成匹配,然后将匹配完的a系列数据,从redis中删除,以免下次join又去redis查询,重复匹配。接着,还需将A数据保存到redis中,因为将来还有可能有a系列的数据需要匹配。

2、(A, a1), 该情况说明某一时刻,订单数据A到了,能与之关联的a1订单商品数据也到了,正好进行join。此时需要注意,由于订单数据和订单商品数据是一对多的关系,后续是否还存在能关联A的a系列订单商品数据,我们并不知道,极有可能是存在的。因此,尽管已经形成了关联,还需要将A保存到redis中,如1所述,而a1已经完成匹配,不可能再有匹配的机会,无需保存. 另外,还需要注意的是,有可能此时redis中还存有a系列的商品订单数据,因此还需要像情况1的情形 2) 那样,去redis中寻找a系列的商品订单数据,找到后进行关联,关联完要将redis中的a系列商品订单数据删除,避免下次重复匹配。我们注意到,情况2和情况1的过程几乎差不多,就一个进入程序的A和a1不用查redis就可关联之外,其他过程几乎一样,因此在程序中,可以将这2种情况合并

3、(None, a2),该情况说明,订单商品数据a2到了,能与之关联的A订单数据却不在。此时我们应当这样处理,先去redis查找A订单数据.
1) 如果找到,则可完成匹配,不用保存a2数据;
2) 如果没有找到A数据,说明A数据还没进入过程序,a2数据应当保存到redis中, key结构为 order_product:<order_id>:<produce_id>, 这样a2数据可以在redis中等待下一次A数据的到来,然后完成匹配
4、(B, None) 其实是和情况1相同,不过实际数据中,不存在能与之关联的订单商品数据,但在程序中我们并不知道这一情况,我们一律将B存到redis中,订单流数据是源源不断的进来,会造成redis数据不断膨胀,因此我们不能订单数据永久地存于redis中,订单和订单商品是强关联的,关联匹配的两方数据不可能会相隔太久进入程序。因此我们在redis中设置订单数据的存活时间

需要说明的是,对于没有匹配的订单商品数据,保存到redis中,一旦下次完成匹配,须立刻将其删除,以免下次join又将其查询出来重复匹配。还有,如果订单商品数据一直没有匹配的订单数据,过一段时间也应该将其删除,因此同样地,需要在redis中设置订单商品的存活时间。

原文地址:https://www.cnblogs.com/dch-21/p/13941493.html