day10-python-协程异步IORabbitMQ队列 edis缓存

一、协程

协程,又称微线程,纤程。英文名Coroutine。一句话说明什么是线程:协程是一种用户态的轻量级线程

协程拥有自己的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈。因此:

协程能保留上一次调用时的状态(即所有局部状态的一个特定组合),每次过程重入时,就相当于进入上一次调用的状态,换种说法:进入上一次离开时所处逻辑流的位置。

协程的好处:

  • 无需线程上下文切换的开销
  • 无需原子操作锁定及同步的开销
    •   "原子操作(atomic operation)是不需要synchronized",所谓原子操作是指不会被线程调度机制打断的操作;这种操作一旦开始,就一直运行到结束,中间不会有任何 context switch (切换到另一个线程)。原子操作可以是一个步骤,也可以是多个操作步骤,但是其顺序是不可以被打乱,或者切割掉只执行部分。视作整体是原子性的核心。
  • 方便切换控制流,简化编程模型
  • 高并发+高扩展性+低成本:一个CPU支持上万的协程都不是问题。所以很适合用于高并发处理。

缺点:

  • 无法利用多核资源:协程的本质是个单线程,它不能同时将 单个CPU 的多个核用上,协程需要和进程配合才能运行在多CPU上.当然我们日常所编写的绝大部分应用都没有这个必要,除非是cpu密集型应用。
  • 进行阻塞(Blocking)操作(如IO时)会阻塞掉整个程序

使用yield实现协程操作例子

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import time
import queue

def consumer(name):
    print("--->starting eating baozi...")
    while True:
        new_baozi = yield
        print("[%s] is eating baozi %s" % (name, new_baozi))
        # time.sleep(1)

def producer():
    r = con.__next__()
    r = con2.__next__()
    n = 0
    while n < 5:
        n += 1
        con.send(n)
        con2.send(n)
        print("33[32;1m[producer]33[0m is making baozi %s" % n)

if __name__ == '__main__':
    con = consumer("c1")
    con2 = consumer("c2")
    p = producer()

看楼上的例子,我问你这算不算做是协程呢?你说,我他妈哪知道呀,你前面说了一堆废话,但是并没告诉我协程的标准形态呀,我腚眼一想,觉得你说也对,那好,我们先给协程一个标准定义,即符合什么条件就能称之为协程:

  1. 必须在只有一个单线程里实现并发
  2. 修改共享数据不需加锁
  3. 用户程序里自己保存多个控制流的上下文栈
  4. 一个协程遇到IO操作自动切换到其它协程

基于上面这4点定义,我们刚才用yield实现的程并不能算是合格的线程,因为它有一点功能没实现,哪一点呢?

Greenlet

greenlet是一个用C实现的协程模块,相比与python自带的yield,它可以使你在任意函数之间随意切换,而不需把这个函数先声明为generator

#!/usr/bin/env python
# -*- coding:utf-8 -*-

from greenlet import greenlet

def test1():
    print(12)
    gr2.switch()
    print(34)
    gr2.switch()

def test2():
    print(56)
    gr1.switch()
    print(78)

gr1 = greenlet(test1)   #启动一个协程
gr2 = greenlet(test2)
gr1.switch()

感觉确实用着比generator还简单了呢,但好像还没有解决一个问题,就是遇到IO操作,自动切换,对不对?

Gevent

Gevent 是一个第三方库,可以轻松通过gevent实现并发同步或异步编程,在gevent中用到的主要模式是Greenlet, 它是以C扩展模块形式接入Python的轻量级协程。 Greenlet全部运行在主程序操作系统进程的内部,但它们被协作式地调度。

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import gevent


def foo():
    print('Running in foo')
    gevent.sleep(2)
    print('Explicit context switch to foo again')


def bar():
    print('Explicit精确的 context内容 to bar')
    gevent.sleep(1)
    print('Implicit context switch back to bar')

def func3():
    print("running func3")
    gevent.sleep(0)
    print("running func3 again")

gevent.joinall([
    gevent.spawn(foo),  #生成
    gevent.spawn(bar),
    gevent.spawn(func3)
])

同步和异步的性能区别

import gevent
 
def task(pid):
    """
    Some non-deterministic task
    """
    gevent.sleep(0.5)
    print('Task %s done' % pid)
 
def synchronous():
    for i in range(1,10):
        task(i)
 
def asynchronous():
    threads = [gevent.spawn(task, i) for i in range(10)]
    gevent.joinall(threads)
 
print('Synchronous:')
synchronous()
 
print('Asynchronous:')
asynchronous()

上面程序的重要部分是将task函数封装到Greenlet内部线程的gevent.spawn。 初始化的greenlet列表存放在数组threads中,此数组被传给gevent.joinall 函数,后者阻塞当前流程,并执行所有给定的greenlet。执行流程只会在 所有greenlet执行完后才会继续向下走。

遇到IO阻塞时会自动切换任务

#!/usr/bin/env python
# -*- coding:utf-8 -*-

from urllib import request
import gevent,time
from gevent import monkey

monkey.patch_all()  #把当前程序的所有io操作给我单独的做上标记

def f(url):
    print('GET: %s' % url)
    resp = request.urlopen(url)
    data = resp.read()
    print('%d bytes received from %s.' % (len(data), url))

urls = ['https://www.python.org/',
        'https://www.yahoo.com/',
        'https://github.com/']
time_start = time.time()
for url in urls:
    f(url)
print("同步cost",time.time() - time_start)

async_time_start = time.time()
gevent.joinall([
    gevent.spawn(f, 'https://www.python.org/'),
    gevent.spawn(f, 'https://www.yahoo.com/'),
    gevent.spawn(f, 'https://github.com/'),
])
print("异步cost",time.time() - async_time_start)

通过gevent实现单线程下的多socket并发

server side 

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import sys
import socket
import time
import gevent

from gevent import socket, monkey

monkey.patch_all()


def server(port):
    s = socket.socket()
    s.bind(('0.0.0.0', port))
    s.listen(500)
    while True:
        cli, addr = s.accept()
        gevent.spawn(handle_request, cli)


def handle_request(conn):
    try:
        while True:
            data = conn.recv(1024)
            print("recv:", data)
            conn.send(data)
            if not data:
                conn.shutdown(socket.SHUT_WR)

    except Exception as  ex:
        print(ex)
    finally:
        conn.close()


if __name__ == '__main__':
    server(8001)

client side 

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import socket

