7 changes: 7 additions & 0 deletions .env
@@ -116,6 +116,13 @@ SHELLHUB_ASYNQ_GROUP_GRACE_PERIOD=2
# The maximum number of tasks that can be aggregated together by Asynq.
SHELLHUB_ASYNQ_GROUP_MAX_SIZE=1000

# Defines the maximum duration, in hours, for which a unique job remains locked in the queue.
# If the job does not complete within this timeout, the lock is released, allowing a new instance
# of the job to be enqueued and executed.
#
# A value less than or equal to 0 disables uniqueness.
SHELLHUB_ASYNQ_UNIQUENESS_TIMEOUT=24

# Allow SSH connections with an agent via a public key for versions below 0.6.0.
# Values: true, false
SHELLHUB_ALLOW_PUBLIC_KEY_ACCESS_BELLOW_0_6_0=false
12 changes: 11 additions & 1 deletion api/server.go
@@ -106,6 +106,11 @@ type config struct {
// Check [https://github.com/hibiken/asynq/wiki/Task-aggregation] for more information.
AsynqGroupMaxSize int `env:"ASYNQ_GROUP_MAX_SIZE,default=1000"`

// AsynqUniquenessTimeout defines the maximum duration, in hours, for which a unique job
// remains locked in the queue. If the job does not complete within this timeout, the lock
// is released, allowing a new instance of the job to be enqueued and executed.
AsynqUniquenessTimeout int `env:"ASYNQ_UNIQUENESS_TIMEOUT,default=24"`

// GeoipMirror specifies an alternative mirror URL for downloading the GeoIP databases.
// This field takes precedence over [GeoipMaxmindLicense]; when both are configured,
// GeoipMirror will be used as the primary source for database downloads.
@@ -191,7 +196,12 @@ func startServer(ctx context.Context, cfg *config, store store.Store, cache stor
routerOptions = append(routerOptions, routes.WithReporter(reporter))
}

worker := asynq.NewServer(cfg.RedisURI, asynq.BatchConfig(cfg.AsynqGroupMaxSize, cfg.AsynqGroupMaxDelay, int(cfg.AsynqGroupGracePeriod)))
worker := asynq.NewServer(
cfg.RedisURI,
asynq.BatchConfig(cfg.AsynqGroupMaxSize, cfg.AsynqGroupMaxDelay, int(cfg.AsynqGroupGracePeriod)),
asynq.UniquenessTimeout(cfg.AsynqUniquenessTimeout),
)

worker.HandleTask(services.TaskDevicesHeartbeat, service.DevicesHeartbeat(), asynq.BatchTask())

if err := worker.Start(); err != nil {
1 change: 1 addition & 0 deletions docker-compose.yml
@@ -49,6 +49,7 @@ services:
- ASYNQ_GROUP_MAX_DELAY=${SHELLHUB_ASYNQ_GROUP_MAX_DELAY}
- ASYNQ_GROUP_GRACE_PERIOD=${SHELLHUB_ASYNQ_GROUP_GRACE_PERIOD}
- ASYNQ_GROUP_MAX_SIZE=${SHELLHUB_ASYNQ_GROUP_MAX_SIZE}
- ASYNQ_UNIQUENESS_TIMEOUT=${SHELLHUB_ASYNQ_UNIQUENESS_TIMEOUT}
- REDIS_CACHE_POOL_SIZE=${SHELLHUB_REDIS_CACHE_POOL_SIZE}
- MAXIMUM_ACCOUNT_LOCKOUT=${SHELLHUB_MAXIMUM_ACCOUNT_LOCKOUT}
depends_on:
14 changes: 14 additions & 0 deletions pkg/worker/asynq/cron.go
@@ -0,0 +1,14 @@
package asynq

import (
"github.com/shellhub-io/shellhub/pkg/worker"
)

// Unique configures a cron job to prevent concurrent processing.
// When enabled, the job will not be enqueued or executed again until it completes
// or the timeout specified in `SHELLHUB_ASYNQ_UNIQUENESS_TIMEOUT` is reached.
func Unique() worker.CronjobOption {
return func(c *worker.Cronjob) {
c.Unique = true
}
}
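
The option above pairs with the HandleCron change further down in this diff. A minimal usage sketch, not part of the change itself; the Redis URI, the cron expression, the cleanupDevices handler, and constructing worker.CronSpec from a string are illustrative assumptions:

srv := asynq.NewServer("redis://redis:6379", asynq.UniquenessTimeout(24))

// Register a cron job that must not overlap with a still-running instance of itself.
srv.HandleCron(worker.CronSpec("0 * * * *"), cleanupDevices, asynq.Unique())

if err := srv.Start(); err != nil {
	// handle startup failure
}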
30 changes: 23 additions & 7 deletions pkg/worker/asynq/server.go
@@ -32,12 +32,24 @@ func BatchConfig(maxSize, maxDelay, gracePeriod int) ServerOption {
}
}

// UniquenessTimeout defines the maximum duration, in hours, for which a unique job remains locked
// in the queue. If the job does not complete within this timeout, the lock is released, allowing
// a new instance of the job to be enqueued and executed.
func UniquenessTimeout(timeout int) ServerOption {
return func(s *server) error {
s.uniquenessTimeout = timeout

return nil
}
}

type server struct {
redisURI string
asynqSrv *asynq.Server
asynqMux *asynq.ServeMux
asynqSch *asynq.Scheduler
batchConfig *batchConfig
redisURI string
asynqSrv *asynq.Server
asynqMux *asynq.ServeMux
asynqSch *asynq.Scheduler
batchConfig *batchConfig
uniquenessTimeout int

queues queues
tasks []worker.Task
@@ -77,7 +89,7 @@ func (s *server) HandleTask(pattern worker.TaskPattern, handler worker.TaskHandl
s.tasks = append(s.tasks, task)
}

func (s *server) HandleCron(spec worker.CronSpec, handler worker.CronHandler) {
func (s *server) HandleCron(spec worker.CronSpec, handler worker.CronHandler, opts ...worker.CronjobOption) {
spec.MustValidate()

cronjob := worker.Cronjob{
@@ -86,6 +98,10 @@ func (s *server) HandleCron(spec worker.CronSpec, handler worker.CronHandler) {
Handler: handler,
}

for _, opt := range opts {
opt(&cronjob)
}

s.cronjobs = append(s.cronjobs, cronjob)
}

@@ -137,7 +153,7 @@ func (s *server) setupAsynq() error {
for _, c := range s.cronjobs {
s.asynqMux.HandleFunc(c.Identifier, cronToAsynq(c.Handler))
task := asynq.NewTask(c.Identifier, nil, asynq.Queue(cronQueue))
if _, err := s.asynqSch.Register(c.Spec.String(), task); err != nil {
if _, err := s.asynqSch.Register(c.Spec.String(), task, buildCronOptions(s, &c)...); err != nil {
return worker.ErrHandleCronFailed
}
}
11 changes: 11 additions & 0 deletions pkg/worker/asynq/utils.go
@@ -48,6 +48,17 @@ func cronToAsynq(h worker.CronHandler) func(context.Context, *asynq.Task) error
}
}

// buildCronOptions builds the slice of asynq.Option values used to configure a cron job.
func buildCronOptions(s *server, c *worker.Cronjob) []asynq.Option {
opts := make([]asynq.Option, 0)

if c.Unique && s.uniquenessTimeout > 0 {
opts = append(opts, asynq.Unique(time.Duration(s.uniquenessTimeout)*time.Hour))
}

return opts
}

// taskToAsynq converts a [github.com/shellhub-io/shellhub/pkg/api/worker.TaskHandler] to an asynq handler.
func taskToAsynq(h worker.TaskHandler) func(context.Context, *asynq.Task) error {
return func(ctx context.Context, task *asynq.Task) error {
4 changes: 4 additions & 0 deletions pkg/worker/cron.go
@@ -38,4 +38,8 @@ type Cronjob struct {
Spec CronSpec
// Handler is the callback function that will be executed when the cron specification is met.
Handler CronHandler
// Unique indicates whether the task must not be performed concurrently.
Unique bool
}

type CronjobOption func(c *Cronjob)
2 changes: 1 addition & 1 deletion pkg/worker/server.go
@@ -12,7 +12,7 @@ type Server interface {
// HandleCron executes the cronFunc every time the cron specification is met.
//
// It panics if the cron specification is invalid.
HandleCron(spec CronSpec, cronFunc CronHandler)
HandleCron(spec CronSpec, cronFunc CronHandler, opts ...CronjobOption)
// Start initializes and starts the worker in a non-blocking manner. The server is
// turned off when the context is done.
//