Flume receives HTTP requests and writes the data to Kafka

Flume receives HTTP requests and writes the data to Kafka, and Spark consumes the data from Kafka. This is a classic data-ingestion setup.

Straight to the Flume configuration:

source : http

channel : file

sink : kafka

xx :~/software/flume1.8/conf$ cat http-file-kafka.conf 
# example.conf: A single-node Flume configuration
##########
# data example
# use a POST request, select raw; data example: [{"body" : "{'xx':'xxxxx1'}"}]
# or just use the official request demo:

#[{
#  "headers" : {
#             "timestamp" : "434324343",
#             "host" : "random_host.example.com"
#             "topic" : "venn" # if headers contain topic, will replace the default topic
#             },
#  "body" : "random_body" # random_body is the message send to channel
#  }]

# Name the components on this agent1
agent1.sources = s1
agent1.sinks = k1
agent1.channels = c1
# Describe/configure the source
agent1.sources.s1.type = http
# localhost only accepts local requests; bind the host name to accept remote requests
agent1.sources.s1.bind = spring
# HTTP port
agent1.sources.s1.port = 8084
# the handler that ships with Flume for parsing HTTP requests
agent1.sources.s1.handler = org.apache.flume.source.http.JSONHandler
# Describe the sink
# Kafka sink
agent1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
# default topic
agent1.sinks.k1.kafka.topic = mytopic
# Kafka host and port
agent1.sinks.k1.kafka.bootstrap.servers = localhost:9092
agent1.sinks.k1.kafka.flumeBatchSize = 20
agent1.sinks.k1.kafka.producer.acks = 1
agent1.sinks.k1.kafka.producer.linger.ms = 1
# compression
agent1.sinks.k1.kafka.producer.compression.type = snappy
# Use a channel which buffers events on disk (file channel)
agent1.channels.c1.type = file
# capacity / transactionCapacity should be configured and set fairly large;
# if the channel fills up, the source errors and the HTTP response is 503 (channel full)
#agent1.channels.c1.capacity = 1000
#agent1.channels.c1.transactionCapacity = 100
agent1.channels.c1.checkpointDir = /opt/flume/checkpoint
agent1.channels.c1.dataDirs = /opt/flume/channel
# Bind the source and sink to the channel
agent1.sources.s1.channels = c1
agent1.sinks.k1.channel = c1
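
Before starting Flume you may want to create the sink's default topic up front (with default broker settings Kafka can also auto-create it on the first write). A minimal sketch, assuming a local Kafka install whose kafka-topics.sh still takes --zookeeper; the path and the ZooKeeper address are assumptions:

# run from the Kafka installation directory
./bin/kafka-topics.sh --create --zookeeper localhost:2181 \
    --replication-factor 1 --partitions 1 --topic mytopic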

With the Flume configuration in place, start Flume (the name passed with -n must match the agent name used in the config, agent1 here):

./bin/flume-ng agent -n agent1 -c conf -f conf/http-file-kafka.conf -Dflume.root.logger=INFO,console

Once it is running, you can send HTTP requests to it.

The HTTP request format is as follows:

[{
  "headers" : {
             "timestamp" : "434324343",
             "host" : "random_host.example.com",
             "topic" : "xxx"
             },
  "body" : "random_body"
  },
  {
  "headers" : {
             "namenode" : "namenode.example.com",
             "datanode" : "random_datanode.example.com"
             },
  "body" : "really_random_body"
  }]    

Note: if the headers of the HTTP request contain a topic, it replaces the topic set in the configuration file.

The official Flume documentation says that Flume 1.8.0 only supports Kafka 0.9.x and no longer supports Kafka 0.8.x (I have not tested this).
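
For a quick manual test you can POST the JSON array shown above with curl. A minimal sketch, assuming the source is bound as in the config above (spring:8084); adjust the host and port to your own setup:

curl -X POST http://spring:8084 \
     -H 'Content-Type: application/json;charset=utf-8' \
     -d '[{"headers" : {"topic" : "venn"}, "body" : "hello from curl"}]'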

Next comes the program that sends the data (sending requests by hand is too tedious).

package com.venn.http;

import com.venn.entity.User;
import java.io.BufferedReader;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UnsupportedEncodingException;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.*;

import org.apache.flume.Event;
import org.apache.flume.event.JSONEvent;
import com.google.gson.Gson;


/**
 * Created by venn on 19-1-17.
 */
public class HttpDemo  {

    private static String urlStr = "http://localhost:8084";
    private static Random random = new Random();
    public static void main(String[] args) throws InterruptedException {

        while (true){
            // User is the author's own test entity; its toString() produces the JSON message body
            String message = new User().toString();
            send(message);

//            Thread.sleep(1);
        }
    }
    public static void send(String message){
        System.out.println("send  message : " + message);
        try{
            // create the connection
            URL url = new URL(urlStr);
            HttpURLConnection connection = (HttpURLConnection) url.openConnection();
            connection.setDoOutput(true);
            connection.setDoInput(true);
            connection.setRequestMethod("POST");
            connection.setUseCaches(false);
            connection.setInstanceFollowRedirects(true);
            connection.setRequestProperty("Content-Type",
                    "application/x-www-form-urlencoded");
            connection.connect();
            // write the POST request body
            DataOutputStream out = new DataOutputStream(
                    connection.getOutputStream());

            // build a Flume JSONEvent with headers and body, then serialize a list of events to JSON
            JSONEvent jsonEvent = new JSONEvent();
            Map<String, String> header = new HashMap<>();
            header.put("timestamp", String.valueOf(System.currentTimeMillis()));
            header.put("host", "venn");
            header.put("topic", "venn" + random.nextInt(4));
            jsonEvent.setBody(message.getBytes());
            jsonEvent.setHeaders(header);
            Gson gson = new Gson();
            List<Event> list = new ArrayList<>();
            list.add(jsonEvent);
            out.writeBytes(gson.toJson(list));
            out.flush();
            out.close();

            // read the response (the HTTP source returns an empty body)
            BufferedReader reader = new BufferedReader(new InputStreamReader(
                    connection.getInputStream(), "utf-8"));
            int code = connection.getResponseCode();
            String lines;
            StringBuffer sb = new StringBuffer("");
            while ((lines = reader.readLine()) != null) {
                sb.append(lines);
            }
            System.out.println("code : " + code + ", message : " + sb);
            reader.close();
            // disconnect
            connection.disconnect();
        } catch (MalformedURLException e) {
            e.printStackTrace();
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

}
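
The sender only needs Gson and the Flume SDK (for JSONEvent) on the classpath; both jars ship in Flume's lib directory. A minimal sketch for compiling and running it against those jars, assuming FLUME_HOME points at the Flume install and that com.venn.entity.User is present in your own sources; paths are assumptions:

# compile and run the sender using the jars bundled with Flume
javac -cp "$FLUME_HOME/lib/*" com/venn/http/HttpDemo.java com/venn/entity/User.java
java  -cp "$FLUME_HOME/lib/*:." com.venn.http.HttpDemo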

Done.

Sending data:

Data received by Kafka:

Note: because a topic is set in the headers, the received data actually ends up in different Kafka topics.
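
To spot-check one of those topics, the console consumer that ships with Kafka can be used. A minimal sketch, assuming a local broker on localhost:9092, a Kafka version whose console consumer accepts --bootstrap-server, and one of the topics written by the sender above; the path is an assumption:

# run from the Kafka installation directory
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic venn0 --from-beginning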

Original post: https://www.cnblogs.com/Springmoon-venn/p/10483920.html