【原创】Kafka接受发送消息对象Object基础版

首先感谢 kafka 中国社区 王扬庭例子的帮助和指导~~~~~(kafka_2.9.2-0.8.1.1)

kafka常用的发送消息的方法如下:

Properties props = new Properties();
props.put("zookeeper.connect", "slaves2:2181,slaves3:2181,slaves4:2181");  
props.put("serializer.class", "kafka.serializer.StringEncoder");       
props.put("metadata.broker.list","slaves2:9092,slaves3:9092,slaves4:9092");
ProducerConfig config = new ProducerConfig(props);
Producer<String, String> producer = new Producer<String, String>(config);  
String str = "test";
producer.send(new KeyedMessage<String, String>("exhibition",str));

但是如果用kafka发送对象的话就需要重写serializer.class中byte[] toBytes方法:

Producer示例:其中MessageBean是自己定义的实体类:

Properties props = new Properties();
props.put("zookeeper.connect", "slaves2:2181,slaves3:2181,slaves4:2181");  
props.put("serializer.class", "com.performanceTest.BeanSerializer"); // 需要修改
props.put("metadata.broker.list","slaves2:9092,slaves3:9092,slaves4:9092");
ProducerConfig config = new ProducerConfig(props);
Producer<MessageBean, MessageBean> producer = new Producer<MessageBean, MessageBean>(config);  
MessageBean str =  new MessageBean();
	str.setFromJID("2"+i);
	str.setToJID("3"+i);
	str.setMessage("京"+i);
	str.setSendtime(System.currentTimeMillis());
KeyedMessage<MessageBean, MessageBean> data = new KeyedMessage<MessageBean, MessageBean>("exhibition",str);
	producer.send(data);

com.performanceTest.BeanSerializer代码:

package com.performanceTest;
import com.performanceTest.BeanUtils;
import kafka.serializer.Encoder;
import kafka.utils.VerifiableProperties;
import kafka.serializer.Encoder;
import kafka.utils.VerifiableProperties;
import com.performanceTest.MessageBean;
public class BeanSerializer implements Encoder<MessageBean>{

	 public BeanSerializer(VerifiableProperties props) {
		 
	 }

	@Override
	public byte[] toBytes(MessageBean mb) {
		System.out.println("encoder ---> " + mb);
		return BeanUtils.object2Bytes(mb);
	}
	
}

BeanUtils的代码:

public class BeanUtils {
	public static Object bytes2Object(byte[] bytes) {
		Object obj = null;
		ByteArrayInputStream bais = null;
		ObjectInputStream ois = null;
		try {
			bais = new ByteArrayInputStream(bytes);
			ois = new ObjectInputStream(bais);
			obj = (Object) ois.readObject();
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			try {
				ois.close();
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
		
		return obj;
	}

	public static byte[] object2Bytes(Object obj) {
		byte[] bytes = null;
		ByteArrayOutputStream baos = null;
		ObjectOutputStream oos = null;
		try {
			baos = new ByteArrayOutputStream();
			oos = new ObjectOutputStream(baos);
			oos.writeObject(obj);
			bytes = baos.toByteArray();
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			try {
				oos.close();
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
		
		return bytes;
	}
}

Consumer示例:

Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, new Integer(1));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer
				.createMessageStreams(topicCountMap);
KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
ConsumerIterator<byte[], byte[]> it = stream.iterator();
while (it.hasNext()) {
		byte[] bytes = it.next().message();
		MessageBean mb = (MessageBean) BeanUtils.bytes2Object(bytes);
                ...
                ... 
                ...
}

OK,至此基本的应用kafka传输接受对象的例子完毕,尝试看过高端的代码如SimpleConsumer,基础不够,实在费劲,接着努力吧~~~~

PS:转载请注明出处

原文地址:https://www.cnblogs.com/nanxin521/p/4514971.html