SparkMLlib—协同过滤之交替最小二乘法ALS原理与实践



相关内容原文地址:

CSDN:leboop:Spark MLlib协同过滤之交替最小二乘法ALS原理与实践
CSDN:Jantelope:Spark中MLlib中的ALS算法物品推荐代码实现;



一、Spark MLlib算法实现

数据准备:

1,101,5.0
1,102,3.0
1,103,2.5
2,101,2.0
2,102,2.5
2,103,5.0
2,104,2.0
3,101,2.5
3,104,4.0
3,105,4.5
3,107,5.0
4,101,5.0
4,103,3.0
4,104,4.5
4,106,4.0
5,101,4.0
5,102,3.0
5,103,2.0
5,104,4.0
5,105,3.5
5,106,4.0

第一列为用户id(userId),第二列为物品id(itemId),第三列为用户给物品的评分,转换成用户-物品评分矩阵后,如下:
在这里插入图片描述

1.1 显示反馈

Spark MLlib提供了两种API,一种基于RDD的,在spark.mllib下,该API已经进入维护状态,预计在Spark 3.0中放弃维护,最新的是基于DataFrame,该API在spark.ml下。

1.1.1 基于RDD

import org.apache.spark.mllib.recommendation.{ALS, Rating}
import org.apache.spark.sql.SparkSession
 
/**
  * 基于RDD的ALS API推荐Demo
  */
object ALSCFDemo {
 
//  case class Rating(userId: Int, itermId: Int, rating: Float)
 
  /**
    * 解析数据:将数据转换成Rating对象
    * @param str
    * @return
    */
  def parseRating(str: String): Rating = {
    val fields = str.split(",")
    assert(fields.size == 3)
    Rating(fields(0).toInt, fields(1).toInt, fields(2).toFloat)
  }
 
  def main(args: Array[String]): Unit = {
    //定义切入点
    val spark = SparkSession.builder().master("local").appName("ASL-Demo").getOrCreate()
    //读取数据,生成RDD并转换成Rating对象
    val ratingsRDD = spark.sparkContext.textFile("data/ratingdata.csv").map(parseRating)
    //隐藏因子数
    val rank=50
    //最大迭代次数
    val maxIter=10
    //正则化因子
    val labmda=0.01
    //训练模型
    val model=ALS.train(ratingsRDD,rank,maxIter,labmda)
    //推荐物品数
    val proNum=2
    //推荐
    val r=model.recommendProductsForUsers(proNum)
    //打印推荐结果
    r.foreach(x=>{
          println("用户 "+x._1)
          x._2.foreach(x=>{
            println(" 推荐物品 "+x.product+", 预测评分 "+x.rating)
            println()
          }
          )
          println("===============================")
        }
    )
 
  }
}

运行结果如下:

用户 4
 推荐物品 101, 预测评分 4.987222374679642
 
 推荐物品 104, 预测评分 4.498410352539908
 
===============================
用户 1
 推荐物品 101, 预测评分 4.9941397937874825
 
 推荐物品 104, 预测评分 4.482759123081623
 
===============================
用户 3
 推荐物品 107, 预测评分 4.9917963612098415
 
 推荐物品 105, 预测评分 4.50190214892064
 
===============================
用户 5
 推荐物品 101, 预测评分 4.023403087402049
 
 推荐物品 104, 预测评分 3.9938240731866506
 
===============================
用户 2
 推荐物品 103, 预测评分 4.985059400785903
 
 推荐物品 102, 预测评分 2.4974442131394214
 
===============================

均方根误差RESM:
在这里插入图片描述
T为真实值,在这里插入图片描述为预测值。

import com.leboop.mllib.ALSCFDemo.rems
import org.apache.spark.mllib.recommendation.{ALS, MatrixFactorizationModel, Rating}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
 
/**
  * 基于RDD的ALS API推荐Demo
  */
