Python+Spark2.0+hadoop学习笔记——Python Spark MLlib支持向量机二分类

支持向量机是一个应用很广的机器学习模型,利用核空间变换可以将数据从一个空间变换到另外一个空间当中,从而使得数据呈现出更清晰的分布。支持向量机不论是在工业界还是在学界都有举足轻重的意义,在学界,基于支持向量机的改进方法有很多,通过算法层面的改进可以得到一种针对特定数据情况的优质算法,从而能更好地解决实际问题。

接下来还是使用那个辨别网站是长青网站还是暂时网站的例子。和前面逻辑斯蒂回归的思路一样,开始支持向量机的学习。

第一步:导入相应的库函数

import sys
from time import time
import pandas as pd
import matplotlib.pyplot as plt
from pyspark import SparkConf, SparkContext
from pyspark.mllib.classification import SVMWithSGD
from pyspark.mllib.regression import LabeledPoint
import numpy as np
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.mllib.feature import StandardScaler

第二步:准备数据

def get_mapping(rdd, idx):
    """Build a value -> index lookup for one column of an RDD of records.

    Args:
        rdd: RDD whose elements are indexable records.
        idx: Position of the column whose distinct values are enumerated.

    Returns:
        A dict mapping every distinct value found at position ``idx`` to
        the integer paired with it by ``zipWithIndex``.
    """
    column_values = rdd.map(lambda record: record[idx])
    indexed_values = column_values.distinct().zipWithIndex()
    return indexed_values.collectAsMap()

def extract_label(record):
    """Return the label of a record, i.e. its last field, as a float."""
    return float(record[-1])

def extract_features(field, categoriesMap, featureEnd):
    """Turn one split record into a numeric feature vector.

    Args:
        field: Indexable record; position 3 holds the category value and
            positions 4 up to (not including) ``featureEnd`` hold the
            numerical fields.
        categoriesMap: dict mapping each category value to its slot in the
            one-hot part of the vector.
        featureEnd: Exclusive end index of the numerical fields.

    Returns:
        A numpy array: the one-hot category encoding followed by the
        numerical fields converted with ``convert_float``.

    Raises:
        KeyError: If the record's category is not in ``categoriesMap``.
    """
    category_slot = categoriesMap[field[3]]
    one_hot = np.zeros(len(categoriesMap))
    one_hot[category_slot] = 1
    numeric_values = [convert_float(raw) for raw in field[4:featureEnd]]
    return np.concatenate((one_hot, numeric_values))

def convert_float(x):
    """Parse a raw field as a number; the placeholder "?" becomes 0."""
    if x == "?":
        return 0
    return float(x)

def PrepareData(sc):
    """Load the training file and build standardized, split LabeledPoints.

    Reads ``data/train.tsv`` under the module-level ``Path`` (set by
    ``SetPath``), drops the header line, one-hot encodes the category
    column (index 3), standardizes all features and splits the result
    with weights 8:1:1.

    Args:
        sc: Active SparkContext.

    Returns:
        Tuple ``(trainData, validationData, testData, categoriesMap)``;
        the first three are RDDs of LabeledPoint and ``categoriesMap``
        maps each category value to its one-hot index.
    """
    print("Data loading...")
    rawDataWithHeader = sc.textFile(Path + "data/train.tsv")
    header = rawDataWithHeader.first()
    rawData = rawDataWithHeader.filter(lambda x: x != header)
    # Remove every double-quote character from each line before splitting.
    rData = rawData.map(lambda x: x.replace("\"", ""))
    # train.tsv is tab-separated, so split on "\t" rather than on spaces.
    lines = rData.map(lambda x: x.split("\t"))
    print("The number of data" + str(lines.count()))
    print("Before normalization")
    categoriesMap = (lines.map(lambda fields: fields[3])
                     .distinct().zipWithIndex().collectAsMap())
    labelRDD = lines.map(lambda r: extract_label(r))
    # The last field is the label, hence the feature range ends at len(r) - 1.
    featureRDD = lines.map(
        lambda r: extract_features(r, categoriesMap, len(r) - 1))

    # Print the first feature vector on a single line; this behaves the
    # same on Python 2 and 3, unlike the trailing-comma print idiom.
    print(",".join(str(i) for i in featureRDD.first()))
    stdScaler = StandardScaler(withMean=True, withStd=True).fit(featureRDD)
    ScalerFeatureRDD = stdScaler.transform(featureRDD)
    print("After normalization")
    print(",".join(str(i) for i in ScalerFeatureRDD.first()))
    labelpoint = labelRDD.zip(ScalerFeatureRDD)
    labelpointRDD = labelpoint.map(lambda r: LabeledPoint(r[0], r[1]))
    (trainData, validationData, testData) = labelpointRDD.randomSplit([8, 1, 1])
    print("trainData:" + str(trainData.count()) +
          "validationData:" + str(validationData.count()) +
          "testData:" + str(testData.count()))
    return (trainData, validationData, testData, categoriesMap)

