rabbitmq method之queue.declare

queue.declare即申请(声明)队列:首先对队列名作处理,若未指定队列名则随机生成一个;然后查询数据库中该队列是否已经存在,若已存在则直接返回队列信息(queue.declare_ok),否则先创建队列再返回。

%% Handles a non-passive 'queue.declare' (this clause only matches
%% passive = false).
%%
%% Steps visible in this clause:
%%   1. Owner is the connection pid for an exclusive declare, 'none' otherwise.
%%   2. The queue is only treated as durable when it is not exclusive.
%%   3. An empty queue name is replaced by a generated one carrying the
%%      "amq.gen" prefix; any other name goes through check_name/2.
%%   4. check_configure_permitted/2 is called for the queue resource.
%%   5. If the queue already exists, its properties are asserted to be
%%      equivalent to the requested ones and the reply is built from the
%%      result of maybe_stat/2; otherwise the queue is declared.
handle_method(#'queue.declare'{queue       = QueueNameBin, 
                               passive     = false, 
                               durable     = DurableDeclare, 
                               exclusive   = ExclusiveDeclare, 
                               auto_delete = AutoDelete, 
                               nowait      = NoWait, 
                               arguments   = Args} = Declare, 
              _, State = #ch{virtual_host        = VHostPath, 
                             conn_pid            = ConnPid, 
                             queue_collector_pid = CollectorPid}) -> 
    %% An exclusive queue is owned by the declaring connection.
    Owner = case ExclusiveDeclare of 
                true  -> ConnPid; 
                false -> none 
            end, 
    %% Exclusive queues are never durable, whatever the client asked for.
    Durable = DurableDeclare andalso not ExclusiveDeclare, 
    %% Generate a name when the client supplied an empty one.
    ActualNameBin = case QueueNameBin of 
                        <<>>  -> rabbit_guid:binary(rabbit_guid:gen_secure(), 
                                                    "amq.gen"); 
                        Other -> check_name('queue', Other) 
                    end, 
    QueueName = rabbit_misc:r(VHostPath, queue, ActualNameBin), 
    check_configure_permitted(QueueName, State), 
    %% Check whether the queue already exists.
    case rabbit_amqqueue:with( 
           QueueName, 
           fun (Q) -> ok = rabbit_amqqueue:assert_equivalence( 
                             Q, Durable, AutoDelete, Args, Owner), 
                      maybe_stat(NoWait, Q) 
           end) of 
        %% The queue exists and is equivalent: reply with its current counts.
        {ok, MessageCount, ConsumerCount} -> 
            return_queue_declare_ok(QueueName, NoWait, MessageCount, 
                                    ConsumerCount, State);         
        %% The queue does not exist yet: validate the arguments, then create it.
        {error, not_found} -> 
            DlxKey = <<"x-dead-letter-exchange">>, 
            %% When a dead-letter exchange is requested, the declarer needs
            %% the read check on the queue and the write check on the DLX.
            case rabbit_misc:r_arg(VHostPath, exchange, Args, DlxKey) of 
               undefined -> 
                   ok; 
               {error, {invalid_type, Type}} -> 
                    precondition_failed( 
                      "invalid type '~s' for arg '~s' in ~s", 
                      [Type, DlxKey, rabbit_misc:rs(QueueName)]); 
               DLX -> 
                   check_read_permitted(QueueName, State), 
                   check_write_permitted(DLX, State), 
                   ok 
            end, 
            case rabbit_amqqueue:declare(QueueName, Durable, AutoDelete, 
                                         Args, Owner) of 
                {new, #amqqueue{pid = QPid}} -> 
                    %% We need to notify the reader within the channel 
                    %% process so that we can be sure there are no 
                    %% outstanding exclusive queues being declared as 
                    %% the connection shuts down. 
                    ok = case Owner of 
                             none -> ok; 
                             _    -> rabbit_queue_collector:register( 
                                       CollectorPid, QPid) 
                         end, 
                    %% A freshly created queue reports zero messages and
                    %% zero consumers.
                    return_queue_declare_ok(QueueName, NoWait, 0, 0, State); 
                {existing, _Q} -> 
                    %% must have been created between the stat and the 
                    %% declare. Loop around again. 
                    handle_method(Declare, none, State); 
                {absent, Q, Reason} -> 
                    rabbit_misc:absent(Q, Reason); 
                {owner_died, _Q} -> 
                    %% Presumably our own days are numbered since the 
                    %% connection has died. Pretend the queue exists though, 
                    %% just so nothing fails. 
                    return_queue_declare_ok(QueueName, NoWait, 0, 0, State) 
            end; 
        %% The lookup itself reported the queue as absent: delegate to
        %% rabbit_misc:absent/2 with the reported reason.
        {error, {absent, Q, Reason}} -> 
            rabbit_misc:absent(Q, Reason) 
    end;

rabbit_amqqueue.erl

其中的node()是为了指明master queue的位置,即收到申请队列消息的节点

%% Declares a queue, passing the node that evaluates this call as the
%% Node argument of declare/6.
declare(QueueName, Durable, AutoDelete, Args, Owner) ->
    LocalNode = node(),
    declare(QueueName, Durable, AutoDelete, Args, Owner, LocalNode).
选择主节点,在主节点上启动队列进程,并向该进程发送{init, new}消息
%% Validates the declare arguments, builds a fresh #amqqueue{} record,
%% applies policy and decorators to it, starts the queue process on Node
%% and synchronously sends it {init, new}.
%%
%% Returns whatever the queue process replies to {init, new}.
declare(QueueName, Durable, AutoDelete, Args, Owner, Node) ->
    ok = check_declare_arguments(QueueName, Args),
    %% A brand new queue: no pid yet and no mirrors of any kind.
    Blank = #amqqueue{name               = QueueName,
                      durable            = Durable,
                      auto_delete        = AutoDelete,
                      arguments          = Args,
                      exclusive_owner    = Owner,
                      pid                = none,
                      slave_pids         = [],
                      sync_slave_pids    = [],
                      recoverable_slaves = [],
                      gm_pids            = [],
                      state              = live},
    WithPolicy = rabbit_policy:set(Blank),
    Q = rabbit_queue_decorator:set(WithPolicy),
    %% Node is already bound by the function head, so this is a match
    %% (an assertion), not an assignment: it raises badmatch when
    %% initial_queue_node/2 returns a different node.
    %% NOTE(review): confirm that asserting here is the intended behaviour.
    Node = rabbit_mirror_queue_misc:initial_queue_node(Q, Node),
    Server = rabbit_amqqueue_sup_sup:start_queue_process(Node, Q, declare),
    %% No timeout: wait for the queue process to finish initialising.
    gen_server2:call(Server, {init, new}, infinity).

