基于docker构建flink大数据处理平台

https://www.cnblogs.com/1ssqq1lxr/p/10417005.html

由于公司业务需求,需要搭建一套实时处理数据平台,基于多方面调研选择了Flink.

  • 初始化Swarm环境(也可以选择k8s)

  • 部署zookeeper集群:基于docker-compose文件,使用 docker stack 部署到容器中。由于zookeeper需要持久化存储数据,这部分后续可以考虑采用共享存储方案。

# Three-node ZooKeeper ensemble (zoo1..zoo3).
# `version` is declared explicitly: `docker stack deploy` only accepts
# Compose files of format version 3 or later.
version: "3"

services:
  # Each node gets a unique ZOO_MY_ID and lists all three servers in
  # ZOO_SERVERS; a node's own entry uses 0.0.0.0 instead of its hostname.
  # NOTE: `restart` is honoured by docker-compose but ignored by
  # `docker stack deploy`, where the service's deploy.restart_policy applies.
  zoo1:
    image: zookeeper
    restart: always
    hostname: zoo1
    ports:
      # host 2181 -> container client port 2181
      - 2181:2181
    environment:
      ZOO_MY_ID: 1
      ZOO_SERVERS: server.1=0.0.0.0:2888:3888 server.2=zoo2:2888:3888 server.3=zoo3:2888:3888

  zoo2:
    image: zookeeper
    restart: always
    hostname: zoo2
    ports:
      # host 2182 -> container client port 2181
      - 2182:2181
    environment:
      ZOO_MY_ID: 2
      ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=0.0.0.0:2888:3888 server.3=zoo3:2888:3888

  zoo3:
    image: zookeeper
    restart: always
    hostname: zoo3
    ports:
      # host 2183 -> container client port 2181
      - 2183:2181
    environment:
      ZOO_MY_ID: 3
      ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888 server.3=0.0.0.0:2888:3888
  • 部署flink镜像
# Flink 1.7.2 cluster: one JobManager service and one TaskManager service.
version: '3'

services:
  jobmanager:
    image: flink:1.7.2-scala_2.12-alpine
    command: jobmanager
    ports:
      # Publishes container port 8081 on the host
      # (presumably the Flink web UI / REST endpoint - confirm).
      - '8081:8081'
    environment:
      JOB_MANAGER_RPC_ADDRESS: jobmanager

  taskmanager:
    image: flink:1.7.2-scala_2.12-alpine
    command: taskmanager
    environment:
      # Points the TaskManager at the `jobmanager` service name above.
      JOB_MANAGER_RPC_ADDRESS: jobmanager

此时只有一个jobmanager,存在单点故障问题,可以考虑将容器内部的 flink-conf.yaml 挂载出来,配置zookeeper HA。

  • 对于扩充 TaskManager,直接执行 docker service scale <TaskManager服务名>=3 即可

Flink案例demo,采用读取kafka中数据实时处理,然后将结果存储到influxDb中展示

// Real-time streaming job entry point.
/**
 * Demo Flink job: reads JSON messages from Kafka, sums {@code value} per
 * (phone, userId) over 10-second processing-time windows and writes each
 * window result to InfluxDB through {@link InfluxWriter}.
 */
public class SportRealTimeJob {

    /** Separator between the key parts so that ("1","23") and ("12","3") yield different keys. */
    private static final String KEY_SEPARATOR = "|";

    /**
     * Builds and executes the streaming pipeline.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job fails to build or execute
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        KafkaConnector connector = new KafkaConnector("192.168.30.60:9092","big-data");
        env
                .addSource(connector.getConsumerConnector(Lists.newArrayList("test0")))
                .<MessageBody>flatMap((sentence, out) -> {
                    MessageBody body = JSON.parseObject(sentence, MessageBody.class);
                    // Skip records that parse to nothing instead of emitting null downstream.
                    if (body != null) {
                        out.collect(body);
                    }
                })
                // The Collector's type argument is erased for lambdas, so Flink cannot
                // infer the flatMap output type; declare it explicitly.
                .returns(MessageBody.class)
                // No shuffle() before keyBy(): keyBy already repartitions by key, so a
                // preceding random redistribution only adds an extra network exchange.
                .keyBy(messageBody -> messageBody.getPhone() + KEY_SEPARATOR + messageBody.getUserId())
                .timeWindow(Time.seconds(10))
                .reduce((t0, t1) -> new MessageBody(t0.getUserId(),t0.getPhone(),t0.getValue()+t1.getValue()))
                .addSink(new InfluxWriter())
                .setParallelism(1);
        env.execute("Window WordCount");
    }

}
// Demo entity carried through the stream.
/**
 * Message payload processed by the job. Mapped to the {@code sport}
 * measurement: {@code userId} and {@code phone} are declared as tag columns,
 * {@code value} as a plain column. Accessors are generated by {@code @Data}.
 */
@Data
@Measurement(name = "sport")
public class MessageBody {

