Flink Installation and WordCount

1. Download

http://mirror.bit.edu.cn/apache/flink/

2. Installation

Make sure Java 8 or later is installed.

Extract Flink:
tar zxvf flink-1.8.0-bin-scala_2.11.tgz

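Start a local cluster, then run jps to confirm that the JobManager (StandaloneSessionClusterEntrypoint) and the TaskManager (TaskManagerRunner) are both running:
[hadoop@bigdata-senior01 flink-1.8.0]$ ./bin/start-cluster.sh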
Starting cluster.
Starting standalonesession daemon on host bigdata-senior01.home.com.
Starting taskexecutor daemon on host bigdata-senior01.home.com.
[hadoop@bigdata-senior01 flink-1.8.0]$ jps
1995 StandaloneSessionClusterEntrypoint
2443 TaskManagerRunner
2526 Jps

3. Access the Flink web UI

http://localhost:8081

4. First program: WordCount, which reads strings from a socket stream and counts word frequencies over 10-second windows

4.1 Add the dependencies

    <dependencies>
        <!-- The _2.11 suffix matches the flink-1.8.0-bin-scala_2.11 distribution installed above -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.8.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.8.0</version>
            <scope>provided</scope>
        </dependency>

    </dependencies>

4.2 Code

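import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class SocketWindowWordCount {

    public static void main(String[] args) throws Exception {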

        // the host and the port to connect to
        final String hostname;
        final int port;
        try {
            final ParameterTool params = ParameterTool.fromArgs(args);
            hostname = params.has("hostname") ? params.get("hostname") : "localhost";
            port = params.getInt("port");
        } catch (Exception e) {
            e.printStackTrace();
            System.err.println(e.getMessage());
            System.err.println("No port specified. Please run 'SocketWindowWordCount " +
                    "--hostname <hostname> --port <port>', where hostname (localhost by default) " +
                    "and port is the address of the text server");
            System.err.println("To start a simple text server, run 'netcat -l <port>' and " +
                    "type the input text into the command line");
            return;
        }

        // get the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // get input data by connecting to the socket
        DataStream<String> text = env.socketTextStream(hostname, port, "\n");

        // parse the data, group it, window it, and aggregate the counts
        DataStream<WordWithCount> windowCounts = text
                .flatMap(new FlatMapFunction<String, WordWithCount>() {
                    @Override
                    public void flatMap(String value, Collector<WordWithCount> out) throws Exception {
                        for (String word : value.split("\\s")) {
                            out.collect(new WordWithCount(word,1L));
                        }
                    }
                })
                .keyBy("word")
                .timeWindow(Time.seconds(10))
                .reduce(new ReduceFunction<WordWithCount>() {
                    @Override
                    public WordWithCount reduce(WordWithCount value1, WordWithCount value2) throws Exception {
                        return new WordWithCount(value1.word,value1.count+value2.count);
                    }
                });

        // print the results with a single thread, rather than in parallel
        windowCounts.print().setParallelism(1);

        env.execute("Socket Window WordCount");
    }

    /**
     * Data type for words with count.
     */
    public static class WordWithCount {
        public String word;
        public long count;

        public WordWithCount() {
        }

        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return word + " : " + count;
        }
    }
}
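
To make the windowing step concrete, here is a minimal plain-Java sketch (no Flink involved) of what a 10-second tumbling-window word count computes. The class TumblingWindowSketch, the TimedLine type, and the sample timestamps are hypothetical and exist only for illustration. The sketch assumes windows are aligned to multiples of the window size and, unlike the job above, it skips empty tokens.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java illustration of tumbling-window word counts (hypothetical, not Flink code). */
class TumblingWindowSketch {

    /** One input line together with the time (in ms) at which it arrived. */
    static final class TimedLine {
        final long timestampMillis;
        final String line;

        TimedLine(long timestampMillis, String line) {
            this.timestampMillis = timestampMillis;
            this.line = line;
        }
    }

    /**
     * Groups lines into fixed-size, non-overlapping windows and counts words per window.
     * The outer key is the window start time in milliseconds.
     */
    static Map<Long, Map<String, Long>> countByWindow(List<TimedLine> lines, long windowMillis) {
        Map<Long, Map<String, Long>> result = new TreeMap<>();
        for (TimedLine timed : lines) {
            // Align the timestamp down to the start of its window.
            long windowStart = timed.timestampMillis - (timed.timestampMillis % windowMillis);
            Map<String, Long> counts = result.computeIfAbsent(windowStart, k -> new TreeMap<>());
            for (String word : timed.line.split("\\s")) {
                if (!word.isEmpty()) { // assumption: ignore empty tokens from repeated whitespace
                    counts.merge(word, 1L, Long::sum);
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<TimedLine> input = new ArrayList<>();
        input.add(new TimedLine(1_000L, "hello flink"));
        input.add(new TimedLine(4_000L, "hello world"));
        input.add(new TimedLine(12_000L, "hello again"));
        // prints {0={flink=1, hello=2, world=1}, 10000={again=1, hello=1}}
        System.out.println(countByWindow(input, 10_000L));
    }
}
```

The first two lines fall into the window starting at 0 ms, so "hello" is counted twice there; the third line arrives after the 10-second boundary and starts a fresh count. In the real job the timestamp is the arrival time, because timeWindow uses processing time by default in Flink 1.8.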

4.3 Build the jar and upload it

First start nc so that it listens on the port and accepts connections:

nc -lk 9000
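
Where nc is not available, a small Java program can play the same role. The following is a rough sketch, not part of the original setup: the class name SimpleTextServer and its serve method are hypothetical. It forwards each line typed on standard input to the connected client, terminated by "\n", which is the delimiter the job passes to socketTextStream. Unlike nc -lk, it serves a single client and then exits.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

/** Minimal stand-in for a netcat listener (hypothetical helper, single client only). */
class SimpleTextServer {

    /** Waits for one client, then sends it every line read from input, each ended by '\n'. */
    static void serve(ServerSocket server, BufferedReader input) throws IOException {
        try (Socket client = server.accept();
             PrintWriter out = new PrintWriter(
                     new OutputStreamWriter(client.getOutputStream(), StandardCharsets.UTF_8))) {
            String line;
            while ((line = input.readLine()) != null) {
                out.print(line);
                out.print('\n'); // explicit delimiter, independent of the platform line separator
                out.flush();
            }
        }
    }

    public static void main(String[] args) throws IOException {
        // Port 9000 mirrors the nc example; pass another port as the first argument if needed.
        int port = args.length > 0 ? Integer.parseInt(args[0]) : 9000;
        try (ServerSocket server = new ServerSocket(port)) {
            System.err.println("Listening on port " + port + "; type lines to send them");
            serve(server, new BufferedReader(
                    new InputStreamReader(System.in, StandardCharsets.UTF_8)));
        }
    }
}
```

Start it before submitting the job, for example with java SimpleTextServer.java 9000 on Java 11 or later, then type the input text into its console.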

Start SocketWindowWordCount:
[hadoop@bigdata-senior01 bin]$ ./flink run /home/hadoop/SocketWindowWordCount.jar --port 9000


View the output (from Flink's log directory):
[root@bigdata-senior01 log]# tail -f flink-hadoop-taskexecutor-0-bigdata-senior01.home.com.out
Type strings into the nc session; the log being tailed shows the aggregated counts once per 10-second window.


Original article: https://www.cnblogs.com/asker009/p/10926615.html