HOST = 'localhost'  # The remote host
PORT = 8001  # The same port as used by the server
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((HOST, PORT))
while True:
    msg = bytes(input(">>:"), encoding="utf8")
    s.sendall(msg)
    data = s.recv(1024)
    # print(data)
    #
    print('Received', repr(data))
s.close()

并发100个sock连接

import socket
import threading

def sock_conn():

    client = socket.socket()

    client.connect(("localhost",8001))
    count = 0
    while True:
        #msg = input(">>:").strip()
        #if len(msg) == 0:continue
        client.send( ("hello %s" %count).encode("utf-8"))

        data = client.recv(1024)

        print("[%s]recv from server:" % threading.get_ident(),data.decode()) #结果
        count +=1
    client.close()


for i in range(100):
    t = threading.Thread(target=sock_conn)
    t.start()

二、事件驱动和异步IO

通常,我们写服务器处理模型的程序时,有以下几种模型:
(1)每收到一个请求,创建一个新的进程,来处理该请求;
(2)每收到一个请求,创建一个新的线程,来处理该请求;
(3)每收到一个请求,放入一个事件列表,让主进程通过非阻塞I/O方式来处理请求
上面的几种方式,各有千秋,
第(1)中方法,由于创建新的进程的开销比较大,所以,会导致服务器性能比较差,但实现比较简单。
第(2)种方式,由于要涉及到线程的同步,有可能会面临死锁等问题。
第(3)种方式,在写应用程序代码时,逻辑比前面两种都复杂。
综合考虑各方面因素,一般普遍认为第(3)种方式是大多数网络服务器采用的方式
 

看图说话讲事件驱动模型

在UI编程中,常常要对鼠标点击进行相应,首先如何获得鼠标点击呢?
方式一:创建一个线程,该线程一直循环检测是否有鼠标点击,那么这个方式有以下几个缺点
1. CPU资源浪费,可能鼠标点击的频率非常小,但是扫描线程还是会一直循环检测,这会造成很多的CPU资源浪费;如果扫描鼠标点击的接口是阻塞的呢?
2. 如果是堵塞的,又会出现下面这样的问题,如果我们不但要扫描鼠标点击,还要扫描键盘是否按下,由于扫描鼠标时被堵塞了,那么可能永远不会去扫描键盘;
3. 如果一个循环需要扫描的设备非常多,这又会引来响应时间的问题;
所以,该方式是非常不好的。

方式二:就是事件驱动模型
目前大部分的UI编程都是事件驱动模型,如很多UI平台都会提供onClick()事件,这个事件就代表鼠标按下事件。事件驱动模型大体思路如下:
1. 有一个事件(消息)队列;
2. 鼠标按下时,往这个队列中增加一个点击事件(消息);
3. 有个循环,不断从队列取出事件,根据不同的事件,调用不同的函数,如onClick()、onKeyDown()等;
4. 事件(消息)一般都各自保存各自的处理函数指针,这样,每个消息都有独立的处理函数;

 

事件驱动编程是一种编程范式,这里程序的执行流由外部事件来决定。它的特点是包含一个事件循环,当外部事件发生时使用回调机制来触发相应的处理。另外两种常见的编程范式是(单线程)同步以及多线程编程。

让我们用例子来比较和对比一下单线程、多线程以及事件驱动编程模型。下图展示了随着时间的推移,这三种模式下程序所做的工作。这个程序有3个任务需要完成,每个任务都在等待I/O操作时阻塞自身。阻塞在I/O操作上所花费的时间已经用灰色框标示出来了。

 

在单线程同步模型中,任务按照顺序执行。如果某个任务因为I/O而阻塞,其他所有的任务都必须等待,直到它完成之后它们才能依次执行。这种明确的执行顺序和串行化处理的行为是很容易推断得出的。如果任务之间并没有互相依赖的关系,但仍然需要互相等待的话这就使得程序不必要的降低了运行速度。

在多线程版本中,这3个任务分别在独立的线程中执行。这些线程由操作系统来管理,在多处理器系统上可以并行处理,或者在单处理器系统上交错执行。这使得当某个线程阻塞在某个资源的同时其他线程得以继续执行。与完成类似功能的同步程序相比,这种方式更有效率,但程序员必须写代码来保护共享资源,防止其被多个线程同时访问。多线程程序更加难以推断,因为这类程序不得不通过线程同步机制如锁、可重入函数、线程局部存储或者其他机制来处理线程安全问题,如果实现不当就会导致出现微妙且令人痛不欲生的bug。

在事件驱动版本的程序中,3个任务交错执行,但仍然在一个单独的线程控制中。当处理I/O或者其他昂贵的操作时,注册一个回调到事件循环中,然后当I/O操作完成时继续执行。回调描述了该如何处理某个事件。事件循环轮询所有的事件,当事件到来时将它们分配给等待处理事件的回调函数。这种方式让程序尽可能的得以执行而不需要用到额外的线程。事件驱动型程序比多线程程序更容易推断出行为,因为程序员不需要关心线程安全问题。

当我们面对如下的环境时,事件驱动模型通常是一个好的选择:

  1. 程序中有许多任务,而且…
  2. 任务之间高度独立(因此它们不需要互相通信,或者等待彼此)而且…
  3. 在等待事件到来时,某些任务会阻塞。

当应用程序需要在任务间共享可变的数据时,这也是一个不错的选择,因为这里不需要采用同步处理。

网络应用程序通常都有上述这些特点,这使得它们能够很好的契合事件驱动编程模型。

此处要提出一个问题,就是,上面的事件驱动模型中,只要一遇到IO就注册一个事件,然后主程序就可以继续干其它的事情了,只到io处理完毕后,继续恢复之前中断的任务,这本质上是怎么实现的呢?哈哈,下面我们就来一起揭开这神秘的面纱。。。。

三、select、poll、epoll三者的区别

 https://www.cnblogs.com/alex3714/p/4372426.html

四、IO多路复用

https://www.cnblogs.com/alex3714/articles/5876749.html

五、select/selectors模块

select 多并发socket 例子

select socket server

import select
import socket
import sys
import queue


server = socket.socket()
server.setblocking(0)

server_addr = ('localhost',10000)

print('starting up on %s port %s' % server_addr)
server.bind(server_addr)

server.listen(5)


inputs = [server, ] #自己也要监测呀,因为server本身也是个fd
outputs = []

message_queues = {}

