016 Spark中关于购物篮的设计,以及优化(两个点)

一:介绍

1.购物篮的定义

  

2.适用场景

  

3.相关概念

  

  

 4.步骤

  

5.编程实现

  

6.步骤

  

  

二:程序

1.程序

  1 package com.ibeifeng.senior.mba.association
  2 
  3 import org.apache.hadoop.fs.{FileSystem, Path}
  4 import org.apache.spark.rdd.RDD
  5 import org.apache.spark.{SparkConf, SparkContext}
  6 
  7 import scala.collection.mutable
  8 
  9 /**
 10   * 使用SparkCore实现购物篮分析
 11   * Created by ibf on 01/12.
 12   */
 13 object FindAssociationRulesSparkCore {
 14   /**
 15     * 先从缓存中获取数据,如果不存在,直接重新获取
 16     *
 17     * @param items
 18     * @param size
 19     * @param cache
 20     * @return
 21     */
 22   def findItemSetsByCache(items: List[(String, Int)], size: Int, cache: mutable.Map[Int, List[List[(String, Int)]]]): List[List[(String, Int)]] = {
 23     cache.get(size).orElse {
 24       // 获取值
 25       val result = findItemSets(items, size, cache)
 26 
 27       // 更新缓存
 28       cache += size -> result
 29 
 30       // 返回值
 31       Some(result)
 32     }.get
 33   }
 34 
 35 
 36   /**
 37     * 构建项集基于items商品列表,项集中的商品数量是size指定
 38     *
 39     * @param items 商品列表:eg: [A, B, C]
 40     * @param size  最终项集包含商品的数量
 41     * @return
 42     */
 43   def findItemSets(items: List[(String, Int)], size: Int, cache: mutable.Map[Int, List[List[(String, Int)]]]): List[List[(String, Int)]] = {
 44     if (size == 1) {
 45       // items中的每个商品都是一个项集
 46       items.map(item => item :: Nil)
 47     } else {
 48       // 当size不是1的时候
 49       // 1. 获取项集大小为size-1的项集列表
 50       val tmpItemSets = findItemSetsByCache(items, size - 1, cache)
 51       // 2. 给tmpItemSets中添加一个新的不重复的项 ==> 数据的转换
 52       val itemSets = tmpItemSets.flatMap(itemSets => {
 53         // 给itemSets项集添加一个新的商品ID,要求不重复
 54         val newItemSets = items
 55           // 将包含的商品过滤掉&要求下标必须大于以及存在
 56           .filter(item => !itemSets.contains(item) && itemSets.forall(_._2 < item._2))
 57           // 将商品添加到项集中,产生一个新的项集
 58           // 为了使用distinct做去重操作,进行一个排序操作
 59           .map(item => (item :: itemSets))
 60 
 61         // 返回值
 62         newItemSets
 63       })
 64 
 65       // 返回项集的值
 66       itemSets
 67     }
 68   }
 69 
 70   def main(args: Array[String]): Unit = {
 71     // 1. 创建SparkContext
 72     val conf = new SparkConf()
 73       .setAppName("find-association-rules")
 74       .setMaster("local[*]")
 75     val sc = SparkContext.getOrCreate(conf)
 76 
 77     // ===========================================
 78     // 测试数据存储的路径
 79     val path = "data/transactions/10"
 80     val savePath = "data/transactions/result"
 81     // 最小支持度
 82     val minSupport = 2
 83     // 最小置信度
 84     val minConfidence = 0.4
 85 
 86     // 创建rdd读取原始的交易数据,
 87     // 假设交易数据是按行存储的,每行是一条交易,每条交易数据包含的商品ID使用","分割
 88     val rdd = sc.textFile(path, 20)
 89 
 90     // 1. 计算频繁项集
 91     // 1.1 获取每条交易存在的项集
 92     val itemSetsRDD: RDD[String] = rdd.flatMap(transaction => {
 93       // 1) 获取当前交易所包含的商品ID
 94       val items = transaction
 95         .split(",") // 分割
 96         .filter(!_.isEmpty) // 过滤
 97         .sorted //排序
 98         .toList // 转换为list
 99         .zipWithIndex // 将数据和下标合并,下标从0开始
100 
101       // 2) 构建辅助对象
102       val itemSize = items.size
103       val cache = mutable.Map[Int, List[List[(String, Int)]]]()
104 
105       // 3) 根据获取的商品ID的信息产生项集
106       // allItemSets集合中最后数据量是:2^itemSize - 1
107       val allItemSets: List[List[String]] = (1 to itemSize).map(size => {
108         // 产生项集中项的数量是size的项集
109         findItemSets(items, size, cache)
110       }).foldLeft(List[List[String]]())((v1, v2) => {
111         v2.map(_.map(_._1)) ::: v1
112       })
113 
114       // 4) 返回结果
115       allItemSets.map(_.mkString(","))
116     })
117 
118     // 1.2 获取频繁项集
119     val supportedItemSetsRDD = itemSetsRDD
120       // 数据转换
121       .map(items => (items, 1))
122       // 聚合求支持度
123       .reduceByKey(_ + _)
124       // 过滤产生频繁项集
125       .filter(_._2 >= minSupport)
126 
127     // 2. 计算关联规则
128     // 2.1 对每个频繁项集获取子项集
129     val subSupportedItemSetsRDD = supportedItemSetsRDD.flatMap(tuple => {
130       val itemSets = tuple._1.split(",").toList.zipWithIndex // 频繁项集
131       val frequency = tuple._2 // 该频繁项集的支持度
132 
133       // 2) 构建辅助对象
134       val itemSize = itemSets.size
135       val cache = mutable.Map[Int, List[List[(String, Int)]]]()
136 
137       // 3) 获取子项集
138       val allSubItemSets: List[List[String]] = (1 to itemSize).map(size => {
139         // 产生项集中项的数量是size的项集
140         findItemSets(itemSets, size, cache)
141       }).foldLeft(List[List[String]]())((v1, v2) => {
142         v2.map(_.map(_._1)) ::: v1
143       })
144 
145       // 4) 转换数据并输出
146       val items = itemSets.map(_._1)
147       allSubItemSets.map(subItemSets => {
148         // (A,B,frequency) ==> 表示A出现的时候B也出现的次数是frequency次
149         // 当subItemSets就是itemSets的时候,返回的二元组的第二个元素的(元组)第一个元素是空的列表
150         (subItemSets.mkString(","), ((items.toBuffer -- subItemSets).toList.mkString(","), frequency))
151       })
152     })
153 
154     // 2.2 计算置信度
155     val assocRulesRDD = subSupportedItemSetsRDD
156       .groupByKey() // 数据聚合
157       .flatMap(tuple => {
158       // 计算执行度: (A, B, k) => A存在的时候B也存储的几率是k
159       // A就是tuple的第一个元素
160       // 获取左件
161       val lhs = tuple._1.split(",").mkString("<", ",", ">")
162 
163       // 获取左件在所有的交易中出现的总的次数 tuple._2中第一个元素为空的数据就是总的次数
164       val frequency = tuple._2
165         // 只要第一个元素为空的值,表示from本身
166         .filter(_._1.isEmpty)
167         // 需要的是第二个元素
168         .map(_._2).toList match {
169         case head :: Nil => head
170         case _ => {
171           throw new IllegalArgumentException("异常")
172         }
173       }
174 
175       // 计算右件出现次数占左件次数的百分比, 并返回最终结果
176       tuple._2
177         // 要求第一个数据非空
178         .filter(!_._1.isEmpty)
179         // 数据转换,获取置信度
180         .map {
181         case (rhs, support) => {
182           // 计算置信度
183           (lhs, rhs.split(",").mkString("<", ",", ">"), 1.0 * support / frequency)
184         }
185       }
186     })
187 
188     // 2.3 过滤置信度太低的数据
189     val resultRDD = assocRulesRDD.filter(_._3 >= minConfidence)
190 
191     // 3. RDD数据保存
192     //resultRDD.collect()
193     FileSystem.get(sc.hadoopConfiguration).delete(new Path(savePath), true)
194     //resultRDD.repartition(1).saveAsTextFile(savePath)
195 
196     // ===========================================
197     sc.stop()
198   }
199 }

