<?php namespace Illuminate\Http\Client; use Carbon\CarbonImmutable; use Closure; use GuzzleHttp\Exception\RequestException; use GuzzleHttp\Promise\EachPromise; use GuzzleHttp\Utils; use Illuminate\Http\Client\Promises\LazyPromise; use Illuminate\Support\Defer\DeferredCallback; use function Illuminate\Support\defer; /** * @mixin \Illuminate\Http\Client\Factory */ class Batch { /** * The factory instance. * * @var \Illuminate\Http\Client\Factory */ protected $factory; /** * The array of requests. * * @var array<array-key, \Illuminate\Http\Client\PendingRequest> */ protected $requests = []; /** * The total number of requests that belong to the batch. * * @var non-negative-int */ public $totalRequests = 0; /** * The total number of requests that are still pending. * * @var non-negative-int */ public $pendingRequests = 0; /** * The total number of requests that have failed. * * @var non-negative-int */ public $failedRequests = 0; /** * The handler function for the Guzzle client. * * @var callable */ protected $handler; /** * The callback to run before the first request from the batch runs. * * @var (\Closure($this): void)|null */ protected $beforeCallback = null; /** * The callback to run after a request from the batch succeeds. * * @var (\Closure($this, int|string, \Illuminate\Http\Client\Response): void)|null */ protected $progressCallback = null; /** * The callback to run after a request from the batch fails. * * @var (\Closure($this, int|string, \Illuminate\Http\Client\Response|\Illuminate\Http\Client\RequestException|\Illuminate\Http\Client\ConnectionException): void)|null */ protected $catchCallback = null; /** * The callback to run if all the requests from the batch succeeded. * * @var (\Closure($this, array<int|string, \Illuminate\Http\Client\Response>): void)|null */ protected $thenCallback = null; /** * The callback to run after all the requests from the batch finish. * * @var (\Closure($this, array<int|string, \Illuminate\Http\Client\Response>): void)|null */ protected $finallyCallback = null; /** * If the batch already was sent. * * @var bool */ protected $inProgress = false; /** * The date when the batch was created. * * @var \Carbon\CarbonImmutable|null */ public $createdAt = null; /** * The date when the batch finished. * * @var \Carbon\CarbonImmutable|null */ public $finishedAt = null; /** * The maximum number of concurrent requests. * * @var int|null */ protected $concurrencyLimit = null; /** * Create a new request batch instance. */ public function __construct(?Factory $factory = null) { $this->factory = $factory ?: new Factory; $this->handler = Utils::chooseHandler(); $this->createdAt = new CarbonImmutable; } /** * Add a request to the batch with a key. * * @param string $key * @return \Illuminate\Http\Client\PendingRequest * * @throws \Illuminate\Http\Client\BatchInProgressException */ public function as(string $key) { if ($this->inProgress) { throw new BatchInProgressException(); } $this->incrementPendingRequests(); return $this->requests[$key] = $this->asyncRequest(); } /** * Add a request to the batch with a numeric index. * * @return \Illuminate\Http\Client\PendingRequest|\GuzzleHttp\Promise\Promise * * @throws \Illuminate\Http\Client\BatchInProgressException */ public function newRequest() { if ($this->inProgress) { throw new BatchInProgressException(); } $this->incrementPendingRequests(); return $this->requests[] = $this->asyncRequest(); } /** * Register a callback to run before the first request from the batch runs. * * @param (\Closure($this): void) $callback * @return Batch */ public function before(Closure $callback): self { $this->beforeCallback = $callback; return $this; } /** * Register a callback to run after a request from the batch succeeds. * * @param (\Closure($this, int|string, \Illuminate\Http\Client\Response): void) $callback * @return Batch */ public function progress(Closure $callback): self { $this->progressCallback = $callback; return $this; } /** * Register a callback to run after a request from the batch fails. * * @param (\Closure($this, int|string, \Illuminate\Http\Client\Response|\Illuminate\Http\Client\RequestException|\Illuminate\Http\Client\ConnectionException): void) $callback * @return Batch */ public function catch(Closure $callback): self { $this->catchCallback = $callback; return $this; } /** * Register a callback to run after all the requests from the batch succeed. * * @param (\Closure($this, array<int|string, \Illuminate\Http\Client\Response>): void) $callback * @return Batch */ public function then(Closure $callback): self { $this->thenCallback = $callback; return $this; } /** * Register a callback to run after all the requests from the batch finish. * * @param (\Closure($this, array<int|string, \Illuminate\Http\Client\Response>): void) $callback * @return Batch */ public function finally(Closure $callback): self { $this->finallyCallback = $callback; return $this; } /** * Set the maximum number of concurrent requests. * * @param int $limit * @return Batch */ public function concurrency(int $limit): self { $this->concurrencyLimit = $limit; return $this; } /** * Defer the batch to run in the background after the current task has finished. * * @return \Illuminate\Support\Defer\DeferredCallback */ public function defer(): DeferredCallback { return defer(fn () => $this->send()); } /** * Send all of the requests in the batch. * * @return array<int|string, \Illuminate\Http\Client\Response|\Illuminate\Http\Client\RequestException> */ public function send(): array { $this->inProgress = true; if ($this->beforeCallback !== null) { call_user_func($this->beforeCallback, $this); } $results = []; if (! empty($this->requests)) { $eachPromiseOptions = [ 'fulfilled' => function ($result, $key) use (&$results) { $results[$key] = $result; $this->decrementPendingRequests(); if ($result instanceof Response && $result->successful()) { if ($this->progressCallback !== null) { call_user_func($this->progressCallback, $this, $key, $result); } return $result; } if ( ($result instanceof Response && $result->failed()) || $result instanceof RequestException || $result instanceof ConnectionException ) { $this->incrementFailedRequests(); if ($this->catchCallback !== null) { call_user_func($this->catchCallback, $this, $key, $result); } } return $result; }, 'rejected' => function ($reason, $key) { $this->decrementPendingRequests(); if ($reason instanceof RequestException || $reason instanceof ConnectionException) { $this->incrementFailedRequests(); if ($this->catchCallback !== null) { call_user_func($this->catchCallback, $this, $key, $reason); } } return $reason; }, ]; if ($this->concurrencyLimit !== null) { $eachPromiseOptions['concurrency'] = $this->concurrencyLimit; } $promiseGenerator = function () { foreach ($this->requests as $key => $item) { $promise = $item instanceof PendingRequest ? $item->getPromise() : $item; yield $key => $promise instanceof LazyPromise ? $promise->buildPromise() : $promise; } }; (new EachPromise($promiseGenerator(), $eachPromiseOptions)) ->promise() ->wait(); } // Before returning the results, we must ensure that the results are sorted // in the same order as the requests were defined, respecting any custom // key names that were assigned to this request using the "as" method. uksort($results, function ($key1, $key2) { return array_search($key1, array_keys($this->requests), true) <=> array_search($key2, array_keys($this->requests), true); }); if (! $this->hasFailures() && $this->thenCallback !== null) { call_user_func($this->thenCallback, $this, $results); } if ($this->finallyCallback !== null) { call_user_func($this->finallyCallback, $this, $results); } $this->finishedAt = new CarbonImmutable; $this->inProgress = false; return $results; } /** * Retrieve a new async pending request. * * @return \Illuminate\Http\Client\PendingRequest */ protected function asyncRequest() { return $this->factory->setHandler($this->handler)->async(); } /** * Get the total number of requests that have been processed by the batch thus far. * * @return non-negative-int */ public function processedRequests(): int { return $this->totalRequests - $this->pendingRequests; } /** * Determine if the batch has finished executing. * * @return bool */ public function finished(): bool { return ! is_null($this->finishedAt); } /** * Increment the count of total and pending requests in the batch. * * @return void */ protected function incrementPendingRequests(): void { $this->totalRequests++; $this->pendingRequests++; } /** * Decrement the count of pending requests in the batch. * * @return void */ protected function decrementPendingRequests(): void { $this->pendingRequests--; } /** * Determine if the batch has job failures. * * @return bool */ public function hasFailures(): bool { return $this->failedRequests > 0; } /** * Increment the count of failed requests in the batch. * * @return void */ protected function incrementFailedRequests(): void { $this->failedRequests++; } /** * Get the requests in the batch. * * @return array<array-key, \Illuminate\Http\Client\PendingRequest> */ public function getRequests(): array { return $this->requests; } /** * Add a request to the batch with a numeric index. * * @param string $method * @param array $parameters * @return \Illuminate\Http\Client\PendingRequest|\GuzzleHttp\Promise\Promise * * @throws \Illuminate\Http\Client\BatchInProgressException */ public function __call(string $method, array $parameters) { return $this->newRequest()->{$method}(...$parameters); } }