while True:
    print("waiting for next event...")

    readable, writeable, exeptional = select.select(inputs,outputs,inputs) #如果没有任何fd就绪,那程序就会一直阻塞在这里

    for s in readable: #每个s就是一个socket

        if s is server: #别忘记,上面我们server自己也当做一个fd放在了inputs列表里,传给了select,如果这个s是server,代表server这个fd就绪了,
            #就是有活动了, 什么情况下它才有活动? 当然 是有新连接进来的时候 呀
            #新连接进来了,接受这个连接
            conn, client_addr = s.accept()
            print("new connection from",client_addr)
            conn.setblocking(0)
            inputs.append(conn) #为了不阻塞整个程序,我们不会立刻在这里开始接收客户端发来的数据, 把它放到inputs里, 下一次loop时,这个新连接
            #就会被交给select去监听,如果这个连接的客户端发来了数据 ,那这个连接的fd在server端就会变成就续的,select就会把这个连接返回,返回到
            #readable 列表里,然后你就可以loop readable列表,取出这个连接,开始接收数据了, 下面就是这么干 的

            message_queues[conn] = queue.Queue() #接收到客户端的数据后,不立刻返回 ,暂存在队列里,以后发送

        else: #s不是server的话,那就只能是一个 与客户端建立的连接的fd了
            #客户端的数据过来了,在这接收
            data = s.recv(1024)
            if data:
                print("收到来自[%s]的数据:" % s.getpeername()[0], data)
                message_queues[s].put(data) #收到的数据先放到queue里,一会返回给客户端
                if s not  in outputs:
                    outputs.append(s) #为了不影响处理与其它客户端的连接 , 这里不立刻返回数据给客户端


            else:#如果收不到data代表什么呢? 代表客户端断开了呀
                print("客户端断开了",s)

                if s in outputs:
                    outputs.remove(s) #清理已断开的连接

                inputs.remove(s) #清理已断开的连接

                del message_queues[s] ##清理已断开的连接


    for s in writeable:
        try :
            next_msg = message_queues[s].get_nowait()

        except queue.Empty:
            print("client [%s]" %s.getpeername()[0], "queue is empty..")
            outputs.remove(s)

        else:
            print("sending msg to [%s]"%s.getpeername()[0], next_msg)
            s.send(next_msg.upper())


    for s in exeptional:
        print("handling exception for ",s.getpeername())
        inputs.remove(s)
        if s in outputs:
            outputs.remove(s)
        s.close()

        del message_queues[s]

select socket client

import socket
import sys

messages = [ b'This is the message. ',
             b'It will be sent ',
             b'in parts.',
             ]
server_address = ('localhost', 10000)

# Create a TCP/IP socket
socks = [ socket.socket(socket.AF_INET, socket.SOCK_STREAM),
          socket.socket(socket.AF_INET, socket.SOCK_STREAM),
          ]

# Connect the socket to the port where the server is listening
print('connecting to %s port %s' % server_address)
for s in socks:
    s.connect(server_address)

for message in messages:

    # Send messages on both sockets
    for s in socks:
        print('%s: sending "%s"' % (s.getsockname(), message) )
        s.send(message)

    # Read responses on both sockets
    for s in socks:
        data = s.recv(1024)
        print( '%s: received "%s"' % (s.getsockname(), data) )
        if not data:
            print(sys.stderr, 'closing socket', s.getsockname() )

selectors模块

这个模块允许高级和高效的I/O多路复用,建立在选择模块原语的基础上。我们鼓励用户使用这个模块,除非他们希望对所使用的os级原语进行精确控制。

import selectors
import socket
 
sel = selectors.DefaultSelector()
 
def accept(sock, mask):
    conn, addr = sock.accept()  # Should be ready
    print('accepted', conn, 'from', addr)
    conn.setblocking(False)
    sel.register(conn, selectors.EVENT_READ, read)
 
def read(conn, mask):
    data = conn.recv(1000)  # Should be ready
    if data:
        print('echoing', repr(data), 'to', conn)
        conn.send(data)  # Hope it won't block
    else:
        print('closing', conn)
        sel.unregister(conn)
        conn.close()
 
sock = socket.socket()
sock.bind(('localhost', 10000))
sock.listen(100)
sock.setblocking(False)
sel.register(sock, selectors.EVENT_READ, accept)
 
while True:
    events = sel.select()
    for key, mask in events:
        callback = key.data
        callback(key.fileobj, mask)

六、RabbitMQ队列

实现最简单的队列通信


send端

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()  #声明一个管道

#声明queue
channel.queue_declare(queue='hello2')

# n RabbitMQ a message can never be sent directly to the queue,it always
channel.basic_publish(exchange='',
                      routing_key='hello2',#queue名字
                      body='Hello World!'
                      )
                      )
print(" [x] Sent 'Hello World!'")
connection.close()

receive端

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

#You may ask why we declare the queue again ‒ we have already declared it in our previous code.
# We could avoid that if we were sure that the queue already exists. For example if send.py program
#was run before. But we're not yet sure which program to run first. In such cases it's a good
# practice to repeat declaring the queue in both programs.
channel.queue_declare(queue='hello2')

def callback(ch, method, properties, body): #回调函数
    print("-->",ch,method,properties)
    print(" [x] Receiverd %r" %body)
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(#消费信息
                      'hello2',
                      callback,) #如果收到消息,就调用CALLBACK 函数来处理消息


print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

远程连接rabbitmq server的话,需要配置权限 噢 

首先在rabbitmq server上创建一个用户

sudo rabbitmqctl  add_user hhh hhh1234

同时还要配置权限,允许从外面访问

sudo rabbitmqctl set_permissions -p / hhh ".*" ".*" ".*"

set_permissions [-p vhost] {user} {conf} {write} {read}

vhost

要授予用户访问权的虚拟主机的名称,默认为/。

user

授予对指定虚拟主机的访问权的用户的名称。

conf

一个正则表达式,它与授予用户配置权限的资源名称相匹配。

write

为用户授予写权限的与资源名称匹配的正则表达式。

 read

匹配资源名称的正则表达式,为用户授予读权限。

客户端连接的时候需要配置认证参数

credentials = pika.PlainCredentials('hhh', 'hhh1234')
 
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
    '192.168.1.40',5672,'/',credentials))
channel = connection.channel()

Work queues

在这种模式下,RabbitMQ会默认把p发的消息依次分发给各个消费者(c),跟负载均衡差不多

消息提供者代码

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()  #声明一个管道

#声明queue
channel.queue_declare(queue='hello2',durable=True)

