Spark SQL from Beginner to Practice (8): Data Skew Optimization

1. Custom UDFs

1) Dependencies

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.example</groupId>
    <artifactId>FlinkUdf</artifactId>
    <version>1.0-SNAPSHOT</version>
    <name>test</name>
    <!-- FIXME change it to the project's website -->
    <url>http://www.example.com</url>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <flink.version>1.11.1</flink.version>
        <scala.binary.version>2.11</scala.binary.version>
        <scala.version>2.11.0</scala.version>
        <hadoop.version>3.0.0</hadoop.version>
        <hive.version>3.0.0</hive.version>
        <hbase.version>2.3.0</hbase.version>
        <spark.version>3.0.0</spark.version>
        <jedis.version>3.0.0</jedis.version>
    </properties>
    <dependencies>
        <!--        0. Base language-->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <!--        1. Flink modules-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>scala-library</artifactId>
                    <groupId>org.scala-lang</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>slf4j-api</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- 2. CLI dependencies -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!--        3. Alibaba JSON (fastjson) dependency-->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.58</version>
            <exclusions>
                <exclusion>
                    <artifactId>javassist</artifactId>
                    <groupId>org.javassist</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>scala-parser-combinators_2.11</artifactId>
                    <groupId>org.scala-lang.modules</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>slf4j-api</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>snappy-java</artifactId>
                    <groupId>org.xerial.snappy</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!--        4. Kafka dependencies-->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.11.0.3</version>
            <exclusions>
                <exclusion>
                    <artifactId>slf4j-api</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.11_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>kafka-clients</artifactId>
                    <groupId>org.apache.kafka</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <!--        5. Database dependencies-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!--        ① Excel dependencies-->
        <dependency>
            <groupId>org.apache.poi</groupId>
            <artifactId>poi</artifactId>
            <version>3.14</version>
        </dependency>
        <dependency>
            <groupId>org.apache.poi</groupId>
            <artifactId>poi-ooxml</artifactId>
            <version>3.14</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>1.10.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-hbase_2.12</artifactId>
            <version>1.10.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-jdbc_2.12</artifactId>
            <version>1.10.2</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.37</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-redis_2.11</artifactId>
            <version>1.1.5</version>
            <exclusions>
                <exclusion>
                    <artifactId>force-shading</artifactId>
                    <groupId>org.apache.flink</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>slf4j-api</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
            <version>2.9.5</version>
        </dependency>
        <dependency>
            <groupId>io.lettuce</groupId>
            <artifactId>lettuce-core</artifactId>
            <version>5.0.5.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.4.Final</version>
        </dependency>
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>${jedis.version}</version>
        </dependency>
        <!-- Add connector dependencies here. They must be in the default scope (compile). -->
        <!-- Example:

        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency>
        -->
        <!-- Add logging framework, to produce console output when running in the IDE. -->
        <!-- These dependencies are excluded from the application JAR by default. -->
        <!--        5. Logging dependencies-->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.7</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>net.sf.json-lib</groupId>
            <artifactId>json-lib</artifactId>
            <version>2.4</version>
            <classifier>jdk15</classifier>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>
        <!--        6. Offline data warehouse (Hive) dependencies-->
        <!--        ① Hadoop dependencies-->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-common</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <!--        ② Hive dependencies-->
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-jdbc</artifactId>
            <version>${hive.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>${hive.version}</version>
        </dependency>
        <!--        ③ HBase dependencies-->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>${hbase.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>${hbase.version}</version>
        </dependency>
        <!--        7. Spark dependencies-->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.12</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.12</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_2.12</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>
    </dependencies>
    <repositories>
        <!--    <repository>
              <id>nexus-aliyun</id>
              <name>Nexus aliyun</name>
              <layout>default</layout>
              <url>http://maven.aliyun.com/nexus/content/groups/public</url>
              <snapshots>
                <enabled>false</enabled>
              </snapshots>
              <releases>
                <enabled>true</enabled>
              </releases>
            </repository>-->
        <repository>
            <id>apache.snapshots</id>
            <name>Apache Development Snapshot Repository</name>
            <url>https://repository.apache.org/content/repositories/snapshots/</url>
            <releases>
                <enabled>false</enabled>
            </releases>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </repository>
    </repositories>

    <build>
        <pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
            <plugins>
                <!-- clean lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#clean_Lifecycle -->
                <plugin>
                    <artifactId>maven-clean-plugin</artifactId>
                    <version>3.1.0</version>
                </plugin>
                <!-- default lifecycle, jar packaging: see https://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_jar_packaging -->
                <plugin>
                    <artifactId>maven-resources-plugin</artifactId>
                    <version>3.0.2</version>
                </plugin>
                <plugin>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.8.0</version>
                </plugin>
                <plugin>
                    <artifactId>maven-surefire-plugin</artifactId>
                    <version>2.22.1</version>
                </plugin>
                <plugin>
                    <artifactId>maven-jar-plugin</artifactId>
                    <version>3.0.2</version>
                </plugin>
                <plugin>
                    <artifactId>maven-install-plugin</artifactId>
                    <version>2.5.2</version>
                </plugin>
                <plugin>
                    <artifactId>maven-deploy-plugin</artifactId>
                    <version>2.8.2</version>
                </plugin>
                <!-- site lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#site_Lifecycle -->
                <plugin>
                    <artifactId>maven-site-plugin</artifactId>
                    <version>3.7.1</version>
                </plugin>
                <plugin>
                    <artifactId>maven-project-info-reports-plugin</artifactId>
                    <version>3.0.0</version>
                </plugin>
            </plugins>
        </pluginManagement>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>

2) Adding a random prefix

package Spark.Udf;

import org.apache.spark.sql.api.java.UDF2;
import java.util.concurrent.ThreadLocalRandom;

/**
 * @program: FlinkUdf
 * @description: Prepends a random prefix to a field value
 * @author: BigCodeKing
 * @create: 2021-02-01 17:48
 **/
public class RandomPrefixUDF implements UDF2<String, Integer, String> {
    private static final long serialVersionUID = 1L;

    @Override
    public String call(String val, Integer num) throws Exception {
        // Pick a bucket in [0, num) and prepend it, e.g. "A" -> "1_A".
        // ThreadLocalRandom avoids allocating a new Random for every row.
        int randNum = ThreadLocalRandom.current().nextInt(num);
        return randNum + "_" + val;
    }
}

3) Removing the random prefix

package Spark.Udf;

import org.apache.spark.sql.api.java.UDF1;

/**
 * @program: FlinkUdf
 * @description: Removes the random prefix
 * @author: BigCodeKing
 * @create: 2021-02-01 17:51
 **/
public class RemoveRandomPrefixUDF implements UDF1<String, String> {
    private static final long serialVersionUID = 1L;

    @Override
    public String call(String val) throws Exception {
        // Cut at the first underscore only, so a key that itself contains "_"
        // (e.g. "1_iphone_12") is restored in full instead of being truncated.
        return val.substring(val.indexOf('_') + 1);
    }
}
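
One detail worth checking when stripping the prefix is what happens to keys that contain an underscore themselves. The following is a minimal plain-Java sketch, without Spark, of the add/remove round trip. The class name PrefixRoundTripSketch and the helpers addPrefix and removePrefix are hypothetical names used only for illustration, and the bucket number is passed in explicitly instead of being drawn at random so the output is predictable.

```java
final class PrefixRoundTripSketch {

    // Same shape as the prefix UDF, but with the bucket passed in explicitly (assumption for repeatability).
    static String addPrefix(String val, int bucket) {
        return bucket + "_" + val;
    }

    // Strip only the leading "<bucket>_" part; any later underscore belongs to the key.
    static String removePrefix(String prefixed) {
        return prefixed.substring(prefixed.indexOf('_') + 1);
    }

    public static void main(String[] args) {
        String prefixed = addPrefix("iphone_12", 1);
        System.out.println(prefixed);               // 1_iphone_12
        System.out.println(removePrefix(prefixed)); // iphone_12
        // Splitting on every underscore and taking element 1 would drop "_12".
        System.out.println(prefixed.split("_")[1]); // iphone
    }
}
```

For keys without underscores, such as A and B in this article, both approaches give the same answer; the difference only shows up with keys like iphone_12.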

2. Data flow

Flow without a random prefix

A 1
A 1
A 1
A 1
B 1

Result:

A 4
B 1

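Flow with a random prefix (the hot key A is split into sub-keys such as 0_A and 1_A, so no single group receives all of its rows)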

A 1
A 1
A 1
A 1
B 1

-- Add a random prefix

0_A 1
0_A 1
1_A 1
1_A 1
0_B 1

-- First GROUP BY

0_A 2
1_A 2
0_B 1

-- Remove the random prefix

A 2
A 2
B 1

-- Second GROUP BY

A 4
B 1
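
The four steps above can be traced in plain Java without a cluster. This is only a sketch of the idea, not the Spark job itself: the class TwoStageAggregationSketch and its methods row, sumByKey and twoStageSum are hypothetical names, and an in-memory list stands in for the table.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

final class TwoStageAggregationSketch {

    // Build one (product, click) row.
    static Map.Entry<String, Integer> row(String product, int click) {
        return new AbstractMap.SimpleEntry<>(product, click);
    }

    // Sum clicks per key; stands in for a single GROUP BY.
    static Map<String, Integer> sumByKey(List<Map.Entry<String, Integer>> rows) {
        Map<String, Integer> totals = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> r : rows) {
            totals.merge(r.getKey(), r.getValue(), Integer::sum);
        }
        return totals;
    }

    static Map<String, Integer> twoStageSum(List<Map.Entry<String, Integer>> rows,
                                            int buckets, Random random) {
        // Step 1: add a random prefix in [0, buckets) to every key.
        List<Map.Entry<String, Integer>> prefixed = new ArrayList<>();
        for (Map.Entry<String, Integer> r : rows) {
            prefixed.add(row(random.nextInt(buckets) + "_" + r.getKey(), r.getValue()));
        }
        // Step 2: first GROUP BY, on the prefixed key.
        Map<String, Integer> partial = sumByKey(prefixed);
        // Step 3: remove the prefix (cut at the first underscore).
        List<Map.Entry<String, Integer>> stripped = new ArrayList<>();
        for (Map.Entry<String, Integer> e : partial.entrySet()) {
            String key = e.getKey();
            stripped.add(row(key.substring(key.indexOf('_') + 1), e.getValue()));
        }
        // Step 4: second GROUP BY, on the original key.
        return sumByKey(stripped);
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> rows = Arrays.asList(
                row("A", 1), row("A", 1), row("A", 1), row("A", 1), row("B", 1));
        System.out.println(twoStageSum(rows, 2, new Random())); // {A=4, B=1}
    }
}
```

Whatever prefixes are drawn, the partial sums for A always add up to 4, which is why the two-stage result matches the single GROUP BY for a sum. The same holds for other aggregates that can be merged from partial results, such as count, min and max.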

3. Spark program

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types._
import Spark.Udf.{RandomPrefixUDF, RemoveRandomPrefixUDF}

/**
  * Specify the schema directly with StructType and convert the RDD to a DataFrame
  */
object TestUDF {
  def main(args: Array[String]): Unit = {
    val spark =
      SparkSession.builder()
        .appName("TestUDF")
        .master("local")
        .getOrCreate()

    val sc = spark.sparkContext
    sc.setLogLevel("WARN")

    // Register the two Java UDFs under the names used in the SQL below
    spark.udf.register("random_prefix", new RandomPrefixUDF(), DataTypes.StringType)
    spark.udf.register("remove_random_prefix", new RemoveRandomPrefixUDF(), DataTypes.StringType)

    val productRDD =
      sc.parallelize(List("A", "A", "A", "A", "B"), 1)
        .map(x => Row(x, 1))

    // Create the schema
    val schema: StructType = StructType(Seq(
      StructField("product", StringType, false),
      StructField("click", IntegerType, false)
    ))

    val productDF = spark.createDataFrame(productRDD, schema)

    // Register a temporary view so the data can be queried with SQL
    productDF.createOrReplaceTempView("t_product_click")
    // Add the random prefix
    val sql1 =
      s"""
         |select
         |  random_prefix(product, 2) product,
         |  click
         |from
         |  t_product_click
       """.stripMargin

    // First aggregation: sum by the prefixed key
    val sql2 =
      s"""
         |select
         |  product,
         |  sum(click) click
         |from
         |  (
         |    select
         |      random_prefix(product, 2) product,
         |      click
         |    from
         |      t_product_click
         |  ) t1
         |group by
         |  product
       """.stripMargin

    // Remove the random prefix
    val sql3 =
      s"""
         |select
         |  remove_random_prefix(product) product,
         |  click
         |from
         |  (
         |    select
         |      product,
         |      sum(click) click
         |    from
         |      (
         |        select
         |          random_prefix(product, 2) product,
         |          click
         |        from
         |          t_product_click
         |      ) t1
         |    group by
         |      product
         |  ) t2
         |
       """.stripMargin

    // Second aggregation: sum by the original key
    val sql4 =
      s"""
         |select
         |  product,
         |  sum(click) click
         |from
         |  (
         |    select
         |      remove_random_prefix(product) product,
         |      click
         |    from
         |      (
         |        select
         |          product,
         |          sum(click) click
         |        from
         |          (
         |            select
         |              random_prefix(product, 2) product,
         |              click
         |            from
         |              t_product_click
         |          ) t1
         |        group by
         |          product
         |      ) t2
         |  ) t3
         |group by
         |  product
       """.stripMargin

    spark.sql(sql1).show()
    spark.sql(sql2).show()
    spark.sql(sql3).show()
    spark.sql(sql4).show()
    spark.stop()
  }
}

4. Spark SQL statement

select
  product,
  sum(click) click
from
  (
    select
      remove_random_prefix(product) product,
      click
    from
      (
        select
          product,
          sum(click) click
        from
          (
            select
              random_prefix(product, 2) product,
              click
            from
              t_product_click
          ) t1
        group by
          product
      ) t2
  ) t3
group by
  product

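Execution results (the output of sql1 through sql4, in order; every query draws fresh random prefixes, so the intermediate counts differ from one query to the next):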

+-------+-----+
|product|click|
+-------+-----+
|    0_A|    1|
|    1_A|    1|
|    0_A|    1|
|    0_A|    1|
|    1_B|    1|
+-------+-----+

+-------+-----+
|product|click|
+-------+-----+
|    1_A|    3|
|    1_B|    1|
|    0_A|    1|
+-------+-----+

+-------+-----+
|product|click|
+-------+-----+
|      A|    1|
|      B|    1|
|      A|    3|
+-------+-----+

+-------+-----+
|product|click|
+-------+-----+
|      B|    1|
|      A|    4|
+-------+-----+
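
The intermediate tables show key A spread over 0_A and 1_A before the final merge. A rough way to see why that helps with skew is to compare the size of the largest group with and without a prefix on a bigger, artificial input. The sketch below is plain Java with made-up row counts (100,000 rows of A, 1,000 rows of B) and a hypothetical class name SkewLoadSketch; the size of a group is used as a stand-in for the work one task would have to do, which is an assumption and not a measurement of a real Spark job.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

final class SkewLoadSketch {

    // Size of the largest group when hotRows rows of key "A" and coldRows rows of key "B"
    // are grouped by key. With buckets > 1 each key first gets a random prefix in [0, buckets).
    static int largestGroup(int hotRows, int coldRows, int buckets, Random random) {
        Map<String, Integer> groupSizes = new HashMap<>();
        for (int i = 0; i < hotRows + coldRows; i++) {
            String key = i < hotRows ? "A" : "B";
            String groupKey = buckets > 1 ? random.nextInt(buckets) + "_" + key : key;
            groupSizes.merge(groupKey, 1, Integer::sum);
        }
        int max = 0;
        for (int size : groupSizes.values()) {
            max = Math.max(max, size);
        }
        return max;
    }

    public static void main(String[] args) {
        Random random = new Random(42); // fixed seed so repeated runs print the same numbers
        System.out.println("largest group, no prefix:   " + largestGroup(100_000, 1_000, 1, random));
        System.out.println("largest group, 10 prefixes: " + largestGroup(100_000, 1_000, 10, random));
    }
}
```

Without a prefix the largest group holds all 100,000 rows of A. With 10 prefixes it holds roughly a tenth of them, at the price of a second, much smaller aggregation over at most 10 partial rows per key.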

 

Author: 大码王

Original article: https://www.cnblogs.com/huanghanyu/p/14358163.html