sparkSQL、dataframe

spark  读hive表:2.1.1

https://blog.csdn.net/qq_35741557/article/details/81135003

http://www.aboutyun.com/forum.php?mod=viewthread&tid=12358&page=1

空值填充:http://spark.apache.org/docs/1.5.0/api/python/_modules/pyspark/sql/dataframe.html

 spark 将dataframe数据写入Hive分区表:http://www.cnblogs.com/longjshz/p/5414051.html

 #df22.select("pkg","cnt01").sort("cnt01",ascending=False).show(100)  #按照某一个字段进行排序,降序

#从数据表读取数据,把数据读为数据框
df=sqlContext.sql("select * from zhangb.gedeng limit 2")

#把整张数据表变成数据框
df1=sqlContext.table("zhangb.gedeng")

 #把数据框转成rdd形式

dfrdd=df1.rdd #不正规
dfrdd1 =df1.rdd.map(tuple)
dfrdd2 =df1.rdd.map(list)

#把数据框注册为表
df1.registerTempTable("people")

# 将普通RDD转变为DataFrame
rdd = sparkContext.textFile("sex")
.flatMap(lambda line: line.split())
.map(lambda word: (word, 1))
.reduceByKey(lambda a, b: a + b)
wordCounts = sqlContext.createDataFrame(rdd, ["word", "count"]

#实际数据练习rdd转换成df

rdd = sc.textFile("sex").map(lambda p :p.strip().split(' ')).
filter(lambda p:len(p)==3).map(lambda p:((re.split(";|,",p[2])),int(p[1]))).
flatMap(lambda p:[(p[0][i],p[1])for i in range(len(p[0])) if i%2==0]).filter(lambda p:p[0]!='')

pkg1 = sqlContext.createDataFrame(rdd, ["pkg", "sex"])

# 将本地数据容器转变为DataFrame
da = [("Alice", 21), ("Bob", 24)]
people = sqlContext.createDataFrame(da, ["name", "age"])

db=[("Alice", 100,46), ("Bob", 39,47),("cele", 89,30)]
score=sqlContext.createDataFrame(db,['name','math','eng'])

#join
dc=people.join(score,people.name==score.name,"left_outer")

# 将Pandas DataFrame转变为Spark DataFrame(Python API特有功能)
sparkDF = sqlContext.createDataFrame(pandasDF)

#=========对数据框进行查看操作

# 创建一个只包含"年轻"用户的DataFrame
young = users.filter(users.age < 21)

# 也可以使用Pandas风格的语法
young = users[users.age < 21]

# 将所有人的年龄加1

young2=young.select(young.name, young.age + 1)

# 统计年轻用户中各性别人数
young.groupBy("gender").count().show()

# 将所有年轻用户与另一个名为logs的DataFrame联接起来(合并)
young.join(logs, logs.userId == users.userId, "left_outer")

# 除DSL以外,我们当然也可以像以往一样,用SQL来处理DataFrame:

df1.registerTempTable("dd")

#==============保存输出

#最后,当数据分析逻辑编写完毕后,我们便可以将最终结果保存下来或展现出来:
# 保存为SQL表
young.saveAsTable(tableName="young", source="parquet" mode="overwrite")

# 转换为Pandas DataFrame(Python API特有功能)
pandasDF = young.toPandas()

#追加至HDFS上的Parquet文件
young.save(path="hdfs://path/to/data.parquet",
source="parquet",
mode="append")

#覆写S3上的JSON文件

young.save(path="s3n://path/to/data.json",
source="json",
mode="append")

#空值填充 

 

 1 pyspark --master yarn-client --executor-memory 5G --num-executors 50 
 2 import os
 3 import copy
 4 import codecs 
 5 import operator
 6 import re
 7 from math import log  
 8 from pyspark.sql import SQLContext,Row
 9 from pyspark.mllib.regression import LabeledPoint
10 from pyspark import SparkContext, SparkConf
11 
12 #从数据表读取数据,把数据读为数据框
13 df=sqlContext.sql("select * from zhangb.gedeng limit 2") 
14 
15 #把整张数据表变成数据框
16 df1=sqlContext.table("zhangb.gedeng") 
17 
18 #把数据框注册为表
19 df1.registerTempTable("people")
20 
21 # 将普通RDD转变为DataFrame
22 rdd = sparkContext.textFile("sex") 
23                   .flatMap(lambda line: line.split()) 
24                   .map(lambda word: (word, 1)) 
25                   .reduceByKey(lambda a, b: a + b) 
26 wordCounts = sqlContext.createDataFrame(rdd, ["word", "count"]
27                                         
28 #实际数据练习rdd转换成df
29                                      
30 rdd = sc.textFile("sex").map(lambda p :p.strip().split('	')).
31 filter(lambda p:len(p)==3).map(lambda p:((re.split(";|,",p[2])),int(p[1]))).
32 flatMap(lambda p:[(p[0][i],p[1])for i in range(len(p[0])) if i%2==0]).filter(lambda p:p[0]!='')                                
33 
34 pkg1 = sqlContext.createDataFrame(rdd, ["pkg", "sex"])
35     
36 # 将本地数据容器转变为DataFrame
37 da = [("Alice", 21), ("Bob", 24)]
38 people = sqlContext.createDataFrame(da, ["name", "age"])
39 
40 db=[("Alice", 100,46), ("Bob", 39,47),("cele", 89,30)]
41 score=sqlContext.createDataFrame(db,['name','math','eng'])
42 
43 #join
44 dc=people.join(score,people.name==score.name,"left_outer")
45 
46 # 将Pandas DataFrame转变为Spark DataFrame(Python API特有功能)
47 sparkDF = sqlContext.createDataFrame(pandasDF)    
48                       
49 #=========对数据框进行查看操作
50 
51 # 创建一个只包含"年轻"用户的DataFrame
52 young = users.filter(users.age < 21)
53 
54 # 也可以使用Pandas风格的语法
55 young = users[users.age < 21]
56 
57 # 将所有人的年龄加1
58 
59 young2=young.select(young.name, young.age + 1)
60 
61 # 统计年轻用户中各性别人数
62 young.groupBy("gender").count().show()
63 
64 # 将所有年轻用户与另一个名为logs的DataFrame联接起来
65 young.join(logs, logs.userId == users.userId, "left_outer")
66 
67 # 除DSL以外,我们当然也可以像以往一样,用SQL来处理DataFrame:
68 
69 df1.registerTempTable("dd")
70 
71 #==============保存输出
72 
73 #最后,当数据分析逻辑编写完毕后,我们便可以将最终结果保存下来或展现出来:
74 # 保存为SQL表
75 young.saveAsTable(tableName="young", source="parquet" mode="overwrite")
76 
77 # 转换为Pandas DataFrame(Python API特有功能)
78 pandasDF = young.toPandas()
79 
80 #追加至HDFS上的Parquet文件
81 young.save(path="hdfs://path/to/data.parquet",
82            source="parquet",
83            mode="append")
84 
85 #覆写S3上的JSON文件
86 
87 young.save(path="s3n://path/to/data.json",
88            source="json",
89            mode="append")
原文地址:https://www.cnblogs.com/zhangbojiangfeng/p/6073964.html