Spark SQL

 1 from pyspark.sql import HiveContext
 2 from pyspark import SparkContext,SparkConf
 3 import pyspark.sql.functions as F
 4 from  pyspark.sql import SparkSession
 5 
 6 conf = SparkConf().setAppName("abc")
 7 sc = SparkContext(conf=conf)
 8 hiveCtx = HiveContext(sc)
 9 df  = hiveCtx.sql(sql) #用Hive拉数
10 
11 df.cache()  # 数据载入缓存
12 df.show()   # 不加参数默认展示前20行
13 df.count()  # 统计行数
14 df.printSchema() # 查看schema
15 df.columns # 查看字段
16 df.dtypes # 查看字段类型
17 df.select('age','name') # 带show才能看到结果
18 df.select(df.age.alias('age_value'),'name').show() #别名
19 df.filter(df.name=='Alice').show() # 筛选
20 df.drop_duplicates() #删除重复记录
21 df.distinct() #去重
22 df.drop('id') #删除列
23 df.na.drop(thresh=2).show() #如果一行至少2个缺失值才删除该行
24 df.na.fill('unknown').show() #对所有列用同一个值填充缺失值
25 df.na.fill({'name':'--', 'age':0}).show() # 不同的列用不同的值填充
26 df.groupby('name').agg(F.max(df['age'])) # 分组计算
27 df.groupby('name').agg(F.max(df['age'])) # join
28 df.describe("age").show() # 描述性统计分析
29 spark.catalog.listTables()  #查看temptable
30 
31 df.select(df.age+1,'age','name') # 增加列
32 df.select(F.lit(0).alias('id'),'age','name') # 增加列
33 df.unionAll(df2) # 增加行
34 
35 #spark = SparkSession.builder.master("local").appName("Word Count").config("spark.some.config.option", "some-value").getOrCreate()
36 spark=SparkSession.builder.appName("boye").getOrCreate()
37 #d = [{"name": "Alice", "age": 12},{"name": "Bob", "age": 53}]
38 #df = spark.createDataFrame(d)
39 df =spark.read.json("file:///usr/local/test/01.json")
40 #df = spark.read.csv(path=path,schema=["id","name"],sep="	",header=False)
41 df.show()
42 df.createTempView("student")
43 df.createOrReplaceTempView("student") #全局临时表,spark.sql("select avg(age) from global_temp.student").show()
44 spark.newSession().sql("SELECT * FROM global_temp.student").show()
45 df.createGlobalTempView("student")
46 spark.sql("select * from student where age<20").show()
原文地址:https://www.cnblogs.com/boye169/p/14678846.html