Skip to content

Commit 0fa6fb6

Browse files
committed
feat: add job failure callbacks to batches
1 parent d3e8791 commit 0fa6fb6

File tree

4 files changed

+282
-55
lines changed

4 files changed

+282
-55
lines changed

src/Illuminate/Bus/Batch.php

Lines changed: 33 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -242,31 +242,31 @@ public function recordSuccessfulJob(string $jobId)
242242
$counts = $this->decrementPendingJobs($jobId);
243243

244244
if ($this->hasProgressCallbacks()) {
245-
$batch = $this->fresh();
246-
247-
(new Collection($this->options['progress']))->each(function ($handler) use ($batch) {
248-
$this->invokeHandlerCallback($handler, $batch);
249-
});
245+
$this->invokeCallbacks('progress');
250246
}
251247

252248
if ($counts->pendingJobs === 0) {
253249
$this->repository->markAsFinished($this->id);
254250
}
255251

256252
if ($counts->pendingJobs === 0 && $this->hasThenCallbacks()) {
257-
$batch = $this->fresh();
258-
259-
(new Collection($this->options['then']))->each(function ($handler) use ($batch) {
260-
$this->invokeHandlerCallback($handler, $batch);
261-
});
253+
$this->invokeCallbacks('then');
262254
}
263255

264256
if ($counts->allJobsHaveRanExactlyOnce() && $this->hasFinallyCallbacks()) {
265-
$batch = $this->fresh();
257+
$this->invokeCallbacks('finally');
258+
}
259+
}
260+
261+
/**
262+
* Invoke the callbacks of the given type.
263+
*/
264+
protected function invokeCallbacks(string $type, ?Throwable $e = null): void
265+
{
266+
$batch = $this->fresh();
266267

267-
(new Collection($this->options['finally']))->each(function ($handler) use ($batch) {
268-
$this->invokeHandlerCallback($handler, $batch);
269-
});
268+
foreach ($this->options[$type] ?? [] as $handler) {
269+
$this->invokeHandlerCallback($handler, $batch, $e);
270270
}
271271
}
272272

@@ -346,28 +346,22 @@ public function recordFailedJob(string $jobId, $e)
346346
$this->cancel();
347347
}
348348

349-
if ($this->hasProgressCallbacks() && $this->allowsFailures()) {
350-
$batch = $this->fresh();
349+
if ($this->allowsFailures()) {
350+
if ($this->hasProgressCallbacks()) {
351+
$this->invokeCallbacks('progress', $e);
352+
}
351353

352-
(new Collection($this->options['progress']))->each(function ($handler) use ($batch, $e) {
353-
$this->invokeHandlerCallback($handler, $batch, $e);
354-
});
354+
if ($this->hasFailureCallbacks()) {
355+
$this->invokeCallbacks('failure', $e);
356+
}
355357
}
356358

357359
if ($counts->failedJobs === 1 && $this->hasCatchCallbacks()) {
358-
$batch = $this->fresh();
359-
360-
(new Collection($this->options['catch']))->each(function ($handler) use ($batch, $e) {
361-
$this->invokeHandlerCallback($handler, $batch, $e);
362-
});
360+
$this->invokeCallbacks('catch', $e);
363361
}
364362

365363
if ($counts->allJobsHaveRanExactlyOnce() && $this->hasFinallyCallbacks()) {
366-
$batch = $this->fresh();
367-
368-
(new Collection($this->options['finally']))->each(function ($handler) use ($batch, $e) {
369-
$this->invokeHandlerCallback($handler, $batch, $e);
370-
});
364+
$this->invokeCallbacks('finally');
371365
}
372366
}
373367

@@ -402,6 +396,16 @@ public function hasFinallyCallbacks()
402396
return isset($this->options['finally']) && ! empty($this->options['finally']);
403397
}
404398

