Flink 1.13.1(四)

第8章 Flink流处理高阶编程实战

8.1 基于埋点日志数据的网络流量统计

8.1.1 指定时间范围内网站总浏览量(PV)的统计

  实现一个网站总浏览量的统计。我们可以设置滚动时间窗口,实时统计每小时内的网站PV。此前我们已经完成了该需求的流式处理实现,当前需求在之前的基础上增加了窗口信息。

package com.yuange.flink.day06;

import com.yuange.flink.day03.UserBehavior;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Duration;
import java.util.Date;

/**
 * Site-wide page-view (PV) count per one-hour tumbling event-time window.
 *
 * <p>Reads {@code input/UserBehavior.csv}, keeps only events whose behavior is {@code "pv"} and
 * prints, for every window, its start, its end and the number of PV events it contains.
 *
 * @author 袁哥 (2021/7/21 22:06)
 */
public class Flink_High_Project_PV {

    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Fixed port for the local Flink web UI.
        conf.setInteger("rest.port",20000);
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(conf).setParallelism(2);

        environment.readTextFile("input/UserBehavior.csv")
                .map(line -> {
                    String[] split = line.split(",");
                    // The 5th column is multiplied by 1000: it is treated as epoch seconds and
                    // converted to the epoch milliseconds Flink uses for event time.
                    return new UserBehavior(Long.valueOf(split[0]),Long.valueOf(split[1]),Integer.valueOf(split[2]),
                            split[3],Long.parseLong(split[4]) * 1000);
                })
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<UserBehavior>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                                .withTimestampAssigner((element, recordTimestamp) -> element.getTimestamp())
                )
                .filter(behavior -> "pv".equals(behavior.getBehavior()))
                // After the filter every record has the key "pv", so the whole site's traffic
                // ends up in a single keyed window.
                .keyBy(UserBehavior::getBehavior)
                .window(TumblingEventTimeWindows.of(Time.hours(1)))
                .aggregate(
                        // Incremental counter: keeps one Long per window instead of buffering events.
                        new AggregateFunction<UserBehavior, Long, Long>() {
                            @Override
                            public Long createAccumulator() {
                                return 0L;
                            }

                            @Override
                            public Long add(UserBehavior value, Long accumulator) {
                                return accumulator + 1;
                            }

                            @Override
                            public Long getResult(Long accumulator) {
                                return accumulator;
                            }

                            @Override
                            public Long merge(Long a, Long b) {
                                // Combine two partial counts; returning null here would NPE
                                // whenever Flink merges accumulators.
                                return a + b;
                            }
                        },
                        // Receives the single pre-aggregated count and attaches the window bounds.
                        new ProcessWindowFunction<Long, String, String, TimeWindow>() {
                            @Override
                            public void process(String key,
                                                Context context,
                                                Iterable<Long> elements,
                                                Collector<String> out) throws Exception {
                                out.collect("窗口开始:" + new Date(context.window().getStart()) +
                                        ",窗口结束:" + new Date(context.window().getEnd()) +
                                        ",pv:" + elements.iterator().next());
                            }
                        }
                )
                .print();
        try {
            environment.execute();
        } catch (Exception e) {
            // Propagate instead of swallowing, so a failed job ends with a non-zero exit status.
            throw new IllegalStateException("Flink PV job failed", e);
        }
    }
}

8.1.2 指定时间范围内网站独立访客数(UV)的统计

package com.yuange.flink.day06;

import com.yuange.flink.day03.UserBehavior;
import com.yuange.flink.utils.YuangeUtil;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Duration;
import java.util.Date;
import java.util.List;

/**
 * Unique-visitor (UV) count per one-hour tumbling event-time window.
 *
 * <p>Reads {@code input/UserBehavior.csv}, keeps only {@code "pv"} events and prints, for every
 * window, its start, its end and the number of distinct user ids seen in it.
 *
 * @author 袁哥 (2021/7/22 9:45)
 */
public class Flink_High_Project_UV {

    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Fixed port for the local Flink web UI.
        conf.setInteger("rest.port",20000);
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(conf).setParallelism(2);

        environment.readTextFile("input/UserBehavior.csv")
                .map(line -> {
                    String[] split = line.split(",");
                    // The 5th column holds epoch seconds (the PV/TopN jobs convert it the same
                    // way); Flink event time is in milliseconds, hence the * 1000. Without it
                    // the one-hour windows would actually span 1000 hours of data.
                    return new UserBehavior(Long.valueOf(split[0]),Long.valueOf(split[1]),Integer.valueOf(split[2]),
                            split[3],Long.parseLong(split[4]) * 1000);
                })
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<UserBehavior>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                                .withTimestampAssigner((element, recordTimestamp) -> element.getTimestamp())
                )
                .filter(behavior -> "pv".equals(behavior.getBehavior()))
                // Single key "pv": all visits of the site go to one window.
                .keyBy(UserBehavior::getBehavior)
                .window(TumblingEventTimeWindows.of(Time.hours(1)))
                .process(new ProcessWindowFunction<UserBehavior, String, String, TimeWindow>() {
                    // Keyed (per key, not per window) map used as a set of user ids.
                    MapState<Long, Object> userIdState;
                    @Override
                    public void open(Configuration parameters) throws Exception {
                        userIdState = getRuntimeContext()
                                .getMapState(new MapStateDescriptor<Long, Object>(
                                        "userIdState",
                                        Long.class,
                                        Object.class
                                ));
                    }

                    @Override
                    public void process(String key,
                                        Context context,
                                        Iterable<UserBehavior> elements,
                                        Collector<String> out) throws Exception {
                        // The state is shared by all windows of this key, so start from empty.
                        userIdState.clear();

                        for (UserBehavior element : elements) {
                            userIdState.put(element.getUserId(),null);
                        }

                        List<Long> uids = YuangeUtil.toList(userIdState.keys());
                        out.collect("窗口开始:" + new Date(context.window().getStart()) +
                                ",窗口结束:" + new Date(context.window().getEnd()) +
                                ",uv:" + uids.size());

                        // Release the ids once the window result is emitted instead of keeping
                        // the last window's users in state until the next window fires.
                        userIdState.clear();
                    }
                })
                .print();
        try {
            environment.execute();
        } catch (Exception e) {
            // Propagate instead of swallowing, so a failed job ends with a non-zero exit status.
            throw new IllegalStateException("Flink UV job failed", e);
        }
    }
}

8.2 电商数据分析

  电商平台中的用户行为频繁且较复杂,系统上线运行一段时间后,可以收集到大量的用户行为数据,进而利用大数据技术进行深入挖掘和分析,得到感兴趣的商业指标并增强对风险的控制。电商用户行为数据多样,整体可以分为用户行为习惯数据和业务行为数据两大类。

  用户的行为习惯数据包括了用户的登录方式、上线的时间点及时长、点击和浏览页面、页面停留时间以及页面跳转等等,我们可以从中进行流量统计和热门商品的统计,也可以深入挖掘用户的特征;这些数据往往可以从web服务器日志中直接读取到。而业务行为数据就是用户在电商平台中针对每个业务(通常是某个具体商品)所作的操作,我们一般会在业务系统中相应的位置埋点,然后收集日志进行分析。

8.2.1 实时热门商品统计

  1)需求分析:每隔5分钟输出最近1小时内点击量最多的前N个商品

    (1)最近一小时: 窗口长度

    (2)每隔5分钟: 窗口滑动步长

    (3)时间: 使用event-time

  2)数据准备:这里依然采用UserBehavior.csv作为数据源,通过采集数据统计商品点击信息。

package com.yuange.flink.day06;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * Click count of one item within one sliding window; emitted by the window stage of the
 * hot-items Top-N jobs and then keyed by {@code windowEndTime}.
 *
 * <p>Lombok generates getters/setters, equals/hashCode/toString ({@code @Data}), a no-arg
 * constructor and an all-args constructor taking the fields in declaration order.
 *
 * @author 袁哥 (2021/7/21 21:15)
 */
@AllArgsConstructor
@NoArgsConstructor
@Data
public class HotItem {
    // Item id (the key of the window stage).
    private Long itemId;
    // Number of "pv" events for this item in the window.
    private Long count;
    // Window end timestamp, epoch milliseconds (TimeWindow#getEnd()).
    private Long windowEndTime;
}

  3)具体实现代码一:

package com.yuange.flink.day06;

import com.yuange.flink.day03.UserBehavior;
import com.yuange.flink.utils.YuangeUtil;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Duration;
import java.util.Date;
import java.util.List;

/**
 * Hot-items Top-N, version 1: every 5 minutes, print the 3 items with the most clicks ("pv"
 * events) over the last hour (sliding event-time window of 1 h, slide 5 min).
 *
 * <p>Stage 1 counts clicks per item and window; stage 2 keys the per-item counts by window end,
 * buffers them in ListState and sorts them when an event-time timer fires.
 *
 * @author 袁哥 (2021/7/22 10:02)
 */
public class Flink_High_Project_TopN {

    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Fixed port for the local Flink web UI.
        conf.setInteger("rest.port",20000);
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(conf).setParallelism(2);

