Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions src/Illuminate/Bus/Queueable.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
use Illuminate\Queue\CallQueuedClosure;
use Illuminate\Support\Arr;
use Illuminate\Support\Collection;
use Laravel\SerializableClosure\SerializableClosure;
use PHPUnit\Framework\Assert as PHPUnit;
use RuntimeException;

Expand Down Expand Up @@ -34,6 +35,13 @@ trait Queueable
*/
public $messageGroup;

/**
* The job deduplicator callback the job should use to generate the deduplication ID.
*
* @var \Laravel\SerializableClosure\SerializableClosure|null
*/
public $deduplicator;

/**
* The number of seconds before the job should be made available.
*
Expand Down Expand Up @@ -124,6 +132,23 @@ public function onGroup($group)
return $this;
}

/**
* Set the desired job deduplicator callback.
*
* This feature is only supported by some queues, such as Amazon SQS FIFO.
*
* @param callable|null $deduplicator
* @return $this
*/
public function withDeduplicator($deduplicator)
{
$this->deduplicator = $deduplicator instanceof Closure
? new SerializableClosure($deduplicator)
: $deduplicator;

return $this;
}

/**
* Set the desired connection for the chain.
*
Expand Down
5 changes: 5 additions & 0 deletions src/Illuminate/Events/Dispatcher.php
Original file line number Diff line number Diff line change
Expand Up @@ -706,6 +706,11 @@ protected function propagateListenerOptions($listener, $job)
$job->timeout = $listener->timeout ?? null;
$job->failOnTimeout = $listener->failOnTimeout ?? false;
$job->tries = method_exists($listener, 'tries') ? $listener->tries(...$data) : ($listener->tries ?? null);
$job->messageGroup = method_exists($listener, 'messageGroup') ? $listener->messageGroup(...$data) : ($listener->messageGroup ?? null);
$job->withDeduplicator(method_exists($listener, 'deduplicator')
? $listener->deduplicator(...$data)
: (method_exists($listener, 'deduplicationId') ? $listener->deduplicationId(...) : null)
);

$job->through(array_merge(
method_exists($listener, 'middleware') ? $listener->middleware(...$data) : [],
Expand Down
53 changes: 52 additions & 1 deletion src/Illuminate/Events/QueuedClosure.php
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,20 @@ class QueuedClosure
*/
public $queue;

/**
* The job "group" the job should be sent to.
*
* @var string|null
*/
public $messageGroup;

/**
* The job deduplicator callback the job should use to generate the deduplication ID.
*
* @var \Laravel\SerializableClosure\SerializableClosure|null
*/
public $deduplicator;

/**
* The number of seconds before the job should be made available.
*
Expand Down Expand Up @@ -81,6 +95,38 @@ public function onQueue($queue)
return $this;
}

/**
* Set the desired job "group".
*
* This feature is only supported by some queues, such as Amazon SQS.
*
* @param \UnitEnum|string $group
* @return $this
*/
public function onGroup($group)
{
$this->messageGroup = enum_value($group);

return $this;
}

/**
* Set the desired job deduplicator callback.
*
* This feature is only supported by some queues, such as Amazon SQS FIFO.
*
* @param callable|null $deduplicator
* @return $this
*/
public function withDeduplicator($deduplicator)
{
$this->deduplicator = $deduplicator instanceof Closure
? new SerializableClosure($deduplicator)
: $deduplicator;

return $this;
}

/**
* Set the desired delay in seconds for the job.
*
Expand Down Expand Up @@ -121,7 +167,12 @@ public function resolve()
'catch' => (new Collection($this->catchCallbacks))
->map(fn ($callback) => new SerializableClosure($callback))
->all(),
]))->onConnection($this->connection)->onQueue($this->queue)->delay($this->delay);
]))
->onConnection($this->connection)
->onQueue($this->queue)
->delay($this->delay)
->onGroup($this->messageGroup)
->withDeduplicator($this->deduplicator);
};
}
}
15 changes: 15 additions & 0 deletions src/Illuminate/Foundation/Bus/PendingDispatch.php
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,21 @@ public function onGroup($group)
return $this;
}

/**
* Set the desired job deduplicator callback.
*
* This feature is only supported by some queues, such as Amazon SQS FIFO.
*
* @param callable|null $deduplicator
* @return $this
*/
public function withDeduplicator($deduplicator)
{
$this->job->withDeduplicator($deduplicator);

return $this;
}

