Flink DataSet Development: A Simple Introduction

Development workflow

1.    Obtain an execution environment.
2.    Load/create the initial data.
3.    Specify transformations on the data.
4.    Specify where to put the computed results.
5.    Trigger program execution.

Example:

import org.apache.flink.api.scala._

object DataSet_WordCount {
  def main(args: Array[String]) {
    //TODO Initialize the environment
    val env = ExecutionEnvironment.getExecutionEnvironment
    //TODO Load/create the initial data
    val text = env.fromElements(
      "Who's there?",
      "I think I hear them. Stand, ho! Who's there?")
    //TODO Specify transformations on the data
    val split_words = text.flatMap(line => line.toLowerCase().split("\\W+"))
    val filter_words = split_words.filter(x => x.nonEmpty)
    val map_words = filter_words.map(x => (x, 1))
    val groupBy_words = map_words.groupBy(0)
    val sum_words = groupBy_words.sum(1)
    //TODO Specify where to put the computed results
//    sum_words.setParallelism(1) // write the summed result with a single task
    sum_words.writeAsText(args(0)) // e.g. "/Users/niutao/Desktop/flink.txt"
    //TODO Trigger program execution
    env.execute("DataSet wordCount")
  }
}
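The same transformation chain can be checked without a Flink cluster by running it on ordinary Scala collections. The sketch below (a hypothetical local equivalent, not part of the original program) mirrors each DataSet step: flatMap to split, filter to drop empty tokens, map to pair each word with 1, then group and sum.

```scala
// Plain-Scala sketch of the word-count pipeline, useful for verifying the
// transformation logic locally before packaging the Flink job.
object LocalWordCount {
  def main(args: Array[String]): Unit = {
    val text = Seq(
      "Who's there?",
      "I think I hear them. Stand, ho! Who's there?")
    val counts = text
      .flatMap(_.toLowerCase.split("\\W+"))            // split on non-word chars
      .filter(_.nonEmpty)                              // drop empty tokens
      .map((_, 1))                                     // pair each word with 1
      .groupBy(_._1)                                   // group pairs by word
      .map { case (w, ps) => (w, ps.map(_._2).sum) }   // sum the 1s per word
    counts.toSeq.sortBy { case (_, c) => -c }.foreach(println)
  }
}
```

Note that `groupBy(0)` / `sum(1)` in the DataSet API address tuple fields by position, while the collection version uses field accessors; the result is the same word-to-count mapping.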

Package the program and submit it to YARN

Add the Maven packaging plugins:

<build>
    <sourceDirectory>src/main/java</sourceDirectory>
    <testSourceDirectory>src/test/scala</testSourceDirectory>
    <plugins>

        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>2.5.1</version>
            <configuration>
                <source>1.7</source>
                <target>1.7</target>
                <!--<encoding>${project.build.sourceEncoding}</encoding>-->
            </configuration>
        </plugin>

        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.2.0</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                    <configuration>
                        <args>
                            <!--<arg>-make:transitive</arg>-->
                            <arg>-dependencyfile</arg>
                            <arg>${project.build.directory}/.scala_dependencies</arg>
                        </args>

                    </configuration>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-surefire-plugin</artifactId>
            <version>2.18.1</version>
            <configuration>
                <useFile>false</useFile>
                <disableXmlReport>true</disableXmlReport>
                <includes>
                    <include>**/*Test.*</include>
                    <include>**/*Suite.*</include>
                </includes>
            </configuration>
        </plugin>

        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>2.3</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <filters>
                            <filter>
                                <artifact>*:*</artifact>
                                <excludes>
                                    <!--
                                    zip -d learn_spark.jar META-INF/*.RSA META-INF/*.DSA META-INF/*.SF
                                    -->
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                        <transformers>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <mainClass>com.nt.DEMO.WordCount</mainClass>
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>

            </executions>
        </plugin>
    </plugins>
</build>
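With the plugins above in place, building the fat jar is a single Maven invocation (a minimal sketch; the jar name follows the usual `artifactId-version` convention and matches the artifact submitted below):

```shell
# Compile, run tests, and produce the shaded (fat) jar under target/.
# The shade plugin runs in the package phase and bundles all dependencies,
# minus the excluded META-INF signature files.
mvn clean package
# Expected artifact: target/itcast_learn_flink-1.0-SNAPSHOT.jar
```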

Upload the jar with the rz command, then run the program:

bin/flink run -m yarn-cluster -yn 2 -c com.nt.DEMO.WordCount /home/elasticsearch/flinkjar/itcast_learn_flink-1.0-SNAPSHOT.jar <output-path>

Here `-m yarn-cluster` starts a per-job YARN session and `-yn 2` requests two TaskManager containers. `-c` names the entry class (optional here, since the shade plugin already wrote it into the manifest). The final argument is the output path the program reads as args(0).

The submitted job can be observed on YARN's web UI at port 8088.

The output of the run can be found under the /opt/cdh/flink-1.3.2/flinkJAR folder.

Original article: https://www.cnblogs.com/niutao/p/10548371.html