# n RabbitMQ a message can never be sent directly to the queue,it always
channel.basic_publish(exchange='',
                      routing_key='hello2',#queue名字
                      body='Hello World!',
                      properties=pika.BasicProperties(
                          delivery_mode=2,  #make message persistent
                      )
                      )
print(" [x] Sent 'Hello World!'")
connection.close()

消费者代码

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import pika, time

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

#You may ask why we declare the queue again ‒ we have already declared it in our previous code.
# We could avoid that if we were sure that the queue already exists. For example if send.py program
#was run before. But we're not yet sure which program to run first. In such cases it's a good
# practice to repeat declaring the queue in both programs.
channel.queue_declare(queue='hello2',durable=True)

def callback(ch, method, properties, body): #回调函数
    print("-->",ch,method,properties)
    # time.sleep(30)
    print(" [x] Receiverd %r" %body)
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(#消费信息
                      'hello2',
                      callback,) #如果收到消息,就调用CALLBACK 函数来处理消息
                      #True  #不确认

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

此时,先启动消息生产者,然后再分别启动3个消费者,通过生产者多发送几条消息,你会发现,这几条消息会被依次分配到各个消费者身上


完成一项任务需要几秒钟的时间。您可能想知道,如果其中一个消费者启动了一个很长的任务,但只完成了一部分就挂掉了,会发生什么情况。对于我们当前的代码,一旦RabbitMQ将消息传递给客户,它就会立即将其从内存中删除。在这种情况下,如果您杀死了一个工作人员,我们将丢失它正在处理的消息。我们还将丢失所有发送给这个特定worker但尚未处理的消息。

但我们不想失去任何任务。如果一个工人死了,我们希望把这个任务交给另一个工人。

为了确保消息永不丢失,RabbitMQ支持消息确认。从使用者发回一个ack(nowledgement),告诉RabbitMQ已经接收并处理了特定的消息,RabbitMQ可以自由地删除它。

如果使用者在不发送ack的情况下死亡(其通道关闭、连接关闭或TCP连接丢失),RabbitMQ将理解消息未被完全处理,并将重新对其排队。如果同时有其他的消费者在线,它会很快将其重新发送给另一个消费者。这样你可以确保没有信息丢失,即使工人偶尔死亡。

没有任何消息超时;当使用者死亡时,RabbitMQ将重新传递消息。即使处理一条消息需要很长很长的时间,也没关系。

消息确认在默认情况下是打开的。在前面的示例中,我们通过no_ack=True标志显式地关闭了它们。一旦我们完成了一个任务,就应该移除这个标志并从worker那里发送一个适当的确认。

def callback(ch, method, properties, body): #回调函数
    print("-->",ch,method,properties)
    # time.sleep(30)
    print(" [x] Receiverd %r" %body)
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(#消费信息
                      'hello2',
                      callback,) #如果收到消息,就调用CALLBACK 函数来处理消息

使用这段代码,我们可以确保即使您在处理消息时使用CTRL+C杀死了一个工作人员,也不会丢失任何东西。在工作人员死后不久,所有未确认的消息将被重新发送

消息持久化

我们已经学习了如何确保即使使用者死亡,任务也不会丢失(默认情况下,如果想禁用use no_ack=True)。但是,如果RabbitMQ服务器停止,我们的任务仍然会丢失。

当RabbitMQ退出或崩溃时,它将忘记队列和消息,除非您告诉它不要这样做。确保消息不丢失需要做两件事:我们需要将队列和消息都标记为持久的。

首先,我们需要确保RabbitMQ永远不会丢失我们的队列。为了做到这一点,我们需要声明它是耐用的:

channel.queue_declare(queue='hello2',durable=True)

虽然这个命令本身是正确的,但是它在我们的设置中不能工作。那是因为我们已经定义了一个名为hello的队列,它不是持久的。RabbitMQ不允许您使用不同的参数重新定义现有队列,并将向任何试图这样做的程序返回一个错误。但是有一个快速的解决方法——让我们声明一个不同名称的队列,例如:

channel.queue_declare(queue='task_queue', durable=True)

这个queue_declare更改需要同时应用到生产者代码和消费者代码。

此时,我们确信即使RabbitMQ重新启动,task_queue队列也不会丢失。现在,我们需要将消息标记为persistent—通过提供一个值为2的delivery_mode属性。

channel.basic_publish(exchange='',
                      routing_key="task_queue",
                      body=message,
                      properties=pika.BasicProperties(
                         delivery_mode = 2, # make message persistent
                      ))

消息公平分发

如果Rabbit只管按顺序把消息发到各个消费者身上,不考虑消费者负载的话,很可能出现,一个机器配置不高的消费者那里堆积了很多消息处理不完,同时配置高的消费者却一直很轻松。为解决此问题,可以在各个消费者端,配置perfetch=1,意思就是告诉RabbitMQ在我这个消费者当前消息还没处理完的时候就不要再给我发新消息了。

channel.basic_qos(prefetch_count=1)

带消息持久化+公平分发的完整代码

生产者端

#!/usr/bin/env python
import pika
import sys
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()
 
channel.queue_declare(queue='task_queue', durable=True)
 
message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
                      routing_key='task_queue',
                      body=message,
                      properties=pika.BasicProperties(
                         delivery_mode = 2, # make message persistent
                      ))
print(" [x] Sent %r" % message)
connection.close()

消费者端

#!/usr/bin/env python
import pika
import time
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()
 
channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
 
def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(body.count(b'.'))
    print(" [x] Done")
    ch.basic_ack(delivery_tag = method.delivery_tag)
 
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
                      queue='task_queue')
 
channel.start_consuming()

PublishSubscribe(消息发布订阅)

之前的例子都基本都是1对1的消息发送和接收,即消息只能发送到指定的queue里,但有些时候你想让你的消息被所有的Queue收到,类似广播的效果,这时候就要用到exchange了,

交换是一件非常简单的事情。一边接收来自生产者的消息,另一边将消息推送到队列。交换器必须确切地知道如何处理接收到的消息。它应该被附加到一个特定的队列吗?它应该被添加到许多队列中吗?或者它应该被丢弃。这些规则由exchange类型定义。

Exchange在定义的时候是有类型的,以决定到底是哪些Queue符合条件,可以接收消息


fanout: 所有bind到此exchange的queue都可以接收消息
direct: 通过routingKey和exchange决定的那个唯一的queue可以接收消息
topic:所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息

   表达式符号说明:#代表一个或多个字符,*代表任何字符
      例:#.a会匹配a.a,aa.a,aaa.a等
          *.a会匹配a.a,b.a,c.a等
     注:使用RoutingKey为#,Exchange Type为topic的时候相当于使用fanout 

