Eureka 系列(06)消息广播(下):TaskDispacher 之 Acceptor

Eureka 系列(06)消息广播(下):TaskDispacher 之 Acceptor - Worker 模式

Spring Cloud 系列目录 - Eureka 篇

Eureka 消息广播主要分三部分讲解:

  1. 服务器列表管理:PeerEurekaNodes 管理了所有的 PeerEurekaNode 节点。
  2. 消息广播机制分析:PeerAwareInstanceRegistryImpl 收到客户端的消息后,第一步:先更新本地注册信息;第二步:遍历所有的 PeerEurekaNode,转发给其它节点。
  3. TaskDispacher 消息处理: Acceptor - Worker 模式分析。

首先回顾一下消息广播的流程,在上一篇 Eureka 系列(05)消息广播(上):消息广播原理分析 中对 Eureka 消息广播的源码进行了分析,PeerAwareInstanceRegistryImpl 将消息广播任务委托给 PeerEurekaNode。PeerEurekaNode 内部采用 TaskDispacher 的 Acceptor - Worker 模式进行异步处理。本文则重点分析这种异步处理机制。

1. Acceptor - Worker 模式原理

图1:Acceptor - Worker 模式原理
sequenceDiagram participant TaskDispatcher participant AcceptorExecutor participant WorkerRunnable note left of TaskDispatcher : 接收消息广播任务<br/>process TaskDispatcher ->> AcceptorExecutor : process WorkerRunnable ->> AcceptorExecutor : requestWorkItem WorkerRunnable ->> AcceptorExecutor : requestWorkItems note right of WorkerRunnable : run opt error WorkerRunnable -->> AcceptorExecutor : reprocess end

总结: TaskDispatcher 接收消息广播任务,实际由 AcceptorExecutor 线程处理,之后由 WorkerRunnable 线程执行。WorkerRunnable 线程执行逻辑已经分析过了,下面看一下 AcceptorExecutor 的源码。

AcceptorExecutor 主要方法:

  • process/reprocess 接收消息广播任务,存放到 acceptorQueue/reprocessQueue 队列中。
  • requestWorkItem/requestWorkItems 获取要执行的消息广播任务 singleItemWorkQueue/batchWorkQueue

2. AcceptorExecutor

图2:AcceptorRunner 任务处理
graph LR A(Client) B(reprocessQueue) C(acceptorQueue) D(pendingTasks<br/>processingOrder) E(batchWorkQueue) F(singleItemWorkQueue) A -- reprocess --> B A -- process --> C B -- drainReprocessQueue --> D C -- drainAcceptorQueue --> D D -- assignBatchWork --> E D -- assignSingleItemWork --> F

总结: AcceptorRunner 线程每 10s 轮询一次,消息广播任务从 acceptorQueue -> pendingTasks -> batchWorkQueue,WorkerRunner 执行线程直接获取 batchWorkQueue 任务执行。

如果 pendingTasks 任务超载(默认10000)丢弃的原则是:一是丢弃最老的任务和重试的任务,执行最新的任务。二是同taskId的任务只执行最新的任务

  • pendingTasks 队列满后,reprocessQueue 任务会全部丢弃,acceptorQueue 则丢弃最老的任务执行最新的任务。
  • AcceptorExecutor 的初始化是在 PeerEurekaNode 方法中。默认 pendingTasks 的最大任务数为 maxBufferSize=10000个,一次批处理的最大数为 maxBatchingSize=200个,批处理的最大延迟时间为 maxBatchingDelay=500ms。

2.1 属性

AcceptorExecutor 内部有多个队列,维护任务的执行,队列的功能如下:

private final int maxBufferSize;		// pendingTasks队列最大值,默认值 10000
private final int maxBatchingSize;		// 一次批处理的最大任务数,默认值 250
private final long maxBatchingDelay;	// 任务的最大延迟时间,默认值 500ms

// 1. 接收消息广播的任务
private final BlockingQueue<TaskHolder<ID, T>> acceptorQueue;
private final BlockingDeque<TaskHolder<ID, T>> reprocessQueue;

// 2. 默认每 10s 轮询一次,将接收的消息处理一次
//    AcceptorRunner 单线程处理,所以是普通队列
private final Map<ID, TaskHolder<ID, T>> pendingTasks;
private final Deque<ID> processingOrder;

