Spark Machine Learning

Spark ships two machine learning libraries: ml and mllib.

mllib contains the old RDD-based API.

ml contains the new API, built around Datasets/DataFrames and ML Pipelines.
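For example, a minimal pipeline in the ml package might look like the following sketch (the column names, stages, and parameter values here are illustrative, not taken from the examples below):

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}

// Feature transformers and an estimator chained into a single Pipeline.
val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("words")
val hashingTF = new HashingTF().setInputCol("words").setOutputCol("features")
val lr = new LogisticRegression().setMaxIter(10)
val pipeline = new Pipeline().setStages(Array(tokenizer, hashingTF, lr))
// val model = pipeline.fit(trainingDF)   // trainingDF: DataFrame with "label" and "text" columns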

Algorithm             Classification            Regression
Logistic regression   binary and multiclass     not supported
Decision tree         binary and multiclass     supported
Random forest         binary and multiclass     supported
GBDT                  binary only               supported
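GBDT is not covered by the examples below; in the ml package it is exposed as GBTClassifier and supports binary classification only (GBTRegressor handles regression). A minimal sketch with illustrative parameter values:

import org.apache.spark.ml.classification.GBTClassifier

// Gradient-boosted trees on a DataFrame with "label" and "features" columns.
val gbt = new GBTClassifier()
        .setLabelCol("label")
        .setFeaturesCol("features")
        .setMaxIter(10)
        .setMaxDepth(5)
// val gbtModel = gbt.fit(trainingData)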

Multiclass classification:

1. Logistic regression

import org.apache.spark.ml.classification.{LogisticRegression, LogisticRegressionModel}
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

def multiclassLr(spark: SparkSession, saveDir: String, dataSet: RDD[LabeledPoint]): Unit = {
        import spark.implicits._
        // Convert the RDD of mllib LabeledPoints into a DataFrame with "label" and "features" columns.
        val data = spark.sqlContext.createDataset(dataSet.map {
            labelPoint =>
                (labelPoint.label, labelPoint.features.asML)
        }).toDF("label", "features").cache()

        /*val idfModel = new IDF()
                .setInputCol("rawFeatures")
                .setOutputCol("features")
                .fit(data)
        val tfIdfData = idfModel.transform(data)*/

        val Array(trainingData, testData) = data.randomSplit(Array(0.8, 0.2), seed = 123)

        val lr = new LogisticRegression()
                .setMaxIter(5)
                .setRegParam(0.0)
                .setElasticNetParam(0.0)
                .setStandardization(true)
        // Fit on the training split; a previously saved model could instead be reloaded
        // via LogisticRegressionModel.load("/lrmodel").
        val lrModel = lr.fit(trainingData)
        // lrModel.write.overwrite.save("/lrmodel")
        val result = lrModel.transform(testData).cache()
        // Print confusion-matrix counts as ((label, prediction), count) pairs.
        println(
            result.select("label", "prediction").rdd
                    .map {
                        row => ((row(0), row(1)), 1)
                    }
                    .reduceByKey(_ + _)
                    .sortBy { case ((label, _), count) => label.toString.toDouble * 100000 + count }
                    .collect().mkString("\n")
        )
        // Per-class recall: key by the true label and compute the fraction predicted correctly.
        val recall = result.select("label", "prediction").rdd
                .map { row =>
                    ((row(0), row(1)), 1)
                }
                .reduceByKey(_ + _)
                .map {
                    case ((label, pre), count) =>
                        ((label, label.toString.toDouble.toInt == pre.toString.toDouble.toInt
                                /*|| (label.toString.toDouble.toInt == 77 && pre.toString.toDouble.toInt == 75)
                                || (label.toString.toDouble.toInt == 76 && pre.toString.toDouble.toInt == 75)*/ ), count)
                }
                .reduceByKey(_ + _)
                .map {
                    case ((label, right), count) =>
                        (label, (if (right) 1 else 0, count))
                }
                .reduceByKey {
                    case ((right1, count1), (_, count2)) => {
                        (count1 + count2, if (right1 == 1) count1 else count2)
                    }
                }
                .map {
                    case (label, (all, right)) =>
                        if (all > 1) {
                            (label.toString.toDouble.toInt, (all, right * 1.0 / all))
                        } else {
                            (label.toString.toDouble.toInt, (all + right, all * 1.0))
                        }
                }
        // Per-class precision: key by the predicted class (note the columns are selected as (prediction, label) here).
        val prediction = result.select("prediction", "label").rdd
                .map { row =>
                    ((row(0), row(1)), 1)
                }
                .reduceByKey(_ + _)
                .map {
                    case ((label, pre), count) =>
                        ((label, label.toString.toDouble.toInt == pre.toString.toDouble.toInt
                                /*|| (label.toString.toDouble.toInt == 77 && pre.toString.toDouble.toInt == 75)
                                || (label.toString.toDouble.toInt == 76 && pre.toString.toDouble.toInt == 75)*/ ), count)
                }
                .reduceByKey(_ + _)
                .map {
                    case ((label, right), count) =>
                        (label, (if (right) 1 else 0, count))
                }
                .reduceByKey {
                    case ((right1, count1), (_, count2)) => {
                        (count1 + count2, if (right1 == 1) count1 else count2)
                    }
                }
                .map {
                    case (label, (all, right)) =>
                        if (all > 1) {
                            (label.toString.toDouble.toInt, (all, right * 1.0 / all))
                        } else {
                            (label.toString.toDouble.toInt, (right, all * 1.0))
                        }
                }

        // Join per-class recall and precision and print them, sorted by recall in descending order.
        println(recall.join(prediction).sortBy { case (_, ((_, r), (_, _))) => 0 - r }.collect().mkString("\n"))
        // Overall weighted metrics from the DataFrame-based evaluator.
        val evaluator = new MulticlassClassificationEvaluator()
                .setLabelCol("label")
                .setPredictionCol("prediction")
        val weightedPrecision = evaluator.setMetricName("weightedPrecision").evaluate(result)
        val weightedRecall = evaluator.setMetricName("weightedRecall").evaluate(result)
        val accuracy = evaluator.setMetricName("accuracy").evaluate(result)
        println("weightedPrecision = " + weightedPrecision)
        println("weightedRecall = " + weightedRecall)
        println("accuracy = " + accuracy)
    }
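The hand-rolled per-class precision and recall above can also be obtained from the RDD-based MulticlassMetrics; a sketch, reusing the result DataFrame from the method above:

import org.apache.spark.mllib.evaluation.MulticlassMetrics

// (prediction, label) pairs from the transformed test set.
val predictionAndLabel = result.select("prediction", "label").rdd
        .map(row => (row.getDouble(0), row.getDouble(1)))
val metrics = new MulticlassMetrics(predictionAndLabel)
metrics.labels.foreach { l =>
    println(s"label $l: precision = ${metrics.precision(l)}, recall = ${metrics.recall(l)}")
}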

2. Naive Bayes

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SparkSession}

def naiveBayes(spark: SparkSession, dataSet: RDD[LabeledPoint]): Unit = {
        import spark.implicits._
        // Convert the RDD of LabeledPoints into a DataFrame with "label" and "features" columns.
        val data = spark.sqlContext.createDataset(dataSet.map {
            labelPoint =>
                (labelPoint.label, labelPoint.features.asML)
        }).toDF("label", "features")

        /*val idfModel = new IDF()
                .setInputCol("rawFeatures")
                .setOutputCol("features")
                .fit(data)
        val tfIdfData = idfModel.transform(data)*/

        val Array(trainingData, testData) = data.select("label", "features").map {
            case Row(label: Double, features: org.apache.spark.ml.linalg.Vector) =>
                new LabeledPoint(label.toDouble, Vectors.dense(features.toArray))
        }.randomSplit(Array(0.8, 0.2), seed = 13)
        /*val model = new NaiveBayes()
                .setSmoothing(1.0)
                .fit(trainingData)*/
        // Train with the RDD-based mllib NaiveBayes; lambda is the additive smoothing parameter.
        val model = org.apache.spark.mllib.classification.NaiveBayes.train(trainingData.rdd, lambda = 0.5)
        testData.cache()
        // Per-class precision: key by the predicted class.
        val result = testData.rdd.map { labeledPoint: LabeledPoint => ((model.predict(labeledPoint.features), labeledPoint.label), 1) }
                .reduceByKey(_ + _)
                .map {
                    case ((label, predict), count) =>
                        ((label, if (label == predict) 1 else 0), count)
                }
                .reduceByKey(_ + _)
                .map { case ((label, right), count) => (label, (right * 1.0, count * 1.0)) }
                .reduceByKey {
                    case ((right1, count1), (right2, count2)) =>
                        if (right1 > 0.5) {
                            (count1 + count2, count1 / (count1 + count2))
                        } else {
                            (count1 + count2, count2 / (count1 + count2))
                        }
                }
                .sortBy { case (_, (_, r)) => r }

        // Per-class recall: key by the true label.
        val result2 = testData.rdd.map { labeledPoint: LabeledPoint => ((labeledPoint.label, model.predict(labeledPoint.features)), 1) }
                .reduceByKey(_ + _)
                .map {
                    case ((label, predict), count) =>
                        ((label, if (label == predict) 1 else 0), count)
                }
                .reduceByKey(_ + _)
                .map { case ((label, right), count) => (label, (right * 1.0, count * 1.0)) }
                .reduceByKey {
                    case ((right1, count1), (_, count2)) =>
                        if (right1 > 0.5) {
                            (count1 + count2, count1 / (count1 + count2))
                        } else {
                            (count1 + count2, count2 / (count1 + count2))
                        }
                }
                .sortBy { case (_, (_, r)) => r }
        println("==============准确率============")
        println(result.collect().mkString("
"))
        println("===============召回===========")
        println(result2.collect().mkString("
"))
        /*val evaluator = new MulticlassClassificationEvaluator()
                .setLabelCol("label")
                .setPredictionCol("prediction")
        val weightedPrecision = evaluator.setMetricName("weightedPrecision").evaluate(result)
        val weightedRecall = evaluator.setMetricName("weightedRecall").evaluate(result)
        val accuracy = evaluator.setMetricName("accuracy").evaluate(result)
        println("weightedPrecision = " + weightedPrecision)
        println("weightedRecall = " + weightedRecall)
        println("accuracy = " + accuracy)*/
    }
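The commented-out lines hint at the DataFrame-based alternative; a sketch that stays entirely in the ml package, reusing the data DataFrame built in the method above:

import org.apache.spark.ml.classification.NaiveBayes
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator

val Array(train, test) = data.randomSplit(Array(0.8, 0.2), seed = 13)
val nbModel = new NaiveBayes()
        .setSmoothing(1.0)          // plays the same role as lambda in the mllib API
        .fit(train)
val nbResult = nbModel.transform(test)
val accuracy = new MulticlassClassificationEvaluator()
        .setMetricName("accuracy")
        .evaluate(nbResult)
println("accuracy = " + accuracy)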

3. Random forest

import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

def randomForest(spark: SparkSession, dataSet: RDD[LabeledPoint], saveDir: String, argsMap: Map[String, String]): Unit = {
        val treeCount = argsMap.getOrElse("treeCount", "10").toInt
        val treeDepth = argsMap.getOrElse("treeDepth", "10").toInt

        import spark.implicits._
        val data = spark.sqlContext.createDataset(dataSet.map {
            labelPoint =>
                (labelPoint.label, labelPoint.features.asML)
        }).toDF("label", "features")

        val Array(trainData, testData) = dataSet.randomSplit(Array(0.8, 0.2))
        // RDD-based RandomForest: numClasses = 29 (specific to the author's data set), no categorical
        // features, "auto" feature subset strategy, Gini impurity, maxBins = 2, seed = 123.
        val model = org.apache.spark.mllib.tree.RandomForest.trainClassifier(trainData, 29, Map[Int, Int](), treeCount, "auto", "gini", treeDepth, 2, 123)

        val labelAndPreds = testData.map { point =>
            val prediction = model.predict(point.features)
            (point.label, prediction)
        }
        val testErr = labelAndPreds.filter(r => r._1 != r._2).count.toDouble / testData.count()
        spark.sparkContext.parallelize(Seq("Test Error = " + testErr)).saveAsTextFile(saveDir + "testErr")
        spark.sparkContext.parallelize(Seq("Learned classification forest model:
" + model.toDebugString)).saveAsTextFile(saveDir + "model")
        //        println("Test Error = " + testErr)
        //        println("Learned classification forest model:
" + model.toDebugString)
    }
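A hypothetical driver for the method above; the application name, data path, and output directory are placeholders, not from the original post:

import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("rf-demo").getOrCreate()
// LIBSVM-formatted training data (the path is a placeholder).
val points = MLUtils.loadLibSVMFile(spark.sparkContext, "/data/train_libsvm.txt")
randomForest(spark, points, "/output/rf/", Map("treeCount" -> "20", "treeDepth" -> "10"))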
Original article: https://www.cnblogs.com/tengpan-cn/p/7850950.html