Flink 1.11 Integration with Hive 2.3.6: Writing Data into Hive


Note 1: This walkthrough uses Flink 1.11.0, Hive 2.3.6, and Hadoop 2.10.0.

Note 2: Place the hive-site.xml file in the resources directory of the Maven project.

Note 3: If you do not wrap the submission in a script, run export HADOOP_CLASSPATH=`hadoop classpath` before submitting.
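
For reference, the catalog only needs hive-site.xml in order to locate the metastore. A minimal sketch (the thrift host and port below are assumptions; use your cluster's values):

<configuration>
    <!-- assumed metastore address; adjust to your cluster -->
    <property>
        <name>hive.metastore.uris</name>
        <value>thrift://node01:9083</value>
    </property>
</configuration>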




Step 1: Add the following POM dependencies, as described in the official documentation


<!-- Flink Dependency -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-hive_2.11</artifactId>
    <version>1.11.0</version>
    <scope>provided</scope>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_2.11</artifactId>
    <version>1.11.0</version>
    <scope>provided</scope>
</dependency>

<!-- Hive Dependency -->
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-exec</artifactId>
    <version>2.3.6</version>
    <scope>provided</scope>
</dependency>
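
Note that all three dependencies are marked provided: they are compiled against but deliberately left out of the fat jar, so at runtime the classes must come from the Flink distribution's lib directory. This is exactly what goes wrong in step 5 below.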


Step 2: Write the job code as follows

import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

import java.sql.Timestamp;

public class StreamMain {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpointing is required: the Hive streaming sink only finalizes
        // in-progress files when a checkpoint completes.
        bsEnv.enableCheckpointing(10000);
        // Event time is required for the partition-time commit trigger used below.
        bsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(bsEnv);
        DataStream<UserInfo> dataStream = bsEnv.addSource(new MySource())
                .assignTimestampsAndWatermarks(
                        new AssignerWithPunctuatedWatermarks<UserInfo>() {
                            long water = 0L;

                            @Override
                            public Watermark checkAndGetNextWatermark(
                                    UserInfo lastElement,
                                    long extractedTimestamp) {
                                return new Watermark(water);
                            }

                            @Override
                            public long extractTimestamp(
                                    UserInfo element,
                                    long recordTimestamp) {
                                water = element.getTs().getTime();
                                return water;
                            }
                        });


        // Construct the Hive catalog
        String name = "myhive";                               // catalog name: any unique identifier
        String defaultDatabase = "default";                   // default database
        String hiveConfDir = "/export/servers/nc/hive/conf/"; // directory containing hive-site.xml
        String version = "2.3.6";                             // Hive version

        HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version);
        tEnv.registerCatalog("myhive", hive);
        tEnv.useCatalog("myhive");
        tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
        tEnv.useDatabase("default");

        tEnv.createTemporaryView("users", dataStream);

        // If the table already exists in Hive, skip this block.
        // String hiveSql = "CREATE EXTERNAL TABLE fs_table ( " +
        //         " user_id STRING, " +
        //         " order_amount DOUBLE" +
        //         ") PARTITIONED BY (dt STRING, h STRING, m STRING) " +
        //         "STORED AS ORC " +
        //         "TBLPROPERTIES ( " +
        //         " 'partition.time-extractor.timestamp-pattern'='$dt $h:$m:00', " +
        //         " 'sink.partition-commit.delay'='0s', " +
        //         " 'sink.partition-commit.trigger'='partition-time', " +
        //         " 'sink.partition-commit.policy.kind'='metastore'" +
        //         ")";
        // tEnv.executeSql(hiveSql);

        String insertSql = "insert into fs_table SELECT userId, amount, " +
                " DATE_FORMAT(ts, 'yyyy-MM-dd'), DATE_FORMAT(ts, 'HH'), DATE_FORMAT(ts, 'mm') FROM users";
        tEnv.executeSql(insertSql);
    }


    public static class MySource implements SourceFunction<UserInfo> {

        String[] userids = {
                "4760858d-2bec-483c-a535-291de04b2247", "67088699-d4f4-43f2-913c-481bff8a2dc5",
                "72f7b6a8-e1a9-49b4-9a0b-770c41e01bfb", "dfa27cb6-bd94-4bc0-a90b-f7beeb9faa8b",
                "aabbaa50-72f4-495c-b3a1-70383ee9d6a4", "3218bbb9-5874-4d37-a82d-3e35e52d1702",
                "3ebfb9602ac07779||3ebfe9612a007979", "aec20d52-c2eb-4436-b121-c29ad4097f6c",
                "e7e896cd939685d7||e7e8e6c1930689d7", "a4b1e1db-55ef-4d9d-b9d2-18393c5f59ee"
        };

        @Override
        public void run(SourceFunction.SourceContext<UserInfo> sourceContext) throws Exception {
            while (true) {
                // Math.random() is always < 1.0, so this index covers the whole array.
                // (The original multiplied by userids.length - 1, which could never
                // select the last element.)
                String userid = userids[(int) (Math.random() * userids.length)];
                UserInfo userInfo = new UserInfo();
                userInfo.setUserId(userid);
                userInfo.setAmount(Math.random() * 100);
                userInfo.setTs(new Timestamp(System.currentTimeMillis()));
                sourceContext.collect(userInfo);
                Thread.sleep(100);
            }
        }

        @Override
        public void cancel() {
        }
    }

    public static class UserInfo implements java.io.Serializable {
        private String userId;
        private Double amount;
        private Timestamp ts;

        public String getUserId() {
            return userId;
        }

        public void setUserId(String userId) {
            this.userId = userId;
        }

        public Double getAmount() {
            return amount;
        }

        public void setAmount(Double amount) {
            this.amount = amount;
        }

        public Timestamp getTs() {
            return ts;
        }

        public void setTs(Timestamp ts) {
            this.ts = ts;
        }
    }
}
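
With the table definition above, each record is routed to a minute-level partition; on HDFS the files land under paths of the form (warehouse location and values are illustrative):

/user/hive/warehouse/fs_table/dt=2020-08-15/h=10/m=30/

The 'partition.time-extractor.timestamp-pattern' property tells the sink how to rebuild a timestamp from the partition columns, and with 'sink.partition-commit.trigger'='partition-time' plus a '0s' delay, a partition is committed to the metastore once the watermark passes the time extracted for that partition.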


Step 3: Package the job and upload it to the server
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>3.0.0</version>
    <executions>
        <!-- Run shade goal on package phase -->
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
            <configuration>
                <artifactSet>
                    <excludes>
                        <exclude>org.apache.flink:force-shading</exclude>
                        <exclude>com.google.code.findbugs:jsr305</exclude>
                        <exclude>org.slf4j:*</exclude>
                        <exclude>log4j:*</exclude>
                    </excludes>
                </artifactSet>
                <filters>
                    <filter>
                        <!-- Do not copy the signatures in the META-INF folder.
                             Otherwise, this might cause SecurityExceptions when using the JAR. -->
                        <artifact>*:*</artifact>
                        <excludes>
                            <exclude>META-INF/*.SF</exclude>
                            <exclude>META-INF/*.DSA</exclude>
                            <exclude>META-INF/*.RSA</exclude>
                        </excludes>
                    </filter>
                </filters>
                <transformers>
                    <transformer
                            implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                        <mainClass>com.tal.flink.hive.StreamMain</mainClass>
                    </transformer>
                </transformers>
            </configuration>
        </execution>
    </executions>
</plugin>
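
With the plugin in place, a standard Maven build produces the fat jar that the script in step 4 submits:

mvn clean package

The jar name flink_integration_hive-1.0-SNAPSHOT.jar used below comes from the project's artifactId and version.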


Step 4: Write the job submission script
cd /root/maoxiangyi
export HADOOP_CLASSPATH=`hadoop classpath`
/export/servers/nc/flink/bin/flink run -c com.tal.flink.hive.StreamMain /root/maoxiangyi/flink_integration_hive-1.0-SNAPSHOT.jar


Step 5: Running into the first error

java.lang.NoClassDefFoundError: org/apache/flink/table/catalog/hive/HiveCatalog
         at com.tal.flink.hive.StreamMain.main(StreamMain.java:50)
         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
         at java.lang.reflect.Method.invoke(Method.java:498)
         at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
         at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
         at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
         at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
         at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
         at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
         at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
         at java.security.AccessController.doPrivileged(Native Method)
         at javax.security.auth.Subject.doAs(Subject.java:422)
         at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893)
         at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
         at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
Caused by: java.lang.ClassNotFoundException: org.apache.flink.table.catalog.hive.HiveCatalog
         at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
         at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
         at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:61)
         at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:65)
         at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
         at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
         ... 17 more
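
The cause follows from step 1: flink-connector-hive has provided scope, so the shade plugin never packages HiveCatalog into the fat jar, and the plain Flink distribution does not ship it either. The class has to be supplied on Flink's own classpath, i.e. the lib directory.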


Step 6: Fix the error by downloading the Hive connector jar into Flink's lib directory

cd /export/servers/nc/flink/lib
wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-hive-2.3.6_2.11/1.11.0/flink-sql-connector-hive-2.3.6_2.11-1.11.0.jar
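
Alternatively, the Flink 1.11 Hive documentation describes dropping the individual jars (flink-connector-hive_2.11-1.11.0.jar plus hive-exec-2.3.6.jar) into lib instead of the bundled SQL connector jar. Either way, restart the Flink cluster so that the new jar in lib is picked up.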


Step 7: Resubmit the job; this time the submission succeeds

(screenshot: the job submitted successfully)


Step 8: Check the data in Hive

(screenshot: the data visible in Hive)
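
From the Hive CLI (or beeline), a quick sanity check might look like this; the partition values are illustrative:

SHOW PARTITIONS fs_table;
SELECT * FROM fs_table WHERE dt = '2020-08-15' AND h = '10' LIMIT 10;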





Original post (in Chinese): https://www.cnblogs.com/maoxiangyi/p/13509782.html