Rabbitmq的connection连接池(Python版)

Rabbitmq Connect与Channel

RabbitMQ官方提供了Connection对象,本质就是一个TCP连接对象。
Channels对象,虚拟连接。虚拟连接建立在上面Connection对象的TCP连接中。数据流动都是在Channel中进行的。每个Connection对象的虚拟连接也是有限的,如果单个Connnection的Channel对象超出指定范围了,也会有性能问题,另外一个TCP连接上的多个虚拟连接,实际在传输数据时,传输数据的虚拟连接还是独占了TCP连接,其它虚拟连接在排队等待。

在单个的Connection对象创建多个Channel来实现数据传输,在channel信息比较大的情况下,Connection带宽会限制消息的传输。那么需要设计Connection池,将流量分摊到不同的connection上。

官网对于Connection的解读:

AMQP 0-9-1 connections are typically long-lived. AMQP 0-9-1 is an application level protocol that uses TCP for reliable delivery. Connections use authentication and can be protected using TLS. When an application no longer needs to be connected to the server, it should gracefully close its AMQP 0-9-1 connection instead of abruptly closing the underlying TCP connection.

大概意思就是:AMQP 0-9-1一般是一个TCP的长链接,当应用程序不再需要连接到服务器时,应该正常关闭AMQP 0-9-1连接而不是关闭TCP连接。

官网对于Channel的解读:

Some applications need multiple connections to the broker. However, it is undesirable to keep many TCP connections open at the same time because doing so consumes system resources and makes it more difficult to configure firewalls. AMQP 0-9-1 connections are multiplexed withchannels that can be thought of as "lightweight connections that share a single TCP connection".
Every protocol operation performed by a client happens on a channel. Communication on a particular channel is completely separate from communication on another channel, therefore every protocol method also carries a channel ID (a.k.a. channel number), an integer that both the broker and clients use to figure out which channel the method is for.
A channel only exists in the context of a connection and never on its own. When a connection is closed, so are all channels on it.

For applications that use multiple threads/processes for processing, it is very common to open a new channel per thread/process and not share channels between them.

大概的意思就是:一些应用需要同时创建多个连接到broker也就是RabbitMQ服务器上。然而因为防火墙的存在,很难同时创建多个连接。 AMQP 0-9-1连接使用多个channel连接实现对单一Connection的复用。
客户端的每一个协议操作都发送在channel上。每个协议方法携带者channel IDbrokerclient使用channel ID来确定方法对应的channel。因此实现channel之间的数据隔离。
channel不能单独存在,仅存在connection上下文中。当connection关闭时,channel也会关闭。
多线程/进程之间打开一个channel但不共享channels是很普遍的。

通道和并发注意事项(线程安全)

As a rule of thumb, sharing Channel instances between threads is something to be avoided. Applications should prefer using a Channel per thread instead of sharing the same Channel across multiple threads.

线程之间共享channel是无法避免的,应用程序跟喜欢每个线程使用一个channel而不是跨线程共享相同的channel

A classic anti-pattern to be avoided is opening a channel for each published message. Channels are supposed to be reasonably long-lived and opening a new one is a network round-trip which makes this pattern extremely inefficient.

要避免一个反例,为每一个发布的消息新建一个channel,开辟一个新的channel需要一个网络的往返,这种模式是很低效的。channel保持合理的存活时间。

It is possible to use channel pooling to avoid concurrent publishing on a shared channel: once a thread is done working with a channel, it returns it to the pool, making the channel available for another thread. Channel pooling can be thought of as a specific synchronization solution. It is recommended that an existing pooling library is used instead of a homegrown solution. For example, Spring AMQP which comes with a ready-to-use channel pooling feature.

可以使用channel pool来避免共享channel上并发发布:一旦一个线程使用完了channel,那么它将返回到pool中。其他线程便可使用这个Channel。线程池是一个解决方案,可以使用 Spring AMQP线程池而不是自己开发。

总结:频繁建立TCP连接和channel连接是消耗性能的,于是我们希望可以共享connection或者channel。达到连接的复用

Python实现rabbitmq connection连接池

import pika
import threading
import random
import uuid



