RabbitMQ

RabbitMQ:
和python queue的区别
线程queue,多个线程之间数据同步用
进程queue,用于父进程与子进程交互,或同一父进程下多个子进程之间交互
rabbitmq,两个独立进程之间交互

依赖erlang
  名词解析:
    connection: tcp连接,队里在此之上
  channel: 定义queue和exchange的地方
  queue: 存储消息的地方
   exchange: 消息转发器
  Message acknowledgment: 收到消息确认,在接收到确认之前,mq不会删除消息
   Message durability: 持久化,分为队列持久化和消息持久化
   Prefetch count: 定义一个消费者能处理几个消息,在这些消息的确认收到之前,不会发送新的消息
   binding: 通过这个将exchange和queue联系起来
   vhost:虚拟主机,里面好多exchange,
user: 针对vhost配置权限


producer
#!/usr/bin/env python3
# -*- coding:utf-8 -*-
# -*- Author:Hinimix -*-

import pika

creditials = pika.PlainCredentials('admin', 'admin')
conn = pika.BlockingConnection(
    pika.ConnectionParameters("192.168.20.61", 5672, '/', creditials)
)

channel = conn.channel()    #声明一个管道

channel.queue_declare(queue='hinimix')

channel.basic_publish(
    exchange='',            #
    routing_key='hinimix',    #queue名字
    body='hello, world'     #内容
)

print("[x] sent 'hello world")

conn.close()
   consumer 
#!/usr/bin/env python3
# -*- coding:utf-8 -*-
# -*- Author:Hinimix -*-

import pika

creditials = pika.PlainCredentials('admin', 'admin')
conn = pika.BlockingConnection(
    pika.ConnectionParameters("192.168.20.61", 5672, '/', creditials)
)

channel = conn.channel()
channel.queue_declare(queue='hinimix')


def callback(ch, method, properties, body):
    print(ch, '@@', method,'@@', properties,'@@', body)
    print("[x] received %r" % body)

channel.basic_consume(
    callback,           # 如果收到消息调用callback来处理消息
    queue='hinimix',
    no_ack=True      # 不确认,消息不论处理完与否,都不会给服务器端发确认.默认是确认的
)

print("[*] waiting for messages.")
channel.start_consuming()
轮询:
启用多个消费者时候,生产者的消息是轮询分发到每个消费者上

消息持久化:
普通消息存在与内存里
在生产者里这样写
channel.queue_declare(queue='hinimix', durable=True) # 队列持久化
channel.basic_publish(
properties=pika.BasicProperties(
delivery_mode=2, # 消息持久化
))
在消费者里加上
channel.queue_declare(queue='hinimix', durable=True) # 队列持久化

消息公平分发:
在消费者端加上channel.basic_qos(prefetch_count=1) #表示当前消费者在1个消息处理完之前不接受新消息
exchange类型:
  fanout:所有bind到这个exchange的queue都可以接受消息
  direct:通过routingKey和exchange决定哪个唯一的queue可以接收消息,完全匹配routingkey
  topic:所有符合routingKey(可能是表达式)的routingKey所bind的queue可以接受消息,模糊匹配routingkey
   # 收到所有消息
   * *.info收到所有以.info结尾的
  headers:不依赖routing key 和 binding key,通过消息的header匹配


fanout:
生产者
#!/usr/bin/env python3
# -*- coding:utf-8 -*-
# -*- Author:Hinimix -*-
# 生产者

import pika

creditials = pika.PlainCredentials('admin', 'admin')
conn = pika.BlockingConnection(
    pika.ConnectionParameters("192.168.20.61", 5672, '/', creditials)
)

channel = conn.channel()    #声明一个管道

channel.exchange_declare(exchange='jl',
                         exchange_type='fanout')

message = "hnm: Hello world"
channel.basic_publish(
    exchange='jl',            #exchange名字
    routing_key='',    #queue名字
    body=message,     #内容
    properties=pika.BasicProperties(
        delivery_mode=2, # 消息持久化
    )
)

print("[x] sent 'hello world")

conn.close()

        消费者

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
# -*- Author:Hinimix -*-

import pika
import time

creditials = pika.PlainCredentials('admin', 'admin')
conn = pika.BlockingConnection(
    pika.ConnectionParameters("192.168.20.61", 5672, '/', creditials)
)

channel = conn.channel()
channel.exchange_declare(exchange='jl',
                         exchange_type='fanout')
# channel.queue_declare(queue='hinimix',
#                       durable=True)
# channel.basic_qos(prefetch_count=1)

result = channel.queue_declare(exclusive=True)   #排他的,唯一的
queue_name = result.method.queue
print("queue name: ", queue_name)
channel.queue_bind(exchange='jl',
                   queue=queue_name)



def callback(ch, method, properties, body):
    print(ch, '@@', method,'@@', properties,'@@', body)
    time.sleep(.1)
    print("[x] received %r" % body)
    ch.basic_ack(delivery_tag=method.delivery_tag)      #手动确认


