SparkStreaming python 读取kafka数据将结果输出到单个指定本地文件

# -*- coding: UTF-8 -*-
#!/bin/env python3

# filename readFromKafkaStreamingGetLocation.py

import IP
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import datetime


class KafkaMessageParse:

    def extractFromKafka(self,kafkainfo):
        if type(kafkainfo) is tuple and len(kafkainfo) == 2:
            return kafkainfo[1]

    def lineFromLines(self,lines):
        if lines is not None and len(lines) > 0:
            return lines.strip().split("
")

    def messageFromLine(self,line):
        if line is not None and "message" in line.keys():
            return line.get("message")

    def ip2location(self,ip):
        result = []
        country = 'country'
        province = 'province'
        city = 'city'
        ipinfo = IP.find(ip.strip())
        try:
            location = ipinfo.split("	")
            if len(location) == 3:
                country = location[0]
                province = location[1]
                city = location[2]
            elif len(location) == 2:
                country = location[0]
                province = location[1]
            else:
                pass
        except Exception:
            pass
        result.append(ip)
        result.append(country)
        result.append(province)
        result.append(city)
        return result

    def vlistfromkv(self, strori, sep1, sep2):
        resultlist = []
        fields = strori.split(sep1)
        for field in fields:
            kv = field.split(sep2)
            resultlist.append(kv[1])
        return resultlist


    def extractFromMessage(self, message):
        if message is not None and len(message) > 1:
            if len(message.split("u0001")) == 8:
                resultlist = self.vlistfromkv(message, "x01", "x02")
                source = resultlist.pop()
                ip = resultlist.pop()
                resultlist.extend(self.ip2location(ip))
                resultlist.append(source)
                result = "x01".join(resultlist)
        return result


def tpprint(val, num=10000):
    """
    Print the first num elements of each RDD generated in this DStream.
    @param num: the number of elements from the first will be printed.
    """
    def takeAndPrint(time, rdd):
        taken = rdd.take(num + 1)
        print("########################")
        print("Time: %s" % time)
        print("########################")
        DATEFORMAT = '%Y%m%d'
        today = datetime.datetime.now().strftime(DATEFORMAT)
        myfile = open("/data/speech/speech." + today, "a")
        for record in taken[:num]:
            print(record)
            myfile.write(str(record)+"
")
        myfile.close()
        if len(taken) > num:
            print("...")
        print("")

    val.foreachRDD(takeAndPrint)


if __name__ == '__main__':
    zkQuorum = 'datacollect-1:2181,datacollect-2:2181,datacollect-3:2181'
    topic = {'speech-1': 1, 'speech-2': 1, 'speech-3': 1, 'speech-4':1, 'speech-5':1}
    groupid = "rokid-speech-get-location"
    master = "local[*]"
    appName = "SparkStreamingRokid"
    timecell = 5

    sc = SparkContext(master=master, appName=appName)
    ssc = StreamingContext(sc, timecell)
    # ssc.checkpoint("checkpoint_"+time.strftime("%Y-%m-%d", time.localtime(time.time())))

    kvs = KafkaUtils.createStream(ssc, zkQuorum, groupid, topic)
    kmp = KafkaMessageParse()
    lines = kvs.map(lambda x: kmp.extractFromKafka(x))
    lines1 = lines.flatMap(lambda x: kmp.lineFromLines(x))
    valuedict = lines1.map(lambda x: eval(x))
    message = valuedict.map(lambda x: kmp.messageFromLine(x))
    rdd2 = message.map(lambda x: kmp.extractFromMessage(x))

    # rdd2.pprint()

    tpprint(rdd2)
    # rdd2.fileprint(filepath="result.txt")

    # rdd2.foreachRDD().saveAsTextFiles("/home/admin/agent/spark/result.txt")

    # sc.parallelize(rdd2.cache()).saveAsTextFile("/home/admin/agent/spark/result", "txt")

    # rdd2.repartition(1).saveAsTextFiles("/home/admin/agent/spark/result.txt")

    ssc.start()
    ssc.awaitTermination()

主要是重写pprint()函数

参考:https://stackoverflow.com/questions/37864526/append-spark-dstream-to-a-single-file-in-python

原文地址:https://www.cnblogs.com/zhzhang/p/7347636.html