Python / Scala / Kafka / Spark: putting together a simple streaming pipeline project

Goal: build the simplest possible Spark + Kafka real-time stream computation on Windows:
a Python producer generates random integers in the range 0-100 and sends them to Kafka; Spark consumes the stream and keeps running statistics.

Versions used:
scala 2.11
python 3.5
java 1.8
kafka_2.11-0.11.0.0.tgz
zookeeper-3.4.9.tar.gz
spark 2.2

step 1
Configure ZooKeeper and start the ZooKeeper server.

step 2
Configure and start the Kafka broker, then create the topic (example commands below).
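
A minimal sketch of the corresponding Windows commands, assuming ZooKeeper and Kafka are unpacked into their own directories, the default config files are used, and conf\zoo_sample.cfg has been copied to conf\zoo.cfg (adjust the paths to your own layout):

:: step 1 - start ZooKeeper (run from the zookeeper-3.4.9 directory)
bin\zkServer.cmd

:: step 2 - start the Kafka broker (run from the kafka_2.11-0.11.0.0 directory)
bin\windows\kafka-server-start.bat config\server.properties

:: create the topic that the producer and consumer below use
bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kafkaid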



Producer side (data generator):
# -*- coding: utf-8 -*-
from kafka import KafkaProducer
import random
import time

'''
c:\p_not_imprt\kafka_2.11-0.11.0.0\bin\windows>kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kafkaid
'''

'''
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic kafkaid --from-beginning
kafka-console-consumer.bat --bootstrap-server 127.0.0.1:9092 --topic kafkaid --from-beginning
'''

class Kafka_Producer():
    def __init__(self, kafkahost, kafkaport, kafkatopic):
        self.kafkahost = kafkahost
        self.kafkaport = kafkaport
        self.kafkatopic = kafkatopic
        self.producer = KafkaProducer(
            bootstrap_servers='{kafka_host}:{kafka_port}'.format(
                kafka_host=self.kafkahost, kafka_port=self.kafkaport))

    def sendvalues(self):
        value = random.randint(0, 100)
        # Serialize the number as a UTF-8 string; calling bytes() on an int in
        # Python 3 would instead produce that many zero bytes.
        self.producer.send(self.kafkatopic, str(value).encode('utf-8'))
        print(value)
        self.producer.flush()

def main():
    producer = Kafka_Producer('127.0.0.1', 9092, 'kafkaid')
    while True:
        producer.sendvalues()
        time.sleep(5)

if __name__ == '__main__':
    main()


    
Consumer side, used for testing that the data arrives:
# encoding:utf-8
from kafka import KafkaConsumer
"""
测试数据可以消费
"""
class Kafka_Consumer():
    def __init__(self, kafkahost, kafkaport, kafkatopic):
        self.kafkaHost = kafkahost
        self.kafkaPort = kafkaport
        self.kafkatopic = kafkatopic

        self.consumer = KafkaConsumer(
            self.kafkatopic,
            bootstrap_servers='{kafka_host}:{kafka_port}'.format(
                kafka_host=self.kafkaHost,
                kafka_port=self.kafkaPort))

    def dataToConsumer(self):
        # Print every ConsumerRecord as it arrives from the topic.
        for data in self.consumer:
            print(data)

def main():
    con = Kafka_Consumer('127.0.0.1',9092,'kafkaid')
    con.dataToConsumer()

if __name__ == '__main__':
    main()


   """
ConsumerRecord(topic='kafkaid', partition=0, offset=305, timestamp=1503113220641, timestamp_type=0, key=None, value=b'x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00', checksum=-1933145318, serialized_key_size=-1, serialized_value_size=48)
ConsumerRecord(topic='kafkaid', partition=0, offset=306, timestamp=1503113225648, timestamp_type=0, key=None, value=b'x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00', checksum=-341178958, serialized_key_size=-1, serialized_value_size=77)
ConsumerRecord(topic='kafkaid', partition=0, offset=307, timestamp=1503113230657, timestamp_type=0, key=None, value=b'x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00', checksum=825828613, serialized_key_size=-1, serialized_value_size=22)
ConsumerRecord(topic='kafkaid', partition=0, offset=308, timestamp=1503113235667, timestamp_type=0, key=None, value=b'x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00', checksum=-1954167954, serialized_key_size=-1, serialized_value_size=21)
ConsumerRecord(topic='kafkaid', partition=0, offset=309, timestamp=1503113240673, timestamp_type=0, key=None, value=b'x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00', checksum=341811256, serialized_key_size=-1, serialized_value_size=16)
ConsumerRecord(topic='kafkaid', partition=0, offset=310, timestamp=1503113245678, timestamp_type=0, key=None, value=b'x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00', checksum=979573176, serialized_key_size=-1, serialized_value_size=88)
ConsumerRecord(topic='kafkaid', partition=0, offset=311, timestamp=1503113250685, timestamp_type=0, key=None, value=b'x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00', checksum=1345984197, serialized_key_size=-1, serialized_value_size=90)
ConsumerRecord(topic='kafkaid', partition=0, offset=312, timestamp=1503113255695, timestamp_type=0, key=None, value=b'x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00', checksum=-1404551612, serialized_key_size=-1, serialized_value_size=89)
ConsumerRecord(topic='kafkaid', partition=0, offset=313, timestamp=1503113260700, timestamp_type=0, key=None, value=b'x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00', checksum=-96665387, serialized_key_size=-1, serialized_value_size=50)
ConsumerRecord(topic='kafkaid', partition=0, offset=314, timestamp=1503113265708, timestamp_type=0, key=None, value=b'x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00', checksum=479929368, serialized_key_size=-1, serialized_value_size=25)
ConsumerRecord(topic='kafkaid', partition=0, offset=315, timestamp=1503113270715, timestamp_type=0, key=None, value=b'x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00x00', checksum=-1410866262, serialized_key_size=-1, serialized_value_size=92)
ConsumerRecord(topic='kafkaid', partition=0, offset=316, timestamp=1503113275722, timestamp_type=0, key=None, value=b'x00x00x00x00x00x00x00x00x00x00', checksum=149549586, serialized_key_size=-1, serialized_value_size=10)
   """



   
   

Spark consumer side:


It seems that Spark Streaming cannot read from Kafka 0.11 directly yet (Spark 2.2 only ships the spark-streaming-kafka-0-8 and spark-streaming-kafka-0-10 integrations).

To implement the running statistics, though, two functions do the important work (a PySpark sketch follows below):

updateStateByKey (carries the per-key state forward across batches)

reduceByKey (aggregates within each batch)
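
A minimal PySpark sketch of that consumer, assuming the spark-streaming-kafka-0-8 direct-stream connector is used (e.g. submitted with spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0). The app name and checkpoint directory are placeholders; the topic and broker address match the producer above. It keeps a running count of how many times each number has been produced:

# -*- coding: utf-8 -*-
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

def update_count(new_values, running_count):
    # Add this batch's counts to the running total kept by updateStateByKey.
    return sum(new_values) + (running_count or 0)

def main():
    sc = SparkContext(appName="KafkaRandomNumberCount")
    ssc = StreamingContext(sc, 5)            # 5-second micro-batches
    ssc.checkpoint("checkpoint")             # required by updateStateByKey

    # Direct stream of (key, value) pairs from the 'kafkaid' topic.
    stream = KafkaUtils.createDirectStream(
        ssc, ["kafkaid"], {"metadata.broker.list": "127.0.0.1:9092"})

    counts = (stream.map(lambda kv: (kv[1], 1))        # value -> (value, 1)
                    .reduceByKey(lambda a, b: a + b)   # count within the batch
                    .updateStateByKey(update_count))   # running count across batches

    counts.pprint()
    ssc.start()
    ssc.awaitTermination()

if __name__ == '__main__':
    main()

Without the checkpoint directory the job refuses to start, because updateStateByKey needs somewhere to persist its state between batches.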





Original post: https://www.cnblogs.com/TendToBigData/p/10501212.html