使用框架的各种代码示例

一、装饰器方式调度函数

from function_scheduling_distributed_framework import task_deco, BrokerEnum


# qps可以指定每秒运行多少次,可以设置0.001到10000随意。
# broker_kind 指定使用什么中间件,如果用redis,就需要在 distributed_frame_config.py 设置redis相关的配置。
@task_deco('queue_test_f01', qps=0.2, broker_kind=BrokerEnum.REDIS_ACK_ABLE)  # qps 0.2表示每5秒运行一次函数,broker_kind=2表示使用redis作中间件。
def add(a, b):
    print(a + b)


for i in range(10, 20):
    add.pub(dict(a=i, b=i * 2))  # 使用add.pub 发布任务
    add.push(i, b=i * 2)  # 使用add.push 发布任务
add.consume()  # 使用add.consume 消费任务
add.multi_process_consume(4)  # 这是开启4进程 叠加 细粒度(协程/线程)并发,速度更强。

二、非装饰器调度函数

from function_scheduling_distributed_framework import get_consumer, BrokerEnum


def add(a, b):
    print(a + b)


# 非装饰器方式,多了一个入参,需要手动指定consuming_function入参的值。
consumer = get_consumer('queue_test_f01', consuming_function=add, qps=0.2, broker_kind=BrokerEnum.REDIS_ACK_ABLE)
for i in range(10, 20):
    consumer.publisher_of_same_queue.publish(dict(a=i, b=i * 2))  # consumer.publisher_of_same_queue.publish 发布任务
consumer.start_consuming_message()  # 使用consumer.start_consuming_message 消费任务

三、如何解决多个步骤的消费函数

import time

from function_scheduling_distributed_framework import task_deco, BrokerEnum


@task_deco('queue_test_step1', qps=0.5, broker_kind=BrokerEnum.LOCAL_PYTHON_QUEUE)
def step1(x):
    print(f'x 的值是 {x}')
    if x == 0:
        for i in range(1, 300):
            step1.pub(dict(x=x + i))
    for j in range(10):
        step2.push(x * 100 + j)  # push是直接发送多个参数,pub是发布一个字典
    time.sleep(10)


@task_deco('queue_test_step2', qps=3, broker_kind=BrokerEnum.LOCAL_PYTHON_QUEUE)
def step2(y):
    print(f'y 的值是 {y}')
    time.sleep(10)


if __name__ == '__main__':
    # step1.clear()
    step1.push(0)  # 给step1的队列推送任务。

    step1.consume()  # 可以连续启动两个消费者,因为conusme是启动独立线程里面while 1调度的,不会阻塞主线程,所以可以连续运行多个启动消费。
    step2.consume()

四、演示如何定时运行

# 定时运行消费演示,定时方式入参用法可以百度 apscheduler 定时包。
import datetime
from function_scheduling_distributed_framework import task_deco, BrokerEnum, fsdf_background_scheduler, timing_publish_deco


@task_deco('queue_test_666', broker_kind=BrokerEnum.LOCAL_PYTHON_QUEUE)
def consume_func(x, y):
    print(f'{x} + {y} = {x + y}')


if __name__ == '__main__':
    fsdf_background_scheduler.add_job(timing_publish_deco(consume_func), 'interval', id='3_second_job', seconds=3, kwargs={"x": 5, "y": 6})  # 每隔3秒发布一次任务,自然就能每隔3秒消费一次任务了。
    fsdf_background_scheduler.add_job(timing_publish_deco(consume_func), 'date', run_date=datetime.datetime(2020, 7, 24, 13, 53, 6), args=(5, 6,))  # 定时,只执行一次
    fsdf_background_scheduler.add_timing_publish_job(consume_func, 'cron', day_of_week='*', hour=14, minute=51, second=20, args=(5, 6,))  # 定时,每天的11点32分20秒都执行一次。
    # 启动定时
    fsdf_background_scheduler.start()
    # 启动消费
    consume_func.consume()

五、多进程并发 + 多线程/携程

ff.multi_process_start(2) 就是代表启动2个独立进程并发 + 叠加 asyncio、gevent、eventlet、threding细粒度并发,这样运行性能炸裂。

多进程消费

import time
from function_scheduling_distributed_framework import task_deco, BrokerEnum, IdeAutoCompleteHelper, PriorityConsumingControlConfig, run_consumer_with_multi_process

"""
演示多进程启动消费,多进程和 asyncio/threading/gevnt/evntlet是叠加关系,不是平行的关系。
"""

# qps=5,is_using_distributed_frequency_control=True 分布式控频每秒执行5次。
# 如果is_using_distributed_frequency_control不设置为True,默认每个进程都会每秒执行5次。
@task_deco('test_queue', broker_kind=BrokerEnum.REDIS, qps=5, is_using_distributed_frequency_control=True)
def ff(x, y):
    import os
    time.sleep(2)
    print(os.getpid(), x, y)


if __name__ == '__main__':
    ff.clear() # 清除
    # ff.publish()
    for i in range(1000):
        ff.push(i, y=i * 2)

        # 这个与push相比是复杂的发布,第一个参数是函数本身的入参字典,后面的参数为任务控制参数,例如可以设置task_id,设置延时任务,设置是否使用rpc模式等。
        ff.publish({'x': i * 10, 'y': i * 2}, priority_control_config=PriorityConsumingControlConfig(countdown=1, misfire_grace_time=15))

    ff(666, 888)  # 直接运行函数
    ff.start()  # 和 conusme()等效
    ff.consume()  # 和 start()等效
    run_consumer_with_multi_process(ff, 2)  # 启动两个进程
    ff.multi_process_start(2)  # 启动两个进程,和上面的run_consumer_with_multi_process等效,现在新增这个multi_process_start方法。
    IdeAutoCompleteHelper(ff).multi_process_start(3)  # IdeAutoCompleteHelper 可以补全提示,但现在装饰器加了类型注释,ff. 已近可以在pycharm下补全了。

