2: Installing Flink 1.9.3 on Windows && first demo project

In my tests on Windows, only Flink versions up to 1.9 worked; later versions failed with errors.

1: Download flink-1.9.3-bin-scala_2.12.tgz from Index of /dist/flink/flink-1.9.3 (apache.org)

2: Extract the archive and run bin\start-cluster.bat, then open the Flink web UI at http://localhost:8081


3: Now build the first Flink program


$ mvn archetype:generate \
    -DarchetypeGroupId=org.apache.flink \
    -DarchetypeArtifactId=flink-quickstart-java \
    -DarchetypeVersion=1.9.0 \
    -DgroupId=wiki-edits \
    -DartifactId=wiki-edits \
    -Dversion=0.1 \
    -Dpackage=wikiedits

The archetype generates two skeleton classes, BatchJob.java and StreamingJob.java.

package wikiedits;

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditEvent;
import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditsSource;

public class WikipediaAnalysis {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
        // for a batch job, use ExecutionEnvironment instead

        // source: a stream of Wikipedia edit events
        DataStream<WikipediaEditEvent> edits = see.addSource(new WikipediaEditsSource());

        // key the stream by user name
        KeyedStream<WikipediaEditEvent, String> keyedEdits = edits
                .keyBy(new KeySelector<WikipediaEditEvent, String>() {
                    @Override
                    public String getKey(WikipediaEditEvent event) {
                        return event.getUser();
                    }
                });

        // sum the byte diff per user over time windows
        // (5 seconds is the window size used in the official Flink tutorial)
        DataStream<Tuple2<String, Long>> result = keyedEdits
                .timeWindow(Time.seconds(5))
                .aggregate(new AggregateFunction<WikipediaEditEvent, Tuple2<String, Long>, Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> createAccumulator() {
                        return new Tuple2<>("", 0L);
                    }

                    @Override
                    public Tuple2<String, Long> add(WikipediaEditEvent value, Tuple2<String, Long> accumulator) {
                        accumulator.f0 = value.getUser();
                        accumulator.f1 += value.getByteDiff();
                        return accumulator;
                    }

                    @Override
                    public Tuple2<String, Long> getResult(Tuple2<String, Long> accumulator) {
                        return accumulator;
                    }

                    @Override
                    public Tuple2<String, Long> merge(Tuple2<String, Long> a, Tuple2<String, Long> b) {
                        return new Tuple2<>(a.f0, a.f1 + b.f1);
                    }
                });

        // convert each result to a string and write it to the Kafka topic "iris"
        result
                .map(new MapFunction<Tuple2<String, Long>, String>() {
                    @Override
                    public String map(Tuple2<String, Long> tuple) {
                        return tuple.toString();
                    }
                })
                .addSink(new FlinkKafkaProducer011<>("localhost:9092", "iris", new SimpleStringSchema()));

        //  result.print();

        see.execute();
    }
}
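
To see what the aggregate step computes without starting Flink, here is a minimal plain-Java sketch. The class ByteDiffSketch, the Edit type, and the sample data are all hypothetical stand-ins for illustration; they are not part of Flink. The sketch only mimics the idea of grouping edits by user and summing their byte diffs for one batch of events.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ByteDiffSketch {

    // Hypothetical stand-in for WikipediaEditEvent: only the user and the byte diff.
    static final class Edit {
        final String user;
        final int byteDiff;

        Edit(String user, int byteDiff) {
            this.user = user;
            this.byteDiff = byteDiff;
        }
    }

    // Sums the byte diff per user, roughly what keying by user and
    // accumulating in add() does for the events of one window.
    static Map<String, Long> sumByUser(Edit[] window) {
        Map<String, Long> totals = new LinkedHashMap<>();
        for (Edit e : window) {
            totals.merge(e.user, (long) e.byteDiff, Long::sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        // Made-up sample events.
        Edit[] window = {
            new Edit("alice", 120),
            new Edit("bob", -30),
            new Edit("alice", -20)
        };
        // Print each total in a (user,total) form.
        sumByUser(window).forEach((user, total) ->
                System.out.println("(" + user + "," + total + ")"));
    }
}
```

In the real job, Flink keeps one accumulator per user and per window, so the totals start from zero again in every window.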




pom.xml: the POM generated by the quickstart archetype. Only its name and comments are reproduced here:

    name: Flink Quickstart Job
    repository: Apache Development Snapshot Repository
    Java compiler plugin
    maven-shade plugin: creates a fat jar that contains all necessary dependencies. Change the value of <mainClass>...</mainClass> if your program entry point changes.
        runs the shade goal on the package phase
        does not copy the signatures in the META-INF folder; otherwise, this might cause SecurityExceptions when using the JAR
        uses the transformer org.apache.maven.plugins.shade.resource.ManifestResourceTransformer
    Eclipse setting: improves the out-of-the-box experience in Eclipse by resolving some warnings
    IntelliJ profile: helps to make things run out of the box in IntelliJ. It adds Flink's core classes to the runtime class path; otherwise they are missing in IntelliJ, because the dependency is 'provided'.

The imports in WikipediaAnalysis also require the flink-connector-wikiedits and flink-connector-kafka-0.11 connector dependencies in the POM.

4: Note that the code above contains the following snippet, which writes the final result to Kafka. If you do not want to install Kafka, replace this snippet with result.print().

        result
                .map(new MapFunction<Tuple2<String,Long>, String>() {
                    public String map(Tuple2<String, Long> tuple) {
                        return tuple.toString();
                    }
                })
                .addSink(new FlinkKafkaProducer011<>("localhost:9092", "iris", new SimpleStringSchema()));
5: Install Kafka. Skip this step if you used result.print() in step 4 instead of writing to Kafka.
Download kafka_2.11-
Start the ZooKeeper service:
                  bin\windows\zookeeper-server-start.bat config\zookeeper.properties
Start the Kafka service:
                  bin\windows\kafka-server-start.bat config\server.properties
Create the topic iris:
                  bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic iris
Then read (consume) the data in topic iris with the following command:
                  bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic iris --from-beginning
6: Run the job

$ mvn clean package
$ mvn exec:java -Dexec.mainClass=wikiedits.WikipediaAnalysis

7: The processed data is then either printed to the console or written to Kafka


8: You can check the job's execution status at http://localhost:8081/#/job/running
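
The same status information can also be fetched without a browser. The sketch below is a minimal example, assuming the cluster from step 2 is listening on http://localhost:8081 and the job was submitted to it; it calls the /jobs/overview endpoint of Flink's monitoring REST API and prints the raw JSON. The class JobOverviewSketch and its helper methods are hypothetical names chosen for this illustration.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class JobOverviewSketch {

    // Builds the jobs overview URL from the web UI base address.
    static String overviewUrl(String baseUrl) {
        String trimmed = baseUrl.endsWith("/")
                ? baseUrl.substring(0, baseUrl.length() - 1)
                : baseUrl;
        return trimmed + "/jobs/overview";
    }

    // Performs a plain HTTP GET and returns the response body as a string.
    static String fetch(String url) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
        conn.setRequestMethod("GET");
        conn.setConnectTimeout(3000);
        conn.setReadTimeout(3000);
        try (BufferedReader in = new BufferedReader(
                new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
            StringBuilder body = new StringBuilder();
            String line;
            while ((line = in.readLine()) != null) {
                body.append(line);
            }
            return body.toString();
        } finally {
            conn.disconnect();
        }
    }

    public static void main(String[] args) {
        // Assumption: the local cluster from step 2 is running on port 8081.
        String url = overviewUrl("http://localhost:8081");
        try {
            System.out.println(fetch(url));
        } catch (IOException e) {
            System.out.println("Could not reach " + url + ": " + e.getMessage());
        }
    }
}
```

If the cluster is not running, the program only prints a message saying the address could not be reached.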