399+
/**
400+
* Determine if the batch has "failure" callbacks.
401+
*
402+
* @return bool
403+
*/
404+
public function hasFailureCallbacks()
405+
{
406+
return isset($this->options['failure']) && ! empty($this->options['failure']);
407+
}
408+
405409
/**
406410
* Cancel the batch.
407411
*

src/Illuminate/Bus/PendingBatch.php

Lines changed: 53 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -119,9 +119,7 @@ protected function ensureJobIsBatchable(object|array $job): void
119119
*/
120120
public function before($callback)
121121
{
122-
$this->options['before'][] = $callback instanceof Closure
123-
? new SerializableClosure($callback)
124-
: $callback;
122+
$this->registerCallback('before', $callback);
125123

126124
return $this;
127125
}
@@ -144,9 +142,7 @@ public function beforeCallbacks()
144142
*/
145143
public function progress($callback)
146144
{
147-
$this->options['progress'][] = $callback instanceof Closure
148-
? new SerializableClosure($callback)
149-
: $callback;
145+
$this->registerCallback('progress', $callback);
150146

151147
return $this;
152148
}
@@ -169,9 +165,7 @@ public function progressCallbacks()
169165
*/
170166
public function then($callback)
171167
{
172-
$this->options['then'][] = $callback instanceof Closure
173-
? new SerializableClosure($callback)
174-
: $callback;
168+
$this->registerCallback('then', $callback);
175169

176170
return $this;
177171
}
@@ -194,9 +188,7 @@ public function thenCallbacks()
194188
*/
195189
public function catch($callback)
196190
{
197-
$this->options['catch'][] = $callback instanceof Closure
198-
? new SerializableClosure($callback)
199-
: $callback;
191+
$this->registerCallback('catch', $callback);
200192

201193
return $this;
202194
}
@@ -219,9 +211,7 @@ public function catchCallbacks()
219211
*/
220212
public function finally($callback)
221213
{
222-
$this->options['finally'][] = $callback instanceof Closure
223-
? new SerializableClosure($callback)
224-
: $callback;
214+
$this->registerCallback('finally', $callback);
225215

226216
return $this;
227217
}
@@ -237,14 +227,31 @@ public function finallyCallbacks()
237227
}
238228

239229
/**
240-
* Indicate that the batch should not be cancelled when a job within the batch fails.
230+
* Indicate that the batch should not be canceled when a job within the batch fails;
231+
* optionally add callbacks to be executed upon each job failure.
241232
*
242-
* @param bool $allowFailures
243-
* @return $this
233+
* When passed a boolean true, enables failure tolerance without callbacks.
234+
* When passed a boolean false, disables failure tolerance.
235+
* When passed callable(s), registers failure callbacks and enables failure tolerance.
236+
* Invalid callables are silently ignored during registration.
237+
*
238+
* @template TParam of Closure(\Illuminate\Bus\Batch, \Throwable|null): mixed)|(callable(\Illuminate\Bus\Batch, \Throwable|null): mixed)
239+
*
240+
* @param bool|TParam|array<array-key, TParam> $param
244241
*/
245-
public function allowFailures($allowFailures = true)
242+
public function allowFailures(bool|Closure|callable|array $param = true): self
246243
{
247-
$this->options['allowFailures'] = $allowFailures;
244+
if (! is_bool($param)) {
245+
$param = Arr::wrap($param);
246+
247+
foreach ($param as $callback) {
248+
if (is_callable($callback)) {
249+
$this->registerCallback('failure', $callback);
250+
}
251+
}
252+
}
253+
254+
$this->options['allowFailures'] = ! ($param === false);
248255

249256
return $this;
250257
}
@@ -259,6 +266,26 @@ public function allowsFailures()
259266
return Arr::get($this->options, 'allowFailures', false) === true;
260267
}
261268

