Skip to content

Commit fcdaa73

Browse files
patrickcarlohickmantaylorotwell
authored andcommitted
[12.x] Fix SQS FIFO and fair queue support (laravel#57080)
* Add tests to ensure that job objects are properly queued on SQS using push or pending dispatch. * Add tests to ensure that job objects can properly enable SQS fair queues. * Add a test to ensure that job strings are properly pushed onto SQS FIFO queues. * Add tests to ensure that job objects are properly pushed onto SQS FIFO queues using push or pending dispatch. * Add tests to ensure that DelaySeconds are not sent to SQS FIFO queues when delayed. * Properly handle the message group and deduplication queable options for SQS FIFO and fair queues. * Ensure DelaySeconds is not sent to SQS FIFO queues. * Restore normal uuid creation at end of tests. * formatting --------- Co-authored-by: Taylor Otwell <[email protected]>
1 parent aacd2b8 commit fcdaa73

File tree

3 files changed

+252
-17
lines changed

3 files changed

+252
-17
lines changed

src/Illuminate/Queue/SqsQueue.php

Lines changed: 46 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -200,10 +200,7 @@ public function later($delay, $job, $data = '', $queue = null)
200200
$queue,
201201
$delay,
202202
function ($payload, $queue, $delay) use ($job) {
203-
return $this->pushRaw($payload, $queue, [
204-
'DelaySeconds' => $this->secondsUntil($delay),
205-
...$this->getQueueableOptions($job, $queue),
206-
]);
203+
return $this->pushRaw($payload, $queue, $this->getQueueableOptions($job, $queue, $delay));
207204
}
208205
);
209206
}
@@ -213,27 +210,59 @@ function ($payload, $queue, $delay) use ($job) {
213210
*
214211
* @param mixed $job
215212
* @param string|null $queue
216-
* @return array{MessageGroupId?: string, MessageDeduplicationId?: string}
213+
* @param \DateTimeInterface|\DateInterval|int|null $delay
214+
* @return array{DelaySeconds?: int, MessageGroupId?: string, MessageDeduplicationId?: string}
217215
*/
218-
protected function getQueueableOptions($job, $queue): array
216+
protected function getQueueableOptions($job, $queue, $delay = null): array
219217
{
220-
if (! is_object($job) || ! str_ends_with((string) $queue, '.fifo')) {
221-
return [];
218+
// Make sure we have a queue name to properly determine if it's a FIFO queue...
219+
$queue ??= $this->default;
220+
221+
$isObject = is_object($job);
222+
$isFifo = str_ends_with((string) $queue, '.fifo');
223+
224+
$options = [];
225+
226+
// DelaySeconds cannot be used with FIFO queues. AWS will return an error...
227+
if (! empty($delay) && ! $isFifo) {
228+
$options['DelaySeconds'] = $this->secondsUntil($delay);
229+
}
230+
231+
// If the job is a string job on a standard queue, there are no more options...
232+
if (! $isObject && ! $isFifo) {
233+
return $options;
222234
}
223235

224236
$transformToString = fn ($value) => strval($value);
225237

226-
$messageGroupId = transform($job->messageGroup ?? null, $transformToString);
238+
// The message group ID is required for FIFO queues and is optional for
239+
// standard queues. Job objects contain a group ID. With string jobs
240+
// sent to FIFO queues, assign these to the same message group ID.
241+
$messageGroupId = null;
242+
243+
if ($isObject) {
244+
$messageGroupId = transform($job->messageGroup ?? null, $transformToString);
245+
} elseif ($isFifo) {
246+
$messageGroupId = transform($queue, $transformToString);
247+
}
227248

228-
$messageDeduplicationId = match (true) {
229-
method_exists($job, 'deduplicationId') => transform($job->deduplicationId(), $transformToString),
230-
default => (string) Str::orderedUuid(),
231-
};
249+
$options['MessageGroupId'] = $messageGroupId;
232250

233-
return array_filter([
234-
'MessageGroupId' => $messageGroupId,
235-
'MessageDeduplicationId' => $messageDeduplicationId,
236-
]);
251+
// The message deduplication ID is only valid for FIFO queues. Every job
252+
// without the method will be considered unique. To use content-based
253+
// deduplication enable it in AWS and have the method return empty.
254+
$messageDeduplicationId = null;
255+
256+
if ($isFifo) {
257+
$messageDeduplicationId = match (true) {
258+
$isObject && method_exists($job, 'deduplicationId') => transform($job->deduplicationId(), $transformToString),
259+
default => (string) Str::orderedUuid(),
260+
};
261+
}
262+
263+
$options['MessageDeduplicationId'] = $messageDeduplicationId;
264+
265+
return array_filter($options);
237266
}
238267

239268
/**
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
<?php
2+
3+
namespace Illuminate\Tests\Queue\Fixtures;
4+
5+
use Illuminate\Contracts\Queue\ShouldQueue;
6+
use Illuminate\Foundation\Queue\Queueable;
7+
8+
class FakeSqsJob implements ShouldQueue
9+
{
10+
use Queueable;
11+
12+
public function handle(): void
13+
{
14+
//
15+
}
16+
}

tests/Queue/QueueSqsQueueTest.php

Lines changed: 190 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,14 @@
44

55
use Aws\Result;
66
use Aws\Sqs\SqsClient;
7+
use Illuminate\Bus\Dispatcher;
78
use Illuminate\Container\Container;
9+
use Illuminate\Contracts\Bus\Dispatcher as DispatcherContract;
810
use Illuminate\Queue\Jobs\SqsJob;
911
use Illuminate\Queue\SqsQueue;
1012
use Illuminate\Support\Carbon;
13+
use Illuminate\Support\Str;
14+
use Illuminate\Tests\Queue\Fixtures\FakeSqsJob;
1115
use Mockery as m;
1216
use PHPUnit\Framework\TestCase;
1317

@@ -16,13 +20,17 @@ class QueueSqsQueueTest extends TestCase
1620
protected $sqs;
1721
protected $account;
1822
protected $queueName;
23+
protected $fifoQueueName;
1924
protected $baseUrl;
2025
protected $prefix;
2126
protected $queueUrl;
27+
protected $fifoQueueUrl;
2228
protected $mockedJob;
2329
protected $mockedData;
2430
protected $mockedPayload;
2531
protected $mockedDelay;
32+
protected $mockedMessageGroupId;
33+
protected $mockedDeduplicationId;
2634
protected $mockedMessageId;
2735
protected $mockedReceiptHandle;
2836
protected $mockedSendMessageResponseModel;
@@ -42,16 +50,20 @@ protected function setUp(): void
4250

4351
$this->account = '1234567891011';
4452
$this->queueName = 'emails';
53+
$this->fifoQueueName = 'emails.fifo';
4554
$this->baseUrl = 'https://sqs.someregion.amazonaws.com';
4655

4756
// This is how the modified getQueue builds the queueUrl
4857
$this->prefix = $this->baseUrl.'/'.$this->account.'/';
4958
$this->queueUrl = $this->prefix.$this->queueName;
59+
$this->fifoQueueUrl = $this->prefix.$this->fifoQueueName;
5060

5161
$this->mockedJob = 'foo';
5262
$this->mockedData = ['data'];
5363
$this->mockedPayload = json_encode(['job' => $this->mockedJob, 'data' => $this->mockedData]);
5464
$this->mockedDelay = 10;
65+
$this->mockedMessageGroupId = 'group-1';
66+
$this->mockedDeduplicationId = 'deduplication-id-1';
5567
$this->mockedMessageId = 'e3cd03ee-59a3-4ad8-b0aa-ee2e3808ac81';
5668
$this->mockedReceiptHandle = '0NNAq8PwvXuWv5gMtS9DJ8qEdyiUwbAjpp45w2m6M4SJ1Y+PxCh7R930NRB8ylSacEmoSnW18bgd4nK\/O6ctE+VFVul4eD23mA07vVoSnPI4F\/voI1eNCp6Iax0ktGmhlNVzBwaZHEr91BRtqTRM3QKd2ASF8u+IQaSwyl\/DGK+P1+dqUOodvOVtExJwdyDLy1glZVgm85Yw9Jf5yZEEErqRwzYz\/qSigdvW4sm2l7e4phRol\/+IjMtovOyH\/ukueYdlVbQ4OshQLENhUKe7RNN5i6bE\/e5x9bnPhfj2gbM';
5769

@@ -237,4 +249,182 @@ public function testGetFifoQueueEnsuresTheQueueIsOnlySuffixedOnce()
237249
$queueUrl = $this->baseUrl.'/'.$this->account.'/test'.$suffix.'.fifo';
238250
$this->assertEquals($queueUrl, $queue->getQueue('test-staging.fifo'));
239251
}
252+
253+
public function testPushProperlyPushesJobObjectOntoSqs()
254+
{
255+
$job = new FakeSqsJob();
256+
257+
$queue = $this->getMockBuilder(SqsQueue::class)->onlyMethods(['createPayload', 'getQueue'])->setConstructorArgs([$this->sqs, $this->queueName, $this->account])->getMock();
258+
$queue->setContainer($container = m::spy(Container::class));
259+
$queue->expects($this->once())->method('createPayload')->with($job, $this->queueName, $this->mockedData)->willReturn($this->mockedPayload);
260+
$queue->expects($this->once())->method('getQueue')->with($this->queueName)->willReturn($this->queueUrl);
261+
$this->sqs->shouldReceive('sendMessage')->once()->with(['QueueUrl' => $this->queueUrl, 'MessageBody' => $this->mockedPayload])->andReturn($this->mockedSendMessageResponseModel);
262+
$id = $queue->push($job, $this->mockedData, $this->queueName);
263+
$this->assertEquals($this->mockedMessageId, $id);
264+
$container->shouldHaveReceived('bound')->with('events')->twice();
265+
}
266+
267+
public function testPendingDispatchProperlyPushesJobObjectOntoSqs()
268+
{
269+
$job = new FakeSqsJob();
270+
271+
$queue = $this->getMockBuilder(SqsQueue::class)->onlyMethods(['createPayload', 'getQueue'])->setConstructorArgs([$this->sqs, $this->queueName, $this->account])->getMock();
272+
$queue->setContainer($container = m::spy(Container::class));
273+
$queue->expects($this->once())->method('createPayload')->with($job, $this->queueName, '')->willReturn($this->mockedPayload);
274+
$queue->expects($this->once())->method('getQueue')->with(null)->willReturn($this->queueUrl);
275+
$this->sqs->shouldReceive('sendMessage')->once()->with(['QueueUrl' => $this->queueUrl, 'MessageBody' => $this->mockedPayload])->andReturn($this->mockedSendMessageResponseModel);
276+
277+
$dispatcher = new Dispatcher($container, fn () => $queue);
278+
app()->instance(DispatcherContract::class, $dispatcher);
279+
280+
FakeSqsJob::dispatch();
281+
282+
$container->shouldHaveReceived('bound')->with('events')->twice();
283+
}
284+
285+
public function testPushProperlyPushesJobObjectOntoSqsFairQueue()
286+
{
287+
$job = (new FakeSqsJob())->onGroup($this->mockedMessageGroupId);
288+
289+
$queue = $this->getMockBuilder(SqsQueue::class)->onlyMethods(['createPayload', 'getQueue'])->setConstructorArgs([$this->sqs, $this->queueName, $this->account])->getMock();
290+
$queue->setContainer($container = m::spy(Container::class));
291+
$queue->expects($this->once())->method('createPayload')->with($job, $this->queueName, $this->mockedData)->willReturn($this->mockedPayload);
292+
$queue->expects($this->once())->method('getQueue')->with($this->queueName)->willReturn($this->queueUrl);
293+
$this->sqs->shouldReceive('sendMessage')->once()->with(['QueueUrl' => $this->queueUrl, 'MessageBody' => $this->mockedPayload, 'MessageGroupId' => $this->mockedMessageGroupId])->andReturn($this->mockedSendMessageResponseModel);
294+
$id = $queue->push($job, $this->mockedData, $this->queueName);
295+
$this->assertEquals($this->mockedMessageId, $id);
296+
$container->shouldHaveReceived('bound')->with('events')->twice();
297+
}
298+
299+
public function testPendingDispatchProperlyPushesJobObjectOntoSqsFairQueue()
300+
{
301+
$job = (new FakeSqsJob())->onGroup($this->mockedMessageGroupId);
302+
303+
$queue = $this->getMockBuilder(SqsQueue::class)->onlyMethods(['createPayload', 'getQueue'])->setConstructorArgs([$this->sqs, $this->queueName, $this->account])->getMock();
304+
$queue->setContainer($container = m::spy(Container::class));
305+
$queue->expects($this->once())->method('createPayload')->with($job, $this->queueName, '')->willReturn($this->mockedPayload);
306+
$queue->expects($this->once())->method('getQueue')->with(null)->willReturn($this->queueUrl);
307+
$this->sqs->shouldReceive('sendMessage')->once()->with(['QueueUrl' => $this->queueUrl, 'MessageBody' => $this->mockedPayload, 'MessageGroupId' => $this->mockedMessageGroupId])->andReturn($this->mockedSendMessageResponseModel);
308+
309+
$dispatcher = new Dispatcher($container, fn () => $queue);
310+
app()->instance(DispatcherContract::class, $dispatcher);
311+
312+
FakeSqsJob::dispatch()->onGroup($this->mockedMessageGroupId);
313+
314+
$container->shouldHaveReceived('bound')->with('events')->twice();
315+
}
316+
317+
public function testPushProperlyPushesJobStringOntoSqsFifoQueue()
318+
{
319+
Str::createUuidsUsing(fn () => $this->mockedDeduplicationId);
320+
321+
$queue = $this->getMockBuilder(SqsQueue::class)->onlyMethods(['createPayload', 'getQueue'])->setConstructorArgs([$this->sqs, $this->fifoQueueName, $this->account])->getMock();
322+
$queue->setContainer($container = m::spy(Container::class));
323+
$queue->expects($this->once())->method('createPayload')->with($this->mockedJob, $this->fifoQueueName, $this->mockedData)->willReturn($this->mockedPayload);
324+
$queue->expects($this->once())->method('getQueue')->with($this->fifoQueueName)->willReturn($this->fifoQueueUrl);
325+
$this->sqs->shouldReceive('sendMessage')->once()->with(['QueueUrl' => $this->fifoQueueUrl, 'MessageBody' => $this->mockedPayload, 'MessageGroupId' => $this->fifoQueueName, 'MessageDeduplicationId' => $this->mockedDeduplicationId])->andReturn($this->mockedSendMessageResponseModel);
326+
$id = $queue->push($this->mockedJob, $this->mockedData, $this->fifoQueueName);
327+
$this->assertEquals($this->mockedMessageId, $id);
328+
$container->shouldHaveReceived('bound')->with('events')->twice();
329+
330+
Str::createUuidsNormally();
331+
}
332+
333+
public function testPushProperlyPushesJobObjectOntoSqsFifoQueue()
334+
{
335+
Str::createUuidsUsing(fn () => $this->mockedDeduplicationId);
336+
337+
$job = (new FakeSqsJob())->onGroup($this->mockedMessageGroupId);
338+
339+
$queue = $this->getMockBuilder(SqsQueue::class)->onlyMethods(['createPayload', 'getQueue'])->setConstructorArgs([$this->sqs, $this->fifoQueueName, $this->account])->getMock();
340+
$queue->setContainer($container = m::spy(Container::class));
341+
$queue->expects($this->once())->method('createPayload')->with($job, $this->fifoQueueName, $this->mockedData)->willReturn($this->mockedPayload);
342+
$queue->expects($this->once())->method('getQueue')->with($this->fifoQueueName)->willReturn($this->fifoQueueUrl);
343+
$this->sqs->shouldReceive('sendMessage')->once()->with(['QueueUrl' => $this->fifoQueueUrl, 'MessageBody' => $this->mockedPayload, 'MessageGroupId' => $this->mockedMessageGroupId, 'MessageDeduplicationId' => $this->mockedDeduplicationId])->andReturn($this->mockedSendMessageResponseModel);
344+
$id = $queue->push($job, $this->mockedData, $this->fifoQueueName);
345+
$this->assertEquals($this->mockedMessageId, $id);
346+
$container->shouldHaveReceived('bound')->with('events')->twice();
347+
348+
Str::createUuidsNormally();
349+
}
350+
351+
public function testPendingDispatchProperlyPushesJobObjectOntoSqsFifoQueue()
352+
{
353+
Str::createUuidsUsing(fn () => $this->mockedDeduplicationId);
354+
355+
$job = (new FakeSqsJob())->onGroup($this->mockedMessageGroupId);
356+
357+
$queue = $this->getMockBuilder(SqsQueue::class)->onlyMethods(['createPayload', 'getQueue'])->setConstructorArgs([$this->sqs, $this->fifoQueueName, $this->account])->getMock();
358+
$queue->setContainer($container = m::spy(Container::class));
359+
$queue->expects($this->once())->method('createPayload')->with($job, $this->fifoQueueName, '')->willReturn($this->mockedPayload);
360+
$queue->expects($this->once())->method('getQueue')->with(null)->willReturn($this->fifoQueueUrl);
361+
$this->sqs->shouldReceive('sendMessage')->once()->with(['QueueUrl' => $this->fifoQueueUrl, 'MessageBody' => $this->mockedPayload, 'MessageGroupId' => $this->mockedMessageGroupId, 'MessageDeduplicationId' => $this->mockedDeduplicationId])->andReturn($this->mockedSendMessageResponseModel);
362+
363+
$dispatcher = new Dispatcher($container, fn () => $queue);
364+
app()->instance(DispatcherContract::class, $dispatcher);
365+
366+
FakeSqsJob::dispatch()->onGroup($this->mockedMessageGroupId);
367+
368+
$container->shouldHaveReceived('bound')->with('events')->twice();
369+
370+
Str::createUuidsNormally();
371+
}
372+
373+
public function testDelayedPushProperlyPushesJobStringOntoSqsFifoQueueWithoutDelay()
374+
{
375+
Str::createUuidsUsing(fn () => $this->mockedDeduplicationId);
376+
377+
$queue = $this->getMockBuilder(SqsQueue::class)->onlyMethods(['createPayload', 'secondsUntil', 'getQueue'])->setConstructorArgs([$this->sqs, $this->fifoQueueName, $this->account])->getMock();
378+
$queue->setContainer($container = m::spy(Container::class));
379+
$queue->expects($this->once())->method('createPayload')->with($this->mockedJob, $this->fifoQueueName, $this->mockedData)->willReturn($this->mockedPayload);
380+
$queue->expects($this->never())->method('secondsUntil')->with($this->mockedDelay)->willReturn($this->mockedDelay);
381+
$queue->expects($this->once())->method('getQueue')->with($this->fifoQueueName)->willReturn($this->fifoQueueUrl);
382+
$this->sqs->shouldReceive('sendMessage')->once()->with(['QueueUrl' => $this->fifoQueueUrl, 'MessageBody' => $this->mockedPayload, 'MessageGroupId' => $this->fifoQueueName, 'MessageDeduplicationId' => $this->mockedDeduplicationId])->andReturn($this->mockedSendMessageResponseModel);
383+
$id = $queue->later($this->mockedDelay, $this->mockedJob, $this->mockedData, $this->fifoQueueName);
384+
$this->assertEquals($this->mockedMessageId, $id);
385+
$container->shouldHaveReceived('bound')->with('events')->twice();
386+
387+
Str::createUuidsNormally();
388+
}
389+
390+
public function testDelayedPushProperlyPushesJobObjectOntoSqsFifoQueueWithoutDelay()
391+
{
392+
Str::createUuidsUsing(fn () => $this->mockedDeduplicationId);
393+
394+
$job = (new FakeSqsJob())->onGroup($this->mockedMessageGroupId);
395+
396+
$queue = $this->getMockBuilder(SqsQueue::class)->onlyMethods(['createPayload', 'secondsUntil', 'getQueue'])->setConstructorArgs([$this->sqs, $this->fifoQueueName, $this->account])->getMock();
397+
$queue->setContainer($container = m::spy(Container::class));
398+
$queue->expects($this->once())->method('createPayload')->with($job, $this->fifoQueueName, $this->mockedData)->willReturn($this->mockedPayload);
399+
$queue->expects($this->never())->method('secondsUntil')->with($this->mockedDelay)->willReturn($this->mockedDelay);
400+
$queue->expects($this->once())->method('getQueue')->with($this->fifoQueueName)->willReturn($this->fifoQueueUrl);
401+
$this->sqs->shouldReceive('sendMessage')->once()->with(['QueueUrl' => $this->fifoQueueUrl, 'MessageBody' => $this->mockedPayload, 'MessageGroupId' => $this->mockedMessageGroupId, 'MessageDeduplicationId' => $this->mockedDeduplicationId])->andReturn($this->mockedSendMessageResponseModel);
402+
$id = $queue->later($this->mockedDelay, $job, $this->mockedData, $this->fifoQueueName);
403+
$this->assertEquals($this->mockedMessageId, $id);
404+
$container->shouldHaveReceived('bound')->with('events')->twice();
405+
406+
Str::createUuidsNormally();
407+
}
408+
409+
public function testDelayedPendingDispatchProperlyPushesJobObjectOntoSqsFifoQueueWithoutDelay()
410+
{
411+
Str::createUuidsUsing(fn () => $this->mockedDeduplicationId);
412+
413+
$job = (new FakeSqsJob())->onGroup($this->mockedMessageGroupId)->delay($this->mockedDelay);
414+
415+
$queue = $this->getMockBuilder(SqsQueue::class)->onlyMethods(['createPayload', 'getQueue'])->setConstructorArgs([$this->sqs, $this->fifoQueueName, $this->account])->getMock();
416+
$queue->setContainer($container = m::spy(Container::class));
417+
$queue->expects($this->once())->method('createPayload')->with($job, $this->fifoQueueName, '')->willReturn($this->mockedPayload);
418+
$queue->expects($this->once())->method('getQueue')->with(null)->willReturn($this->fifoQueueUrl);
419+
$this->sqs->shouldReceive('sendMessage')->once()->with(['QueueUrl' => $this->fifoQueueUrl, 'MessageBody' => $this->mockedPayload, 'MessageGroupId' => $this->mockedMessageGroupId, 'MessageDeduplicationId' => $this->mockedDeduplicationId])->andReturn($this->mockedSendMessageResponseModel);
420+
421+
$dispatcher = new Dispatcher($container, fn () => $queue);
422+
app()->instance(DispatcherContract::class, $dispatcher);
423+
424+
FakeSqsJob::dispatch()->onGroup($this->mockedMessageGroupId)->delay($this->mockedDelay);
425+
426+
$container->shouldHaveReceived('bound')->with('events')->twice();
427+
428+
Str::createUuidsNormally();
429+
}
240430
}

0 commit comments

Comments
 (0)