Flink 读取 Kafka 数据 (极简版)

本文讲述:本地 Flink 1.7.0 (Java SDK) 读取本地 Kafka 数据,不做任何处理直接打印输出到控制台

环境:win10 + WSL

步骤略过

1. 启动 Kafka 并创建 topic

以下命令都在解压后的 Kafka 文件夹内执行

1.1 启动 Kafka

启动 zookeeper

 ./bin/zookeeper-server-start.sh config/zookeeper.properties

启动 kafka 服务

./bin/kafka-server-start.sh config/server.properties

1.2 创建 topic

./bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092

1.3 启动生产者和消费者

启动生产者

./bin/kafka-console-producer.sh --topic topic2 --broker-list localhost:9092

启动消费者

./bin/kafka-console-consumer.sh --topic topic2 --from-beginning --bootstrap-server localhost:9092

此时,在生产者窗口中输入任意字符,即可在消费者窗口中看到相应的输出

进入解压后的 flink 文件夹内

./bin/start-cluster.sh

3. 使用 JAVA 中读取 Kafka 内容

代码如下:

package com.xjr7670;


import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

import java.util.Properties;

public class StreamingJob {

	public static void main(String[] args) throws Exception {
		// set up the streaming execution environment
		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		Properties properties = new Properties();
		properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
		properties.setProperty("group.id", "g2");    // 第 1 个参数是固定值 group.id,第 2 个参数是自定义的组 ID
		DeserializationSchema<String> deserializationSchema = new SimpleStringSchema();
		String topic = "topic2";
		DataStream<String> text = env.addSource(new FlinkKafkaConsumer011<String>(topic, deserializationSchema, properties));
		text.print();
		
		env.execute("Flink-Kafka demo");
	}
}

POM 依赖配置略过。以上代码可直接执行,执行后,在 Kafka 生产者窗口中输入消息,即可在代码输出窗口中看到同样的消息输出
如果执行时遇到报错:

java.lang.ClassNotFoundException: org.apache.flink.api.common.serialization.DeserializationSchema

需要添加 flink/lib 下的包到项目内

原文地址:https://www.cnblogs.com/wuzhiblog/p/14251788.html