Flink in Practice (107): Connectors (16) HDFS Read/Write (Part 1): Reading

1. HDFS Dependencies

Add the following dependencies to pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>FlinkHdfs</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.11.0</flink.version>
        <scala.binary.version>2.11</scala.binary.version>
        <log4j.version>2.12.1</log4j.version>
        <hive.version>3.1.2</hive.version>
        <hadoop.version>3.1.3</hadoop.version>
    </properties>
    <dependencies>
        <!-- Required to run Flink -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.56</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <!-- <scope>provided</scope> -->
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <!-- <scope>provided</scope> -->
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Required to read from HDFS -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-hadoop-compatibility_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>

        <!-- Required to write to HDFS (covered in the follow-up post) -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.11_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>slf4j-api</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-filesystem_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

    </dependencies>
</project>

2. Configure HDFS

Copy hdfs-site.xml and core-site.xml into the src/main/resources directory, so the Hadoop client on the classpath can pick up your cluster configuration.
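For reference, a minimal core-site.xml might look like the sketch below. The fs.defaultFS value is an assumption based on the NameNode address used in the read example that follows (hadoop102:9820); adjust it to your cluster.

<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <!-- Assumed NameNode address; matches the hdfs:// URI in the example below -->
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://hadoop102:9820</value>
    </property>
</configuration>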

3. Read a File from HDFS

package com.atguigu

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object ReadFromHDFS {
  def main(args: Array[String]): Unit = {
    val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
    // readTextFile reads the file once, line by line, and the source finishes
    // when the file is exhausted
    val stream = bsEnv.readTextFile("hdfs://hadoop102:9820/test/person.txt")
    stream.print()
    bsEnv.execute()
  }
}
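readTextFile processes the file once and then the job finishes. If instead you want the job to keep watching an HDFS directory and pick up files as they appear, Flink's readFile with FileProcessingMode.PROCESS_CONTINUOUSLY does that. Below is a minimal sketch under assumptions: the directory path reuses the NameNode address from the example above, and the 10-second scan interval is a placeholder.

package com.atguigu

import org.apache.flink.api.java.io.TextInputFormat
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.source.FileProcessingMode
import org.apache.flink.streaming.api.scala._

object ReadFromHDFSContinuously {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Hypothetical directory to watch; adjust to your cluster
    val path = "hdfs://hadoop102:9820/test"
    val format = new TextInputFormat(new Path(path))
    // Re-scan the directory every 10 seconds; note that a file modified after
    // it was processed is re-read in full
    val stream = env.readFile(format, path, FileProcessingMode.PROCESS_CONTINUOUSLY, 10000L)
    stream.print()
    env.execute("ReadFromHDFSContinuously")
  }
}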

TIP

  1. Disable HDFS permission checking, or else copy the authentication configuration into the resources directory. To disable it, add the following to hdfs-site.xml (on Hadoop 3.x the key is dfs.permissions.enabled; dfs.permissions is the deprecated alias):

    <property>
        <name>dfs.permissions.enabled</name>
        <value>false</value>
    </property>
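If disabling permissions cluster-wide is not an option, another common workaround (an assumption here, not from the original post) is to run the job as a user HDFS recognizes, by setting HADOOP_USER_NAME before the Hadoop client initializes:

// Hypothetical: impersonate an HDFS user with read access to the target path.
// Must run before the first HDFS access; "atguigu" is a placeholder user name.
System.setProperty("HADOOP_USER_NAME", "atguigu")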

This article is from cnblogs (博客园), author: 秋华. Please credit the original link when reposting: https://www.cnblogs.com/qiu-hua/p/14161930.html