第三步:使用训练好的模型进行预测

def PredictData(sc, model, categoriesMap):
    """Predict and print the class of the first 10 records of test.tsv.

    Reads ``data/test.tsv`` under the module-level ``Path``, drops the
    header line, extracts features for every record and prints the
    model's prediction for the first ten of them.

    Args:
        sc: Active SparkContext.
        model: Trained model; ``model.predict`` is called on one feature
            vector at a time and its result is used as a key of the
            0/1 description table below.
        categoriesMap: dict mapping each category value to its one-hot
            index, as returned by ``PrepareData``.
    """
    print("Data loading...")
    rawDataWithHeader = sc.textFile(Path + "data/test.tsv")
    header = rawDataWithHeader.first()
    rawData = rawDataWithHeader.filter(lambda x: x != header)
    # Remove every double-quote character from each line before splitting.
    rData = rawData.map(lambda x: x.replace("\"", ""))
    # test.tsv is tab-separated, so split on "\t" rather than on spaces.
    lines = rData.map(lambda x: x.split("\t"))
    print("The number of data" + str(lines.count()))
    # Feature range ends at len(r): all fields from index 4 onward are used.
    # NOTE(review): PrepareData trains on StandardScaler-transformed
    # features, but the vectors built here are passed to the model
    # unscaled. The fitted scaler is not available in this function;
    # confirm whether it should be threaded through.
    dataRDD = lines.map(
        lambda r: (r[0], extract_features(r, categoriesMap, len(r))))
    DescDict = {
        0: "ephemeral",
        1: "evergreen"
    }
    for data in dataRDD.take(10):
        predictResult = model.predict(data[1])
        # One record per paragraph: URL line, prediction line, blank line.
        print(" Web:" + str(data[0]) + "\n" +
              "Predict:" + str(predictResult) +
              "Inllustration:" + DescDict[predictResult] + "\n")
第四步:对模型进行训练和评估(需要调节的参数有numIterations、stepSize和regParam)(注意:MLlib里SVM可调节的参数和标准SVM算法的参数有很大区别,这是因为MLlib的SVMWithSGD只实现了线性支持向量机,不支持核函数,所以没有核函数相关的参数需要调节)

def evaluateModel(model, validationData):
    """Compute the area under the ROC curve of a model on a data set.

    Args:
        model: Trained model whose ``predict`` accepts an RDD of feature
            vectors.
        validationData: RDD of records exposing ``features`` and ``label``.

    Returns:
        The ``areaUnderROC`` reported by BinaryClassificationMetrics for
        the (score, label) pairs.
    """
    score = model.predict(validationData.map(lambda p: p.features))
    score = score.map(lambda s: float(s))
    labels = validationData.map(lambda p: float(p.label))
    # Both RDDs derive from validationData through map-only steps, so
    # they can be zipped element by element.
    scoreAndLabels = score.zip(labels)
    metrics = BinaryClassificationMetrics(scoreAndLabels)
    return metrics.areaUnderROC

def trainEvaluateModel(trainData, validationData,
                       numIterations, stepSize, regParam):
    """Train an SVM with the given hyper-parameters and score it.

    Args:
        trainData: RDD passed to ``SVMWithSGD.train``.
        validationData: RDD passed to ``evaluateModel``.
        numIterations: Number of SGD iterations.
        stepSize: SGD step size.
        regParam: Regularization parameter.

    Returns:
        Tuple ``(AUC, duration, numIterations, stepSize, regParam, model)``
        where ``duration`` is the wall-clock time in seconds spent on
        training plus evaluation.
    """
    started_at = time()
    model = SVMWithSGD.train(trainData,
                             iterations=numIterations,
                             step=stepSize,
                             regParam=regParam)
    AUC = evaluateModel(model, validationData)
    duration = time() - started_at
    report_parts = [
        " numIterations=", str(numIterations),
        " stepSize=", str(stepSize),
        " regParam=", str(regParam),
        " Time=", str(duration),
        " AUC = ", str(AUC),
    ]
    print("".join(report_parts))
    return (AUC, duration, numIterations, stepSize, regParam, model)

