Spark开发-SparkSQL读写数据

SparkSQL数据读写

 DataFrameReader
 DataFrameWriter
   DataFrameReader 对应的组件 SCHEMA  OPTION   FORMAT
    DataFrameReader 有两种访问方式, 
	      一种是使用 load 方法加载, 使用 format 指定加载格式, 
	       还有一种是使用封装方法, 类似 csv, json, jdbc 等

        //.第一种形式 READ + FORMAT +load读取 
           spark.read
             .format("csv")
             .option("header",value=true)
             .option("inferSchema",value = true)
             .load("dataset/1231.csv")
        
           //2.第二种形式 使用具体文件类型读取 READ 
           spark.read
             .option("header",value=true)
             .option("inferSchema",value = true)
             .csv("dataset/1231.csv")
	DataFrameWriter 也有两种使用方式, 一种是使用 format 配合 save, 
	   默认的 format 是 Parquet
	  还有一种是使用封装方法, 例如 csv, json, saveAsTable 等
	    def parquet(path: String): Unit = { format("parquet").save(path)}
	    def csv    (path: String): Unit = { format("csv").save(path)}

DataFrameWriter

增量操作

使用spark做增量操作的时候, 
   insertInto 和 mode(SaveMode.Append).saveAsTable()

insertInto 
    insertInto 使用基于位置
     It requires that the schema of the `DataFrame` is the same as the schema of the table.
     Unlike `saveAsTable`, `insertInto` ignores the column names and just uses position-based resolution
    
saveAsTable 
     基于列名 column_names-based
      有不同的模式,
	  如果是  SaveMode.Overwrite
       the schema of the `DataFrame` does not need to be the same as that of the existing table.
      如果是 `Append`, 
     if there is an existing table, we will use the format and options OF  the existing table. 
	 The column order in the schema of the `DataFrame` doesn't need to be same as that of the existing table. 
	 Unlike `insertInto`,
     `saveAsTable` will use the column names to find the correct column positions
  ####覆盖
   report_info.write.insertInto(tableName="dwd.t_dwd_report_info_d",overwrite=True)
   DataFrameWriter 可以将数据保存到 Hive 表中, 所以也可以指定分区和分桶信息
	 
对存储的
    SCHEMA
	文件的覆盖和追加

Spark的读写

  Spark can create distributed datasets from any storage source supported by Hadoop, 
        including your local file system, HDFS, Cassandra, HBase, Amazon S3, 
		        etc. Spark supports text files, SequenceFiles, and any other Hadoop InputFormat.
写
  saveAsTextFile  saveAsSequenceFile  saveAsSequenceFile

存储问题

  存储的载体
  存储的数据格式
  存储所使用的命令或函数
  存数遇到的问题
    小文件问题
     spark.default.parallelism      在处理RDD时才会起作用,对SparkSql无效。
     spark.sql.shuffle.partitions   则是对sparks SQL专用的设置。

源码

org.apache.spark.sql
  Interface used to write a [[Dataset]] to external storage systems (e.g. file systems,key-value stores, etc). 
  Use `Dataset.write` to access this.
  使用了  // Builder pattern config options 构建者模式

本地数据写入到Hive表

在Hive中创建表,从外部数据源以txt的形式导入数据
 方案一: 利用系统ftp上传数据,并使用系统的数据交换任务将任务写入到Hive表中
 方案二: 利用程序 WriteToHive 将数据打包到程序中,程序中读取数据,然后解析写入到Hive表中

01.方案一步骤:

01.查看外部给的数据的情况,确认数据格式正确等情况
  做一些必要的查询。修正以及确认工作
02. 创建表
03. 修改数据文件的编码格式以及文件的类型,放到系统的位置-上传到服务器的位置要记录下来
04. 创建或者使用已存在的System_任务,导入数据任务配置和执行
05. 任务执行成功后,确认数据表的情况

02.具体过程

04任务配置和执行
    任务流:  数据导出专用
    任务  System_数据_ting_no_repeat_f
    需要完成的任务:
        001.源数据源配置
        	  选择数据源: ftp
        	  基本信息:  文件路径   /test  文件名   test_import_date_info_td.txt
        	高级配置:
        	  编码 UTF-8
        002.目标数据源配置
        		 数据源类型:Hive
        		 数据源名称:
        		 表名:  expbase.import_date_info_td       
        	导入配置:
    		SQL: use expbase; truncate table  import_date_info_td ;
    		Hive文件
    		  行分割符 
  列分隔符 	
05.任务执行成功后  -查看数据量以及示例数据情况
  select count(*) from expbase.import_date_info_td
  select * from expbase.import_date_info_td

03.注意事项

 01.数据确认工作:
  数据有无重复数据
  Excel的数据中的空格,tab的个数,引号等符号问题
  异常数据
  最终确认数据量,以及示例数据情况
 02.创建表的形式
  ROW FORMAT DELIMITED  FIELDS TERMINATED BY '	' 
  STORED AS INPUTFORMAT 
    'org.apache.hadoop.mapred.TextInputFormat' 
  OUTPUTFORMAT 
    'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
  ;
 03.修改文件的编码方式和格式,并上传到Unix系统中
    编码 UTF-8
    文件类型 Unix
 04.任务的配置
  配置工作
         Unix系统的文件 行分割符 

    	 数据文件的列分隔符保持一致,在这里是  	
 05.确认工作
    确认数据量是否准确
	查看示例数据各个字段是否对应,字段的值是否正确

参考

	https://github.com/apache/spark/pull/13013
	Spark写入hive表时saveAsTable和insertInto的区别 https://blog.csdn.net/huihuixia123/article/details/107658299
	Spark(三)-- SparkSQL扩展(数据读写) -- 读写 Parquet、Json 格式文件(二) https://blog.csdn.net/qq_18800463/article/details/101421490
	Spark SQL, DataFrames and Datasets Guide http://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html
	RDD Programming Guide http://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-programming-guide
原文地址:https://www.cnblogs.com/ytwang/p/14251300.html