Connecting to MySQL from PySpark

from pyspark import SparkContext
from pyspark.sql import SQLContext

if __name__ == "__main__":
    sc = SparkContext(appName="local")
    sqlContext = SQLContext(sc)
    # Read the test_customer table from the local MySQL "test" database over JDBC;
    # here the credentials are passed as part of the JDBC URL
    df = sqlContext.read.format("jdbc").options(
        url="jdbc:mysql://localhost:3306/test?user=root&password=root",
        dbtable="test_customer").load()
    df.show()
    sc.stop()
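For reference, the JDBC source also accepts the credentials and the driver class as separate options instead of embedding them in the URL. A minimal sketch; the driver class name shown is for MySQL Connector/J 5.x (Connector/J 8.x uses com.mysql.cj.jdbc.Driver):

df = sqlContext.read.format("jdbc").options(
    url="jdbc:mysql://localhost:3306/test",
    driver="com.mysql.jdbc.Driver",  # Connector/J 5.x class name
    user="root",
    password="root",
    dbtable="test_customer").load()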

If you get a "No suitable driver" error,

you need to copy the MySQL JDBC connector jar (mysql-connector-java) into the jars folder of your Spark installation. Alternatively, point Spark at the jar when creating the context, as sketched below.
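A minimal sketch of that alternative, assuming a hypothetical local path for the connector jar (the spark.jars setting puts the listed jars on the driver and executor classpaths):

from pyspark import SparkConf, SparkContext

conf = SparkConf().set("spark.jars", "/path/to/mysql-connector-java-5.1.45.jar")  # hypothetical path
sc = SparkContext(appName="local", conf=conf)

The same jar can also be supplied on the command line with spark-submit --jars; depending on the Spark version, you may additionally need it on the driver classpath (spark.driver.extraClassPath).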

Appendix: querying with SQL

from pyspark import SparkContext
from pyspark.sql import SQLContext

if __name__ == "__main__":
    sc = SparkContext(appName="local")
    sqlContext = SQLContext(sc)
    df = sqlContext.read.format("jdbc").options(
        url="jdbc:mysql://localhost:3306/test?user=root&password=root",
        dbtable="test_customer").load()
    # register the DataFrame as a temporary table so it can be queried with SQL
    df.registerTempTable("test1")
    ls = sqlContext.sql("select * from test1 where did = 1").collect()
    for it in ls:
        print(it)
    sc.stop()
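When only a subset of rows is needed, the filter can also be pushed down to MySQL itself by passing a subquery (with an alias) as the dbtable option, so Spark never loads the whole table. A minimal sketch, reusing the connection settings above:

df = sqlContext.read.format("jdbc").options(
    url="jdbc:mysql://localhost:3306/test?user=root&password=root",
    dbtable="(select * from test_customer where did = 1) as t").load()
df.show()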

Also attached: using HiveContext, and converting an RDD to a DataFrame:

from pyspark import SparkContext
from pyspark.sql import HiveContext, Row

if __name__ == "__main__":
    sc = SparkContext(appName="local")
    hc = HiveContext(sc)  # HiveContext (a SQLContext with Hive support)
    datas = ["1 a 28", "2 b 29", "3 c 30"]
    source = sc.parallelize(datas)  # load the list as an RDD
    splits = source.map(lambda line: line.split(" "))  # map returns an RDD of word lists
    rows = splits.map(lambda words: Row(id=words[0], name=words[1], age=words[2]))
    # createDataFrame infers the schema (a StructType) from the Row fields
    people = hc.createDataFrame(rows)
    people.registerTempTable("people")  # register as a temporary table
    results = hc.sql("select name from people").collect()
    # to transform before collecting:
    # hc.sql("select name from people").rdd.map(lambda row: row.name.upper()).collect()
    for result in results:
        print("name:" + result.name)
    sc.stop()
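If you want to declare the schema explicitly instead of relying on inference, the public pyspark.sql.types API can build the StructType by hand. A minimal sketch, reusing hc and splits from the listing above (all fields are strings here, since they come straight from split):

from pyspark.sql.types import StructType, StructField, StringType

schema = StructType([
    StructField("id", StringType(), True),
    StructField("name", StringType(), True),
    StructField("age", StringType(), True),
])
# tuples line up with the schema fields by position
people = hc.createDataFrame(splits.map(lambda w: (w[0], w[1], w[2])), schema)
people.registerTempTable("people")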
Original article: https://www.cnblogs.com/wpcnblog/p/8086505.html