III. Flink Real-Time Project (E-Commerce User Behavior Analysis): Statistical Analysis of Marketing Business Metrics

1.1 Module creation and data preparation

As before, create a new Maven module under Flink-Project as a subproject and name it gmall-market.

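This module has no ready-made data. We therefore either use a custom test source to produce a test data stream, or generate a test data file directly.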
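For the second option, generating a test data file up front, a minimal plain-Java sketch could look like the following. Several details are assumptions made only for illustration: the file name `market_behavior.csv`, the column order (userId, behavior, channel, timestamp), the fixed seed and the user ID range. The behavior and channel values are the same ones the custom test source in this section uses.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;

// Hypothetical helper: writes randomly generated market behavior records to a CSV file.
class MarketTestDataGenerator {
    static final List<String> BEHAVIORS = Arrays.asList("CLICK", "DOWNLOAD", "INSTALL", "UNINSTALL");
    static final List<String> CHANNELS = Arrays.asList("app store", "wechat", "weibo", "tieba");

    // Builds n CSV lines "userId,behavior,channel,timestamp" with timestamps 100 ms apart.
    static List<String> generateLines(int n, long seed, long startMillis) {
        Random random = new Random(seed); // fixed seed => reproducible test data
        List<String> lines = new ArrayList<>(n);
        for (int i = 0; i < n; i++) {
            long userId = 1 + random.nextInt(100_000); // assumed ID range
            String behavior = BEHAVIORS.get(random.nextInt(BEHAVIORS.size()));
            String channel = CHANNELS.get(random.nextInt(CHANNELS.size()));
            long timestamp = startMillis + i * 100L;
            lines.add(userId + "," + behavior + "," + channel + "," + timestamp);
        }
        return lines;
    }

    public static void main(String[] args) throws IOException {
        Path out = Paths.get("market_behavior.csv"); // hypothetical output path
        Files.write(out, generateLines(1000, 42L, System.currentTimeMillis()), StandardCharsets.UTF_8);
        System.out.println("wrote " + out.toAbsolutePath());
    }
}
```

A fixed seed gives the same file on every run, which makes window results easy to compare between runs.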

1.2 APP marketing promotion statistics

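As smartphones have spread, a growing share of users on today's e-commerce sites come from mobile devices. Compared with logging in through a traditional browser, the mobile APP has become many users' first choice for accessing an e-commerce site. E-commerce companies usually promote their APPs through many different channels. The statistics for these channels therefore become important business metrics for marketing, for example the click counts of ad links on different websites and the number of APP downloads.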

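We start with promotion statistics broken down by channel. Since there is no ready-made data, we define a custom test source that generates a stream of user behavior events.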

1.2.1 Custom test data source

Define a JavaBean class MarketUserBehavior for the source data. Then define a SourceFunction that produces user behavior records:

1) Define the JavaBean--MarketUserBehavior

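import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class MarketUserBehavior {
    // Fields: user ID, user behavior, promotion channel, timestamp
    private Long userId;
    private String behavior;
    private String channel;
    private Long timestamp;
}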

2) Custom data source

import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;

import java.util.Arrays;
import java.util.List;
import java.util.Random;

public class MarketBehaviorSource implements ParallelSourceFunction<MarketUserBehavior> {
    // Running flag; cancel() is called from another thread, so it must be volatile
    private volatile boolean running = true;
    // Candidate user behaviors and promotion channels
    private final List<String> behaviorList = Arrays.asList("CLICK", "DOWNLOAD", "INSTALL", "UNINSTALL");
    private final List<String> channelList = Arrays.asList("app store", "wechat", "weibo", "tieba");
    // Random number generator
    private final Random random = new Random();

    @Override
    public void run(SourceContext<MarketUserBehavior> ctx) throws Exception {
        while (running) {
            // Randomly generate all fields
            long id = random.nextLong();
            String behavior = behaviorList.get(random.nextInt(behaviorList.size()));
            String channel = channelList.get(random.nextInt(channelList.size()));
            long timestamp = System.currentTimeMillis();
            // Emit the record
            ctx.collect(new MarketUserBehavior(id, behavior, channel, timestamp));
            Thread.sleep(100L);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}
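Flink calls `cancel()` from a different thread than the one executing `run()`. The loop therefore only reliably sees the change to its running flag when that flag is declared `volatile`. The plain-Java sketch below imitates this run/cancel contract with an ordinary thread. `FlagControlledSource` and its methods are hypothetical stand-ins, not Flink classes.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for a SourceFunction: run() loops until cancel() is called from another thread.
class FlagControlledSource {
    // volatile guarantees that the write in cancel() becomes visible to the thread running run()
    private volatile boolean running = true;
    private final AtomicLong emitted = new AtomicLong();

    void run() throws InterruptedException {
        while (running) {
            emitted.incrementAndGet(); // stands in for ctx.collect(...)
            Thread.sleep(10L);
        }
    }

    void cancel() {
        running = false;
    }

    long emittedCount() {
        return emitted.get();
    }

    public static void main(String[] args) throws Exception {
        FlagControlledSource source = new FlagControlledSource();
        Thread worker = new Thread(() -> {
            try {
                source.run();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();
        Thread.sleep(100L);
        source.cancel(); // called from the main thread, as Flink calls cancel() from another thread
        worker.join(1000L);
        System.out.println("stopped=" + !worker.isAlive() + ", emitted=" + source.emittedCount());
    }
}
```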

1.2.2 Statistics by channel

Every 5 seconds, count the promotion events of the most recent hour, grouped by channel (and behavior).
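A one-hour window that slides every 5 seconds means each event is counted in 3600 / 5 = 720 overlapping windows. The plain-Java sketch below shows how those windows can be computed for one timestamp. It mirrors, for non-negative timestamps and no window offset, the logic of Flink's sliding window assigner: the latest window starts at the timestamp minus (timestamp mod slide), and earlier windows step back by one slide while they still contain the timestamp. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper mirroring how a sliding window assigner (offset 0) picks windows for one timestamp.
class SlidingWindowMath {
    // Returns the [start, end) windows, as long[]{start, end}, that contain the given timestamp.
    static List<long[]> assignWindows(long timestamp, long size, long slide) {
        List<long[]> windows = new ArrayList<>();
        long lastStart = timestamp - Math.floorMod(timestamp, slide); // latest window start <= timestamp
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new long[]{start, start + size});
        }
        return windows;
    }

    public static void main(String[] args) {
        long size = 60 * 60 * 1000L; // 1 hour
        long slide = 5 * 1000L;      // 5 seconds
        List<long[]> windows = assignWindows(12_000L, size, slide);
        System.out.println(windows.size() + " windows, latest starts at " + windows.get(0)[0]);
    }
}
```

Running `main` prints `720 windows, latest starts at 10000`. The event at 12 s falls into the window that starts at 10 s and into 719 earlier windows, which is why each record updates many window accumulators at once.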

1) Define the JavaBean--ChannelBehaviorCount

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class ChannelBehaviorCount {
    private String channel;
    private String behavior;
    private String windowEnd;
    private Long count;
}

2) Main program

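import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.sql.Timestamp;

public class MarketByChannelApp {
    public static void main(String[] args) throws Exception {
        // 1. Create the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // 2. Read data from the custom source
        DataStreamSource<MarketUserBehavior> marketUserDS = env.addSource(new MarketBehaviorSource());

        // 3. Filter out uninstall records, key by channel and behavior, and open a window
        SingleOutputStreamOperator<ChannelBehaviorCount> result = marketUserDS
                .filter(data -> !"UNINSTALL".equals(data.getBehavior()))
                .keyBy("channel", "behavior")
                // 1-hour window sliding every 5 s; before Flink 1.12 the default time characteristic is processing time
                .timeWindow(Time.hours(1), Time.seconds(5))
                .aggregate(new MarketAggFunc(), new MarketWindowFunc());

        // 4. Print
        result.print();
        // 5. Execute the job
        env.execute();
    }

    // Incremental aggregation: count the records of each key in each window
    public static class MarketAggFunc implements AggregateFunction<MarketUserBehavior, Long, Long> {

        @Override
        public Long createAccumulator() {
            return 0L;
        }

        @Override
        public Long add(MarketUserBehavior value, Long accumulator) {
            return accumulator + 1L;
        }

        @Override
        public Long getResult(Long accumulator) {
            return accumulator;
        }

        @Override
        public Long merge(Long a, Long b) {
            return a + b;
        }
    }

    // Window function: wrap the pre-aggregated count with its key and window end
    public static class MarketWindowFunc implements WindowFunction<Long, ChannelBehaviorCount, Tuple, TimeWindow> {

        @Override
        public void apply(Tuple tuple, TimeWindow window, Iterable<Long> input, Collector<ChannelBehaviorCount> out) throws Exception {
            String channel = tuple.getField(0);
            String behavior = tuple.getField(1);
            String windowEnd = new Timestamp(window.getEnd()).toString();
            Long count = input.iterator().next(); // the AggregateFunction emits exactly one value per window
            out.collect(new ChannelBehaviorCount(channel, behavior, windowEnd, count));
        }
    }

}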
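To make the expected output concrete, the plain-Java sketch below applies the same filter-and-count rule to the events of a single window. It drops `UNINSTALL` records and counts the remaining ones per (channel, behavior) pair, which is the value the counting aggregate function accumulates for each key. The `ChannelCountSketch` class and its `Event` type are hypothetical simplifications of the job's JavaBeans.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical plain-Java model of what one window of the by-channel job computes.
class ChannelCountSketch {
    static final class Event {
        final String behavior;
        final String channel;

        Event(String behavior, String channel) {
            this.behavior = behavior;
            this.channel = channel;
        }
    }

    // Counts events per "channel,behavior" key after filtering out UNINSTALL, like filter + keyBy + count.
    static Map<String, Long> countByChannelAndBehavior(List<Event> windowEvents) {
        Map<String, Long> counts = new TreeMap<>(); // sorted only to make the output stable
        for (Event e : windowEvents) {
            if ("UNINSTALL".equals(e.behavior)) {
                continue; // uninstall records are filtered out before keying
            }
            counts.merge(e.channel + "," + e.behavior, 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
                new Event("CLICK", "wechat"),
                new Event("CLICK", "wechat"),
                new Event("DOWNLOAD", "weibo"),
                new Event("UNINSTALL", "weibo"));
        System.out.println(countByChannelAndBehavior(events));
    }
}
```

With these four sample events, `main` prints `{wechat,CLICK=2, weibo,DOWNLOAD=1}`. The uninstall on weibo does not appear in the output.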

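1.2.3 Overall statistics (not split by channel)

Likewise, we can compute promotion statistics without splitting by channel. This gives the total promotion volume across all channels.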
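In Flink, one common way to get the overall total is to send every record to the same key and reuse the same sliding window and counting functions. For example, you can map each event to a constant key such as "total" and `keyBy` on it. The plain-Java sketch below only simulates the resulting numbers. It assigns each event timestamp to every sliding window that contains it and counts events per window end, without looking at the channel. The names and the tiny window sizes in the example are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;

// Hypothetical simulation of an all-channel total over sliding windows (one constant key).
class TotalPerWindowSketch {
    // Maps each window end to the number of events whose timestamp falls in [end - size, end).
    static SortedMap<Long, Long> totalPerWindowEnd(List<Long> timestamps, long size, long slide) {
        SortedMap<Long, Long> totals = new TreeMap<>();
        for (long ts : timestamps) {
            long lastStart = ts - Math.floorMod(ts, slide);
            for (long start = lastStart; start > ts - size; start -= slide) {
                totals.merge(start + size, 1L, Long::sum); // every channel lands on the same key
            }
        }
        return totals;
    }

    public static void main(String[] args) {
        // window size 10 ms, slide 5 ms, four events
        System.out.println(totalPerWindowEnd(Arrays.asList(1L, 2L, 7L, 12L), 10L, 5L));
    }
}
```

Running `main` prints `{5=2, 10=3, 15=2, 20=1}`. For example, the window ending at 10 covers [0, 10) and contains the events at 1, 2 and 7.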

1.3 Page ad analysis

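Besides promoting its own APP, an e-commerce site's marketing metrics also cover the ads placed on its pages, both for its own products and for other websites. Ad-related statistics are therefore another important marketing metric.

For ads, the simplest and most important statistic is the number of clicks on page ads. Sites often set their pricing strategy and adjust how they promote based on ad click counts, and they can also use these clicks to collect information about user preferences. A more specific application is to partition users by geographic location and summarize which ads users in each province prefer. This helps target ad placement more precisely.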

1.3.1 Page ad click statistics

Next, we count page ad clicks per province.

Again, since there is no ready-made data, we prepare some test data in AdClickLog.csv and use it to generate a stream of ad click events.
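The job code in this section reads each line of AdClickLog.csv as `userId,adId,province,city,timestamp`. The timestamp is in seconds and is multiplied by 1000 when it is assigned as the event time. The plain-Java sketch below isolates that parsing step and rejects malformed lines. The `AdClickLineParser` class and the sample line are hypothetical.

```java
// Hypothetical parser for one AdClickLog.csv line: userId,adId,province,city,timestamp (seconds).
class AdClickLineParser {
    static final class Parsed {
        final long userId;
        final long adId;
        final String province;
        final String city;
        final long eventTimeMillis;

        Parsed(long userId, long adId, String province, String city, long eventTimeMillis) {
            this.userId = userId;
            this.adId = adId;
            this.province = province;
            this.city = city;
            this.eventTimeMillis = eventTimeMillis;
        }
    }

    static Parsed parse(String line) {
        String[] fields = line.split(",");
        if (fields.length != 5) {
            throw new IllegalArgumentException("expected 5 fields but got " + fields.length + ": " + line);
        }
        return new Parsed(
                Long.parseLong(fields[0].trim()),
                Long.parseLong(fields[1].trim()),
                fields[2].trim(),
                fields[3].trim(),
                Long.parseLong(fields[4].trim()) * 1000L); // seconds -> milliseconds for event time
    }

    public static void main(String[] args) {
        Parsed p = parse("1001,2001,beijing,beijing,1511658000"); // hypothetical sample line
        System.out.println(p.province + " @ " + p.eventTimeMillis);
    }
}
```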

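In the code we first define the JavaBean class AdClickEvent for the source data and the JavaBean class AdCountByProvince for the output statistics. The main function keys the stream by province and opens a one-hour time window that slides every 5 seconds. It then counts the click events in each window. The implementation is as follows:

1) JavaBean--AdClickEvent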

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class AdClickEvent {
    private Long userId;
    private Long adId;
    private String province;
    private String city;
    private Long timestamp;
}

2) JavaBean--AdCountByProvince

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class AdCountByProvince {
    private String province;
    private String windowEnd;
    private Long count;
}

3) Main program

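import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.sql.Timestamp;

public class AdStatisticsByProvince {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);

        // Read the data and convert each CSV line to an AdClickEvent
        DataStream<AdClickEvent> adClickEventStream = env.readTextFile("input/AdClickLog.csv")
                .map(data -> {
                    String[] fields = data.split(",");
                    return new AdClickEvent(Long.parseLong(fields[0]), Long.parseLong(fields[1]), fields[2], fields[3], Long.parseLong(fields[4]));
                })
                .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<AdClickEvent>() {
                    @Override
                    public long extractAscendingTimestamp(AdClickEvent element) {
                        // The timestamp in the file is in seconds; Flink expects milliseconds
                        return element.getTimestamp() * 1000L;
                    }
                });

        // Key by province, then window and aggregate
        DataStream<AdCountByProvince> adCountStream = adClickEventStream
                .keyBy(AdClickEvent::getProvince)
                .timeWindow(Time.hours(1), Time.seconds(5))
                .aggregate(new AdCountAgg(), new AdCountResult());

        adCountStream.print();

        env.execute("ad statistics job");
    }

    // Custom incremental aggregate function: count clicks per window
    public static class AdCountAgg implements AggregateFunction<AdClickEvent, Long, Long> {
        @Override
        public Long createAccumulator() {
            return 0L;
        }

        @Override
        public Long add(AdClickEvent value, Long accumulator) {
            return accumulator + 1;
        }

        @Override
        public Long getResult(Long accumulator) {
            return accumulator;
        }

        @Override
        public Long merge(Long a, Long b) {
            return a + b;
        }
    }

    // Custom full window function: attach the province and window end to the count
    public static class AdCountResult implements WindowFunction<Long, AdCountByProvince, String, TimeWindow> {
        @Override
        public void apply(String province, TimeWindow window, Iterable<Long> input, Collector<AdCountByProvince> out) throws Exception {
            out.collect(new AdCountByProvince(province, new Timestamp(window.getEnd()).toString(), input.iterator().next()));
        }
    }
}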

1.3.2 Blacklist filtering

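In the click statistics of the previous section, every repeated click by the same user is counted. In practice a user may indeed open the same ad repeatedly, which also signals stronger interest in it. But a user who clicks an ad very frequently within a short period is clearly behaving abnormally, which suggests click fraud. We can therefore limit each user's clicks within a time period, for example one day. If a user clicks the same ad more than a set limit (for example, 100 times), we add the user to a blacklist and raise a warning, and we stop counting that user's subsequent clicks.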
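The rule can be checked in isolation before writing the Flink version. The plain-Java sketch below keeps two pieces of data per (userId, adId) pair: a click counter and an "already reported" flag. These play the role of per-key state. Clicks up to the limit pass through, the first click over the limit produces exactly one warning, and later clicks are dropped silently. The midnight reset is left out, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical in-memory version of the per-day blacklist rule for (userId, adId) pairs.
class ClickBlacklistSketch {
    private final long maxClick;
    private final Map<String, Long> counts = new HashMap<>(); // click count per key
    private final Set<String> reported = new HashSet<>();     // keys already warned about
    final List<String> warnings = new ArrayList<>();          // stands in for a side output

    ClickBlacklistSketch(long maxClick) {
        this.maxClick = maxClick;
    }

    // Returns true if the click should still be counted, false if the pair is over the limit.
    boolean accept(long userId, long adId) {
        String key = userId + "_" + adId;
        long count = counts.merge(key, 1L, Long::sum);
        if (count > maxClick) {
            if (reported.add(key)) { // add() returns true only the first time
                warnings.add("user " + userId + " clicked ad " + adId + " more than " + maxClick + " times");
            }
            return false;
        }
        return true;
    }

    public static void main(String[] args) {
        ClickBlacklistSketch rule = new ClickBlacklistSketch(3);
        int accepted = 0;
        for (int i = 0; i < 5; i++) {
            if (rule.accept(1L, 100L)) {
                accepted++;
            }
        }
        System.out.println(accepted + " accepted, warnings=" + rule.warnings);
    }
}
```

With a limit of 3 and five clicks, `main` prints `3 accepted, warnings=[user 1 clicked ad 100 more than 3 times]`. Other users, and other ads clicked by the same user, are unaffected because the key includes both IDs.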

The implementation is as follows:

1) JavaBean--BlackListWarning

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class BlackListWarning {
    private Long userId;
    private Long adId;
    private String warningMsg;
}

2) Main program

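import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.sql.Timestamp;

public class AdClickByProvinceApp {
    public static void main(String[] args) throws Exception {
        // 1. Create the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // 2. Read the text data and convert each line to a JavaBean
        SingleOutputStreamOperator<AdClickEvent> adClickDS = env.readTextFile("input/AdClickLog.csv")
                .map(line -> {
                    String[] fields = line.split(",");
                    return new AdClickEvent(Long.parseLong(fields[0]),
                            Long.parseLong(fields[1]),
                            fields[2],
                            fields[3],
                            Long.parseLong(fields[4]));
                }).assignTimestampsAndWatermarks(new AscendingTimestampExtractor<AdClickEvent>() {
                    @Override
                    public long extractAscendingTimestamp(AdClickEvent element) {
                        // The timestamp in the file is in seconds; Flink expects milliseconds
                        return element.getTimestamp() * 1000L;
                    }
                });
        // 3. Filter by click count (a user who clicks one ad more than 100 times in a day is blacklisted)
        SingleOutputStreamOperator<AdClickEvent> filterByClickCount = adClickDS
                .keyBy("userId", "adId")
                .process(new AdClickKeyProcessFunc(100L));
        // 4. Key by province, open a window, and count ad clicks per province
        SingleOutputStreamOperator<AdCountByProvince> result = filterByClickCount
                .keyBy(data -> data.getProvince())
                .timeWindow(Time.hours(1), Time.seconds(5))
                .aggregate(new AdClickAgg(), new AdClickWindowFunc());
        // 5. Get the side output; the tag ID must match the one used in AdClickKeyProcessFunc ("output")
        DataStream<BlackListWarning> sideOutput = filterByClickCount.getSideOutput(new OutputTag<BlackListWarning>("output") {
        });
        // 6. Print
        result.print();
        sideOutput.print("sideOutput");
        // 7. Start the job
        env.execute();

    }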

    public static class AdClickAgg implements AggregateFunction<AdClickEvent, Long, Long> {

        @Override
        public Long createAccumulator() {
            return 0L;
        }

        @Override
        public Long add(AdClickEvent value, Long accumulator) {
            return accumulator + 1L;
        }

        @Override
        public Long getResult(Long accumulator) {
            return accumulator;
        }

        @Override
        public Long merge(Long a, Long b) {
            return a + b;
        }
    }

    public static class AdClickWindowFunc implements WindowFunction<Long, AdCountByProvince, String, TimeWindow> {

        @Override
        public void apply(String province, TimeWindow window, Iterable<Long> input, Collector<AdCountByProvince> out) throws Exception {
            String windowEnd = new Timestamp(window.getEnd()).toString();
            Long count = input.iterator().next();
            out.collect(new AdCountByProvince(province, windowEnd, count));
        }
    }
    public static class AdClickKeyProcessFunc extends KeyedProcessFunction<Tuple, AdClickEvent, AdClickEvent> {
        // Upper bound on how many times one user may click one ad per day
        private final long maxClick;

        public AdClickKeyProcessFunc(long maxClick) {
            this.maxClick = maxClick;
        }

        // State: how many times the current user has clicked the current ad today
        private ValueState<Long> countState;

        // State: whether this user/ad pair has already been reported to the blacklist
        private ValueState<Boolean> isBlackList;

        @Override
        public void open(Configuration parameters) throws Exception {
            countState = getRuntimeContext().getState(new ValueStateDescriptor<Long>("count-state", Long.class));
            isBlackList = getRuntimeContext().getState(new ValueStateDescriptor<Boolean>("is-black-list", Boolean.class));
        }

        @Override
        public void processElement(AdClickEvent value, Context ctx, Collector<AdClickEvent> out) throws Exception {
            // Read the current click count from state
            Long count = countState.value();
            // First click of the day for this user/ad pair?
            if (count == null) {
                countState.update(1L);
                // Register an event-time timer for the next midnight (UTC+8) that clears the state.
                // getTimestamp() is epoch seconds and carries no time zone, so shift by +8 h,
                // round down to a day boundary, add one day, then shift back by 8 h.
                long dayMs = 24 * 60 * 60 * 1000L;
                long offsetMs = 8 * 60 * 60 * 1000L;
                long ts = ((value.getTimestamp() * 1000L + offsetMs) / dayMs + 1) * dayMs - offsetMs;
                // Debug output: when the timer will fire
                System.out.println(new Timestamp(ts));
                ctx.timerService().registerEventTimeTimer(ts);
            } else {
                // Not the first click: increment the stored count
                long curClickCount = count + 1L;
                countState.update(curClickCount);
                if (curClickCount > maxClick) {
                    // Report each user/ad pair only once
                    if (isBlackList.value() == null) {
                        // Over the daily limit: send a warning to the side output
                        // (the tag ID "output" must match the one used in main)
                        ctx.output(new OutputTag<BlackListWarning>("output") {},
                                new BlackListWarning(value.getUserId(), value.getAdId(), "Click count exceeded " + maxClick + "!"));
                        isBlackList.update(true);
                    }
                    // Drop clicks from blacklisted pairs
                    return;
                }
            }
            // Forward the click to the main stream
            out.collect(value);
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<AdClickEvent> out) throws Exception {
            // Midnight reached: reset the daily count and the blacklist flag
            countState.clear();
            isBlackList.clear();
        }
    }

}
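The timer in `AdClickKeyProcessFunc` is meant to fire at the first midnight in UTC+8 (Beijing time) after the day's first click. The event timestamp is seconds since the epoch and carries no time zone. The arithmetic therefore has to shift into UTC+8 before rounding down to a day boundary, and then shift back. The sketch below does this and cross-checks the result against `java.time`. The helper names are hypothetical, and the fixed +8 hour offset is an assumption that matches the original code's comment about the UTC+8 time zone.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;

// Hypothetical helper: next local midnight in UTC+8 after an epoch-seconds timestamp, in epoch millis.
class NextMidnightSketch {
    static final long DAY_MS = 24 * 60 * 60 * 1000L;
    static final long OFFSET_MS = 8 * 60 * 60 * 1000L; // UTC+8, no daylight saving

    // Shift to local time, round down to the local day, add one day, shift back to UTC.
    static long nextMidnightMillis(long epochSeconds) {
        long localMs = epochSeconds * 1000L + OFFSET_MS;
        return Math.floorDiv(localMs, DAY_MS) * DAY_MS + DAY_MS - OFFSET_MS;
    }

    // The same value computed with java.time, for cross-checking.
    static long nextMidnightViaJavaTime(long epochSeconds) {
        ZoneOffset zone = ZoneOffset.ofHours(8);
        LocalDate localDate = Instant.ofEpochSecond(epochSeconds).atOffset(zone).toLocalDate();
        return localDate.plusDays(1).atStartOfDay(zone).toInstant().toEpochMilli();
    }

    public static void main(String[] args) {
        long ts = 1511658000L; // hypothetical event time in epoch seconds
        System.out.println(Instant.ofEpochMilli(nextMidnightMillis(ts)));
    }
}
```

For the sample value, `main` prints `2017-11-26T16:00:00Z`, which is midnight at the start of 2017-11-27 in UTC+8. Shifting before rounding matters for events between 16:00 and 24:00 UTC. Rounding the raw UTC day first and then subtracting 8 hours would place the timer before the event itself.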

Original article: https://www.cnblogs.com/whdd/p/14058617.html