// 3. 即将要处理的消息广播任务
private final Semaphore singleItemWorkRequests;
private final BlockingQueue<TaskHolder<ID, T>> singleItemWorkQueue;

private final Semaphore batchWorkRequests;
private final BlockingQueue<List<TaskHolder<ID, T>>> batchWorkQueue;

2.2 源码分析

2.2.1 AcceptorRunner 总体流程

class AcceptorRunner implements Runnable {
    @Override
    public void run() {
        long scheduleTime = 0;
        while (!isShutdown.get()) {
            try {
                // 1. 将任务从 reprocessQueue/acceptorQueue -> pendingTasks
                drainInputQueues();
                int totalItems = processingOrder.size();
                long now = System.currentTimeMillis();
                // 2. trafficShaper 流量整行,执行失败后的延迟时间
                //    congestionRetryDelayMs 100ms 执行一次
                //    networkFailureRetryMs 1000ms 执行一次
                if (scheduleTime < now) {
                    scheduleTime = now + trafficShaper.transmissionDelay();
                }
                // 3. pendingTasks -> batchWorkQueue
                if (scheduleTime <= now) {
                    assignBatchWork();
                    assignSingleItemWork();
                }
                // 4. 没有可执行的任务了,等待 10s
                if (totalItems == processingOrder.size()) {
                    Thread.sleep(10);
                }
            } catch (InterruptedException ex) {
            } catch (Throwable e) {
            }
        }
    }
}

总结: AcceptorRunner 线程每 10s 轮询一次,消息广播任务从从 acceptorQueue -> pendingTasks -> batchWorkQueue,WorkerRunner 执行线程直接获取 batchWorkQueue 任务执行。

  • drainInputQueues 任务从 acceptorQueue -> pendingTasks
  • assignBatchWork/assignSingleItemWork 任务从 pendingTasks -> batchWorkQueue

2.2.2 drainInputQueues

drainInputQueues 方法处理 reprocessQueue/acceptorQueue 队列中的任务到 pendingTasks,任务只有到 pendingTasks 中才会被处理,否则就丢弃。

// 先处理 reprocessQueue,再处理 acceptorQueue。
// 默认 acceptorQueue 会覆盖 reprocessQueue 中的任务,也就是最新的任务会覆盖重试的任务
private void drainInputQueues() throws InterruptedException {
    do {
        drainReprocessQueue();	// reprocessQueue
        drainAcceptorQueue();	// acceptorQueue

        if (!isShutdown.get()) {
            if (reprocessQueue.isEmpty() && acceptorQueue.isEmpty() && pendingTasks.isEmpty()) {
                TaskHolder<ID, T> taskHolder = acceptorQueue.poll(10, TimeUnit.MILLISECONDS);
                if (taskHolder != null) {
                    appendTaskHolder(taskHolder);
                }
            }
        }
    } while (!reprocessQueue.isEmpty() || !acceptorQueue.isEmpty() || pendingTasks.isEmpty());
}

总结: drainInputQueues 方法将接收的消息广播任务从 reprocessQueue/acceptorQueue -> pendingTasks。如果 pendingTasks 任务大多执行的原则是: 丢弃最老的任务和重试的任务,执行最新的任务。

  1. drainReprocessQueue 处理 reprocessQueue 队列,也就是通过 repocess 接收的任务。当 pendingTasks 队列中的任务超出阈值,重试的任务直接丢弃。
  2. drainAcceptorQueue 处理 acceptorQueue 队列,也就是通过 process 接收的任务。pendingTasks 中最老的任务直接丢弃,将新的任务添加到队列中。
private void drainReprocessQueue() {
    long now = System.currentTimeMillis();
    // 1. 只要 pendingTasks 没有超过阈值,maxBufferSize=10000
    //    就将重试的任务添加到 pendingTasks 中 
    while (!reprocessQueue.isEmpty() && !isFull()) {
        TaskHolder<ID, T> taskHolder = reprocessQueue.pollLast();
        ID id = taskHolder.getId();
        if (taskHolder.getExpiryTime() <= now) {
            expiredTasks++;
        } else if (pendingTasks.containsKey(id)) {
            overriddenTasks++;
        } else {
            pendingTasks.put(id, taskHolder);
            processingOrder.addFirst(id);
        }
    }
    // 2. reprocessQueue 队列中剩余的任务全部丢弃。
    if (isFull()) {
        queueOverflows += reprocessQueue.size();
        reprocessQueue.clear();
    }
}

