pyspark streaming

一、一个例子

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
# create sc with two working threads
sc = SparkContext("local[2]","test")
# create local StreamingContext with batch interval of 1 second
ssc = StreamingContext(sc,1)
# create DStream that connects to localhost:9999
lines = ssc.socketTextStream("localhost",9999)
words = lines.flatMap(lambda line: line.split(" "))
pairs = words.map(lambda x: (x,1))
wordcount = pairs.reduceByKey(lambda x,y: x+y)
# 打印DStream里每个RDD的前10个元素
wordcount.pprint()
ssc.start()
ssc.awaitTermination()

运行过程:
1、linux 首先查看9999端口是否已经使用

netstat -ntpl | grep 9999

2、开启999端口

nc -lk 9999

如果在win10,使用

nc -l -p 9999

3、在新的窗口运行脚本,在之前的窗口输入字符串,在新窗口查看打印输出

-------------------------------------------
Time: 2021-10-21 15:49:17
-------------------------------------------
('kaka', 2)
('tt', 1)

二 spark连接kafka实现在线计算

wordcount案例

# kafka_spark.py
import findspark

findspark.init()

import sys

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

if __name__ == "__main__":
    sc = SparkContext(appName="streamingkafka")
    sc.setLogLevel("WARN") # 减少shell打印日志
    ssc = StreamingContext(sc, 5) # 5秒的计算窗口  the time interval (in seconds) at which streaming data will be divided into batches.
    brokers="""hadoop102:9092,hadoop103:9092,hadoop104:9092"""
    topic = "static_online"
    # 使用streaming使用直连模式消费kafka
    # 参数:ssc StreamingContext
    #        topic 名称
    #       kafka节点
    kafka_streaming_rdd = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
    lines_rdd = kafka_streaming_rdd.map(lambda x: x[1])
    counts = lines_rdd.flatMap(lambda line: line.split(" ")) 
        .map(lambda word: (word, 1)) 
        .reduceByKey(lambda a, b: a+b)
    # 将workcount结果打印到当前shell
    counts.pprint()
    ssc.start()
    ssc.awaitTermination()

提交任务

spark-submit  --jars spark-streaming-kafka-0-8-assembly_2.11-2.4.8.jar  kafka_spark.py

然后启动kakfa生产者,向static_online这个topic发生数据

from kafka import KafkaProducer
from kafka.errors import kafka_errors
import traceback
import json
bootstrap_servers = ['hadoop102:9092','hadoop103:9092','hadoop104:9092']

def producer_demo():
    # 假设生产的消息为键值对(不是一定要键值对),且序列化方式为json
    producer = KafkaProducer(
        bootstrap_servers=bootstrap_servers,
        key_serializer=lambda k: json.dumps(k).encode(),
        value_serializer=lambda v: json.dumps(v).encode())
    # 发送三条消息
    for i in range(0, 300):
        future = producer.send(
            'static_online',
            key='count_num',  # 同一个key值,会被送至同一个分区
            value=str(i) + "@qq.22com",
            partition=1)  # 向分区1发送消息
        print("send {}".format(str(i) + "@qq.22com")
        try:
            future.get(timeout=10) # 监控是否发送成功
        except kafka_errors:  # 发送失败抛出kafka_errors
            traceback.format_exc()

if __name__ == '__main__':
    producer_demo()

参考资料

1、基于PySpark整合Spark Streaming与Kafka

原文地址:https://www.cnblogs.com/leimu/p/15434664.html