object ALSCFDemo {
  /**
    * 解析数据:将数据转换成Rating对象
    *
    * @param str
    * @return
    */
  def parseRating(str: String): Rating = {
    val fields = str.split(",")
    assert(fields.size == 3)
    Rating(fields(0).toInt, fields(1).toInt, fields(2).toFloat)
  }
 
  /**
    * @param model 训练好的模型
    * @param data 真实数据
    * @param n 数据个数
    * @return 误差
    */
  def rems(model: MatrixFactorizationModel, data: RDD[Rating], n: Long): Double = {
    //预测值 Rating(userId,itermId,rating)
    val preRDD: RDD[Rating] = model.predict(data.map(d => (d.user, d.product)))
    //关联:组成(预测评分,真实评分)
    val doubleRating = preRDD.map(
      x => ((x.user, x.product), x.rating)
    ).join(
      data.map { x => ((x.user, x.product), x.rating) }
    ).values
    //计算RMES
    math.sqrt(doubleRating.map(x => math.pow(x._1 - x._2, 2)).reduce(_ + _) / n)
  }
 
  def main(args: Array[String]): Unit = {
    //定义切入点
    val spark = SparkSession.builder().master("local").appName("ASL-Demo").getOrCreate()
    //读取数据,生成RDD并转换成Rating对象
    val ratingsRDD = spark.sparkContext.textFile("data/ratingdata.csv").map(parseRating)
    //将数据随机分成训练数据和测试数据(权重分别为0.8和0.2)
    val Array(training, test) = ratingsRDD.randomSplit(Array(1, 0))
    //隐藏因子数
    val rank = 50
    //最大迭代次数
    val maxIter = 10
    //正则化因子
    val labmda = 0.01
    //训练模型
    val model = ALS.train(training, rank, maxIter, labmda)
    //计算误差
    val remsValue = rems(model, ratingsRDD, ratingsRDD.count)
    println("误差:  " + remsValue)
  }
}

结果如下:

误差:  0.011343969370562474

1.1.2 基于DataFrame

import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.recommendation.ALS
import org.apache.spark.sql.SparkSession
 
/**
  * ASL基于DataFrame的Demo
  */
object ALSDFDemo {
  case class Rating(userId: Int, itemId: Int, rating: Float)
  /**
    * 解析数据:将数据转换成Rating对象
    * @param str
    * @return
    */
  def parseRating(str: String): Rating = {
    val fields = str.split(",")
    assert(fields.size == 3)
    Rating(fields(0).toInt, fields(1).toInt, fields(2).toFloat)
  }
 
  def main(args: Array[String]): Unit = {
    //定义切入点
    val spark = SparkSession.builder().master("local").appName("ASL-DF-Demo").getOrCreate()
    //读取数据,生成RDD并转换成Rating对象
    import spark.implicits._
    val ratingsDF = spark.sparkContext.textFile("data/ratingdata.csv").map(parseRating).toDF()
    //将数据随机分成训练数据和测试数据(权重分别为0.8和0.2)
    val Array(training, test) = ratingsDF.randomSplit(Array(0.8, 0.2))
    //定义ALS,参数初始化
    val als = new ALS().setRank(50)
      .setMaxIter(10)
      .setRegParam(0.01)
      .setUserCol("userId")
      .setItemCol("itemId")
      .setRatingCol("rating")
    //训练模型
    val model = als.fit(training)
 
    //推荐:每个用户推荐2个物品
    val r = model.recommendForAllUsers(2)
 
    //关闭冷启动(防止计算误差不产生NaN)
    model.setColdStartStrategy("drop")
    //预测测试数据
    val predictions = model.transform(test)
 
    //定义rmse误差计算器
    val evaluator = new RegressionEvaluator()
      .setMetricName("rmse")
      .setLabelCol("rating")
      .setPredictionCol("prediction")
    //计算误差
    val rmse = evaluator.evaluate(predictions)
 
    //打印训练数据
    training.foreach(x=>println("训练数据: "+x))
    //打印测试数据
    test.foreach(x=>println("测试数据: "+x))
    //打印推荐结果
    r.foreach(x=>print("用户 "+x(0)+" ,推荐物品 "+x(1)))
    //打印预测结果
    predictions.foreach(x=>print("预测结果:  "+x))
    //输出误差
    println(s"Root-mean-square error = $rmse")
  }
}

