DAY 128 rabbitmq

1 消息队列介绍

1 先进先出”的一种数据机构--》消息队列(mq)
2 MQ解决什么问题
-应用解耦
   -流量消峰
   -消息分发(发布订阅)
   -异步消息(celery:本质是对消息队列的封装)
   
3 主流消息队列产品
-Kafka(互联网公司,数据量大用的多,吞吐量高,数据安全性低一些)
   -rabbitmq(吞吐量低一些,安全性,准确性高)
   

2 rabbitmq安装

1 源码安装
-https://zhuanlan.zhihu.com/p/375157411
2 yum 安装(rpm安装)
# 安装配置epel源
   yum install epel-release -y
   # 安装erlang
   yum -y install erlang
   # 安装RabbitMQ
   yum -y install rabbitmq-server
# 启动
   systemctl start rabbitmq-server
   # 查看是否启动
   ps aux |grep rabbitmq
3 docker安装
-拉一个镜像,启动起来即可
# 安装好Docker,执行下面命令
   docker pull rabbitmq:management
   docker run -di --name Myrabbitmq -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 rabbitmq:management
   # 浏览器访问:
   http://10.0.0.103:15672
   # 输入用户名:admin 密码:admin ,进入到管理控制台

 

3 基本使用(生产者消费者模型)

1 使用python操作rabbitmq
2 pip3 install pika

3.1 生产者

import pika

# 无密码
# connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166'))

# 有密码
credentials = pika.PlainCredentials("admin","admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166',credentials=credentials))


channel = connection.channel()

# 声明一个队列
channel.queue_declare(queue='hello')

channel.basic_publish(exchange='',
                     routing_key='hello',
                     body='llnb')
print("Sent 'llnb'")
connection.close()

3.2 消费者

import pika

credentials = pika.PlainCredentials("admin","admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166',credentials=credentials))
channel = connection.channel()

# 声明一个队列(创建一个队列),如果生产者还没放数据,这个队列根本不存,这句话保证代码不出错
channel.queue_declare(queue='hello')

def callback(ch, method, properties, body):
   print("消费者接受到了任务: %r" % body)
   # 通知服务端,消息取走了,如果auto_ack=False,不加下面,消息会一直存在
   # ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(queue='hello',on_message_callback=callback,auto_ack=False)

channel.start_consuming()

 

4 消息安全之ack

### 生产者
import pika

# 无密码
# connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166'))

# 有密码
credentials = pika.PlainCredentials("admin","admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166',credentials=credentials))


channel = connection.channel()

# 声明一个队列
channel.queue_declare(queue='hello')

channel.basic_publish(exchange='',
                     routing_key='hello',
                     body='llnb')
print("Sent 'llnb'")
connection.close()


### 消费者
import pika

credentials = pika.PlainCredentials("admin","admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166',credentials=credentials))
channel = connection.channel()

# 声明一个队列(创建一个队列),如果生产者还没放数据,这个队列根本不存,这句话保证代码不出错
channel.queue_declare(queue='hello')

def callback(ch, method, properties, body):
   print("消费者接受到了任务: %r" % body)


   # 通知服务端,消息取走了,如果auto_ack=False,不加下面,消息会一直存在
   # 保证数据安全
   ch.basic_ack(delivery_tag=method.delivery_tag)  # 回复确认,rabbitmq的server就把该消息删除


# 只要收到消息,立马回复,rabbitmq的server就把消息删除
channel.basic_consume(queue='hello',on_message_callback=callback,auto_ack=False)

channel.start_consuming()

 

原文地址:https://www.cnblogs.com/DEJAVU888/p/14938703.html