SparkSQL

Spark SQL adds the DataFrame: essentially an RDD with schema information attached.

Creating a DataFrame

Start pyspark (in local mode here, since memory is limited):

pyspark --master local

The pyspark shell automatically provides sc (a SparkContext) and a SparkSession; building one explicitly looks like this:

from pyspark import SparkContext,SparkConf

from pyspark.sql import SparkSession

spark=SparkSession.builder.config(conf=SparkConf()).getOrCreate()

df=spark.read.json("file:///usr/local/spark/examples/src/main/resources/people.json")

df.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+
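A DataFrame is more than a plain RDD because it carries schema information; printSchema() shows what Spark inferred from people.json (output as I would expect for the standard example file):

df.printSchema()
# root
#  |-- age: long (nullable = true)
#  |-- name: string (nullable = true)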

1. Inferring the schema from an RDD by reflection

In schemaPeople.createOrReplaceTempView("people"), "people" is just the name of the temporary view.

persionDF is the newly created DataFrame; every SQL query produces a DataFrame.

The example goes from an RDD to a DataFrame and back to an RDD; see the sketch below.
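The code for this step is missing from these notes; this is a minimal sketch following the standard people.txt example that the names above (schemaPeople, persionDF) suggest, so the file path and field handling are assumptions:

from pyspark.sql import Row

# assumed sample file with lines like "Michael, 29"
peopleRDD = sc.textFile("file:///usr/local/spark/examples/src/main/resources/people.txt")
people = peopleRDD.map(lambda line: line.split(",")) \
                  .map(lambda p: Row(name=p[0], age=int(p[1])))  # reflection: Row field names become the schema
schemaPeople = spark.createDataFrame(people)
schemaPeople.createOrReplaceTempView("people")   # "people" is just the view name
persionDF = spark.sql("select name, age from people where age > 20")  # every SQL query returns a DataFrame
persionRDD = persionDF.rdd.map(lambda p: "Name: " + p.name)           # DataFrame back to RDD
persionRDD.foreach(print)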

2. Defining the schema programmatically

The for loop runs twice, once per field.

Each field is described by a name, a data type, and whether it may be null.

 

In createDataFrame the records go first and the header (the schema) goes second; see the sketch below.
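The programmatic read-side code is also missing from these notes; a minimal sketch under the same people.txt assumption, with the two-element field list standing in for the "for loop runs twice" note above:

from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, StringType

# header: one StructField(name, type, nullable) per field
fields = [StructField(name, StringType(), True) for name in ["name", "age"]]
schema = StructType(fields)

# records: Row objects built from the raw lines
lineRDD = sc.textFile("file:///usr/local/spark/examples/src/main/resources/people.txt")
rowRDD = lineRDD.map(lambda line: line.split(",")) \
                .map(lambda p: Row(p[0], p[1].strip()))

# records first, header (schema) second
peopleDF = spark.createDataFrame(rowRDD, schema)
peopleDF.createOrReplaceTempView("people")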

Reading MySQL with Spark SQL

Preparing the MySQL database:

MySQL installation guide: http://dblab.xmu.edu.cn/blog/install-mysql/

  1. sudo apt-get update                 # refresh the package sources
  2. sudo apt-get install mysql-server   # install mysql
  3. service mysql start                 # start the mysql service

sudo mysql    # open the mysql client as root

Set up the hadoop user:

select user, host from mysql.user;      -- list the current users

grant all privileges on *.* to 'hadoop'@'localhost' identified by "Unsw2016";  -- grant hadoop full privileges and set its password

(Note: the GRANT ... IDENTIFIED BY form works on MySQL 5.x; MySQL 8 requires a separate CREATE USER first.)

flush privileges;    -- reload the privilege tables

show grants for 'hadoop'@'localhost';     -- view the granted privileges

Basic SQL operations:

https://www.jianshu.com/p/652495b6fea6

mysql -u hadoop -p    # log in to mysql as the hadoop user
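The Spark code later in these notes reads and writes a student table in a spark database; that setup is not shown here, so this is a sketch with the column layout implied by the schema used below (the sample rows are made up):

create database spark;
use spark;
create table student (id int(4), name char(20), gender char(4), age int(4));
insert into student values(1, 'Xueqian', 'F', 23);   -- hypothetical sample row
insert into student values(2, 'Weiliang', 'M', 24);  -- hypothetical sample row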

 

Download the MySQL JDBC driver (Connector/J): https://dev.mysql.com/downloads/connector/j/

Using Spark SQL to read MySQL

Connect to MySQL through JDBC.

The read kept failing; the JDBC driver jar must be added when launching pyspark:

pyspark --master local --jars /usr/local/spark/jars/mysql-connector-java-5.1.48.jar

Run the following in pyspark to connect to MySQL over JDBC:

jdbcDF = spark.read.format("jdbc") \
    .option("driver", "com.mysql.jdbc.Driver") \
    .option("url", "jdbc:mysql://localhost:3306/spark") \
    .option("dbtable", "student") \
    .option("user", "hadoop") \
    .option("password", "Unsw2016") \
    .load()

jdbcDF.show()

This builds a DataFrame from the data read out of MySQL.

Using Spark SQL to write to MySQL

Spark officially recommends using Row objects instead of dicts:

from pyspark.sql import Row
User = Row('name', 'id', 'age')
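A quick illustration of how such a Row template behaves (the values here are made up):

from pyspark.sql import Row

User = Row('name', 'id', 'age')   # a template: field names fixed, values supplied per record
u = User('Alice', 1, 20)          # hypothetical record
print(u.name, u.id, u.age)        # fields are accessible by name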

First build the table header (the schema information): StructType takes a list of fields.

Each StructField describes one field; True means the field is nullable.

Three steps: build the header, build the records, and join header and records into a DataFrame; the fourth step is writing it to the underlying database.

The records are wrapped in Row objects.

.map does not change the number of elements.

In the lambda below, p is the list of split fields, and strip() removes surrounding whitespace.

 

from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark import SparkContext,SparkConf
from pyspark.sql import SparkSession

spark=SparkSession.builder.config(conf=SparkConf()).getOrCreate()


# build the table header (schema), the records, and then combine them

schema = StructType([StructField("id",IntegerType(),True),
        StructField("name",StringType(),True),
        StructField("gender",StringType(),True),
        StructField("age",IntegerType(),True)])

# above is the table header; below are two rows of data, i.e. two students
studentRDD = spark.sparkContext \
        .parallelize(["3 Rongcheng M 26", "4 Guanhua M 27"]) \
        .map(lambda x: x.split(" "))

# wrap each record in a Row object; each Row becomes one line of rowRDD
rowRDD=studentRDD.map(lambda p:Row(int(p[0].strip()),p[1].strip(),p[2].strip(),int(p[3].strip())))

# combine the table header and the table content into a DataFrame
studentDF=spark.createDataFrame(rowRDD,schema)

# write into the database over JDBC, in append mode
prop={}
prop["user"]="hadoop"
prop["password"]="Unsw2016"
prop["driver"]="com.mysql.jdbc.Driver"
studentDF.write.jdbc("jdbc:mysql://localhost:3306/spark","student","append",prop)

The write succeeds.
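To verify from the MySQL side, in the mysql client:

use spark;
select * from student;   -- the two appended rows should show up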

Next: review all the Spark topics; the Python material is done.

Original post: https://www.cnblogs.com/cschen588/p/11827898.html