Spark(二十)【SparkSQL将CSV导入Kudu】

SparkSql 将CSV导入kudu

pom 依赖

 <properties>
        <spark.version>2.1.0</spark.version>
        <scala.version>2.11</scala.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
        <maven.version.min>3.5.0</maven.version.min>
        <scala.binary.version>2.11</scala.binary.version>
        <scala.complete.version>${scala.binary.version}.4</scala.complete.version>
        <spark-sql.version>2.1.0</spark-sql.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.11.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.1.0</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
            <scope>compile</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.codehaus.janino</groupId>
                    <artifactId>commons-compiler</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.codehaus.janino</groupId>
            <artifactId>commons-compiler</artifactId>
            <version>3.0.9</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>1.0.0</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
            <version>${spark.version}</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.11.12</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-compiler</artifactId>
            <version>2.11.12</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>com.typesafe</groupId>
            <artifactId>config</artifactId>
            <version>1.2.1</version>
            <scope>compile</scope>
        </dependency>
        <!-- scalikejdbc_2.11 -->
        <dependency>
            <groupId>org.scalikejdbc</groupId>
            <artifactId>scalikejdbc_2.11</artifactId>
            <version>2.5.0</version>
            <scope>compile</scope>
        </dependency>
        <!-- scalikejdbc-config_2.11 -->
        <dependency>
            <groupId>org.scalikejdbc</groupId>
            <artifactId>scalikejdbc-config_2.11</artifactId>
            <version>2.5.0</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.38</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.9.0</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>com.xes.bdc</groupId>
            <artifactId>galaxy-engine-common</artifactId>
            <version>1.0-SNAPSHOT</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.kudu</groupId>
                    <artifactId>kudu-client</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.kudu</groupId>
            <artifactId>kudu-spark2_2.11</artifactId>
            <version>1.9.0</version>
            <scope>compile</scope>
        </dependency>

    </dependencies>

    <build>
        <!--编译的文件目录-->
        <sourceDirectory>src/main/java</sourceDirectory>
        <resources>
            <resource>
                <directory>src/main/resources</directory>
            </resource>
        </resources>
        <plugins>
            <!-- build-helper-maven-plugin, 设置多个源文件夹 -->
            <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>build-helper-maven-plugin</artifactId>
                <version>3.0.0</version>
                <executions>
                    <execution>
                        <id>add-source</id>
                        <phase>generate-sources</phase>
                        <goals>
                            <goal>add-source</goal>
                        </goals>
                        <configuration>
                            <sources>
                                <source>src/main/java</source>
                                <source>src/main/scala</source>
                                <!-- 我们可以通过在这里添加多个source节点,来添加任意多个源文件夹 -->
                            </sources>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <!-- see http://davidb.github.com/scala-maven-plugin -->
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.0</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <scalaVersion>${scala.complete.version}</scalaVersion>
                            <fork>true</fork>
                            <encoding>UTF-8</encoding>
                            <args>
                                <!-- <arg>-make:transitive</arg> -->
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                            <recompileMode>modified-only</recompileMode>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

scala 代码

import org.apache.kudu.spark.kudu.KuduContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 *
 * @description: TODO 将Csv文件导入Kudu
 * @author: HaoWu
 * @create: 2021年04月02日
 */
object LoadCsvToKudu {
  private var kudu_host: String = _
  private var kudu_tableName: String = _
  private var input_path: String = _

  def main(args: Array[String]): Unit = {

    val conf: SparkConf = new SparkConf()
    val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()

    kudu_host =args(0)
    kudu_tableName = args(1)
    input_path = args(2)

    // 专用的读Csv
    val df: DataFrame = spark.read
      .option("header","true") //第一行作为字段属性
      .csv(input_path)

    //spark.sql("select count(*) from normal_detail_view").show()

    val kuduContext = new KuduContext(kudu_host, spark.sparkContext)

    // real_lp_id,name,workcode,lp_stasus,position,position,dept
    kuduContext.upsertRows(df, kudu_tableName)

    spark.stop()
  }
}

启动脚本

csv_to_kudu.sh

#!/usr/bin/env bash

cd $(dirname $0)

# 用户基础配置
# kudu host
HOST="XXX:7051,XXX:7051,XXX:7051"
#kudu tableName
TABLENAME="impala::odsdb.XXX"
#csv文件路径 , 上传至hdfs
input_path="/user/wx_dp_hive/wuhao36/data/lp.csv"

# 默认配置
clazz=baopinke.LoadCsvToKudu
jarPath=/home/wx_dp_hive/wuhao/learn_poject/kudu_learning/csv-to-kudu.jar
# 提交任务
BASE_SPARK_SUBMIT=/usr/bin/spark2-submit
KEY_TAB=/home/wx_dp_hive/wx_dp_hive.keytab
USER=wx_dp_hive

#--master yarn --deploy-mode cluster 

$BASE_SPARK_SUBMIT 
--principal  $USER --keytab $KEY_TAB  --queue root.wangxiao.dp 
--master yarn --deploy-mode cluster 
--driver-memory 8G --executor-memory 16G 
--executor-cores 2 --num-executors  4 
--conf spark.dynamicAllocation.enabled=false 
--conf spark.driver.allowMultipleContexts=true 
--class $clazz $jarPath 
$HOST 
$TABLENAME 
$input_path

注意

1.需要将csv上传至hdfs,不然在yarn模式下文件找不到。

2.要设置csv的第一行为字段属性。

原文地址:https://www.cnblogs.com/wh984763176/p/14661685.html