Flink实时对账——双流join

https://blog.csdn.net/andyonlines/article/details/108173259

import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.co.CoProcessFunction
import org.apache.flink.streaming.api.scala.{OutputTag, StreamExecutionEnvironment}
import org.apache.flink.util.Collector


/**
 * Real-time reconciliation: joins the order stream with the third-party
 * payment stream on the order id.
 *
 * An order and a payment that share an order id are reported as reconciled on
 * the main output. An event whose counterpart has not shown up within
 * `MatchTimeoutMs` of event time is reported on a side output instead.
 */
object TwoStreamJoin {

    /** Payment event as recorded by the order system. */
    case class OrderEvent(orderId: String,
                          eventType: String,
                          eventTime: Long)

    /** Payment event as reported by the third-party payment platform. */
    case class PayEvent(orderId: String,
                        eventType: String,
                        eventTime: Long)

    // Supplies the implicit TypeInformation instances needed by OutputTag,
    // fromElements, keyBy and Types.of below.
    import org.apache.flink.api.scala._

    /** Event-time delay (ms) an event waits for its counterpart before it is reported as unmatched. */
    private final val MatchTimeoutMs = 5000L

    /** Side output carrying orders for which no payment event arrived in time. */
    val unmatchedOrders = new OutputTag[String]("unmatched-orders")

    /** Side output carrying payment events for which no order arrived in time. */
    val unmatchedPays = new OutputTag[String]("unmatched-pays")

    /**
     * Builds and runs the demo job on two small in-memory streams.
     *
     * @param args command line arguments (unused)
     */
    def main(args: Array[String]): Unit = {
        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
        env.setParallelism(1)

        // Order stream; eventTime is used as the (ascending) event timestamp.
        val orders = env.fromElements(
            OrderEvent("order_1", "pay", 2000L),
            OrderEvent("order_2", "pay", 3000L),
            OrderEvent("order_3", "pay", 4000L)
        ).assignAscendingTimestamps(_.eventTime)

        // Payment stream; eventTime is used as the (ascending) event timestamp.
        val pays = env.fromElements(
            PayEvent("order_1", "weixin", 5000L),
            PayEvent("order_2", "zhifubao", 6000L),
            PayEvent("order_5", "weixin", 7000L),
            PayEvent("order_6", "weixin", 8000L)
        ).assignAscendingTimestamps(_.eventTime)

        // Connect both streams and key each side by its order id so that the
        // keyed state in MachFuction is shared between matching events.
        // Key selectors replace the deprecated positional keyBy(0, 0).
        val processed = orders
            .connect(pays)
            .keyBy(_.orderId, _.orderId)
            .process(new MachFuction)

        processed.print("对账成功---")

        // NOTE(review): the two labels below read the other way round from what
        // the tags carry ("订单未到" = "order not arrived" is attached to
        // unmatchedOrders, i.e. orders whose payment is missing). They are kept
        // unchanged so the printed output stays as before; confirm whether the
        // labels should be swapped.
        processed.getSideOutput(unmatchedOrders).print("订单未到-----")
        processed.getSideOutput(unmatchedPays).print("支付未到-----")

        // Submit the job.
        env.execute()
    }

    /**
     * Matches orders (first input) with payments (second input) per order id.
     *
     * Type parameters of CoProcessFunction: first input, second input, output.
     * Whichever event arrives first is parked in keyed state and an event-time
     * timer is registered `MatchTimeoutMs` after its timestamp. If the
     * counterpart arrives, a success message is emitted and the parked event
     * (plus its timer) is dropped; otherwise the timer reports the parked
     * event on the corresponding side output.
     */
    class MachFuction extends CoProcessFunction[OrderEvent, PayEvent, String] {

        // Keyed state holding the order that is waiting for its payment.
        lazy val orderState = getRuntimeContext.getState(
            new ValueStateDescriptor[OrderEvent]("order-state", Types.of[OrderEvent])
        )

        // Keyed state holding the payment that is waiting for its order.
        lazy val payState = getRuntimeContext.getState(
            new ValueStateDescriptor[PayEvent]("pay-state", Types.of[PayEvent])
        )

        /**
         * Handles an element of the order stream.
         *
         * @param order the incoming order event
         * @param ctx   context giving access to the timer service
         * @param out   collector for successfully reconciled orders
         */
        override def processElement1(order: OrderEvent, ctx: CoProcessFunction[OrderEvent, PayEvent, String]#Context, out: Collector[String]): Unit = {
            Option(payState.value()) match {
                case Some(pay) =>
                    // The payment was already waiting: reconcile, drop it and
                    // cancel its timeout so it cannot fire later for this key.
                    payState.clear()
                    ctx.timerService().deleteEventTimeTimer(pay.eventTime + MatchTimeoutMs)
                    out.collect(s"订单id为${order.orderId}的订单对账成功")
                case None =>
                    // No payment yet: park the order and wait for the timeout.
                    orderState.update(order)
                    ctx.timerService().registerEventTimeTimer(order.eventTime + MatchTimeoutMs)
            }
        }

        /**
         * Handles an element of the payment stream.
         *
         * @param pay the incoming payment event
         * @param ctx context giving access to the timer service
         * @param out collector for successfully reconciled orders
         */
        override def processElement2(pay: PayEvent, ctx: CoProcessFunction[OrderEvent, PayEvent, String]#Context, out: Collector[String]): Unit = {
            // The counterpart of a payment is the parked ORDER, so orderState
            // (not payState) has to be consulted here.
            Option(orderState.value()) match {
                case Some(order) =>
                    // The order was already waiting: reconcile, drop it and
                    // cancel its timeout so it cannot fire later for this key.
                    orderState.clear()
                    ctx.timerService().deleteEventTimeTimer(order.eventTime + MatchTimeoutMs)
                    out.collect(s"订单id为${pay.orderId}的订单对账成功")
                case None =>
                    // No order yet: park the payment and wait for the timeout.
                    payState.update(pay)
                    ctx.timerService().registerEventTimeTimer(pay.eventTime + MatchTimeoutMs)
            }
        }

        /**
         * Fires when a registered timeout is reached; whatever is still parked
         * for the current key is reported on its side output and cleared.
         *
         * @param timestamp the timestamp of the firing timer
         * @param ctx       context used to emit to the side outputs
         * @param out       collector of the main output (unused)
         */
        override def onTimer(timestamp: Long, ctx: CoProcessFunction[OrderEvent, PayEvent, String]#OnTimerContext, out: Collector[String]): Unit = {
            Option(payState.value()).foreach { pay =>
                ctx.output(unmatchedPays, s"订单是${pay.orderId}对账失败")
                payState.clear()
            }
            Option(orderState.value()).foreach { order =>
                ctx.output(unmatchedOrders, s"订单是${order.orderId}对账失败")
                orderState.clear()
            }
        }
    }

}

对账成功---> 订单id为order_1的订单对账成功
对账成功---> 订单id为order_2的订单对账成功
订单未到-----> 订单是order_3对账失败
支付未到-----> 订单是order_5对账失败
支付未到-----> 订单是order_6对账失败

原文地址:https://www.cnblogs.com/dch-21/p/13941473.html