"""
Class:  
Parameters:
    Connectionsize:int类型,Rabbitmqpool池连接的最大数
    Channelsize:int类型,Rabbitmqpool池Channel的最大数
return:None
"""
# 单例保证唯一
class Rabbitmqpool:
    # 定义类属性
    __instance = None
    __lock = threading.Lock()

    def __init__(self, Connectionsize, Channelsize):
        self.maxConnectionsize = Connectionsize
        self.maxChannelsize = Channelsize
        self.nowConnectionsize = 0
        self.nowChannelsize = 0
        self.connectpool = {}
        self.channelpool = {}
        self.certdic = {}
    def __new__(cls, Connectionsize, Channelsize):
        if not cls.__instance:
            cls.__instance = object.__new__(cls)
        return cls.__instance
    """
    function:  获取一个空闲Channel或者新建一个Channel
    Parameters:

    return:
        channel:channel
        cname:连接名
    """

    def get_channel(self):
        try:
            self.__lock.acquire()
            cname = ""
            channel = None
            # 在已存在键中查找空闲Channel
            for connectionname in self.connectpool:
                if len(self.channelpool[connectionname]) != 0:
                    channel = self.channelpool[connectionname][-1]
                    cname = connectionname
                    self.channelpool[connectionname] = self.channelpool[connectionname][0:-1]
                    print("取出一个Channel")
                    break
            # 如果没有找到空闲Channel,canme为"",则新建一个Channel
            if cname == "":
                if self.nowChannelsize < self.maxChannelsize:
                    # 从连接池返回一个连接的名字
                    if len(self.connectpool) != 0:
                        cname = random.choice(list(self.connectpool))
                        # 根据名字拿到此连接,传入连接和Pool池创建Channel
                        CreateChannel(self.connectpool[cname], self)
                        # 得到一个新Channel
                        channel = self.channelpool[cname][-1]
                        self.channelpool[cname] = self.channelpool[cname][0:-1]
                        print("创建一个Channel")
                    # 如果没有连接,则新建连接与channel
                    else:
                        if len(self.certdic) != 0:
                            cert = random.choice(list(self.certdic))
                            cname = str(uuid.uuid4().int)
                            print("创建一个连接")
                            CreateConnection(str(self.certdic[cert]["rabbitmq_host"]), str(self.certdic[cert]["rabbitmq_port"]),
                                             str(self.certdic[cert]["rabbitmq_virtual_host"]),
                                             str(self.certdic[cert]["rabbitmq_user"]),
                                             str(self.certdic[cert]["rabbitmq_password"]), self, cname)
                            CreateChannel(self.connectpool[cname], self)
                            # 得到一个新Channel
                            channel = self.channelpool[cname][-1]
                            self.channelpool[cname] = self.channelpool[cname][0:-1]
                            print("创建一个Channel")
                        else:
                            print("无法创建Channel,无连接凭证,不能创建连接!")
                else:
                    print("无法创建Channel,超过限制")

        finally:
            self.__lock.release()
        return channel, cname

    def create_channel(self):
        try:
            self.__lock.acquire()
            if len(self.certdic) != 0:
                cert = random.choice(list(self.certdic))
                cname = str(uuid.uuid4().int)
                print("创建一个连接")
                CreateConnection(str(self.certdic[cert]["rabbitmq_host"]), str(self.certdic[cert]["rabbitmq_port"]),
                                 str(self.certdic[cert]["rabbitmq_virtual_host"]),
                                 str(self.certdic[cert]["rabbitmq_user"]),
                                 str(self.certdic[cert]["rabbitmq_password"]), self, cname)
                CreateChannel(self.connectpool[cname], self)
                # 得到一个新Channel
                channel = self.channelpool[cname][-1]
                self.channelpool[cname] = self.channelpool[cname][0:-1]
                print("创建一个Channel")
                return channel, cname
            else:
                print("无法创建Channel,无连接凭证,不能创建连接!")
            return None,""
        finally:
            self.__lock.release()

    def return_channel(self, channel, connectionname):
        try:
            self.__lock.acquire()
            self.channelpool[connectionname].append(channel)
        finally:
            self.__lock.release()

    def closepool(self):
        pass

    def delconnection(self, connectionname):
        try:
            self.__lock.acquire()
            if connectionname in self.connectpool:
                del self.connectpool[connectionname]
                self.nowConnectionsize = self.nowConnectionsize -1
                self.nowChannelsize  = self.nowChannelsize - len(self.channelpool[connectionname])
                del self.channelpool[connectionname]

        finally:
            self.__lock.release()

    def get_certtemplate(self):
        return {"rabbitmq_host": "", "rabbitmq_port": 5672, "rabbitmq_virtual_host": "", "rabbitmq_user": "",
                "rabbitmq_password": ""}

    def addcert(self,cert):
        self.certdic[cert["rabbitmq_host"]] = cert