    @Column(name = "userId", tag = true)
    private String userId;

    @Column(name = "phone", tag = true)
    private String phone;

    @Column(name = "value")
    private int value;

    /**
     * Creates a fully populated message.
     *
     * @param userId user identifier
     * @param phone  phone number
     * @param value  numeric value of this message
     */
    public MessageBody(String userId, String phone, int value) {
        this.value = value;
        this.phone = phone;
        this.userId = userId;
    }

    /** Creates an empty message; fields keep their default values. */
    public MessageBody() {
    }
}
// Custom output sink.
/**
 * Flink sink that writes every {@link MessageBody} as one point of the
 * {@code sport} measurement into the {@code game} InfluxDB database.
 */
public class InfluxWriter extends RichSinkFunction<MessageBody> {

    // transient: the client is created in open() and must not be part of the
    // serialized sink function.
    private transient InfluxTemplate template;

    /**
     * Creates the InfluxDB client used by this sink instance.
     *
     * @param parameters Flink configuration (unused)
     * @throws Exception if the client cannot be created
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        InfluxBean bean= InfluxBean.builder().dbName("game")
                .url("http://localhost:8086")
                .username("admin")
                .password("admin")
                .build();
        template = new SimpleInfluxTemplate(bean);
    }

    /**
     * Closes the InfluxDB client if it was created.
     *
     * @throws Exception if closing the client fails
     */
    @Override
    public void close() throws Exception {
        // open() may have failed before assigning the template; guard so that
        // close() does not throw a NullPointerException on top of that failure.
        if (template != null) {
            template.close();
            template = null;
        }
    }

    /**
     * Writes one record as a point stamped with the current processing time.
     *
     * @param value   record to write
     * @param context sink context supplying the processing time
     * @throws Exception if the write fails
     */
    @Override
    public void invoke(MessageBody value, Context context) throws Exception {
        template.write(Point.measurement("sport")
                .addField("value",value.getValue())
                .tag("userId",String.valueOf(value.getUserId()))
                .tag("phone",value.getPhone())
                .time(context.currentProcessingTime(), TimeUnit.MILLISECONDS).build());
    }
}
// influxDb操作类
public class SimpleInfluxTemplate implements InfluxTemplate {

    private final InfluxDB db;

    public SimpleInfluxTemplate(InfluxBean bean){
        this.db= InfluxDBFactory.connect(bean.getUrl(), bean.getUsername(), bean.getPassword());
        db.setDatabase(bean.getDbName());
        db.enableBatch(BatchOptions.DEFAULTS.exceptionHandler(
                (failedPoints, throwable) -> {
                    /* custom error handling here */ })
                .consistency(InfluxDB.ConsistencyLevel.ALL)
                .bufferLimit(100)
        );
    }

    @Override
    public void write(Point point) {
        db.write(point);
    }

    @Override
    public void bentchWrite(BatchPoints points) {
        db.write(points);
    }

    @Override
    public <T> List<T> query(Query query, Class<T> tClass) {
        QueryResult result=db.query(query);
        InfluxDBResultMapper resultMapper = new InfluxDBResultMapper(); // thread-safe - can be reused
        return resultMapper.toPOJO(result, tClass);
    }

    @Override
    public void close() {
        db.close();
    }



/**
 * Minimal set of InfluxDB operations used by the demo.
 *
 * <p>Extends {@link AutoCloseable} so implementations can be used in
 * try-with-resources; {@link #close()} declares no checked exception.
 */
public interface InfluxTemplate extends AutoCloseable {

    /**
     * Writes a single point.
     *
     * @param point point to write
     */
    void write(Point point);

    /**
     * Writes a batch of points. The method name is misspelled ("bentch");
     * it is kept as-is so existing implementations and callers still compile.
     *
     * @param points batch to write
     */
    void bentchWrite(BatchPoints points);

    /**
     * Runs a query and maps the result rows to instances of {@code tClass}.
     *
     * @param query  query to execute
     * @param tClass target class of the result rows
     * @param <T>    row type
     * @return the mapped rows
     */
    <T> List<T> query(Query query, Class<T> tClass);

    /** Releases the resources held by this template. */
    @Override
    void close();
}


/**
 * Connection settings for InfluxDB: server URL, credentials and database
 * name. Instances are created through the Lombok-generated builder.
 */
// The password is excluded from toString() so that logging this bean does
// not expose the credential.
@ToString(exclude = "password")
@Getter
@Setter
@Builder
public class InfluxBean {

    /** InfluxDB server URL. */
    private String url;

    /** User name used to connect. */
    private String username;

    /** Password used to connect; never included in {@code toString()}. */
    private String password;

    /** Name of the database to use. */
    private String dbName;

}
原文地址:https://www.cnblogs.com/lenmom/p/11032666.html