Fink| CEP

什么是CEP?

复杂事件处理(Complex Event Processing, CEP);(复杂的逻辑关系,谁前谁后,--- 可以通过自定义processing function,状态编程,设置定时器来实现,但是比较复杂)

Flink CEP是在Flink中实现的复杂事件处理(CEP)库;

CEP允许在无休止的事件流中检测事件模式,让我们有机会掌握数据中重要的部分;

一个或多个由简单事件构成的事件流通过一定的规则匹配,然后输出用户想得到的数据,满足规则的复杂事件。

特征:

  • 目标:从有序的简单事件流中发现一些高阶特征

  • 输入:一个或多个由简单事件构成的事件流

  • 处理:识别简单事件之间的内在联系,多个符合一定规则的简单事件构成复杂事件

  • 输出:满足规则的复杂事件

           

Pattern API

处理事件的规则,被叫做“模式”(Pattern)---(怎么定义事件之间的关系如何处理);

Flink CEP提供了Pattern API,用于对输入流数据进行复杂事件规则定义,用来提取符合规则的事件序列:

                    

 分类:

个体模式(Individual Patterns):

             ---组成复杂规则的每一个独特的模式定义,就是“个体模式” start.times(3).where( _.behavior.startwith("fav") )

组合模式(Combining Patterns,也叫模式序列):

            ----很多个体模式组合起来,就形成了整个的模式序列; 模式序列必须以一个“初始模式”开始:val start = Pattern.begin("start")

模式组(Groups Of Patterns):

            -----将一个模式序列作为条件嵌套在个体模式里,成为一组模式。

① 个体模式(Individual Patterns):

个体模式可以包括“单例(singleton)模式”和“循环(looping)模式”;

  单例模式只接收一个事件(定义好状态只筛选一次),而循环模式可以接收多个;  

  循环模式中的量词(Quantifier):可以在一个个体模式后追加量词,也就是指定循环次数;

   

个体模式的条件:

条件(Condition)

    --每个模式都需要指定触发条件,作为模式是否接受事件进入的判断依据;

    --CEP中的个体模式主要通过调用.where(), or(), 和until() 来指定条件;

    --按不同的调用方式,可以分成以下几类:

  • 简单条件(Simple Condition):通过.where() 方法对事件中的字段进行判断筛选,决定是否接受该事件 start.where( event => event.getName.startWith("foo"))
  • 组合条件(Combining Condition):将简单条件进行组合;.or()方法表示或逻辑相连,where的直接组合(.where.or)就是AND,

                            pattern.where(event => .../* some condition */  ) .or(event => .../* or condition */ )

  • 终止条件(Stop Condition):如果使用了oneOrMore或者oneOrMore.option(可有可无,没有终点),建议使用.until()作为终止条件,以便清理状态;
  • 迭代条件(Iterative Condition): 能够对模式之前所有接收的事件进行处理;调用.where((value, ctx) => {...}), 可以调用ctx. getEventsForPattern( "name" )

② 组合模式(Combining Patterns,也叫模式序列)

 不同的“近邻”模式(近邻的区别,谁前谁后谁先发生)

 严格近邻(Strict Contiguity):所有事件按照严格的顺序出现,中间没有任何不匹配的事件,由.next()指定;

                                                                                                 例如对于模式“a next b”,事件序列[a, c, b1, b2]没有匹配;

 宽松近邻(Relaxed Contiguity): 允许中间出现不匹配的事件,由.followedBy()指定(跟在后边的);

                                                                                                 例如对于模式“a followedBy b” ,事件序列[a, c, b1, b2]匹配为{a, b1};

非确定性宽松近邻(Non-Deterministic Relaxed Contiguity): 进一步放宽条件,之前已经匹配过的事件也可以再次使用,由followByAny()指定;

                                                                                                  例如对于模式“a followedByAny b” ,事件序列[a, c, b1, b2]匹配为{a, b1},{a, b2};

除以上模式序列外,还可以定义“不希望出现某种近邻关系”:(不希望谁跟在谁后边,想要它们分开的转态)

    .notNext() ---不想让某个事件严格紧邻前一个事件发生;

    .notFollowedBy() --不想让某个事件在两个事件之间(用在两个事件之间,表示某个事件不要在这两个事件之间);

需要注意:所有模式序列必须以.begin()开始;模式序列不能以.notFollowedBy()结束;“not”类型的模式不能被optional所修饰;

                还可以为模式指定时间约束,用来要求在多长时间内匹配有效,next.within(Time.seconds(10))

模式的检测:

指定要查找的模式序列后,就可以将其应用于输入流以检测潜在匹配;