rabbit_amqqueue_sup.erl

在启动rabbit_amqqueue_process的时候,supervisor使用Marker来标志此进程是否为首次启动,以便与重启的进程区分开来,做不同的操作。

%% Starts a supervisor for one queue and then its single queue child.
%%
%% Q         - the queue record handed to rabbit_prequeue:start_link/3.
%% StartMode - passed through unchanged to rabbit_prequeue:start_link/3.
%%
%% Returns {ok, SupPid, QPid}: the supervisor pid and the pid of the child
%% it started. Any other result from supervisor2 crashes with badmatch.
start_link(Q, StartMode) ->
    %% Marker is a linked helper process that is alive only while the
    %% initial start_child/2 call below is running.
    %% NOTE(review): presumably rabbit_prequeue checks whether Marker is
    %% still alive to tell a first start from a supervisor restart -
    %% confirm in rabbit_prequeue.
    Marker = spawn_link(fun() -> receive stop -> ok end end),
    ChildSpec = {rabbit_amqqueue,
                 {rabbit_prequeue, start_link, [Q, StartMode, Marker]},
                 intrinsic, ?MAX_WAIT, worker, [rabbit_amqqueue_process,
                                                rabbit_mirror_queue_slave]},
    {ok, SupPid} = supervisor2:start_link(?MODULE, []),
    {ok, QPid} = supervisor2:start_child(SupPid, ChildSpec),
    %% The initial start is over: detach from the marker and let it exit.
    unlink(Marker),
    Marker ! stop,
    {ok, SupPid, QPid}.

