function Cron::processQueues

Same name in other branches
  1. 9 core/lib/Drupal/Core/Cron.php \Drupal\Core\Cron::processQueues()
  2. 8.9.x core/lib/Drupal/Core/Cron.php \Drupal\Core\Cron::processQueues()
  3. 11.x core/lib/Drupal/Core/Cron.php \Drupal\Core\Cron::processQueues()

Processes cron queues.

1 call to Cron::processQueues()
Cron::run in core/lib/Drupal/Core/Cron.php

File

core/lib/Drupal/Core/Cron.php, line 194

Class

Cron
The Drupal core Cron service.

Namespace

Drupal\Core

Code

protected function processQueues() {
    $max_wait = (double) $this->queueConfig['suspendMaximumWait'];
    // Build a stack of queues to work on.
    
    /** @var array<array{process_from: int<0, max>, queue: \Drupal\Core\Queue\QueueInterface, worker: \Drupal\Core\Queue\QueueWorkerInterface}> $queues */
    $queues = [];
    foreach ($this->queueManager
        ->getDefinitions() as $queue_name => $queue_info) {
        if (!isset($queue_info['cron'])) {
            continue;
        }
        $queue = $this->queueFactory
            ->get($queue_name);
        // Make sure every queue exists. There is no harm in trying to recreate
        // an existing queue.
        $queue->createQueue();
        $worker = $this->queueManager
            ->createInstance($queue_name);
        $queues[] = [
            // Set process_from to zero so each queue is always processed
            // immediately for the first time. This process_from timestamp will
            // change if a queue throws a delayable SuspendQueueException.
'process_from' => 0,
            'queue' => $queue,
            'worker' => $worker,
        ];
    }
    // Work through stack of queues, re-adding to the stack when a delay is
    // necessary.
    while ($item = array_shift($queues)) {
        [
            'queue' => $queue,
            'worker' => $worker,
            'process_from' => $process_from,
        ] = $item;
        // Each queue will be processed immediately when it is reached for the
        // first time, as zero > currentTime will never be true.
        if ($process_from > $this->time
            ->getCurrentMicroTime()) {
            $this->usleep((int) round($process_from - $this->time
                ->getCurrentMicroTime(), 3) * 1000000);
        }
        try {
            $this->processQueue($queue, $worker);
        } catch (SuspendQueueException $e) {
            // Return to this queue after processing other queues if the delay is
            // within the threshold.
            if ($e->isDelayable() && $e->getDelay() < $max_wait) {
                $item['process_from'] = $this->time
                    ->getCurrentMicroTime() + $e->getDelay();
                // Place this queue back in the stack for processing later.
                array_push($queues, $item);
            }
        }
        // Reorder the queue by next 'process_from' timestamp.
        usort($queues, function (array $queueA, array $queueB) {
            return $queueA['process_from'] <=> $queueB['process_from'];
        });
    }
}

Buggy or inaccurate documentation? Please file an issue. Need support? Need help programming? Connect with the Drupal community.