        environment.readTextFile("input/UserBehavior.csv")
                .map(line -> {
                    String[] split = line.split(",");
                    // 5th column: epoch seconds -> epoch milliseconds.
                    return new UserBehavior(Long.valueOf(split[0]),Long.valueOf(split[1]),Integer.valueOf(split[2]),
                            split[3],Long.parseLong(split[4]) * 1000);
                })
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<UserBehavior>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                                .withTimestampAssigner((element, recordTimestamp) -> element.getTimestamp())
                )
                .filter(behavior -> "pv".equals(behavior.getBehavior()))
                .keyBy(UserBehavior::getItemId)
                // Window length 1 hour, slide 5 minutes, as required ("每隔5分钟输出最近1小时").
                .window(SlidingEventTimeWindows.of(Time.hours(1),Time.minutes(5)))
                .aggregate(
                        // Incremental click counter per item and window.
                        new AggregateFunction<UserBehavior, Long, Long>() {
                            @Override
                            public Long createAccumulator() {
                                return 0L;
                            }

                            @Override
                            public Long add(UserBehavior value, Long accumulator) {
                                return accumulator + 1;
                            }

                            @Override
                            public Long getResult(Long accumulator) {
                                return accumulator;
                            }

                            @Override
                            public Long merge(Long a, Long b) {
                                // Combine two partial counts; never return null.
                                return a + b;
                            }
                        },
                        // Wraps (item id, window end, click count) into a HotItem.
                        new ProcessWindowFunction<Long, HotItem, Long, TimeWindow>() {
                            @Override
                            public void process(Long key,
                                                Context context,
                                                Iterable<Long> elements,
                                                Collector<HotItem> out) throws Exception {
                                out.collect(new HotItem(key,elements.iterator().next(),context.window().getEnd()));
                            }
                        }
                )
                // Group all item counts that belong to the same window.
                .keyBy(HotItem::getWindowEndTime)
                .process(new KeyedProcessFunction<Long, HotItem, String>() {
                    private ListState<HotItem> hotItemListState;
                    @Override
                    public void open(Configuration parameters) throws Exception {
                        hotItemListState = getRuntimeContext()
                                .getListState(new ListStateDescriptor<HotItem>("hotItemListState", HotItem.class));
                    }

                    @Override
                    public void processElement(HotItem value,
                                               Context ctx,
                                               Collector<String> out) throws Exception {
                        // Buffer every item count of this window; the timer computes the Top-N
                        // once all of them have arrived.
                        Iterable<HotItem> hotItems = hotItemListState.get();
                        // Empty state means this is the first item of the window: register the
                        // timer exactly once. It fires 5 s after the window end, when the
                        // watermark guarantees every item count of this window was emitted.
                        if (!hotItems.iterator().hasNext()){
                            ctx.timerService().registerEventTimeTimer(value.getWindowEndTime() + 5000);
                        }
                        hotItemListState.add(value);
                    }

                    @Override
                    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
                        // Sort by click count, descending, and keep the top 3.
                        List<HotItem> list = YuangeUtil.toList(hotItemListState.get());
                        list.sort((o1, o2) -> o2.getCount().compareTo(o1.getCount()));

                        StringBuilder sb = new StringBuilder();
                        sb.append("========================\n");
                        for (int i = 0; i < Math.min(3, list.size()); i++) {
                            HotItem hotItem = list.get(i);
                            sb
                                    .append("窗口: ")
                                    .append(new Date(hotItem.getWindowEndTime()))
                                    .append(", 商品: ")
                                    .append(hotItem)
                                    .append("\n");
                        }

                        out.collect(sb.toString());
                        // This window is finished; drop its buffered items so state does not
                        // grow with every window.
                        hotItemListState.clear();
                    }
                })
                .print();
        try {
            environment.execute();
        } catch (Exception e) {
            // Propagate instead of swallowing, so a failed job ends with a non-zero exit status.
            throw new IllegalStateException("Flink hot-items Top-N job failed", e);
        }
    }
}

  4)具体实现代码二:

package com.yuange.flink.day06;

import com.yuange.flink.day03.UserBehavior;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Duration;
import java.util.Comparator;
import java.util.Date;
import java.util.TreeSet;

/**
 * Hot-items Top-N, version 2: same output as version 1 (top 3 items by clicks over the last hour,
 * every 5 minutes), but the second stage keeps only the current top 3 of each window in a
 * bounded, sorted TreeSet instead of buffering every item count.
 *
 * @author 袁哥 (2021/7/22 10:28)
 */
public class Flink_High_Project_TopN_Two {


    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Fixed port for the local Flink web UI.
        conf.setInteger("rest.port",20000);
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(conf).setParallelism(2);

        environment.readTextFile("input/UserBehavior.csv")
                .map(line -> {
                    String[] split = line.split(",");
                    // 5th column: epoch seconds -> epoch milliseconds.
                    return new UserBehavior(Long.valueOf(split[0]),Long.valueOf(split[1]),Integer.valueOf(split[2]),
                            split[3],Long.parseLong(split[4]) * 1000);
                })
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<UserBehavior>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                                .withTimestampAssigner((element, recordTimestamp) -> element.getTimestamp())
                )
                .filter(behavior -> "pv".equals(behavior.getBehavior()))
                .keyBy(UserBehavior::getItemId)
                // Window length 1 hour, slide 5 minutes, as required ("每隔5分钟输出最近1小时").
                .window(SlidingEventTimeWindows.of(Time.hours(1),Time.minutes(5)))
                .aggregate(
                        // Incremental click counter per item and window.
                        new AggregateFunction<UserBehavior, Long, Long>() {
                            @Override
                            public Long createAccumulator() {
                                return 0L;
                            }

                            @Override
                            public Long add(UserBehavior value, Long accumulator) {
                                return accumulator + 1;
                            }

                            @Override
                            public Long getResult(Long accumulator) {
                                return accumulator;
                            }

                            @Override
                            public Long merge(Long a, Long b) {
                                // Combine two partial counts; never return null.
                                return a + b;
                            }
                        },
                        // Wraps (item id, window end, click count) into a HotItem.
                        new ProcessWindowFunction<Long, HotItem, Long, TimeWindow>() {
                            @Override
                            public void process(Long key,
                                                Context context,
                                                Iterable<Long> elements,
                                                Collector<HotItem> out) throws Exception {
                                out.collect(new HotItem(key,elements.iterator().next(),context.window().getEnd()));
                            }
                        }
                )
                // Group all item counts that belong to the same window.
                .keyBy(HotItem::getWindowEndTime)
                .process(new KeyedProcessFunction<Long, HotItem, String>() {
                    // Current top 3 of this window, sorted by count descending.
                    ValueState<TreeSet<HotItem>> treeSetValueState;
                    @Override
                    public void open(Configuration parameters) throws Exception {
                        treeSetValueState = getRuntimeContext()
                                .getState(new ValueStateDescriptor<TreeSet<HotItem>>(
                                        "hotItmeListState",
                                        TypeInformation.of(new TypeHint<TreeSet<HotItem>>() {})
                                ));
                    }

                    @Override
                    public void processElement(HotItem value, Context ctx, Collector<String> out) throws Exception {
                        // Only the 3 largest counts seen so far need to be kept.
                        TreeSet<HotItem> top = treeSetValueState.value();
                        if (top == null){
                            // Count descending, ties broken by item id. Unlike a comparator that
                            // never returns 0, this one obeys the Comparator contract. Each item
                            // occurs at most once per window, so no entry is dropped as a duplicate.
                            top = new TreeSet<>(Comparator.comparing(HotItem::getCount).reversed()
                                    .thenComparing(HotItem::getItemId));
                            // First item of this window: fire 5 s after the window end, when all
                            // item counts of the window have arrived.
                            ctx.timerService().registerEventTimeTimer(value.getWindowEndTime() + 5000);
                        }
                        top.add(value);
                        if (top.size() > 3){
                            top.pollLast(); // drop the current smallest
                        }
                        // Write back explicitly: with serializing backends (e.g. RocksDB) an
                        // in-place mutation of the value object is not persisted.
                        treeSetValueState.update(top);
                    }

                    @Override
                    public void onTimer(long timestamp,
                                        OnTimerContext ctx,
                                        Collector<String> out) throws Exception {
                        TreeSet<HotItem> set = treeSetValueState.value();

                        StringBuilder builder = new StringBuilder();
                        builder.append("----------------\n");
                        for (HotItem hotItem : set) {
                            builder.append("窗口:")
                                    .append(new Date(hotItem.getWindowEndTime()))
                                    .append(",商品:")
                                    .append(hotItem)
                                    .append("\n");
                        }
                        out.collect(builder.toString());
                        // This window is finished; release its state.
                        treeSetValueState.clear();
                    }
                })
                .print();
        try {
            environment.execute();
        } catch (Exception e) {
            // Propagate instead of swallowing, so a failed job ends with a non-zero exit status.
            throw new IllegalStateException("Flink hot-items Top-N job failed", e);
        }
    }
}

8.2.2 基于服务器log的热门页面浏览量统计

  对于一个电商平台而言,用户登录的入口流量、不同页面的访问流量都是值得分析的重要数据,而这些数据,可以简单地从web服务器的日志中提取出来。我们在这里先实现“热门页面浏览数”的统计,也就是读取服务器日志中的每一行log,统计在一段时间内用户访问每一个url的次数,然后排序输出显示。

  具体做法为:每隔5秒,输出最近10分钟内访问量最多的前N个URL。可以看出,这个需求与之前“实时热门商品统计”非常类似,所以我们完全可以借鉴此前的代码。

  1)PageCount 

package com.yuange.flink.day06;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * Visit count of one URL within one sliding window; emitted by the window stage of the
 * hot-pages Top-N job and then keyed by {@code windowEnd}.
 *
 * <p>Lombok generates getters/setters, equals/hashCode/toString ({@code @Data}), a no-arg
 * constructor and an all-args constructor taking the fields in declaration order.
 *
 * @author 袁哥 (2021/7/21 21:17)
 */
@AllArgsConstructor
@NoArgsConstructor
@Data
public class PageCount {
    // Requested URL (the key of the window stage).
    private String url;
    // Number of log lines for this URL in the window.
    private Long count;
    // Window end timestamp, epoch milliseconds (TimeWindow#getEnd()).
    private Long windowEnd;
}

  2)ApacheLog

package com.yuange.flink.day06;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * One parsed line of the web-server access log ({@code input/apache.log}).
 *
 * <p>Lombok generates getters/setters, equals/hashCode/toString ({@code @Data}), a no-arg
 * constructor and an all-args constructor taking the fields in declaration order.
 *
 * @author 袁哥 (2021/7/21 21:18)
 */
@AllArgsConstructor
@NoArgsConstructor
@Data
public class ApacheLog {
    // Client IP address (first space-separated column of the log line).
    private String ip;
    // Request time, epoch milliseconds, parsed with the pattern "dd/MM/yyyy:HH:mm:ss".
    private Long eventTime;
    // Request method column, presumably the HTTP verb (GET/POST/...) — confirm against the data.
    private String method;
    // Requested URL.
    private String url;
}

  3)代码实现

package com.yuange.flink.day06;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.TreeSet;

/**
 * Hot-pages Top-N: every 5 seconds, print the 3 URLs with the most requests over the last
 * 10 minutes, based on the event time recorded in {@code input/apache.log}.
 *
 * @author 袁哥 (2021/7/22 11:45)
 */