headers: 通过headers 来决定把消息发给哪些queue

消息publisher

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',
                         exchange_type='fanout')

# message = ' '.join(sys.argv[1:]) or "info: Hello World!"
message = "info: Hello World!"
channel.basic_publish(exchange='logs',
                      routing_key='',
                      body=message)
print(" [x] Sent %r" % message)
connection.close()

消息subscriber

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',
                         exchange_type='fanout')

result = channel.queue_declare('',exclusive=True)  #exclusive排他,唯一的,不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开后,自动将queue删除
queue_name = result.method.queue
print("random queuename",queue_name)

channel.queue_bind(exchange='logs',
                   queue=queue_name)

print('[*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r" % body)

channel.basic_consume(queue_name,
                      callback,
                      True)

channel.start_consuming()

有选择的接收消息(exchange type=direct)

RabbitMQ还支持根据关键字发送,即:队列绑定关键字,发送者将数据根据关键字发送到消息exchange,exchange根据 关键字 判定应该将数据发送至指定队列。

publisher

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         exchange_type='direct')

severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,
                      body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()

consumer

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         exchange_type='direct')

result = channel.queue_declare('',exclusive=True)
queue_name = result.method.queue

serverities = sys.argv[1:]
if not serverities:
    sys.stderr.write("Usage: %s [info] [warning] [error]
" % sys.argv[0])
    sys.exit(1)
print(serverities)
for serverity in serverities:
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=serverity)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
    print(" [x] %r:%r" %(method.routing_key, body))

channel.basic_consume(queue_name,
                      callback,
                      True)
channel.start_consuming()

更细致的消息过滤

虽然使用直接交换改进了我们的系统,但它仍然有局限性——它不能基于多个标准进行路由。

在我们的日志系统中,我们可能不仅希望根据严重性订阅日志,还希望根据发出日志的源订阅日志。您可能从syslog unix工具中了解了这个概念,该工具根据严重程度(info/warn/crit…)和设施(auth/cron/kern…)来路由日志。

这将给我们很大的灵活性——我们可能希望只监听来自“cron”的关键错误,但也要监听来自“kern”的所有日志。

 publisher

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                         type='topic')

routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
                      routing_key=routing_key,
                      body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()

consumer

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                         type='topic')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

binding_keys = sys.argv[1:]
if not binding_keys:
    sys.stderr.write("Usage: %s [binding_key]...
" % sys.argv[0])
    sys.exit(1)

for binding_key in binding_keys:
    channel.queue_bind(exchange='topic_logs',
                       queue=queue_name,
                       routing_key=binding_key)

print(' [*] Waiting for logs. To exit press CTRL+C')


def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))


channel.basic_consume(queue_name,
                      callback,
                      True)

channel.start_consuming()

接收所有运行的日志:

python receive_logs_topic.py "#"

接收设施“kern”的日志:

python receive_logs_topic.py "kern.*"

或者你只想接收“critical”的日志:

python receive_logs_topic.py "*.critical"

你可以创建多个绑定:

python receive_logs_topic.py "kern.*" "*.critical"

并发出一个带有路由键“kern”的日志“critical”类型:

python emit_log_topic.py "kern.critical" "A critical kernel error"

Remote producer call(RPC)

为了说明如何使用RPC服务,我们将创建一个简单的客户机类。它将公开一个名为call的方法,发送一个RPC请求并阻塞,直到收到答案:

fibonacci_rpc = FibonacciRpcClient()
result = fibonacci_rpc.call(4)
print("fib(4) is %r" % result)

 RPC server

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))

channel = connection.channel()

channel.queue_declare(queue='rpc_queue')

def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n-1) + fib(n-2)

def on_request(ch, method, props, body):
    n = int(body)

    print(" [.] fib(%s)" % n)
    response = fib(n)

    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,
                     properties=pika.BasicProperties(correlation_id=
                                                     props.correlation_id),
                     body=str(response))
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume('rpc_queue',on_request)

print("[x] Awaiting RPC requests")
channel.start_consuming()

RPC client

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import pika
import uuid
import time

class FibonacciRpcClient(object):
    def __init__(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters
                                                  (host='localhost'))
        self.channel = self.connection.channel()

        result = self.channel.queue_declare('',exclusive=True)
        self.callback_queue = result.method.queue

        self.channel.basic_consume(self.callback_queue,
                                   self.on_response,    #只要一收到消息就调用on_response
                                   True)
    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:
            self.response = body

    def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(exchange='',
                                   routing_key='rpc_queue',
                                   properties=pika.BasicProperties(
                                       reply_to=self.callback_queue,
                                       correlation_id=self.corr_id,
                                   ),
                                   body=str(n))
        while self.response is None:
            self.connection.process_data_events()   #非阻塞版的start_consuming()
            print("no msg")
            time.sleep(0.5)
        return int(self.response)

fibonacci_rpc = FibonacciRpcClient()

print("[x] Requesting fib(30)")
response = fibonacci_rpc.call(30)
print("[.] Got %r" % response)

七、Memcached

http://www.cnblogs.com/wupeiqi/articles/5132791.html 

八、redis

介绍

redis是业界主流的key-value nosql 数据库之一。和Memcached类似,它支持存储的value类型相对更多,包括string(字符串)、list(链表)、set(集合)、zset(sorted set --有序集合)和hash(哈希类型)。这些数据类型都支持push/pop、add/remove及取交集并集和差集及更丰富的操作,而且这些操作都是原子性的。在此基础上,redis支持各种不同方式的排序。与memcached一样,为了保证效率,数据都是缓存在内存中。区别的是redis会周期性的把更新的数据写入磁盘或者把修改操作写入追加的记录文件,并且在此基础上实现了master-slave(主从)同步。

Redis优点

  • 异常快速 : Redis是非常快的,每秒可以执行大约110000设置操作,81000个/每秒的读取操作。

  • 支持丰富的数据类型 : Redis支持最大多数开发人员已经知道如列表,集合,可排序集合,哈希等数据类型。

    这使得在应用中很容易解决的各种问题,因为我们知道哪些问题处理使用哪种数据类型更好解决。
  • 操作都是原子的 : 所有 Redis 的操作都是原子,从而确保当两个客户同时访问 Redis 服务器得到的是更新后的值(最新值)。

  • MultiUtility工具:Redis是一个多功能实用工具,可以在很多如:缓存,消息传递队列中使用(Redis原生支持发布/订阅),在应用程序中,如:Web应用程序会话,网站页面点击数等任何短暂的数据;