运行结果如下:

训练数据: [1,101,5.0]
训练数据: [1,102,3.0]
训练数据: [1,103,2.5]
训练数据: [2,101,2.0]
训练数据: [2,102,2.5]
训练数据: [2,104,2.0]
训练数据: [3,101,2.5]
训练数据: [3,105,4.5]
训练数据: [3,107,5.0]
训练数据: [4,101,5.0]
训练数据: [4,103,3.0]
训练数据: [4,104,4.5]
训练数据: [4,106,4.0]
训练数据: [5,102,3.0]
训练数据: [5,103,2.0]
训练数据: [5,104,4.0]
训练数据: [5,105,3.5]
 
测试数据: [2,103,5.0]
测试数据: [3,104,4.0]
测试数据: [5,101,4.0]
测试数据: [5,106,4.0]
 
用户 1 ,推荐物品 WrappedArray([101,4.98618], [105,3.477826])
用户 3 ,推荐物品 WrappedArray([107,4.9931526], [105,4.499714])
用户 5 ,推荐物品 WrappedArray([104,3.9853115], [105,3.4996033])
用户 4 ,推荐物品 WrappedArray([101,5.000056], [104,4.5001974])
用户 2 ,推荐物品 WrappedArray([105,3.0707152], [102,2.4903712])
 
预测结果:  [5,101,4.0,3.1271331]
预测结果:  [2,103,5.0,1.0486442]
预测结果:  [5,106,4.0,1.8420099]
预测结果:  [3,104,4.0,1.4847627]
 
Root-mean-square error = 2.615265256309832

1.2 隐式反馈

与显式反馈基本相同,这里需要使用方法setImplicitPrefs()打开隐式反馈,代码如下:

val als = new ALS()
  .setMaxIter(5)
  .setRegParam(0.01)
  .setImplicitPrefs(true)
  .setUserCol("userId")
  .setItemCol("movieId")
  .setRatingCol("rating")

二、Spark中MLlib中的ALS算法物品推荐代码实现;

ALS 是交替最小二乘 (alternating least squares)的简称。

简单说明下原理:

从协同过滤的分类来说,ALS算法属于User-Item CF,也叫做混合CF。它同时考虑了User和Item两个方面。

用户和商品的关系,可以抽象为如下的三元组:<User,Item,Rating>。其中,Rating是用户对商品的评分,表征用户对该商品的喜好程度。

假设我们有一批用户数据,其中包含m个User和n个Item,则我们定义Rating矩阵,其中的元素表示第u个User对第i个Item的评分。

在实际使用中,由于n和m的数量都十分巨大,因此R矩阵的规模很容易就会突破1亿项。这时候,传统的矩阵分解方法对于这么大的数据量已经是很难处理了。

另一方面,一个用户也不可能给所有商品评分,因此,R矩阵注定是个稀疏矩阵。矩阵中所缺失的评分,又叫做missing item。
在这里插入图片描述

import org.apache.log4j.Logger
import org.apache.log4j.Level
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.mllib.recommendation.ALS
import org.apache.spark.mllib.recommendation.Rating
import org.apache.spark.mllib.recommendation.MatrixFactorizationModel
import scala.io.Source
import org.apache.spark.rdd.RDD
 