channel.basic_consume(
    callback,           # 如果收到消息调用callback来处理消息
    queue=queue_name,
    # no_ack=True     # True自动确认,tcp层面确认,实际处理完与否并不知道,默认是False
)

print("[*] waiting for messages.")
channel.start_consuming()

  direct:

    生产者:

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
# -*- Author:Hinimix -*-

import pika
import sys


creditials = pika.PlainCredentials('admin', 'admin')
conn = pika.BlockingConnection(
    pika.ConnectionParameters("192.168.20.61", 5672, '/', creditials)
)

channel = conn.channel()    #声明一个管道

channel.exchange_declare(exchange='hi_direct',
                         exchange_type='direct')

severity = sys.argv[1] if len(sys.argv) > 1 else 'warning'

message = ' '.join(sys.argv[2:]) or "Hello World!222"

channel.basic_publish(
    exchange='hi_direct',            #exchange名字
    routing_key=severity,    #queue名字
    body=message,     #内容
    properties=pika.BasicProperties(
        delivery_mode=2, # 消息持久化
    )
)

print("[x] sent 'hello world222")

conn.close()

    消费者:


#!/usr/bin/env python3
# -*- coding:utf-8 -*-
# -*- Author:Hinimix -*-

import pika
import time
import sys

creditials = pika.PlainCredentials('admin', 'admin')
conn = pika.BlockingConnection(
    pika.ConnectionParameters("192.168.20.61", 5672, '/', creditials)
)

channel = conn.channel()
channel.exchange_declare(exchange='hi_direct',
                         exchange_type='direct')

result = channel.queue_declare(exclusive=True)   #排他的,唯一的
queue_name = result.method.queue
print("queue name: ", queue_name)

severities = sys.argv[1:]
if not severities:
    sys.stderr.write("Usage: %s [info] [warning] [error] 
" % sys.argv[0])
    sys.exit(1)
print(severities)

for severity in severities:
    channel.queue_bind(exchange='hi_direct',
                       queue=queue_name,
                       routing_key=severity)


def callback(ch, method, properties, body):
    print(ch, '@@', method,'@@', properties,'@@', body)
    time.sleep(.1)
    print("[x] received %r" % body)
    ch.basic_ack(delivery_tag=method.delivery_tag)      #手动确认


channel.basic_consume(
    callback,           # 如果收到消息调用callback来处理消息
    queue=queue_name,
)

print("[*] waiting for messages.")
channel.start_consuming()

  topic
    生产者

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
# -*- Author:Hinimix -*-

import pika
import sys


creditials = pika.PlainCredentials('admin', 'admin')
conn = pika.BlockingConnection(
    pika.ConnectionParameters("192.168.20.61", 5672, '/', creditials)
)

channel = conn.channel()    #声明一个管道

channel.exchange_declare(exchange='hi_topic',
                         exchange_type='topic')

routing_key = sys.argv[1] if len(sys.argv) > 1 else "anonymous.info"
print(routing_key)

message = ' '.join(sys.argv[2:]) or "Hello World!222"
print(message)

channel.basic_publish(
    exchange='hi_topic',            #exchange名字
    routing_key=routing_key,    #queue名字
    body=message,     #内容
    properties=pika.BasicProperties(
        delivery_mode=2, # 消息持久化
    )
)

print("[x] sent %s" % message)

conn.close()

    消费者

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
# -*- Author:Hinimix -*-

import pika
import time
import sys

creditials = pika.PlainCredentials('admin', 'admin')
conn = pika.BlockingConnection(
    pika.ConnectionParameters("192.168.20.61", 5672, '/', creditials)
)

channel = conn.channel()
channel.exchange_declare(exchange='hi_topic',
                         exchange_type='topic')


result = channel.queue_declare(exclusive=True)   #排他的,唯一的
queue_name = result.method.queue
print("queue name: ", queue_name)

binding_keys = sys.argv[1:]
if not binding_keys:
    sys.stderr.write("Usage: %s [*.info] [*.info mysql.*] [#] 
" % sys.argv[0])
    sys.exit(1)


for binding_key in binding_keys:
    channel.queue_bind(exchange='hi_topic',
                       queue=queue_name,
                       routing_key="*.info mysql.*")
print(binding_key)


def callback(ch, method, properties, body):
    print(ch, '@@', method,'@@', properties,'@@', body)
    time.sleep(.1)
    print("[x] received %r" % body)
    ch.basic_ack(delivery_tag=method.delivery_tag)      #手动确认


channel.basic_consume(
    callback,           # 如果收到消息调用callback来处理消息
    queue=queue_name,
    # no_ack=True     # True自动确认,tcp层面确认,实际处理完与否并不知道,默认是False
)

print("[*] waiting for messages.")
channel.start_consuming()

   RPC

    producer:

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
# -*- Author:Hinimix -*-

import pika
from uuid import uuid4
import time
creditials = pika.PlainCredentials('admin', 'admin')


class Fibonacci(object):

    def __init__(self):
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters("192.168.20.61", 5672, '/', creditials)
        )
        self.channel = self.connection.channel()

        result = self.channel.queue_declare(exclusive=True)     #生成一个随机queue
        self.callback_queue = result.method.queue

        self.channel.basic_consume(
            self.on_response,
            no_ack=True,
            queue=self.callback_queue
        )

    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:
            self.response = body

    def call(self, n):
        self.response = None
        self.corr_id = str(uuid4())     #随机一串字符串
        self.channel.basic_publish(
            exchange='',
            routing_key='hi_rpc_queue',
            properties=pika.BasicProperties(
                reply_to=self.callback_queue,       #
                correlation_id=self.corr_id         #correlation_id 确保数据一致性
            ),
            body=str(n)
        )

        while self.response is None:
            self.connection.process_data_events()   #非阻塞版 start_consuming
            print("no message...")
            time.sleep(.5)
        return int(self.response)

fibonacci_rpc = Fibonacci()

print(" [x] Requesting fib 30")
response = fibonacci_rpc.call(2)
print(" [.] Response fib", response)

    consumer:

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
# -*- Author:Hinimix -*-

import pika

creditials = pika.PlainCredentials('admin', 'admin')
conn = pika.BlockingConnection(
    pika.ConnectionParameters("192.168.20.61", 5672, '/', creditials)
)

channel = conn.channel()    #声明一个管道
channel.queue_declare(queue='hi_rpc_queue')

def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n-1) + fib(n-2)


