大数据实践(十) Spark多种开发语言、与Hive集成

Spark 可以使用scala、Java、Sql、Python、R语言进行开发。

在bin目录下也提供了spark-shell、spark-sql、sparkR、pyspark等交互方式。

SparkSQL实现了Hive的模型、Hive在新版本中也建议使用Spark作为计算引擎。

一、Spark实现wordCount(TopK)

使用以下文本进行词频统计。

Java hadoop Spark  Hbase
Spark hadoop Java 
hive mysql
hadoop Spark hive	ClickHouse
Spark Flink hadoop
Java scala hadoop
Spark hadoop Java,hadoop
0、HiveQL/SparkSQL

在hive中就是写sql,然后转换为MR。现在Hive已经建议使用SparkTez等作为计算引擎。

hive命令行和spark-sql命令行都是写sql,语句基本一样。

Spark bin目录下也有sparkR工具可以使用,和这种方式基本一样,就是写SQL.

select t.word word,count(word) as count from (select explode(split(name,'\s+')) as word from sparkdemotable) t group by word;

--hive beeline命令行
+-----------------+--------+
|      word       | count  |
+-----------------+--------+
|                 | 1      |
| Flink           | 1      |
| Hbase           | 1      |
| Java            | 3      |
| Java,hadoop     | 1      |
| Spark           | 5      |
| hadoop          | 6      |
| hive            | 1      |
| hiveClickHouse  | 1      |
| mysql           | 1      |
| scala           | 1      |
+-----------------+--------+
11 rows selected (2.162 seconds)



--求topkey也很方便,写sql就行
select t.word word,count(word) as count from (select explode(split(name,'\s+')) as word from sparkdemotable) t group by word order by count desc limit 3;

+---------+--------+
|  word   | count  |
+---------+--------+
| hadoop  | 6      |
| Spark   | 5      |
| Java    | 3      |
+---------+--------+
3 rows selected (3.122 seconds)

1、Scala

Scala开发Spark比较方便快捷。

在交互式环境下连接到HDFS,使用RDD进行词频统计、排序。

scala> var res=sc.textFile("hdfs://192.168.x.x:9000/hadoop/SparkDemo.txt")
res: org.apache.spark.rdd.RDD[String] = hdfs://192.168.x.x:9000/hadoop/SparkDemo.txt MapPartitionsRDD[15] at textFile at <console>:24

scala> var line=res.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)
line: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[18] at reduceByKey at <console>:25


scala> line.foreach(println)
(hive,1)
(mysql,1)
(hello,1)
(java,2)
(spark,2)
(hadoop,2)


//所有参数都可以显式指定
  def main(args: Array[String]): Unit = {

    val conf:SparkConf=new SparkConf().setAppName(s"${args(0)}")
      .setMaster("local[3]") //提交到Spark中,这个参数就不要了
    val sparkContext = new SparkContext(conf)

    val lines: RDD[String] = sparkContext.textFile(s"${args(1)}")

    val text=lines.flatMap(_.split("\s+")).map(_.toUpperCase).map((_,1)).reduceByKey((_+_))

    text.map(_.swap).sortByKey(ascending = false).map(_.swap).take(3).foreach(println)

  }
2、 Python

pyspark是Spark的Python实现,api基本和scala版本一样。

在bin目录下也可以直接使用pyspark进行python编程(需要有python环境)。

>>> text=sc.textFile('/usr/local/SparkDemo.txt')

>>> text.first()
'java hadoop spark'                                                             

>>> lines=text.flatMap(lambda line:line.split(' '))
>>> line=lines.map(lambda x:(x,1)).reduceByKey(lambda a,b:a+b)

>>> line.collect()
[('java', 3), ('hadoop', 5), ('hive', 2), ('Spark', 2), ('Flink', 1), ('scala', 1), ('spark', 2), ('hello', 1), ('mysql', 1), ('SparkHadoopJava', 1)]
                       
>>> line.foreach(lambda t:print(t))
('spark', 2)
('hello', 1)
('mysql', 1)
('SparkHadoopJava', 1)
('java', 3)
('hadoop', 5)
('hive', 2)
('Spark', 2)
('Flink', 1)
('scala', 1)
>>> 