public class Flink_Project_Page_TopN {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);

        environment.readTextFile("input/apache.log")
                .map(line -> {
                    String[] split = line.split(" ");
                    // SimpleDateFormat is not thread-safe, so a fresh instance is used per record.
                    SimpleDateFormat simpleDateFormat = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss");
                    return new ApacheLog(split[0],simpleDateFormat.parse(split[3]).getTime(),split[5],split[6]);
                })
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<ApacheLog>forBoundedOutOfOrderness(Duration.ofSeconds(60))
                        .withTimestampAssigner(new SerializableTimestampAssigner<ApacheLog>() {
                            @Override
                            public long extractTimestamp(ApacheLog element, long recordTimestamp) {
                                return element.getEventTime();
                            }
                        })
                )
                .keyBy(ApacheLog::getUrl)
                // 10-minute window sliding every 5 seconds.
                .window(SlidingEventTimeWindows.of(Time.minutes(10),Time.seconds(5)))
                .aggregate(new AggregateFunction<ApacheLog, Long, Long>() {
                               @Override
                               public Long createAccumulator() {
                                   return 0L;
                               }

                               @Override
                               public Long add(ApacheLog value, Long accumulator) {
                                   return accumulator + 1L;
                               }

                               @Override
                               public Long getResult(Long accumulator) {
                                   return accumulator;
                               }

                               @Override
                               public Long merge(Long a, Long b) {
                                   return a + b;
                               }
                           }, new ProcessWindowFunction<Long, PageCount, String, TimeWindow>() {
                               @Override
                               public void process(String key,
                                                   Context context,
                                                   Iterable<Long> elements,
                                                   Collector<PageCount> out) throws Exception {
                                   // (url, visit count, window end)
                                   out.collect(new PageCount(key,elements.iterator().next(),context.window().getEnd()));
                               }
                           }
                )
                // Group all URL counts that belong to the same window.
                .keyBy(PageCount::getWindowEnd)
                .process(new KeyedProcessFunction<Long, PageCount, String>() {
                    // All URL counts of the current window.
                    ListState<PageCount> pageState;
                    // Non-null once the timer for this window has been registered.
                    ValueState<Long> timerTs;
                    @Override
                    public void open(Configuration parameters) throws Exception {
                        pageState = getRuntimeContext().getListState(new ListStateDescriptor<PageCount>("pageState", PageCount.class));
                        timerTs = getRuntimeContext().getState(new ValueStateDescriptor<Long>("timerTs", Long.class));
                    }

                    @Override
                    public void processElement(PageCount value, Context ctx, Collector<String> out) throws Exception {
                        pageState.add(value);
                        // Register the timer only once per window; it fires shortly after the
                        // window end, when all URL counts of the window have arrived.
                        if (timerTs.value() == null){
                            ctx.timerService().registerEventTimeTimer(value.getWindowEnd() + 10L);
                            timerTs.update(value.getWindowEnd());
                        }
                    }

                    @Override
                    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
                        // Bounded TreeSet: keeps only the top 3. Sorted by count descending with
                        // ties broken by URL, so the comparator obeys its contract (a comparator
                        // that never returns 0 does not). Each URL occurs once per window.
                        TreeSet<PageCount> pageCounts = new TreeSet<>((o1, o2) -> {
                            int byCount = o2.getCount().compareTo(o1.getCount());
                            return byCount != 0 ? byCount : o1.getUrl().compareTo(o2.getUrl());
                        });

                        for (PageCount pageCount : pageState.get()) {
                            pageCounts.add(pageCount);
                            if (pageCounts.size() > 3){
                                pageCounts.pollLast(); // over N entries: drop the smallest
                            }
                        }
                        StringBuilder builder = new StringBuilder();
                        builder.append("窗口结束时间: " + (timestamp - 10) + "\n");
                        builder.append("------------------------\n");
                        for (PageCount pageCount : pageCounts) {
                            builder.append(pageCount + "\n");
                        }
                        builder.append("-----------------------\n");
                        out.collect(builder.toString());

                        // This window is finished; release its state so it does not grow with
                        // every window.
                        pageState.clear();
                        timerTs.clear();
                    }
                })
                .print();
        environment.execute();
    }
}

8.3 页面广告分析

8.3.1 页面广告点击量统计

  电商网站的市场营销商业指标中,除了自身的APP推广,还会考虑到页面上的广告投放(包括自己经营的产品和其它网站的广告)。所以广告相关的统计分析,也是市场营销的重要指标。

  对于广告的统计,最简单也最重要的就是页面广告的点击量,网站往往需要根据广告点击量来制定定价策略和调整推广方式,而且也可以借此收集用户的偏好信息。更加具体的应用是,我们可以根据用户的地理位置进行划分,从而总结出不同省份用户对不同广告的偏好,这样更有助于广告的精准投放。

  在之前的需求实现中,已经统计了广告的点击次数总和,但是没有实现窗口操作,也未增加排名处理。这次添加窗口,并增加排名。

  1)AdsClickLog 

package com.yuange.flink.day06;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * One ad-click record read from {@code input/AdClickLog.csv}.
 *
 * <p>Lombok generates getters/setters, equals/hashCode/toString ({@code @Data}), a no-arg
 * constructor and an all-args constructor taking the fields in declaration order.
 *
 * @author 袁哥 (2021/7/21 21:20)
 */
@NoArgsConstructor
@AllArgsConstructor
@Data
public class AdsClickLog {
    // Id of the clicking user.
    private Long userId;
    // Id of the clicked ad.
    private Long adsId;
    // Province of the user.
    private String province;
    // City of the user.
    private String city;
    // Click time; the ads job multiplies it by 1000, i.e. it is treated as epoch seconds.
    private Long timestamp;
}

  2)代码实现

package com.yuange.flink.day06;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Comparator;

/**
 * Hot-ad ranking per sliding window.
 *
 * <p>Counts clicks per (province, adsId) in 1-hour event-time windows that slide every 10 seconds,
 * then, for every window end, prints the three (province, adsId) pairs with the most clicks.
 *
 * @author 袁哥 (2021/7/22 12:05)
 */
public class Flink_Project_AdsClick {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);

        environment.readTextFile("input/AdClickLog.csv")
                // CSV columns: userId, adsId, province, city, timestamp
                .map(line -> {
                    String[] datas = line.split(",");
                    return new AdsClickLog(Long.valueOf(datas[0]),
                            Long.valueOf(datas[1]),
                            datas[2],
                            datas[3],
                            Long.valueOf(datas[4]));
                })
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<AdsClickLog>forBoundedOutOfOrderness(Duration.ofSeconds(20))
                                .withTimestampAssigner(new SerializableTimestampAssigner<AdsClickLog>() {
                                    @Override
                                    public long extractTimestamp(AdsClickLog element, long recordTimestamp) {
                                        // scale the raw value to milliseconds
                                        // (assumes the CSV stores epoch seconds — TODO confirm)
                                        return element.getTimestamp() * 1000L;
                                    }
                                })
                )
                .keyBy(new KeySelector<AdsClickLog, Tuple2<String,Long>>() {
                    @Override
                    public Tuple2<String, Long> getKey(AdsClickLog value) throws Exception {
                        return Tuple2.of(value.getProvince(),value.getAdsId());
                    }
                })
                .window(SlidingEventTimeWindows.of(Time.hours(1),Time.seconds(10)))
                .allowedLateness(Time.seconds(10))
                // NOTE(review): this side output is never read via getSideOutput(), so records later
                // than the allowed lateness are effectively dropped.
                .sideOutputLateData(new OutputTag<AdsClickLog>("ads_late"){})
                .aggregate(new AggregateFunction<AdsClickLog, Long, Long>() {
                               @Override
                               public Long createAccumulator() {
                                   return 0L;
                               }

                               @Override
                               public Long add(AdsClickLog value, Long accumulator) {
                                   return accumulator + 1L;
                               }

                               @Override
                               public Long getResult(Long accumulator) {
                                   return accumulator;
                               }

                               @Override
                               public Long merge(Long a, Long b) {
                                   return a + b;
                               }
                           }, new ProcessWindowFunction<Long, Tuple4<String,Long,Long,Long>, Tuple2<String, Long>, TimeWindow>() {
                               // emits (province, adsId, clickCount, windowEnd)
                               @Override
                               public void process(Tuple2<String, Long> key,
                                                   Context context,
                                                   Iterable<Long> elements,
                                                   Collector<Tuple4<String, Long, Long, Long>> out) throws Exception {
                                   // the pre-aggregation leaves exactly one count per window
                                   out.collect(Tuple4.of(key.f0,key.f1,elements.iterator().next(),context.window().getEnd()));
                               }
                           }
                )
                // group all per-ad counts of the same window end so they can be ranked together
                .keyBy(t -> t.f3)
                .process(new KeyedProcessFunction<Long, Tuple4<String, Long, Long, Long>, String>() {
                    // non-null once a ranking timer has been registered for this window end
                    private ValueState<Long> windowEnd;
                    // every (province, adsId, count, windowEnd) received for this window end
                    private ListState<Tuple4<String, Long, Long, Long>> datas;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        datas = getRuntimeContext()
                                .getListState(new ListStateDescriptor<Tuple4<String, Long, Long, Long>>("datas",
                                        TypeInformation.of(new TypeHint<Tuple4<String, Long, Long, Long>>(){})));
                        windowEnd = getRuntimeContext().getState(new ValueStateDescriptor<Long>("windowEnd", Long.class));
                    }

                    @Override
                    public void processElement(Tuple4<String, Long, Long, Long> value,
                                               Context ctx, Collector<String> out) throws Exception {
                        // buffer the result until the ranking timer fires
                        datas.add(value);
                        // register the ranking timer once per window end, slightly after the end so
                        // the window results for this end have been emitted before it fires
                        if (windowEnd.value() == null) {
                            ctx.timerService().registerEventTimeTimer(value.f3 + 10L);
                            windowEnd.update(value.f3);
                        }
                    }

                    @Override
                    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
                        // A late firing (allowedLateness) re-emits an updated count for the same
                        // (province, adsId) and window; keep only the most recent one per ad so the
                        // same ad is never ranked twice.
                        java.util.Map<Tuple2<String, Long>, Tuple4<String, Long, Long, Long>> latestByAd =
                                new java.util.LinkedHashMap<>();
                        for (Tuple4<String, Long, Long, Long> t : datas.get()) {
                            latestByAd.put(Tuple2.of(t.f0, t.f1), t);
                        }
                        ArrayList<Tuple4<String, Long, Long, Long>> result = new ArrayList<>(latestByAd.values());

                        // clear state for this window end
                        windowEnd.clear();
                        datas.clear();

                        // sort by click count, descending; Long.compare avoids the overflow of
                        // casting a long difference to int
                        result.sort((o1, o2) -> Long.compare(o2.f2, o1.f2));

                        // render the top 3
                        StringBuilder builder = new StringBuilder();
                        builder.append("窗口结束时间: ").append(timestamp - 10).append("\n");
                        builder.append("---------------------------------\n");
                        for (int i = 0; i < Math.min(3, result.size()); i++) {
                            builder.append(result.get(i)).append("\n");
                        }
                        builder.append("---------------------------------\n");
                        out.collect(builder.toString());
                    }
                })
                .print();
        environment.execute();
    }
}

8.3.2 黑名单过滤

  我们进行的点击量统计,同一用户的重复点击是会叠加计算的。在实际场景中,同一用户确实可能反复点开同一个广告,这也说明了用户对广告更大的兴趣;但是如果用户在一段时间非常频繁地点击广告,这显然不是一个正常行为,有刷点击量的嫌疑。所以我们可以对一段时间内(比如一天内)的用户点击行为进行约束,如果对同一个广告点击超过一定限额(比如100次),应该把该用户加入黑名单并报警,此后其点击行为不应该再统计。

  两个功能: 

    1)告警: 使用侧输出流

    2)已经进入黑名单的用户的广告点击记录不再进行统计

  代码实现

package com.yuange.flink.day06;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.text.SimpleDateFormat;
import java.time.Duration;

/**
 * Blacklist filter for ad clicks.
 *
 * <p>Counts, per user and ad, the clicks made on each calendar day and prints every click with its
 * running count. Once a user clicks the same ad more than 99 times in one day, one warning is sent
 * to the "blackList" side output. After that, further clicks of that user on that ad are neither
 * counted nor printed until the day changes.
 *
 * @author 袁哥 (2021/7/22 11:26)
 */
public class Flink_High_Project_Blacklist {

