Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion src/Illuminate/Contracts/Queue/Queue.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@

namespace Illuminate\Contracts\Queue;

/**
* @method int pendingSize(string|null $queue = null)
* @method int delayedSize(string|null $queue = null)
* @method int reservedSize(string|null $queue = null)
* @method int|null creationTimeOfOldestPendingJob(string|null $queue = null)
*/
interface Queue
{
/**
Expand Down Expand Up @@ -37,7 +43,6 @@ public function pushOn($queue, $job, $data = '');
*
* @param string $payload
* @param string|null $queue
* @param array $options
* @return mixed
*/
public function pushRaw($payload, $queue = null, array $options = []);
Expand Down
51 changes: 50 additions & 1 deletion src/Illuminate/Queue/BeanstalkdQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,56 @@ public function __construct(
*/
public function size($queue = null)
{
return (int) $this->pheanstalk->statsTube(new TubeName($this->getQueue($queue)))->currentJobsReady;
$stats = $this->pheanstalk->statsTube(new TubeName($this->getQueue($queue)));

return $stats->currentJobsReady
+ $stats->currentJobsDelayed
+ $stats->currentJobsReserved;
}

/**
* Get the number of pending jobs.
*
* @param string|null $queue
* @return int
*/
public function pendingSize($queue = null)
{
return $this->pheanstalk->statsTube(new TubeName($this->getQueue($queue)))->currentJobsReady;
}

/**
* Get the number of delayed jobs.
*
* @param string|null $queue
* @return int
*/
public function delayedSize($queue = null)
{
return $this->pheanstalk->statsTube(new TubeName($this->getQueue($queue)))->currentJobsDelayed;
}

/**
* Get the number of reserved jobs.
*
* @param string|null $queue
* @return int
*/
public function reservedSize($queue = null)
{
return $this->pheanstalk->statsTube(new TubeName($this->getQueue($queue)))->currentJobsReserved;
}

/**
* Get the creation timestamp of the oldest pending job, excluding delayed jobs.
*
* @param string|null $queue
* @return int|null
*/
public function creationTimeOfOldestPendingJob($queue = null)
{
// Not supported by Beanstalkd...
return null;
}

/**
Expand Down
16 changes: 16 additions & 0 deletions src/Illuminate/Queue/Console/MonitorCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,18 @@ protected function parseQueues($queues)
'connection' => $connection,
'queue' => $queue,
'size' => $size = $this->manager->connection($connection)->size($queue),
'pending' => method_exists($this->manager->connection($connection), 'pendingSize')
? $this->manager->connection($connection)->pendingSize($queue)
: null,
'delayed' => method_exists($this->manager->connection($connection), 'delayedSize')
? $this->manager->connection($connection)->delayedSize($queue)
: null,
'reserved' => method_exists($this->manager->connection($connection), 'reservedSize')
? $this->manager->connection($connection)->reservedSize($queue)
: null,
'oldest_pending' => method_exists($this->manager->connection($connection), 'oldestPending')
? $this->manager->connection($connection)->creationTimeOfOldestPendingJob($queue)
: null,
'status' => $size >= $this->option('max') ? '<fg=yellow;options=bold>ALERT</>' : '<fg=green;options=bold>OK</>',
];
});
Expand All @@ -121,6 +133,10 @@ protected function displaySizes(Collection $queues)
$status = '['.$queue['size'].'] '.$queue['status'];

$this->components->twoColumnDetail($name, $status);
$this->components->twoColumnDetail('Pending jobs', $queue['pending'] ?? 'N/A');
$this->components->twoColumnDetail('Delayed jobs', $queue['delayed'] ?? 'N/A');
$this->components->twoColumnDetail('Reserved jobs', $queue['reserved'] ?? 'N/A');
$this->line('');
});

$this->newLine();
Expand Down
60 changes: 60 additions & 0 deletions src/Illuminate/Queue/DatabaseQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,66 @@ public function size($queue = null)
->count();
}

/**
* Get the number of pending jobs.
*
* @param string|null $queue
* @return int
*/
public function pendingSize($queue = null)
{
return $this->database->table($this->table)
->where('queue', $this->getQueue($queue))
->whereNull('reserved_at')
->where('available_at', '<=', $this->currentTime())
->count();
}

/**
* Get the number of delayed jobs.
*
* @param string|null $queue
* @return int
*/
public function delayedSize($queue = null)
{
return $this->database->table($this->table)
->where('queue', $this->getQueue($queue))
->whereNull('reserved_at')
->where('available_at', '>', $this->currentTime())
->count();
}

/**
* Get the number of reserved jobs.
*
* @param string|null $queue
* @return int
*/
public function reservedSize($queue = null)
{
return $this->database->table($this->table)
->where('queue', $this->getQueue($queue))
->whereNotNull('reserved_at')
->count();
}

/**
* Get the creation timestamp of the oldest pending job, excluding delayed jobs.
*
* @param string|null $queue
* @return int|null
*/
public function creationTimeOfOldestPendingJob($queue = null)
{
return $this->database->table($this->table)
->where('queue', $this->getQueue($queue))
->whereNull('reserved_at')
->where('available_at', '<=', $this->currentTime())
->oldest('available_at')
->value('available_at');
}

/**
* Push a new job onto the queue.
*
Expand Down
44 changes: 44 additions & 0 deletions src/Illuminate/Queue/NullQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,50 @@ public function size($queue = null)
return 0;
}

/**
* Get the number of pending jobs.
*
* @param string|null $queue
* @return int
*/
public function pendingSize($queue = null)
{
return 0;
}

/**
* Get the number of delayed jobs.
*
* @param string|null $queue
* @return int
*/
public function delayedSize($queue = null)
{
return 0;
}

/**
* Get the number of reserved jobs.
*
* @param string|null $queue
* @return int
*/
public function reservedSize($queue = null)
{
return 0;
}

/**
* Get the creation timestamp of the oldest pending job, excluding delayed jobs.
*
* @param string|null $queue
* @return int|null
*/
public function creationTimeOfOldestPendingJob($queue = null)
{
return null;
}

/**
* Push a new job onto the queue.
*
Expand Down
52 changes: 52 additions & 0 deletions src/Illuminate/Queue/RedisQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,58 @@ public function size($queue = null)
);
}

/**
* Get the number of pending jobs.
*
* @param string|null $queue
* @return int
*/
public function pendingSize($queue = null)
{
return $this->getConnection()->llen($this->getQueue($queue));
}

/**
* Get the number of delayed jobs.
*
* @param string|null $queue
* @return int
*/
public function delayedSize($queue = null)
{
return $this->getConnection()->zcard($this->getQueue($queue).':delayed');
}

/**
* Get the number of reserved jobs.
*
* @param string|null $queue
* @return int
*/
public function reservedSize($queue = null)
{
return $this->getConnection()->zcard($this->getQueue($queue).':reserved');
}

/**
* Get the creation timestamp of the oldest pending job, excluding delayed jobs.
*
* @param string|null $queue
* @return int|null
*/
public function creationTimeOfOldestPendingJob($queue = null)
{
$payload = $this->getConnection()->lindex($this->getQueue($queue), 0);

if (! $payload) {
return null;
}

$data = json_decode($payload, true);

return $data['createdAt'] ?? null;
}

/**
* Push an array of jobs onto the queue.
*
Expand Down
72 changes: 70 additions & 2 deletions src/Illuminate/Queue/SqsQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -68,15 +68,83 @@ public function __construct(
* @return int
*/
public function size($queue = null)
{
$response = $this->sqs->getQueueAttributes([
'QueueUrl' => $this->getQueue($queue),
'AttributeNames' => [
'ApproximateNumberOfMessages',
'ApproximateNumberOfMessagesDelayed',
'ApproximateNumberOfMessagesNotVisible',
],
]);

$a = $response['Attributes'];

return (int) $a['ApproximateNumberOfMessages']
+ (int) $a['ApproximateNumberOfMessagesDelayed']
+ (int) $a['ApproximateNumberOfMessagesNotVisible'];
}

/**
* Get the number of pending jobs.
*
* @param string|null $queue
* @return int
*/
public function pendingSize($queue = null)
{
$response = $this->sqs->getQueueAttributes([
'QueueUrl' => $this->getQueue($queue),
'AttributeNames' => ['ApproximateNumberOfMessages'],
]);

$attributes = $response->get('Attributes');
return (int) $response['Attributes']['ApproximateNumberOfMessages'] ?? 0;
}

/**
* Get the number of delayed jobs.
*
* @param string|null $queue
* @return int
*/
public function delayedSize($queue = null)
{
$response = $this->sqs->getQueueAttributes([
'QueueUrl' => $this->getQueue($queue),
'AttributeNames' => ['ApproximateNumberOfMessagesDelayed'],
]);

return (int) $response['Attributes']['ApproximateNumberOfMessagesDelayed'] ?? 0;
}

/**
* Get the number of reserved jobs.
*
* @param string|null $queue
* @return int
*/
public function reservedSize($queue = null)
{
$response = $this->sqs->getQueueAttributes([
'QueueUrl' => $this->getQueue($queue),
'AttributeNames' => ['ApproximateNumberOfMessagesNotVisible'],
]);

return (int) $response['Attributes']['ApproximateNumberOfMessagesNotVisible'] ?? 0;
}

return (int) $attributes['ApproximateNumberOfMessages'];
/**
* Get the creation timestamp of the oldest pending job, excluding delayed jobs.
*
* Not supported by SQS, returns null.
*
* @param string|null $queue
* @return int|null
*/
public function creationTimeOfOldestPendingJob($queue = null)
{
// Not supported by SQS...
return null;
}

/**
Expand Down
Loading