Serializing Objects with Kafka

Kafka ships with two built-in serialization formats (byte array and String), plus a pair of interfaces for writing your own:

// Built-in serializers/deserializers
org.apache.kafka.common.serialization.ByteArraySerializer
org.apache.kafka.common.serialization.ByteArrayDeserializer
org.apache.kafka.common.serialization.StringSerializer
org.apache.kafka.common.serialization.StringDeserializer
// Interfaces for custom serialization
org.apache.kafka.common.serialization.Serializer<T>
org.apache.kafka.common.serialization.Deserializer<T>

So with only the built-in serializers, any payload has to be flattened into a string or a byte array by hand. Fortunately, Kafka also exposes the Serializer and Deserializer interfaces, which let us define custom serialization for a class and send objects directly.
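For reference, the two interfaces boil down to the following (a simplified sketch of the Kafka client API; in recent client versions configure and close have default no-op implementations):

interface Serializer<T> {
    void configure(Map<String, ?> configs, boolean isKey);
    byte[] serialize(String topic, T data);   // object -> bytes on the wire
    void close();
}

interface Deserializer<T> {
    void configure(Map<String, ?> configs, boolean isKey);
    T deserialize(String topic, byte[] data); // bytes on the wire -> object
    void close();
}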

First, let's look at how the Kafka producer and consumer are configured.

// Producer configuration
private final KafkaProducer<String, Object> kafkaProducer;

public KafkaProducerHelper(String kafkaHosts) {
    Properties props = new Properties();
    props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
            kafkaHosts); // e.g. localhost:9092
    props.setProperty(ProducerConfig.ACKS_CONFIG, "1");
    props.setProperty(ProducerConfig.RETRIES_CONFIG, "1");
    props.setProperty(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "120000");
    props.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
    props.setProperty(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "8388608");
    // Key serializer: keys are plain strings, so StringSerializer is enough
    props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer");
    // Value serializer: our custom KafkaTestSeralizer handles the object payload
    props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            "com.util.KafkaTestSeralizer");
    kafkaProducer = new KafkaProducer<String, Object>(props);
}
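The getInstance and send helpers that the demo code below calls aren't shown in the original; a minimal sketch of how they might complete KafkaProducerHelper (the singleton handling and the synchronous send are my assumptions, with imports for ProducerRecord and ExecutionException assumed):

private static KafkaProducerHelper instance;

// Lazily create a single shared helper
public static synchronized KafkaProducerHelper getInstance(String kafkaHosts) {
    if (instance == null) {
        instance = new KafkaProducerHelper(kafkaHosts);
    }
    return instance;
}

// Send one record and block until the broker acknowledges it,
// matching the throws clause of the producer demo below
public void send(String topic, String key, Object value)
        throws InterruptedException, ExecutionException {
    kafkaProducer.send(new ProducerRecord<>(topic, key, value)).get();
}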
 
// Consumer configuration
private static KafkaConsumer<String, Object> kc = null;

public KafkaConsumer<String, Object> getConsumer(String kafkaHost, String groupId) {
    if (kc == null) {
        Properties props = new Properties();
        // Note: the original used ProducerConfig here; ConsumerConfig is the right constant class
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaHost);
        // The groupId is used to track offsets and other per-group state
        props.put("group.id", groupId);
        // With auto-commit enabled, offsets are recorded periodically; if set to false,
        // you must commit offsets manually, otherwise each restart resumes from the
        // position determined by auto.offset.reset
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("receive.buffer.bytes", 10485760);
        props.put("max.partition.fetch.bytes", 8 * 1024 * 1024);
        // Key deserializer
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Value deserializer
        props.put("value.deserializer", "com.util.KafkaTestDeseralizer");
        kc = new KafkaConsumer<String, Object>(props);
    }
    return kc;
}
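The KafkaConsumerHelper.getInstance() call used in the demo below isn't defined in the original either; a minimal sketch, assuming a plain singleton wrapper around the method above:

public class KafkaConsumerHelper {
    private static final KafkaConsumerHelper INSTANCE = new KafkaConsumerHelper();

    public static KafkaConsumerHelper getInstance() {
        return INSTANCE;
    }

    // getConsumer(String kafkaHost, String groupId) as shown above
}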

The configuration above specifies the key and value serializers and deserializers, along with a few tuning options.
This is the critical step, especially the K/V serializer settings: everything that follows exists to implement those two custom classes.
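As a side note, passing the class literal instead of a hand-typed string catches typos in these fully-qualified names at compile time:

// Equivalent to the string form, but verified by the compiler
props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
        KafkaTestSeralizer.class.getName());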

This article uses plain java.io serialization, so the class to be transmitted must implement Serializable:

import java.io.Serializable;

public class KafkaTest implements Serializable {

    private static final long serialVersionUID = 1L;

    private String id;
    private String name;

    public String getId() {
        return id;
    }
    public void setId(String id) {
        this.id = id;
    }
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
}

Next, add a small utility that handles the serialization and deserialization:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

public class SerializeUtil {
    public static byte[] serialize(Object object) {
        // Serialize the object into a byte array
        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
             ObjectOutputStream oos = new ObjectOutputStream(baos)) {
            oos.writeObject(object);
            return baos.toByteArray();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

    public static <T> T deserialize(byte[] bytes, Class<T> className) {
        if (bytes == null) {
            return null; // Kafka may hand the deserializer null bytes (tombstone records)
        }
        // Deserialize the byte array back into an object of the requested type
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return className.cast(ois.readObject());
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }
}
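A quick round-trip check of the utility, independent of Kafka (this demo class is just for illustration):

public class SerializeUtilDemo {
    public static void main(String[] args) {
        KafkaTest original = new KafkaTest();
        original.setId("1");
        original.setName("demo");

        byte[] bytes = SerializeUtil.serialize(original);
        KafkaTest copy = SerializeUtil.deserialize(bytes, KafkaTest.class);

        System.out.println(copy.getName()); // prints "demo"
    }
}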

Then implement the serializer and deserializer classes referenced in the producer and consumer configuration above:

//KafkaTestSeralizer
import java.util.Map;

import org.apache.kafka.common.serialization.Serializer;

import com.chinaventure.webspider.model.KafkaTest;

public class KafkaTestSeralizer implements Serializer<KafkaTest> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // nothing to configure
    }

    @Override
    public byte[] serialize(String topic, KafkaTest data) {
        return SerializeUtil.serialize(data);
    }

    @Override
    public void close() {
        // nothing to release
    }
}
 
//KafkaTestDeseralizer
import java.util.Map;

import org.apache.kafka.common.serialization.Deserializer;

import com.chinaventure.webspider.model.KafkaTest;

public class KafkaTestDeseralizer implements Deserializer<KafkaTest> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // nothing to configure
    }

    @Override
    public KafkaTest deserialize(String topic, byte[] bytes) {
        // SerializeUtil.deserialize is typed, so no cast is needed
        return SerializeUtil.deserialize(bytes, KafkaTest.class);
    }

    @Override
    public void close() {
        // nothing to release
    }
}

Finally, put it all together and try it out:

//producer
public static void main(String[] args) throws InterruptedException, ExecutionException {
    String topic = "test";

    KafkaTest tmp = new KafkaTest();
    tmp.setId("123");
    tmp.setName("王者荣耀"); // "Honor of Kings"
    int count = 1;

    while (true) {
        KafkaProducerHelper.getInstance("192.168.0.25:9092").send(topic, "123456", tmp);
        Thread.sleep(1000);
        System.out.println("producer queue: " + count++);
    }
}
 
 
//consumer
public static void main(String[] args) throws IOException, InterruptedException {

    String topic = "test";
    // Note: the topic name doubles as the consumer group id here
    KafkaConsumer<String, Object> consumer =
            KafkaConsumerHelper.getInstance().getConsumer("192.168.0.25:9092", topic);

    consumer.subscribe(Arrays.asList(topic));
    int messagecounter = 0;

    while (true) {
        ConsumerRecords<String, Object> records = consumer.poll(100);
        for (ConsumerRecord<String, Object> record : records) {
            System.out.println("receive object, key: " + record.key());
            KafkaTest tmp = (KafkaTest) record.value();
            System.out.println("the game name: " + tmp.getName());
            messagecounter++;
        }
        System.out.println("received messages: " + messagecounter);
        Thread.sleep(3000);
    }
}
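One caveat: the loop above never exits, so the consumer is never closed. A common shutdown pattern (a sketch, not part of the original code; WakeupException comes from org.apache.kafka.common.errors) wakes the consumer out of poll() on JVM exit so it can leave the group cleanly:

// Register before entering the poll loop
Runtime.getRuntime().addShutdownHook(new Thread(consumer::wakeup));
try {
    while (true) {
        ConsumerRecords<String, Object> records = consumer.poll(100);
        // ... process records as above ...
    }
} catch (WakeupException e) {
    // expected on shutdown; nothing to do
} finally {
    consumer.close();
}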

That is the whole flow of sending objects through Kafka: wrap the object's serialization behind Kafka's Serializer/Deserializer interfaces, plug the resulting classes into the producer and consumer configuration, and objects travel over the topic transparently. The underlying format doesn't have to be java.io serialization; third-party libraries work just as well.
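For example, a JSON-based serializer using Jackson might look like this (KafkaTestJsonSerializer is a hypothetical name, and the jackson-databind dependency is assumed); the payload becomes human-readable and KafkaTest would no longer need to implement Serializable:

import java.util.Map;

import org.apache.kafka.common.serialization.Serializer;

import com.fasterxml.jackson.databind.ObjectMapper;

public class KafkaTestJsonSerializer implements Serializer<KafkaTest> {
    private final ObjectMapper mapper = new ObjectMapper();

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // nothing to configure
    }

    @Override
    public byte[] serialize(String topic, KafkaTest data) {
        try {
            // Jackson reads the public getters of KafkaTest to build the JSON
            return mapper.writeValueAsBytes(data);
        } catch (Exception e) {
            throw new RuntimeException("JSON serialization failed", e);
        }
    }

    @Override
    public void close() {
        // nothing to release
    }
}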


Link: https://www.jianshu.com/p/88a114d9268c

Original: https://www.cnblogs.com/liuys635/p/13413219.html