    /** Side output receiving the blacklist warnings (created once instead of per element). */
    private static final OutputTag<String> BLACK_LIST_TAG = new OutputTag<String>("blackList") {};

    /** A user/ad pair whose daily click count exceeds this value is blacklisted. */
    private static final long MAX_CLICKS_PER_DAY = 99L;

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);

        SingleOutputStreamOperator<String> process = environment.readTextFile("input/AdClickLog.csv")
                .map(line -> {
                    String[] split = line.split(",");
                    return new AdsClickLog(Long.valueOf(split[0]), Long.valueOf(split[1]), split[2], split[3], Long.valueOf(split[4]));
                })
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<AdsClickLog>forBoundedOutOfOrderness(Duration.ofSeconds(20))
                                .withTimestampAssigner(new SerializableTimestampAssigner<AdsClickLog>() {
                                    @Override
                                    public long extractTimestamp(AdsClickLog element, long recordTimestamp) {
                                        return element.getTimestamp() * 1000L;
                                    }
                                })
                )
                .keyBy(log -> log.getUserId() + "_" + log.getAdsId())
                .process(new KeyedProcessFunction<String, AdsClickLog, String>() {
                    // clicks of the current day; the descriptor keeps its original (misspelled)
                    // name "conutState" so state in existing savepoints still matches
                    ReducingState<Long> countState;
                    // non-null once the warning for the current day has been emitted
                    ValueState<Boolean> warnState;
                    // the day (yyyy-MM-dd) the current counters belong to
                    ValueState<String> yesterdayState;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        countState = getRuntimeContext().getReducingState(new ReducingStateDescriptor<Long>("conutState", Long::sum, Long.class));
                        warnState = getRuntimeContext().getState(new ValueStateDescriptor<Boolean>("warnState", Boolean.class));
                        yesterdayState = getRuntimeContext().getState(new ValueStateDescriptor<String>("yesterdayState", String.class));
                    }

                    @Override
                    public void processElement(AdsClickLog log, Context ctx, Collector<String> out) throws Exception {
                        // a new day starts: reset the counter and the blacklist flag
                        String today = dayOf(log.getTimestamp());
                        if (!today.equals(yesterdayState.value())) {
                            countState.clear();
                            warnState.clear();
                            yesterdayState.update(today);
                        }

                        // blacklisted clicks are no longer counted
                        boolean alreadyWarned = warnState.value() != null;
                        if (!alreadyWarned) {
                            countState.add(1L);
                        }

                        Long count = countState.get();
                        String msg = "用户:" + log.getUserId() + "  对广告:" + log.getAdsId() + "的点击量是:" + count;
                        if (count > MAX_CLICKS_PER_DAY) {
                            // warn only once per day
                            if (!alreadyWarned) {
                                ctx.output(BLACK_LIST_TAG, msg + "加入黑名单");
                                warnState.update(true);
                            }
                        } else {
                            out.collect(msg);
                        }
                    }
                });
        process.print("main");
        process.getSideOutput(new OutputTag<String>("blackList"){}).print("black");

        environment.execute();
    }

    /**
     * Formats the raw click timestamp (treated as epoch seconds, as in the timestamp assigner) as a
     * yyyy-MM-dd day in the JVM's default time zone.
     */
    private static String dayOf(long epochSeconds) {
        return java.time.Instant.ofEpochSecond(epochSeconds)
                .atZone(java.time.ZoneId.systemDefault())
                .toLocalDate()
                .toString();
    }
}

8.4 恶意登录监控

  对于网站而言,用户登录并不是频繁的业务操作。如果一个用户短时间内频繁登录失败,就有可能是出现了程序的恶意攻击,比如密码暴力破解。

  因此我们考虑,应该对用户的登录失败动作进行统计,具体来说,如果同一用户(可以是不同IP)在2秒之内连续两次登录失败,就认为存在恶意登录的风险,输出相关的信息进行报警提示。这是电商网站、也是几乎所有网站风控的基本一环。

8.4.1 数据源:LoginLog.csv

8.4.2 封装数据的JavaBean

package com.yuange.flink.day06;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * One login attempt read from LoginLog.csv.
 *
 * @author 袁哥 (2021/7/21 21:25)
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class LoginEvent {
    /** id of the user attempting to log in */
    private Long userId;
    /** presumably the client IP of the attempt (not used by the detection jobs) */
    private String ip;
    /** outcome of the attempt; the detection jobs look for "fail" and "success" */
    private String eventType;
    /** event time in milliseconds (the readers multiply the CSV value by 1000) */
    private Long eventTime;
}

8.4.3 具体实现代码

  统计连续失败的次数:

    1)把失败的时间戳放入到List中,

    2)当List中的长度到达2的时候, 判断这两个时间戳的差是否小于等于2s

    3)如果是, 则这个用户在恶意登录

    4)否则不是, 然后删除List的第一个元素

    5)用于保持List的长度为2

    6)如果出现成功, 则需要清空List集合

  代码实现一:

package com.yuange.flink.day06;

import com.yuange.flink.utils.YuangeUtil;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;

import java.time.Duration;
import java.util.List;

/**
 * Malicious-login detection, variant 1: a count window of size 2 sliding by 1 per user.
 *
 * <p>Whenever the last two events of a user are both "fail" and at most 2 seconds apart, an alert is
 * printed. A count window ignores event time, so out-of-order input is not handled.
 *
 * @author 袁哥 (2021/7/23 17:47)
 */
public class Flink_High_Project_Login {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(2);

        env.readTextFile("input/LoginLog.csv")
                .map(line -> {
                    String[] fields = line.split(",");
                    long eventTimeMs = Long.parseLong(fields[3]) * 1000L;
                    return new LoginEvent(Long.valueOf(fields[0]), fields[1], fields[2], eventTimeMs);
                })
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<LoginEvent>forBoundedOutOfOrderness(Duration.ofSeconds(20))
                                .withTimestampAssigner((event, previousTs) -> event.getEventTime())
                )
                .keyBy(LoginEvent::getUserId)
                // a count window cannot cope with out-of-order data!
                .countWindow(2, 1)
                .process(new ProcessWindowFunction<LoginEvent, String, Long, GlobalWindow>() {
                    @Override
                    public void process(Long userId,
                                        Context context,
                                        Iterable<LoginEvent> elements,
                                        Collector<String> out) throws Exception {
                        List<LoginEvent> pair = YuangeUtil.toList(elements);
                        // fewer than two events cannot form a pair
                        if (pair.size() != 2) {
                            return;
                        }

                        LoginEvent first = pair.get(0);
                        LoginEvent second = pair.get(1);

                        String firstType = first.getEventType();
                        String secondType = second.getEventType();
                        // compare whole seconds
                        long firstSec = first.getEventTime() / 1000;
                        long secondSec = second.getEventTime() / 1000;

                        boolean bothFailed = "fail".equals(firstType) && "fail".equals(secondType);
                        if (bothFailed && Math.abs(firstSec - secondSec) <= 2) {
                            out.collect(userId + "正在恶意登录");
                        }
                    }
                })
                .print();
        env.execute();
    }
}

  代码实现二:

package com.yuange.flink.day06;

import com.yuange.flink.utils.YuangeUtil;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

/**
 * Malicious-login detection, variant 2: keyed state instead of a count window.
 *
 * <p>For each user the timestamps of recent failed logins are kept. Whenever two consecutive
 * failures are at most 2 seconds apart, an alert is printed. A successful login resets the
 * history.
 *
 * @author 袁哥 (2021/7/23 17:47)
 */
public class Flink_High_Project_Login_Two {

    /** Maximum gap between two failures, in milliseconds, that still counts as an attack. */
    private static final long MAX_GAP_MS = 2000L;

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(2);

        environment.readTextFile("input/LoginLog.csv")
                .map(line -> {
                    String[] split = line.split(",");
                    return new LoginEvent(Long.valueOf(split[0]),split[1],split[2],Long.parseLong(split[3]) * 1000L);
                })
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<LoginEvent>forBoundedOutOfOrderness(Duration.ofSeconds(20))
                                .withTimestampAssigner(new SerializableTimestampAssigner<LoginEvent>() {
                                    @Override
                                    public long extractTimestamp(LoginEvent element, long recordTimestamp) {
                                        return element.getEventTime();
                                    }
                                })
                )
                .keyBy(LoginEvent::getUserId)
                .process(new KeyedProcessFunction<Long, LoginEvent, String>() {
                    // event times (ms) of the user's most recent failures; never more than 2 entries
                    ListState<Long> failTs;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        failTs = getRuntimeContext().getListState(new ListStateDescriptor<Long>("failTs", Long.class));
                    }

                    /*
                     * 1. Append the failure timestamp to the list state.
                     * 2. When the list holds two entries, alert if they are at most 2 s apart.
                     * 3. In either case drop the oldest entry, so the list never exceeds two entries
                     *    and the next failure is again compared with its predecessor.
                     * 4. A successful login clears the list and starts over.
                     */
                    @Override
                    public void processElement(LoginEvent value,
                                               Context ctx,
                                               Collector<String> out) throws Exception {
                        switch (value.getEventType()){
                            case "fail" :
                                failTs.add(value.getEventTime());
                                List<Long> ts = YuangeUtil.toList(failTs.get());
                                if (ts.size() == 2){
                                    // abs(): with out-of-order input the newer failure may come first;
                                    // compare in ms so sub-second remainders are not truncated away
                                    long delta = Math.abs(ts.get(1) - ts.get(0));
                                    if (delta <= MAX_GAP_MS){
                                        out.collect(value.getUserId() + "存在恶意登录");
                                    }
                                    // drop the oldest even after an alert; otherwise the list would
                                    // grow past 2 and no further alert could ever fire
                                    ts.remove(0);
                                    failTs.update(ts);
                                }
                                break;
                            case "success":
                                failTs.clear();
                                break;
                            default:
                        }
                    }
                })
                .print();
        environment.execute();
    }
}

8.5 订单支付实时监控

  在电商网站中,订单的支付作为直接与营销收入挂钩的一环,在业务流程中非常重要。对于订单而言,为了正确控制业务流程,也为了增加用户的支付意愿,网站一般会设置一个支付失效时间,超过一段时间不支付的订单就会被取消。另外,对于订单的支付,我们还应保证用户支付的正确性,这可以通过第三方支付平台的交易数据来做一个实时对账。

  代码实现

package com.yuange.flink.day06;

import com.yuange.flink.day03.OrderEvent;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.time.Duration;

/**
 * Order payment monitoring: matches the "create" and "pay" events of each order.
 *
 * <p>If both events arrive, "正常支付" is printed. If only one arrives within 45 minutes of event
 * time, the timer reports which side is missing. State is cleared in both cases so finished orders
 * do not keep keyed state forever.
 *
 * @author 袁哥 (2021/7/23 18:24)
 */
public class Flink_High_Project_Order {

    /** How long (event-time ms) to wait for the matching event of an order. */
    private static final long TIMEOUT_MS = 45 * 60 * 1000L;

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(2);