2.注意点(本地的完全运行)

  不需要开启服务,也不需要上传文件,讲文件保存在本地的方式

  

三:优化程序

  1.优化的是相集的个数

  2.使用广播变量

  1 package com.ibeifeng.senior.mba.association
  2 
  3 import org.apache.hadoop.fs.{FileSystem, Path}
  4 import org.apache.spark.broadcast.Broadcast
  5 import org.apache.spark.rdd.RDD
  6 import org.apache.spark.{SparkConf, SparkContext}
  7 
  8 import scala.collection.mutable
  9 
 10 /**
 11   * 使用SparkCore实现购物篮分析
 12   * Created by ibf on 01/12.
 13   */
 14 object FindAssociationRulesSparkCore {
 15   /**
 16     * 先从缓存中获取数据,如果不存在,直接重新获取
 17     *
 18     * @param items
 19     * @param size
 20     * @param cache
 21     * @return
 22     */
 23   def findItemSetsByCache(items: List[(String, Int)], size: Int, cache: mutable.Map[Int, List[List[(String, Int)]]]): List[List[(String, Int)]] = {
 24     cache.get(size).orElse {
 25       // 获取值
 26       val result = findItemSets(items, size, cache)
 27 
 28       // 更新缓存
 29       cache += size -> result
 30 
 31       // 返回值
 32       Some(result)
 33     }.get
 34   }
 35 
 36 
 37   /**
 38     * 构建项集基于items商品列表,项集中的商品数量是size指定
 39     *
 40     * @param items 商品列表:eg: [A, B, C]
 41     * @param size  最终项集包含商品的数量
 42     * @return
 43     */
 44   def findItemSets(items: List[(String, Int)], size: Int, cache: mutable.Map[Int, List[List[(String, Int)]]]): List[List[(String, Int)]] = {
 45     if (size == 1) {
 46       // items中的每个商品都是一个项集
 47       items.map(item => item :: Nil)
 48     } else {
 49       // 当size不是1的时候
 50       // 1. 获取项集大小为size-1的项集列表
 51       val tmpItemSets = findItemSetsByCache(items, size - 1, cache)
 52       // 2. 给tmpItemSets中添加一个新的不重复的项 ==> 数据的转换
 53       val itemSets = tmpItemSets.flatMap(itemSets => {
 54         // 给itemSets项集添加一个新的商品ID,要求不重复
 55         val newItemSets = items
 56           // 将包含的商品过滤掉&要求下标必须大于以及存在
 57           .filter(item => !itemSets.contains(item) && itemSets.forall(_._2 < item._2))
 58           // 将商品添加到项集中,产生一个新的项集
 59           // 为了使用distinct做去重操作,进行一个排序操作
 60           .map(item => (item :: itemSets))
 61 
 62         // 返回值
 63         newItemSets
 64       })
 65 
 66       // 返回项集的值
 67       itemSets
 68     }
 69   }
 70 
 71   def main(args: Array[String]): Unit = {
 72     val n = 10000
 73     // 1. 创建SparkContext
 74     val conf = new SparkConf()
 75       .setAppName(s"find-association-rules-${n}")
 76       .setMaster("local[*]")
 77     //      .set("spark.eventLog.enabled", "true")
 78     //      .set("spark.eventLog.dir","hdfs://hadoop-senior01:8020/spark-history")
 79     //      .set("spark.executor.memory","3g")
 80     val sc = SparkContext.getOrCreate(conf)
 81 
 82     // ===========================================
 83     // 测试数据存储的路径
 84     val path = s"data/transactions/${n}"
 85     val savePath = s"result2/${n}"
 86     // 最小支持度
 87     val minSupport = 2
 88     // 最小置信度
 89     val minConfidence = 0.1
 90 
 91     // 创建rdd读取原始的交易数据,
 92     // 假设交易数据是按行存储的,每行是一条交易,每条交易数据包含的商品ID使用","分割
 93     val rdd = sc.textFile(path, 20)
 94 
 95     // 过滤无效数据:对于在整个交易集合中出现比较少的商品过滤掉,先进行需要过滤的商品的RDD数据
 96     val minGoodCount = 3 // 要求商品在整个交易集中至少出现3次
 97     val needFilterGoodsRDD = rdd
 98       .flatMap(transaction => transaction
 99         .split(",")
100         .filter(!_.isEmpty)
101         .map(good => (good, 1))
102       )
103       .reduceByKey(_ + _)
104       .filter(_._2 < minGoodCount)
105       .map(_._1)
106     // 使用广播变量将数据广播输出
107     val needFilterGoods: Broadcast[List[String]] = sc.broadcast(needFilterGoodsRDD.collect().toList)
108 
109     // 1. 计算频繁项集
110     // 1.1 获取每条交易存在的项集
111     val itemSetsRDD: RDD[String] = rdd.flatMap(transaction => {
112       // 1) 获取当前交易所包含的商品ID
113       val goods: Array[String] = transaction
114         .split(",") // 分割
115         .filter(!_.isEmpty) // 过滤
116 
117 
118       // 将需要过滤的数据过滤掉
119       val items = (goods.toBuffer -- needFilterGoods.value)
120         .sorted //排序
121         .toList // 转换为list
122         .zipWithIndex // 将数据和下标合并,下标从0开始
123 
124       // 2) 构建辅助对象
125       // 最大的项集只允许存在5个项的,5怎么来?根据业务规则&根据运行之后的情况
126       val itemSize = Math.min(items.size, 5)
127       val cache = mutable.Map[Int, List[List[(String, Int)]]]()
128 
129       // 3) 根据获取的商品ID的信息产生项集
130       // allItemSets集合中最后数据量是:2^itemSize - 1
131       val allItemSets: List[List[String]] = (1 to itemSize).map(size => {
132         // 产生项集中项的数量是size的项集
133         findItemSets(items, size, cache)
134       }).foldLeft(List[List[String]]())((v1, v2) => {
135         v2.map(_.map(_._1)) ::: v1
136       })
137 
138       // 4) 返回结果
139       allItemSets.map(_.mkString(","))
140     })
141 
142     // 1.2 获取频繁项集
143     val supportedItemSetsRDD = itemSetsRDD
144       // 数据转换
145       .map(items => (items, 1))
146       // 聚合求支持度
147       .reduceByKey(_ + _)
148       // 过滤产生频繁项集
149       .filter(_._2 >= minSupport)
150 
151     // 2. 计算关联规则
152     // 2.1 对每个频繁项集获取子项集
153     val subSupportedItemSetsRDD = supportedItemSetsRDD.flatMap(tuple => {
154       val itemSets = tuple._1.split(",").toList.zipWithIndex // 频繁项集
155       val frequency = tuple._2 // 该频繁项集的支持度
156 
157       // 2) 构建辅助对象
158       val itemSize = itemSets.size
159       val cache = mutable.Map[Int, List[List[(String, Int)]]]()
160 
161       // 3) 获取子项集
162       val allSubItemSets: List[List[String]] = (1 to itemSize).map(size => {
163         // 产生项集中项的数量是size的项集
164         findItemSets(itemSets, size, cache)
165       }).foldLeft(List[List[String]]())((v1, v2) => {
166         v2.map(_.map(_._1)) ::: v1
167       })
168 
169       // 4) 转换数据并输出
170       val items = itemSets.map(_._1)
171       allSubItemSets.map(subItemSets => {
172         // (A,B,frequency) ==> 表示A出现的时候B也出现的次数是frequency次
173         // 当subItemSets就是itemSets的时候,返回的二元组的第二个元素的(元组)第一个元素是空的列表
174         (subItemSets.mkString(","), ((items.toBuffer -- subItemSets).toList.mkString(","), frequency))
175       })
176     })
177 
178     // 2.2 计算置信度
179     val assocRulesRDD = subSupportedItemSetsRDD
180       .groupByKey() // 数据聚合
181       .flatMap(tuple => {
182       // 计算执行度: (A, B, k) => A存在的时候B也存储的几率是k
183       // A就是tuple的第一个元素
184       // 获取左件
185       val lhs = tuple._1.split(",").mkString("<", ",", ">")
186 
187       // 获取左件在所有的交易中出现的总的次数 tuple._2中第一个元素为空的数据就是总的次数
188       val frequency = tuple._2
189         // 只要第一个元素为空的值,表示from本身
190         .filter(_._1.isEmpty)
191         // 需要的是第二个元素
192         .map(_._2).toList match {
193         case head :: Nil => head
194         case _ => {
195           throw new IllegalArgumentException("异常")
196         }
197       }
198 
199       // 计算右件出现次数占左件次数的百分比, 并返回最终结果
200       tuple._2
201         // 要求第一个数据非空
202         .filter(!_._1.isEmpty)
203         // 数据转换,获取置信度
204         .map {
205         case (rhs, support) => {
206           // 计算置信度
207           (lhs, rhs.split(",").mkString("<", ",", ">"), 1.0 * support / frequency)
208         }
209       }
210     })
211 
212     // 2.3 过滤置信度太低的数据
213     val resultRDD = assocRulesRDD.filter(_._3 >= minConfidence)
214 
215     // 3. RDD数据保存
216     FileSystem.get(sc.hadoopConfiguration).delete(new Path(savePath), true)
217     resultRDD.repartition(1).saveAsTextFile(savePath)
218 
219     // ===========================================
220     sc.stop()
221   }
222 }
原文地址:https://www.cnblogs.com/juncaoit/p/6390989.html