def evalParameter(trainData, validationData, evalparm,
                  numIterationsList, stepSizeList, regParamList):
    """Train one model per parameter combination and chart AUC vs. time.

    Args:
        trainData: Training RDD forwarded to ``trainEvaluateModel``.
        validationData: Validation RDD forwarded to ``trainEvaluateModel``.
        evalparm: Name of the parameter being swept; one of
            ``"numIterations"``, ``"stepSize"`` or ``"regParam"``. Its
            candidate list becomes the index of the result table.
        numIterationsList: Candidate iteration counts.
        stepSizeList: Candidate step sizes.
        regParamList: Candidate regularization parameters.

    Raises:
        ValueError: If ``evalparm`` is not one of the three known names.
    """
    candidates = {
        "numIterations": numIterationsList,
        "stepSize": stepSizeList,
        "regParam": regParamList,
    }
    # Validate before training: an unknown name used to surface only
    # after the whole sweep had run, as an unbound-local error.
    if evalparm not in candidates:
        raise ValueError(
            "evalparm must be one of 'numIterations', 'stepSize' or "
            "'regParam', got %r" % (evalparm,))
    IndexList = list(candidates[evalparm])
    metrics = [trainEvaluateModel(trainData, validationData,
                                  numIterations, stepSize, regParam)
               for numIterations in numIterationsList
               for stepSize in stepSizeList
               for regParam in regParamList]
    # The index has one entry per swept value, so the two lists that are
    # not being swept are expected to hold a single value each.
    df = pd.DataFrame(metrics, index=IndexList,
                      columns=['AUC', 'duration', 'numIterations',
                               'stepSize', 'regParam', 'model'])
    showchart(df, evalparm, 'AUC', 'duration', 0.5, 0.7)

def showchart(df, evalparm, barData, lineData, yMin, yMax):
    """Draw one column of ``df`` as bars and another as a red line.

    Args:
        df: DataFrame holding the results.
        evalparm: Text used for the chart title and the x-axis label.
        barData: Name of the column drawn as bars on the left y-axis.
        lineData: Name of the column drawn as a line on a twin y-axis.
        yMin: Lower limit of the left y-axis.
        yMax: Upper limit of the left y-axis.
    """
    bar_axis = df[barData].plot(kind='bar', title=evalparm,
                                figsize=(10, 6), legend=True, fontsize=12)
    bar_axis.set_xlabel(evalparm, fontsize=12)
    bar_axis.set_ylim([yMin, yMax])
    bar_axis.set_ylabel(barData, fontsize=12)

    # Second y-axis sharing the same x-axis, used for the line series.
    line_axis = bar_axis.twinx()
    line_values = df[[lineData]].values
    line_axis.plot(line_values, linestyle='-', marker='o',
                   linewidth=2.0, color='r')
    plt.show()

def evalAllParameter(trainData, validationData,
                     numIterationsList, stepSizeList, regParamList):
    """Grid-search all parameter combinations and return the best model.

    Every combination is trained and scored with ``trainEvaluateModel``;
    the combination with the highest AUC is printed and its model is
    returned.

    Args:
        trainData: Training RDD.
        validationData: Validation RDD.
        numIterationsList: Candidate iteration counts.
        stepSizeList: Candidate step sizes.
        regParamList: Candidate regularization parameters.

    Returns:
        The model of the combination with the highest AUC.

    Raises:
        IndexError: If the grid is empty.
    """
    results = []
    for numIterations in numIterationsList:
        for stepSize in stepSizeList:
            for regParam in regParamList:
                results.append(trainEvaluateModel(
                    trainData, validationData,
                    numIterations, stepSize, regParam))
    # Highest AUC first; the sort is stable, so ties keep grid order.
    results.sort(key=lambda result: result[0], reverse=True)
    best = results[0]
    best_auc, best_iterations, best_step, best_reg = (
        best[0], best[2], best[3], best[4])
    print("numIterations:" + str(best_iterations) +
          " ,stepSize:" + str(best_step) +
          " ,regParam:" + str(best_reg) +
          " ,AUC = " + str(best_auc))
    return best[5]

def parametersEval(trainData, validationData):
    """Sweep each hyper-parameter in turn while holding the others fixed.

    For ``numIterations``, ``stepSize`` and ``regParam`` (in that order)
    the parameter name is printed and ``evalParameter`` is run with a
    list of candidates for that parameter and a single value for each
    of the other two.

    Args:
        trainData: Training RDD.
        validationData: Validation RDD.
    """
    # (swept parameter, iteration counts, step sizes, regularization values)
    sweeps = (
        ("numIterations", [1, 3, 5, 15, 25], [100], [1]),
        ("stepSize", [25], [10, 50, 100, 200], [1]),
        ("regParam", [25], [100], [0.01, 0.1, 1]),
    )
    for swept, iteration_counts, step_sizes, reg_values in sweeps:
        print(swept)
        evalParameter(trainData, validationData, swept,
                      numIterationsList=iteration_counts,
                      stepSizeList=step_sizes,
                      regParamList=reg_values)

第五步:Spark相关设置