        environment.readTextFile("input/OrderLog.csv")
                .map(line -> {
                    String[] split = line.split(",");
                    return new OrderEvent(Long.valueOf(split[0]),split[1],split[2],Long.parseLong(split[3]) * 1000);
                })
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<OrderEvent>forBoundedOutOfOrderness(Duration.ofSeconds(20))
                                .withTimestampAssigner(new SerializableTimestampAssigner<OrderEvent>() {
                                    @Override
                                    public long extractTimestamp(OrderEvent element, long recordTimestamp) {
                                        return element.getEventTime();
                                    }
                                })
                )
                .keyBy(OrderEvent::getOrderId)
                .process(new KeyedProcessFunction<Long, OrderEvent, String>() {
                    // the "create" event, while still waiting for the payment
                    ValueState<OrderEvent> createState;
                    // the "pay" event, while still waiting for the creation
                    ValueState<OrderEvent> payState;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        createState = getRuntimeContext().getState(new ValueStateDescriptor<OrderEvent>("createState", OrderEvent.class));
                        payState = getRuntimeContext().getState(new ValueStateDescriptor<OrderEvent>("payState", OrderEvent.class));
                    }

                    @Override
                    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
                        // only reached when the matching event did not arrive in time
                        if (createState.value() == null){
                            out.collect(ctx.getCurrentKey() + " 订单只有支付, 没有创建, 请检测系统漏洞");
                        }else if (payState.value() == null){
                            out.collect(ctx.getCurrentKey() + " 订单有创建, 没有支付, 这应该是个穷人");
                        }
                        // the order has been reported; release its state
                        createState.clear();
                        payState.clear();
                    }

                    @Override
                    public void processElement(OrderEvent value, Context ctx, Collector<String> out) throws Exception {
                        OrderEvent created = createState.value();
                        OrderEvent paid = payState.value();

                        if (created == null && paid == null){
                            // first event of this order: start the timeout
                            ctx.timerService().registerEventTimeTimer(value.getEventTime() + TIMEOUT_MS);
                        }else {
                            // second event: cancel the timeout registered by the first one
                            long firstTs = created == null ? paid.getEventTime() : created.getEventTime();
                            ctx.timerService().deleteEventTimeTimer(firstTs + TIMEOUT_MS);
                        }

                        if ("create".equals(value.getEventType())){
                            if (paid != null){
                                // the payment is already here: order complete
                                out.collect(ctx.getCurrentKey() + "正常支付");
                                payState.clear();
                            }else {
                                createState.update(value);
                            }
                        }else { // any other event type is treated as "pay"
                            if (created != null){
                                // the creation is already here: order complete
                                out.collect(ctx.getCurrentKey() + "正常支付");
                                createState.clear();
                            }else {
                                payState.update(value);
                            }
                        }
                    }
                })
                .print();
        environment.execute();
    }
}

第9章 Flink CEP编程

9.1 什么是FlinkCEP

  FlinkCEP(Complex event processing for Flink) 是在Flink实现的复杂事件处理,它可以让你在无界流中检测出特定的数据,有机会掌握数据中重要的那部分。是一种基于动态环境中事件流的分析技术,事件在这里通常是有意义的状态变化,通过分析事件间的关系,利用过滤、关联、聚合等技术,根据事件间的时序关系和聚合关系制定检测规则,持续地从事件流中查询出符合要求的事件序列,最终分析得到更复杂的复合事件。

  1)目标:从有序的简单事件流中发现一些高阶特征

  2)输入:一个或多个由简单事件构成的事件流

  3)处理:识别简单事件之间的内在联系,多个符合一定规则的简单事件构成复杂事件

  4)输出:满足规则的复杂事件

9.2 Flink CEP应用场景

  1)风险控制:对用户异常行为模式进行实时检测,当一个用户发生了不该发生的行为,判定这个用户是不是有违规操作的嫌疑。

  2)策略营销:用预先定义好的规则对用户的行为轨迹进行实时跟踪,对行为轨迹匹配预定义规则的用户实时发送相应策略的推广。

  3)运维监控:灵活配置多指标、多依赖来实现更复杂的监控模式。

9.3 CEP开发基本步骤

9.3.1 导入CEP相关依赖(你实际的版本使用多少的就改为多少的)

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-cep_2.12</artifactId>
    <version>1.13.1</version>
</dependency>

9.3.2 基本使用

  1)sensor.txt数据

sensor_1,1,10
sensor_1,2,20
sensor_2,3,30
sensor_1,4,40
sensor_2,5,50
sensor_1,6,60

  2)代码

package com.yuange.flink.day07;

import com.yuange.flink.day02.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.time.Duration;
import java.util.List;
import java.util.Map;

/**
 * Minimal FlinkCEP example: a single-event pattern that matches every reading of "sensor_1".
 *
 * @author 袁哥 (2021/7/23 19:14)
 */
public class Flink_CEP_BasicUse {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(2);

        // event time comes from the ts field; tolerate 5 s of disorder
        WatermarkStrategy<WaterSensor> watermarks = WatermarkStrategy
                .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withTimestampAssigner((sensor, previousTs) -> sensor.getTs());

        // lines look like "sensor_1,1,10": id, time in seconds, value
        SingleOutputStreamOperator<WaterSensor> sensors = env
                .readTextFile("input/sensor.txt")
                .map(line -> {
                    String[] cols = line.split(",");
                    String id = cols[0];
                    long tsMs = Long.parseLong(cols[1]) * 1000;
                    int vc = Integer.parseInt(cols[2]);
                    return new WaterSensor(id, tsMs, vc);
                })
                .assignTimestampsAndWatermarks(watermarks);

        // step 1: the pattern — a single event whose id is sensor_1
        SimpleCondition<WaterSensor> isSensor1 = new SimpleCondition<WaterSensor>() {
            @Override
            public boolean filter(WaterSensor sensor) throws Exception {
                return "sensor_1".equals(sensor.getId());
            }
        };
        Pattern<WaterSensor, WaterSensor> onlySensor1 = Pattern.<WaterSensor>begin("start").where(isSensor1);

        // steps 2 + 3: apply the pattern to the stream and print every match
        CEP.pattern(sensors, onlySensor1)
                .select(new PatternSelectFunction<WaterSensor, String>() {
                    @Override
                    public String select(Map<String, List<WaterSensor>> matched) throws Exception {
                        return matched.toString();
                    }
                })
                .print();

        env.execute();
    }
}

  3)结果

9.4 模式API

  模式API可以让你定义想从输入流中抽取的复杂模式序列。下面有几个相关概念:

  1)模式:比如找拥有相同属性事件序列的模式(前面案例中的拥有相同的id), 我们一般把简单模式称之为模式

    (1)每个模式必须有一个独一无二的名字,你可以在后面使用它来识别匹配到的事件。(比如前面的start模式)

    (2)模式的名字不能包含字符":"

  2)模式序列:每个复杂的模式序列由多个简单的模式组成,也叫模式序列。你可以把模式序列看作是由这些模式构成的图,这些模式基于用户指定的条件从一个模式转换到另外一个模式。

  3)匹配:输入事件的一个序列,这些事件通过一系列有效的模式转换,能够访问到复杂模式图中的所有模式。

9.4.1 单个模式

  1)数据准备

sensor_1,1,10
sensor_1,2,20
sensor_2,3,30
sensor_1,4,40
sensor_2,5,50
sensor_1,6,60

  2)单例模式:单例模式只接受一个事件,默认情况模式都是单例模式,前面的例子就是一个单例模式

  3)循环模式:循环模式可以接受多个事件,单例模式配合上量词就是循环模式(非常类似我们熟悉的正则表达式)

    (1)固定次数

package com.yuange.flink.day07;

import com.yuange.flink.day02.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.time.Duration;
import java.util.List;
import java.util.Map;

/**
 * FlinkCEP looping-pattern example: matches two readings of "sensor_1" (quantifier times(2)).
 *
 * @author 袁哥 (2021/7/23 19:14)
 */
public class Flink_CEP_Basic_Loop {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);

        // event time comes from the ts field; tolerate 5 s of disorder
        WatermarkStrategy<WaterSensor> watermarks = WatermarkStrategy
                .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withTimestampAssigner((sensor, previousTs) -> sensor.getTs());

        // lines look like "sensor_1,1,10": id, time in seconds, value
        SingleOutputStreamOperator<WaterSensor> sensors = env
                .readTextFile("input/sensor.txt")
                .map(line -> {
                    String[] cols = line.split(",");
                    String id = cols[0];
                    long tsMs = Long.parseLong(cols[1]) * 1000;
                    int vc = Integer.parseInt(cols[2]);
                    return new WaterSensor(id, tsMs, vc);
                })
                .assignTimestampsAndWatermarks(watermarks);

        // step 1: the pattern — sensor_1 events, exactly two of them
        SimpleCondition<WaterSensor> isSensor1 = new SimpleCondition<WaterSensor>() {
            @Override
            public boolean filter(WaterSensor sensor) throws Exception {
                return "sensor_1".equals(sensor.getId());
            }
        };
        Pattern<WaterSensor, WaterSensor> twiceSensor1 = Pattern.<WaterSensor>begin("start")
                .where(isSensor1)
                .times(2);

        // steps 2 + 3: apply the pattern to the stream and print every match
        CEP.pattern(sensors, twiceSensor1)
                .select(new PatternSelectFunction<WaterSensor, String>() {
                    @Override
                    public String select(Map<String, List<WaterSensor>> matched) throws Exception {
                        return matched.toString();
                    }
                })
                .print();

        env.execute();
    }
}

    (2)范围内的次数

// 1. Define the pattern: sensor_1 events, 2 to 4 of them
        SimpleCondition<WaterSensor> isSensor1 = new SimpleCondition<WaterSensor>() {
            @Override
            public boolean filter(WaterSensor sensor) throws Exception {
                return "sensor_1".equals(sensor.getId());
            }
        };
        Pattern<WaterSensor, WaterSensor> pattern = Pattern.<WaterSensor>begin("start")
                .where(isSensor1)
                .times(2, 4);   // quantifier: 2, 3 or 4 occurrences (inclusive range)

     (3)一次或多次

// 1. Define the pattern: one or more sensor_1 events
        SimpleCondition<WaterSensor> isSensor1 = new SimpleCondition<WaterSensor>() {
            @Override
            public boolean filter(WaterSensor sensor) throws Exception {
                return "sensor_1".equals(sensor.getId());
            }
        };
        Pattern<WaterSensor, WaterSensor> pattern = Pattern.<WaterSensor>begin("start")
                .where(isSensor1)
                .oneOrMore();   // quantifier: at least one occurrence

     (4)指定次数及以上(例如2次或2次以上)

        // 1. Define the pattern: sensor_1 events, two or more of them
        SimpleCondition<WaterSensor> isSensor1 = new SimpleCondition<WaterSensor>() {
            @Override
            public boolean filter(WaterSensor sensor) throws Exception {
                return "sensor_1".equals(sensor.getId());
            }
        };
        Pattern<WaterSensor, WaterSensor> pattern = Pattern.<WaterSensor>begin("start")
                .where(isSensor1)
                .timesOrMore(2);    // quantifier: 2 or more occurrences

   4)条件:对每个模式你可以指定一个条件来决定一个进来的事件是否被接受进入这个模式,例如前面用到的where就是一种条件

    (1)迭代条件:这是最普遍的条件类型。使用它可以基于前面已经被接受的事件的属性,或者它们的一个子集的统计数据,来决定是否接受当前事件。

