Spark: Smart Email Prediction with a Markov Model

一. Source

  This project comes from Chapter 11 of "Data Algorithms: Recipes for Scaling Up with Hadoop and Spark". The program has been rewritten with Spark 3.0 and Java 8.

The changes are:

  1. Use Spark 3.0 and Java 8.

  2. Generate the final state transition matrix directly in Spark, rather than computing the counts in a separate step.

  3. Load the state transition matrix in Python to make the predictions.

二. Purpose

  A customer's purchase behavior may look random, but viewed as a time-ordered sequence it often shows patterns: for example, a customer may buy more right after each monthly payday, or at particular times of the year (Double Eleven, a birthday, and so on). A Markov model can mine these temporal patterns. If we can infer a customer's next purchase time from their previous purchases, we can send a marketing email around that predicted time; which products the email promotes can be decided by other recommendation algorithms.
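
The nine states that the rest of the post relies on are only defined implicitly inside the code. Each pair of consecutive purchases by the same customer is encoded as two letters: the first describes the time gap (S for under 30 days, M for under 60 days, L otherwise), and the second compares the previous amount with the new one (L when the previous amount is below 90% of the new one, E when the two are within roughly 10% of each other, G otherwise). Below is a minimal sketch of that encoding, mirroring the thresholds used in the Python prediction script later in this post; the helper name toState is introduced here for illustration and may differ from the repository's code:

    // Hypothetical helper: encode a pair of consecutive purchases as one of the 9 states.
    // First letter = time gap (S/M/L), second letter = amount change (L/E/G).
    public static String toState(long daysDiff, int priorAmount, int currentAmount) {
        String dd = daysDiff < 30 ? "S" : (daysDiff < 60 ? "M" : "L");
        String ad;
        if (priorAmount < 0.9 * currentAmount) {
            ad = "L";
        } else if (priorAmount < 1.1 * currentAmount) {
            ad = "E";
        } else {
            ad = "G";
        }
        return dd + ad; // one of SL, SE, SG, ML, ME, MG, LL, LE, LG
    }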

三. Program

The complete program is available at Markov smart email prediction (https://github.com/jiangnanboy/spark_tutorial).

1. Building the state transition matrix with Spark

    /**
     * Build the state transition probability matrix.
     * @param session the SparkSession
     * @param broadcastStatesMap broadcast map from state name to matrix row number
     * @param broadcastInitStateList broadcast list of initial (uniform) matrix rows
     */
    public static void buildStateTransitionMatrix(SparkSession session, Broadcast<Map<String, String>> broadcastStatesMap, Broadcast<List<Tuple2<String, List<Double>>>> broadcastInitStateList) {
        // input format: customerID,transactionID,purchaseDate,amount
        String path = PropertiesReader.get("intermediate_smart_email_txt");
        JavaRDD<String> javaRDD = session.read().textFile(path).toJavaRDD().coalesce(10);

        // key = customerID, value = (purchaseDate, amount)
        JavaPairRDD<String, Tuple2<Long, Integer>> javaPairRDD = javaRDD.mapToPair(line -> {
            String[] tokens = StringUtils.split(line, ",");
            if(4 != tokens.length) {
                return null; // malformed line, dropped by the filter below
            }
            long date = DateUtils.parseDate(tokens[2], "yyyy-MM-dd").getTime();
            int amount = Integer.parseInt(tokens[3]);
            Tuple2<Long, Integer> t2 = new Tuple2<>(date, amount);
            return new Tuple2<>(tokens[0], t2);
        }).filter(t2 -> null != t2); // drop malformed lines so groupByKey never sees null

        //group by customerID
        JavaPairRDD<String, Iterable<Tuple2<Long, Integer>>> customerRDD = javaPairRDD.groupByKey();

        // build each customer's state sequence
        JavaPairRDD<String, List<String>> stateSequence = customerRDD.mapValues(dateAndAmount -> {
            List<Tuple2<Long, Integer>> list = toList(dateAndAmount);
            Collections.sort(list, TupleComparatorAscending.INSTANCE); // sort by purchase date
            return toStateSequence(list);
        });

        /**
         * customerID, List<State>
         * each observed transition contributes a count of 1 => ((fromState, toState), 1)
         *   | S1   S2   S3   ...
         *---+-----------------------
         *S1 | <probability-value>
         *   |
         *S2 |
         *   |
         *S3 |
         *   |
         *...|
         */
        JavaPairRDD<Tuple2<String, String>, Integer> model = stateSequence.flatMapToPair(s -> {
            List<String> states = s._2;
            List<Tuple2<Tuple2<String, String>, Integer>> mapOut = new ArrayList<>();
            if((null == states) || (states.size() < 2)) {
                return Collections.emptyIterator();
            }
            for(int i = 0; i < (states.size() - 1); i++) {
                String fromState = states.get(i);
                String toState = states.get(i+1);
                Tuple2<String, String> t2 = new Tuple2<>(fromState, toState);
                mapOut.add(new Tuple2<>(t2, 1));
            }
            return mapOut.iterator();
        });

        // count the frequency of each transition: ((fromState, toState), frequency)
        JavaPairRDD<Tuple2<String, String>, Integer> fromStateToStateFrequence1 = model.reduceByKey((i1, i2) -> i1 + i2);

        // ((fromState, toState), frequency) => (fromState, (toState, frequency))
        JavaPairRDD<String, Tuple2<String, Integer>> fromStateToStateFrequence2 = fromStateToStateFrequence1.mapToPair(s -> {
            String key = s._1._1;
            Tuple2<String, Integer> value = new Tuple2<>(s._1._2, s._2);
            return new Tuple2<>(key, value);
        });

        // group by fromState: (fromState, List<Tuple2<toState, frequency>>) => (rowNumber, List<Tuple2<toState, frequency>>)
        JavaPairRDD<String, Iterable<Tuple2<String, Integer>>> groupState = fromStateToStateFrequence2.groupByKey().mapToPair(st2 -> {
            String rowNumber = broadcastStatesMap.getValue().get(st2._1);
            return new Tuple2<>(rowNumber, st2._2);
        });

        // initialize each matrix row with value = 1.0 / size
        //List<Tuple2<String, List<Double>>> initStateList = initState(broadcastStatesMap.getValue().size());
        JavaPairRDD<String, List<Double>> initStatePairRDD = JavaSparkContext.fromSparkContext(session.sparkContext()).parallelizePairs(broadcastInitStateList.getValue());

        // left outer join so that rows with no observed transitions keep their initial values
        JavaPairRDD<String, Tuple2<List<Double>, Optional<Iterable<Tuple2<String, Integer>>>>> joinPairRDD = initStatePairRDD.leftOuterJoin(groupState);

        // normalize the transition matrix so that each row's probabilities sum to 1
        JavaPairRDD<String, List<Double>> resultJavaPairRDD = joinPairRDD.mapValues(lot2 -> {
            int size = broadcastStatesMap.getValue().size();
            List<Double> listDouble = lot2._1;
            Optional<Iterable<Tuple2<String, Integer>>> option = lot2._2;
            if(option.isPresent()) {
                Iterable<Tuple2<String, Integer>> toStateFrequence = option.get();
                Iterator<Tuple2<String, Integer>> iter = toStateFrequence.iterator();
                List<Tuple2<String, Integer>> iterList = new ArrayList<>();
                int sum = 0;
                while(iter.hasNext()) {
                    Tuple2<String, Integer> t2 = iter.next();
                    iterList.add(t2);
                    sum += t2._2;
                }
                // apply smoothing to avoid zero probabilities
                if(iterList.size() < size) {
                    sum += size;
                    for(int i = 0; i < listDouble.size(); i ++) {
                        listDouble.set(i, 1.0/sum);
                    }
                }

                for(int i = 0; i < iterList.size(); i++) {
                    String stateNumber = broadcastStatesMap.getValue().get(iterList.get(i)._1);
                    double normalizedValue = iterList.get(i)._2 / (double)sum;
                    listDouble.set(Integer.parseInt(stateNumber), normalizedValue);
                }
            }
            return listDouble;
        });

        //1. Use sortByKey to sort the rows into the final state transition probability matrix
        //List<Tuple2<String, List<Double>>> stateResult = resultJavaPairRDD.sortByKey().collect();

        //2. Use takeOrdered to sort the rows into the final state transition probability matrix
        List<Tuple2<String, List<Double>>> stateResult = resultJavaPairRDD.takeOrdered(broadcastStatesMap.getValue().size(), StateTupleComparatorAscending.INSTANCE);

        // print the transition probability matrix
        for(Tuple2<String, List<Double>> s : stateResult) {
            StringBuilder sb = new StringBuilder();
            sb.append(s._1).append(",");
            for(int i = 0; i < (s._2.size() - 1); i ++) {
                sb.append(s._2.get(i)).append(" ");
            }
            sb.append(s._2.get(s._2.size() - 1));
            System.out.println(sb.toString());
        }

    }
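
The helpers toList, toStateSequence, initState and the two comparators are defined elsewhere in the repository and are not shown here. As a rough sketch of what toStateSequence has to produce, reusing the hypothetical toState encoding sketched in the Purpose section and assuming the first tuple element is the purchase date in epoch milliseconds:

    import java.util.ArrayList;
    import java.util.List;
    import scala.Tuple2;

    // Sketch: turn a customer's date-sorted purchase history into a list of states.
    public static List<String> toStateSequence(List<Tuple2<Long, Integer>> sorted) {
        List<String> states = new ArrayList<>();
        for (int i = 1; i < sorted.size(); i++) {
            Tuple2<Long, Integer> prior = sorted.get(i - 1);
            Tuple2<Long, Integer> current = sorted.get(i);
            long daysDiff = (current._1 - prior._1) / (24L * 60 * 60 * 1000); // millis to days
            states.add(toState(daysDiff, prior._2, current._2));
        }
        return states;
    }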

2. Loading the state transition matrix in Python and making predictions

import os
import time
import datetime

# Use the Markov model (model.txt) produced by the Spark job (Markov.java) to predict,
# for each customer in validate.txt, when a marketing email should be sent
user_action = {}
model = []
# the 9 states
states = ["SL", "SE", "SG", "ML", "ME", "MG", "LL", "LE", "LG"]

validate_path = os.path.join(os.getcwd(), "validate.txt")
model_path = os.path.join(os.getcwd(), "model.txt")

# read the validation data
with open(validate_path, 'r', encoding='utf-8') as f_read:
    for line in f_read:
        items = line.strip().split(',')
        user_id = items[0]
        if user_id in user_action:
            hist = user_action[user_id]
            lst = [items[2], items[-1]]
            hist.append(lst)
        else:
            hist = []
            hist.append([items[2], items[-1]])
            user_action[user_id] = hist
print(user_action)

# read the model (state transition matrix)
with open(model_path, 'r', encoding='utf-8') as f_read:
    for line in f_read:
        items = line.strip().split()
        row = []
        for item in items:
            row.append(float(item))
        model.append(row)
print(model)

# make predictions from each customer's recent behavior (at least two transactions)
for user_id,user_action_list in user_action.items():
    if len(user_action_list) < 2:
        continue
    state_sequence = []
    last_date = ''
    prior = user_action_list[0]
    for i in range(1, len(user_action_list)):
        current = user_action_list[i]
        prior_date = prior[0]
        current_date = current[0]

        # days between the two transactions: S < 30, M < 60, L otherwise
        prior_date = time.strptime(prior_date, '%Y-%m-%d')
        current_date = time.strptime(current_date, '%Y-%m-%d')
        prior_date = datetime.datetime(prior_date[0], prior_date[1], prior_date[2])
        current_date = datetime.datetime(current_date[0], current_date[1], current_date[2])
        days_diff = (current_date - prior_date).days

        dd = 'L'
        if days_diff < 30:
            dd = 'S'
        elif days_diff < 60:
            dd = 'M'

        # amount change: L / E / G depending on how the previous amount compares with the new one
        prior_amount = int(prior[1])
        current_amount = int(current[1])

        ad = 'G'
        if prior_amount < 0.9 * current_amount:
            ad = 'L'
        elif prior_amount < 1.1 * current_amount:
            ad = 'E'

        state_sequence.append(dd+ad)

        prior = current
        last_date = current_date

    if state_sequence:
        # choose the email send date based on the most recent state
        last_state = state_sequence[-1]
        row_index = states.index(last_state)
        row_value = model[row_index] # the transition-matrix row for the last state
        max_value = max(row_value) # largest probability in that row
        col_index = row_value.index(max_value) # column index of the most likely next state
        next_state = states[col_index]

        # S -> send after 15 days, M -> after 45 days, L -> after 90 days
        if next_state.startswith('S'):
            next_date = last_date + datetime.timedelta(15)
        elif next_state.startswith('M'):
            next_date = last_date + datetime.timedelta(45)
        else:
            next_date = last_date + datetime.timedelta(90)

        print('User: {}, predicted next email date: {}'.format(user_id, next_date))

3. State transition matrix (model.txt)

Rows and columns follow the order of the states list in the Python script: SL, SE, SG, ML, ME, MG, LL, LE, LG.

4.396976638863118E-6 4.396976638863118E-6 0.8062208425486636 0.15858575643387607 4.396976638863118E-6 4.396976638863118E-6 0.035153828227710626 4.396976638863118E-6 4.396976638863118E-6
0.1111111111111111 0.1111111111111111 0.1111111111111111 0.1111111111111111 0.1111111111111111 0.1111111111111111 0.1111111111111111 0.1111111111111111 0.1111111111111111
0.8043794973142799 3.1671227323401235E-6 3.1671227323401235E-6 0.1594804651869869 3.1671227323401235E-6 3.1671227323401235E-6 0.03611153339414209 3.1671227323401235E-6 3.1671227323401235E-6
1.225940909648155E-5 1.225940909648155E-5 0.8156797842343999 1.225940909648155E-5 1.225940909648155E-5 0.15299742552408974 0.031212455559642024 1.225940909648155E-5 1.225940909648155E-5
0.010869565217391304 0.010869565217391304 0.7282608695652174 0.010869565217391304 0.010869565217391304 0.11956521739130435 0.05434782608695652 0.010869565217391304 0.010869565217391304
4.122521334047904E-5 4.122521334047904E-5 0.8190625386486375 0.14758626375891495 4.122521334047904E-5 4.122521334047904E-5 0.03298017067238323 4.122521334047904E-5 4.122521334047904E-5
4.703226413319537E-5 4.703226413319537E-5 0.8348697206283511 4.703226413319537E-5 4.703226413319537E-5 0.14081459881478695 4.703226413319537E-5 4.703226413319537E-5 0.023845357915530052
0.017857142857142856 0.017857142857142856 0.6785714285714286 0.017857142857142856 0.017857142857142856 0.125 0.017857142857142856 0.017857142857142856 0.03571428571428571
5.083884087442806E-4 5.083884087442806E-4 0.7961362480935434 5.083884087442806E-4 5.083884087442806E-4 0.16573462125063548 0.03304524656837824 5.083884087442806E-4 5.083884087442806E-4
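
Because of the smoothing step, every row should still sum to approximately 1. A quick sanity check on model.txt, assuming one space-separated row per line as shown above:

    import java.nio.file.Files;
    import java.nio.file.Paths;

    // Print the sum of each row of the transition matrix; every value should be close to 1.0.
    public class CheckModel {
        public static void main(String[] args) throws Exception {
            for (String line : Files.readAllLines(Paths.get("model.txt"))) {
                double sum = 0.0;
                for (String v : line.trim().split("\\s+")) {
                    sum += Double.parseDouble(v);
                }
                System.out.println(sum);
            }
        }
    }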

4. Prediction results

User: user1, predicted next email date: 2020-01-22 00:00:00
User: user2, predicted next email date: 2020-02-16 00:00:00
User: user3, predicted next email date: 2020-01-25 00:00:00
User: user4, predicted next email date: 2020-04-05 00:00:00
User: user5, predicted next email date: 2020-01-30 00:00:00
User: user6, predicted next email date: 2020-01-16 00:00:00
Original article: https://www.cnblogs.com/little-horse/p/14018540.html