Java Spark Streaming transformation operators: window, countByWindow

Spark Streaming code

package streaming;

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

/**
 * Author: xiaoshubiao
 * Time: 2020/5/15 9:59
 */
public class sparkstreaming_test {
    public static void main(String[] args) throws Exception{
        SparkConf conf = new SparkConf().setAppName("spark streaming test").setMaster("local[*]");
        JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(2));
        JavaReceiverInputDStream<String> line = ssc.socketTextStream("localhost", 9999);
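        // window(windowDuration, slideDuration): window length and slide interval;
        // both must be multiples of the 2-second batch interval
        line.window(Durations.seconds(8), Durations.seconds(4)).print();
        // countByWindow keeps incremental window state, so Spark requires a checkpoint
        // directory; "checkpoint" is an assumed local path, any writable directory works
        ssc.checkpoint("checkpoint");
        line.countByWindow(Durations.seconds(8), Durations.seconds(4)).print();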

        ssc.start();
        ssc.awaitTermination();

    }
}
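In the Spark source, countByWindow is, to my knowledge, built as map(_ => 1L) followed by reduceByWindow with both an add function and an inverse (subtract) function. Each new count is therefore derived from the previous window's count instead of recounting the whole window, and that stored window state is why a checkpoint directory must be set. The plain-Java sketch below (no Spark dependency) imitates that incremental update. The class IncrementalWindowCount, its countByWindow helper and the per-batch counts in main are hypothetical. The counts assume one record per second, with only one record landing in the first 2-second batch, chosen to resemble the output shown further down.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, Spark-free sketch of an incremental sliding-window count.
final class IncrementalWindowCount {

    /**
     * batchCounts[i] is the number of records in batch i (one batch per batch interval).
     * windowBatches / slideBatches are the window length and slide interval measured in
     * batches (8 s / 2 s = 4 and 4 s / 2 s = 2 in the article).
     * Each result is: previous count + batches that entered - batches that left,
     * mirroring reduceByWindow(add, subtract).
     */
    static List<Long> countByWindow(long[] batchCounts, int windowBatches, int slideBatches) {
        List<Long> counts = new ArrayList<>();
        long current = 0;
        int prevEnd = 0; // exclusive end of the previous window
        for (int end = slideBatches; end <= batchCounts.length; end += slideBatches) {
            int start = Math.max(0, end - windowBatches);         // window covers [start, end)
            int prevStart = Math.max(0, prevEnd - windowBatches); // previous window covered [prevStart, prevEnd)
            for (int i = prevEnd; i < end; i++) {
                current += batchCounts[i]; // batches that slid into the window
            }
            for (int i = prevStart; i < start; i++) {
                current -= batchCounts[i]; // batches that slid out (the "inverse" step)
            }
            counts.add(current);
            prevEnd = end;
        }
        return counts;
    }

    public static void main(String[] args) {
        // Assumed counts for eight 2-second batches (16 s of one-record-per-second input).
        long[] batchCounts = {1, 2, 2, 2, 2, 2, 2, 2};
        System.out.println(countByWindow(batchCounts, 4, 2)); // [3, 7, 8, 8]
    }
}
```

With windowBatches = 4 and slideBatches = 2 (an 8-second window sliding every 4 seconds over 2-second batches), the counts 3, 7, 8 and 8 equal the sizes of the four window printouts listed in the output section.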

Socket server code

import java.io.*;
import java.net.ServerSocket;
import java.net.Socket;

public class socket_test {

    public static void main(String[] args) {
        // try-with-resources closes the writer, the client socket and the server socket
        // even when an exception is thrown, so nothing is closed while still null
        try (ServerSocket serverSocket = new ServerSocket(9999)) {
            System.out.println("Server started, waiting for a connection");
            try (Socket socket = serverSocket.accept();
                 PrintWriter pw = new PrintWriter(new OutputStreamWriter(socket.getOutputStream()))) {
                System.out.println("Connected from: " + socket.getRemoteSocketAddress());
                // Send 100 lines, one per second
                for (int j = 1; j <= 100; j++) {
                    String str = "spark streaming test " + j;
                    pw.println(str);
                    pw.flush();
                    System.out.println(str);
                    Thread.sleep(1000);
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            // Restore the interrupt flag and stop sending
            Thread.currentThread().interrupt();
        }
    }
}

Start the socket program first; it blocks while waiting for a connection. Then run the Spark Streaming program.

The output is as follows (only the window results are shown):

0~4s

spark streaming test 1
spark streaming test 2
spark streaming test 3

4~8s

spark streaming test 1
spark streaming test 2
spark streaming test 3
spark streaming test 4
spark streaming test 5
spark streaming test 6
spark streaming test 7

8~12s

spark streaming test 4
spark streaming test 5
spark streaming test 6
spark streaming test 7
spark streaming test 8
spark streaming test 9
spark streaming test 10
spark streaming test 11

12~16s

spark streaming test 8
spark streaming test 9
spark streaming test 10
spark streaming test 11
spark streaming test 12
spark streaming test 13
spark streaming test 14
spark streaming test 15

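The number of lines in each printout corresponds to window's first argument, the window length: with one record per second, a full 8-second window holds exactly 8 records (the first two printouts hold fewer because the window was not yet filled with data). The offset between two consecutive printouts corresponds to the second argument, the slide interval: the window advances 4 seconds, i.e. exactly 4 records. countByWindow with the same arguments prints a single number per window, the count of records in that window.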
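To make this arithmetic concrete, the plain-Java sketch below (no Spark) lists the records that fall in each window (t - 8 s, t], emitted every 4 s. The class WindowMembership and the arrival times are hypothetical. Record j is assumed to arrive j + 0.5 s after the streaming clock starts, which is one timing consistent with the printout above; real arrival times depend on when each program was started.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, Spark-free sketch of which records a window(8 s, 4 s) contains.
final class WindowMembership {

    /** Ids of records whose arrival time (ms) lies in the window (endMs - windowMs, endMs]. */
    static List<Integer> recordsInWindow(long[] arrivalMs, long endMs, long windowMs) {
        List<Integer> ids = new ArrayList<>();
        for (int i = 0; i < arrivalMs.length; i++) {
            if (arrivalMs[i] > endMs - windowMs && arrivalMs[i] <= endMs) {
                ids.add(i + 1); // record ids start at 1, like "spark streaming test 1"
            }
        }
        return ids;
    }

    public static void main(String[] args) {
        long windowMs = 8_000, slideMs = 4_000;
        // Assumed timing: record j arrives at j * 1000 + 500 ms (one record per second).
        long[] arrivalMs = new long[20];
        for (int j = 1; j <= arrivalMs.length; j++) {
            arrivalMs[j - 1] = j * 1000L + 500;
        }
        // One window is emitted per slide interval, ending at 4 s, 8 s, 12 s and 16 s.
        for (long end = slideMs; end <= 16_000; end += slideMs) {
            List<Integer> ids = recordsInWindow(arrivalMs, end, windowMs);
            System.out.println((end - slideMs) / 1000 + "~" + end / 1000 + "s: "
                    + ids.size() + " records " + ids);
        }
    }
}
```

Under this assumed timing, main prints windows of 3, 7, 8 and 8 records (1-3, 1-7, 4-11, 8-15), matching the four printouts above. Consecutive full windows start 4 records apart, which is the slide interval.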

Original article: https://www.cnblogs.com/7749ha/p/12893506.html