Skip to content

Commit f64fb05

Browse files
heiytorgustavosbarreto
authored andcommitted
feat(api): add support for unique cron jobs
Introduce functionality to configure cron jobs as unique, preventing concurrent processing of the same job. When a job is marked as unique, it will not be enqueued or executed again until it completes or reaches a specified timeout. Added a new environment variable `SHELLHUB_ASYNQ_UNIQUENESS_TIMEOUT` to control the maximum duration (in hours) a job can lock the queue. This timeout ensures that stalled jobs do not block subsequent executions indefinitely.
1 parent 0075f9b commit f64fb05

File tree

8 files changed

+72
-9
lines changed

8 files changed

+72
-9
lines changed

.env

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,13 @@ SHELLHUB_ASYNQ_GROUP_GRACE_PERIOD=2
116116
# The maximum number of tasks that can be aggregated together by Asynq.
117117
SHELLHUB_ASYNQ_GROUP_MAX_SIZE=1000
118118

119+
# Defines the maximum duration, in hours, for which a unique job remains locked in the queue.
120+
# If the job does not complete within this timeout, the lock is released, allowing a new instance
121+
# of the job to be enqueued and executed.
122+
#
123+
# A value lower than or equal to 0 disables the uniqueness.
124+
SHELLHUB_ASYNQ_UNIQUENESS_TIMEOUT=24
125+
119126
# Allow SSH connections with an agent via a public key for versions below 0.6.0.
120127
# Values: true, false
121128
SHELLHUB_ALLOW_PUBLIC_KEY_ACCESS_BELLOW_0_6_0=false

