[Back] <?php
namespace Illuminate\Queue;
use Illuminate\Contracts\Events\Dispatcher as EventDispatcher;
use Illuminate\Contracts\Queue\Queue as QueueContract;
use Illuminate\Queue\Events\QueueFailedOver;
use RuntimeException;
use Throwable;
class FailoverQueue extends Queue implements QueueContract
{
/**
* The queues which failed on the last action.
*
* @var list<string>
*/
protected array $failingQueues = [];
/**
* Create a new failover queue instance.
*/
public function __construct(
public QueueManager $manager,
public EventDispatcher $events,
public array $connections
) {
}
/**
* Get the size of the queue.
*
* @param string|null $queue
* @return int
*/
public function size($queue = null)
{
return $this->manager->connection($this->connections[0])->size($queue);
}
/**
* Get the number of pending jobs.
*
* @param string|null $queue
* @return int
*/
public function pendingSize($queue = null)
{
return $this->manager->connection($this->connections[0])->pendingSize($queue);
}
/**
* Get the number of delayed jobs.
*
* @param string|null $queue
* @return int
*/
public function delayedSize($queue = null)
{
return $this->manager->connection($this->connections[0])->delayedSize($queue);
}
/**
* Get the number of reserved jobs.
*
* @param string|null $queue
* @return int
*/
public function reservedSize($queue = null)
{
return $this->manager->connection($this->connections[0])->reservedSize($queue);
}
/**
* Get the creation timestamp of the oldest pending job, excluding delayed jobs.
*
* @param string|null $queue
* @return int|null
*/
public function creationTimeOfOldestPendingJob($queue = null)
{
return $this->manager
->connection($this->connections[0])
->creationTimeOfOldestPendingJob($queue);
}
/**
* Push a new job onto the queue.
*
* @param object|string $job
* @param mixed $data
* @param string|null $queue
* @return mixed
*/
public function push($job, $data = '', $queue = null)
{
return $this->attemptOnAllConnections(__FUNCTION__, func_get_args(), $job);
}
/**
* Push a raw payload onto the queue.
*
* @param string $payload
* @param string|null $queue
* @return mixed
*/
public function pushRaw($payload, $queue = null, array $options = [])
{
return $this->attemptOnAllConnections(__FUNCTION__, func_get_args());
}
/**
* Push a new job onto the queue after (n) seconds.
*
* @param \DateTimeInterface|\DateInterval|int $delay
* @param string $job
* @param mixed $data
* @param string|null $queue
* @return mixed
*/
public function later($delay, $job, $data = '', $queue = null)
{
return $this->attemptOnAllConnections(__FUNCTION__, func_get_args(), $job);
}
/**
* Pop the next job off of the queue.
*
* @param string|null $queue
* @return \Illuminate\Contracts\Queue\Job|null
*/
public function pop($queue = null)
{
return $this->manager->connection($this->connections[0])->pop($queue);
}
/**
* Attempt the given method on all connections.
*
* @param mixed $job
* @return mixed
*
* @throws \Throwable
*/
protected function attemptOnAllConnections(string $method, array $arguments, $job = null)
{
[$lastException, $failedQueues] = [null, []];
try {
foreach ($this->connections as $connection) {
try {
return $this->manager->connection($connection)->{$method}(...$arguments);
} catch (Throwable $e) {
$lastException = $e;
$failedQueues[] = $connection;
if ($job !== null && ! in_array($connection, $this->failingQueues)) {
$this->events->dispatch(new QueueFailedOver($connection, $job, $e));
}
}
}
} finally {
$this->failingQueues = $failedQueues;
}
throw $lastException ?? new RuntimeException('All failover queue connections failed.');
}
}