/**
* Set the desired connection for the chain.
*
Expand Down
7 changes: 7 additions & 0 deletions src/Illuminate/Mail/Mailable.php
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,14 @@ public function later($delay, Queue $queue)
*/
protected function newQueuedJob()
{
$messageGroup = $this->messageGroup ?? null;

/** @phpstan-ignore callable.nonNativeMethod (false positive since method_exists guard is used) */
$deduplicator = $this->deduplicator ?? (method_exists($this, 'deduplicationId') ? $this->deduplicationId(...) : null);

return Container::getInstance()->make(SendQueuedMailable::class, ['mailable' => $this])
->onGroup($messageGroup)
->withDeduplicator($deduplicator)
->through(array_merge(
method_exists($this, 'middleware') ? $this->middleware() : [],
$this->middleware ?? []
Expand Down
14 changes: 14 additions & 0 deletions src/Illuminate/Notifications/NotificationSender.php
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,18 @@ protected function queueNotification($notifiables, $notification)
$delay = $notification->withDelay($notifiable, $channel) ?? null;
}

$messageGroup = $notification->messageGroup ?? null;

if (method_exists($notification, 'withMessageGroups')) {
$messageGroup = $notification->withMessageGroups($notifiable, $channel) ?? null;
}

$deduplicator = $notification->deduplicator ?? (method_exists($notification, 'deduplicationId') ? $notification->deduplicationId(...) : null);

if (method_exists($notification, 'withDeduplicators')) {
$deduplicator = $notification->withDeduplicators($notifiable, $channel) ?? null;
}

$middleware = $notification->middleware ?? [];

if (method_exists($notification, 'middleware')) {
Expand All @@ -265,6 +277,8 @@ protected function queueNotification($notifiables, $notification)
->onConnection($connection)
->onQueue($queue)
->delay(is_array($delay) ? ($delay[$channel] ?? null) : $delay)
->onGroup(is_array($messageGroup) ? ($messageGroup[$channel] ?? null) : $messageGroup)
->withDeduplicator(is_array($deduplicator) ? ($deduplicator[$channel] ?? null) : $deduplicator)
->through($middleware)
);
}
Expand Down
10 changes: 6 additions & 4 deletions src/Illuminate/Queue/SqsQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ public function push($job, $data = '', $queue = null)
$queue,
null,
function ($payload, $queue) use ($job) {
return $this->pushRaw($payload, $queue, $this->getQueueableOptions($job, $queue));
return $this->pushRaw($payload, $queue, $this->getQueueableOptions($job, $queue, $payload));
}
);
}
Expand Down Expand Up @@ -200,7 +200,7 @@ public function later($delay, $job, $data = '', $queue = null)
$queue,
$delay,
function ($payload, $queue, $delay) use ($job) {
return $this->pushRaw($payload, $queue, $this->getQueueableOptions($job, $queue, $delay));
return $this->pushRaw($payload, $queue, $this->getQueueableOptions($job, $queue, $payload, $delay));
}
);
}
Expand All @@ -210,10 +210,11 @@ function ($payload, $queue, $delay) use ($job) {
*
* @param mixed $job
* @param string|null $queue
* @param string $payload
* @param \DateTimeInterface|\DateInterval|int|null $delay
* @return array{DelaySeconds?: int, MessageGroupId?: string, MessageDeduplicationId?: string}
*/
protected function getQueueableOptions($job, $queue, $delay = null): array
protected function getQueueableOptions($job, $queue, $payload, $delay = null): array
{
// Make sure we have a queue name to properly determine if it's a FIFO queue...
$queue ??= $this->default;
Expand Down Expand Up @@ -255,7 +256,8 @@ protected function getQueueableOptions($job, $queue, $delay = null): array

if ($isFifo) {
$messageDeduplicationId = match (true) {
$isObject && method_exists($job, 'deduplicationId') => transform($job->deduplicationId(), $transformToString),
$isObject && isset($job->deduplicator) && is_callable($job->deduplicator) => transform(call_user_func($job->deduplicator, $payload, $queue), $transformToString),
$isObject && method_exists($job, 'deduplicationId') => transform($job->deduplicationId($payload, $queue), $transformToString),
default => (string) Str::orderedUuid(),
};
}
Expand Down
133 changes: 133 additions & 0 deletions tests/Events/QueuedEventsTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
use Illuminate\Events\Dispatcher;
use Illuminate\Queue\QueueManager;
use Illuminate\Support\Testing\Fakes\QueueFake;
use Laravel\SerializableClosure\SerializableClosure;
use Mockery as m;
use PHPUnit\Framework\TestCase;

Expand Down Expand Up @@ -197,6 +198,82 @@ public function testQueuePropagateTries()
});
}

