从kafka读数据写到ES

http://kafka.apache.org/documentation.html#newconsumerconfigs

kafka是一款基于发布与订阅的消息系统,它一般被称为“分布式提交日志”或者“分布式流平台”。kafka的数据是按照一定顺序持久化保存的,可以按需读取。

 核心概念:

消费者与消费组

Kafka消费者是消费组的一部分,当多个消费者形成一个消费组来消费topic时,每个消费者会收到不同分区的消息。假设有一个topic T1,该topic有4个分区;同时我们有一个消费组G1,这个消费组只有一个消费者C1。那么消费者C1将会收到这4个分区的消息。如果我们增加新的消费者C2到消费组G1,那么每个消费者将会分别收到两个分区的消息,但如果我们继续增加消费者到这个消费组,剩余的消费者将会空闲,不会收到任何消息。所以,我们可以通过增加消费组的消费者来进行水平扩展提升消费能力。这也是为什么建议创建topic时使用较多的分区数,这样可以在消费负载高的情况下增加消费者来提升性能。另外,消费者的数量不应该比分区数多,因为多出来的消费者是空闲的,没有任何帮助。

Kafka一个很重要的特性就是,只需写入一次消息,可以支持任意多的应用读取这个消息。为了使得每个应用都能读到全量消息,应用需要有不同的消费组

当消费者离开消费组(比如重启、宕机等)时,它所消费的分区会分配给其他分区。这种现象称为重平衡(rebalance)。重平衡是Kafka一个很重要的性质,这个性质保证了高可用和水平扩展。不过也需要注意到,在重平衡期间,所有消费者都不能消费消息,因此会造成整个消费组短暂的不可用。而且,将分区进行重平衡也会导致原来的消费者状态过期,从而导致消费者需要重新更新状态,这段期间也会降低消费性能。所以,尽量避免重平衡。

消费者通过定期发送心跳(hearbeat)到一个作为组协调者(group coordinator)的broker来保持在消费组内存活。这个broker不是固定的,每个消费组都可能不同。当消费者拉取消息或者提交时,便会发送心跳。如果消费者超过一定时间没有发送心跳,那么它的会话(session)就会过期,组协调者会认为该消费者已经宕机,然后触发重平衡。从消费者宕机到会话过期是有一定时间的,这段时间内该消费者的分区都不能进行消息消费;通常情况下,我们可以进行关闭,这样消费者会发送离开的消息到组协调者,这样组协调者可以立即进行重平衡而不需要等待会话过期。

在0.10.1版本,Kafka对心跳机制进行了修改,将发送心跳与拉取消息进行分离,这样使得发送心跳的频率不受拉取的频率影响。另外更高版本的Kafka支持配置一个消费者多长时间不拉取消息但仍然保持存活,这个配置可以避免活锁(livelock,指应用没有故障但是由于某些原因不能进一步消费)

注意:消费者对象不是线程安全的,也就是不能够多个线程同时使用一个消费者对象,也不能够一个线程有多个消费者对象,即一个线程一个消费者,如果需要多个消费者那么请用多线程来进行一一对应。

max.poll.records与session.timeout.ms参数问题

例如:consumer.properties配置中max.poll.records=40  (一次最多拉取40条数据)  session.timeout.ms=30000    (会话时间)

假设kafka此时一次拉取了40条数据,但在处理第31条的时候抛出了异常,就会导致本次offset不会提交,完了这40条消息都会在接下来的某刻被再次消费,这其中就包含了其实已经消费了的30条数据;

另一种情况是,如果poll下来数据后,处理这些数据的时间比 session.timeout.ms配置的时间要长,从而导致 rebalanced

所以,session.timeout.ms和小max.poll.records 具体配置为多少,得看你处理一条消息花费多长时间 x,需要满足 x乘以max.poll.records < session.timeout.ms

  读取kafka消息只需要创建一个kafka消费者(可以认为一个group是一个“订阅者”):

import com.alibaba.fastjson.JSONObject;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;

public class KafkaPoolData {
    private KafkaConsumer<String, Map<String, Object>> consumer;
    private String topic;
   
