Skip to content

Commit 9f2baf5

Browse files
[12.x] Batch Job Failure Callbacks Support (#55916)
* feat: add job failure callbacks to batches * cleanup & polish * formatting --------- Co-authored-by: Taylor Otwell <[email protected]>
1 parent f9e75e7 commit 9f2baf5

File tree

4 files changed

+270
-49
lines changed

4 files changed

+270
-49
lines changed

src/Illuminate/Bus/Batch.php

Lines changed: 32 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -242,31 +242,19 @@ public function recordSuccessfulJob(string $jobId)
242242
$counts = $this->decrementPendingJobs($jobId);
243243

244244
if ($this->hasProgressCallbacks()) {
245-
$batch = $this->fresh();
246-
247-
(new Collection($this->options['progress']))->each(function ($handler) use ($batch) {
248-
$this->invokeHandlerCallback($handler, $batch);
249-
});
245+
$this->invokeCallbacks('progress');
250246
}
251247

252248
if ($counts->pendingJobs === 0) {
253249
$this->repository->markAsFinished($this->id);
254250
}
255251

256252
if ($counts->pendingJobs === 0 && $this->hasThenCallbacks()) {
257-
$batch = $this->fresh();
258-
259-
(new Collection($this->options['then']))->each(function ($handler) use ($batch) {
260-
$this->invokeHandlerCallback($handler, $batch);
261-
});
253+
$this->invokeCallbacks('then');
262254
}
263255

264256
if ($counts->allJobsHaveRanExactlyOnce() && $this->hasFinallyCallbacks()) {
265-
$batch = $this->fresh();
266-
267-
(new Collection($this->options['finally']))->each(function ($handler) use ($batch) {
268-
$this->invokeHandlerCallback($handler, $batch);
269-
});
257+
$this->invokeCallbacks('finally');
270258
}
271259
}
272260

@@ -281,6 +269,18 @@ public function decrementPendingJobs(string $jobId)
281269
return $this->repository->decrementPendingJobs($this->id, $jobId);
282270
}
283271

272+
/**
273+
* Invoke the callbacks of the given type.
274+
*/
275+
protected function invokeCallbacks(string $type, ?Throwable $e = null): void
276+
{
277+
$batch = $this->fresh();
278+
279+
foreach ($this->options[$type] ?? [] as $handler) {
280+
$this->invokeHandlerCallback($handler, $batch, $e);
281+
}
282+
}
283+
284284
/**
285285
* Determine if the batch has finished executing.
286286
*
@@ -346,28 +346,22 @@ public function recordFailedJob(string $jobId, $e)
346346
$this->cancel();
347347
}
348348

349-
if ($this->hasProgressCallbacks() && $this->allowsFailures()) {
350-
$batch = $this->fresh();
349+
if ($this->allowsFailures()) {
350+
if ($this->hasProgressCallbacks()) {
351+
$this->invokeCallbacks('progress', $e);
352+
}
351353

352-
(new Collection($this->options['progress']))->each(function ($handler) use ($batch, $e) {
353-
$this->invokeHandlerCallback($handler, $batch, $e);
354-
});
354+
if ($this->hasFailureCallbacks()) {
355+
$this->invokeCallbacks('failure', $e);
356+
}
355357
}
356358

357359
if ($counts->failedJobs === 1 && $this->hasCatchCallbacks()) {
358-
$batch = $this->fresh();
359-
360-
(new Collection($this->options['catch']))->each(function ($handler) use ($batch, $e) {
361-
$this->invokeHandlerCallback($handler, $batch, $e);
362-
});
360+
$this->invokeCallbacks('catch', $e);
363361
}
364362

365363
if ($counts->allJobsHaveRanExactlyOnce() && $this->hasFinallyCallbacks()) {
366-
$batch = $this->fresh();
367-
368-
(new Collection($this->options['finally']))->each(function ($handler) use ($batch, $e) {
369-
$this->invokeHandlerCallback($handler, $batch, $e);
370-
});
364+
$this->invokeCallbacks('finally');
371365
}
372366
}
373367

@@ -392,6 +386,14 @@ public function hasCatchCallbacks()
392386
return isset($this->options['catch']) && ! empty($this->options['catch']);
393387
}
394388

389+
/**
390+
* Determine if the batch has "failure" callbacks.
391+
*/
392+
public function hasFailureCallbacks(): bool
393+
{
394+
return isset($this->options['failure']) && ! empty($this->options['failure']);
395+
}
396+
395397
/**
396398
* Determine if the batch has "finally" callbacks.
397399
*

src/Illuminate/Bus/PendingBatch.php

Lines changed: 43 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -119,9 +119,7 @@ protected function ensureJobIsBatchable(object|array $job): void
119119
*/
120120
public function before($callback)
121121
{
122-
$this->options['before'][] = $callback instanceof Closure
123-
? new SerializableClosure($callback)
124-
: $callback;
122+
$this->registerCallback('before', $callback);
125123

126124
return $this;
127125
}
@@ -144,9 +142,7 @@ public function beforeCallbacks()
144142
*/
145143
public function progress($callback)
146144
{
147-
$this->options['progress'][] = $callback instanceof Closure
148-
? new SerializableClosure($callback)
149-
: $callback;
145+
$this->registerCallback('progress', $callback);
150146

151147
return $this;
152148
}
@@ -169,9 +165,7 @@ public function progressCallbacks()
169165
*/
170166
public function then($callback)
171167
{
172-
$this->options['then'][] = $callback instanceof Closure
173-
? new SerializableClosure($callback)
174-
: $callback;
168+
$this->registerCallback('then', $callback);
175169

176170
return $this;
177171
}
@@ -194,9 +188,7 @@ public function thenCallbacks()
194188
*/
195189
public function catch($callback)
196190
{
197-
$this->options['catch'][] = $callback instanceof Closure
198-
? new SerializableClosure($callback)
199-
: $callback;
191+
$this->registerCallback('catch', $callback);
200192

201193
return $this;
202194
}
@@ -219,9 +211,7 @@ public function catchCallbacks()
219211
*/
220212
public function finally($callback)
221213
{
222-
$this->options['finally'][] = $callback instanceof Closure
223-
? new SerializableClosure($callback)
224-
: $callback;
214+
$this->registerCallback('finally', $callback);
225215

226216
return $this;
227217
}
@@ -237,14 +227,28 @@ public function finallyCallbacks()
237227
}
238228