安装Redis环境

要在 Ubuntu 上安装 Redis,打开终端,然后输入以下命令:
$sudo apt-get update
$sudo apt-get install redis-server
这将在您的计算机上安装Redis

启动 Redis

$redis-server

查看 redis 是否还在运行

$redis-cli
这将打开一个 Redis 提示符,如下图所示:
redis 127.0.0.1:6379>
在上面的提示信息中:127.0.0.1 是本机的IP地址,6379是 Redis 服务器运行的端口。现在输入 PING 命令,如下图所示:
redis 127.0.0.1:6379> ping
PONG
这说明现在你已经成功地在计算机上安装了 Redis。
 
连接方式

1、操作模式

redis-py提供两个类Redis和StrictRedis用于实现Redis的命令,StrictRedis用于实现大部分官方的命令,并使用官方的语法和命令,Redis是StrictRedis的子类,用于向后兼容旧版本的redis-py。

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import redis

r = redis.Redis(host='192.168.1.40',port=6379)
r.set('foo','Bar')

print(r.get('foo'))

2、连接池

redis-py使用connection pool来管理对一个redis server的所有连接,避免每次建立、释放连接的开销。默认,每个Redis实例都会维护一个自己的连接池。可以直接建立一个连接池,然后作为参数Redis,这样就可以实现多个Redis实例共享一个连接池。

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import redis

pool = redis.ConnectionPool(host='192.168.1.40',port=6379)

r = redis.Redis(connection_pool=pool)
r.set('foo','Bar')

print(r.get('foo'))

操作

1. String操作

redis中的String在在内存中按照一个name对应一个value来存储。如图:

set(name, value, ex=None, px=None, nx=False, xx=False)

在Redis中设置值,默认,不存在则创建,存在则修改
参数:
     ex,过期时间(秒)
     px,过期时间(毫秒)
     nx,如果设置为True,则只有name不存在时,当前set操作才执行
     xx,如果设置为True,则只有name存在时,岗前set操作才执行

setnx(name, value)

设置值,只有name不存在时,执行设置操作(添加)

setex(name, value, time)

# 设置值
# 参数:
    # time,过期时间(数字秒 或 timedelta对象)

psetex(name, time_ms, value)

# 设置值
# 参数:
    # time_ms,过期时间(数字毫秒 或 timedelta对象)

mset(*args, **kwargs)

批量设置值
如:
    mset(k1='v1', k2='v2')
    
    mget({'k1''v1''k2''v2'})

get(name)

获取值

mget(keys, *args)

批量获取
如:
    mget('ylr''wupeiqi')
    
    r.mget(['ylr''wupeiqi'])

getset(name, value)

设置新值并获取原来的值

getrange(key, start, end)

# 获取子序列(根据字节获取,非字符)
# 参数:
    # name,Redis 的 name
    # start,起始位置(字节)
    # end,结束位置(字节)
# 如: "武沛齐" ,0-3表示 "武"

setrange(name, offset, value)

# 修改字符串内容,从指定字符串索引开始向后替换(新值太长时,则向后添加)
# 参数:
    # offset,字符串的索引,字节(一个汉字三个字节)
    # value,要设置的值

setbit(name, offset, value)

# 对name对应值的二进制表示的位进行操作
 
# 参数:
    # name,redis的name
    # offset,位的索引(将值变换成二进制后再进行索引)
    # value,值只能是 1 或 0
 
# 注:如果在Redis中有一个对应: n1 = "foo",
        那么字符串foo的二进制表示为:01100110 01101111 01101111
    所以,如果执行 setbit('n1'71),则就会将第7位设置为1
        那么最终二进制则变成 01100111 01101111 01101111,即:"goo"
 
# 扩展,转换二进制表示:
 
    # source = "武沛齐"
    source = "foo"
 
    for in source:
        num = ord(i)
        print bin(num).replace('b','')
 
    特别的,如果source是汉字 "武沛齐"怎么办?
    答:对于utf-8,每一个汉字占 3 个字节,那么 "武沛齐" 则有 9个字节
       对于汉字,for循环时候会按照 字节 迭代,那么在迭代时,将每一个字节转换 十进制数,然后再将十进制数转换成二进制
        11100110 10101101 10100110 11100110 10110010 10011011 11101001 10111101 10010000
        -------------------------- ----------------------------- -----------------------------
                    武                         沛                           齐

*用途举例,用最省空间的方式,存储在线用户数及分别是哪些用户在线

getbit(name, offset)

# 获取name对应的值的二进制表示中的某位的值 (0或1)

bitcount(key, start=None, end=None)

# 获取name对应的值的二进制表示中 1 的个数
# 参数:
    # key,Redis的name
    # start,位起始位置
    # end,位结束位置

strlen(name)

# 返回name对应值的字节长度(一个汉字3个字节)

incr(self, name, amount=1)

# 自增 name对应的值,当name不存在时,则创建name=amount,否则,则自增。
 
# 参数:
    # name,Redis的name
    # amount,自增数(必须是整数)
 
# 注:同incrby

incrbyfloat(self, name, amount=1.0)

# 自增 name对应的值,当name不存在时,则创建name=amount,否则,则自增。
 
# 参数:
    # name,Redis的name
    # amount,自增数(浮点型)

decr(self, name, amount=1)

# 自减 name对应的值,当name不存在时,则创建name=amount,否则,则自减。
 
# 参数:
    # name,Redis的name
    # amount,自减数(整数)

append(key, value)

# 在redis name对应的值后面追加内容
 
# 参数:
    key, redis的name
    value, 要追加的字符串

  

2. Hash操作

hash表现形式上有些像pyhton中的dict,可以存储一组关联性较强的数据 , redis中Hash在内存中的存储格式如下图:  

hset(name, key, value)

# name对应的hash中设置一个键值对(不存在,则创建;否则,修改)
 
# 参数:
    # name,redis的name
    # key,name对应的hash中的key
    # value,name对应的hash中的value
 
# 注:
    # hsetnx(name, key, value),当name对应的hash中不存在当前key时则创建(相当于添加)

hmset(name, mapping)

# 在name对应的hash中批量设置键值对
 
# 参数:
    # name,redis的name
    # mapping,字典,如:{'k1':'v1', 'k2': 'v2'}
 
