rabbitmq之partitions

集群为了保证数据一致性,在同步数据的同时也会通过节点之间的心跳通信来保证对方存活。那如果集群节点通信异常会发生什么,系统如何保障正常提供服务,使用何种策略回复呢?

rabbitmq提供的处理脑裂的方法有两种:autoheal、pause_minority.

autoheal指的是在出现脑裂且恢复时采用分区中与客户端连接数最多的一个分区来作为winner,并将所有的losers分区重启。

pause_miniroty指的是在出现脑裂后判断自己是否为众数者majority,即自己所在分区是否为总节点数的一半以上length(AliveNodes) / length(Nodes) > 0.5,如果属于众数者则正常工作,否则做rabbit:stop()操作,并以1秒周期查询自己是否属于众数者。

下面主要介绍下autoheal的实现原理。

首先在rabbit.erl中根据启动流程中可以看到

-rabbit_boot_step({rabbit_node_monitor,
                   [{description, "node monitor"},
                    {mfa,         {rabbit_sup, start_restartable_child,
                                   [rabbit_node_monitor]}},
                    {requires,    [rabbit_alarm, guid_generator]},
                    {enables,     core_initialized}]}).

启动并注册系统事件来获取(rabbit_node_monitor.erl)

 {ok, _} = mnesia:subscribe(system),

当环境中出现分区恢复后,mneisa会收到{nodeup, Node},并根据此节点来获取远端有记录mnesia down过的节点,当两个节点都认为对方出现过down的情况下,即会发送{inconsistent_database, Context, Node}系统事件【1】,并在所有节点都起来的情况下开始autoheal, rabbit_node_monitor.erl。

handle_info({mnesia_system_event,
             {inconsistent_database, running_partitioned_network, Node}},
            State = #state{partitions = Partitions,
                           monitors   = Monitors}) ->
    %% We will not get a node_up from this node - yet we should treat it as
    %% up (mostly).
    State1 = case pmon:is_monitored({rabbit, Node}, Monitors) of
                 true  -> State;
                 false -> State#state{
                            monitors = pmon:monitor({rabbit, Node}, Monitors)}
             end,
    ok = handle_live_rabbit(Node),
    Partitions1 = lists:usort([Node | Partitions]),
    {noreply, maybe_autoheal(State1#state{partitions = Partitions1})};
maybe_autoheal(State = #state{autoheal = AState}) ->
    case all_nodes_up() of
        true  -> State#state{autoheal = rabbit_autoheal:maybe_start(AState)};
        false -> State
    end.

利用autoheal处理脑裂的时候,先在节点中找到一个leader,然后这个leader来公正地决定胜负。

maybe_start(not_healing) ->
    case enabled() of
        true  -> Leader = leader(),
                 send(Leader, {request_start, node()}),
                 rabbit_log:info("Autoheal request sent to ~p~n", [Leader]),
                 not_healing;
        false -> not_healing
    end;

针对获取到的所有分区开始做决定以选择分区中的winner和losers,决策原则是根据各个分区中所拥有的连接数的个数来确定的,如果连接数相同,则选择分区内节点最多的分区。

make_decision(AllPartitions) ->
    Sorted = lists:sort([{partition_value(P), P} || P <- AllPartitions]),
    [[Winner | _] | Rest] = lists:reverse([P || {_, P} <- Sorted]),
    {Winner, lists:append(Rest)}.

partition_value(Partition) ->
    Connections = [Res || Node <- Partition,
                          Res <- [rpc:call(Node, rabbit_networking,
                                           connections_local, [])],
                          is_list(Res)],
    {length(lists:append(Connections)), length(Partition)}.

之后loser会做重启,可能会因此而导致数据丢失。

参考文献

1. mnesia之inconsistent_database. http://my.oschina.net/hncscwc/blog/174416

2. rabbitmq对network partition的处理. http://my.oschina.net/hncscwc/blog/174417

原文地址:https://www.cnblogs.com/haoqingchuan/p/4354669.html