function Cron::processQueues

Same name and namespace in other branches
  1. 9 core/lib/Drupal/Core/Cron.php \Drupal\Core\Cron::processQueues()
  2. 8.9.x core/lib/Drupal/Core/Cron.php \Drupal\Core\Cron::processQueues()
  3. 11.x core/lib/Drupal/Core/Cron.php \Drupal\Core\Cron::processQueues()

Processes cron queues.

1 call to Cron::processQueues()
Cron::run in core/lib/Drupal/Core/Cron.php
Executes a cron run.

File

core/lib/Drupal/Core/Cron.php, line 194

Class

Cron
The Drupal core Cron service.

Namespace

Drupal\Core

Code

protected function processQueues() {
  $max_wait = (double) $this->queueConfig['suspendMaximumWait'];
  // Build a stack of queues to work on.
  /** @var array<array{process_from: int<0, max>, queue: \Drupal\Core\Queue\QueueInterface, worker: \Drupal\Core\Queue\QueueWorkerInterface}> $queues */
  $queues = [];
  foreach ($this->queueManager->getDefinitions() as $queue_name => $queue_info) {
    if (!isset($queue_info['cron'])) {
      continue;
    }
    $queue = $this->queueFactory->get($queue_name);
    // Make sure every queue exists. There is no harm in trying to recreate
    // an existing queue.
    $queue->createQueue();
    $worker = $this->queueManager->createInstance($queue_name);
    $queues[] = [
      // Set process_from to zero so each queue is always processed
      // immediately for the first time. This process_from timestamp will
      // change if a queue throws a delayable SuspendQueueException.
      'process_from' => 0,
      'queue' => $queue,
      'worker' => $worker,
    ];
  }
  // Work through stack of queues, re-adding to the stack when a delay is
  // necessary.
  while ($item = array_shift($queues)) {
    [
      'queue' => $queue,
      'worker' => $worker,
      'process_from' => $process_from,
    ] = $item;
    // Each queue will be processed immediately when it is reached for the
    // first time, as zero > currentTime will never be true.
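    if ($process_from > $this->time->getCurrentMicroTime()) {
      // Multiply before casting: a cast binds tighter than multiplication, so
      // casting first would truncate any sub-second delay to zero.
      $this->usleep((int) (round($process_from - $this->time->getCurrentMicroTime(), 3) * 1000000));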
    }
    try {
      $this->processQueue($queue, $worker);
    } catch (SuspendQueueException $e) {
      // Return to this queue after processing other queues if the delay is
      // within the threshold.
      if ($e->isDelayable() && $e->getDelay() < $max_wait) {
        $item['process_from'] = $this->time->getCurrentMicroTime() + $e->getDelay();
        // Place this queue back in the stack for processing later.
        array_push($queues, $item);
      }
    }
    // Reorder the queue by next 'process_from' timestamp.
    usort($queues, function (array $queueA, array $queueB) {
      return $queueA['process_from'] <=> $queueB['process_from'];
    });
  }
}
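
The requeue-and-sort loop in the listing can be tried outside Drupal. The following is a minimal, self-contained sketch of that scheduling idea only, not the core implementation: DelayRequested, run_with_delays(), the job names, and the simulated clock are hypothetical stand-ins for SuspendQueueException, processQueues(), real queue workers, and the time service. Instead of sleeping, it advances a fake clock to each item's process_from value.

```php
<?php

/**
 * Hypothetical stand-in for a delayable SuspendQueueException.
 */
final class DelayRequested extends RuntimeException {

  public function __construct(public readonly float $delay) {
    parent::__construct('Job asked to be resumed later.');
  }

}

/**
 * Attempts each job, revisiting jobs whose requested delay is below $max_wait.
 *
 * @param array<string, callable> $jobs
 *   Job callbacks keyed by name; each receives the simulated current time.
 * @param float $max_wait
 *   Longest delay, in seconds, that is still worth waiting for.
 *
 * @return string[]
 *   Job names in the order they were attempted.
 */
function run_with_delays(array $jobs, float $max_wait): array {
  // Simulated clock (assumption), so the sketch never really sleeps.
  $now = 100.0;
  $order = [];
  $pending = [];
  foreach ($jobs as $name => $job) {
    // Zero means "due immediately" on the first pass.
    $pending[] = ['process_from' => 0.0, 'name' => $name, 'job' => $job];
  }
  while ($item = array_shift($pending)) {
    // "Sleep" by moving the clock forward to the item's due time.
    $now = max($now, $item['process_from']);
    $order[] = $item['name'];
    try {
      ($item['job'])($now);
    }
    catch (DelayRequested $e) {
      // Revisit the job later only if the delay is within the threshold.
      if ($e->delay < $max_wait) {
        $item['process_from'] = $now + $e->delay;
        $pending[] = $item;
      }
    }
    // Keep the item that is due soonest at the front.
    usort($pending, fn (array $a, array $b) => $a['process_from'] <=> $b['process_from']);
  }
  return $order;
}

// Usage: 'flaky' asks for a 2 second delay once, 'slow' asks for more than
// the 30 second threshold and is therefore not revisited.
$attempts = 0;
$order = run_with_delays([
  'flaky' => function (float $now) use (&$attempts): void {
    if (++$attempts === 1) {
      throw new DelayRequested(2.0);
    }
  },
  'steady' => function (float $now): void {},
  'slow' => function (float $now): void {
    throw new DelayRequested(60.0);
  },
], 30.0);
// $order is ['flaky', 'steady', 'slow', 'flaky'].
```

In this sketch a job that requests a delay is only retried after every job that is already due has had its turn, which mirrors why the listing re-sorts by process_from after each iteration.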