# 如:
    # r.hmset('xx', {'k1':'v1', 'k2': 'v2'})

hget(name,key)

# 在name对应的hash中获取根据key获取value

hmget(name, keys, *args)

# 在name对应的hash中获取多个key的值
 
# 参数:
    # name,reids对应的name
    # keys,要获取key集合,如:['k1', 'k2', 'k3']
    # *args,要获取的key,如:k1,k2,k3
 
# 如:
    # r.mget('xx', ['k1', 'k2'])
    # 或
    # print r.hmget('xx', 'k1', 'k2')

hgetall(name)

获取name对应hash的所有键值

hlen(name)

# 获取name对应的hash中键值对的个数

hkeys(name)

# 获取name对应的hash中所有的key的值

hvals(name)

# 获取name对应的hash中所有的value的值

hexists(name, key)

# 检查name对应的hash是否存在当前传入的key

hdel(name,*keys)

# 将name对应的hash中指定key的键值对删除

hincrby(name, key, amount=1)

# 自增name对应的hash中的指定key的值,不存在则创建key=amount
# 参数:
    # name,redis中的name
    # key, hash对应的key
    # amount,自增数(整数)

hincrbyfloat(name, key, amount=1.0)

# 自增name对应的hash中的指定key的值,不存在则创建key=amount
 
# 参数:
    # name,redis中的name
    # key, hash对应的key
    # amount,自增数(浮点数)
 
# 自增name对应的hash中的指定key的值,不存在则创建key=amount

hscan(name, cursor=0, match=None, count=None)

Start a full hash scan with:

HSCAN myhash 0

Start a hash scan with fields matching a pattern with:

HSCAN myhash 0 MATCH order_*

Start a hash scan with fields matching a pattern and forcing the scan command to do more scanning with:

HSCAN myhash 0 MATCH order_* COUNT 1000

 

# 增量式迭代获取,对于数据大的数据非常有用,hscan可以实现分片的获取数据,并非一次性将数据全部获取完,从而放置内存被撑爆
 
# 参数:
    # name,redis的name
    # cursor,游标(基于游标分批取获取数据)
    # match,匹配指定key,默认None 表示所有的key
    # count,每次分片最少获取个数,默认None表示采用Redis的默认分片个数
 
# 如:
    # 第一次:cursor1, data1 = r.hscan('xx', cursor=0, match=None, count=None)
    # 第二次:cursor2, data1 = r.hscan('xx', cursor=cursor1, match=None, count=None)
    # ...
    # 直到返回值cursor的值为0时,表示数据已经通过分片获取完毕

hscan_iter(name, match=None, count=None)

# 利用yield封装hscan创建生成器,实现分批去redis中获取数据
  
# 参数:
    # match,匹配指定key,默认None 表示所有的key
    # count,每次分片最少获取个数,默认None表示采用Redis的默认分片个数
  
# 如:
    # for item in r.hscan_iter('xx'):
    #     print item

3. list

List操作,redis中的List在在内存中按照一个name对应一个List来存储。如图:  

lpush(name,values)

# 在name对应的list中添加元素,每个新的元素都添加到列表的最左边
 
# 如:
    # r.lpush('oo', 11,22,33)
    # 保存顺序为: 33,22,11
 
# 扩展:
    # rpush(name, values) 表示从右向左操作

lpushx(name,value)

# 在name对应的list中添加元素,只有name已经存在时,值添加到列表的最左边
 
# 更多:
    # rpushx(name, value) 表示从右向左操作

llen(name)

# name对应的list元素的个数

linsert(name, where, refvalue, value))

# 在name对应的列表的某一个值前或后插入一个新值
 
# 参数:
    # name,redis的name
    # where,BEFORE或AFTER
    # refvalue,标杆值,即:在它前后插入数据
    # value,要插入的数据

r.lset(name, index, value)

# 对name对应的list中的某一个索引位置重新赋值
 
# 参数:
    # name,redis的name
    # index,list的索引位置
    # value,要设置的值

r.lrem(name, value, num)

# 在name对应的list中删除指定的值
 
# 参数:
    # name,redis的name
    # value,要删除的值
    # num,  num=0,删除列表中所有的指定值;
           # num=2,从前到后,删除2个;
           # num=-2,从后向前,删除2个

lpop(name)

# 在name对应的列表的左侧获取第一个元素并在列表中移除,返回值则是第一个元素
 
# 更多:
    # rpop(name) 表示从右向左操作

lindex(name, index)

在name对应的列表中根据索引获取列表元素

lrange(name, start, end)

# 在name对应的列表分片获取数据
# 参数:
    # name,redis的name
    # start,索引的起始位置
    # end,索引结束位置

ltrim(name, start, end)

# 在name对应的列表中移除没有在start-end索引之间的值
# 参数:
    # name,redis的name
    # start,索引的起始位置
    # end,索引结束位置

rpoplpush(src, dst)

# 从一个列表取出最右边的元素,同时将其添加至另一个列表的最左边
# 参数:
    # src,要取数据的列表的name
    # dst,要添加数据的列表的name

blpop(keys, timeout)

# 将多个列表排列,按照从左到右去pop对应列表的元素
 
# 参数:
    # keys,redis的name的集合
    # timeout,超时时间,当元素所有列表的元素获取完之后,阻塞等待列表内有数据的时间(秒), 0 表示永远阻塞
 
# 更多:
    # r.brpop(keys, timeout),从右向左获取数据

brpoplpush(src, dst, timeout=0)

# 从一个列表的右侧移除一个元素并将其添加到另一个列表的左侧
 
# 参数:
    # src,取出并要移除元素的列表对应的name
    # dst,要插入元素的列表对应的name
    # timeout,当src对应的列表中没有数据时,阻塞等待其有数据的超时时间(秒),0 表示永远阻塞

4.set集合操作

Set操作,Set集合就是不允许重复的列表

sadd(name,values)

# name对应的集合中添加元素

scard(name)

获取name对应的集合中元素个数

sdiff(keys, *args)

在第一个name对应的集合中且不在其他name对应的集合的元素集合

sdiffstore(dest, keys, *args)

# 获取第一个name对应的集合中且不在其他name对应的集合,再将其新加入到dest对应的集合中

sinter(keys, *args)

# 获取多一个name对应集合的并集

sinterstore(dest, keys, *args)

# 获取多一个name对应集合的并集,再讲其加入到dest对应的集合中

sismember(name, value)

# 检查value是否是name对应的集合的成员