def on_request(ch, method, props, body):
    n = int(body)
    print(" [.] fib(%s)" % n)
    response = fib(n)

    ch.basic_publish(
        exchange='',
        routing_key=props.reply_to,
        properties=pika.BasicProperties(
            correlation_id=props.correlation_id
        ),
        body=str(response)
    )
channel.basic_consume(
    on_request,
    queue="hi_rpc_queue"
)

print(" [x] waiting RPC request")
channel.start_consuming()
一堆命令:
  软件:
    rabbitmqctl start_app  #启动rabbitmq
    rabbitmqctl stop_app  #关闭rabbitmq
    rabbitmqctl status    #查看rabbitmq状态
    rabbitmqctl reset    #重置
    rabbitmqctl force_reset  #强制重置(主从坏了这么做)

  用户:
    rabbitmqctl list_users #查看用户
    rabbitmqctl add_user hinimix 1  #创建用户hinimix,密码是1
    rabbitmqctl change_password hinimix 2  #更改用户hinimix密码为2
    rabbitmqctl delete_user hinimix  #删除用户hinimix
    rabbitmqctl clear_password    #清除用户密码
    rabbitmqctl set_user_tags    #设置用户tag
  队列:
    rabbitmqctl list_queues -p vhost#查看多少queue和消息
    rabbitmqctl purge_queue  #清除队列
  exchange:
    rabbitmqctl list_exchanges
  vhost:
    rabbitmqctl address_vhost  #创建虚拟主机
    rabbitmqctl delete_vhost  #删除虚拟主机
    rabbitmqctl list_vhosts    #查看vhosts
  连接:
    rabbitmqctl list_connections  #查看连接
    rabbitmqctl list_channels    #查看channel
    rabbitmqctl list_consumers -p vhost #查看消费者
    rabbitmqctl close_connection connection_id

  权限:
    rabbitmqctl set_permissions -p vhost user conf write read
      #vhost: 默认是/
      #user:目标用户
      #permission:
        conf:配置权限正则
        write:写权限的正则
        read:读权限正则
        如:
          rabbitmqctl set_permission -p /sb hinimix "^mysql.*" ".*" ".*"
          用户hinimix在名叫sb的vhost上有以mysql开头的配置权限,其他全部读写权限
    rabbitmqctl clear_permission  #清除权限
    rabbitmqctl list_permissions  #查看权限
    rabbitmqctl list_user_permission user  #查看用户权限
  策略:在集群范围上修改队列和交换机的行为
    rabbitmqctl list_policies #查看策略,策略能配置镜像队列
    rabbitmqctl set_policy -p vhost -priority n -apply-to str name pattern definition #设置策略
      name:策略名字
      pattern:匹配该策略的正则
      definition:JSON形式,在shell中需要这个
      priority:优先级越高越高,默认0
      apply-to:策略对象,值为"queue" "exchange" "all" 默认是all
    rabbitmqctl clear_policies:清除策略
  集群:
    rabbitmqctl cluster_status  #查看集群状态
    rabbitmqctl join_cluster    #加入集群
    rabbitmqctl set_cluster_name  #设置集群名字


参考连接:https://blog.csdn.net/wulex/article/details/64127224
他写的贼好
原文地址:https://www.cnblogs.com/hinimix/p/9179535.html