rabbitmq

RabbitMQ  是一个在AMQP协议标准基础上完整的,可服用的企业消息系统。它遵循Mozilla Public License开源协议,采用 Erlang 实现的工业级的消息队列(MQ)服务器。

一、应用场景

异步处理

应用解耦

流量消峰

日志收集

二、支持多种语言

比如Js广告收集,大量的广告收集  会用到消息队列

消息中间件--》流实时计算

消息中间件--》离线计算

三、java操作rabbitmq

1 simple 简单队列

2 work queues 工作队列、公平分发、轮询分发

3 publish/subscribe 发布订阅

4 routing 路由选择 通配符模式

5 topics 主题

6 手动和自动确认消息

7 队列的持久化和非持久化

8 rabbitmq 的延迟队列        场景  :未支付订单30分钟取消

百度统计 cnzz架构

rabbitmq AMQP协议

添加用户

创建库   以/开头

virtual host

对用户授权

控制台 overview

ProtocolNodeBound toPort
amqp rabbit1@mqnode1 :: 30007
amqp rabbit2@mqnode2 :: 30007
amqp rabbit3@mqnode3 :: 30007
clustering rabbit1@mqnode1 :: 50001
clustering rabbit2@mqnode2 :: 50001
clustering rabbit3@mqnode3 :: 50001
http rabbit1@mqnode1 :: 30008
http rabbit2@mqnode2 :: 30008
http rabbit3@mqnode3 :: 30008

amqp 开发所有接口,qmqp协议

clustering 集群端口

http  http访问端口

connections 当前有哪些连接

chaannels 当前有那些频道

exchanges 交换机

队列

简单队列

下载安装

RabbitMQ 

https://www.rabbitmq.com/install-rpm.html#downloads

https://github.com/rabbitmq/rabbitmq-server/releases/tag/v3.7.17

erlang

https://www.erlang-solutions.com/resources/download.html

教程

https://blog.csdn.net/u013887008/article/details/100859070

安装erlang

rpm -ivh esl-erlang_22.0.7-1~centos~7_amd64.rpm

如果报错如下:
    警告:esl-erlang_22.0-1_centos_7_amd64.rpm: 头V4 RSA/SHA256 Signature, 密钥 ID a14f4fca: NOKEY
    
    则需要先执行下面命令,安装依赖,在执行安装的命令:
  1. sudo yum install epel-release
  2. sudo yum install unixODBC unixODBC-devel wxBase wxGTK SDL wxGTK-gl

3.执行下面命令进行验证:erl

4.执行如下命令退出:halt().        "注意这个.点,不能丢"

查看rpm 包安装到哪了

rpm -ql XXX.rpm

二、开始安装rabbitmq

pm -ivh --prefix= /opt/temp  xxx.rpm

1.下载3.7.15版本的rabbitmq,与Erlang版本要对应上

wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.7.15/rabbitmq-server-generic-unix-3.7.15.tar.xz
rpm -ivh rabbitmq-server-3.7.17-1.el7.noarch.rpm
## 可能需要先安装!!
yum install socat

启动rabbitmq

1.开机启动:

chkconfig rabbitmq-server on

2.查看启动状态:

rabbitmqctl status

3.启动,关闭,重启:

systemctl start rabbitmq-server.service
systemctl stop rabbitmq-server.service
systemctl restart rabbitmq-server.service

启动web管理台

rabbitmq-plugins enable rabbitmq_management

 

访问: http://192.168.93.129:15672,默认用户:guest/guest,但登陆时显示User can only log in via localhost!!!

解决方案

找到文件/usr/lib/rabbitmq/lib/rabbitmq_server-3.7.17/ebin/rabbit.app:

{loopback_users, [<<"guest">>]},

改为{loopback_users, []},

然后重启服务即可:

systemctl restart rabbitmq-server.service

简单消息队列

java 调用mq,一对一   单个生产者,一个消息者

<dependencies>
         <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.0.0</version>
        </dependency>
        
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.25</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-nop</artifactId>
            <version>1.7.25</version>
        </dependency>
         <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.25</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

生产者

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class ProducterDirectDemo {
    
    private static String queneName = "testQuene";
    
    
    public static Connection getConnection() {
        Connection connection = null;
        try {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("192.168.80.110");
            factory.setPort(5672);
            factory.setUsername("guest");
            factory.setPassword("guest");
            factory.setVirtualHost("/");
            // 创建与RabbitMQ服务器的TCP连接
            connection = factory.newConnection();
        } catch (Exception ex) {
            ex.printStackTrace();
        } 
        
        return connection;
    }
    
    
    public static void main(String[] args) throws IOException, TimeoutException {

        Connection connection = getConnection();
        Channel channel = null;
        try {
            // 创建一个频道
            channel = connection.createChannel();
            // 声明默认的队列
            channel.queueDeclare(queneName, false, false, false, null);
            String msg = "发送消息";
            channel.basicPublish("", queneName, null, msg.getBytes());
        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            if (channel != null) {
                channel.close();
            }
            if (connection != null) {
                connection.close();
            }
        }
    }
}

消费者

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class Consumer {
    private final static String QUEUE_NAME = "testQuene"; //队列名称

    public static void receive() throws IOException, TimeoutException {
        //由连接工厂创建连接
        Connection connection = ProducterDirectDemo.getConnection();
        //通过连接创建信道
        Channel channel = connection.createChannel();
        //创建消费者,指定要使用的channel。QueueingConsume类已经弃用,使用DefaultConsumer代替
        DefaultConsumer consumer = new DefaultConsumer(channel){
            //监听的queue中有消息进来时,会自动调用此方法来处理消息。但此方法默认是空的,需要重写
            @Override
            public void handleDelivery(java.lang.String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                java.lang.String msg = new java.lang.String(body);
                System.out.println("received msg: " + msg);
            }
        };

        //监听指定的queue。会一直监听。
        //参数:要监听的queue、是否自动确认消息、使用的Consumer
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
    
    
    public static void main(String[] args) throws IOException, TimeoutException {
        receive();
    }
    
}

2、work queues工作队列

一个生产者多个消费者

多个客户端会均分,不管哪个处理的快哪个处理的慢

采用轮询的机制

采用公平分发,要关闭自动应答

3、autoAck      自动确认后从内存中删除

4、如果强制杀死正在执行的消费者就会丢失消息

如果不想丢失消息,要设置成手动

5、发布订阅模式

生产者把消息发送到交换机(exchange),交换机把消息发送到消息队列,一个队列对应一个消费者

路由规则

往交换机中发送消息
如果没有提前将队列绑定到交换机,那么直接运行生产者的话,消息是不会发到任何队列里的

6、topic 主题模式

将路由键和某模式匹配

# 匹配一个或多个

*  匹配一个

Good.#

7.rabbitmq 消息确认机制(事务)

生产有没有把消息发送到消息队列,默认是不知道的

两种方式

amqp 实现了事务机制

txSelect  用户将当前chanel 设置成transation

txCommit  提交事务

txRollback 回滚事务

降低了消息的吞吐量

confirm 模式

生产者端confirm 实现原理

confirm 异步模式

开启confirm 模式

发一条 waitforconfim 单条效率偏低

发一批 waitforconfim

异步模式

原文地址:https://www.cnblogs.com/jentary/p/13579603.html