smembers(name)

# 获取name对应的集合的所有成员

smove(src, dst, value)

# 将某个成员从一个集合中移动到另外一个集合

spop(name)

# 从集合的右侧(尾部)移除一个成员,并将其返回

srandmember(name, numbers)

# 从name对应的集合中随机获取 numbers 个元素

srem(name, values)

# 在name对应的集合中删除某些值

sunion(keys, *args)

# 获取多一个name对应的集合的并集

sunionstore(dest,keys, *args)

# 获取多一个name对应的集合的并集,并将结果保存到dest对应的集合中

sscan(name, cursor=0, match=None, count=None)
sscan_iter(name, match=None, count=None)

# 同字符串的操作,用于增量迭代分批获取元素,避免内存消耗太大

有序集合,在集合的基础上,为每元素排序;元素的排序需要根据另外一个值来进行比较,所以,对于有序集合,每一个元素有两个值,即:值和分数,分数专门用来做排序。

zadd(name, *args, **kwargs)

# 在name对应的有序集合中添加元素
# 如:
     # zadd('zz', 'n1', 1, 'n2', 2)
     # 或
     # zadd('zz', n1=11, n2=22)

zcard(name)

# 获取name对应的有序集合元素的数量

zcount(name, min, max)

# 获取name对应的有序集合中分数 在 [min,max] 之间的个数

zincrby(name, value, amount)

# 自增name对应的有序集合的 name 对应的分数

r.zrange( name, start, end, desc=False, withscores=False, score_cast_func=float)

# 按照索引范围获取name对应的有序集合的元素
 
# 参数:
    # name,redis的name
    # start,有序集合索引起始位置(非分数)
    # end,有序集合索引结束位置(非分数)
    # desc,排序规则,默认按照分数从小到大排序
    # withscores,是否获取元素的分数,默认只获取元素的值
    # score_cast_func,对分数进行数据转换的函数
 
# 更多:
    # 从大到小排序
    # zrevrange(name, start, end, withscores=False, score_cast_func=float)
 
    # 按照分数范围获取name对应的有序集合的元素
    # zrangebyscore(name, min, max, start=None, num=None, withscores=False, score_cast_func=float)
    # 从大到小排序
    # zrevrangebyscore(name, max, min, start=None, num=None, withscores=False, score_cast_func=float)

zrank(name, value)

# 获取某个值在 name对应的有序集合中的排行(从 0 开始)
 
# 更多:
    # zrevrank(name, value),从大到小排序

zrem(name, values)

# 删除name对应的有序集合中值是values的成员
 
# 如:zrem('zz', ['s1', 's2'])

zremrangebyrank(name, min, max)

# 根据排行范围删除

zremrangebyscore(name, min, max)

# 根据分数范围删除

zscore(name, value)

# 获取name对应有序集合中 value 对应的分数

zinterstore(dest, keys, aggregate=None)

# 获取两个有序集合的交集,如果遇到相同值不同分数,则按照aggregate进行操作
# aggregate的值为:  SUM  MIN  MAX

zunionstore(dest, keys, aggregate=None)

# 获取两个有序集合的并集,如果遇到相同值不同分数,则按照aggregate进行操作
# aggregate的值为:  SUM  MIN  MAX

zscan(name, cursor=0, match=None, count=None, score_cast_func=float)
zscan_iter(name, match=None, count=None,score_cast_func=float)

# 同字符串相似,相较于字符串新增score_cast_func,用来对分数进行操作

其他常用操作

delete(*names)

# 根据删除redis中的任意数据类型

exists(name)

# 检测redis的name是否存在

keys(pattern='*')

# 根据模型获取redis的name
 
# 更多:
    # KEYS * 匹配数据库中所有 key 。
    # KEYS h?llo 匹配 hello , hallo 和 hxllo 等。
    # KEYS h*llo 匹配 hllo 和 heeeeello 等。
    # KEYS h[ae]llo 匹配 hello 和 hallo ,但不匹配 hillo

expire(name ,time)

# 为某个redis的某个name设置超时时间

rename(src, dst)

# 对redis的name重命名为

move(name, db))

# 将redis的某个值移动到指定的db下

randomkey()

# 随机获取一个redis的name(不删除)

type(name)

# 获取name对应值的类型

scan(cursor=0, match=None, count=None)
scan_iter(match=None, count=None)

# 同字符串操作,用于增量迭代获取key

管道

redis-py默认在执行每次请求都会创建(连接池申请连接)和断开(归还连接池)一次连接操作,如果想要在一次请求中指定多个命令,则可以使用pipline实现一次请求指定多个命令,并且默认情况下一次pipline 是原子性操作。

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import redis
import time

pool = redis.ConnectionPool(host='192.168.1.40',port=6379)

r = redis.Redis(connection_pool=pool)

# pipe = r.pipeline(transaction=False)
pipe = r.pipeline(transaction=True)

pipe.set('name', 'alex')
# time.sleep(50)
pipe.set('role', 'sb')

pipe.execute()

发布订阅

发布者:服务器

订阅者:Dashboad和数据处理

Demo如下:

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import redis


class RedisHelper:

    def __init__(self):
        self.__conn = redis.Redis(host='192.168.1.40')
        self.chan_sub = 'fm104.5'
        self.chan_pub = 'fm104.5'

    def public(self, msg):
        self.__conn.publish(self.chan_pub, msg)
        return True

    def subscribe(self):
        pub = self.__conn.pubsub()  #打开收音机
        pub.subscribe(self.chan_sub)    #调频道
        pub.parse_response()    #准备接收
        return pub

订阅者

#!/usr/bin/env python
# -*- coding:utf-8 -*-
from redis_helper import RedisHelper

obj = RedisHelper()
redis_sub = obj.subscribe()

while True:
    msg = redis_sub.parse_response()
    print(msg)

发布者

#!/usr/bin/env python
# -*- coding:utf-8 -*-



from redis_helper import RedisHelper

obj = RedisHelper()
obj.public('hello')

作业一:

题目:IO多路复用版FTP

需求:

  1. 实现文件上传及下载功能
  2. 支持多连接并发传文件
  3. 使用select or selectors

作业二:

题目:rpc命令端

需求:

  1. 可以异步的执行多个命令
  2. 对多台机器

>>:run "df -h" --hosts 192.168.3.55 10.4.3.4
task id: 45334
>>: check_task 45334
>>:

原文地址:https://www.cnblogs.com/guantou1992/p/11777444.html