SparkSql 将CSV导入kudu
pom 依赖
<properties>
<spark.version>2.1.0</spark.version>
<scala.version>2.11</scala.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
<maven.version.min>3.5.0</maven.version.min>
<scala.binary.version>2.11</scala.binary.version>
<scala.complete.version>${scala.binary.version}.4</scala.complete.version>
<spark-sql.version>2.1.0</spark-sql.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.11.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.1.0</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>${spark.version}</version>
<scope>compile</scope>
<exclusions>
<exclusion>
<groupId>org.codehaus.janino</groupId>
<artifactId>commons-compiler</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.codehaus.janino</groupId>
<artifactId>commons-compiler</artifactId>
<version>3.0.9</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>1.0.0</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.11</artifactId>
<version>${spark.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.11.12</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-compiler</artifactId>
<version>2.11.12</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.typesafe</groupId>
<artifactId>config</artifactId>
<version>1.2.1</version>
<scope>compile</scope>
</dependency>
<!-- scalikejdbc_2.11 -->
<dependency>
<groupId>org.scalikejdbc</groupId>
<artifactId>scalikejdbc_2.11</artifactId>
<version>2.5.0</version>
<scope>compile</scope>
</dependency>
<!-- scalikejdbc-config_2.11 -->
<dependency>
<groupId>org.scalikejdbc</groupId>
<artifactId>scalikejdbc-config_2.11</artifactId>
<version>2.5.0</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.38</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.xes.bdc</groupId>
<artifactId>galaxy-engine-common</artifactId>
<version>1.0-SNAPSHOT</version>
<exclusions>
<exclusion>
<groupId>org.apache.kudu</groupId>
<artifactId>kudu-client</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.kudu</groupId>
<artifactId>kudu-spark2_2.11</artifactId>
<version>1.9.0</version>
<scope>compile</scope>
</dependency>
</dependencies>
<build>
<!--编译的文件目录-->
<sourceDirectory>src/main/java</sourceDirectory>
<resources>
<resource>
<directory>src/main/resources</directory>
</resource>
</resources>
<plugins>
<!-- build-helper-maven-plugin, 设置多个源文件夹 -->
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<version>3.0.0</version>
<executions>
<execution>
<id>add-source</id>
<phase>generate-sources</phase>
<goals>
<goal>add-source</goal>
</goals>
<configuration>
<sources>
<source>src/main/java</source>
<source>src/main/scala</source>
<!-- 我们可以通过在这里添加多个source节点,来添加任意多个源文件夹 -->
</sources>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<!-- see http://davidb.github.com/scala-maven-plugin -->
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.0</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
<configuration>
<scalaVersion>${scala.complete.version}</scalaVersion>
<fork>true</fork>
<encoding>UTF-8</encoding>
<args>
<!-- <arg>-make:transitive</arg> -->
<arg>-dependencyfile</arg>
<arg>${project.build.directory}/.scala_dependencies</arg>
</args>
<recompileMode>modified-only</recompileMode>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
scala 代码
import org.apache.kudu.spark.kudu.KuduContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}
/**
*
* @description: TODO 将Csv文件导入Kudu
* @author: HaoWu
* @create: 2021年04月02日
*/
object LoadCsvToKudu {
private var kudu_host: String = _
private var kudu_tableName: String = _
private var input_path: String = _
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf()
val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
kudu_host =args(0)
kudu_tableName = args(1)
input_path = args(2)
// 专用的读Csv
val df: DataFrame = spark.read
.option("header","true") //第一行作为字段属性
.csv(input_path)
//spark.sql("select count(*) from normal_detail_view").show()
val kuduContext = new KuduContext(kudu_host, spark.sparkContext)
// real_lp_id,name,workcode,lp_stasus,position,position,dept
kuduContext.upsertRows(df, kudu_tableName)
spark.stop()
}
}
启动脚本
csv_to_kudu.sh
#!/usr/bin/env bash
cd $(dirname $0)
# 用户基础配置
# kudu host
HOST="XXX:7051,XXX:7051,XXX:7051"
#kudu tableName
TABLENAME="impala::odsdb.XXX"
#csv文件路径 , 上传至hdfs
input_path="/user/wx_dp_hive/wuhao36/data/lp.csv"
# 默认配置
clazz=baopinke.LoadCsvToKudu
jarPath=/home/wx_dp_hive/wuhao/learn_poject/kudu_learning/csv-to-kudu.jar
# 提交任务
BASE_SPARK_SUBMIT=/usr/bin/spark2-submit
KEY_TAB=/home/wx_dp_hive/wx_dp_hive.keytab
USER=wx_dp_hive
#--master yarn --deploy-mode cluster
$BASE_SPARK_SUBMIT
--principal $USER --keytab $KEY_TAB --queue root.wangxiao.dp
--master yarn --deploy-mode cluster
--driver-memory 8G --executor-memory 16G
--executor-cores 2 --num-executors 4
--conf spark.dynamicAllocation.enabled=false
--conf spark.driver.allowMultipleContexts=true
--class $clazz $jarPath
$HOST
$TABLENAME
$input_path
注意:
1.需要将csv上传至hdfs,不然在yarn模式下文件找不到。
2.要设置csv的第一行为字段属性。