api/server.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,11 @@ type config struct {
106106
// Check [https://github.com/hibiken/asynq/wiki/Task-aggregation] for more information.
107107
AsynqGroupMaxSize int `env:"ASYNQ_GROUP_MAX_SIZE,default=1000"`
108108

109+
// AsynqUniquenessTimeout defines the maximum duration, in hours, for which a unique job
110+
// remains locked in the queue. If the job does not complete within this timeout, the lock
111+
// is released, allowing a new instance of the job to be enqueued and executed.
112+
AsynqUniquenessTimeout int `env:"ASYNQ_UNIQUENESS_TIMEOUT,default=24"`
113+
109114
// GeoipMirror specifies an alternative mirror URL for downloading the GeoIP databases.
110115
// This field takes precedence over [GeoipMaxmindLicense]; when both are configured,
111116
// GeoipMirror will be used as the primary source for database downloads.
@@ -191,7 +196,12 @@ func startServer(ctx context.Context, cfg *config, store store.Store, cache stor
191196
routerOptions = append(routerOptions, routes.WithReporter(reporter))
192197
}
193198

194-
worker := asynq.NewServer(cfg.RedisURI, asynq.BatchConfig(cfg.AsynqGroupMaxSize, cfg.AsynqGroupMaxDelay, int(cfg.AsynqGroupGracePeriod)))
199+
worker := asynq.NewServer(
200+
cfg.RedisURI,
201+
asynq.BatchConfig(cfg.AsynqGroupMaxSize, cfg.AsynqGroupMaxDelay, int(cfg.AsynqGroupGracePeriod)),
202+
asynq.UniquenessTimeout(cfg.AsynqUniquenessTimeout),
203+
)
204+
195205
worker.HandleTask(services.TaskDevicesHeartbeat, service.DevicesHeartbeat(), asynq.BatchTask())
196206

197207
if err := worker.Start(); err != nil {

docker-compose.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ services:
4747
- ASYNQ_GROUP_MAX_DELAY=${SHELLHUB_ASYNQ_GROUP_MAX_DELAY}
4848
- ASYNQ_GROUP_GRACE_PERIOD=${SHELLHUB_ASYNQ_GROUP_GRACE_PERIOD}
4949
- ASYNQ_GROUP_MAX_SIZE=${SHELLHUB_ASYNQ_GROUP_MAX_SIZE}
50+
- ASYNQ_UNIQUENESS_TIMEOUT=${SHELLHUB_ASYNQ_UNIQUENESS_TIMEOUT}
5051
- REDIS_CACHE_POOL_SIZE=${SHELLHUB_REDIS_CACHE_POOL_SIZE}
5152
- MAXIMUM_ACCOUNT_LOCKOUT=${SHELLHUB_MAXIMUM_ACCOUNT_LOCKOUT}
5253
depends_on:

pkg/worker/asynq/cron.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package asynq
2+
3+
import (
4+
"github.com/shellhub-io/shellhub/pkg/worker"
5+
)
6+
7+
// Unique configures a cron job to prevent concurrent processing.
8+
// When enabled, the job will not be enqueued or executed again until it completes
9+
// or the timeout specified in `SHELLHUB_ASYNQ_UNIQUENESS_TIMEOUT` is reached.
10+
func Unique() worker.CronjobOption {
11+
return func(c *worker.Cronjob) {
12+
c.Unique = true
13+
}
14+
}

pkg/worker/asynq/server.go

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -32,12 +32,24 @@ func BatchConfig(maxSize, maxDelay, gracePeriod int) ServerOption {
3232
}
3333
}
3434

35+
// UniquenessTimeout defines the maximum duration, in hours, for which a unique job remains locked
36+
// in the queue. If the job does not complete within this timeout, the lock is released, allowing
37+
// a new instance of the job to be enqueued and executed.
38+
func UniquenessTimeout(timeout int) ServerOption {
39+
return func(s *server) error {
40+
s.uniquenessTimeout = timeout
41+
42+
return nil
43+
}
44+
}
45+
3546
type server struct {
36-
redisURI string
37-
asynqSrv *asynq.Server
38-
asynqMux *asynq.ServeMux
39-
asynqSch *asynq.Scheduler
40-
batchConfig *batchConfig
47+
redisURI string
48+
asynqSrv *asynq.Server
49+
asynqMux *asynq.ServeMux
50+
asynqSch *asynq.Scheduler
51+
batchConfig *batchConfig
52+
uniquenessTimeout int
4153

4254
queues queues
4355
tasks []worker.Task
@@ -77,7 +89,7 @@ func (s *server) HandleTask(pattern worker.TaskPattern, handler worker.TaskHandl
7789
s.tasks = append(s.tasks, task)
7890
}
7991

80-
func (s *server) HandleCron(spec worker.CronSpec, handler worker.CronHandler) {
92+
func (s *server) HandleCron(spec worker.CronSpec, handler worker.CronHandler, opts ...worker.CronjobOption) {
8193
spec.MustValidate()
8294

8395
cronjob := worker.Cronjob{
@@ -86,6 +98,10 @@ func (s *server) HandleCron(spec worker.CronSpec, handler worker.CronHandler) {
8698
Handler: handler,
8799
}
88100

101+
for _, opt := range opts {
102+
opt(&cronjob)
103+
}
104+
89105
s.cronjobs = append(s.cronjobs, cronjob)
90106
}
91107

@@ -137,7 +153,7 @@ func (s *server) setupAsynq() error {
137153
for _, c := range s.cronjobs {
138154
s.asynqMux.HandleFunc(c.Identifier, cronToAsynq(c.Handler))
139155
task := asynq.NewTask(c.Identifier, nil, asynq.Queue(cronQueue))
140-
if _, err := s.asynqSch.Register(c.Spec.String(), task); err != nil {
156+
if _, err := s.asynqSch.Register(c.Spec.String(), task, buildCronOptions(s, &c)...); err != nil {
141157
return worker.ErrHandleCronFailed
142158
}
143159
}

pkg/worker/asynq/utils.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,17 @@ func cronToAsynq(h worker.CronHandler) func(context.Context, *asynq.Task) error
4848
}
4949
}
5050

51+
// buildCronOptions generates a slice of asynq.Options for configuring a cron job.
52+
func buildCronOptions(s *server, c *worker.Cronjob) []asynq.Option {
53+
opts := make([]asynq.Option, 0)
54+
55+
if c.Unique && s.uniquenessTimeout > 0 {
56+
opts = append(opts, asynq.Unique(time.Duration(s.uniquenessTimeout)*time.Hour))
57+
}
58+
59+
return opts
60+
}
61+
5162
// taskToAsynq converts a [github.com/shellhub-io/shellhub/pkg/api/worker.TaskHandler] to an asynq handler.
5263
func taskToAsynq(h worker.TaskHandler) func(context.Context, *asynq.Task) error {
5364
return func(ctx context.Context, task *asynq.Task) error {

pkg/worker/cron.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,4 +38,8 @@ type Cronjob struct {
3838
Spec CronSpec
3939
// Handler is the callback function that will be executed when the cron specification is met.
4040
Handler CronHandler
41+
// Unique defines whether the task cannot be perfomed concurrently.
42+
Unique bool
4143
}
44+
45+
type CronjobOption func(c *Cronjob)

pkg/worker/server.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ type Server interface {
1212
// HandleCron executes the cronFunc every time the cron specification is met.
1313
//
1414
// It panics if the cron specification is invalid.
15-
HandleCron(spec CronSpec, cronFunc CronHandler)
15+
HandleCron(spec CronSpec, cronFunc CronHandler, opts ...CronjobOption)
1616
// Start initializes and starts the worker in a non-blocking manner. The server is
1717
// turned off whedn the context was done.
1818
//

0 commit comments

Comments
 (0)