调用CEP.pattern(),给定输入流和模式,就能得到一个PatternStream:

    val input: DataStream[Event] = ...
    val pattern: Pattern[Event, _] = ...
    val patternStream:PatternStream[Event] = CEP.pattern(input, pattern)

匹配事件的提取

创建PatternStream 之后,就可以应用select或者flatselect方法,从检测到的事件序列中提取事件了;

select()方法需要输入一个select function作为参数,每个成功匹配的事件序列都会调用它;

select()以一个Map[String, Iterable[Int] ]来接收匹配到的事件序列,其中key就是每个模式的名称,而value就是所有接收到的事件的Iterable类型;

 def selectFn(pattern: Map[String, Iterable[Int]]): OUT = {
      val startEvent = pattern.get("start").get.next
      val endEvent = pattern.get("end").get.next
     OUT(startEvent, endEvent)
}

超时事件的提取

当一个模式通过within关键字定义了检测窗口时间时,部分事件序列可能因为超过窗口长度被丢弃;

为了能够处理这些超时的部分匹配,select和flatSelectAPI调用允许指定超时处理程序;

超时处理程序会被接收到目前为止由模式匹配到的所有事件,由一个OutputTag定义接收到的超时事件序列(超时事件会放到另外侧输出流中):

 val patternStream: PatternStream[Event] = CEP.pattern(input, pattern)
val putputTag = OutputTag[String]("side-output")
val result = patternStream.select(outputTag){
        (pattern: Map[String, Iterable[Event]], timestamp: Long) => TimeoutEvent()
} {
      pattern: Map[String, Iterable[Event]] => ComplexEvent()
}
val timeoutResult: DataStream<TimeOutEvent> = result.getSideOutput(outputTag)

CEP架构

            

CEP用于分析低延迟、频繁产生的不同来源的事件流。CEP可以帮助在复杂的、不相关的事件流中找出有意义的模式和复杂的关系,以接近实时或准实时的获得通知并阻止一些行为。

CEP支持在流上进行模式匹配,根据模式的条件不同,分为连续的条件或不连续的条件;模式的条件允许有时间的限制,当在条件范围内没有达到满足的条件时,会导致模式匹配超时。

CEP的工作流图:

             

看起来很简单,但是它有很多不同的功能:

  1. 输入的流数据,尽快产生结果

  2. 2个event流上,基于时间进行聚合类的计算

  3. 提供实时/准实时的警告和通知

  4. 多样的数据源中产生关联并分析模式

  5. 高吞吐、低延迟的处理

市场上有多种CEP的解决方案,例如Spark、Samza、Beam等,但他们都没有提供专门的library支持。但是Flink提供了专门的CEP library

Flink CEP

Flink为CEP提供了专门的Flink CEP library,它包含如下组件:

  1. Event Stream

  2. pattern定义

  3. pattern检测

  4. 生成Alert

  

首先,开发人员要在DataStream流上定义出模式条件,之后Flink CEP引擎进行模式检测,必要时生成告警。

为了使用Flink CEP,我们需要导入依赖:

<dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-cep_${scala.binary.version}</artifactId>
 <version>${flink.version}</version>
</dependency>

Event Streams

我们首先需要为Stream Event设计java pojo,但是注意,由于要对event对象进行对比,所以我们需要重写hashCode()方法和equals()方法。下面进行监控温度事件流。

创建抽象类MonitoringEvent,重写hashCode()和equals()方法;再创建POJO:TemperatureEvent,同样重写hashCode()和equals()方法:

MonitoringEvent:

 TemperatureEvent:

 创建env,创建source:

Pattern API

每个Pattern都应该包含几个步骤,或者叫做state。从一个state到另一个state,通常我们需要定义一些条件,例如下列的代码:

DataStream<Event> input = ...

Pattern<Event, ?> pattern = Pattern.begin("start").where(evt -> evt.getId() == 42)
    // next表示"middle"事件紧跟着"start"事件发生
    .next("middle").subtype(SubEvent.class).where(subEvt -> subEvt.getVolume() >= 10.0)
    // followedBy表示"end"事件不一定紧跟着"middle"事件发生
    .followedBy("end").where(evt -> evt.getName().equals("end"));

PatternStream<Event> patternStream = CEP.pattern(input, pattern);

DataStream<Alert> result = patternStream.select(pattern -> {
    return createAlertFrom(pattern);
});

例子:

Flink-CEP:
登录事件检测:同一用户连续两次登录失败,报警; LoginEvent Stream
--> key by -->userid:LoginEvent Stream --> pattern matching -->select(定义报警输出) -->LoginWarning Stream -->print()
温度事件检测:同一机箱连续两次温度超标,报警; TemperatureEvent Stream
--> pattern matching(filter>27℃) --> select(定义报警输出) --> Alert Stream -->print()

异常检测:机箱温度检测

