项目实战 从 0 到 1 学习之Flink (29)UDF实现

1、pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project
    xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.example</groupId>
    <artifactId>FlinkUdf</artifactId>
    <version>1.0-SNAPSHOT</version>
    <name>test</name>
    <!-- FIXME change it to the project's website -->
    <url>http://www.example.com</url>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <!-- Flink 1.11 requires Java 8; keep these in sync with the maven-compiler-plugin
             configuration at the bottom of this file (which already targets 8). -->
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <flink.version>1.11.1</flink.version>
        <!-- Every Scala-suffixed artifact (_2.11 / _2.12) must use this same binary version;
             Scala 2.11 and 2.12 artifacts are not binary compatible. -->
        <scala.binary.version>2.11</scala.binary.version>
        <scala.version>2.11.0</scala.version>
        <hadoop.version>3.0.0</hadoop.version>
        <hive.version>3.0.0</hive.version>
        <hbase.version>2.3.0</hbase.version>
        <spark.version>3.0.0</spark.version>
        <jedis.version>3.0.0</jedis.version>
    </properties>
    <dependencies>
        <!-- 0. Base language (Scala runtime) -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <!-- 1. Flink modules -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>scala-library</artifactId>
                    <groupId>org.scala-lang</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>slf4j-api</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- 2. CLI dependencies -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- 3. Alibaba fastjson -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.58</version>
            <exclusions>
                <exclusion>
                    <artifactId>javassist</artifactId>
                    <groupId>org.javassist</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>scala-parser-combinators_2.11</artifactId>
                    <groupId>org.scala-lang.modules</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>slf4j-api</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>snappy-java</artifactId>
                    <groupId>org.xerial.snappy</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- 4. Kafka -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.11.0.3</version>
            <exclusions>
                <exclusion>
                    <artifactId>slf4j-api</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.11_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>kafka-clients</artifactId>
                    <groupId>org.apache.kafka</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <!-- 5. Formats and database connectors -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <!-- Must match the other Flink modules; a 1.10.x format jar on a 1.11 runtime is unsupported. -->
            <version>${flink.version}</version>
        </dependency>
        <!-- NOTE(review): the two connectors below are still Flink 1.10.x artifacts while
             flink.version is 1.11.1. Starting with Flink 1.11 they are published as
             flink-connector-hbase / flink-connector-jdbc; consider migrating (verify the exact
             artifact names and package changes for the target Flink version). -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-hbase_${scala.binary.version}</artifactId>
            <version>1.10.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-jdbc_${scala.binary.version}</artifactId>
            <version>1.10.2</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.37</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-redis_${scala.binary.version}</artifactId>
            <version>1.1.5</version>
            <exclusions>
                <exclusion>
                    <artifactId>force-shading</artifactId>
                    <groupId>org.apache.flink</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>slf4j-api</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
            <version>2.9.5</version>
        </dependency>
        <dependency>
            <groupId>io.lettuce</groupId>
            <artifactId>lettuce-core</artifactId>
            <version>5.0.5.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.4.Final</version>
        </dependency>
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>${jedis.version}</version>
        </dependency>
        <!-- Add connector dependencies here. They must be in the default scope (compile). -->
        <!-- Example:

        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency>
        -->
        <!-- Add logging framework, to produce console output when running in the IDE. -->
        <!-- These dependencies are excluded from the application JAR by default. -->
        <!-- 6. Logging -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.7</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>net.sf.json-lib</groupId>
            <artifactId>json-lib</artifactId>
            <version>2.4</version>
            <classifier>jdk15</classifier>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>
        <!-- 7. Offline warehouse (Hive) dependencies -->
        <!-- 7.1 Hadoop -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-common</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <!-- 7.2 Hive -->
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-jdbc</artifactId>
            <version>${hive.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>${hive.version}</version>
        </dependency>
        <!-- 7.3 HBase -->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>${hbase.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>${hbase.version}</version>
        </dependency>
        <!-- 8. Spark -->
        <!-- NOTE(review): Spark 3.0.x is only published for Scala 2.12, while every Flink
             artifact above uses Scala ${scala.binary.version}. Mixing both on one classpath is
             likely to fail at runtime; consider moving the Spark code to a separate module. -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.12</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.12</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_2.12</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>
    </dependencies>
    <repositories>
        <!--    <repository><id>nexus-aliyun</id><name>Nexus aliyun</name><layout>default</layout><url>http://maven.aliyun.com/nexus/content/groups/public</url><snapshots><enabled>false</enabled></snapshots><releases><enabled>true</enabled></releases></repository>-->
        <repository>
            <id>apache.snapshots</id>
            <name>Apache Development Snapshot Repository</name>
            <url>https://repository.apache.org/content/repositories/snapshots/</url>
            <releases>
                <enabled>false</enabled>
            </releases>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </repository>
    </repositories>
    <build>
        <pluginManagement>
            <!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
            <plugins>
                <!-- clean lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#clean_Lifecycle -->
                <plugin>
                    <artifactId>maven-clean-plugin</artifactId>
                    <version>3.1.0</version>
                </plugin>
                <!-- default lifecycle, jar packaging: see https://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_jar_packaging -->
                <plugin>
                    <artifactId>maven-resources-plugin</artifactId>
                    <version>3.0.2</version>
                </plugin>
                <plugin>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.8.0</version>
                </plugin>
                <plugin>
                    <artifactId>maven-surefire-plugin</artifactId>
                    <version>2.22.1</version>
                </plugin>
                <plugin>
                    <artifactId>maven-jar-plugin</artifactId>
                    <version>3.0.2</version>
                </plugin>
                <plugin>
                    <artifactId>maven-install-plugin</artifactId>
                    <version>2.5.2</version>
                </plugin>
                <plugin>
                    <artifactId>maven-deploy-plugin</artifactId>
                    <version>2.8.2</version>
                </plugin>
                <!-- site lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#site_Lifecycle -->
                <plugin>
                    <artifactId>maven-site-plugin</artifactId>
                    <version>3.7.1</version>
                </plugin>
                <plugin>
                    <artifactId>maven-project-info-reports-plugin</artifactId>
                    <version>3.0.0</version>
                </plugin>
            </plugins>
        </pluginManagement>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <!-- Java 8, matching maven.compiler.source/target in <properties>. -->
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

2、数组转字符串

1、方法①

package Udf;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;
// Method 1: join the elements of an array into one string using the given separator.
public class ArrToString extends ScalarFunction {
    private static final long serialVersionUID = 1L;

    public ArrToString() {
    }

    @Override
    public void open(FunctionContext context) throws Exception {
        super.open(context);
    }

    @Override
    public void close() throws Exception {
        super.close();
    }

    /**
     * Joins {@code bodyRow}, placing {@code split} between consecutive elements.
     *
     * @param bodyRow elements to join; a {@code null} element is written as the text "null"
     * @param split   separator written between elements; a {@code null} separator is written as "null"
     * @return the joined string, or {@code null} when {@code bodyRow} is null or empty
     */
    public String eval(String[] bodyRow, String split) {
        if (bodyRow == null || bodyRow.length == 0) {
            return null;
        }

        StringBuilder joined = new StringBuilder();
        for (int idx = 0; idx < bodyRow.length; idx++) {
            // Separator goes before every element except the first.
            if (idx > 0) {
                joined.append(split);
            }
            joined.append(bodyRow[idx]);
        }
        return joined.toString();
    }
}

2、方法②

package Udf;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;

// Method 2: join the elements of an array into one string using the given separator.
public class ArrToString2 extends ScalarFunction {
    private static final long serialVersionUID = 1L;

    public ArrToString2() {
    }

    @Override
    public void open(FunctionContext context) throws Exception {
        super.open(context);
    }

    @Override
    public void close() throws Exception {
        super.close();
    }

    /**
     * Joins {@code arr}, placing {@code split} between consecutive elements.
     *
     * <p>The previous implementation called {@code ArrayUtils.toString(arr, split)}. That method's
     * second argument is the text returned for a null array, not a separator, so it always produced
     * the Java-literal form "{a,b}" and ignored {@code split}.
     *
     * <p>Output matches method 1 ({@code ArrToString}): a {@code null} element or a {@code null}
     * separator is written as the text "null".
     *
     * @param arr   elements to join
     * @param split separator written between elements
     * @return the joined string, or {@code null} when {@code arr} is null or empty
     */
    public String eval(String[] arr, String split) {
        if (arr == null || arr.length == 0) {
            return null;
        }
        // String.join rejects a null delimiter, so render it as "null" like ArrToString does.
        return String.join(String.valueOf(split), arr);
    }
}

3、方法③

package Udf;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;
import java.util.Arrays;
// Method 3: render an array as its bracketed list form, e.g. "[a, b, c]".
public class ArrToString3 extends ScalarFunction {
    private static final long serialVersionUID = 1L;

    public ArrToString3() {
    }

    @Override
    public void open(FunctionContext context) throws Exception {
        super.open(context);
    }

    @Override
    public void close() throws Exception {
        super.close();
    }

    /**
     * Formats {@code arr} with {@link Arrays#toString(Object[])}.
     *
     * @param arr the array to format
     * @return "[e1, e2, ...]", or {@code null} when {@code arr} is null or empty
     */
    public String eval(String[] arr) {
        boolean hasElements = arr != null && arr.length > 0;
        return hasElements ? Arrays.toString(arr) : null;
    }
}

3、字符串转数组

package Udf;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;
// Method 1: split a string into an array on the given separator.
public class StringToArr extends ScalarFunction {
    private static final long serialVersionUID = 1L;

    public StringToArr() {
    }

    @Override
    public void open(FunctionContext context) throws Exception {
        super.open(context);
    }

    @Override
    public void close() throws Exception {
        super.close();
    }

    /**
     * Splits {@code arr} with {@link String#split(String)}.
     *
     * <p>Note that {@code split} is a regular expression, so metacharacters such as "|" or "."
     * must be escaped (e.g. "\\|"). Trailing empty strings are dropped, as {@code String.split} does.
     *
     * @param arr   the string to split
     * @param split the separator regex
     * @return the pieces, or {@code null} when {@code arr} is null or splitting fails
     *         (e.g. a null or invalid separator)
     */
    public String[] eval(String arr, String split) {
        if (arr == null) {
            return null;
        }
        try {
            return arr.split(split);
        } catch (Exception ex) {
            return null;
        }
    }
}

 4、字符串转map

package Udf;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;
import java.util.HashMap;
import java.util.Map;
// Method 1: parse a delimited key/value string into a Map.
public class StringToMap extends ScalarFunction {
    private static final long serialVersionUID = 1L;

    public StringToMap() {
    }

    @Override
    public void open(FunctionContext context) throws Exception {
        super.open(context);
    }

    @Override
    public void close() throws Exception {
        super.close();
    }

    /**
     * Parses {@code str} into a map, e.g. {@code eval("a=1&b=2", "&", "=")} gives {a=1, b=2}.
     *
     * <p>The previous version returned from inside the loop, so only the first pair was kept. It
     * also threw (and returned null) on a pair without a value.
     *
     * <p>Both separators are regular expressions ({@link String#split}). A pair is cut at the
     * first {@code split1} only, so a value may itself contain {@code split1}. A key with no
     * value maps to {@code null}, as in {@code MapStrToMap}.
     *
     * @param str    the input string
     * @param split  regex separating pairs
     * @param split1 regex separating a key from its value
     * @return the parsed map, or {@code null} when {@code str} is null or parsing fails
     *         (e.g. a null or invalid separator)
     */
    public Map eval(String str, String split, String split1) {
        if (str == null) {
            return null;
        }
        try {
            Map<String, String> map = new HashMap<>();
            for (String pair : str.split(split)) {
                // Limit 2: cut only at the first key/value separator.
                String[] kv = pair.split(split1, 2);
                map.put(kv[0], kv.length > 1 ? kv[1] : null);
            }
            return map;
        } catch (Exception ex) {
            ex.printStackTrace();
            return null;
        }
    }
}

import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
 
public class StringUtilsTest {
 
    @Test
    public void testDataToMap() {
        String data = "certificatetype=0&certificateno=220182&depositacct=622848";
        Map map = new HashMap();
 
        if (null != data) {
            String[] param = data.split("&");
            for (int i = 0; i < param.length; i++) {
                int index = param[i].indexOf('=');
                map.put(param[i].substring(0,index), param[i].substring((index + 1)));
            }
        }
 
        System.out.println(map);
 
        System.out.println("----------------分割线---------------");
        Map result = new HashMap();
        String[] params = data.split("\&");
        for (String entry : params) {
            if (entry.contains("=")) {
                String[] sub = entry.split("\=");
                if (sub.length > 1) {
                    result.put(sub[0], sub[1]);
                } else {
                    result.put(sub[0], "");
                }
            }
        }
        System.out.println(result);
    }
 
}




String str1 = "d3fe1e186e41475ea965f4722f5488a8";
String str2 = "5093";
String str3 = "公共设施";

// Join the three fields with the SOH control character (U+0001). A plain "1" cannot be used as
// the separator: str1 and str2 themselves contain the digit 1, so the split would be wrong.
String str = str1 + "\u0001" + str2 + "\u0001" + str3;
System.out.println(str);
// The same value written as a single literal, with U+0001 between the fields.
str = "d3fe1e186e41475ea965f4722f5488a8\u00015093\u0001公共设施";
// U+0001 is not a regex metacharacter, so it splits literally into the three original fields.
String[] split = str.split("\u0001");
for (String s : split) {
    System.out.println(s);
}
System.out.println(split.length);
            

③ map字符串转Map格式字符串(MapStrToMap)

package Flink.Udf;
import org.apache.commons.lang.StringUtils;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.LinkedHashMap;
import java.util.Map;
/**
 * Converts a delimited key/value string ("mapStr") into the string form of a Map,
 * e.g. "a=1&b=2" with separators "&" and "=" becomes "{a=1, b=2}".
 */
public class MapStrToMap extends ScalarFunction {
    private static final long serialVersionUID = 1L;
    private static final Logger logger = LoggerFactory.getLogger(MapStrToMap.class);

    @Override
    public void open(FunctionContext context) throws Exception {
        super.open(context);
    }

    @Override
    public void close() throws Exception {
        super.close();
    }

    /**
     * Parses {@code data} and returns {@link Map#toString()} of the pairs in input order.
     *
     * @param data the input string
     * @param spt1 regex separating pairs ({@link String#split})
     * @param spt2 regex separating a key from its value
     * @return "" when {@code data} is null or empty; otherwise "{k1=v1, k2=v2}". A key without a
     *         value maps to null. If parsing fails part-way, the error is logged and the pairs
     *         parsed so far are returned.
     */
    public String eval(String data, String spt1, String spt2) {
        Map<String, String> map = new LinkedHashMap<>();
        try {
            if (StringUtils.isEmpty(data)) {
                return "";
            }
            for (String s : data.split(spt1)) {
                String[] s1 = s.split(spt2);
                map.put(s1[0], s1.length > 1 ? s1[1] : null);
            }
        } catch (Exception e) {
            // Pass the input as the {} argument and the exception last. Previously the exception
            // filled the placeholder, so the input was never logged and the stack trace was lost.
            logger.error("MapStr to Map Error!,mapStr={}", data, e);
        }
        return map.toString();
    }
}

5、map转字符串

package Udf;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.functions.TableFunction;
import java.util.Arrays;
import java.util.Map;
import java.util.Set;

//方法1:map按照指定的分隔符转成字符串
public class MapToString extends ScalarFunction {
    private static final  long serialVersionUID=1L;
    public MapToString() {
    }
    public String eval(Map<String,Object> map,String split,String split1)
    {
        try {
            Set<String> keySet = map.keySet();
            //将set集合转换为数组
            String[] keyArray = keySet.<String>toArray(new String[keySet.size()]);
            //给数组排序(升序)
            Arrays.sort(keyArray);
            //因为String拼接效率会很低的,所以转用StringBuilder
            StringBuilder sb = new StringBuilder();
            for (int i = 0; i < keyArray.length; i++) {
                // 参数值为空,则不参与签名 这个方法trim()是去空格
                if ((String.valueOf(map.get(keyArray[i]))).trim().length() > 0) {
                    sb.append(keyArray[i]).append(split1).append(String.valueOf(map.get(keyArray[i])).trim());
                }
                if(i != keyArray.length-1){
                    sb.append(split);
                }
            }
            return sb.toString();
        }
        catch(Exception ex)
        {
            ex.printStackTrace();
            return null;
        }
    }
}

6、map转Json

package Flink.Udf;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang.StringUtils;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;
import java.util.LinkedHashMap;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Converts a delimited key/value string ("mapStr") into a JSON object string, keeping the input
 * order of the keys, e.g. "a=1&b=2" with separators "&" and "=" becomes {"a":"1","b":"2"}.
 */
public class MapStrToJson extends ScalarFunction {
    private static final long serialVersionUID = 1L;
    private static final Logger logger = LoggerFactory.getLogger(MapStrToJson.class);

    @Override
    public void open(FunctionContext context) throws Exception {
        super.open(context);
    }

    @Override
    public void close() throws Exception {
        super.close();
    }

    /**
     * Parses {@code data} into an insertion-ordered map and serializes it with fastjson.
     *
     * @param data the input string
     * @param spt1 regex separating pairs ({@link String#split})
     * @param spt2 regex separating a key from its value
     * @return "" when {@code data} is null or empty; otherwise the JSON of the parsed pairs. A key
     *         without a value is stored as null. NOTE(review): with fastjson's default features,
     *         null-valued entries are presumably omitted from the JSON; confirm if they must appear.
     *         If parsing fails part-way, the error is logged and the pairs parsed so far are returned.
     */
    public String eval(String data, String spt1, String spt2) {
        Map<String, String> map = new LinkedHashMap<>();
        try {
            if (StringUtils.isEmpty(data)) {
                return "";
            }
            for (String s : data.split(spt1)) {
                String[] s1 = s.split(spt2);
                map.put(s1[0], s1.length > 1 ? s1[1] : null);
            }
        } catch (Exception e) {
            // Pass the input as the {} argument and the exception last, so SLF4J logs both the
            // offending input and the stack trace. Previously the exception filled the placeholder.
            logger.error("MapStr to Json Error!,mapStr={}", data, e);
        }
        return JSONObject.toJSONString(map);
    }
}

package com.oppo.dc.ostream.udf.common;

import com.alibaba.fastjson.JSON;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;

import java.util.Map;

/**
 * Serializes a Map to a JSON string with fastjson and caps the result at 5000 characters.
 *
 * <p>The cap is a plain character cut: a truncated result is generally not valid JSON.
 */
public class MapToJSONString extends ScalarFunction {
    private static final long serialVersionUID = 1L;

    /** Maximum number of characters returned by {@link #eval(Map)}. */
    private static final int MAX_JSON_LENGTH = 5000;

    @Override
    public void open(FunctionContext context) throws Exception {
        super.open(context);
    }

    @Override
    public void close() throws Exception {
        super.close();
    }

    /**
     * @param map the map to serialize
     * @return the JSON text (at most 5000 characters), or {@code null} when {@code map} is null
     */
    public String eval(Map map) {
        if (map == null) {
            return null;
        }
        String json = JSON.toJSONString(map);
        return json.length() <= MAX_JSON_LENGTH ? json : json.substring(0, MAX_JSON_LENGTH);
    }
}
package com.oppo.dc.ostream.udf.common;

import com.alibaba.fastjson.JSON;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;

import java.util.Map;

/**
 * Author: Fisher Xiang
 * Date: 2019-10-28 15:21
 *
 * @description:MapToJSONStr
 * @version: 1.0.0
 */

public class MapToJSONStr extends ScalarFunction {

    @Override
    public void open(FunctionContext context) throws Exception {
        super.open(context);
    }

    @Override
    public void close() throws Exception {
        super.close();
    }

    public String eval(Map map) {
        if (map == null) {
            return null;
        }
        if (map.containsKey("data")) {
            map.put("data", ((String) map.get("data")).replace(""%2C"", "",""));
        }

        String jsonStr = JSON.toJSONString(map);
        /*if (jsonStr.length() > 5000) {
            jsonStr = jsonStr.substring(0, 5000);
        }*/

        return jsonStr;
    }

}
④ map字符串转json(使用HashMap,输出的键顺序由哈希决定,不固定)
package FlinkDW.Udf;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang.StringUtils;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;
import java.util.HashMap;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * mapStr转Json
 */
public class MapStrToJson extends ScalarFunction
{
    private static final long serialVersionUID = 1 L;
    private static final Logger logger = LoggerFactory.getLogger(MapStrToJson.class);@
    Override
    public void open(FunctionContext context) throws Exception
    {
        super.open(context);
    }@
    Override
    public void close() throws Exception
    {
        super.close();
    }
    public String eval(String data, String spt1, String spt2)
    {
        Map < String, String > map = new HashMap < > ();
        try
        {
            if(StringUtils.isEmpty(data))
            {
                return "";
            }
            else
            {
                String[] split = data.split(spt1);
                for(String s: split)
                {
                    String[] s1 = s.split(spt2);
                    if(s1.length > 1)
                    {
                        map.put(s1[0], s1[1]);
                    }
                    else
                    {
                        map.put(s1[0], "null");
                    }
                }
            }
        }
        catch(Exception e)
        {
            logger.error("MapStr to Json Error!,mapStr={}", data, e);
        }
        return JSONObject.toJSONString(map);
    }
}
⑤优化(固定顺序)
package Flink.Udf;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang.StringUtils;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;
import java.util.LinkedHashMap;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * mapStr转Json
 */
public class MapStrToJson extends ScalarFunction
{
    private static final long serialVersionUID = 1 L;
    private static final Logger logger = LoggerFactory.getLogger(MapStrToJson.class);@
    Override
    public void open(FunctionContext context) throws Exception
    {
        super.open(context);
    }@
    Override
    public void close() throws Exception
    {
        super.close();
    }
    public String eval(String data, String spt1, String spt2)
    {
        Map < String, String > map = new LinkedHashMap < > ();
        try
        {
            if(StringUtils.isEmpty(data))
            {
                return "";
            }
            else
            {
                String[] split = data.split(spt1);
                for(String s: split)
                {
                    String[] s1 = s.split(spt2);
                    if(s1.length > 1)
                    {
                        map.put(s1[0], s1[1]);
                    }
                    else
                    {
                        map.put(s1[0], null);
                    }
                }
            }
        }
        catch(Exception e)
        {
            logger.error("MapStr to Json Error!,mapStr={}", e);
        }
        return JSONObject.toJSONString(map);
    }
}

测试

package Flink.Udf;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
public class Test {
    private static final Logger logger = LoggerFactory.getLogger(Test.class);

    /**
     * 1. Converts a delimited map string into a JSON object string.
     *
     * @param data raw map string such as "k1=v1&k2=v2"; null or empty returns ""
     * @param spt1 entry separator (a regular expression, as used by String#split)
     * @param spt2 key/value separator (also a regular expression)
     * @return JSON object string with keys in input order. A key without a value
     *         maps to the literal string "null". On a parse error the entries
     *         collected so far are returned and the error is logged.
     */
    public static String testDataToJson(String data, String spt1, String spt2) {
        if (StringUtils.isEmpty(data)) {
            return "";
        }
        Map<String, String> map = new LinkedHashMap<>();
        try {
            parseEntries(data, spt1, spt2, map);
        } catch (Exception e) {
            logger.error("MapStr to Json Error!,mapStr={}", data, e);
        }
        return JSONObject.toJSONString(map);
    }

    /**
     * 2. Converts a delimited map string into java.util.Map#toString form,
     * for example "{k1=v1, k2=v2}".
     *
     * @param data raw map string. Null, empty, or at most 2 characters returns null.
     * @param spt1 entry separator (a regular expression)
     * @param spt2 key/value separator (a regular expression)
     * @return the parsed map rendered with Map#toString, keys in input order. A key
     *         without a value maps to the literal string "null". On a parse error
     *         the entries collected so far are returned and the error is logged.
     */
    public static String testDataToMap(String data, String spt1, String spt2) {
        // Null is handled here like empty input. Previously it fell into a broken
        // branch that threw a swallowed NullPointerException.
        if (StringUtils.isEmpty(data) || data.length() <= 2) {
            return null;
        }
        Map<String, String> map = new LinkedHashMap<>();
        try {
            parseEntries(data, spt1, spt2, map);
        } catch (Exception e) {
            // Log the failure instead of silently discarding it.
            logger.error("MapStr to Map Error!,mapStr={}", data, e);
        }
        return map.toString();
    }

    /**
     * Splits data on spt1 and each entry on spt2, putting key -> value into map.
     * A key with no value maps to "null". Entries for which split returns no
     * parts (for example, an entry that is just the separator) are skipped.
     */
    private static void parseEntries(String data, String spt1, String spt2, Map<String, String> map) {
        for (String entry : data.split(spt1)) {
            String[] kv = entry.split(spt2);
            if (kv.length == 0) {
                continue;
            }
            map.put(kv[0], kv.length > 1 ? kv[1] : "null");
        }
    }

    /** Returns the runtime class of o as text, or "null" when o is null. */
    public static String getType(Object o) {
        return o == null ? "null" : o.getClass().toString();
    }

    public static void main(String[] args) {
        String data1 = "certificatetype=0&certificateno=220182&depositacct=622848&name";
        System.out.println(testDataToJson(data1,"&", "="));
        String str="k1:,k2:V2";
        System.out.println(testDataToJson(str, ",", ":"));
        String str1="stayTimeu00020u0001firstScreenTimeu00020u0001apiNameu0002stringValueu0001apiParamu0002idu003d44u0001methodTypeu0002GETu0001loadingTimeu0002453u0001websiteIdu0002stringValueu0001pagePathu0002/stringValueu0001pageNameu0002/valueu0001sessionKeyu0002eyJhbX0AwqPisoE6V8Au0001userNameu000280254861u0001ipAddressu0002XX.XX.XCX.XXXu0001uploadTimeu00022020-11-23 13:07:13u0001receiveTimeu00022020-11-26 11:08:55u0001eventCodeu00025";
        System.out.println(testDataToMap(str1,"u0001", "u0002"));
        System.out.println(testDataToJson(str1,"u0001", "u0002"));
        String data ="client_timestampu00011587375750618u0002apply_valueu00013u0002pre_page_idu0001DraftActivityu0002close_valueu00010u0002item_idu0001no_applyu0002start_timestampu00011587375592167u0002template_id_autou0001-1u0002music_nameu0001少年2u0002play_cntu00010u0002duration_valueu00015506u0002video_idu00011587375633000u0002is_storyu0001false";
        System.out.println(testDataToJson(data,"u0002", "u0001"));
        String str2="ts=1529995388&channel_id=164292&program_id=9081951&play_duration=20&position=10&os=iOS&os_version=4.3&app_name=aliyun&app_version=1.0&device_model=Samsumg_br_FM__samsung_SCH-N719";
        System.out.println(testDataToJson(str2,"&","="));
        System.out.println(testDataToMap(str2,"&","="));
        String logMap ="websiteIdu0002ota-recruit-TEST_ENVu0001pagePathu0002/configuration/indexu0001pageNameu0002????u0001sessionKeyu0002sessionKeyu0001userNameu00020u0001ipAddressu0002172.17.75.8u0001uploadTimeu00022020-12-09 15:10:13u0001receiveTimeu00022020-12-09 15:10:20u0001eventCodeu00021";
        System.out.println(testDataToJson(logMap,"u0001", "u0002"));
        System.out.println(testDataToMap(logMap, "u0001", "u0002"));
    }
}

测试结果

"C:\Program Files\Java\jdk1.8.0_221\bin\java.exe" "-javaagent:C:\Program Files\JetBrains\IntelliJ IDEA 2019.3.3\lib\idea_rt.jar=54638:C:\Program Files\JetBrains\IntelliJ IDEA 2019.3.3\bin" -Dfile.encoding=UTF-8 -classpath C:\Users\defaultuser0\AppData\Local\Temp\classpath1090826265.jar Flink.Udf.Test
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/C:/Users/defaultuser0/.m2/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/C:/Users/defaultuser0/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.10.0/log4j-slf4j-impl-2.10.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
{"certificatetype":"0","certificateno":"220182","depositacct":"622848","name":"null"}
{"k1":"null","k2":"V2"}
{stayTime=0, firstScreenTime=0, apiName=stringValue, apiParam=id=44, methodType=GET, loadingTime=453, websiteId=stringValue, pagePath=/stringValue, pageName=/value, sessionKey=eyJhbX0AwqPisoE6V8A, userName=80254861, ipAddress=XX.XX.XCX.XXX, uploadTime=2020-11-23 13:07:13, receiveTime=2020-11-26 11:08:55, eventCode=5}
{"stayTime":"0","firstScreenTime":"0","apiName":"stringValue","apiParam":"id=44","methodType":"GET","loadingTime":"453","websiteId":"stringValue","pagePath":"/stringValue","pageName":"/value","sessionKey":"eyJhbX0AwqPisoE6V8A","userName":"80254861","ipAddress":"XX.XX.XCX.XXX","uploadTime":"2020-11-23 13:07:13","receiveTime":"2020-11-26 11:08:55","eventCode":"5"}
{"client_timestamp":"1587375750618","apply_value":"3","pre_page_id":"DraftActivity","close_value":"0","item_id":"no_apply","start_timestamp":"1587375592167","template_id_auto":"-1","music_name":"少年2","play_cnt":"0","duration_value":"5506","video_id":"1587375633000","is_story":"false"}
{"ts":"1529995388","channel_id":"164292","program_id":"9081951","play_duration":"20","position":"10","os":"iOS","os_version":"4.3","app_name":"aliyun","app_version":"1.0","device_model":"Samsumg_br_FM__samsung_SCH-N719"}
{ts=1529995388, channel_id=164292, program_id=9081951, play_duration=20, position=10, os=iOS, os_version=4.3, app_name=aliyun, app_version=1.0, device_model=Samsumg_br_FM__samsung_SCH-N719}
{"websiteId":"ota-recruit-TEST_ENV","pagePath":"/configuration/index","pageName":"????","sessionKey":"sessionKey","userName":"0","ipAddress":"172.17.75.8","uploadTime":"2020-12-09 15:10:13","receiveTime":"2020-12-09 15:10:20","eventCode":"1"}
{websiteId=ota-recruit-TEST_ENV, pagePath=/configuration/index, pageName=????, sessionKey=sessionKey, userName=0, ipAddress=172.17.75.8, uploadTime=2020-12-09 15:10:13, receiveTime=2020-12-09 15:10:20, eventCode=1}

Process finished with exit code 0

 测试2

package FlinkDW.Udf;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang.StringUtils;
import java.util.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class Test {
    private static final Logger logger = LoggerFactory.getLogger(Test.class);

    /**
     * Converts a delimited map string into a JSON object string.
     *
     * @param data raw map string such as "k1=v1&k2=v2". Null, empty, or at most
     *             2 characters is returned unchanged.
     * @param spt1 entry separator (a regular expression, as used by String#split)
     * @param spt2 key/value separator (also a regular expression)
     * @return JSON object string with keys in input order. A key without a value
     *         maps to the literal string "null". On a parse error the entries
     *         collected so far are returned and the error is logged.
     */
    public static String testDataToMap(String data, String spt1, String spt2) {
        // Short-circuit before parsing. Null now returns null here instead of
        // hitting a NullPointerException in the old "else if" branch.
        if (StringUtils.isEmpty(data) || data.length() <= 2) {
            return data;
        }
        // LinkedHashMap keeps the input order in the JSON output; HashMap scrambled it.
        Map<String, String> map = new LinkedHashMap<String, String>();
        try {
            for (String entry : data.split(spt1)) {
                String[] kv = entry.split(spt2);
                if (kv.length == 0) {
                    // Entry consisted only of separators: nothing to record.
                    continue;
                }
                map.put(kv[0], kv.length > 1 ? kv[1] : "null");
            }
        } catch (Exception e) {
            // Log instead of silently discarding the failure.
            logger.error("MapStr to Json Error!,mapStr={}", data, e);
        }
        return JSONObject.toJSONString(map);
    }

    /** Returns the runtime class of o as text, or "null" when o is null. */
    public static String getType(Object o) {
        return o == null ? "null" : o.getClass().toString();
    }

    public static void main(String[] args) {
        String data1 = "certificatetype=0&certificateno=220182&depositacct=622848&name";
        String data ="client_timestampu00011587375750618u0002apply_valueu00013u0002pre_page_idu0001DraftActivityu0002close_valueu00010u0002item_idu0001no_applyu0002start_timestampu00011587375592167u0002template_id_autou0001-1u0002music_nameu0001少年2u0002play_cntu00010u0002duration_valueu00015506u0002video_idu00011587375633000u0002is_storyu0001false";
        System.out.println(testDataToMap(data,"u0002", "u0001"));
        System.out.println(testDataToMap(data1,"&", "="));
        String str="k1:,k2:V2";
        System.out.println(testDataToMap(str, ",", ":"));
    }
}

结果:

"C:\Program Files\Java\jdk1.8.0_221\bin\java.exe" "-javaagent:C:\Program Files\JetBrains\IntelliJ IDEA 2019.3.3\lib\idea_rt.jar=65426:C:\Program Files\JetBrains\IntelliJ IDEA 2019.3.3\bin" -Dfile.encoding=UTF-8 -classpath C:\Users\defaultuser0\AppData\Local\Temp\classpath1898420598.jar FlinkDW.Udf.Test
{"client_timestamp":"1587375750618","apply_value":"3","pre_page_id":"DraftActivity","close_value":"0","item_id":"no_apply","start_timestamp":"1587375592167","template_id_auto":"-1","music_name":"少年2","play_cnt":"0","duration_value":"5506","video_id":"1587375633000","is_story":"false"}
{"name":"null","certificatetype":"0","certificateno":"220182","depositacct":"622848"}
{"k1":"null","k2":"V2"}

Process finished with exit code 0

7、json转map

package Udf;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;
import java.util.Map;
// Approach 1: convert a JSON object string into a Map.
public class JsonToMap extends ScalarFunction {
    private static final long serialVersionUID = 1L;

    public JsonToMap() {
    }

    @Override
    public void open(FunctionContext context) throws Exception {
        super.open(context);
    }

    @Override
    public void close() throws Exception {
        super.close();
    }

    /**
     * Parses a JSON object string with fastjson.
     *
     * @param str JSON object text; null yields null
     * @return the parsed JSONObject (which implements Map), or null when
     *         {@code str} is null or parsing throws. The stack trace is printed
     *         on failure.
     */
    public Map eval(String str) {
        if (str == null) {
            return null;
        }
        try {
            Map<String, Object> parsed = JSONObject.parseObject(str);
            return parsed;
        } catch (Exception ex) {
            ex.printStackTrace();
            return null;
        }
    }
}

注意:FastJson的json转map的6种方式

package Udf;
import java.util.Map;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
public class Test1 {
    public static void main(String[] args){

        String str = "{"0":"zhangsan","1":"lisi","2":"wangwu","3":"maliu"}";
        //第一种方式
        Map maps = (Map) JSON.parse(str);
        System.out.println("这个是用JSON类来解析JSON字符串!!!");
        for (Object map : maps.entrySet()){
            System.out.println(((Map.Entry)map).getKey()+"     " + ((Map.Entry)map).getValue());
        }
        //第二种方式
        Map mapTypes = JSON.parseObject(str);
        System.out.println("这个是用JSON类的parseObject来解析JSON字符串!!!");
        for (Object obj : mapTypes.keySet()){
            System.out.println("key为:"+obj+"值为:"+mapTypes.get(obj));
        }
        //第三种方式
        Map mapType = JSON.parseObject(str,Map.class);
        System.out.println("这个是用JSON类,指定解析类型,来解析JSON字符串!!!");
        for (Object obj : mapType.keySet()){
            System.out.println("key为:"+obj+"值为:"+mapType.get(obj));
        }
        //第四种方式
        /**
         * JSONObject是Map接口的一个实现类
         */
        Map json = (Map) JSONObject.parse(str);
        System.out.println("这个是用JSONObject类的parse方法来解析JSON字符串!!!");
        for (Object map : json.entrySet()){
            System.out.println(((Map.Entry)map).getKey()+"  "+((Map.Entry)map).getValue());
        }
        //第五种方式
        /**
         * JSONObject是Map接口的一个实现类
         */
        JSONObject jsonObject = JSONObject.parseObject(str);
        System.out.println("这个是用JSONObject的parseObject方法来解析JSON字符串!!!");
        for (Object map : json.entrySet()){
            System.out.println(((Map.Entry)map).getKey()+"  "+((Map.Entry)map).getValue());
        }
        //第六种方式
        /**
         * JSONObject是Map接口的一个实现类
         */
        Map mapObj = JSONObject.parseObject(str,Map.class);
        System.out.println("这个是用JSONObject的parseObject方法并执行返回类型来解析JSON字符串!!!");
        for (Object map: json.entrySet()){
            System.out.println(((Map.Entry)map).getKey()+"  "+((Map.Entry)map).getValue());
        }
        String strArr = "{{"0":"zhangsan","1":"lisi","2":"wangwu","3":"maliu"}," +
                "{"00":"zhangsan","11":"lisi","22":"wangwu","33":"maliu"}}";
        // JSONArray.parse()
        System.out.println(json);
    }
}

结果:

"C:Program FilesJavajdk1.8.0_221injava.exe" "-javaagent:C:Program FilesJetBrainsIntelliJ IDEA 2019.3.3libidea_rt.jar=55064:C:Program FilesJetBrainsIntelliJ IDEA 2019.3.3in" -Dfile.encoding=UTF-8 -classpath "C:Program FilesJavajdk1.8.0_221jrelibcharsets.jar;C:Program FilesJavajdk1.8.0_221jrelibdeploy.jar;C:Program FilesJavajdk1.8.0_221jrelibextaccess-bridge-64.jar;C:Program FilesJavajdk1.8.0_221jrelibextcldrdata.jar;C:Program FilesJavajdk1.8.0_221jrelibextdnsns.jar;C:Program FilesJavajdk1.8.0_221jrelibextjaccess.jar;C:Program FilesJavajdk1.8.0_221jrelibextjfxrt.jar;C:Program FilesJavajdk1.8.0_221jrelibextlocaledata.jar;C:Program FilesJavajdk1.8.0_221jrelibext
ashorn.jar;C:Program FilesJavajdk1.8.0_221jrelibextsunec.jar;C:Program FilesJavajdk1.8.0_221jrelibextsunjce_provider.jar;C:Program FilesJavajdk1.8.0_221jrelibextsunmscapi.jar;C:Program FilesJavajdk1.8.0_221jrelibextsunpkcs11.jar;C:Program FilesJavajdk1.8.0_221jrelibextzipfs.jar;C:Program FilesJavajdk1.8.0_221jrelibjavaws.jar;C:Program FilesJavajdk1.8.0_221jrelibjce.jar;C:Program FilesJavajdk1.8.0_221jrelibjfr.jar;C:Program FilesJavajdk1.8.0_221jrelibjfxswt.jar;C:Program FilesJavajdk1.8.0_221jrelibjsse.jar;C:Program FilesJavajdk1.8.0_221jrelibmanagement-agent.jar;C:Program FilesJavajdk1.8.0_221jrelibplugin.jar;C:Program FilesJavajdk1.8.0_221jrelib
esources.jar;C:Program FilesJavajdk1.8.0_221jrelib
t.jar;C:appFlinkUdf	arget	est-classes;C:Usersdefaultuser0.m2
epositoryjunitjunit4.11junit-4.11.jar;C:Usersdefaultuser0.m2
epositoryorghamcresthamcrest-core1.3hamcrest-core-1.3.jar;C:Usersdefaultuser0.m2
epositoryorgapacheflinkflink-table-api-java-bridge_2.111.11.1flink-table-api-java-bridge_2.11-1.11.1.jar;C:Usersdefaultuser0.m2
epositoryorgapacheflinkflink-table-api-java1.11.1flink-table-api-java-1.11.1.jar;C:Usersdefaultuser0.m2
epositoryorgapacheflinkflink-java1.11.1flink-java-1.11.1.jar;C:Usersdefaultuser0.m2
epositoryorgapachecommonscommons-lang33.3.2commons-lang3-3.3.2.jar;C:Usersdefaultuser0.m2
epositoryorgapachecommonscommons-math33.5commons-math3-3.5.jar;C:Usersdefaultuser0.m2
epositoryorgapacheflinkflink-streaming-java_2.111.11.1flink-streaming-java_2.11-1.11.1.jar;C:Usersdefaultuser0.m2
epositoryorgapacheflinkflink-runtime_2.111.11.1flink-runtime_2.11-1.11.1.jar;C:Usersdefaultuser0.m2
epositoryorgapacheflinkflink-queryable-state-client-java1.11.1flink-queryable-state-client-java-1.11.1.jar;C:Usersdefaultuser0.m2
epositoryorgapacheflinkflink-hadoop-fs1.11.1flink-hadoop-fs-1.11.1.jar;C:Usersdefaultuser0.m2
epositorycommons-iocommons-io2.4commons-io-2.4.jar;C:Usersdefaultuser0.m2
epositoryorgapacheflinkflink-shaded-netty4.1.39.Final-11.0flink-shaded-netty-4.1.39.Final-11.0.jar;C:Usersdefaultuser0.m2
epositoryorgapacheflinkflink-shaded-jackson2.10.1-11.0flink-shaded-jackson-2.10.1-11.0.jar;C:Usersdefaultuser0.m2
epositoryorgapacheflinkflink-shaded-zookeeper-33.4.14-11.0flink-shaded-zookeeper-3-3.4.14-11.0.jar;C:Usersdefaultuser0.m2
epositorycommons-clicommons-cli1.3.1commons-cli-1.3.1.jar;C:Usersdefaultuser0.m2
epositorycom	ypesafeakkaakka-actor_2.112.5.21akka-actor_2.11-2.5.21.jar;C:Usersdefaultuser0.m2
epositorycom	ypesafeconfig1.3.3config-1.3.3.jar;C:Usersdefaultuser0.m2
epositoryorgscala-langmodulesscala-java8-compat_2.11.7.0scala-java8-compat_2.11-0.7.0.jar;C:Usersdefaultuser0.m2
epositorycom	ypesafeakkaakka-stream_2.112.5.21akka-stream_2.11-2.5.21.jar;C:Usersdefaultuser0.m2
epositoryorg
eactivestreams
eactive-streams1.0.2
eactive-streams-1.0.2.jar;C:Usersdefaultuser0.m2
epositorycom	ypesafessl-config-core_2.11.3.7ssl-config-core_2.11-0.3.7.jar;C:Usersdefaultuser0.m2
epositorycom	ypesafeakkaakka-protobuf_2.112.5.21akka-protobuf_2.11-2.5.21.jar;C:Usersdefaultuser0.m2
epositorycom	ypesafeakkaakka-slf4j_2.112.5.21akka-slf4j_2.11-2.5.21.jar;C:Usersdefaultuser0.m2
epositoryorgclappergrizzled-slf4j_2.111.3.2grizzled-slf4j_2.11-1.3.2.jar;C:Usersdefaultuser0.m2
epositorycomgithubscoptscopt_2.113.5.0scopt_2.11-3.5.0.jar;C:Usersdefaultuser0.m2
epositoryorgxerialsnappysnappy-java1.1.4snappy-java-1.1.4.jar;C:Usersdefaultuser0.m2
epositorycom	witterchill_2.11.7.6chill_2.11-0.7.6.jar;C:Usersdefaultuser0.m2
epositorycom	witterchill-java.7.6chill-java-0.7.6.jar;C:Usersdefaultuser0.m2
epositoryorglz4lz4-java1.6.0lz4-java-1.6.0.jar;C:Usersdefaultuser0.m2
epositoryorgapacheflinkflink-shaded-guava18.0-11.0flink-shaded-guava-18.0-11.0.jar;C:Usersdefaultuser0.m2
epositoryorgapacheflinkflink-streaming-java_2.111.11.1flink-streaming-java_2.11-1.11.1-tests.jar;C:Usersdefaultuser0.m2
epositoryorgslf4jslf4j-api1.7.15slf4j-api-1.7.15.jar;C:Usersdefaultuser0.m2
epositorycomgooglecodefindbugsjsr3051.3.9jsr305-1.3.9.jar;C:Usersdefaultuser0.m2
epositoryorgapacheflinkforce-shading1.11.1force-shading-1.11.1.jar;C:Usersdefaultuser0.m2
epositoryorgapacheflinkflink-table-planner-blink_2.111.11.1flink-table-planner-blink_2.11-1.11.1.jar;C:Usersdefaultuser0.m2
epositoryorgapacheflinkflink-table-api-scala_2.111.11.1flink-table-api-scala_2.11-1.11.1.jar;C:Usersdefaultuser0.m2
epositoryorgapacheflinkflink-table-api-scala-bridge_2.111.11.1flink-table-api-scala-bridge_2.11-1.11.1.jar;C:Usersdefaultuser0.m2
epositoryorgapacheflinkflink-table-runtime-blink_2.111.11.1flink-table-runtime-blink_2.11-1.11.1.jar;C:Usersdefaultuser0.m2
epositoryorgcodehausjaninojanino3.0.9janino-3.0.9.jar;C:Usersdefaultuser0.m2
epositoryorgcodehausjaninocommons-compiler3.0.9commons-compiler-3.0.9.jar;C:Usersdefaultuser0.m2
epositoryorgapachecalciteavaticaavatica-core1.16.0avatica-core-1.16.0.jar;C:Usersdefaultuser0.m2
epositoryorg
eflections
eflections.9.10
eflections-0.9.10.jar;C:Usersdefaultuser0.m2
epositoryorgjavassistjavassist3.19.0-GAjavassist-3.19.0-GA.jar;C:Usersdefaultuser0.m2
epositoryorgapacheflinkflink-streaming-scala_2.111.11.1flink-streaming-scala_2.11-1.11.1.jar;C:Usersdefaultuser0.m2
epositoryorgapacheflinkflink-scala_2.111.11.1flink-scala_2.11-1.11.1.jar;C:Usersdefaultuser0.m2
epositoryorgscala-langscala-reflect2.11.12scala-reflect-2.11.12.jar;C:Usersdefaultuser0.m2
epositoryorgscala-langscala-library2.11.12scala-library-2.11.12.jar;C:Usersdefaultuser0.m2
epositoryorgscala-langscala-compiler2.11.12scala-compiler-2.11.12.jar;C:Usersdefaultuser0.m2
epositoryorgscala-langmodulesscala-xml_2.111.0.5scala-xml_2.11-1.0.5.jar;C:Usersdefaultuser0.m2
epositoryorgscala-langmodulesscala-parser-combinators_2.111.0.4scala-parser-combinators_2.11-1.0.4.jar;C:Usersdefaultuser0.m2
epositoryorgapacheflinkflink-table-common1.11.1flink-table-common-1.11.1.jar;C:Usersdefaultuser0.m2
epositoryorgapacheflinkflink-core1.11.1flink-core-1.11.1.jar;C:Usersdefaultuser0.m2
epositoryorgapacheflinkflink-annotations1.11.1flink-annotations-1.11.1.jar;C:Usersdefaultuser0.m2
epositoryorgapacheflinkflink-metrics-core1.11.1flink-metrics-core-1.11.1.jar;C:Usersdefaultuser0.m2
epositorycomesotericsoftwarekryokryo2.24.0kryo-2.24.0.jar;C:Usersdefaultuser0.m2
epositorycomesotericsoftwareminlogminlog1.2minlog-1.2.jar;C:Usersdefaultuser0.m2
epositoryorgobjenesisobjenesis2.1objenesis-2.1.jar;C:Usersdefaultuser0.m2
epositoryorgapachecommonscommons-compress1.20commons-compress-1.20.jar;C:Usersdefaultuser0.m2
epositoryorgapacheflinkflink-shaded-asm-77.1-11.0flink-shaded-asm-7-7.1-11.0.jar;C:Usersdefaultuser0.m2
epositorycomalibabafastjson1.2.73fastjson-1.2.73.jar;C:Usersdefaultuser0.m2
epositoryorgslf4jslf4j-log4j121.7.7slf4j-log4j12-1.7.7.jar;C:Usersdefaultuser0.m2
epositorylog4jlog4j1.2.17log4j-1.2.17.jar;C:Usersdefaultuser0.m2
epository
etsfjson-libjson-lib2.4json-lib-2.4-jdk15.jar;C:Usersdefaultuser0.m2
epositorycommons-beanutilscommons-beanutils1.8.0commons-beanutils-1.8.0.jar;C:Usersdefaultuser0.m2
epositorycommons-collectionscommons-collections3.2.1commons-collections-3.2.1.jar;C:Usersdefaultuser0.m2
epositorycommons-langcommons-lang2.5commons-lang-2.5.jar;C:Usersdefaultuser0.m2
epositorycommons-loggingcommons-logging1.1.1commons-logging-1.1.1.jar;C:Usersdefaultuser0.m2
epository
etsfezmorphezmorph1.0.6ezmorph-1.0.6.jar" Udf.Test1
这个是用JSON类来解析JSON字符串!!!
0     zhangsan
1     lisi
2     wangwu
3     maliu
这个是用JSON类的parseObject来解析JSON字符串!!!
key为:0值为:zhangsan
key为:1值为:lisi
key为:2值为:wangwu
key为:3值为:maliu
这个是用JSON类,指定解析类型,来解析JSON字符串!!!
key为:0值为:zhangsan
key为:1值为:lisi
key为:2值为:wangwu
key为:3值为:maliu
这个是用JSONObject类的parse方法来解析JSON字符串!!!
0  zhangsan
1  lisi
2  wangwu
3  maliu
这个是用JSONObject的parseObject方法来解析JSON字符串!!!
0  zhangsan
1  lisi
2  wangwu
3  maliu
这个是用JSONObject的parseObject方法并执行返回类型来解析JSON字符串!!!
0  zhangsan
1  lisi
2  wangwu
3  maliu
{"0":"zhangsan","1":"lisi","2":"wangwu","3":"maliu"}

Process finished with exit code 0

 8、数组转数组字符串

package Udf;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;
import java.util.Arrays;
// Approach 4: render a string array in its bracketed text form, e.g. "[a, b]".
public class ArrToString3 extends ScalarFunction {
    private static final long serialVersionUID = 1L;

    public ArrToString3() {
    }

    @Override
    public void open(FunctionContext context) throws Exception {
        super.open(context);
    }

    @Override
    public void close() throws Exception {
        super.close();
    }

    /**
     * @param arr input array; null or empty yields null
     * @return {@code Arrays.toString(arr)}, or null for null/empty input.
     *         NOTE(review): on an exception the exception message is returned
     *         as if it were data.
     */
    public String eval(String[] arr) {
        try {
            boolean hasElements = arr != null && arr.length > 0;
            return hasElements ? Arrays.toString(arr) : null;
        } catch (Exception ex) {
            return ex.getMessage();
        }
    }
}

9、数组转json

package Udf;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;
import org.json.JSONArray;
// Approach 1: convert a string array into a JSON array string using org.json.
public class ArrToJson extends ScalarFunction {
    private static final long serialVersionUID = 1L;

    public ArrToJson() {
    }

    @Override
    public void open(FunctionContext context) throws Exception {
        super.open(context);
    }

    @Override
    public void close() throws Exception {
        super.close();
    }

    /**
     * @param arr input array; null or empty yields null
     * @return the array serialized by {@code org.json.JSONArray#toString()}, or
     *         null for null/empty input. NOTE(review): on an exception the
     *         exception message is returned as if it were data.
     */
    public String eval(String[] arr) {
        try {
            if (arr == null || arr.length == 0) {
                return null;
            }
            return new JSONArray(arr).toString();
        } catch (Exception ex) {
            return ex.getMessage();
        }
    }
}

 10、LogMapInfo数据格式处理(logmap字段解析udf)

1、模型包

1.CellInfo
package Test.Model;

/**
 * @program: FlinkUdf
 * @description: CellInfo. Cell fields (mcc, mnc, ci, pci, tac, type) carried
 *               in the StationInfo payload; presumably mobile-network cell
 *               identifiers (confirm against the data producer).
 * @author: BigData
 * @create: 2020-11-16 14:50
 *
 * <p>Field names are kept exactly as-is: StationInfo (which holds this object)
 * is deserialized with Gson in LogMapInfoSplit, and Gson matches JSON keys to
 * field names by default.
 **/
public class CellInfo {
    private String mcc;
    private String mnc;
    private String ci;
    private String pci;
    private String tac;
    private String type;

    public String getMcc() { return this.mcc; }
    public void setMcc(String mcc) { this.mcc = mcc; }

    public String getMnc() { return this.mnc; }
    public void setMnc(String mnc) { this.mnc = mnc; }

    public String getCi() { return this.ci; }
    public void setCi(String ci) { this.ci = ci; }

    public String getPci() { return this.pci; }
    public void setPci(String pci) { this.pci = pci; }

    public String getTac() { return this.tac; }
    public void setTac(String tac) { this.tac = tac; }

    public String getType() { return this.type; }
    public void setType(String type) { this.type = type; }
}
2.GPS
package Test.Model;

/**
 * @program: FlinkUdf
 * @description: Gps. Longitude/latitude pair carried in the StationInfo
 *               payload, both kept as raw strings.
 * @author: BigData
 * @create: 2020-11-16 14:51
 *
 * <p>Field names double as JSON keys for Gson deserialization; do not rename.
 **/
public class Gps {
    private String longitude;
    private String latitude;

    public void setLongitude(String longitude) { this.longitude = longitude; }
    public String getLongitude() { return this.longitude; }

    public void setLatitude(String latitude) { this.latitude = latitude; }
    public String getLatitude() { return this.latitude; }
}
package Test.Model;

/**
 * @program: FlinkUdf
 * @description: Gps. Longitude/latitude pair, both kept as raw strings.
 * @author: BigData
 * @create: 2020-11-16 14:51
 *
 * <p>NOTE(review): the article lists a Gps class in package Test.Model twice;
 * only one copy can exist in a real project. Field names double as JSON keys
 * for Gson deserialization; do not rename.
 **/
public class Gps {
    private String longitude;
    private String latitude;

    /** @return the longitude text as received, or null if absent */
    public String getLongitude() {
        return this.longitude;
    }

    /** @param value longitude text to store */
    public void setLongitude(String value) {
        this.longitude = value;
    }

    /** @return the latitude text as received, or null if absent */
    public String getLatitude() {
        return this.latitude;
    }

    /** @param value latitude text to store */
    public void setLatitude(String value) {
        this.latitude = value;
    }
}
3.StationInfo
package Test.Model;

import java.util.List;

/**
 * @program: FlinkUdf
 * @description: Metro (subway) station report model.
 * @author: BigData
 * @create: 2020-11-16 14:49
 *
 * <p>Instances are built by Gson ({@code gson.fromJson(json, StationInfo.class)}
 * in LogMapInfoSplit), which binds JSON keys to field names. The snake_case
 * names (cell_info, wifi_infos, version_name, version_code) are therefore part
 * of the wire format and must not be renamed.
 **/
public final class StationInfo {
    private String station;
    private CellInfo cell_info;
    private List<String> wifi_infos;
    // NOTE(review): kept as text. LogMapInfoSplit feeds it to Long.parseLong and
    // java.sql.Timestamp, so it presumably holds epoch milliseconds; confirm.
    private String timestamp;
    private String reason;
    private String imei;
    private String guid;
    private Gps gps;
    private String city;
    private String group;
    private String version_name;
    private String version_code;

    public String getStation() { return this.station; }
    public void setStation(String station) { this.station = station; }

    public CellInfo getCell_info() { return this.cell_info; }
    public void setCell_info(CellInfo cell_info) { this.cell_info = cell_info; }

    // The list is returned and stored by reference, not copied.
    public List<String> getWifi_infos() { return this.wifi_infos; }
    public void setWifi_infos(List<String> wifi_infos) { this.wifi_infos = wifi_infos; }

    public String getTimestamp() { return this.timestamp; }
    public void setTimestamp(String timestamp) { this.timestamp = timestamp; }

    public String getReason() { return this.reason; }
    public void setReason(String reason) { this.reason = reason; }

    public String getImei() { return this.imei; }
    public void setImei(String imei) { this.imei = imei; }

    public String getGuid() { return this.guid; }
    public void setGuid(String guid) { this.guid = guid; }

    public Gps getGps() { return this.gps; }
    public void setGps(Gps gps) { this.gps = gps; }

    public String getCity() { return this.city; }
    public void setCity(String city) { this.city = city; }

    public String getGroup() { return this.group; }
    public void setGroup(String group) { this.group = group; }

    public String getVersion_name() { return this.version_name; }
    public void setVersion_name(String version_name) { this.version_name = version_name; }

    public String getVersion_code() { return this.version_code; }
    public void setVersion_code(String version_code) { this.version_code = version_code; }
}

2、工具包

1、LogMapInfoSplit(logmap数据格式解析)
package Test.Util;
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
import java.sql.Timestamp;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;
import Test.Model.StationInfo;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.gson.Gson;
/**
 * Flink table function that parses the LogMap payload (the {@code oca_upload_json} entry of
 * {@code log_map}).
 *
 * <p>The input is Base64 text of a JSON document. It is deserialized with Gson into a
 * {@link StationInfo} and flattened into one 13-field {@link Row}. The field order must match
 * {@link #getResultType()} and the column alias list used with
 * {@code LATERAL TABLE(logMapInfoSplit(...)) AS T(...)}. Blank input, or input that fails to
 * decode or parse, produces no row.
 *
 * @program: FlinkUdf
 * @author: BigData
 * @create: 2020-11-16 14:53
 **/
public class LogMapInfoSplit extends TableFunction<Row> {
    private static final long serialVersionUID = 5485869053798534732L;

    /** Number of fields in every emitted row; must agree with {@link #getResultType()}. */
    private static final int ROW_ARITY = 13;

    // One shared Gson instance is reused for every record.
    private static final Gson gson = new Gson();
    private static final Logger logger = LoggerFactory.getLogger(LogMapInfoSplit.class);

    /**
     * Decodes one payload value and emits at most one row.
     *
     * <p>Failures (bad Base64, bad JSON, non-numeric timestamp, ...) are logged at ERROR and the
     * record is skipped, so with {@code LEFT JOIN LATERAL ... ON TRUE} the caller sees NULL columns.
     *
     * @param param Base64 text of the JSON payload; null or blank input emits nothing
     */
    public void eval(String param) {
        if (StringUtils.isBlank(param)) {
            logger.debug("param is empty");
            return;
        }
        try {
            String ocaUploadJson = decodePayload(param);
            // The decoded payload carries device identifiers (IMEI, GPS) and is logged once per
            // record, so it is kept at DEBUG rather than INFO.
            logger.debug("sourceParam :{}", ocaUploadJson);
            if (StringUtils.isBlank(ocaUploadJson)) {
                return;
            }
            StationInfo stationInfo = gson.fromJson(ocaUploadJson, StationInfo.class);
            if (stationInfo == null) {
                return;
            }
            logger.debug("received wifiInfo:{}", stationInfo);
            collect(toRow(stationInfo));
        } catch (Exception e) {
            // Best effort: a malformed record must not fail the streaming job.
            logger.error(e.getMessage(), e);
        }
    }

    /**
     * Base64-decodes the payload and accepts both the standard and the URL-safe alphabet.
     *
     * <p>A space is treated as a '+' that was mangled by URL form decoding, and '$' as a
     * substituted '=' padding character. Everything is mapped onto the URL-safe alphabet because
     * {@link Base64#getUrlDecoder()} rejects '+' and '/'. Text that was already URL-safe decodes
     * exactly as before.
     *
     * @throws IllegalArgumentException if the text is still not valid Base64
     */
    private static String decodePayload(String param) {
        String urlSafe = param
                .replace(' ', '-')   // mangled '+' (value 62) -> URL-safe 62
                .replace('+', '-')   // standard-alphabet 62 -> URL-safe 62
                .replace('/', '_')   // standard-alphabet 63 -> URL-safe 63
                .replace('$', '=');  // substituted padding
        byte[] raw = Base64.getUrlDecoder().decode(urlSafe.getBytes(StandardCharsets.UTF_8));
        return new String(raw, StandardCharsets.UTF_8);
    }

    /**
     * Flattens a parsed payload into the 13-field row layout declared by {@link #getResultType()}.
     *
     * @throws NumberFormatException if the timestamp is null or not a decimal number
     */
    private static Row toRow(StationInfo stationInfo) {
        // A missing or empty Wi-Fi list is emitted as a NULL array, not a zero-length one.
        String[] wifiInfos = null;
        if (stationInfo.getWifi_infos() != null && stationInfo.getWifi_infos().size() != 0) {
            wifiInfos = stationInfo.getWifi_infos().toArray(new String[stationInfo.getWifi_infos().size()]);
        }
        logger.debug("station is:{}", stationInfo.getStation());
        Map<String, String> gpsMap = objectToMap(stationInfo.getGps());
        logger.debug("obj map is:{}", gpsMap);

        // Parsed once and reused for both the timestamp column and the date-string column.
        // Passed to new Timestamp(long), i.e. interpreted as epoch milliseconds.
        long epochMillis = Long.parseLong(stationInfo.getTimestamp());

        Row row = new Row(ROW_ARITY);
        row.setField(0, stationInfo.getStation());
        row.setField(1, stationInfo.getImei());
        row.setField(2, objectToMap(stationInfo.getCell_info()));
        row.setField(3, wifiInfos);
        row.setField(4, stationInfo.getCity());
        row.setField(5, stationInfo.getReason());
        row.setField(6, gpsMap);
        row.setField(7, new Timestamp(epochMillis));
        row.setField(8, stationInfo.getGroup());
        // NOTE(review): the formatter's time zone (presumably the JVM default) decides the date
        // boundary. Confirm this matches what downstream partitioning expects.
        row.setField(9, DateFormatUtils.ISO_DATE_FORMAT.format(epochMillis));
        row.setField(10, stationInfo.getGuid());
        row.setField(11, stationInfo.getVersion_name());
        row.setField(12, stationInfo.getVersion_code());
        return row;
    }

    /**
     * Declares the row type emitted by {@link #eval(String)}. Index comments give the column
     * each position feeds in the SQL alias list.
     */
    @Override
    public TypeInformation<Row> getResultType() {
        return Types.ROW(
                Types.STRING,                          // 0  station
                Types.STRING,                          // 1  imei
                Types.MAP(Types.STRING, Types.STRING), // 2  cell_info
                Types.OBJECT_ARRAY(Types.STRING),      // 3  wifi_infos
                Types.STRING,                          // 4  city
                Types.STRING,                          // 5  reason
                Types.MAP(Types.STRING, Types.STRING), // 6  gps
                Types.SQL_TIMESTAMP,                   // 7  timestamp
                Types.STRING,                          // 8  group
                Types.STRING,                          // 9  date string (DateFormatUtils.ISO_DATE_FORMAT)
                Types.STRING,                          // 10 guid
                Types.STRING,                          // 11 version_name
                Types.STRING);                         // 12 version_code
    }

    /**
     * Converts an object's own instance fields into a field-name to string-value map.
     *
     * <p>Only fields declared directly on the runtime class are read; inherited fields are not.
     * Static and compiler-generated synthetic fields are skipped because they are not part of
     * the object's data. Without this, a constant such as {@code serialVersionUID} would appear
     * as a map key. A null field value becomes the string {@code "null"} via
     * {@link String#valueOf(Object)}. A field that cannot be read is logged and omitted.
     *
     * @param obj object to flatten; may be null
     * @return null when {@code obj} is null, otherwise a new mutable map
     */
    public static Map<String, String> objectToMap(Object obj) {
        if (obj == null) {
            return null;
        }
        Map<String, String> map = new HashMap<String, String>();
        for (Field field : obj.getClass().getDeclaredFields()) {
            if (Modifier.isStatic(field.getModifiers()) || field.isSynthetic()) {
                continue;
            }
            try {
                field.setAccessible(true);
                map.put(field.getName(), String.valueOf(field.get(obj)));
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            }
        }
        return map;
    }
}

Flink SQL 代码(通过 LEFT JOIN LATERAL 调用上面的 UDF):

INSERT INTO
  software_research.subwayaware_info
SELECT
  station,
  logMap_imei as `imei`,
  cell_info,
  wifi_infos,
  city,
  reason,
  gps,
  CAST(createtime as VARCHAR) as `timestamp`,
  logMap_group as `group`,
  datestr,
  model,
  event_id,
  guid,
  version_name,
  version_code
FROM
  software_research.subwayaware_dcs
  LEFT JOIN LATERAL TABLE(logMapInfoSplit(log_map ['oca_upload_json'])) AS T(
    station,
    logMap_imei,
    cell_info,
    wifi_infos,
    city,
    reason,
    gps,
    createtime,
    logMap_group,
    datestr,
    guid,
    version_name,
    version_code
  ) ON TRUE
原文地址:https://www.cnblogs.com/huanghanyu/p/13924746.html