Spark mllib 随机森林算法的简单应用(附代码)

此前用自己实现的随机森林算法,应用在titanic生还者预测的数据集上。事实上,有很多开源的算法包供我们使用。无论是本地的机器学习算法包sklearn 还是分布式的spark mllib,都是非常不错的选择。
  Spark是目前比较流行的分布式计算解决方案,同时支持集群模式和本地单机模式。由于其通过scala语言开发,原生支持scala,同时由于python在科学计算等领域的广泛应用,Spark也提供了python的接口。

Spark的常用操作详见官方文档:
http://spark.apache.org/docs/latest/programming-guide.html


在终端下面键入如下命令,切换到spark的目录,进入相应的环境:
cd $SPARK_HOME

cd ./bin

./pyspark

可以看到,出现了python 的版本号以及spark的logo

此时,仍然是输入一句,运行一句并输出。可以事先编辑好脚本保存为filename然后:

./spark-submit filename

下面给出详细的代码:

[python] view plain copy
 
print?
  1. import pandas as pd  
  2. import numpy as np  
  3. from pyspark.mllib.regression import LabeledPoint  
  4. from pyspark.mllib.tree import RandomForest  
  5.   
  6.   
  7. #将类别数量大于2的类别型变量进行重新编码,并把数据集变成labeledPoint格式  
  8. #df=pd.read_csv('/home/kim/t.txt',index_col=0)  
  9. #for col in ['Pclass','embrk']:  
  10. #    values=df[col].drop_duplicates()  
  11. #    for v in values:      
  12. #        col_name=col+str(v)  
  13. #        df[col_name]=(df[col]==v)  
  14. #        df[col_name]=df[col_name].apply(lambda x:int(x))  
  15. #df=df.drop(['Pclass','embrk'],axis=1)  
  16. #df.to_csv('train_data')  
  17.   
  18. #读入数据集变成弹性分布式数据集RDD ,由于是有监督学习,需要转换为模型输入的格式LabeledPoint  
  19. rdd=pyspark.SparkContext.textFile('/home/kim/train')  
  20. train=rdd.map(lambda x:x.split(',')[1])  
  21. train=train.map(lambda line:LabeledPoint(line[1],line[2:]))  
  22.   
  23. #模型训练  
  24. model=RandomForest.trainClassifier  
  25. (train, numClasses=2, categoricalFeaturesInfo={},numTrees=1000,  
  26. featureSubsetStrategy="auto",impurity='gini', maxDepth=4, maxBins=32)  
  27.   
  28. #包含LabeledPoint对象的RDD,应用features方法返回其输入变量的值,label方法返回其真实类别  
  29. data_p=train.map(lambda lp:lp.features)  
  30. v=train.map(lambda lp:lp.label)  
  31. prediction=model.predict(data_p)  
  32. vp=v.zip(prediction)  
  33.   
  34. #最后输出模型在训练集上的正确率  
  35. MSE=vp.map(lambda x:abs(x[0]-x[1]).sum())/vp.count()  
  36. print("MEAN SQURE ERROR: "+str(MSE))  
import pandas as pd
import numpy as np
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.tree import RandomForest


#将类别数量大于2的类别型变量进行重新编码,并把数据集变成labeledPoint格式
#df=pd.read_csv('/home/kim/t.txt',index_col=0)
#for col in ['Pclass','embrk']:
#    values=df[col].drop_duplicates()
#    for v in values:    
#        col_name=col+str(v)
#        df[col_name]=(df[col]==v)
#        df[col_name]=df[col_name].apply(lambda x:int(x))
#df=df.drop(['Pclass','embrk'],axis=1)
#df.to_csv('train_data')

#读入数据集变成弹性分布式数据集RDD ,由于是有监督学习,需要转换为模型输入的格式LabeledPoint
rdd=pyspark.SparkContext.textFile('/home/kim/train')
train=rdd.map(lambda x:x.split(',')[1])
train=train.map(lambda line:LabeledPoint(line[1],line[2:]))

#模型训练
model=RandomForest.trainClassifier
(train, numClasses=2, categoricalFeaturesInfo={},numTrees=1000,
featureSubsetStrategy="auto",impurity='gini', maxDepth=4, maxBins=32)

#包含LabeledPoint对象的RDD,应用features方法返回其输入变量的值,label方法返回其真实类别
data_p=train.map(lambda lp:lp.features)
v=train.map(lambda lp:lp.label)
prediction=model.predict(data_p)
vp=v.zip(prediction)

#最后输出模型在训练集上的正确率
MSE=vp.map(lambda x:abs(x[0]-x[1]).sum())/vp.count()
print("MEAN SQURE ERROR: "+str(MSE))

后面可以多加测试,例如:

使用更大规模的数据集;

将数据集划分为训练集测试集,在训练集上建模在测试集上评估模型性能;

使用mllib里面的其他算法并比较效果,等等

欢迎大家与我交流!

原文地址:https://www.cnblogs.com/honey01/p/8044203.html