object BasedItermDemo {
  def main(args: Array[String]): Unit = {
     //屏蔽日志信息
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
    //构建sc对象
    val conf = new SparkConf().setAppName("ALS iterm").setMaster("local")
    val sc = new SparkContext(conf)
    val seq = getSeqRDD("D:\tmp\ratingdata.txt")
    val ratings = sc.parallelize(seq, 1)
    val model = ALS.train(ratings, 15, 5, 0.01)
    /*
     * rating  评分矩阵
     * 经验值:
     * rank 是模型中隐语义因子的个数 推荐10-200 数据越大越准确,计算也就越复杂
     * iteration 模型迭代计算次数10-20 数据越大越准确,计算也就越复杂
     * lambda 惩罚函数的因数,是ALS的正则化参数,推荐值:0.01 
     */
    //获取用户ID 对每个用户 推荐2个商品
    val n = ratings.count()
    val userID = ratings.map(f=>f.user).distinct().collect().toArray
    for(s<-userID){
     val products = model.recommendProducts(s, 2)
     println("用户ID是:"+s)
     for(s<-products){
       println("推荐的商品是: "+s.product+"推荐的理由是: "+s.rating)
     }
     println("**********************")
    }
    //需要计算当前模型的误差RMSE
    val rmse = computeRMSE(model,ratings,n)
    println("当前模型误差值是:"+rmse)
  }
  
  //评分矩阵 ratings需要的是一个rdd 现在需要构建一个rdd,也就是一个序列
  def getSeqRDD(path:String):Seq[Rating]={
    val data = Source.fromFile(path).getLines().map(f=>f.split(",")match{
      case Array(user,product,rat)=>Rating(user.toInt,product.toInt,rat.toDouble)
    }).filter(f=>f.rating>0.0)
    
    if(data.isEmpty){
      println("数据错误")
      return null
    }else{
      data.toSeq
    }
  }
  //定义方法采用当前model 获取数据,误差是多少
  //3个参数,一个当前model值 以及 实际评分矩阵值,总共多少条评分数据
  def computeRMSE(model:MatrixFactorizationModel,data:RDD[Rating],n:Long):Double={
    /*获取等值条件
     * select 计算值,实际值
     * from model,data
     * where model.(user,product) = data.(user,product)
     */
    //等价条件 计算值
    val equal = model.predict((data.map(f=>(f.user,f.product))))
    //获取 计算值得矩阵
    val predictRating = equal.map(f=>((f.user,f.product),f.rating))
    //获取实际值得矩阵
    val realRating = data.map(f=>((f.user,f.product),f.rating))
    //将两个评分矩阵进行合并计算均方根误差  
    val predictAndReal = predictRating.join(realRating).values
    math.sqrt(predictAndReal.map(f=>(f._1-f._2)*(f._1-f._2)).reduce(_+_)/n)
    
  }
  
}

结果展示:

用户ID是:4
推荐的商品是: 101推荐的理由是: 4.998812482106746
推荐的商品是: 104推荐的理由是: 4.499347431643257
**********************
用户ID是:1
推荐的商品是: 101推荐的理由是: 4.986825763711995
推荐的商品是: 104推荐的理由是: 3.500596045600693
**********************
用户ID是:3
推荐的商品是: 107推荐的理由是: 4.992594312342743
推荐的商品是: 105推荐的理由是: 4.498912380875409
**********************
用户ID是:5
推荐的商品是: 104推荐的理由是: 4.019718120441245
推荐的商品是: 101推荐的理由是: 4.012689154167877
**********************
用户ID是:2
推荐的商品是: 103推荐的理由是: 4.981664427426299
推荐的商品是: 102推荐的理由是: 2.5151597024240413
**********************
当前模型误差值是:0.01120138424326239

当前代码采用的是经验值,对于大多数模型来书,经验值可以满足条件,模型中的值也可以自己测试

代码如下:

