Spark (10) [Reading and Saving RDDs]

I. File Types

1. Text Files

Read and Write

Read
scala> val hdfsFile = sc.textFile("hdfs://hadoop102:9000/fruit.txt")
hdfsFile: org.apache.spark.rdd.RDD[String] = hdfs://hadoop102:9000/fruit.txt MapPartitionsRDD[21] at textFile at <console>:24
Save
scala> hdfsFile.saveAsTextFile("/fruitOut")
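
Note that textFile also accepts an optional minPartitions argument that sets a lower bound on the number of input splits. A small sketch, reading the same file with at least 4 partitions:
scala> val hdfsFile = sc.textFile("hdfs://hadoop102:9000/fruit.txt", 4)
scala> hdfsFile.getNumPartitions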

2. JSON Files

Processing JSON files with plain RDDs is cumbersome, and Spark SQL has much better built-in support for JSON, so in practice JSON files are usually handled with Spark SQL (a Spark SQL sketch follows step (4) below).

(1) Import the package needed to parse JSON
scala> import scala.util.parsing.json.JSON
(2) Upload the JSON file to HDFS
[atguigu@hadoop102 spark]$ hadoop fs -put ./examples/src/main/resources/people.json /
(3) Read the file
scala> val json = sc.textFile("/people.json")
json: org.apache.spark.rdd.RDD[String] = /people.json MapPartitionsRDD[8] at textFile at <console>:24
(4) Parse the JSON data
scala> val result  = json.map(JSON.parseFull)
result: org.apache.spark.rdd.RDD[Option[Any]] = MapPartitionsRDD[10] at map at <console>:27
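
As noted above, Spark SQL is the more convenient route for JSON. A minimal sketch in spark-shell (which provides a SparkSession named spark), reading the same people.json uploaded earlier:
scala> val df = spark.read.json("/people.json")
scala> df.show()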

3. Object Files

An object file stores serialized objects, using Java's serialization mechanism. sc.objectFile[T] takes a path, reads the object file, and returns the corresponding RDD; saveAsObjectFile() writes an RDD out as an object file. Because the data is serialized, the element type must be specified when reading.

Read and Write

(1) Create an RDD
scala> val rdd = sc.parallelize(Array(1,2,3,4))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[19] at parallelize at <console>:24
(2) Save the RDD as an object file
scala> rdd.saveAsObjectFile("file:///opt/module/spark/objectFile")
(3) Inspect the output files
[hadoop@hadoop102 objectFile]$ pwd
/opt/module/spark/objectFile

[hadoop@hadoop102 objectFile]$ ll
total 8
-rw-r--r-- 1 atguigu atguigu 142 Oct  9 10:37 part-00000
-rw-r--r-- 1 atguigu atguigu 142 Oct  9 10:37 part-00001
-rw-r--r-- 1 atguigu atguigu   0 Oct  9 10:37 _SUCCESS

[hadoop@hadoop102 objectFile]$ cat part-00000 
SEQ!org.apache.hadoop.io.NullWritable"org.apache.hadoop.io.BytesWritableW@`l
(4) Read the object file back
scala> val objFile = sc.objectFile[Int]("file:///opt/module/spark/objectFile")
objFile: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[31] at objectFile at <console>:24
(5) Print the contents of the object file
scala> objFile.collect
res19: Array[Int] = Array(1, 2, 3, 4)
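
Object files are not limited to primitive element types; since Java serialization is used, any serializable class works. A sketch with a hypothetical Person case class and an example path (case classes are Serializable by default):
scala> case class Person(name: String, age: Int)
scala> val people = sc.parallelize(Seq(Person("Tom", 20), Person("Jerry", 18)))
scala> people.saveAsObjectFile("file:///opt/module/spark/personObj")
scala> sc.objectFile[Person]("file:///opt/module/spark/personObj").collect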

4. Sequence Files

Rarely used nowadays.

Note: SequenceFile only works with pair RDDs (RDD[(K, V)]).
(1) Create an RDD
scala> val rdd = sc.parallelize(Array((1,2),(3,4),(5,6)))
rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[13] at parallelize at <console>:24
(2) Save the RDD as a Sequence file
scala> rdd.saveAsSequenceFile("file:///opt/module/spark/seqFile")
(3) Inspect the output files
[hadoop@hadoop102 seqFile]$ pwd
/opt/module/spark/seqFile

[hadoop@hadoop102 seqFile]$ ll
total 8
-rw-r--r-- 1 atguigu atguigu 108 Oct  9 10:29 part-00000
-rw-r--r-- 1 atguigu atguigu 124 Oct  9 10:29 part-00001
-rw-r--r-- 1 atguigu atguigu   0 Oct  9 10:29 _SUCCESS
[hadoop@hadoop102 seqFile]$ cat part-00000
SEQ org.apache.hadoop.io.IntWritable org.apache.hadoop.io.IntWritableط
(4) Read the Sequence file back
scala> val seq = sc.sequenceFile[Int,Int]("file:///opt/module/spark/seqFile")
seq: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[18] at sequenceFile at <console>:24
(5) Print the contents of the Sequence file
scala> seq.collect
res14: Array[(Int, Int)] = Array((1,2), (3,4), (5,6))
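
The key and value types only need an implicit conversion to Hadoop Writable types, so plain Scala types such as String work as well. A sketch (path is an example):
scala> val strRdd = sc.parallelize(Array(("a", 1), ("b", 2)))
scala> strRdd.saveAsSequenceFile("file:///opt/module/spark/seqFile2")
scala> sc.sequenceFile[String, Int]("file:///opt/module/spark/seqFile2").collect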

II. External Storage Systems

1. MySQL

Dependency

  <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>5.1.27</version>
  </dependency>

Read

import java.sql.DriverManager
import org.apache.spark.rdd.JdbcRDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * @description: Read data from MySQL
 * @author: HaoWu
 * @create: 2020-08-05
 */
object MySqlReadWriteTest {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("JdbcApp")
    val sc: SparkContext = new SparkContext(sparkConf)
    val rdd: JdbcRDD[(Int, String)] = new JdbcRDD(
      sc,
      () => {
        Class.forName("com.mysql.jdbc.Driver").newInstance()
        DriverManager.getConnection("jdbc:mysql://hadoop102:3306/azkaban", "root", "root")
      },
      "select * from project_files where project_id >= ? and project_id <= ?;",
      1,
      4,
      1,
      // mapRow function: map each row of the JDBC ResultSet into a tuple
      r => (r.getInt(1), r.getString(2)))
    println(rdd.count())
    rdd.foreach(println(_))
    sc.stop()
  }
}
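
The three numbers after the SQL string are lowerBound, upperBound and numPartitions: JdbcRDD splits the range [lowerBound, upperBound] into numPartitions sub-ranges and binds each sub-range into the two ? placeholders, so the query must contain exactly two placeholders. A sketch (inside the same main method) of the same query read with 2 partitions, where one partition binds 1..2 and the other 3..4:

    val rdd2: JdbcRDD[(Int, String)] = new JdbcRDD(
      sc,
      () => DriverManager.getConnection("jdbc:mysql://hadoop102:3306/azkaban", "root", "root"),
      "select * from project_files where project_id >= ? and project_id <= ?;",
      1, // lowerBound
      4, // upperBound
      2, // numPartitions
      r => (r.getInt(1), r.getString(2)))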

Save

import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.spark.rdd.{JdbcRDD, RDD}
import org.apache.spark.{SparkConf, SparkContext}

/**
 * @description: Insert data into MySQL
 * @author: HaoWu
 * @create: 2020-08-05
 */
object MySqlReadWriteTest {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("JdbcApp")
    val sc: SparkContext = new SparkContext(sparkConf)
    val list = List((1, 9), (1, 10))
    val rdd: RDD[(Int, Int)] = sc.makeRDD(list)
    // foreachPartition is more efficient: one connection per partition instead of one per record (a batched variant is sketched after this example)
    rdd.foreachPartition(iter => {
      // create the JDBC Connection
      val con: Connection = DriverManager.getConnection("jdbc:mysql://hadoop102:3306/azkaban", "root", "root")
      // prepare the SQL
      val sql="insert into project_files(project_id,version) values(?,?)"

      // PreparedStatement
      val ps: PreparedStatement = con.prepareStatement(sql)
      // insert the partition's records one by one
      iter.foreach{
        case(project_id,version) => {
          // bind the int parameters
          ps.setInt(1,project_id)
          ps.setInt(2,version)
          // execute the insert
          ps.executeUpdate()
        }
      }
      ps.close()
      con.close()
    })
    sc.stop()
  }
}
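
The version above still issues one executeUpdate per record; only the connection is reused. If real batching is wanted, standard JDBC batch statements can be used inside the same foreachPartition. A sketch against the same table and connection settings as above:

    rdd.foreachPartition(iter => {
      val con: Connection = DriverManager.getConnection("jdbc:mysql://hadoop102:3306/azkaban", "root", "root")
      val ps: PreparedStatement = con.prepareStatement("insert into project_files(project_id,version) values(?,?)")
      // queue every record of the partition into one JDBC batch
      iter.foreach { case (project_id, version) =>
        ps.setInt(1, project_id)
        ps.setInt(2, version)
        ps.addBatch()
      }
      // send the whole batch in one round trip
      ps.executeBatch()
      ps.close()
      con.close()
    })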

2. HBase

Dependencies

   <dependency>
       <groupId>org.apache.hbase</groupId>
       <artifactId>hbase-server</artifactId>
       <version>2.0.0</version>
   </dependency>

   <dependency>
       <groupId>org.apache.hbase</groupId>
       <artifactId>hbase-client</artifactId>
       <version>2.0.0</version>
   </dependency>

   <dependency>
       <groupId>org.apache.hbase</groupId>
       <artifactId>hbase-mapreduce</artifactId>
       <version>2.0.0</version>
   </dependency>

Put HBase's configuration file hbase-site.xml into the resources directory; at minimum keep the ZooKeeper connection setting (or set it in code, as sketched below):

	<property>
		<name>hbase.zookeeper.quorum</name>
		<value>hadoop102,hadoop103,hadoop104</value>
	</property>
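
If you prefer not to ship hbase-site.xml, the same ZooKeeper setting can be applied in code right after HBaseConfiguration.create(); a sketch using the hostnames above:

    val conf: Configuration = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "hadoop102,hadoop103,hadoop104")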

Read


import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.{Cell, CellUtil, HBaseConfiguration}
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.rdd.NewHadoopRDD
import org.apache.spark.{SparkConf, SparkContext}
/**
 * @description: Reading from HBase
 * @author: HaoWu
 * @create: 2020-08-05
 */
object HbaseReadWriterTest {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("RDDTest").setMaster("local[*]")
    val sc = new SparkContext(sparkConf)
    // Create the configuration; by default it picks up the Hadoop and HBase config files on the classpath
    val conf: Configuration = HBaseConfiguration.create()
    // set which table to read
    conf.set(TableInputFormat.INPUT_TABLE, "bigdata:user")
    // the core step: create the RDD over the HBase table
    val rdd = new NewHadoopRDD[ImmutableBytesWritable, Result](sc,
      classOf[TableInputFormat],
      classOf[ImmutableBytesWritable],
      classOf[Result],
      conf)
    rdd.foreach {
      case (rowKey, result) => {
        // CellUtil: extracts a field from a Cell; Bytes: converts between Java types and byte[]
        // get all cells of this row
        val cells: Array[Cell] = result.rawCells()
        for (cell <- cells) {
          println(Bytes.toString(CellUtil.cloneRow(cell)) + " " +
            Bytes.toString(CellUtil.cloneFamily(cell)) + ":" + Bytes.toString(CellUtil.cloneQualifier(cell)) + " " +
            Bytes.toString(CellUtil.cloneValue(cell)))
        }
      }
    }
  }
}
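
Constructing NewHadoopRDD directly works, but the same RDD can also be obtained through the public SparkContext.newAPIHadoopRDD API; a sketch using the same conf:

    val rdd = sc.newAPIHadoopRDD(
      conf,
      classOf[TableInputFormat],
      classOf[ImmutableBytesWritable],
      classOf[Result])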

Write


import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
 * @description: Writing to HBase
 * @author: HaoWu
 * @create: 2020-08-05
 */
object HbaseReadWriterTest {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("RDDTest").setMaster("local[*]")
    val sc = new SparkContext(sparkConf)
    // Create the configuration; by default it picks up the Hadoop and HBase config files on the classpath
    val conf: Configuration = HBaseConfiguration.create()
    // set which table to write to
    conf.set(TableOutputFormat.OUTPUT_TABLE, "bigdata:user")

    // wrap the configuration in a Job so the output settings can be configured
    val job: Job = Job.getInstance(conf)

    // set the output format
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

    // set the output key and value types
    job.setOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setOutputValueClass(classOf[Put])

    // wrap the data in a list: (rowkey, (column family, qualifier, value))
    val list = List(("1005", ("info2", "age", "20")), ("1005",( "info2", "name", "marry")), ("1006", ("info2", "age", "21")))

    val rdd: RDD[(String, (String, String, String))] = sc.makeRDD(list, 2)

    // use Spark to map the data into the output key-value types
    val rdd2: RDD[(ImmutableBytesWritable, Put)] = rdd.map {
      case (rowkey, (cf, cq, v)) => {
        // wrap the rowkey
        val key = new ImmutableBytesWritable()
        key.set(Bytes.toBytes(rowkey))
        // build the Put
        val value = new Put(Bytes.toBytes(rowkey))
        value.addColumn(Bytes.toBytes(cf), Bytes.toBytes(cq), Bytes.toBytes(v))
        (key, value)
      }
    }

    // pass in the configuration prepared above
    rdd2.saveAsNewAPIHadoopDataset(job.getConfiguration)
  }
}

Pitfall

While running the HBase read job, the program threw an error.

Cause: in the pom, the HBase dependencies must be placed after the spark-core dependency; otherwise the following error (a Jackson version conflict) occurs:

java.lang.ExceptionInInitializerError
	at org.apache.spark.SparkContext.withScope(SparkContext.scala:751)
	at org.apache.spark.SparkContext.textFile(SparkContext.scala:882)
	at com.spark.rdd.RDDTest.testMap(RDDTest.scala:62)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
	at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
	at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
	at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
	at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
	at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
	at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
	at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
	at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
	at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
	at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
Caused by: com.fasterxml.jackson.databind.JsonMappingException: Scala module 2.10.0 requires Jackson Databind version >= 2.10.0 and < 2.11.0
	at com.fasterxml.jackson.module.scala.JacksonModule.setupModule(JacksonModule.scala:61)
	at com.fasterxml.jackson.module.scala.JacksonModule.setupModule$(JacksonModule.scala:46)
	at com.fasterxml.jackson.module.scala.DefaultScalaModule.setupModule(DefaultScalaModule.scala:17)
	at com.fasterxml.jackson.databind.ObjectMapper.registerModule(ObjectMapper.java:751)
	at org.apache.spark.rdd.RDDOperationScope$.<init>(RDDOperationScope.scala:82)
	at org.apache.spark.rdd.RDDOperationScope$.<clinit>(RDDOperationScope.scala)
	... 27 more
Original article: https://www.cnblogs.com/wh984763176/p/13444602.html