269+
/**
270+
* Get the "failure" callbacks that have been registered with the pending batch.
271+
*
272+
* @return array<array-key, Closure|callable>
273+
*/
274+
public function failureCallbacks(): array
275+
{
276+
return $this->options['failure'] ?? [];
277+
}
278+
279+
/**
280+
* Register a callback with proper serialization.
281+
*/
282+
private function registerCallback(string $type, Closure|callable $callback): void
283+
{
284+
$this->options[$type][] = $callback instanceof Closure
285+
? new SerializableClosure($callback)
286+
: $callback;
287+
}
288+
262289
/**
263290
* Set the name for the batch.
264291
*
@@ -335,7 +362,7 @@ public function withOption(string $key, $value)
335362
/**
336363
* Dispatch the batch.
337364
*
338-
* @return \Illuminate\Bus\Batch
365+
* @return Batch
339366
*
340367
* @throws \Throwable
341368
*/
@@ -365,7 +392,7 @@ public function dispatch()
365392
/**
366393
* Dispatch the batch after the response is sent to the browser.
367394
*
368-
* @return \Illuminate\Bus\Batch
395+
* @return Batch
369396
*/
370397
public function dispatchAfterResponse()
371398
{
@@ -385,7 +412,7 @@ public function dispatchAfterResponse()
385412
/**
386413
* Dispatch an existing batch.
387414
*
388-
* @param \Illuminate\Bus\Batch $batch
415+
* @param Batch $batch
389416
* @return void
390417
*
391418
* @throws \Throwable
@@ -409,7 +436,7 @@ protected function dispatchExistingBatch($batch)
409436
* Dispatch the batch if the given truth test passes.
410437
*
411438
* @param bool|\Closure $boolean
412-
* @return \Illuminate\Bus\Batch|null
439+
* @return Batch|null
413440
*/
414441
public function dispatchIf($boolean)
415442
{
@@ -420,7 +447,7 @@ public function dispatchIf($boolean)
420447
* Dispatch the batch unless the given truth test passes.
421448
*
422449
* @param bool|\Closure $boolean
423-
* @return \Illuminate\Bus\Batch|null
450+
* @return Batch|null
424451
*/
425452
public function dispatchUnless($boolean)
426453
{
@@ -431,7 +458,7 @@ public function dispatchUnless($boolean)
431458
* Store the batch using the given repository.
432459
*
433460
* @param \Illuminate\Bus\BatchRepository $repository
434-
* @return \Illuminate\Bus\Batch
461+
* @return Batch
435462
*/
436463
protected function store($repository)
437464
{

tests/Bus/BusBatchTest.php

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
use Illuminate\Bus\PendingBatch;
1111
use Illuminate\Bus\Queueable;
1212
use Illuminate\Container\Container;
13+
use Illuminate\Contracts\Container\BindingResolutionException;
1314
use Illuminate\Contracts\Queue\Factory;
1415
use Illuminate\Contracts\Queue\ShouldQueue;
1516
use Illuminate\Database\Capsule\Manager as DB;
@@ -297,6 +298,63 @@ public function test_failed_jobs_can_be_recorded_while_allowing_failures()
297298
$this->assertSame('Something went wrong.', $_SERVER['__catch.exception']->getMessage());
298299
}
299300

301+
public function test_failure_callbacks_execute_correctly(): void
302+
{
303+
$queue = m::mock(Factory::class);
304+
305+
$repository = new DatabaseBatchRepository(new BatchFactory($queue), DB::connection(), 'job_batches');
306+
307+
$pendingBatch = (new PendingBatch(new Container, collect()))
308+
->allowFailures([
309+
static fn (Batch $batch, $e): true => $_SERVER['__failure1.invoked'] = true,
310+
function (Batch $batch, $e) {
311+
$_SERVER['__failure2.invoked'] = true;
312+
},
313+
function (Batch $batch, $e) {
314+
$_SERVER['__failure3.batch'] = $batch;
315+
$_SERVER['__failure3.exception'] = $e;
316+
$_SERVER['__failure3.batch_id'] = $batch->id;
317+
$_SERVER['__failure3.batch_class'] = get_class($batch);
318+
$_SERVER['__failure3.exception_class'] = get_class($e);
319+
$_SERVER['__failure3.exception_message'] = $e->getMessage();
320+
$_SERVER['__failure3.param_count'] = func_num_args();
321+
},
322+
])
323+
->onConnection('test-connection')
324+
->onQueue('test-queue');
325+
326+
$batch = $repository->store($pendingBatch);
327+
328+
$job = new class
329+
{
330+
use Batchable;
331+
};
332+
333+
$queue->shouldReceive('connection')->once()
334+
->with('test-connection')
335+
->andReturn($connection = m::mock(stdClass::class));
336+
337+
$connection->shouldReceive('bulk')->once();
338+
339+
$batch = $batch->add([$job]);
340+
341+
$_SERVER['__failure1.invoked'] = false;
342+
$_SERVER['__failure2.invoked'] = false;
343+
$_SERVER['__failure3.batch'] = null;
344+
$_SERVER['__failure3.exception'] = null;
345+
346+
$batch->recordFailedJob('test-id', new RuntimeException('Comprehensive callback test.'));
347+
348+
$this->assertTrue($_SERVER['__failure1.invoked']);
349+
$this->assertTrue($_SERVER['__failure2.invoked']);
350+
$this->assertInstanceOf(Batch::class, $_SERVER['__failure3.batch']);
351+
$this->assertSame('Comprehensive callback test.', $_SERVER['__failure3.exception']->getMessage());
352+
$this->assertSame($batch->id, $_SERVER['__failure3.batch_id']);
353+
$this->assertSame(Batch::class, $_SERVER['__failure3.batch_class']);
354+
$this->assertSame(RuntimeException::class, $_SERVER['__failure3.exception_class']);
355+
$this->assertEquals(2, $_SERVER['__failure3.param_count']);
356+
}
357+
300358
public function test_batch_can_be_cancelled()
301359
{
302360
$queue = m::mock(Factory::class);

0 commit comments

Comments
 (0)