import org.apache.log4j.Logger
import org.apache.log4j.Level
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.mllib.recommendation.ALS
import org.apache.spark.mllib.recommendation.Rating
import org.apache.spark.mllib.recommendation.MatrixFactorizationModel
import scala.io.Source
import org.apache.spark.rdd.RDD
 
 
object BaedItermCFDemo01 {
  def main(args: Array[String]): Unit = {
    //屏蔽日志信息
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
    //构建sc对象
    val conf = new SparkConf().setAppName("ALS iterm").setMaster("local")
    val sc = new SparkContext(conf)
    val seq = getSeqRDD("D:\tmp\ratingdata.txt")
    val ratings = sc.parallelize(seq, 1)
    //计算商品个数,用户个数,评分个数
    val pNum = ratings.map(f=>f.product).distinct().count()
    val uNum = ratings.map(f=>f.user).distinct().count()
    val rNum = ratings.map(f=>f.rating).count()
    //val model1 = ALS.train(ratings, 15, 5, 0.01)
    /*
     * rating  评分矩阵
     * 经验值:
     * rank 数据循环训练的此时 设置10-200 数据越大越准确,计算也就越复杂
     * iteration 模型迭代计算次数10-20 数据越大越准确,计算也就越复杂
     * lambda 惩罚函数的因数,是ALS的正则化参数,推荐值:0.01 
     */
    //推测最佳模型
    val ranks = List(5,15)
    val iterations = List(2,5)
    val lambdas = List(0.1,1)
    // 寻找 最佳模型
    var bestModel:Option[MatrixFactorizationModel] = None
    var bestRank = 0
    var bestIteration = -1
    var bestLambda = -1.0
    //初始误差,寻找最小误差
    var bestRMSE:Double= Double.MaxValue
    for(rank<-ranks;iteration<-iterations;lambda<-lambdas){
      //寻找最佳模型
      val model = ALS.train(ratings, rank, iteration, lambda)
      //计算每个数值的误差
      val rmse = computeRMSE(model,ratings,rNum)
      //判断得出最佳模型
      if(rmse<bestRMSE){
        bestModel = Some(model)
        bestRMSE = rmse
        bestRank = rank
        bestIteration = iteration 
        bestLambda = lambda
      }      
    }
    println("最佳模型:" + bestModel)
    println("最佳的RMSE:"+bestRMSE)
    println("最佳的Lambda:"+bestLambda)
    println("最佳的迭代次数Iteration:"+bestIteration)
   
  }
  
  //评分矩阵 ratings需要的是一个rdd 现在需要构建一个rdd,也就是一个序列
  def getSeqRDD(path:String):Seq[Rating]={
    val data = Source.fromFile(path).getLines().map(f=>f.split(",")match{
      case Array(user,product,rat)=>Rating(user.toInt,product.toInt,rat.toDouble)
    }).filter(f=>f.rating>0.0)
    
    if(data.isEmpty){
      println("数据错误")
      return null
    }else{
      data.toSeq
    }
  }
  //定义方法采用当前model 获取数据,误差是多少
  //3个参数,一个当前model值 以及 实际评分矩阵值,总共多少条评分数据
  def computeRMSE(model:MatrixFactorizationModel,data:RDD[Rating],n:Long):Double={
    /*获取等值条件
     * select 计算值,实际值
     * from model,data
     * where model.(user,product) = data.(user,product)
     */
    //等价条件 计算值
    val equal = model.predict((data.map(f=>(f.user,f.product))))
    //获取 计算值得矩阵
    val predictRating = equal.map(f=>((f.user,f.product),f.rating))
    //获取实际值得矩阵
    val realRating = data.map(f=>((f.user,f.product),f.rating))
    //将两个评分矩阵进行合并计算均方根误差  
    val predictAndReal = predictRating.join(realRating).values
    math.sqrt(predictAndReal.map(f=>(f._1-f._2)*(f._1-f._2)).reduce(_+_)/n)
    
  }
  
}

结果展示:

最佳模型:Some(org.apache.spark.mllib.recommendation.MatrixFactorizationModel@1e57b783)
最佳的RMSE:0.11470480237681456
最佳的Lambda:0.1
最佳的迭代次数Iteration:2
原文地址:https://www.cnblogs.com/aixing/p/13327216.html