def SetLogger(sc):
    """Set the "org", "akka" and root log4j loggers to ERROR level.

    Args:
        sc: SparkContext whose ``_jvm`` gateway exposes
            ``org.apache.log4j``.
    """
    log4j = sc._jvm.org.apache.log4j
    for logger_name in ("org", "akka"):
        log4j.LogManager.getLogger(logger_name).setLevel(log4j.Level.ERROR)
    log4j.LogManager.getRootLogger().setLevel(log4j.Level.ERROR)

def SetPath(sc):
    """Set the module-level ``Path`` according to the Spark master.

    A master URL starting with "local" selects the local file-system
    directory; any other master selects the HDFS directory.

    Args:
        sc: SparkContext; only its ``master`` string is read.
    """
    global Path
    running_locally = sc.master.startswith("local")
    Path = ("file:/home/jorlinlee/pythonsparkexample/PythonProject/"
            if running_locally
            else "hdfs://master:9000/user/jorlinlee/")

def CreateSparkContext():
    """Create the SparkContext for the "SVM" application.

    Console progress bars are disabled, log verbosity is reduced via
    ``SetLogger`` and the module-level data path is chosen via
    ``SetPath``.

    Returns:
        The newly created SparkContext.
    """
    # Parentheses keep the builder chain a single expression.
    sparkConf = (SparkConf()
                 .setAppName("SVM")
                 .set("spark.ui.showConsoleProgress", "false"))
    sc = SparkContext(conf=sparkConf)
    print("master=" + sc.master)
    SetLogger(sc)
    SetPath(sc)
    return sc


# NOTE: a bare module-level ``sc.stop()`` used to follow this function.
# At that point no context named ``sc`` exists yet, so running the file
# as a script raised NameError before the main section was reached.
# Call ``sc.stop()`` on the context returned above once the job is done.

第六步:运行主程序

# Script entry point.
#   no argument : train once with fixed parameters (3 iterations,
#                 step size 50, regParam 1)
#   -e          : additionally chart the effect of each parameter
#   -a          : grid-search all parameters and use the best model
# The chosen model is then scored on the test split and used to predict
# records from test.tsv.
if __name__ == "__main__":
    print("SVM")
    sc = CreateSparkContext()
    try:
        print("Preparing")
        (trainData, validationData, testData, categoriesMap) = PrepareData(sc)
        # Each split is reused by several training/evaluation passes.
        trainData.persist()
        validationData.persist()
        testData.persist()
        print("Evaluating")
        (AUC, duration, numIterations, stepSize, regParam, model) = \
            trainEvaluateModel(trainData, validationData, 3, 50, 1)
        if (len(sys.argv) == 2) and (sys.argv[1] == "-e"):
            parametersEval(trainData, validationData)
        elif (len(sys.argv) == 2) and (sys.argv[1] == "-a"):
            print("Best parameter")
            model = evalAllParameter(trainData, validationData,
                                     [1, 3, 5, 15, 25],
                                     [10, 50, 100, 200],
                                     [0.01, 0.1, 1])
        print("Test")
        auc = evaluateModel(model, testData)
        print("AUC:" + str(auc))
        print("Predict")
        PredictData(sc, model, categoriesMap)
    finally:
        # Always release the Spark context, even if a step above fails.
        sc.stop()

结果:

Web:http://www.lynnskitchenadventures.com/2009/04/homemade-enchilada-sauce.html
Predict:1Inllustration:evergreen

Web:http://lolpics.se/18552-stun-grenade-ar
Predict:1Inllustration:evergreen

Web:http://www.xcelerationfitness.com/treadmills.html
Predict:1Inllustration:evergreen

Web:http://www.bloomberg.com/news/2012-02-06/syria-s-assad-deploys-tactics-of-father-to-crush-revolt-threatening-reign.html
Predict:1Inllustration:evergreen

Web:http://www.wired.com/gadgetlab/2011/12/stem-turns-lemons-and-limes-into-juicy-atomizers/
Predict:1Inllustration:evergreen

Web:http://www.latimes.com/health/boostershots/la-heb-fat-tax-denmark-20111013,0,2603132.story
Predict:1Inllustration:evergreen

Web:http://www.howlifeworks.com/a/a?AG_ID=1186&cid=7340ci
Predict:1Inllustration:evergreen

Web:http://romancingthestoveblog.wordpress.com/2010/01/13/sweet-potato-ravioli-with-lemon-sage-brown-butter-sauce/
Predict:1Inllustration:evergreen

Web:http://www.funniez.net/Funny-Pictures/turn-men-down.html
Predict:1Inllustration:evergreen

Web:http://youfellasleepwatchingadvd.com/
Predict:1Inllustration:evergreen

原文地址:https://www.cnblogs.com/zhuozige/p/12627637.html