需求:同一个机箱连续两次温度超标,报警

拓展需求:锅炉房温度检测;信用卡反欺诈:连续大额消费;反作弊:同一个用户短时间内连续登陆失败 

  • flink cep

  • pattern定义

  • pattern匹配

  • select选出匹配到的事件,报警

public class CEPExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<TemperatureEvent> inputEventStream = env.fromElements( //不是DataStreamSource类型
                new TemperatureEvent("xyz", 22.0),
                new TemperatureEvent("xyz", 20.1), new TemperatureEvent("xyz", 21.1),
                new TemperatureEvent("xyz", 22.2), new TemperatureEvent("xyz", 22.1),
                new TemperatureEvent("xyz", 22.3), new TemperatureEvent("xyz", 22.1),
                new TemperatureEvent("xyz", 22.4), new TemperatureEvent("xyz", 22.7),
                new TemperatureEvent("xyz", 27.0));
        // 定义Pattern,检查10秒钟内温度是否高于26度
        Pattern<TemperatureEvent,?> warningPattern = Pattern.<TemperatureEvent>begin("start") //加泛型
                .subtype(TemperatureEvent.class)
                .where(new SimpleCondition<TemperatureEvent>() {
                    private static final long serialVersionUID = 1L;
                    @Override
                    public boolean filter(TemperatureEvent value) throws Exception {
                        if (value.getTemperature() >= 26) {
                            return true;
                        }
                        return false;
                    }
                }).within(Time.seconds(10));

        //匹配pattern并select事件,符合条件的发生警告,即输出
        //Alert类属性message,表示在满足一定的pattern条件后,需要告警的内容:
        DataStream<Alert> patternStream = CEP.pattern(inputEventStream, warningPattern) //DataStream类型的
                .select(new PatternSelectFunction<TemperatureEvent, Alert>() {
                    @Override
                    public Alert select(Map<String, List<TemperatureEvent>> pattern) throws Exception {
                        return new Alert("Temperature Rise Detected: " + pattern.get("start").get(0).getTemperature() +
                                " on machine name: " + pattern.get("start").get(0).getMachineName());
                    }
                });
        patternStream.print();
        env.execute();
    }

}

 登录事件异常检测:同一个用户连续两次登陆失败,报警

  • flink cep

  • pattern定义

  • pattern匹配

  • select输出报警事件

//需求: 如果同一个userid在三秒之内连续两次登陆失败,报警。
public class FlinkLoginFail {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // 这里mock了事件流,这个事件流一般从Kafka过来
        DataStream<LoginEvent> loginEventStream = env.fromCollection(Arrays.asList(  //自定义一个pojo类:userId、ip、type
                new LoginEvent("1", "192.168.0.1", "fail"),
                new LoginEvent("1", "192.168.0.2", "fail"),
                new LoginEvent("1", "192.168.0.3", "fail"),
                new LoginEvent("2", "192.168.10.10", "success")
        ));//不用DataStreamSource,使用DataStream

        Pattern<LoginEvent, LoginEvent> loginFailPattern = Pattern.<LoginEvent>begin("start")
                //泛型类或泛型接口上的泛型形参是不能用于静态成员的,那么当静态方法需要用到泛型时,只能用泛型方法。
                .where(new IterativeCondition<LoginEvent>() { // 模式开始事件的匹配条件为事件类型为fail, 为迭代条件
                    @Override
                    public boolean filter(LoginEvent loginEvent, Context<LoginEvent> context) throws Exception {
                        return loginEvent.getType().equals("fail");
                    }
                }).next("next")
                .where(new IterativeCondition<LoginEvent>() { // 事件的匹配条件为事件类型为fail
                    @Override
                    public boolean filter(LoginEvent loginEvent, Context<LoginEvent> context) throws Exception {
                        return loginEvent.getType().equals("fail");
                    }
                }).within(Time.seconds(3));// 要求紧邻的两个事件发生的时间间隔不能超过3秒钟

        // 以userid分组,形成keyedStream,然后进行模式匹配   ::方法引用
        PatternStream<LoginEvent> patternStream = CEP.pattern(loginEventStream.keyBy(LoginEvent::getUserId), loginFailPattern);

        DataStream<LoginWarning> loginFailDataStream = patternStream.select((Map<String, List<LoginEvent>> pattern) -> {
            List<LoginEvent> first = pattern.get("start");
            List<LoginEvent> second = pattern.get("next");
            return new LoginWarning(first.get(0).getUserId(), first.get(0).getIp(), first.get(0).getType());
        });//不用SingleOutputStreamOperator类型的,使用

        loginFailDataStream.print();
        env.execute();

    }

}
原文地址:https://www.cnblogs.com/shengyang17/p/10858981.html