239229
/**
240-
* Indicate that the batch should not be cancelled when a job within the batch fails.
230+
* Indicate that the batch should not be canceled when a job within the batch fails.
231+
*
232+
* Optionally, add callbacks to be executed upon each job failure.
233+
*
234+
* @template TParam of Closure(\Illuminate\Bus\Batch, \Throwable|null): mixed)|(callable(\Illuminate\Bus\Batch, \Throwable|null): mixed)
241235
*
242-
* @param bool $allowFailures
236+
* @param bool|TParam|array<array-key, TParam> $param
243237
* @return $this
244238
*/
245-
public function allowFailures($allowFailures = true)
239+
public function allowFailures(Closure|callable|array|bool $param = true)
246240
{
247-
$this->options['allowFailures'] = $allowFailures;
241+
if (! is_bool($param)) {
242+
$param = Arr::wrap($param);
243+
244+
foreach ($param as $callback) {
245+
if (is_callable($callback)) {
246+
$this->registerCallback('failure', $callback);
247+
}
248+
}
249+
}
250+
251+
$this->options['allowFailures'] = ! ($param === false);
248252

249253
return $this;
250254
}
@@ -259,6 +263,26 @@ public function allowsFailures()
259263
return Arr::get($this->options, 'allowFailures', false) === true;
260264
}
261265

266+
/**
267+
* Get the "failure" callbacks that have been registered with the pending batch.
268+
*
269+
* @return array<array-key, Closure|callable>
270+
*/
271+
public function failureCallbacks(): array
272+
{
273+
return $this->options['failure'] ?? [];
274+
}
275+
276+
/**
277+
* Register a callback with proper serialization.
278+
*/
279+
private function registerCallback(string $type, Closure|callable $callback): void
280+
{
281+
$this->options[$type][] = $callback instanceof Closure
282+
? new SerializableClosure($callback)
283+
: $callback;
284+
}
285+
262286
/**
263287
* Set the name for the batch.
264288
*

tests/Bus/BusBatchTest.php

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -297,6 +297,63 @@ public function test_failed_jobs_can_be_recorded_while_allowing_failures()
297297
$this->assertSame('Something went wrong.', $_SERVER['__catch.exception']->getMessage());
298298
}
299299

300+
public function test_failure_callbacks_execute_correctly(): void
301+
{
302+
$queue = m::mock(Factory::class);
303+
304+
$repository = new DatabaseBatchRepository(new BatchFactory($queue), DB::connection(), 'job_batches');
305+
306+
$pendingBatch = (new PendingBatch(new Container, collect()))
307+
->allowFailures([
308+
static fn (Batch $batch, $e): true => $_SERVER['__failure1.invoked'] = true,
309+
function (Batch $batch, $e) {
310+
$_SERVER['__failure2.invoked'] = true;
311+
},
312+
function (Batch $batch, $e) {
313+
$_SERVER['__failure3.batch'] = $batch;
314+
$_SERVER['__failure3.exception'] = $e;
315+
$_SERVER['__failure3.batch_id'] = $batch->id;
316+
$_SERVER['__failure3.batch_class'] = get_class($batch);
317+
$_SERVER['__failure3.exception_class'] = get_class($e);
318+
$_SERVER['__failure3.exception_message'] = $e->getMessage();
319+
$_SERVER['__failure3.param_count'] = func_num_args();
320+
},
321+
])
322+
->onConnection('test-connection')
323+
->onQueue('test-queue');
324+
325+
$batch = $repository->store($pendingBatch);
326+
327+
$job = new class
328+
{
329+
use Batchable;
330+
};
331+
332+
$queue->shouldReceive('connection')->once()
333+
->with('test-connection')
334+
->andReturn($connection = m::mock(stdClass::class));
335+
336+
$connection->shouldReceive('bulk')->once();
337+
338+
$batch = $batch->add([$job]);
339+
340+
$_SERVER['__failure1.invoked'] = false;
341+
$_SERVER['__failure2.invoked'] = false;
342+
$_SERVER['__failure3.batch'] = null;
343+
$_SERVER['__failure3.exception'] = null;
344+
345+
$batch->recordFailedJob('test-id', new RuntimeException('Comprehensive callback test.'));
346+
347+
$this->assertTrue($_SERVER['__failure1.invoked']);
348+
$this->assertTrue($_SERVER['__failure2.invoked']);
349+
$this->assertInstanceOf(Batch::class, $_SERVER['__failure3.batch']);
350+
$this->assertSame('Comprehensive callback test.', $_SERVER['__failure3.exception']->getMessage());
351+
$this->assertSame($batch->id, $_SERVER['__failure3.batch_id']);
352+
$this->assertSame(Batch::class, $_SERVER['__failure3.batch_class']);
353+
$this->assertSame(RuntimeException::class, $_SERVER['__failure3.exception_class']);
354+
$this->assertEquals(2, $_SERVER['__failure3.param_count']);
355+
}
356+
300357
public function test_batch_can_be_cancelled()
301358
{
302359
$queue = m::mock(Factory::class);

0 commit comments

Comments
 (0)