相关文章链接
1、Flink的Source源之从集合中获取数据
1 //1.env
2 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
3
4 //2.source
5 // * 1.env.fromElements(可变参数);
6 DataStream<String> ds1 = env.fromElements("hadoop", "spark", "flink");
7 // * 2.env.fromColletion(各种集合);
8 DataStream<String> ds2 = env.fromCollection(Arrays.asList("hadoop", "spark", "flink"));
9 // * 3.env.generateSequence(开始,结束);
10 DataStream<Long> ds3 = env.generateSequence(1, 10);
11
12 //3.Transformation
13
14 //4.sink
15 ds1.print();
16 ds2.print();
17 ds3.print();
18
19 //5.execute
20 env.execute();
2、Flink的Source源之从文件中获取数据
// Create the Flink streaming execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 2. source
// * env.readTextFile(local file / HDFS file); compressed files work as well
// Local Windows path: backslashes must be escaped inside a Java string literal,
// otherwise sequences such as \P or \d are illegal escapes and the code does not compile.
DataStream<String> ds1 = env.readTextFile("D:\\Project\\IDEA\\bigdata-study-tutorial\\flink-tutorial-java\\src\\main\\data\\input\\words.txt");
// Relative path (judging by its name, a directory -- verify against the project layout)
DataStream<String> ds2 = env.readTextFile("data/input/dir");
// File on HDFS
DataStream<String> ds3 = env.readTextFile("hdfs://node1:8020//wordcount/input/words.txt");
// Gzip-compressed file
DataStream<String> ds4 = env.readTextFile("data/input/wordcount.txt.gz");

// 3. transformation (none in this example)

// 4. sink: print every stream
ds1.print();
ds2.print();
ds3.print();
ds4.print();

// 5. execute
env.execute();
3、Flink之自定义Source源
/**
 * Entry point: wires a {@code SensorReadingSource} into a streaming job and prints its output.
 *
 * @param args command-line arguments (unused)
 * @throws Exception if job execution fails
 */
public static void main(String[] args) throws Exception {
    // Obtain the Flink streaming execution environment.
    StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();

    // Optional settings, left disabled:
    // streamEnv.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
    // streamEnv.setParallelism(4);
    // streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    // Attach the custom source (1 sensor, 1000 ms between rounds) and print what it emits.
    streamEnv
            .addSource(new SensorReadingSource(1, 1000L))
            .print();

    // Submit the job under its name.
    streamEnv.execute("SourceDemo02_UDF");
}
/**
 * Bean class for a temperature sensor reading.
 *
 * <p>Accessors and constructors are supplied by the Lombok annotations below; the field
 * order determines the argument order {@code (id, timestamp, temperature)} of the
 * all-args constructor.
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public static class SensorReading {
// Sensor identifier.
private String id;
// Time of the reading.
private Long timestamp;
// Temperature value of the reading.
private Double temperature;
}
/**
 * Custom source that periodically emits {@link SensorReading} records.
 *
 * <p>When started it creates {@code num} sensors with ids {@code sensor_1 .. sensor_num},
 * each with an initial temperature of {@code 60 + gaussian * 10}. Then, until
 * {@link #cancel()} is called, each round stamps every sensor with the current time,
 * adds Gaussian noise to its temperature, emits it, and sleeps for {@code interval}
 * milliseconds.
 *
 * <p>Note: the same {@link SensorReading} instances are mutated and re-emitted every round.
 */
public static class SensorReadingSource extends RichSourceFunction<SensorReading> {
    /**
     * Loop flag, cleared by {@link #cancel()}. Declared {@code volatile} because Flink may
     * call {@code cancel()} from a different thread than the one executing {@code run()};
     * without it the loop is not guaranteed to observe the update.
     */
    private volatile boolean running = true;

    /** Number of sensors to simulate (defaults to 5). */
    private Integer num = 5;

    /** Pause between two emission rounds, in milliseconds (defaults to 1000). */
    private Long interval = 1000L;

    /** Creates a source with the default sensor count and interval. */
    public SensorReadingSource() {
    }

    /**
     * Creates a source with the given sensor count and emission interval.
     *
     * @param num      number of sensors to simulate
     * @param interval pause between emission rounds, in milliseconds
     */
    public SensorReadingSource(Integer num, Long interval) {
        this.num = num;
        this.interval = interval;
    }

    /**
     * Generates readings until the source is cancelled.
     *
     * @param ctx context used to emit the readings
     * @throws Exception if the sleeping thread is interrupted
     */
    @Override
    public void run(SourceContext<SensorReading> ctx) throws Exception {
        Random random = new Random();

        // Initialise the simulated sensors with a random starting temperature.
        ArrayList<SensorReading> sensorReadings = new ArrayList<>(Math.max(num, 0));
        for (int i = 1; i <= num; i++) {
            String id = "sensor_" + i;
            long timestamp = System.currentTimeMillis();
            double temperature = 60 + random.nextGaussian() * 10;
            sensorReadings.add(new SensorReading(id, timestamp, temperature));
        }

        // Keep emitting while the source has not been cancelled.
        while (running) {
            // One shared timestamp per round so that all sensors report the same instant.
            long timestamp = System.currentTimeMillis();
            for (SensorReading sensorReading : sensorReadings) {
                sensorReading.setTimestamp(timestamp);
                // Random walk: drift the previous temperature by Gaussian noise.
                sensorReading.setTemperature(sensorReading.getTemperature() + random.nextGaussian());
                ctx.collect(sensorReading);
            }
            Thread.sleep(interval);
        }
    }

    /** Requests termination; the {@code run} loop exits after its current round. */
    @Override
    public void cancel() {
        running = false;
    }
}