package com.yuange.flink.day07;

import com.yuange.flink.day02.WaterSensor;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.time.Duration;
import java.util.List;
import java.util.Map;

/**
 * Minimal CEP program skeleton: reads sensor readings from {@code input/sensor.txt},
 * assigns event-time timestamps with a 5-second out-of-orderness bound, and prints
 * every single-event match whose id is {@code sensor_1}.
 *
 * Author: 袁哥
 * Created: 2021/7/23 19:45
 */
public class Flink_CEP_Base_Whrere {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> sensors = readSensors(env);

        // Single-stage pattern named "start": the event id must be sensor_1
        SimpleCondition<WaterSensor> isSensor1 = new SimpleCondition<WaterSensor>() {
            @Override
            public boolean filter(WaterSensor value) throws Exception {
                return "sensor_1".equals(value.getId());
            }
        };
        Pattern<WaterSensor, WaterSensor> pattern = Pattern.<WaterSensor>begin("start").where(isSensor1);

        // Apply the pattern to the stream, then render each match (stage name -> events) as text
        PatternSelectFunction<WaterSensor, String> render = new PatternSelectFunction<WaterSensor, String>() {
            @Override
            public String select(Map<String, List<WaterSensor>> match) throws Exception {
                return match.toString();
            }
        };
        CEP.pattern(sensors, pattern).select(render).print();

        env.execute();
    }

    /**
     * Parses lines of the form {@code id,seconds,vc} into {@link WaterSensor} objects
     * (seconds are multiplied by 1000 to get milliseconds) and attaches
     * bounded-out-of-orderness watermarks of 5 seconds.
     */
    private static SingleOutputStreamOperator<WaterSensor> readSensors(StreamExecutionEnvironment env) {
        WatermarkStrategy<WaterSensor> watermarks = WatermarkStrategy
                .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withTimestampAssigner((element, recordTimestamp) -> element.getTs());
        return env.readTextFile("input/sensor.txt")
                .map(line -> {
                    String[] fields = line.split(",");
                    return new WaterSensor(fields[0], Long.parseLong(fields[1]) * 1000, Integer.parseInt(fields[2]));
                })
                .assignTimestampsAndWatermarks(watermarks);
    }
}

    (2)简单条件:这种类型的条件扩展了前面提到的IterativeCondition类,它决定是否接受一个事件只取决于事件自身的属性。

// Define the pattern with a simple condition: it inspects only the event itself
        SimpleCondition<WaterSensor> isSensor1 = new SimpleCondition<WaterSensor>() {
            @Override
            public boolean filter(WaterSensor value) throws Exception {
                return "sensor_1".equals(value.getId());
            }
        };
        Pattern<WaterSensor, WaterSensor> pattern = Pattern.<WaterSensor>begin("start").where(isSensor1);

    (3)组合条件:把多个条件结合起来使用。这适用于任何条件:你可以通过依次调用where()来组合条件,最终的结果是每个单一条件结果的逻辑AND;如果想使用OR来组合条件,可以像下面这样使用or()方法。条件按调用顺序依次组合,因此下面示例的含义是:(id为sensor_1 且 vc>30) 或 vc>3000。

// Define the pattern: (id is sensor_1 AND vc > 30) OR vc > 3000
        SimpleCondition<WaterSensor> isSensor1 = new SimpleCondition<WaterSensor>() {
            @Override
            public boolean filter(WaterSensor value) throws Exception {
                return "sensor_1".equals(value.getId());
            }
        };
        SimpleCondition<WaterSensor> vcAbove30 = new SimpleCondition<WaterSensor>() {
            @Override
            public boolean filter(WaterSensor value) throws Exception {
                return value.getVc() > 30;
            }
        };
        SimpleCondition<WaterSensor> vcAbove3000 = new SimpleCondition<WaterSensor>() {
            @Override
            public boolean filter(WaterSensor value) throws Exception {
                return value.getVc() > 3000;
            }
        };
        Pattern<WaterSensor, WaterSensor> pattern = Pattern.<WaterSensor>begin("start")
                .where(isSensor1)       // successive where() calls are combined with AND
                .where(vcAbove30)
                .or(vcAbove3000);       // or() is OR-ed with the condition accumulated so far

    (4)停止条件:如果使用循环模式(oneOrMore, timesOrMore), 可以指定一个停止条件, 否则有可能会内存吃不消,意思是满足了给定的条件的事件出现后,就不会再有事件被接受进入模式了

package com.yuange.flink.day07;

import com.yuange.flink.day02.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.time.Duration;
import java.util.List;
import java.util.Map;

/**
 * Stop-condition example: a looping pattern over sensor_1 readings (at least two
 * occurrences) that stops accepting further events once a reading with vc > 40 arrives.
 * Each match is printed as a map of stage name to matched events.
 *
 * Author: 袁哥
 * Created: 2021/7/23 19:45
 */
public class Flink_CEP_Base_Whrere_Until {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);

        // Source: lines "id,seconds,vc"; seconds are converted to milliseconds and used as event time
        SingleOutputStreamOperator<WaterSensor> stream = environment.readTextFile("input/sensor.txt")
                .map(line -> {
                    String[] split = line.split(",");
                    return new WaterSensor(split[0], Long.parseLong(split[1]) * 1000, Integer.parseInt(split[2]));
                })
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                                .withTimestampAssigner((element, recordTimestamp) -> element.getTs())
                );

        // Define the pattern
        Pattern<WaterSensor, WaterSensor> pattern = Pattern.<WaterSensor>begin("start")
                .where(new SimpleCondition<WaterSensor>() {
                    @Override
                    public boolean filter(WaterSensor value) throws Exception {
                        return "sensor_1".equals(value.getId());
                    }
                })
                .timesOrMore(2)
                // until(): stop condition for the looping pattern. Once an event satisfying it
                // arrives, no further events are accepted into "start", which keeps an unbounded
                // loop from growing indefinitely. It does NOT stop the job itself.
                .until(new SimpleCondition<WaterSensor>() {
                    @Override
                    public boolean filter(WaterSensor value) throws Exception {
                        return value.getVc() > 40;
                    }
                });

        // Apply the pattern to the stream to get a PatternStream, then extract the matches
        CEP.pattern(stream,pattern)
                .select(new PatternSelectFunction<WaterSensor, String>() {
                    @Override
                    public String select(Map<String, List<WaterSensor>> map) throws Exception {
                        return map.toString();
                    }
                })
                .print();
        environment.execute();
    }
}

9.4.2 组合模式(模式序列)

  把多个单个模式组合在一起就是组合模式.  组合模式由一个初始化模式(.begin(...))开头

  1)严格连续(严格紧邻):期望所有匹配的事件严格的一个接一个出现,中间没有任何不匹配的事件

package com.yuange.flink.day07;

import com.yuange.flink.day02.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.time.Duration;
import java.util.List;
import java.util.Map;

/**
 * Strict-contiguity example: emits a match whenever a sensor_1 reading is immediately
 * followed (with no other event in between) by a sensor_2 reading.
 *
 * Author: 袁哥
 * Created: 2021/7/23 19:45
 */
public class Flink_CEP_Base_Whrere_Combine {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);

        // Event time comes from the ts field; tolerate up to 5 seconds of disorder
        WatermarkStrategy<WaterSensor> watermarks = WatermarkStrategy
                .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withTimestampAssigner((element, recordTimestamp) -> element.getTs());
        SingleOutputStreamOperator<WaterSensor> sensors = env.readTextFile("input/sensor.txt")
                .map(line -> {
                    String[] fields = line.split(",");
                    return new WaterSensor(fields[0], Long.parseLong(fields[1]) * 1000, Integer.parseInt(fields[2]));
                })
                .assignTimestampsAndWatermarks(watermarks);

        SimpleCondition<WaterSensor> isSensor1 = new SimpleCondition<WaterSensor>() {
            @Override
            public boolean filter(WaterSensor value) throws Exception {
                return "sensor_1".equals(value.getId());
            }
        };
        SimpleCondition<WaterSensor> isSensor2 = new SimpleCondition<WaterSensor>() {
            @Override
            public boolean filter(WaterSensor value) throws Exception {
                return "sensor_2".equals(value.getId());
            }
        };

        // "start" must be immediately followed by "end" (next = strict contiguity).
        // Its negative counterpart, notNext(), forbids a given event from directly following.
        Pattern<WaterSensor, WaterSensor> pattern = Pattern.<WaterSensor>begin("start")
                .where(isSensor1)
                .next("end")
                .where(isSensor2);

        // Apply the pattern and print each match as "stage name -> events"
        CEP.pattern(sensors, pattern)
                .select(new PatternSelectFunction<WaterSensor, String>() {
                    @Override
                    public String select(Map<String, List<WaterSensor>> match) throws Exception {
                        return match.toString();
                    }
                })
                .print();
        env.execute();
    }
}

  2)松散连续:忽略匹配的事件之间的不匹配的事件

// Define the pattern: sensor_1 followed (relaxed contiguity) by sensor_2
        Pattern<WaterSensor, WaterSensor> pattern = Pattern.<WaterSensor>begin("start")
                .where(new SimpleCondition<WaterSensor>() {
                    @Override
                    public boolean filter(WaterSensor value) throws Exception {
                        return "sensor_1".equals(value.getId());
                    }
                })
                // followedBy = relaxed contiguity: non-matching events in between are ignored.
                // Negative counterpart: notFollowedBy(), i.e. a given event must not occur anywhere
                // between the two events. notFollowedBy() cannot be the last stage of a pattern.
                .followedBy("end")
                .where(new SimpleCondition<WaterSensor>() {
                    @Override
                    public boolean filter(WaterSensor value) throws Exception {
                        return "sensor_2".equals(value.getId());
                    }
                });

  3)非确定的松散连续:更进一步的松散连续,允许忽略掉一些匹配事件的附加匹配。例如对于模式"a后面跟着b",当数据为a,c,b1,b2时,followedBy只命中一次{a,b1},而followedByAny会命中两次:{a,b1}和{a,b2}

// Define the pattern: sensor_1 followed by ANY later sensor_2 (non-deterministic relaxed contiguity)
        SimpleCondition<WaterSensor> isSensor1 = new SimpleCondition<WaterSensor>() {
            @Override
            public boolean filter(WaterSensor value) throws Exception {
                return "sensor_1".equals(value.getId());
            }
        };
        SimpleCondition<WaterSensor> isSensor2 = new SimpleCondition<WaterSensor>() {
            @Override
            public boolean filter(WaterSensor value) throws Exception {
                return "sensor_2".equals(value.getId());
            }
        };
        Pattern<WaterSensor, WaterSensor> pattern = Pattern.<WaterSensor>begin("start")
                .where(isSensor1)
                .followedByAny("end")
                .where(isSensor2);

