【Spark机器学习速成宝典】基础篇03数据读取与保存(Python版)

目录

  保存为文本文件:saveAsTextFile

  保存为json:saveAsTextFile

  保存为SequenceFile:saveAsSequenceFile

  读取hive


保存为文本文件:saveAsTextFile

 

# -*-coding=utf-8 -*-
"""Save an RDD as plain text with saveAsTextFile, then read it back."""
from pyspark import SparkConf, SparkContext
sc = SparkContext('local')

# Each RDD element is written as one line under the output directory "a".
sc.parallelize([1, 2, 3, 4]).saveAsTextFile("a")  # save

# textFile() reads every part-* file back; elements come back as strings,
# not ints — textFile does no type conversion.
# print() works identically under Python 2 and 3 for a single argument.
print(sc.textFile("a").collect())  # read

 返回目录

保存为json:saveAsTextFile 

 

# -*-coding=utf-8 -*-
"""Save an RDD as JSON lines (json.dumps + saveAsTextFile), then read it back."""
from pyspark import SparkConf, SparkContext
import json
sc = SparkContext('local')

# The input is a set literal, so its iteration order is undefined;
# sortByKey() restores a deterministic order before writing.
# Each (key, value) pair is serialized to one JSON line in directory "b".
sc.parallelize({(11,2),(103,4),(103,6)}).sortByKey().map(lambda x:json.dumps(x)).saveAsTextFile("b")  # save

# Reading returns the raw JSON strings; apply json.loads to decode them.
print(sc.textFile("b").collect())  # read

 返回目录

保存为SequenceFile:saveAsSequenceFile 

 

# -*-coding=utf-8 -*-
"""Save an RDD of (str, str) pairs as a Hadoop SequenceFile, then read it back."""
from pyspark import SparkConf, SparkContext
sc = SparkContext('local')

# Keys AND values are Python strings, so both are written as
# org.apache.hadoop.io.Text records.
sc.parallelize({("11","2"),("103","4"),("103","6")}).saveAsSequenceFile("c")  # save

# BUG FIX: the original read with valueClass=IntWritable, but the values
# were written as Text ("2", "4", "6") — the value class must be Text too,
# otherwise deserialization fails on read.
print(sc.sequenceFile("c", "org.apache.hadoop.io.Text",
                      "org.apache.hadoop.io.Text").collect())  # read

 返回目录

读取hive 

 

# -*- coding: utf-8 -*-
"""Query a Hive table through Spark SQL and print one column of the first row."""
from pyspark import SparkConf, SparkContext
from pyspark.sql import HiveContext

conf = SparkConf().setMaster("local").setAppName("My App")
sc = SparkContext(conf=conf)

# NOTE(review): HiveContext is deprecated since Spark 2.0 — the modern entry
# point is SparkSession.builder.enableHiveSupport().getOrCreate(); kept here
# because this example targets the spark-2.1.1 deployment shown below.
hiveCtx = HiveContext(sc)
# Partition filter (pt=20171111) plus LIMIT keeps the scan small.
rows = hiveCtx.sql("select * from db.table where pt=20171111 limit 10")
firstRow = rows.first()
# Row objects expose columns as attributes, e.g. .userid.
print(firstRow.userid)

# Sample output:
# 11308542

提交到集群:

# BUG FIX: spark-submit options must come BEFORE the application file;
# anything after the .py path is passed to the application as its own argv,
# so the original command silently ignored --master/--executor-memory/--num-executors.
/opt/local/spark-2.1.1-bin-hadoop2.7/bin/spark-submit --master yarn-cluster --executor-memory 1G --num-executors 1 /home/rainymorns/aaa.py

 返回目录

原文地址:https://www.cnblogs.com/itmorn/p/7999111.html