flink之kafka生产和消费实战-将生产数据存放到mongodb中

传统要构建一个kafka的生产者和消费者,还是比较费劲的,但是在引入flink插件后,就会变的非常容易;

我的场景:监听一个topic, 然后消费者将该topic的消息存放到数据库中,展示在前端,然后在测试需要的时候在前端修改消息,然后将消息重新发送出去;因此在生产者和消费者里面加了一个字段test, 来表示是从自己的服务这里发出去的消息,因此不需要消费并入库;

在测试生产者和消费者的时候,可以先在自己本地起一个kafka,然后本地生产,服务消费,看代码是否ok;  或者在服务生产,本地消费,看代码是否ok

1. 后端是一个springboot工程,首先需要在pom文件中引入依赖


<properties>
<java.version>1.8</java.version>
<scala.binary.version>2.11</scala.binary.version>
</properties>

<
dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId> <version>1.7.1</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka_2.11</artifactId> <version>1.7.0</version> </dependency>

2. 话不多说,直接开始先写生产者

public void sendKafka(String topic, String server, String message) throws Exception {
        log.info("开始生产");
        JSONObject obj = JSONObject.parseObject(message);
        obj.put("test","test");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", server);
        DataStreamSource<String> text = env.addSource(new MyNoParalleSource(obj.toString())).setParallelism(1);
        FlinkKafkaProducer<String> producer = new FlinkKafkaProducer(topic, new SimpleStringSchema(), properties);
        text.addSink(producer);
        env.execute("send kafka ok");
    }

可以看到里面用到了MyNoParalleSource类,其作用是构建一个并行度为1的数据流,来生产数据

public class MyNoParalleSource implements SourceFunction<String> {

    String message;

    public MyNoParalleSource(){

    }

    public MyNoParalleSource(String message) {
        this.message = message;
    }

    @Override
    public void run(SourceContext<String> sourceContext) throws Exception {
            sourceContext.collect(this.message);
    }

    @Override
    public void cancel() {

    }
}

此时生产者就写完了,是不是很优秀,超级简单;

3. 消费者(由于我的目的是将生产者生产的东西在消费者端存入mongdb数据库中,因此会比生产者稍微复杂一点)

public  void consumeKafka(String topic, String server) throws Exception {
        log.info("开始消费");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", server);

        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), properties);
        //从最早开始消费
        consumer.setStartFromLatest();

        DataStream<String> stream = env.addSource(consumer);
        DataStream<Tuple4<String, String, String, String>> sourceStreamTra = stream.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String value) throws Exception {
                Boolean flag = true;
                JSONObject obj = JSONObject.parseObject(value);
                if(obj.containsKey("test")){
                    flag = false;
                }
                return StringUtils.isNotBlank(value) && flag;
            }
        }).map(new MapFunction<String, Tuple4<String, String, String, String>>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple4<String, String, String, String> map(String value)
                    throws Exception {
                JSONObject obj = JSONObject.parseObject(value);
                String dataBase = null;
                String table = null;
                if(obj.containsKey("database")){
                    dataBase = obj.getString("database");
                    table = obj.getString("table");
                }
                
                return new Tuple4<String, String, String, String>(server ,topic, dataBase+"->"+table, obj.toString());
            }
        });
        sourceStreamTra.addSink(new MongoSink());
        env.execute();
    }
public class MongoSink extends RichSinkFunction<Tuple4<String, String, String, String>> {
    private static final long serialVersionUID = 1L;
    MongoClient mongoClient = null;
//    MongoCollection mongoCollection = null;

    @Override
    public void invoke(Tuple4<String, String, String, String> value) throws Exception {
        KafkaRecord kafkaRecord = new KafkaRecord("", value.f0 , value.f1, value.f2, value.f3, new Date(new Timestamp(System.currentTimeMillis()).getTime()));

        if(mongoClient != null){
            mongoClient = MongoDBUtil.getConnect();
            MongoDatabase db = mongoClient.getDatabase("databBase"); // 是自己的数据库
            MongoCollection mongoCollection = db.getCollection("kafkaRecord");
            mongoCollection.insertOne(new Document(CommonMethod.objectToMap(kafkaRecord)));
        }

    }
    @Override
    public void open(Configuration parms) throws Exception {
        super.open(parms);
        mongoClient = MongoDBUtil.getConnect();
    }

    @Override
    public void close() throws Exception {
        if (mongoClient != null) {
            mongoClient.close();
        }
    }
}
import lombok.Data;
import org.springframework.data.mongodb.core.mapping.Document;
import org.springframework.data.mongodb.core.mapping.Field;

import java.io.Serializable;
import java.util.Date;

@Data
@Document(collection = "kafkaRecord")
public class KafkaRecord implements Serializable {
    @Field("_id")
    String id;
    // 具体信息
    String msg;
    //topic
    String topic;

    String server;

    String source;
    //操作时间
    Date time;

    public KafkaRecord(){

    }

    public KafkaRecord(String id, String server, String topic, String source, String msg, Date time){
        this.id = id;
        this.server = server;
        this.msg = msg;
        this.topic = topic;
        this.source = source;
        this.time = time;
    }
}

此时消费者也完事了;

启动后端服务,生产者发送一条消息,消费者则拿到该消息存到数据库中; 

原文地址:https://www.cnblogs.com/leavescy/p/14310404.html