public function testQueuePropagateMessageGroupProperty()
{
$d = new Dispatcher;

$fakeQueue = new QueueFake(new Container);

$d->setQueueResolver(function () use ($fakeQueue) {
return $fakeQueue;
});

$d->listen('some.event', TestDispatcherWithMessageGroupProperty::class.'@handle');
$d->dispatch('some.event', ['foo', 'bar']);

$fakeQueue->assertPushed(CallQueuedListener::class, function ($job) {
return $job->messageGroup === 'group-property';
});
}

public function testQueuePropagateMessageGroupMethodOverProperty()
{
$d = new Dispatcher;

$fakeQueue = new QueueFake(new Container);

$d->setQueueResolver(function () use ($fakeQueue) {
return $fakeQueue;
});

$d->listen('some.event', TestDispatcherWithMessageGroupMethod::class.'@handle');
$d->dispatch('some.event', ['foo', 'bar']);

$fakeQueue->assertPushed(CallQueuedListener::class, function ($job) {
return $job->messageGroup === 'group-method';
});
}

public function testQueuePropagateDeduplicationIdMethod()
{
$d = new Dispatcher;

$fakeQueue = new QueueFake(new Container);

$d->setQueueResolver(function () use ($fakeQueue) {
return $fakeQueue;
});

$d->listen('some.event', TestDispatcherWithDeduplicationIdMethod::class.'@handle');
$d->dispatch('some.event', ['foo', 'bar']);

$fakeQueue->assertPushed(CallQueuedListener::class, function ($job) {
$this->assertInstanceOf(SerializableClosure::class, $job->deduplicator);

return is_callable($job->deduplicator) && call_user_func($job->deduplicator, '', null) === 'deduplication-id-method';
});
}

public function testQueuePropagateDeduplicatorMethodOverDeduplicationIdMethod()
{
$d = new Dispatcher;

$fakeQueue = new QueueFake(new Container);

$d->setQueueResolver(function () use ($fakeQueue) {
return $fakeQueue;
});

$d->listen('some.event', TestDispatcherWithDeduplicatorMethod::class.'@handle');
$d->dispatch('some.event', ['foo', 'bar']);

$fakeQueue->assertPushed(CallQueuedListener::class, function ($job) {
$this->assertInstanceOf(SerializableClosure::class, $job->deduplicator);

return is_callable($job->deduplicator) && call_user_func($job->deduplicator, '', null) === 'deduplicator-method';
});
}

public function testQueuePropagateMiddleware()
{
$d = new Dispatcher;
Expand Down Expand Up @@ -323,6 +400,62 @@ public function handle()
}
}

class TestDispatcherWithMessageGroupProperty implements ShouldQueue
{
public $messageGroup = 'group-property';

public function handle()
{
//
}
}

class TestDispatcherWithMessageGroupMethod implements ShouldQueue
{
public $messageGroup = 'group-property';

public function handle()
{
//
}

public function messageGroup($event)
{
return 'group-method';
}
}

class TestDispatcherWithDeduplicationIdMethod implements ShouldQueue
{
public function handle()
{
//
}

public function deduplicationId($payload, $queue)
{
return 'deduplication-id-method';
}
}

class TestDispatcherWithDeduplicatorMethod implements ShouldQueue
{
public function handle()
{
//
}

public function deduplicationId($payload, $queue)
{
return 'deduplication-id-method';
}

public function deduplicator($event)
{
return fn ($payload, $queue) => 'deduplicator-method';
}
}

class TestDispatcherMiddleware implements ShouldQueue
{
public function middleware($a, $b)
Expand Down
Loading