drainReprocessQueue 方法当任务过多时直接丢弃了重试的任务,drainAcceptorQueue 则不同,丢弃最老的任务,执行最新的任务。目的是保证新任务肯定能执行,而旧的任务根据实际情况丢弃。

private void drainAcceptorQueue() {
    while (!acceptorQueue.isEmpty()) {
        appendTaskHolder(acceptorQueue.poll());
    }
}
private void appendTaskHolder(TaskHolder<ID, T> taskHolder) {
    // 1. 如果任务超出阈值,丢弃最老的任务
    if (isFull()) {
        pendingTasks.remove(processingOrder.poll());
        queueOverflows++;
    }
    // 2. 将最新的任务添加到队列中
    TaskHolder<ID, T> previousTask = pendingTasks.put(taskHolder.getId(), taskHolder);
    if (previousTask == null) {
        processingOrder.add(taskHolder.getId());
    } else {
        overriddenTasks++;
    }
}

总结: 还是那句话,如果 pendingTasks 超出阈值时执行的原则是:

  1. 丢弃最老的任务和重试的任务,执行最新的任务。

  2. 同taskId的任务只执行最新的任务

    appendTaskHolder 会覆盖同名的 taskId 任务,taskId 的生成是在 PeerEurekaNode 接收消息广播任务时生成的,生成的原则是:requestType(任务类型)+ appName(应用名称)+ id(实例id)。任务类型包括:register、cancel、heartbeat、statusUpdate、deleteStatusOverride。

    // PeerEurekaNode.process 是会生成 taskId  
    private static String taskId(String requestType, InstanceInfo info) {
        return taskId(requestType, info.getAppName(), info.getId());
    }
    private static String taskId(String requestType, String appName, String id) {
        return requestType + '#' + appName + '/' + id;
    }
    

2.2.3 assignBatchWork

将任务从 pendingTasks 从移动到 batchWorkQueue 中,requestWorkItem 直接获取 batchWorkQueue 进行处理。

// pendingTasks -> batchWorkQueue
void assignBatchWork() {
    // 1. pendingTasks 为空则 false,pendingTasks 队列满了肯定为 true。
    //    任务的延迟时间不超过 maxBatchingDelay=500ms
    if (hasEnoughTasksForNextBatch()) {
        if (batchWorkRequests.tryAcquire(1)) {
            long now = System.currentTimeMillis();
            // 2. 批处理任务最大为 maxBatchingSize=250
            int len = Math.min(maxBatchingSize, processingOrder.size());
            List<TaskHolder<ID, T>> holders = new ArrayList<>(len);
            while (holders.size() < len && !processingOrder.isEmpty()) {
                ID id = processingOrder.poll();
                TaskHolder<ID, T> holder = pendingTasks.remove(id);
                // 3. 任务过期,直接丢弃
                if (holder.getExpiryTime() > now) {
                    holders.add(holder);
                } else {
                    expiredTasks++;
                }
            }
            // 4. 添加到 batchWorkQueue 队列中
            if (holders.isEmpty()) {
                batchWorkRequests.release();
            } else {
                batchSizeMetric.record(holders.size(), TimeUnit.MILLISECONDS);
                batchWorkQueue.add(holders);
            }
        }
    }
}

总结: assignBatchWork 执行的条件:一是 pendingTasks 任务超载了,立即执行;二是任务延迟时间大于 maxBatchingDelay=500ms。目的就是为了控制任务的执行频率:任务太多或延迟时间过长立即执行。

private boolean hasEnoughTasksForNextBatch() {
    if (processingOrder.isEmpty()) {
        return false;
    }
    // 队列中任务大多立即执行 maxBufferSize=10000
    if (pendingTasks.size() >= maxBufferSize) {
        return true;
    }
	// 任务延迟时间太长立即执行 maxBatchingDelay=500ms
    TaskHolder<ID, T> nextHolder = pendingTasks.get(processingOrder.peek());
    long delay = System.currentTimeMillis() - nextHolder.getSubmitTimestamp();
    return delay >= maxBatchingDelay;
}

