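Real-Time Stream Processing Project: Step by Step

I. Generate real-time data (written in Python) as the basis for later processing

Setup: write a Python script and run it on a schedule with crontab

  1. Write a Python script whose output lines contain the fields time, ip, url, status_code, and referer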

    generate_log.py

    # coding=UTF-8

    # Standard-library modules for random sampling and timestamps
    import random
    import time

    # Candidate URL paths
    url_paths = [
        "class/301.html",
        "class/215.html",
        "class/172.html",
        "class/153.html",
        "class/322.html",
        "class/272.html",
        "learn/1102",
        "course/list"
    ]

    # Pool of octets used to assemble fake IP addresses
    ip_slices = [132,134,10,29,167,198,55,63,72,98,22,25]

    # Referring search engines; {query} is filled with a search keyword
    http_referers = [
        "https://www.baidu.com/s?wd={query}",
        "https://www.sogou.com/web?query={query}",
        "https://cn.bing.com/search?q={query}",
        "https://search.yahoo.com/search?p={query}"
    ]

    # Search keywords substituted into the referer URL
    search_keyword = [
        "Spark SQL in Action",
        "Hadoop Basics",
        "Storm in Action",
        "Spark Streaming in Action",
        "Big Data Interview"
    ]

    # HTTP status codes
    status_codes = ["200", "404", "500"]
    
    # Pick a random URL path
    def sample_url():
        return random.choice(url_paths)

    # Build a random IP address from four distinct octets in the pool
    def sample_ip():
        octets = random.sample(ip_slices, 4)
        return ".".join(str(item) for item in octets)

    # Pick a random referer; about 80% of requests have none and log "-"
    def sample_referer():
        if random.uniform(0, 1) > 0.2:
            return "-"

        refer_str = random.choice(http_referers)
        query_str = random.choice(search_keyword)
        return refer_str.format(query=query_str)

    # Pick a random status code
    def sample_status_code():
        return random.choice(status_codes)
    
    # Combine the sampled fields into tab-separated access-log lines
    def generate_log(count=10):
        # One timestamp is shared by every line written in this run
        time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())

        # "w+" truncates the file on every run; the with-block closes it afterwards
        with open("/Users/hadoop/Desktop/data/logs/access.log", "w+") as f:
            while count >= 1:
                query_log = "{ip}\t{local_time}\t\"GET /{url} HTTP/1.1\"\t{status_code}\t{referer}".format(
                    url=sample_url(), ip=sample_ip(), referer=sample_referer(),
                    status_code=sample_status_code(), local_time=time_str)
                print(query_log)

                f.write(query_log + "\n")

                count = count - 1

    # Entry point
    # After saving, run in a terminal: python generate_log.py
    if __name__ == '__main__':
        generate_log(200)
    
    
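  2. Schedule the script to generate logs once per minute

    crontab -e
        */1 * * * * /Users/hadoop/Desktop/data/log_generator.sh

    Notes: log_generator.sh is a wrapper script that is not shown here; it is expected to invoke generate_log.py.
    A crontab expression tool for Linux is available at https://tool.lu/crontab/
    The crontab expression for "once every minute" is */1 * * * * (equivalent to * * * * *).
    The field layout shown by that tool is reproduced below. A standard Linux crontab uses only the first five fields; the trailing year field is an extension found in some schedulers.

    Linux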
    *    *    *    *    *    *
    -    -    -    -    -    -
    |    |    |    |    |    |
    |    |    |    |    |    + year [optional]
    |    |    |    |    +----- day of week (0 - 7) (Sunday=0 or 7)
    |    |    |    +---------- month (1 - 12)
    |    |    +--------------- day of month (1 - 31)
    |    +-------------------- hour (0 - 23)
    +------------------------- min (0 - 59)
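
Before wiring the log file into Flume, it can help to confirm that the lines written by generate_log.py in step 1 have the expected shape. The following is a minimal sketch, assuming the tab-separated layout ip, time, quoted request, status code, referer; the names `AccessRecord` and `parse_log_line` and the sample line are illustrative and do not appear in the original project.

```python
from collections import namedtuple

# Hypothetical record type for one access-log line (not part of the original script)
AccessRecord = namedtuple(
    "AccessRecord", "ip time method url protocol status referer")


def parse_log_line(line):
    """Split one tab-separated access-log line into named fields."""
    ip, local_time, request, status, referer = line.rstrip("\n").split("\t")
    # The request field looks like: "GET /class/301.html HTTP/1.1"
    method, path, protocol = request.strip('"').split(" ")
    return AccessRecord(ip, local_time, method, path.lstrip("/"),
                        protocol, int(status), referer)


# Made-up sample line in the same format the generator writes
sample = '132.10.29.167\t2018-01-01 12:00:00\t"GET /class/301.html HTTP/1.1"\t200\t-'
record = parse_log_line(sample)
print(record.url, record.status)
```

Because the referer is its own tab-separated field, search keywords containing spaces do not interfere with splitting the request on spaces.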
    


II. Feeding the output of the Python log generator into Flume

Streaming_project.conf

    exec-memory-logger.sources = exec-source
    exec-memory-logger.sinks = logger-sink
    exec-memory-logger.channels = memory-channel

    exec-memory-logger.sources.exec-source.type = exec
    exec-memory-logger.sources.exec-source.command = tail -F /Users/hadoop/Desktop/data/logs/access.log
    exec-memory-logger.sources.exec-source.shell = /bin/sh -c

    exec-memory-logger.channels.memory-channel.type = memory

    exec-memory-logger.sinks.logger-sink.type = logger

    exec-memory-logger.sources.exec-source.channels = memory-channel
    exec-memory-logger.sinks.logger-sink.channel = memory-channel
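
A common mistake in a Flume agent file is a source or sink bound to a channel name that was never declared, or an agent prefix that does not match the --name passed on the command line. The sketch below is one way to check the wiring before starting the agent; it is a plain-Python illustration, not a Flume tool, and `parse_properties` and `check_wiring` are hypothetical helper names.

```python
def parse_properties(text):
    """Parse 'key = value' lines, ignoring blank lines and # comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def check_wiring(props, agent):
    """Return a list of problems in the source/sink/channel wiring of one agent."""
    channels = set(props.get(agent + ".channels", "").split())
    problems = []
    for source in props.get(agent + ".sources", "").split():
        bound = props.get("%s.sources.%s.channels" % (agent, source), "").split()
        if not bound:
            problems.append("source %s has no channels" % source)
        for channel in bound:
            if channel not in channels:
                problems.append("source %s uses undeclared channel %s" % (source, channel))
    for sink in props.get(agent + ".sinks", "").split():
        bound = props.get("%s.sinks.%s.channel" % (agent, sink))
        if bound not in channels:
            problems.append("sink %s uses undeclared channel %s" % (sink, bound))
    return problems


# The wiring-related lines of the agent file above
CONF = """
exec-memory-logger.sources = exec-source
exec-memory-logger.sinks = logger-sink
exec-memory-logger.channels = memory-channel
exec-memory-logger.sources.exec-source.channels = memory-channel
exec-memory-logger.sinks.logger-sink.channel = memory-channel
"""

# An empty list means every source and sink points at a declared channel
print(check_wiring(parse_properties(CONF), "exec-memory-logger"))
```

Passing the wrong agent name to `check_wiring` yields no sources or sinks at all, which mirrors what happens when --name does not match the property prefix: the agent starts with nothing configured.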

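Start Flume (--conf-file must point to wherever the configuration above was saved, and --name must match the agent prefix used in it):

    flume-ng agent \
    --conf $FLUME_HOME/conf \
    --conf-file /Users/hadoop/Desktop/data/streaming.conf \
    --name exec-memory-logger \
    -Dflume.root.logger=INFO,console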


III. Connecting Flume to Kafka

Logs ==> Flume ==> Kafka

  1. Start ZooKeeper: zkServer.sh start
  2. Start the Kafka server: ./kafka-server-start.sh -daemon /Users/hadoop/app/kafka/config/server.properties
  3. Create the topic: ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic streaming_log_py
  4. List the topics: ./kafka-topics.sh --list --zookeeper localhost:2181
  5. Start a Kafka console consumer: ./kafka-console-consumer.sh --zookeeper localhost:2181 --topic streaming_log_py
  6. Modify the Flume configuration so that the Flume sink writes to Kafka

streaming_kafka.conf

    exec-memory-kafka.sources = exec-source
    exec-memory-kafka.sinks = kafka-sink
    exec-memory-kafka.channels = memory-channel

    exec-memory-kafka.sources.exec-source.type = exec
    exec-memory-kafka.sources.exec-source.command = tail -F /Users/hadoop/Desktop/data/logs/access.log
    exec-memory-kafka.sources.exec-source.shell = /bin/sh -c

    exec-memory-kafka.channels.memory-channel.type = memory
    exec-memory-kafka.channels.memory-channel.transactionCapacity = 10000
    exec-memory-kafka.channels.memory-channel.byteCapacityBufferPercentage = 20
    exec-memory-kafka.channels.memory-channel.byteCapacity = 800000
    exec-memory-kafka.channels.memory-channel.keep-alive = 60
    exec-memory-kafka.channels.memory-channel.capacity = 1000000

    exec-memory-kafka.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
    exec-memory-kafka.sinks.kafka-sink.topic = streaming_log_py
    exec-memory-kafka.sinks.kafka-sink.brokerList = localhost:9092
    exec-memory-kafka.sinks.kafka-sink.requiredAcks = 1
    exec-memory-kafka.sinks.kafka-sink.batchSize = 5

    exec-memory-kafka.sources.exec-source.channels = memory-channel
    exec-memory-kafka.sinks.kafka-sink.channel = memory-channel

Start Flume:

    flume-ng agent \
    --conf $FLUME_HOME/conf \
    --conf-file /Users/hadoop/Desktop/data/streaming_kafka.conf \
    --name exec-memory-kafka \
    -Dflume.root.logger=INFO,console

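Once Flume has started normally, confirm that the crontab job is running, then watch the Kafka console consumer to check that the expected log lines appear.

Notes: data loss was observed at this point. The fix was to change the memory channel settings. In the generic form used by the referenced post (in this project the agent is exec-memory-kafka and the channel is memory-channel):

    agent1.channels.memoryChannel.byteCapacity = 800000
    agent1.channels.memoryChannel.keep-alive = 60
    agent1.channels.memoryChannel.capacity = 1000000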
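
To get a feel for how these limits interact, the arithmetic can be sketched as follows. This assumes, based on my reading of the Memory Channel documentation, that byteCapacityBufferPercentage reserves that share of byteCapacity for headers, so only the remainder is available for event bodies. The average line size of 100 bytes is an assumption for illustration, and both function names are hypothetical.

```python
def effective_byte_capacity(byte_capacity, buffer_percentage):
    """Bytes left for event bodies after reserving the header buffer."""
    return byte_capacity * (100 - buffer_percentage) // 100


def max_buffered_events(capacity, byte_capacity, buffer_percentage, avg_event_bytes):
    """Events the channel can hold: the smaller of the count limit and the byte limit."""
    by_bytes = effective_byte_capacity(byte_capacity, buffer_percentage) // avg_event_bytes
    return min(capacity, by_bytes)


# Values from streaming_kafka.conf; 100 bytes per log line is an assumed average
usable_bytes = effective_byte_capacity(800000, 20)
event_limit = max_buffered_events(1000000, 800000, 20, 100)
print(usable_bytes)  # 640000
print(event_limit)   # 6400
```

If that reading is right, the byte limit rather than capacity = 1000000 is the binding constraint for this configuration, which is worth keeping in mind if events are still dropped after the change.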

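Raise the JVM maximum heap size by editing bin/flume-ng under $FLUME_HOME (vi bin/flume-ng):

    JAVA_OPTS="-Xmx1024m"

The fix follows this blog post: https://www.cnblogs.com/zlslch/p/7253943.html. The individual properties are described in the Memory Channel section of the official Flume documentation.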

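Project in Practice

Connecting the Flume, Kafka, and Spark Streaming pipeline

  • Receive the data in a Spark application and count the records. The Kafka integration below uses receiver mode, which is deprecated; direct mode should be used instead.
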
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.kafka.KafkaUtils
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    /**
      * Test the Kafka-to-Spark Streaming connection
      */
    object ImoocStatStreamingApp {

      def main(args: Array[String]): Unit = {
        if (args.length != 4) {
          System.err.println("Usage: <zkQuorum> <groupId> <topics> <numThreads>")
          System.exit(1)
        }

        val Array(zkQuorum, groupId, topics, numThreads) = args
        val sparkconf = new SparkConf()
          .setAppName("ImoocStatStreamingApp")
          .setMaster("local[2]")
        // One micro-batch every 60 seconds
        val ssc = new StreamingContext(sparkconf, Seconds(60))

        // Map each topic name to the number of receiver threads
        val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
        val message = KafkaUtils.createStream(ssc, zkQuorum, groupId, topicMap)

        // Test step 1: verify that data is received by counting the records in each batch
        message.map(_._2).count().print()

        ssc.start()
        ssc.awaitTermination()
      }
    }
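
What the test step computes can be illustrated without a cluster. The sketch below is a plain-Python simulation, not Spark code: it groups (key, value) messages into 60-second windows by arrival time and counts them, which is roughly what `message.map(_._2).count()` reports per batch. The function name, arrival times, and message contents are made up for illustration.

```python
from collections import Counter

BATCH_SECONDS = 60  # mirrors Seconds(60) in the Scala code


def count_per_batch(records, batch_seconds=BATCH_SECONDS):
    """Count messages per batch window.

    records: iterable of (arrival_seconds, (key, value)) pairs.
    Returns a dict mapping each window's start time to its message count.
    """
    counts = Counter()
    for arrival, (_key, _value) in records:
        batch_start = int(arrival // batch_seconds) * batch_seconds
        counts[batch_start] += 1
    return dict(sorted(counts.items()))


# Made-up arrivals: three messages in the first minute, one in each of the next two
records = [
    (0, (None, "line-1")),
    (10, (None, "line-2")),
    (59, (None, "line-3")),
    (60, (None, "line-4")),
    (130, (None, "line-5")),
]
batch_counts = count_per_batch(records)
print(batch_counts)
```

One difference from the real job: Spark Streaming prints a count of 0 for a batch with no messages, whereas this sketch simply omits empty windows.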

Original post: https://www.cnblogs.com/suixingc/p/shi-shi-liu-chu-li-xiang-mu-ju-ti-bu-zhou.html