# 连接可以自己创建
class CreateConnection:
    def __init__(self, rabbitmq_host, rabbitmq_port, rabbitmq_virtual_host, rabbitmq_user, rabbitmq_password,
                 Rabbitmqpool, Connectionname = str(uuid.uuid4().int), heartbeat=60):
        if Rabbitmqpool.nowConnectionsize < Rabbitmqpool.maxConnectionsize:
            if Connectionname not in Rabbitmqpool.connectpool:
                self.rabbitmq_user = str(rabbitmq_user)
                self.rabbitmq_password = str(rabbitmq_password)
                self.rabbitmq_host = rabbitmq_host
                self.rabbitmq_port = rabbitmq_port
                self.rabbitmq_virtual_host = rabbitmq_virtual_host
                self.connectionname = Connectionname
                print(self.rabbitmq_user,self.rabbitmq_password,self.rabbitmq_host,self.rabbitmq_port,self.rabbitmq_virtual_host,self.connectionname)
                credentials = pika.PlainCredentials(rabbitmq_user, rabbitmq_password)
                try:
                    self.connection = pika.BlockingConnection(
                        pika.ConnectionParameters(
                            host=rabbitmq_host,
                            port=rabbitmq_port,
                            virtual_host=rabbitmq_virtual_host,
                            heartbeat=heartbeat,
                            credentials=credentials))
                    Rabbitmqpool.connectpool[Connectionname] = self
                    Rabbitmqpool.nowConnectionsize += 1
                    if self.connectionname not in Rabbitmqpool.channelpool:
                        Rabbitmqpool.channelpool[self.connectionname] = []
                    print("创建连接:", Connectionname)
                except Exception as e:
                    print("创建连接失败:", e)
            else:
                print("创建连接失败,此连接名已存在:", Connectionname)
        else:
            print("创建连接失败,连接池已满,无法创建连接池")

    def get_connection(self):
        return self.connection


class CreateChannel:
    def __init__(self, Connection, Rabbitmqpool):
        Rabbitmqpool.channelpool[Connection.connectionname].append(Connection.get_connection().channel())
        Rabbitmqpool.nowChannelsize += 1

我这里并没有增加过期时间:

初始化

rabbitmq_host = ""
rabbitmq_port = ""
rabbitmq_user = ""
rabbitmq_password = ""
rabbitmq_virtual_host = ""
credentials = pika.PlainCredentials(rabbitmq_user, rabbitmq_password)
Pool = Rabbitmqpool(3, 20)
cert = Pool.get_certtemplate()
cert['rabbitmq_host'] = rabbitmq_host
cert['rabbitmq_virtual_host'] = rabbitmq_virtual_host
cert['rabbitmq_user'] = rabbitmq_user
cert['rabbitmq_password'] = rabbitmq_password
cert['rabbitmq_port'] = rabbitmq_port
Pool.addcert(cert)

发送消息代码

 try:
        c, cname = Pool.get_channel()
        c.basic_publish(exchange='',
                        routing_key='队列名',
                        body=str(data),
                        )
        Pool.return_channel(c, cname)
    except Exception as e:
        print("发送错误:",e) #链接过期
        Pool.delconnection(cname) #channel过期时,删除此链接和此链接下的所有channel
        c, cname = Pool.create_channel() #创建一个新的链接和channel
        c.basic_publish(exchange='',
                        routing_key='队列名',
                        body=str(senddata),
                        )
        Pool.return_channel(c, cname)

refer:

https://www.jianshu.com/p/24e541170ace

原文地址:https://www.cnblogs.com/-wenli/p/13578837.html