diff --git a/src/Illuminate/Contracts/Queue/Queue.php b/src/Illuminate/Contracts/Queue/Queue.php index 1994cddf79eb..816f8a7620ee 100644 --- a/src/Illuminate/Contracts/Queue/Queue.php +++ b/src/Illuminate/Contracts/Queue/Queue.php @@ -2,6 +2,12 @@ namespace Illuminate\Contracts\Queue; +/** + * @method int pendingSize(string|null $queue = null) + * @method int delayedSize(string|null $queue = null) + * @method int reservedSize(string|null $queue = null) + * @method int|null creationTimeOfOldestPendingJob(string|null $queue = null) + */ interface Queue { /** @@ -37,7 +43,6 @@ public function pushOn($queue, $job, $data = ''); * * @param string $payload * @param string|null $queue - * @param array $options * @return mixed */ public function pushRaw($payload, $queue = null, array $options = []); diff --git a/src/Illuminate/Queue/BeanstalkdQueue.php b/src/Illuminate/Queue/BeanstalkdQueue.php index 56e9c4e0664b..9c0c3e0e7988 100755 --- a/src/Illuminate/Queue/BeanstalkdQueue.php +++ b/src/Illuminate/Queue/BeanstalkdQueue.php @@ -71,7 +71,56 @@ public function __construct( */ public function size($queue = null) { - return (int) $this->pheanstalk->statsTube(new TubeName($this->getQueue($queue)))->currentJobsReady; + $stats = $this->pheanstalk->statsTube(new TubeName($this->getQueue($queue))); + + return $stats->currentJobsReady + + $stats->currentJobsDelayed + + $stats->currentJobsReserved; + } + + /** + * Get the number of pending jobs. + * + * @param string|null $queue + * @return int + */ + public function pendingSize($queue = null) + { + return $this->pheanstalk->statsTube(new TubeName($this->getQueue($queue)))->currentJobsReady; + } + + /** + * Get the number of delayed jobs. + * + * @param string|null $queue + * @return int + */ + public function delayedSize($queue = null) + { + return $this->pheanstalk->statsTube(new TubeName($this->getQueue($queue)))->currentJobsDelayed; + } + + /** + * Get the number of reserved jobs. + * + * @param string|null $queue + * @return int + */ + public function reservedSize($queue = null) + { + return $this->pheanstalk->statsTube(new TubeName($this->getQueue($queue)))->currentJobsReserved; + } + + /** + * Get the creation timestamp of the oldest pending job, excluding delayed jobs. + * + * @param string|null $queue + * @return int|null + */ + public function creationTimeOfOldestPendingJob($queue = null) + { + // Not supported by Beanstalkd... + return null; } /** diff --git a/src/Illuminate/Queue/Console/MonitorCommand.php b/src/Illuminate/Queue/Console/MonitorCommand.php index 08ab98c5b8ae..a15e08e61116 100644 --- a/src/Illuminate/Queue/Console/MonitorCommand.php +++ b/src/Illuminate/Queue/Console/MonitorCommand.php @@ -99,6 +99,18 @@ protected function parseQueues($queues) 'connection' => $connection, 'queue' => $queue, 'size' => $size = $this->manager->connection($connection)->size($queue), + 'pending' => method_exists($this->manager->connection($connection), 'pendingSize') + ? $this->manager->connection($connection)->pendingSize($queue) + : null, + 'delayed' => method_exists($this->manager->connection($connection), 'delayedSize') + ? $this->manager->connection($connection)->delayedSize($queue) + : null, + 'reserved' => method_exists($this->manager->connection($connection), 'reservedSize') + ? $this->manager->connection($connection)->reservedSize($queue) + : null, + 'oldest_pending' => method_exists($this->manager->connection($connection), 'oldestPending') + ? $this->manager->connection($connection)->creationTimeOfOldestPendingJob($queue) + : null, 'status' => $size >= $this->option('max') ? 'ALERT' : 'OK', ]; }); @@ -121,6 +133,10 @@ protected function displaySizes(Collection $queues) $status = '['.$queue['size'].'] '.$queue['status']; $this->components->twoColumnDetail($name, $status); + $this->components->twoColumnDetail('Pending jobs', $queue['pending'] ?? 'N/A'); + $this->components->twoColumnDetail('Delayed jobs', $queue['delayed'] ?? 'N/A'); + $this->components->twoColumnDetail('Reserved jobs', $queue['reserved'] ?? 'N/A'); + $this->line(''); }); $this->newLine(); diff --git a/src/Illuminate/Queue/DatabaseQueue.php b/src/Illuminate/Queue/DatabaseQueue.php index 41d04e2b001c..2b76419a8fcf 100644 --- a/src/Illuminate/Queue/DatabaseQueue.php +++ b/src/Illuminate/Queue/DatabaseQueue.php @@ -79,6 +79,66 @@ public function size($queue = null) ->count(); } + /** + * Get the number of pending jobs. + * + * @param string|null $queue + * @return int + */ + public function pendingSize($queue = null) + { + return $this->database->table($this->table) + ->where('queue', $this->getQueue($queue)) + ->whereNull('reserved_at') + ->where('available_at', '<=', $this->currentTime()) + ->count(); + } + + /** + * Get the number of delayed jobs. + * + * @param string|null $queue + * @return int + */ + public function delayedSize($queue = null) + { + return $this->database->table($this->table) + ->where('queue', $this->getQueue($queue)) + ->whereNull('reserved_at') + ->where('available_at', '>', $this->currentTime()) + ->count(); + } + + /** + * Get the number of reserved jobs. + * + * @param string|null $queue + * @return int + */ + public function reservedSize($queue = null) + { + return $this->database->table($this->table) + ->where('queue', $this->getQueue($queue)) + ->whereNotNull('reserved_at') + ->count(); + } + + /** + * Get the creation timestamp of the oldest pending job, excluding delayed jobs. + * + * @param string|null $queue + * @return int|null + */ + public function creationTimeOfOldestPendingJob($queue = null) + { + return $this->database->table($this->table) + ->where('queue', $this->getQueue($queue)) + ->whereNull('reserved_at') + ->where('available_at', '<=', $this->currentTime()) + ->oldest('available_at') + ->value('available_at'); + } + /** * Push a new job onto the queue. * diff --git a/src/Illuminate/Queue/NullQueue.php b/src/Illuminate/Queue/NullQueue.php index 10493a1b699d..5c3c3c5798bf 100644 --- a/src/Illuminate/Queue/NullQueue.php +++ b/src/Illuminate/Queue/NullQueue.php @@ -17,6 +17,50 @@ public function size($queue = null) return 0; } + /** + * Get the number of pending jobs. + * + * @param string|null $queue + * @return int + */ + public function pendingSize($queue = null) + { + return 0; + } + + /** + * Get the number of delayed jobs. + * + * @param string|null $queue + * @return int + */ + public function delayedSize($queue = null) + { + return 0; + } + + /** + * Get the number of reserved jobs. + * + * @param string|null $queue + * @return int + */ + public function reservedSize($queue = null) + { + return 0; + } + + /** + * Get the creation timestamp of the oldest pending job, excluding delayed jobs. + * + * @param string|null $queue + * @return int|null + */ + public function creationTimeOfOldestPendingJob($queue = null) + { + return null; + } + /** * Push a new job onto the queue. * diff --git a/src/Illuminate/Queue/RedisQueue.php b/src/Illuminate/Queue/RedisQueue.php index 84cfbde358cf..ab2179a77f3d 100644 --- a/src/Illuminate/Queue/RedisQueue.php +++ b/src/Illuminate/Queue/RedisQueue.php @@ -109,6 +109,58 @@ public function size($queue = null) ); } + /** + * Get the number of pending jobs. + * + * @param string|null $queue + * @return int + */ + public function pendingSize($queue = null) + { + return $this->getConnection()->llen($this->getQueue($queue)); + } + + /** + * Get the number of delayed jobs. + * + * @param string|null $queue + * @return int + */ + public function delayedSize($queue = null) + { + return $this->getConnection()->zcard($this->getQueue($queue).':delayed'); + } + + /** + * Get the number of reserved jobs. + * + * @param string|null $queue + * @return int + */ + public function reservedSize($queue = null) + { + return $this->getConnection()->zcard($this->getQueue($queue).':reserved'); + } + + /** + * Get the creation timestamp of the oldest pending job, excluding delayed jobs. + * + * @param string|null $queue + * @return int|null + */ + public function creationTimeOfOldestPendingJob($queue = null) + { + $payload = $this->getConnection()->lindex($this->getQueue($queue), 0); + + if (! $payload) { + return null; + } + + $data = json_decode($payload, true); + + return $data['createdAt'] ?? null; + } + /** * Push an array of jobs onto the queue. * diff --git a/src/Illuminate/Queue/SqsQueue.php b/src/Illuminate/Queue/SqsQueue.php index a128be81109f..14c828d4bd3f 100755 --- a/src/Illuminate/Queue/SqsQueue.php +++ b/src/Illuminate/Queue/SqsQueue.php @@ -68,15 +68,83 @@ public function __construct( * @return int */ public function size($queue = null) + { + $response = $this->sqs->getQueueAttributes([ + 'QueueUrl' => $this->getQueue($queue), + 'AttributeNames' => [ + 'ApproximateNumberOfMessages', + 'ApproximateNumberOfMessagesDelayed', + 'ApproximateNumberOfMessagesNotVisible', + ], + ]); + + $a = $response['Attributes']; + + return (int) $a['ApproximateNumberOfMessages'] + + (int) $a['ApproximateNumberOfMessagesDelayed'] + + (int) $a['ApproximateNumberOfMessagesNotVisible']; + } + + /** + * Get the number of pending jobs. + * + * @param string|null $queue + * @return int + */ + public function pendingSize($queue = null) { $response = $this->sqs->getQueueAttributes([ 'QueueUrl' => $this->getQueue($queue), 'AttributeNames' => ['ApproximateNumberOfMessages'], ]); - $attributes = $response->get('Attributes'); + return (int) $response['Attributes']['ApproximateNumberOfMessages'] ?? 0; + } + + /** + * Get the number of delayed jobs. + * + * @param string|null $queue + * @return int + */ + public function delayedSize($queue = null) + { + $response = $this->sqs->getQueueAttributes([ + 'QueueUrl' => $this->getQueue($queue), + 'AttributeNames' => ['ApproximateNumberOfMessagesDelayed'], + ]); + + return (int) $response['Attributes']['ApproximateNumberOfMessagesDelayed'] ?? 0; + } + + /** + * Get the number of reserved jobs. + * + * @param string|null $queue + * @return int + */ + public function reservedSize($queue = null) + { + $response = $this->sqs->getQueueAttributes([ + 'QueueUrl' => $this->getQueue($queue), + 'AttributeNames' => ['ApproximateNumberOfMessagesNotVisible'], + ]); + + return (int) $response['Attributes']['ApproximateNumberOfMessagesNotVisible'] ?? 0; + } - return (int) $attributes['ApproximateNumberOfMessages']; + /** + * Get the creation timestamp of the oldest pending job, excluding delayed jobs. + * + * Not supported by SQS, returns null. + * + * @param string|null $queue + * @return int|null + */ + public function creationTimeOfOldestPendingJob($queue = null) + { + // Not supported by SQS... + return null; } /** diff --git a/src/Illuminate/Queue/SyncQueue.php b/src/Illuminate/Queue/SyncQueue.php index b3413d6a5821..b7e20873b342 100755 --- a/src/Illuminate/Queue/SyncQueue.php +++ b/src/Illuminate/Queue/SyncQueue.php @@ -36,6 +36,50 @@ public function size($queue = null) return 0; } + /** + * Get the number of pending jobs. + * + * @param string|null $queue + * @return int + */ + public function pendingSize($queue = null) + { + return 0; + } + + /** + * Get the number of delayed jobs. + * + * @param string|null $queue + * @return int + */ + public function delayedSize($queue = null) + { + return 0; + } + + /** + * Get the number of reserved jobs. + * + * @param string|null $queue + * @return int + */ + public function reservedSize($queue = null) + { + return 0; + } + + /** + * Get the creation timestamp of the oldest pending job, excluding delayed jobs. + * + * @param string|null $queue + * @return int|null + */ + public function creationTimeOfOldestPendingJob($queue = null) + { + return null; + } + /** * Push a new job onto the queue. * diff --git a/src/Illuminate/Support/Testing/Fakes/QueueFake.php b/src/Illuminate/Support/Testing/Fakes/QueueFake.php index c2fa139c5fb2..246c8be19b2f 100644 --- a/src/Illuminate/Support/Testing/Fakes/QueueFake.php +++ b/src/Illuminate/Support/Testing/Fakes/QueueFake.php @@ -408,6 +408,50 @@ public function size($queue = null) ->count(); } + /** + * Get the number of pending jobs. + * + * @param string|null $queue + * @return int + */ + public function pendingSize($queue = null) + { + return $this->size($queue); + } + + /** + * Get the number of delayed jobs. + * + * @param string|null $queue + * @return int + */ + public function delayedSize($queue = null) + { + return 0; + } + + /** + * Get the number of reserved jobs. + * + * @param string|null $queue + * @return int + */ + public function reservedSize($queue = null) + { + return 0; + } + + /** + * Get the creation timestamp of the oldest pending job, excluding delayed jobs. + * + * @param string|null $queue + * @return int|null + */ + public function creationTimeOfOldestPendingJob($queue = null) + { + return null; + } + /** * Push a new job onto the queue. * diff --git a/tests/Queue/QueueSqsQueueTest.php b/tests/Queue/QueueSqsQueueTest.php index 021e66484b68..656b73e43497 100755 --- a/tests/Queue/QueueSqsQueueTest.php +++ b/tests/Queue/QueueSqsQueueTest.php @@ -148,9 +148,25 @@ public function testSizeProperlyReadsSqsQueueSize() { $queue = $this->getMockBuilder(SqsQueue::class)->onlyMethods(['getQueue'])->setConstructorArgs([$this->sqs, $this->queueName, $this->account])->getMock(); $queue->expects($this->once())->method('getQueue')->with($this->queueName)->willReturn($this->queueUrl); - $this->sqs->shouldReceive('getQueueAttributes')->once()->with(['QueueUrl' => $this->queueUrl, 'AttributeNames' => ['ApproximateNumberOfMessages']])->andReturn($this->mockedQueueAttributesResponseModel); + + $this->sqs->shouldReceive('getQueueAttributes')->once()->with([ + 'QueueUrl' => $this->queueUrl, + 'AttributeNames' => [ + 'ApproximateNumberOfMessages', + 'ApproximateNumberOfMessagesDelayed', + 'ApproximateNumberOfMessagesNotVisible', + ], + ])->andReturn(new Result([ + 'Attributes' => [ + 'ApproximateNumberOfMessages' => 1, + 'ApproximateNumberOfMessagesDelayed' => 2, + 'ApproximateNumberOfMessagesNotVisible' => 3, + ], + ])); + $size = $queue->size($this->queueName); - $this->assertEquals(1, $size); + + $this->assertEquals(6, $size); // 1 + 2 + 3 } public function testGetQueueProperlyResolvesUrlWithPrefix()