TopN热门新闻计算

采用flink实时计算topn热门新闻,主程序参考自http://wuchong.me/blog/2018/11/07/use-flink-calculate-hot-items/,稍微整理了一下程序,数据可以从参考链接中下载。

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.java.io.PojoCsvInputFormat;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.typeutils.PojoTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.io.File;
import java.net.URL;
import java.sql.Timestamp;
import java.util.*;
import java.util.concurrent.PriorityBlockingQueue;

/**
 * 每隔1分钟输出过去5分钟内点击量最多的前 N 个news
 */
public class HotNews {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //由于Java反射抽出的字段顺序是不确定的,需要显式指定字段顺序
        String[] fileOrder = new String[] {"userID", "itemID", "categoryID", "behavior", "timestamp"};
        String path = "userbehavior.csv";
        DataStream dataSource = getSource(env, path, fileOrder,UserBehavior.class);

        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); //显示设置按照eventtime模型式进行处理
        DataStream timeData = dataSource.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<UserBehavior>() {
            //真实业务场景一般都是存在乱序的,所以一般使用 BoundedOutOfOrdernessTimestampExtractor
            @Override
            public long extractAscendingTimestamp(UserBehavior userBehavior) {
                return userBehavior.getTimestamp() * 1000; //转为毫秒
            }
        });

        DataStream clickData = timeData.filter(new FilterFunction<UserBehavior>() {
            @Override
            public boolean filter(UserBehavior userBehavior) throws Exception {
                return userBehavior.getBehavior().equals("click");
            }
        }); //过滤出点击

        //每个商品在每个窗口的点击量的数据流
        DataStream windowData = clickData
                .keyBy("itemID")
                .timeWindow(Time.minutes(5), Time.minutes(1)) //每隔1分统计最近5分钟内的每个news的点击量
                .aggregate(new CountAgg(), new WindowResultFunction());

        //计算每个窗口的最热门新闻
        DataStream topItems = windowData
                .keyBy("windowEnd")
                .process(new TopNHotNews(5));

        topItems.print();
        env.execute("Hot news Job!");
    }

    public static<T> DataStream<T> getSource(StreamExecutionEnvironment env, String path, String[] fileOrder,
                                             Class<T> type) {
        //本地文件路径
        URL fileUrl = HotNews.class.getClassLoader().getResource(path);
        Path filePath = Path.fromLocalFile(new File(fileUrl.getPath()));
        //抽取TypeInformation,是一个PojoTypeInfo
        PojoTypeInfo<T> pojoType = (PojoTypeInfo<T>) TypeExtractor.createTypeInfo(type);
        //由于Java反射抽出的字段顺序是不确定的,需要显式指定字段顺序
        // 创建 PojoCsvInputFormat
        PojoCsvInputFormat<T> csvInput = new PojoCsvInputFormat<>(filePath, pojoType, fileOrder);
        return env.createInput(csvInput,pojoType);
    }

    /** COUNT 统计的聚合函数实现,每出现一条记录加1**/
    public static class CountAgg implements AggregateFunction<UserBehavior, Long, Long> {
        @Override
        public Long createAccumulator() {
            return 0L;
        }

        @Override
        public Long add(UserBehavior userBehavior, Long acc) {
            return acc + 1;
        }

        @Override
        public Long getResult(Long acc) {
            return acc;
        }

        @Override
        public Long merge(Long acc1, Long acc2) {
            return acc1 + acc2;
        }
    }

    public static class WindowResultFunction implements WindowFunction<Long, ItemViewCount, Tuple, TimeWindow> {

        @Override
        public void apply(Tuple key, TimeWindow window, Iterable<Long> aggregateResult,
                          Collector<ItemViewCount> collector) throws Exception {
            long itemID = ((Tuple1<Long>) key).f0;
            long count = aggregateResult.iterator().next();
            collector.collect(ItemViewCount.of(itemID, window.getEnd(), count));
        }
    }

    /** 求某个窗口中前 N 名的热门点击新闻,key 为窗口时间戳,输出为 TopN 的结果字符串 */
    public static class TopNHotNews extends KeyedProcessFunction<KeyedProcessFunction.Context, ItemViewCount,
            String> {
        private final int topSize;

        public TopNHotNews(int topSize) {
            this.topSize = topSize;
        }

        // 用于存储商品与点击数的状态,待收齐同一个窗口的数据后,再触发 TopN 计算
        private ListState<ItemViewCount> itemState;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            //状态注册
            ListStateDescriptor itemsStateDesc = new ListStateDescriptor("itemState-state", ItemViewCount.class);
            itemState = getRuntimeContext().getListState(itemsStateDesc);
        }

        @Override
        public void processElement(ItemViewCount input, Context context, Collector<String> collector) throws Exception {
            //每条数据都保存到状态中
            itemState.add(input);
            //注册 windowEnd+1 的 EventTime Timer, 当触发时,说明收齐了属于windowEnd窗口的所有商品数据
            context.timerService().registerEventTimeTimer(input.getWindowEnd() + 1);
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext cts, Collector<String> out) throws Exception {
            //这时使用prorityqueue,小顶堆找topn大
            PriorityBlockingQueue<ItemViewCount> topNItems = new PriorityBlockingQueue<>(topSize, new Comparator<ItemViewCount>() {
                @Override
                public int compare(ItemViewCount o1, ItemViewCount o2) {
                    return (int) (o1.getViewCount() - o2.getViewCount());
                }
            });

            for(ItemViewCount item: itemState.get()) {
                if(topNItems.size() < topSize) {
                    topNItems.offer(item);
                } else if(topNItems.peek().getViewCount() < item.getViewCount()) {
                    topNItems.poll();
                    topNItems.offer(item);
                }
            }

            List<ItemViewCount> list = InitCollect.newArrayList(topNItems.size());
            list.addAll(topNItems);

            Collections.sort(list, new Comparator<ItemViewCount>() {
                @Override
                public int compare(ItemViewCount o1, ItemViewCount o2) {
                    return (int)(o2.getViewCount() - o1.getViewCount());
                }
            });

            itemState.clear();

            //将排名信息格式化成String,便于打印
            StringBuilder result = new StringBuilder();
            result.append("=============================
");
            result.append("时间是:").append(new Timestamp(timestamp - 1)).append("
");
            for(ItemViewCount item: list) {
                result.append(item.getItemID()).append(" : ").append(item.getViewCount()).append("
");
            }
            out.collect(result.toString());
        }
    }
}
public class UserBehavior {
    private long userID; //userid
    private long itemID; //newsid
    private int categoryID; //news categor
    private String behavior; //user behavior ->点击、暴光、评论、转发(click,exposure,comment,forwarding)
    private long timestamp; //行为发生的时间戳,秒

    public long getUserID() {
        return userID;
    }

    public void setUserID(long userID) {
        this.userID = userID;
    }

    public long getItemID() {
        return itemID;
    }

    public void setItemID(long itemID) {
        this.itemID = itemID;
    }

    public int getCategoryID() {
        return categoryID;
    }

    public void setCategoryID(int categoryID) {
        this.categoryID = categoryID;
    }

    public String getBehavior() {
        return behavior;
    }

    public void setBehavior(String behavior) {
        this.behavior = behavior;
    }

    public long getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(long timestamp) {
        this.timestamp = timestamp;
    }
}
/**
 * 新闻点击量,窗口操作的输出类型
 */
public class ItemViewCount {
    private long itemID;//新闻id
    private long windowEnd;//窗口结束时间戳
    private long viewCount;//新闻点击量

    public long getItemID() {
        return itemID;
    }

    public void setItemID(long itemID) {
        this.itemID = itemID;
    }

    public long getWindowEnd() {
        return windowEnd;
    }

    public void setWindowEnd(long windowEnd) {
        this.windowEnd = windowEnd;
    }

    public long getViewCount() {
        return viewCount;
    }

    public void setViewCount(long viewCount) {
        this.viewCount = viewCount;
    }

    public static ItemViewCount of(long itemID, long windowEnd, long viewCount) {
        ItemViewCount itemViewCount = new ItemViewCount();
        itemViewCount.itemID = itemID;
        itemViewCount.windowEnd = windowEnd;
        itemViewCount.viewCount = viewCount;
        return itemViewCount;
    }
}
import java.util.*;

public class InitCollect {
    
    public static <K> List<K> newArrayList() {
        return new ArrayList<>();
    }
    
    public static <K> List<K> newArrayList(int size) {
        return new ArrayList<>(size);
    }

    public static <K> List<K> newLinkedList() {
        return new LinkedList<>();
    }

    public static <K,V> Map<K,V> newHashMap() {
        return new HashMap<>();
    }

    public static <K> Set<K> newHashSet() {
        return new HashSet<>();
    }
}
原文地址:https://www.cnblogs.com/little-horse/p/12464271.html