Kafka business event tracking

Messages are serialized as JSON strings.

Tracking data class:

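    // The original lists only the fields; the class wrapper is added here for context.
    public class UserFooter {

        @Id
        @Column(name = "user_foot_id")
        private int user_foot_id;

        @Column(name = "user_id")
        private int user_id;

        @Column(name = "target_id")
        private Long target_id;

        @Column(name = "section")
        private String section;

        @Column(name = "flag")
        private String flag;

        @Temporal(TemporalType.TIMESTAMP)
        @Column(name = "create_date")
        private Date createDate;

        // Getters and setters omitted. json-lib reads and writes bean properties through them.
    }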


Producer:

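    import net.sf.json.JSONObject;

    // Assumed wiring (not shown in the original): kafkaTemplate is an injected
    // KafkaTemplate<String, String>, and footTopic holds the value of kafka.footTopic.

    public void sendUserFoot(UserFooter userFooter){
        // Convert the bean to a JSON string and send it as the message value
        JSONObject json = JSONObject.fromObject(userFooter);
        String strJson = json.toString();
        kafkaTemplate.send(footTopic, strJson);
    }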
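
To make the wire format concrete, the sketch below builds and parses a flat JSON string using only the JDK. It is an illustration, not json-lib's actual output: the class `FootMessageSketch`, its methods, and any sample values are hypothetical, the keys are assumed to follow the field names, the date field is left out, and the key order produced by json-lib may differ.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical stand-in for the JSON string that travels through Kafka.
// Hand-rolled with the JDK only. It is NOT json-lib and handles only flat
// objects with integer and simple string values (no escaping, no nesting).
class FootMessageSketch {

    // Matches "key":123 or "key":"text" inside a flat JSON object.
    private static final Pattern FIELD =
            Pattern.compile("\"([^\"]+)\":(?:\"([^\"]*)\"|(-?\\d+))");

    // Build the message value from the tracked fields.
    static String toJson(int userFootId, int userId, long targetId,
                         String section, String flag) {
        return "{"
                + "\"user_foot_id\":" + userFootId + ","
                + "\"user_id\":" + userId + ","
                + "\"target_id\":" + targetId + ","
                + "\"section\":\"" + section + "\","
                + "\"flag\":\"" + flag + "\""
                + "}";
    }

    // Parse the message value back into key/value pairs, keeping field order.
    static Map<String, String> parse(String json) {
        Map<String, String> fields = new LinkedHashMap<>();
        Matcher m = FIELD.matcher(json);
        while (m.find()) {
            String value = m.group(2) != null ? m.group(2) : m.group(3);
            fields.put(m.group(1), value);
        }
        return fields;
    }
}
```

Calling `FootMessageSketch.toJson(1, 42, 1001L, "news", "view")` yields a single string that can be passed to `kafkaTemplate.send`, and `parse` recovers the same fields on the consuming side. In the real code, json-lib's `fromObject` and `toBean` play these two roles.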

Consumer:

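    @KafkaListener(topics = {"${kafka.footTopic}"})
    public void consumerUserFoot(String message){

        try{
            // Parse the JSON string back into a UserFooter bean
            JSONObject jsonObject=JSONObject.fromObject(message);
            UserFooter userFooter=(UserFooter)JSONObject.toBean(jsonObject, UserFooter.class);

            // Look up the DAO from the Spring context and persist the record
            UserFootDao userFootDao = (UserFootDao) SpringUtil.getBean(UserFootDao.class);
            userFootDao.save(userFooter);
        } catch (Exception e) {
            // Log the failing payload together with the stack trace
            logger.error("Failed to consume user foot message: " + message, e);
        }
    }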

application.properties:

kafka.footTopic=userfootDev
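
The listener's `${kafka.footTopic}` placeholder is resolved against this property, so the consumer subscribes to `userfootDev`. The lookup can be pictured with the JDK-only sketch below. It is a toy model, not Spring's implementation (which also handles defaults, profiles, and multiple property sources), and the class `TopicPlaceholderSketch` with its methods is hypothetical.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

// Toy illustration of "${key}" lookup against properties text.
// Spring's real placeholder resolution is considerably richer.
class TopicPlaceholderSketch {

    // Parse properties-file text such as "kafka.footTopic=userfootDev".
    static Properties load(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new IllegalStateException("Could not read properties text", e);
        }
        return props;
    }

    // Replace a single "${key}" expression with its configured value.
    static String resolve(String expression, Properties props) {
        if (expression.startsWith("${") && expression.endsWith("}")) {
            String key = expression.substring(2, expression.length() - 1);
            String value = props.getProperty(key);
            if (value == null) {
                throw new IllegalArgumentException("No value for property: " + key);
            }
            return value;
        }
        return expression; // a literal topic name needs no resolution
    }
}
```

Keeping the topic name in configuration rather than in code means a different environment can point the same producer and consumer at another topic by changing only this one property.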


Original article: https://www.cnblogs.com/silyvin/p/9106582.html