#写在文件中
from pyspark import SparkContext
from pyspark.sql.session import SparkSession


# conf = SparkConf().setAppName('test_parquet')
sc = SparkContext('local[6]', 'test')
# spark = SparkSession(sc)
# sc.setLogLevel("INFO")

text = sc.textFile(name ="wordcount.txt")
import re

#压扁、分割、聚合
word = text.flatMap(lambda x:re.split("\W+", x)).map(lambda x:(x, 1)).reduceByKey(lambda a, b: a + b)

#收集
top = word.collect()

print(top)

sc.stop()

3、Java

在Java中写wordCount,有Java8的函数式编程支持,写起来还好。

public static void main(String[] args) {

        SparkConf conf = new SparkConf();

        //wordCount local[3]  dir/wordcount.txt
        conf.setAppName(args[0]).setMaster(args[1]);

        //转换成Java上下文
        JavaSparkContext sC = new JavaSparkContext(conf);

        //读取文件
        JavaRDD<String> rdd = sC.textFile(args[2]);

        //压扁,分割,要返回迭代器
        JavaRDD<String> javaRDD = rdd.flatMap(lines -> Stream.of(lines.split("\W+")).iterator()).
                map(String::toUpperCase);//转大写


//        JavaPairRDD<String, Integer> mapRDD = javaRDD.mapToPair(new PairFunction<String, String, Integer>() {
//
//            @Override
//            public Tuple2<String, Integer> call(String s) throws Exception {
//                return new Tuple2<String, Integer>(s, 1);
//            }
//        });

        //转成元组,(word,1)的格式
        JavaPairRDD<String, Integer> mapRDD = javaRDD.mapToPair(w -> Tuple2.<String, Integer>apply(w, 1));

        //求和
        JavaPairRDD<String, Integer> reduceRDD = mapRDD.reduceByKey(Integer::sum);

        //交换、排序,取TopK
        List<Tuple2<String, Integer>> topK = reduceRDD.map(Tuple2::swap).sortBy(Tuple2::_1, false, 2).
                map(Tuple2::swap).take(3);

        //打印
        topK.forEach(System.out::println);

        sC.stop();

    }

二、Spark操作Hive

Spark操作hive:

1、将hive中conf目录下的hive-site.xml移动Spark的conf目录下。
2、spark执行命令中加入数据库驱动
	#如果报错,将jars换成--driver-class-path
	spark-shell --master local[2] --jars /usr/local/hive/lib/mysql-connector-java-8.0.20.jar
3、hdfs要启动,即使是Spark本地模式,但是hive的数据是在Hdfs中的。

spark-shell中使用Spark的api进行操作,pyspark的api一样。

#启动Spark-shell命令行,使用sparkSQL的api操作hive
spark-shell --master local[2] --driver-class-path /usr/local/hive/lib/mysql-connector-java-8.0.20.jar

scala> spark.sql("select count(name),name from sparkdemotable group by name").show()
+-----------+--------------------+                                              
|count(name)|                name|
+-----------+--------------------+
|          1|     SparkHadoopJava|
|          1|spark hadoop java...|
|          1|   java scala hadoop|
|          1|          hive mysql|
|          1|   hadoop Spark hive|
|          1|  Spark Flink hadoop|
|          2|   java hadoop spark|
+-----------+--------------------+

spark-sql中就是写sql,方式和hive中一样

#启动spark-sql命令行,使用sql的方式操作hive
>>spark-sql  --driver-class-path /usr/local/hive/lib/mysql-connector-java-8.0.20.jar

select * from sparkdemotable;

java hadoop spark
java hadoop spark
spark hadoop java hello
hive mysql
hadoop Spark hive
Spark Flink hadoop
java scala hadoop
SparkHadoopJava

#两种方式出来的数据格式都不一样,
#方式一好快啊,方式二慢了许多,底层应该都是一样的RDD
原文地址:https://www.cnblogs.com/cgl-dong/p/14034964.html