之后,主节点会启动rabbit_amqqueue_process,用coordinator来完成数据同步(gm),而备节点则会启动rabbit_mirror_queue_slave进程,后者同样使用了gm behaviour,所以可以和coordinator进行数据同步,以使mq节点之间的状态保持一致。

通过coordinator获取gm以完成可靠同步,然后获取备节点列表,并在各备节点上增加镜像队列

%% Initialises the mirrored-queue master state on top of an existing
%% backing queue.
%%
%% Q   - the queue record; only its name is read directly here.
%% BQ  - stored as backing_queue in the returned state.
%% BQS - stored as backing_queue_state in the returned state.
%%
%% Starts a coordinator, records {GM, self()} in the queue's gm_pids inside
%% an mnesia transaction, asks for the suggested mirror nodes, adds a mirror
%% on each of them and returns the initial #state{}.
init_with_existing_bq(Q = #amqqueue{name = QName}, BQ, BQS) ->
    {ok, CPid} = rabbit_mirror_queue_coordinator:start_link(
                   Q, undefined, sender_death_fun(), depth_fun()),
    GM = rabbit_mirror_queue_coordinator:get_gm(CPid),
    %% Capture self() here so the pid stored below is the one evaluated
    %% outside the transaction fun.
    Self = self(),
    ok = rabbit_misc:execute_mnesia_transaction(
           fun () ->
                   %% Re-read the stored record rather than reusing Q, so
                   %% the update is applied to the current gm_pids.
                   [Q1 = #amqqueue{gm_pids = GMPids}]
                       = mnesia:read({rabbit_queue, QName}),
                   ok = rabbit_amqqueue:store_queue(
                          Q1#amqqueue{gm_pids = [{GM, Self} | GMPids],
                                      state   = live})
           end),

    {_MNode, SNodes} = rabbit_mirror_queue_misc:suggested_queue_nodes(Q),

    %% Add a mirror queue on every suggested slave node, i.e. start the
    %% slave queue processes.
    rabbit_mirror_queue_misc:add_mirrors(QName, SNodes, sync),
    #state { name                = QName,
             gm                  = GM,
             coordinator         = CPid,
             backing_queue       = BQ,
             backing_queue_state = BQS,
             seen_status         = dict:new(),
             confirmed           = [],
             known_senders       = sets:new() }.



%% Calls add_mirror/3 once per node in Nodes, in list order, ignoring the
%% individual results. Always returns ok.
add_mirrors(QName, Nodes, SyncMode) ->
    lists:foreach(fun (Node) -> add_mirror(QName, Node, SyncMode) end,
                  Nodes).

%% Starts a slave queue process for QName on MirrorNode and tells it to go.
%%
%% Returns {error, not_found} when the queue cannot be looked up; otherwise
%% the result of rabbit_misc:with_exit_handler/2, whose handler here is
%% rabbit_misc:const(ok).
add_mirror(QName, MirrorNode, SyncMode) ->
    case rabbit_amqqueue:lookup(QName) of
        {error, not_found} = NotFound ->
            NotFound;
        {ok, Q} ->
            StartSlave =
                fun () ->
                        SPid = rabbit_amqqueue_sup_sup:start_queue_process(
                                 MirrorNode, Q, slave),
                        log_info(QName, "Adding mirror on node ~p: ~p~n",
                                 [MirrorNode, SPid]),
                        rabbit_mirror_queue_slave:go(SPid, SyncMode)
                end,
            rabbit_misc:with_exit_handler(rabbit_misc:const(ok), StartSlave)
    end.

未完成待续
原文地址:https://www.cnblogs.com/haoqingchuan/p/4666206.html