  public KafkaPoolData(String topic) {
        Properties consumerConfig = new Properties();
 
        consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "*.dct-*.com:9092");
        consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "xtt-source-group1"); //消费者组,保证要跟其他的不一样
        consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");// 这个参数指定了当消费者第一次读取分区或者上一次的位置太老(比如消费者下线时间太久)时的行为,可以取值为latest(读取消息队列中最新的消息)或者earliest(从最老的消息开始消费)
 /**kafka根据key值确定消息发往哪个分区(如果分区被指定则发往指定的分区),具有相同key的消息被发往同一个分区,如果key为NONE则随机选择分区,可以使用key_serializer参数序列化为字节类型
        value为要发送的消息值,必须为bytes类型,如果这个值为空,则必须有对应的key值,并且空值被标记为删除。可以通过配置value_serializer参数序列化为字节类型*/
        consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "com.apache.kafka.common.serialization.StringDeserializer"); 
        consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");  //用来做反序列化的,也就是将字节数组转换成对象
        consumer = new KafkaConsumer<String, Map<String, Object>>(consumerConfig); //创建一个消费者
        this.topic = topic;
    }

 public void testConsumer(long offset,int number) {
        consumer.poll(1);//我们不断调用poll拉取数据,如果停止拉取,那么Kafka会认为此消费者已经死亡并进行重平衡。参数值是一个超时时间,指明线程如果没有数据时等待多长时间,0表示不等待立即返回
        consumer.seekToBeginning();//在给消费者分配分区的时候将消息偏移量跳转到起始位置 。
        List<TopicPartition> partitions = new ArrayList<TopicPartition>();
       //与subscirbe方法不同,assign方法由用户直接手动consumer实例消费哪些具体分区,assign的consumer不会拥有kafka的group management机制,也就是当group内消费者数量变化的时候不会有reblance行为发生。assign的方法不能和subscribe方法同时使用。
        for (int i = 0; i < 10; i++) {
            TopicPartition partition = new TopicPartition(topic, i);//i是指定分区partition
            partitions.add(partition);
            consumer.assign(partitions);
            consumer.seek(partition, offset);// 指定从这个topic和partition的哪个位置获取
        }  //consumer要消费0-9分区
       //consumer.close(); 主动关闭可以使得Kafka立即进行重平衡而不需要等待会话过期
        boolean result = true;
        int count = 0;
        System.out.println("
Consumer begin!
");
        while (result) {
            System.out.println("
Consumer data begin!
");
            ConsumerRecords<String, Map<String, Object>> records = consumer.poll(1000); //拉取消费记录

            for (ConsumerRecord<String, Map<String, Object>> record : records) {
                JSONObject json = new JSONObject();
                Map<String, Object> map = record.value();
                //System.out.println("
Parse data!
");
                for (Map.Entry<?, ?> entry : map.entrySet()) {
                    String filedName = String.valueOf(entry.getKey());
                    if (filedName.equals("image_data")) {

                    }else {
                        json.put(filedName, String.valueOf(entry.getValue())); //将非图片信息保存到json
                    }
                }
                 if(json.containsKey("time_stamp")) {
                    if (json.get("time_stamp") != null && (json.get("time_stamp").toString().compareTo("2018-01-19 10:30:00")) > 0 && json.get("time_stamp").toString().compareTo("2018-01-19 11:00:00") < 0) {
                        /*SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                        String timeStamp = sdf.format(json.getString("time_stamp"));
                        json.put("time_stamp",timeStamp);*/
                        System.out.println("put to es");
                        //生成es索引的文档id
                        UUID uuid = UUID.randomUUID();
                        String id = uuid.toString();
                        PutDataToEs.PutToEs(json,id); 
                    }
                }
                count++;
                if (count % 100 == 0) {
                    System.out.println("parse data number : " + count); //打印写数据的数量
                }
                if (count == number) {
                    result = false;
                    break;
                }
            }
        }
        System.out.println("
Parse data Finished! count=" + count + "
");
        System.out.println("
Consumer Finished!
");
    }

public static void main(String[] args) {
        //初始化ES
        PutDataToEs.initClient();
        //topic名称
        String topic = "*-analysis-v1-1-production";
        //String topic = args[0];
        long offset =110000; //从哪里开始
       // long offset = Long.parseLong(args[0]);
        int number=1000000;  //需要拉取多少数据
       // int number = Integer.parseInt(args[1]);
        KafkaPoolData consumer = new KafkaPoolData(topic);
        consumer.testConsumer(offset,number);
        PutDataToEs.closeClient();
    }
}

 写入ES:

public class PutDataToEs {

    private static TransportClient client = null;

    private static long randomNum(long begin, long end)
    {
        long rtn = begin + (long)(Math.random() * (end - begin));
        if (rtn == begin || rtn == end)
        {
            return randomNum(begin,end);
        }
        return rtn;
    }

    public static void initClient() {
        try {
            // on startup
            Settings settings = Settings.builder()
                    .put("cluster.name", "*-*.com-es").build();
            client = new PreBuiltTransportClient(settings)
                    .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("10.45.*.*"), 9300));
        } catch (UnknownHostException e) {
            e.printStackTrace();
        }
    }


    public static void closeClient() {
        client.close();
    }

    public static void PutToEs(JSONObject json,String id) {

        try {
            String msgType="";
            String resultType="";
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            sdf.setTimeZone(TimeZone.getTimeZone("UTC"));
            if(json.containsKey("msg_type")){
                msgType = json.getString("msg_type");
            }
            if(json.containsKey("result_type")){
                resultType = json.getString("result_type");
            }

          BulkRequestBuilder bulkRequest = client.prepareBulk();

          bulkRequest.add(client.prepareIndex("kafka_to_es_test", "kafkadata", String.valueOf(id)).setSource(jsonBuilder()
                            .startObject()
                            .field("time_stamp", sdf.parse(timeStamp))
                            .field("msg_type", msgType)
                            .endObject()
                    )
            );
          BulkResponse bulkResponse = bulkRequest.get();
             if (bulkResponse.hasFailures()) {
                // process failures by iterating through each bulk response item
                System.out.println(bulkRequest.toString());
             }
        }catch (Exception e) {
            e.printStackTrace();
        }
    }
}
原文地址:https://www.cnblogs.com/zling/p/10450259.html