9.4.3 模式知识补充

  1)循环模式的连续性:前面的连续性也可以运用在单个循环模式,连续性会被运用在被接受进入模式的事件之间。

    (1)松散连续:默认是松散连续

// 1. Define the pattern: exactly two sensor_1 readings, relaxed contiguity between them (default)
        SimpleCondition<WaterSensor> isSensor1 = new SimpleCondition<WaterSensor>() {
            @Override
            public boolean filter(WaterSensor value) throws Exception {
                return "sensor_1".equals(value.getId());
            }
        };
        Pattern<WaterSensor, WaterSensor> pattern = Pattern.<WaterSensor>begin("start")
                .where(isSensor1)
                .times(2);  // quantifier: exactly two occurrences

    (2)严格连续

// 1. Define the pattern: exactly two sensor_1 readings with strict contiguity between them
        SimpleCondition<WaterSensor> isSensor1 = new SimpleCondition<WaterSensor>() {
            @Override
            public boolean filter(WaterSensor value) throws Exception {
                return "sensor_1".equals(value.getId());
            }
        };
        Pattern<WaterSensor, WaterSensor> pattern = Pattern.<WaterSensor>begin("start")
                .where(isSensor1)
                .times(2)           // quantifier: exactly two occurrences
                .consecutive();     // strict contiguity inside the loop, analogous to next()

    (3)非确定的松散连续

// 1. Define the pattern: two sensor_1 readings, non-deterministic relaxed contiguity between them
        Pattern<WaterSensor, WaterSensor> pattern = Pattern.<WaterSensor>begin("start")
                .where(new SimpleCondition<WaterSensor>() {
                    @Override
                    public boolean filter(WaterSensor value) throws Exception {
                        return "sensor_1".equals(value.getId());
                    }
                })
                .times(2)               // quantifier: exactly two occurrences
                .allowCombinations();   // inside the loop, analogous to followedByAny()

  2)循环模式的贪婪性:在组合模式情况下, 对次数的处理尽可能获取最多个的那个次数, 就是贪婪!当一个事件同时满足两个模式的时候起作用

    (1)数据准备

sensor_1,1,10
sensor_1,2,20
sensor_1,3,30
sensor_2,4,30
sensor_1,4,400
sensor_2,5,50
sensor_2,6,60

    (2)代码

package com.yuange.flink.day07;

import com.yuange.flink.day02.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.time.Duration;
import java.util.List;
import java.util.Map;

/**
 * Greedy-loop example: "start" accepts 2 to 3 sensor_1 readings greedily and must be
 * strictly followed by a reading with vc == 30 ("end"). Each match is printed as a map
 * of stage name to matched events.
 *
 * Author: 袁哥
 * Created: 2021/7/23 19:14
 */
public class Flink_CEP_Basic_Loop_Greedy {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);

        // Source: lines "id,seconds,vc"; seconds are converted to milliseconds and used as event time
        SingleOutputStreamOperator<WaterSensor> stream = environment.readTextFile("input/sensor.txt")
                .map(line -> {
                    String[] split = line.split(",");
                    return new WaterSensor(split[0], Long.parseLong(split[1]) * 1000, Integer.parseInt(split[2]));
                })
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                                .withTimestampAssigner((element, recordTimestamp) -> element.getTs())
                );

        // 1. Define the pattern
        Pattern<WaterSensor, WaterSensor> pattern = Pattern.<WaterSensor>begin("start")
                .where(new SimpleCondition<WaterSensor>() {
                    @Override
                    public boolean filter(WaterSensor value) throws Exception {
                        return "sensor_1".equals(value.getId());
                    }
                })
                .times(2,3)     // quantifier: 2 to 3 occurrences (not "exactly two")
                // greedy: when an event fits both this loop and the next stage, the loop keeps
                // it so as to repeat as many times as possible. Group patterns cannot be greedy.
                .greedy()
                .next("end")
                .where(new SimpleCondition<WaterSensor>() {
                    @Override
                    public boolean filter(WaterSensor value) throws Exception {
                        return value.getVc() == 30;
                    }
                });

        // 2. Apply the pattern to the stream
        // 3. Extract the matches
        CEP.pattern(stream, pattern)
                .select(new PatternSelectFunction<WaterSensor, String>() {
                    @Override
                    public String select(Map<String, List<WaterSensor>> map) throws Exception {
                        return map.toString();
                    }
                })
                .print();

        environment.execute();
    }
}

    (3)结果

{start=[WaterSensor(id=sensor_1, ts=1000, vc=10), WaterSensor(id=sensor_1, ts=2000, vc=20), WaterSensor(id=sensor_1, ts=3000, vc=30)], end=[WaterSensor(id=sensor_2, ts=4000, vc=30)]}
{start=[WaterSensor(id=sensor_1, ts=2000, vc=20), WaterSensor(id=sensor_1, ts=3000, vc=30)], end=[WaterSensor(id=sensor_2, ts=4000, vc=30)]}

    (4)分析:sensor_1,3,30 在匹配的时候,既能匹配第一个模式,也能匹配第二个模式。由于第一个模式使用了量词,启用greedy后会优先将该事件匹配给第一个模式,因为要尽可能多地匹配次数

  3)模式可选性:可以使用pattern.optional()方法让所有的模式变成可选的,不管是否是循环模式

package com.yuange.flink.day07;

import com.yuange.flink.day02.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.time.Duration;
import java.util.List;
import java.util.Map;

/**
 * Optional-pattern example: "start" (exactly two sensor_1 readings) may be absent
 * altogether, and the match must end with a strictly following sensor_2 reading ("end").
 *
 * Author: 袁哥
 * Created: 2021/7/23 19:14
 */
public class Flink_CEP_Basic_Loop_Greedy_Two {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> sensors = env
                .readTextFile("input/sensor.txt")
                .map(line -> {
                    String[] cols = line.split(",");
                    String id = cols[0];
                    long tsMillis = Long.parseLong(cols[1]) * 1000;
                    int vc = Integer.parseInt(cols[2]);
                    return new WaterSensor(id, tsMillis, vc);
                })
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                                .withTimestampAssigner((element, recordTimestamp) -> element.getTs())
                );

        SimpleCondition<WaterSensor> isSensor1 = new SimpleCondition<WaterSensor>() {
            @Override
            public boolean filter(WaterSensor value) throws Exception {
                return "sensor_1".equals(value.getId());
            }
        };
        SimpleCondition<WaterSensor> isSensor2 = new SimpleCondition<WaterSensor>() {
            @Override
            public boolean filter(WaterSensor value) throws Exception {
                return "sensor_2".equals(value.getId());
            }
        };

        // "start" occurs either 0 times or exactly 2 times; "end" must follow strictly
        Pattern<WaterSensor, WaterSensor> pattern = Pattern.<WaterSensor>begin("start")
                .where(isSensor1)
                .times(2)
                .optional()
                .next("end")
                .where(isSensor2);

        PatternSelectFunction<WaterSensor, String> asText = new PatternSelectFunction<WaterSensor, String>() {
            @Override
            public String select(Map<String, List<WaterSensor>> match) throws Exception {
                return match.toString();
            }
        };

        CEP.pattern(sensors, pattern).select(asText).print();

        env.execute();
    }
}

9.4.4 模式组

  在前面的代码中次数只能用在某个模式上比如: .begin(...).where(...).next(...).where(...).times(2),这里的次数只会用在next这个模式上, 而不会用在begin模式上,如果需要用在多个模式上,可以使用模式组

  1)代码

package com.yuange.flink.day07;

import com.yuange.flink.day02.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.time.Duration;
import java.util.List;
import java.util.Map;

/**
 * Group-pattern example: the two-stage sequence (sensor_1 immediately followed by
 * sensor_2) must itself occur twice, back to back. Applying the quantifier to the group
 * repeats the whole sequence rather than only its last stage.
 *
 * Author: 袁哥
 * Created: 2021/7/23 19:14
 */
public class Flink_CEP_Basic_Group {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);

        WatermarkStrategy<WaterSensor> watermarks = WatermarkStrategy
                .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withTimestampAssigner((element, recordTimestamp) -> element.getTs());
        SingleOutputStreamOperator<WaterSensor> sensors = env.readTextFile("input/sensor.txt")
                .map(line -> {
                    String[] fields = line.split(",");
                    return new WaterSensor(fields[0], Long.parseLong(fields[1]) * 1000, Integer.parseInt(fields[2]));
                })
                .assignTimestampsAndWatermarks(watermarks);

        SimpleCondition<WaterSensor> isSensor1 = new SimpleCondition<WaterSensor>() {
            @Override
            public boolean filter(WaterSensor value) throws Exception {
                return "sensor_1".equals(value.getId());
            }
        };
        SimpleCondition<WaterSensor> isSensor2 = new SimpleCondition<WaterSensor>() {
            @Override
            public boolean filter(WaterSensor value) throws Exception {
                return "sensor_2".equals(value.getId());
            }
        };

        // Inner sequence: a sensor_1 reading immediately followed by a sensor_2 reading
        Pattern<WaterSensor, WaterSensor> pair = Pattern.<WaterSensor>begin("s1")
                .where(isSensor1)
                .next("s2")
                .where(isSensor2);

        // Group pattern: the whole pair repeated twice with strict contiguity
        Pattern<WaterSensor, WaterSensor> pattern = Pattern.begin(pair)
                .times(2)
                .consecutive();

        CEP.pattern(sensors, pattern)
                .select(new PatternSelectFunction<WaterSensor, String>() {
                    @Override
                    public String select(Map<String, List<WaterSensor>> match) throws Exception {
                        return match.toString();
                    }
                })
                .print();

        env.execute();
    }
}

  2)结果

{s1=[WaterSensor(id=sensor_1, ts=3000, vc=30), WaterSensor(id=sensor_1, ts=4000, vc=400)], s2=[WaterSensor(id=sensor_2, ts=4000, vc=30), WaterSensor(id=sensor_2, ts=5000, vc=50)]}

9.4.5 超时数据

  当一个模式上通过within加上窗口长度后,部分匹配的事件序列就可能因为超过窗口长度而被丢弃。

package com.yuange.flink.day07;

import com.yuange.flink.day02.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternTimeoutFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;

import java.time.Duration;
import java.util.List;
import java.util.Map;

/**
 * {@code within} example: a sensor_1 reading strictly followed by a sensor_2 reading,
 * both inside a 2-second window. Complete matches are printed with prefix "main";
 * partial matches that time out are routed to a side output and printed with prefix
 * "timeout".
 *
 * Author: 袁哥
 * Created: 2021/7/23 19:14
 */