3. 问题总结

3.1 消费速度控制

(1)服务器忙或网络异常

当出现服务器忙或网络IO异常时就需要等待一段时间再发送请求。默认情况下:

  • congestionRetryDelayMs:服务器忙时至少 100ms
  • networkFailureRetryMs:网络IO异常时至少 1000ms
public void run() {
    ...
    if (scheduleTime < now) {
        scheduleTime = now + trafficShaper.transmissionDelay();
    }
    if (scheduleTime <= now) {
        assignBatchWork();
        assignSingleItemWork();
    }
}

总结: 如果出现服务器忙(503)或网络 IO 异常时,至少要等待一定的时间,再次发送请求。TrafficShaper 是流量整行的意思,即控制请求发送的频率。

long transmissionDelay() {
    // 没有任务异常,不能等待
    if (lastCongestionError == -1 && lastNetworkFailure == -1) {
        return 0;
    }
	// 出现对方服务器忙,至少 congestionRetryDelayMs=100ms
    long now = System.currentTimeMillis();
    if (lastCongestionError != -1) {
        long congestionDelay = now - lastCongestionError;
        if (congestionDelay >= 0 && congestionDelay < congestionRetryDelayMs) {
            return congestionRetryDelayMs - congestionDelay;
        }
        lastCongestionError = -1;
    }

	// 出现网IO异常,至少 networkFailureRetryMs=1000ms
    if (lastNetworkFailure != -1) {
        long failureDelay = now - lastNetworkFailure;
        if (failureDelay >= 0 && failureDelay < networkFailureRetryMs) {
            return networkFailureRetryMs - failureDelay;
        }
        lastNetworkFailure = -1;
    }
    return 0;
}

(2)最大任务数限制

如果消息广播任务超出阈值,丢弃的原则是:一是丢弃最老的任务和重试的任务,执行最新的任务。二是同taskId的任务只执行最新的任务。这在 2.2.2小节有详细的说明,默认 maxBufferSize=10000。

(3)批处理任务延迟时间

在 PeerEurekaNode 接收广播任务,生成 TaskHolder 时,会生成任务的提交时间,如果任务延迟赶时间超过 maxBatchingDelay 则立即执行。这个时间是在 hasEnoughTasksForNextBatch 方法中进行控制的。默认 maxBatchingDelay=500ms。

3.2 Semaphore batchWorkRequests 作用分析

在任务的处理过程中都会使用 Semaphore 这个信号锁,它的作用是什么呢?

private final Semaphore batchWorkRequests = new Semaphore(0);
private final BlockingQueue<List<TaskHolder<ID, T>>> batchWorkQueue = new LinkedBlockingQueue<>();

// AcceptorRunner 分配任务
void assignBatchWork() {
    if (hasEnoughTasksForNextBatch()) {
        if (batchWorkRequests.tryAcquire(1)) {
            List<TaskHolder<ID, T>> holders = new ArrayList<>(len);
           	...
            if (holders.isEmpty()) {
                // 如果没有分配任务,下一次可继续分配任务
                batchWorkRequests.release();
            } else {
                // 如果已经分配任务,则必须等到消费才消费才开始重新分配任务
                // 如果任务一直没有被消费,则 AcceptorRunner 轮询时会丢弃老的任务
                batchWorkQueue.add(holders);
            }
        }
    }
}
// WorkerRunable 获取任务
BlockingQueue<List<TaskHolder<ID, T>>> requestWorkItems() {
    batchWorkRequests.release();
    return batchWorkQueue;
}

总结: AcceptorRunner 线程轮询时进行任务分配,如果没有获取 Semaphore 锁,也就是说任务一直没有被消费,当 pendingTasks 任务过多,会按照丢弃老的执行新的任务原则进行处理。如果有 WorkerRunable 线程进行消费则会释放锁,重新进行任务分配。


每天用心记录一点点。内容也许不重要,但习惯很重要!

原文地址:https://www.cnblogs.com/binarylei/p/11617710.html