Flink 双流合并之connect Demo2

1、主类

package towStream

/**
 * @program: demo
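 * @description: Reconciles an order stream with a payment stream using connect + KeyedCoProcessFunction; events left unmatched after the timeout go to side outputs.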
 * @author: yang
 * @create: 2020-12-31 11:39
 */
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
import scala.util.Random

object TwoStreamJoinDemo {

  // Order payment event.
  case class OrderEvent(orderId: String,
                        eventType: String,
                        eventTime: Long)

  // Third-party payment event, e.g. WeChat Pay or Alipay.
  case class PayEvent(orderId: String,
                      eventType: String,
                      eventTime: Long)

  // Side output for order events that were not matched before their timer fired.
  val unmatchedOrders = new OutputTag[String]("unmatched-orders")
  // Side output for payment events that were not matched before their timer fired.
  val unmatchedPays = new OutputTag[String]("unmatched-pays")

  /**
   * Test source that emits one event per second for a random order id
   * (`order_0` .. `order_3`), stamped with the current wall-clock time, and
   * echoes each emitted event to stdout as a `(label:orderId:eventType,time)` tuple.
   *
   * @param label     prefix used in the stdout echo (e.g. "source1")
   * @param eventType event type written into every emitted event
   * @param make      builds an event from (orderId, eventType, timestamp)
   */
  private class RandomEventSource[T](label: String,
                                     eventType: String,
                                     make: (String, String, Long) => T) extends SourceFunction[T] {

    // Set to false by cancel() (called from another thread) to end the run() loop.
    @volatile private var running = true
    private val random = new Random()

    override def run(sourceContext: SourceFunction.SourceContext[T]): Unit = {
      while (running) {
        val now: Long = System.currentTimeMillis()
        val orderId: String = "order_" + random.nextInt(4)
        sourceContext.collect(make(orderId, eventType, now))
        println((s"$label:$orderId:$eventType", now))
        Thread.sleep(1000)
      }
    }

    // Stops the emit loop so the job can shut down.
    override def cancel(): Unit = running = false
  }

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // Custom source 1: order payment events, keyed by order id.
    val orders: KeyedStream[OrderEvent, String] = env
      .addSource(new RandomEventSource[OrderEvent]("source1", "pay", OrderEvent(_, _, _)))
      .assignAscendingTimestamps(_.eventTime)
      .keyBy(_.orderId)

    // Custom source 2: third-party payment events, keyed by order id.
    val pays: KeyedStream[PayEvent, String] = env
      .addSource(new RandomEventSource[PayEvent]("source2", "weixin", PayEvent(_, _, _)))
      .assignAscendingTimestamps(_.eventTime)
      .keyBy(_.orderId)

    val processed = orders.connect(pays).process(new MatchFunction)

    processed.print()

    processed.getSideOutput(unmatchedOrders).print()

    processed.getSideOutput(unmatchedPays).print()

    env.execute()
  }

  /**
   * Reconciles order events with payment events that share the same order id.
   *
   * Both inputs are keyed by `orderId`, so the state below is scoped per order id.
   * When an event arrives and the other side is already pending, a success message is
   * emitted on the main output and the pending event's timer is deleted. Otherwise the
   * event is stored and an event-time timer is registered `timeoutMs` after its
   * timestamp. If the event is still pending when that deadline is reached, a message
   * goes to `unmatchedOrders` / `unmatchedPays`.
   *
   * Because a match always clears the other side instead of storing, at most one of the
   * two states is non-null at any time.
   *
   * NOTE(review): a second unmatched event on the same side overwrites the first, which
   * is then never reported; use a ListState if every event must be accounted for.
   *
   * @param timeoutMs how long, in event-time milliseconds, to wait for the counterpart
   *                  (defaults to 60000 ms = 1 minute)
   */
  class MatchFunction(timeoutMs: Long = 60000L)
      extends KeyedCoProcessFunction[String, OrderEvent, PayEvent, String] {

    lazy private val orderState: ValueState[OrderEvent] = getRuntimeContext.getState(new ValueStateDescriptor[OrderEvent]("orderState", Types.of[OrderEvent]))
    lazy private val payState: ValueState[PayEvent] = getRuntimeContext.getState(new ValueStateDescriptor[PayEvent]("payState", Types.of[PayEvent]))

    override def processElement1(value: OrderEvent,
                                 ctx: KeyedCoProcessFunction[String, OrderEvent, PayEvent, String]#Context,
                                 out: Collector[String]): Unit = {
      // A pending payment for this order id means the two streams reconcile.
      val pay = payState.value()
      if (pay != null) {
        payState.clear()
        // Drop the now-obsolete timeout so it cannot fire against a later pending event.
        ctx.timerService().deleteEventTimeTimer(pay.eventTime + timeoutMs)
        out.collect("处理器1:订单ID为 " + pay + "==" + value + " 的两条流对账成功!")
      } else {
        // The payment may not have arrived yet: keep the order and wait up to timeoutMs.
        orderState.update(value)
        ctx.timerService().registerEventTimeTimer(value.eventTime + timeoutMs)
      }
    }

    override def processElement2(value: PayEvent,
                                 ctx: KeyedCoProcessFunction[String, OrderEvent, PayEvent, String]#Context,
                                 out: Collector[String]): Unit = {
      // A pending order for this order id means the two streams reconcile.
      val order = orderState.value()
      if (order != null) {
        orderState.clear()
        // Drop the now-obsolete timeout so it cannot fire against a later pending event.
        ctx.timerService().deleteEventTimeTimer(order.eventTime + timeoutMs)
        out.collect("处理器2:订单ID为 " + order + "==" + value + " 的两条流对账成功!")
      } else {
        // The order may not have arrived yet: keep the payment and wait up to timeoutMs.
        payState.update(value)
        ctx.timerService().registerEventTimeTimer(value.eventTime + timeoutMs)
      }
    }

    override def onTimer(timestamp: Long,
                         ctx: KeyedCoProcessFunction[String, OrderEvent, PayEvent, String]#OnTimerContext,
                         out: Collector[String]): Unit = {
      // A timer left over from an overwritten event may fire before the currently
      // pending event's deadline; only report an event once its own deadline has passed.
      val order = orderState.value()
      if (order != null && order.eventTime + timeoutMs <= timestamp) {
        // Send the warning to the side output.
        ctx.output(unmatchedOrders, s"订单ID为 ${order.orderId} 的两条流没有对账成功!")
        orderState.clear()
      }
      val pay = payState.value()
      if (pay != null && pay.eventTime + timeoutMs <= timestamp) {
        ctx.output(unmatchedPays, s"订单ID为 ${pay.orderId} 的两条流没有对账成功!")
        payState.clear()
      }
    }
  }

}

2、结果

(source1:order_3:pay,1609753528069)
(source2:order_3:weixin,1609753528069)
处理器2:订单ID为 OrderEvent(order_3,pay,1609753528069)==PayEvent(order_3,weixin,1609753528069) 的两条流对账成功!
(source2:order_1:weixin,1609753529085)
(source1:order_0:pay,1609753529085)
(source1:order_3:pay,1609753530086)
(source2:order_2:weixin,1609753530086)
(source1:order_1:pay,1609753531087)
(source2:order_0:weixin,1609753531087)
处理器1:订单ID为 PayEvent(order_1,weixin,1609753529085)==OrderEvent(order_1,pay,1609753531087) 的两条流对账成功!
处理器2:订单ID为 OrderEvent(order_0,pay,1609753529085)==PayEvent(order_0,weixin,1609753531087) 的两条流对账成功!
(source2:order_0:weixin,1609753532087)
(source1:order_1:pay,1609753532087)
(source2:order_1:weixin,1609753533088)
(source1:order_1:pay,1609753533088)
处理器2:订单ID为 OrderEvent(order_1,pay,1609753533088)==PayEvent(order_1,weixin,1609753533088) 的两条流对账成功!
订单ID为 order_3 的两条流没有对账成功!
(source1:order_0:pay,1609753534088)
(source2:order_3:weixin,1609753534088)
处理器1:订单ID为 PayEvent(order_0,weixin,1609753532087)==OrderEvent(order_0,pay,1609753534088) 的两条流对账成功!
(source2:order_3:weixin,1609753535089)
(source1:order_0:pay,1609753535089)
订单ID为 order_3 的两条流没有对账成功!
订单ID为 order_2 的两条流没有对账成功!
(source2:order_2:weixin,1609753536089)
(source1:order_2:pay,1609753536089)
处理器2:订单ID为 OrderEvent(order_2,pay,1609753536089)==PayEvent(order_2,weixin,1609753536089) 的两条流对账成功!
(source2:order_0:weixin,1609753537090)
(source1:order_1:pay,1609753537090)
处理器2:订单ID为 OrderEvent(order_0,pay,1609753535089)==PayEvent(order_0,weixin,1609753537090) 的两条流对账成功!
订单ID为 order_1 的两条流没有对账成功!
(source1:order_2:pay,1609753538091)
(source2:order_1:weixin,1609753538091)
订单ID为 order_1 的两条流没有对账成功!
(source1:order_2:pay,1609753539091)
(source2:order_3:weixin,1609753539091)
订单ID为 order_3 的两条流没有对账成功!
原文地址:https://www.cnblogs.com/ywjfx/p/14231586.html