public class Flink_CEP_WithIn {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);

        // Source: lines "id,seconds,vc"; seconds are converted to milliseconds and used as event time
        SingleOutputStreamOperator<WaterSensor> stream = environment.readTextFile("input/sensor.txt")
                .map(line -> {
                    String[] split = line.split(",");
                    return new WaterSensor(split[0], Long.parseLong(split[1]) * 1000, Integer.parseInt(split[2]));
                })
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                                .withTimestampAssigner((element, recordTimestamp) -> element.getTs())
                );

        // 1. Define the pattern; within() bounds the whole sequence to 2 seconds
        Pattern<WaterSensor, WaterSensor> pattern = Pattern.<WaterSensor>begin("start")
                .where(new SimpleCondition<WaterSensor>() {
                    @Override
                    public boolean filter(WaterSensor value) throws Exception {
                        return "sensor_1".equals(value.getId());
                    }
                })
                .next("end")
                .where(new SimpleCondition<WaterSensor>() {
                    @Override
                    public boolean filter(WaterSensor value) throws Exception {
                        return "sensor_2".equals(value.getId());
                    }
                })
                .within(Time.seconds(2));

        // Side-output tag for timed-out partial matches. A single typed instance is used both
        // for routing (select) and for reading (getSideOutput) so the two cannot drift apart.
        final OutputTag<String> timeoutTag = new OutputTag<String>("timeout") {
        };

        // 2. Apply the pattern to the stream
        // 3. Extract results: complete matches go to the main output, timeouts to the side output
        SingleOutputStreamOperator<String> result = CEP.pattern(stream, pattern)
                .select(
                        timeoutTag,
                        new PatternTimeoutFunction<WaterSensor, String>() {
                            @Override
                            public String timeout(Map<String, List<WaterSensor>> partialMatch,
                                                  long timeoutTimestamp) throws Exception {
                                // partialMatch is the timed-out data; Flink puts the returned
                                // value into the side output identified by timeoutTag
                                return partialMatch.toString();
                            }
                        },
                        new PatternSelectFunction<WaterSensor, String>() {
                            @Override
                            public String select(Map<String, List<WaterSensor>> map) throws Exception {
                                return map.toString();
                            }
                        }
                );
        result.print("main");
        result.getSideOutput(timeoutTag).print("timeout");

        environment.execute();
    }
}

9.4.6 匹配后跳过策略

  对于一个给定的模式,同一个事件可能会分配到多个成功的匹配上。为了控制一个事件会分配到多少个匹配上,你需要指定跳过策略AfterMatchSkipStrategy。 有五种跳过策略,如下:

    1)NO_SKIP:每个成功的匹配都会被输出。

    2)SKIP_TO_NEXT:丢弃以相同事件开始的所有部分匹配。

    3)SKIP_PAST_LAST_EVENT:丢弃起始在这个匹配的开始和结束之间的所有部分匹配。

    4)SKIP_TO_FIRST:丢弃起始在这个匹配的开始和第一个出现的名称为PatternName事件之间的所有部分匹配。

    5)SKIP_TO_LAST:丢弃起始在这个匹配的开始和最后一个出现的名称为PatternName事件之间的所有部分匹配。

// 1. 定义模式
Pattern<WaterSensor, WaterSensor> pattern = Pattern.<WaterSensor>begin("start", AfterMatchSkipStrategy.noSkip())

第10章 Flink CEP编程实战

10.1 恶意登录监控

package com.yuange.flink.day07;

import com.yuange.flink.day06.LoginEvent;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternTimeoutFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;

import java.time.Duration;
import java.util.List;
import java.util.Map;

/**
 * Malicious-login monitoring: for each user, flags two strictly consecutive "fail" login
 * events that occur within a 2-second window and prints "<userId>在恶意登录".
 *
 * Author: 袁哥
 * Created: 2021/7/23 21:09
 */
public class Flink_Login {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(2);

        // Source: lines "userId,ip,eventType,seconds"; seconds become event-time milliseconds.
        // Keyed by user so that the pattern is evaluated independently per user.
        KeyedStream<LoginEvent, Long> stream = environment.readTextFile("input/LoginLog.csv")
                .map(line -> {
                    String[] split = line.split(",");
                    return new LoginEvent(Long.valueOf(split[0]), split[1], split[2], Long.parseLong(split[3]) * 1000L);
                })
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<LoginEvent>forBoundedOutOfOrderness(Duration.ofSeconds(20))
                                .withTimestampAssigner((element, recordTimestamp) -> element.getEventTime())
                )
                .keyBy(LoginEvent::getUserId);

        // Pattern: two directly adjacent "fail" events (next = strict contiguity) within 2 s.
        // NOTE(review): 2001 ms rather than 2 s appears intended to still match two failures
        // exactly 2 s apart (inclusive bound); confirm against the CEP window-boundary semantics.
        Pattern<LoginEvent, LoginEvent> pattern = Pattern.<LoginEvent>begin("firstFail")
                .where(new SimpleCondition<LoginEvent>() {
                    @Override
                    public boolean filter(LoginEvent value) throws Exception {
                        return "fail".equals(value.getEventType());
                    }
                })
                .next("secondFail")
                .where(new SimpleCondition<LoginEvent>() {
                    @Override
                    public boolean filter(LoginEvent value) throws Exception {
                        return "fail".equals(value.getEventType());
                    }
                })
                .within(Time.milliseconds(2001));

        // Apply the pattern and report each match. Timed-out partial matches are not needed
        // here, so the plain select() overload is used instead of a timeout handler that would
        // push null values into a side output nobody reads.
        SingleOutputStreamOperator<String> main = CEP.pattern(stream, pattern)
                .select(new PatternSelectFunction<LoginEvent, String>() {
                    @Override
                    public String select(Map<String, List<LoginEvent>> map) throws Exception {
                        LoginEvent firstFail = map.get("firstFail").get(0);
                        return firstFail.getUserId() + "在恶意登录";
                    }
                });
        main.print();

        environment.execute();
    }
}

10.2 订单支付实时监控

package com.yuange.flink.day07;

import com.yuange.flink.day03.OrderEvent;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternTimeoutFunction;
import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;

import java.time.Duration;
import java.util.List;
import java.util.Map;

/**
 * @作者:袁哥
 * @时间:2021/7/23 21:19
 */
/**
 * Real-time order payment monitoring built on Flink CEP.
 *
 * <p>Reads order events from {@code input/OrderLog.csv}, keys them by order id and applies the
 * pattern from {@link #buildOrderPattern()}: an optional {@code "create"} event strictly followed
 * by a {@code "pay"} event, completed within 45 minutes of event time. The job prints:
 * <ul>
 *   <li>messages for partial matches that timed out (emitted through the {@code "timeout"} side
 *       output);</li>
 *   <li>messages for complete matches that contain a {@code "pay"} event but no
 *       {@code "create"} event.</li>
 * </ul>
 * Complete matches containing both events map to an empty string, which is filtered out before
 * printing.
 *
 * <p>Author: 袁哥, created 2021/7/23 21:19.
 */
public class Flink_High_Project_Order {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);

        KeyedStream<OrderEvent, Long> stream = environment.readTextFile("input/OrderLog.csv")
                .map(Flink_High_Project_Order::parseOrderEvent)
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<OrderEvent>forBoundedOutOfOrderness(Duration.ofSeconds(20))
                                .withTimestampAssigner((element, recordTimestamp) -> element.getEventTime())
                )
                .keyBy(OrderEvent::getOrderId);

        // One tag instance is shared by select() and getSideOutput(), so the side-output id cannot drift.
        OutputTag<String> timeoutTag = new OutputTag<String>("timeout") {
        };

        SingleOutputStreamOperator<String> main = CEP.pattern(stream, buildOrderPattern())
                .select(
                        timeoutTag,
                        new PatternTimeoutFunction<OrderEvent, String>() {
                            @Override
                            public String timeout(Map<String, List<OrderEvent>> pattern, long timeoutTimestamp) throws Exception {
                                // "pay" is the final step, so a timed-out partial match never contains it;
                                // with "create" present this is an order that was created but not paid in time.
                                if (pattern.containsKey("create")) {
                                    return pattern.get("create").get(0).getOrderId() + " 有创建, 没有支付";
                                } else {
                                    return pattern.get("pay").get(0).getOrderId() + " 有支付, 没有创建, 系统有问题";
                                }
                            }
                        },
                        new PatternSelectFunction<OrderEvent, String>() {
                            @Override
                            public String select(Map<String, List<OrderEvent>> pattern) throws Exception {
                                // "create" is optional: a complete match without it contains only the payment.
                                if (!pattern.containsKey("create")) {
                                    Long orderId = pattern.get("pay").get(0).getOrderId();
                                    return "订单: " + orderId + " 只有pay没有create";
                                }
                                // Normal create -> pay match: return an empty marker that is filtered out below.
                                return "";
                            }
                        }
                );

        // Timed-out partial matches plus the non-empty (pay-without-create) messages from the main output.
        main.getSideOutput(timeoutTag)
                .union(main.filter(s -> s.length() > 0))
                .print();
        environment.execute();
    }

    /**
     * Parses one comma-separated line of {@code OrderLog.csv} into an {@link OrderEvent}.
     *
     * <p>Column 0 is parsed as a {@code Long} (the value later used as the key via
     * {@code getOrderId}), columns 1 and 2 are passed through as strings, and column 3 is parsed
     * as a {@code long} and multiplied by 1000 (presumably seconds to milliseconds for the event
     * time — confirm against {@code OrderEvent}'s constructor).
     *
     * @param line one raw CSV line
     * @return the parsed event
     */
    private static OrderEvent parseOrderEvent(String line) {
        String[] datas = line.split(",");
        return new OrderEvent(
                Long.valueOf(datas[0]),
                datas[1],
                datas[2],
                Long.parseLong(datas[3]) * 1000);
    }

    /**
     * Builds the order pattern: an optional {@code "create"} event strictly followed
     * ({@code next}) by a {@code "pay"} event, with the whole match required within 45 minutes.
     *
     * <p>The pattern is meant to separate three cases:
     * <ol>
     *   <li>both create and pay are present;</li>
     *   <li>create without pay, or pay without create;</li>
     *   <li>create and pay are present but too far apart (timeout).</li>
     * </ol>
     * The {@code skipPastLastEvent} strategy prunes other partial matches that started at or
     * before the last event of an emitted match.
     *
     * @return the CEP pattern applied to the keyed order stream
     */
    private static Pattern<OrderEvent, OrderEvent> buildOrderPattern() {
        return Pattern.<OrderEvent>begin("create", AfterMatchSkipStrategy.skipPastLastEvent())
                .where(new SimpleCondition<OrderEvent>() {
                    @Override
                    public boolean filter(OrderEvent value) throws Exception {
                        return "create".equals(value.getEventType());
                    }
                })
                .optional()
                .next("pay")
                .where(new SimpleCondition<OrderEvent>() {
                    @Override
                    public boolean filter(OrderEvent value) throws Exception {
                        return "pay".equals(value.getEventType());
                    }
                })
                .within(Time.minutes(45));
    }
}
原文地址:https://www.cnblogs.com/LzMingYueShanPao/p/15041689.html