六、延时运行任务

因为有很多人有这样的需求,希望发布后不是马上运行,而是延迟60秒或者现在发布晚上18点运行。
然来是希望用户自己亲自在消费函数内部写个sleep(60)秒再执行业务逻辑,来达到延时执行的目的,
但这样会被sleep占据大量的并发线程/协程,如果是用户消费函数内部写sleep7200秒这么长的时间,那
sleep等待会占据99.9%的并发工作线程/协程的时间,导致真正的执行函数的速度大幅度下降,所以框架
现在从框架层面新增这个延时任务的功能。

之前已做的功能是定时任务,现在新增延时任务,这两个概念有一些不同。

定时任务一般情况下是配置为周期重复性任务,延时任务是一次性任务。
1)框架实现定时任务原理是定时发布,自然而然就能达到定时消费的目的。
2)框架实现延时任务的原理是马上立即发布,当消费者取出消息后,并不是立刻去运行,
   而是使用定时运行一次的方式延迟这个任务的运行。

在需求开发过程中,我们经常会遇到一些类似下面的场景:
1)外卖订单超过15分钟未支付,自动取消
2)使用抢票软件订到车票后,1小时内未支付,自动取消
3)待处理申请超时1天,通知审核人员经理,超时2天通知审核人员总监
4)客户预定自如房子后,24小时内未支付,房源自动释放


分布式函数调度框架的延时任务概念类似celery的countdown和eta入参,  add.apply_async(args=(1, 2),countdown=20)  # 规定取出后20秒再运行
此框架的入参名称那就也叫 countdown和eta。
countdown 传一个数字,表示多少秒后运行。
eta传一个datetime对象表示,精确的运行时间运行一次。

消费,消费代码没有任何变化

from function_scheduling_distributed_framework import task_deco, BrokerEnum


@task_deco('test_delay', broker_kind=BrokerEnum.REDIS_ACK_ABLE)
def f(x):
    print(x)


if __name__ == '__main__':
    f.consume()

发布延时任务

# 需要用publish,而不是push,这个前面已经说明了,如果要传函数入参本身以外的参数到中间件,需要用publish。
# 不然框架分不清哪些是函数入参,哪些是控制参数。如果无法理解就,就好好想想琢磨下celery的 apply_async 和 delay的关系。

from test_frame.test_delay_task.test_delay_consume import f
import datetime
import time
from function_scheduling_distributed_framework import PriorityConsumingControlConfig

"""
测试发布延时任务,不是发布后马上就执行函数。

countdown 和 eta 只能设置一个。
countdown 指的是 离发布多少秒后执行,
eta是指定的精确时间运行一次。

misfire_grace_time 是指定消息轮到被消费时候,如果已经超过了应该运行的时间多少秒之内,仍然执行。
misfire_grace_time 如果设置为None,则消息一定会被运行,不会由于大连消息积压导致消费时候已近太晚了而取消运行。
misfire_grace_time 如果不为None,必须是大于等于1的整数,此值表示消息轮到消费时候超过本应该运行的时间的多少秒内仍然执行。
此值的数字设置越小,如果由于消费慢的原因,就有越大概率导致消息被丢弃不运行。如果此值设置为1亿,则几乎不会导致放弃运行(1亿的作用接近于None了)
如果还是不懂这个值的作用,可以百度 apscheduler 包的 misfire_grace_time 概念

"""
for i in range(1, 20):
    time.sleep(1)

    # 消息发布10秒后再执行。如果消费慢导致任务积压,misfire_grace_time为None,即使轮到消息消费时候离发布超过10秒了仍然执行。
    f.publish({'x': i}, priority_control_config=PriorityConsumingControlConfig(countdown=10))

    # 规定消息在17点56分30秒运行,如果消费慢导致任务积压,misfire_grace_time为None,即使轮到消息消费时候已经过了17点56分30秒仍然执行。
    f.publish({'x': i * 10}, priority_control_config=PriorityConsumingControlConfig(
        eta=datetime.datetime(2021, 5, 19, 17, 56, 30) + datetime.timedelta(seconds=i)))

    # 消息发布10秒后再执行。如果消费慢导致任务积压,misfire_grace_time为30,如果轮到消息消费时候离发布超过40 (10+30) 秒了则放弃执行,
    # 如果轮到消息消费时候离发布时间是20秒,由于 20 < (10 + 30),则仍然执行
    f.publish({'x': i * 100}, priority_control_config=PriorityConsumingControlConfig(
        countdown=10, misfire_grace_time=30))

    # 规定消息在17点56分30秒运行,如果消费慢导致任务积压,如果轮到消息消费时候已经过了17点57分00秒,
    # misfire_grace_time为30,如果轮到消息消费时候超过了17点57分0秒 则放弃执行,
    # 如果如果轮到消息消费时候是17点56分50秒则执行。
    f.publish({'x': i * 1000}, priority_control_config=PriorityConsumingControlConfig(
        eta=datetime.datetime(2021, 5, 19, 17, 56, 30) + datetime.timedelta(seconds=i),
        misfire_grace_time=30))

    # 这个设置了消息由于推挤导致运行的时候比本应该运行的时间如果小于1亿秒,就仍然会被执行,所以几乎肯定不会被放弃运行
    f.publish({'x': i * 10000}, priority_control_config=PriorityConsumingControlConfig(
        eta=datetime.datetime(2021, 5, 19, 17, 56, 30) + datetime.timedelta(seconds=i),
        misfire_grace_time=100000000))
原文地址:https://www.cnblogs.com/ltyc/p/14893771.html