【PySpark】学习记录1

一. Spark介绍

Spark是一个分布式计算平台。运算速度远超于HDFS,并且能与python、java更好地交互。
我的疑问:在数据处理/模型训练的过程中,Spark这个平台是需要我手动写一些代码,例如读取数据啥的,还是我只要在带有pyspark的kernal的平台上运行就可以?kernal是什么??为什么我在NAIE平台上选了pyspark的kernal,接下来就会报错呢?处理方式(数据读写这些)又不一样吗?

二. 今天的代码

导入的所需要的包

import os
from operator import add
from pyspark import SparkContext

查看当前文件所在路径。被路径整怕了……

os.getcwd()

输出:

'/home/ma-user/work'

查看这个路径下有什么文件:

os.listdir('/home/ma-user/work')

输出:

['naie_platform', '__train.json', 'preprocess.ipynb',  'requirements.txt']

可以看到,我自己建了一个testSpark.txt并没有显示在这里。。

1. 想要实现的功能:统计txt文件里单词数目

  • 读取文件并分割字符串:
if len(sys.argv) < 2:
        print("Usage:wordcount <filepath>") # ???
        exit(-1)
    # initialize sparkcontext
#     sc = SparkContext(appName="Python_Word_Count") # 实际上不需要这个,会报错,因为默认已经有了一个?或者只需要运行1次,最好是与sc.stop()一起用避免错误
    
# 将文本数据读为一个存放字符串的RDD
lines = sc.textFile('/home/ma-user/work/preprocess/requirements.txt') # sys.argv[1]是个json文件,但我不懂它是什么,也不知道会不会引起报错
# lines = sc.textFile('sys.argv[0]') # 这个函数大概不能读取.py 或者.json文件吧 反正会报错
# 把字符串切分成单词
words = lines.flatMap(lambda x:x.split(' '))
words.collect()

输出:

['#name',
 '[condition]',
 '[version]',
 '#condition',
 '',
 '',
 '',
 '==,',
 '>=,',
 '<=,',
 '>,',
 '<',
 '#tensorflow==1.8.1',
 'naie']
  • 每个单词映射为(x,1)的样子方便统计数目,利用map功能:
mapWords = words.map(lambda x:(x,1)) # PythonRDD[11] at RDD at PythonRDD.scala:52
mapWords.collect()

输出:

[('#name', 1),
 ('[condition]', 1),
 ('[version]', 1),
 ('#condition', 1),
 ('', 1),
 ('', 1),
 ('', 1),
 ('==,', 1),
 ('>=,', 1),
 ('<=,', 1),
 ('>,', 1),
 ('<', 1),
 ('#tensorflow==1.8.1', 1),
 ('naie', 1)]
  • 合并相同键值,实现统计单词数目。
    如果没有collect()这个函数,每个函数返回的都是一个PythonRDD,看不出RDD里的值的。
combine_same_keys = mapWords.reduceByKey(add) # PythonRDD[17] at RDD at PythonRDD.scala:52 
combine_same_keys.collect()

输出:

[('[version]', 1),
 ('#condition', 1),
 ('', 3),
 ('==,', 1),
 ('>=,', 1),
 ('naie', 1),
 ('#name', 1),
 ('[condition]', 1),
 ('<=,', 1),
 ('>,', 1),
 ('<', 1),
 ('#tensorflow==1.8.1', 1)]
  • 打印统计结果:
for (keys, counts) in combine_same_keys.collect():
    print(keys, counts)

输出:

[version] 1
#condition 1
 3
==, 1
>=, 1
naie 1
#name 1
[condition] 1
<=, 1
>, 1
< 1
#tensorflow==1.8.1 1
  • 关闭RDD
    一开始只要打开一次sc,然后关闭了之后下一次运行就需要再初始化一次textFile
sc.stop() # 关闭spark, 关闭后就会提示:AttributeError: 'NoneType' object has no attribute 'sc'
  • 想试一试其他功能:
    word_add1 = lines.flatMap(lambda x:x.split(' ')) # 对数据格式也有要求,并不会帮你把单词转为什么东西然后+1,你看这个.map(lambda x:x+1)就不行,提示TypeError:TypeError: must be str, not int
    word_add1.collect()

  • 过滤掉重复的:

filter_same = word_add1.distinct()
filter_same.collect()

输出:

['[version]',
 '#condition',
 '',
 '==,',
 '>=,',
 'naie',
 '#name',
 '[condition]',
 '<=,',
 '>,',
 '<',
 '#tensorflow==1.8.1']
  • 筛选,filter是保留符合条件的,也就是将不等于'==,'和''的字符留下:
# filter_same.filter(lambda x:x!=('' and'==,'and'>=,'and'<'and'>,')).collect() # 这样一个都删不掉.. and/or都一样
filter_same.filter(lambda x:x!= '==,' and x!='' ).collect() # 这样可以删掉两个
['[version]',
 '#condition',
 '>=,',
 'naie',
 '#name',
 '[condition]',
 '<=,',
 '>,',
 '<',
 '#tensorflow==1.8.1']

2. 总结:整体流程是先创建一个RDD,然后对它进行操作

例:对一个数据为{1,2,3,3}的RDD进行基本RDD转化操作

行动操作:

三. 报错

遇见的报错:

# ValueError: Cannot run multiple SparkContexts at once; existing SparkContext(app=Python_Word_Count, master=local[*]) created by __init__ at <ipython-input-46-e6dabb8e53ad>:1 

出错语句:

sc = SparkContext(appName="Python_Word_Count")

原因是这个只要打开一次,在没有关闭之前,再次输入这个语句都会提示不能同时运行多个SparkContexts。

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe : org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: file:/home/ma-user/work/readData/testSpark.text

找不到该文件,查看该路径下是否有这个文件(不要用眼睛看。。)

![](https://img2020.cnblogs.com/blog/2037199/202008/2037199-20200813220715363-996762160.png)

解决方法:给file加上单引号,变成'file'

原文地址:https://www.cnblogs.com/sweetsmartrange/p/13492316.html