Learning Flink from 0 to 1 in Practice (23): Reading from Hive and Writing to Hive with Flink

1. Reading from Hive. The implementation is based on material found online; the core of it is the following two classes:

HCatInputFormat
HCatInputFormatBase

Both of them ultimately extend Flink's RichInputFormat:

public abstract class HCatInputFormatBase<T> extends RichInputFormat<T, HadoopInputSplit> implements ResultTypeQueryable<T>

These two classes come from Flink's flink-hcatalog module; you can download that jar (for example from Maven Central) and pull the classes out into your project, or add the module as a dependency directly.

Dependencies (roughly these):

<!--flink_hive依赖-->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-hadoop-fs</artifactId>
    <version>1.6.2</version>
</dependency>
 
<dependency>
    <groupId>com.jolbox</groupId>
    <artifactId>bonecp</artifactId>
    <version>0.8.0.RELEASE</version>
</dependency>
 
<dependency>
    <groupId>com.twitter</groupId>
    <artifactId>parquet-hive-bundle</artifactId>
    <version>1.6.0</version>
</dependency>
 
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-exec</artifactId>
    <version>2.1.0</version>
</dependency>
 
 
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-metastore</artifactId>
    <version>2.1.0</version>
</dependency>
 
 
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-cli</artifactId>
    <version>2.1.0</version>
</dependency>
 
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-common</artifactId>
    <version>2.1.0</version>
</dependency>
 
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-service</artifactId>
    <version>2.1.0</version>
</dependency>
 
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-shims</artifactId>
    <version>2.1.0</version>
</dependency>
 
<dependency>
    <groupId>org.apache.hive.hcatalog</groupId>
    <artifactId>hive-hcatalog-core</artifactId>
    <version>2.1.0</version>
</dependency>
 
<dependency>
    <groupId>org.apache.thrift</groupId>
    <artifactId>libfb303</artifactId>
    <version>0.9.3</version>
    <type>pom</type>
</dependency>
 
 
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-hadoop-compatibility_2.11</artifactId>
    <version>1.6.2</version>
</dependency>
 
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-shaded-hadoop2</artifactId>
    <version>1.6.2</version>
</dependency>

Reading Hive data:

package com.coder.flink.core.FlinkHive

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._
import org.apache.hadoop.conf.Configuration

// Adjust this import to wherever HCatInputFormat lives in your project;
// in the flink-hcatalog module it is org.apache.flink.hcatalog.scala.HCatInputFormat
import org.apache.flink.hcatalog.scala.HCatInputFormat

// Read data from Hive
object ReadHive {
  def main(args: Array[String]): Unit = {

      val conf = new Configuration()
      conf.set("hive.metastore.local", "false")

      conf.set("hive.metastore.uris", "thrift://172.10.4.141:9083")
      // For a highly available metastore, point this at the nameserver instead
//      conf.set("hive.metastore.uris", "thrift://172.10.4.142:9083")

      val env = ExecutionEnvironment.getExecutionEnvironment

      // TODO return type
      val dataset: DataSet[TamAlert] = env.createInput(new HCatInputFormat[TamAlert]("aijiami", "test", conf))

      dataset.first(10).print()
//      env.execute("flink hive test")
  }
}
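The TamAlert type used above is not shown in the original article; it is a user-defined type whose fields must line up with the columns of the Hive table being read. A purely hypothetical sketch (field names and types are placeholders):

// Hypothetical placeholder: replace the fields with the actual columns of your Hive table
case class TamAlert(appName: String, alertType: String, createTime: String)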

The good news is that Flink 1.9 adds native Hive read/write support (a hedged HiveCatalog sketch is included after the JDBC examples below). Until then, we can also read and write Hive through Hive JDBC; the downside is that performance is likely to be slower:

<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-jdbc</artifactId>
    <version>2.1.0</version>
</dependency>
package com.coder.flink.core.FlinkHive;
 
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
 
public class FlinkReadHive {
    public static void main(String[] args) throws ClassNotFoundException, SQLException {
 
        Class.forName("org.apache.hive.jdbc.HiveDriver");
        Connection con = DriverManager.getConnection("jdbc:hive2://172.10.4.143:10000/aijiami","hive","hive");
        Statement st = con.createStatement();
        ResultSet rs = st.executeQuery("SELECT * from ods_scenes_detail_new limit 10");
        while (rs.next()){
            System.out.println(rs.getString(1) + "," + rs.getString(2));
        }
        rs.close();
        st.close();
        con.close();
 
 
    }
}
A fuller JDBC example that creates a table, loads data into it, and runs queries (i.e. writing to Hive via JDBC):

public class HiveApp {
     
    private static String driver = "org.apache.hive.jdbc.HiveDriver";
    private static String url = "jdbc:hive2://Master:10000/default";
    private static String user = "root"; // anonymous access usually works; root is used here because the entire Hive installation was done as root
    private static String password = "";
 
    public static void main(String[] args) {
        ResultSet res = null;
         
        try {
            /**
             * Step 1: load the JDBC driver via reflection
             */
            Class.forName(driver);
             
            /**
             * Step 2: connect to Hive over JDBC; the default port is 10000 and the default username and password are both empty
             */
            Connection conn = DriverManager.getConnection(url, user, password); 
             
            /**
             * Step 3: create a Statement handle for running the SQL operations;
             */
            Statement stmt = conn.createStatement();
             
            /**
             * Next come the SQL operations.
             * Step 4.1: create the table, dropping it first if it already exists;
             */
            String tableName = "testHiveDriverTable";
            stmt.execute("drop table if exists " + tableName );
            
             
            stmt.execute("create table " + tableName + " (id int, name string)" + "ROW FORMAT DELIMITED FIELDS TERMINATED BY '	' LINES TERMINATED BY '
'");
            /**
             *  Step 4.2: query for the table just created;
             */
            String sql = "show tables '" + tableName + "'";
            System.out.println("Running: " + sql);
            res = stmt.executeQuery(sql);
            if (res.next()) {
              System.out.println(res.getString(1));
            }
            /**
             *  Step 4.3: query the table's schema;
             */
            sql = "describe " + tableName;
            System.out.println("Running: " + sql);
            res = stmt.executeQuery(sql);
            while (res.next()) {
              System.out.println(res.getString(1) + "\t" + res.getString(2));
            }
          
            /**
             *  Step 4.4: load data into the Hive table;
             */
            String filepath = "/root/Documents/data/sql/testHiveDriver.txt";
            sql = "load data local inpath '" + filepath + "' into table " + tableName;
            System.out.println("Running: " + sql);
            stmt.execute(sql);
          
            /**
             *  Step 4.5: query the data now in the Hive table;
             */
            sql = "select * from " + tableName;
            System.out.println("Running: " + sql);
            res = stmt.executeQuery(sql);
            while (res.next()) {
              System.out.println(String.valueOf(res.getInt(1)) + "\t" + res.getString(2));
            }
          
            /**
             *  Step 4.6: run an aggregation over the table;
             */
            sql = "select count(1) from " + tableName;   // select count(*) triggers a MapReduce job, so the YARN resource manager must be running: start-yarn.sh
            System.out.println("Running: " + sql);
            res = stmt.executeQuery(sql);
           
            while (res.next()) {
              System.out.println("Total lines :" + res.getString(1));
            }    
             
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }   
         
         
 
    }
 
}
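For reference, the Flink 1.9 Hive support mentioned earlier goes through the Table API's HiveCatalog rather than JDBC. Below is a minimal sketch, not code from the original article, assuming the flink-connector-hive and Blink planner dependencies are on the classpath; the catalog name, database, hive-site.xml directory, Hive version and table names are placeholders, and INSERT INTO Hive tables was still experimental in 1.9:

import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}
import org.apache.flink.table.catalog.hive.HiveCatalog

object HiveCatalogSketch {
  def main(args: Array[String]): Unit = {
    // Batch mode with the Blink planner, which the 1.9 Hive integration requires
    val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
    val tableEnv = TableEnvironment.create(settings)

    // Register the Hive metastore as a catalog ("/etc/hive/conf" must contain hive-site.xml)
    val hive = new HiveCatalog("myhive", "aijiami", "/etc/hive/conf", "2.3.4")
    tableEnv.registerCatalog("myhive", hive)
    tableEnv.useCatalog("myhive")

    // Read from one Hive table and write the result into another (both are placeholders)
    tableEnv.sqlUpdate("INSERT INTO test_copy SELECT * FROM test")
    tableEnv.execute("flink 1.9 hive catalog sketch")
  }
}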

 

A simple example of writing to HDFS:

package com.coder.flink.core.test_demo
 
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment, _}
import org.apache.flink.core.fs.FileSystem.WriteMode
 
object WriteToHDFS {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    // 2. Define the data: stu(age, name, height)
    val stu: DataSet[(Int, String, String)] = env.fromElements(
      (19, "zhangsan","aaaa"),
      (1449, "zhangsan","aaaa"),
      (33, "zhangsan","aaaa"),
      (22, "zhangsan","aaaa")
    )
 
    // TODO write to the local filesystem
    stu.setParallelism(1).writeAsText("file:///C:/Users/Administrator/Desktop/Flink代码/测试数据/test001.txt",
      WriteMode.OVERWRITE)
    env.execute()
 
 
    // TODO write a text file to HDFS; the path is created automatically if it does not exist
    stu.setParallelism(1).writeAsText("hdfs:///output/flink/datasink/test001.txt",
      WriteMode.OVERWRITE)
    env.execute()
 
    // TODO write a CSV file to HDFS
    // 3.1 Read the CSV file
    val inPath = "hdfs:///input/flink/sales.csv"
    case class Sales(transactionId: String, customerId: Int, itemId: Int, amountPaid: Double)
    val ds2 = env.readCsvFile[Sales](
      filePath = inPath,
      lineDelimiter = "\n",
      fieldDelimiter = ",",
      lenient = false,
      ignoreFirstLine = true,
      includedFields = Array(0, 1, 2, 3),
      pojoFields = Array("transactionId", "customerId", "itemId", "amountPaid")
    )
    // 3.2 Write the CSV data to HDFS
    val outPath = "hdfs:///output/flink/datasink/sales.csv"
    ds2.setParallelism(1).writeAsCsv(filePath = outPath, rowDelimiter = "\n", fieldDelimiter = "|", WriteMode.OVERWRITE)
 
    env.execute()
  }
}
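Finally, to land data in Hive itself (the "writing to Hive" half of the title) without the 1.9 Table API, one option that reuses only pieces shown above is to stage a file on HDFS with Flink and then issue a LOAD DATA statement over Hive JDBC. This is a hedged sketch, not code from the original article; the staging path, JDBC URL and target table stu_hive are placeholders, and the target table must already exist with a matching schema and field delimiter:

package com.coder.flink.core.test_demo

import java.sql.DriverManager

import org.apache.flink.api.scala._
import org.apache.flink.core.fs.FileSystem.WriteMode

object WriteToHiveViaHdfs {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    val stu: DataSet[(Int, String, String)] = env.fromElements(
      (19, "zhangsan", "aaaa"),
      (22, "lisi", "bbbb")
    )

    // Step 1: stage the data on HDFS as a comma-delimited file (placeholder path)
    val stagingPath = "hdfs:///tmp/flink/stu_staging"
    stu.setParallelism(1).writeAsCsv(filePath = stagingPath, rowDelimiter = "\n", fieldDelimiter = ",", WriteMode.OVERWRITE)
    env.execute("stage data on hdfs")

    // Step 2: load the staged file into an existing Hive table through Hive JDBC
    Class.forName("org.apache.hive.jdbc.HiveDriver")
    val con = DriverManager.getConnection("jdbc:hive2://172.10.4.143:10000/aijiami", "hive", "hive")
    val st = con.createStatement()
    st.execute("load data inpath '" + stagingPath + "' into table stu_hive")
    st.close()
    con.close()
  }
}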
Author: 大码王

-------------------------------------------


Original post: https://www.cnblogs.com/huanghanyu/p/13632961.html