Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions api/datastore/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ const routesTableCreate = `CREATE TABLE IF NOT EXISTS routes (
maxc int NOT NULL,
memory int NOT NULL,
timeout int NOT NULL,
idle_timeout int NOT NULL,
type varchar(16) NOT NULL,
headers text NOT NULL,
config text NOT NULL,
Expand All @@ -39,7 +40,7 @@ const extrasTableCreate = `CREATE TABLE IF NOT EXISTS extras (
value varchar(256) NOT NULL
);`

const routeSelector = `SELECT app_name, path, image, format, maxc, memory, type, timeout, headers, config FROM routes`
const routeSelector = `SELECT app_name, path, image, format, maxc, memory, type, timeout, idle_timeout, headers, config FROM routes`

type rowScanner interface {
Scan(dest ...interface{}) error
Expand Down Expand Up @@ -302,10 +303,11 @@ func (ds *MySQLDatastore) InsertRoute(ctx context.Context, route *models.Route)
memory,
type,
timeout,
idle_timeout,
headers,
config
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?);`,
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);`,
route.AppName,
route.Path,
route.Image,
Expand All @@ -314,6 +316,7 @@ func (ds *MySQLDatastore) InsertRoute(ctx context.Context, route *models.Route)
route.Memory,
route.Type,
route.Timeout,
route.IdleTimeout,
string(hbyte),
string(cbyte),
)
Expand Down Expand Up @@ -359,6 +362,7 @@ func (ds *MySQLDatastore) UpdateRoute(ctx context.Context, newroute *models.Rout
memory = ?,
type = ?,
timeout = ?,
idle_timeout = ?,
headers = ?,
config = ?
WHERE app_name = ? AND path = ?;`,
Expand All @@ -368,6 +372,7 @@ func (ds *MySQLDatastore) UpdateRoute(ctx context.Context, newroute *models.Rout
route.Memory,
route.Type,
route.Timeout,
route.IdleTimeout,
string(hbyte),
string(cbyte),
route.AppName,
Expand Down Expand Up @@ -431,6 +436,7 @@ func scanRoute(scanner rowScanner, route *models.Route) error {
&route.Memory,
&route.Type,
&route.Timeout,
&route.IdleTimeout,
&headerStr,
&configStr,
)
Expand Down
14 changes: 10 additions & 4 deletions api/datastore/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ CREATE TABLE IF NOT EXISTS routes (
maxc integer NOT NULL,
memory integer NOT NULL,
timeout integer NOT NULL,
idle_timeout integer NOT NULL,
type character varying(16) NOT NULL,
headers text NOT NULL,
config text NOT NULL,
Expand All @@ -41,7 +42,7 @@ const extrasTableCreate = `CREATE TABLE IF NOT EXISTS extras (
value character varying(256) NOT NULL
);`

const routeSelector = `SELECT app_name, path, image, format, maxc, memory, type, timeout, headers, config FROM routes`
const routeSelector = `SELECT app_name, path, image, format, maxc, memory, type, timeout, idle_timeout, headers, config FROM routes`

type rowScanner interface {
Scan(dest ...interface{}) error
Expand Down Expand Up @@ -274,10 +275,11 @@ func (ds *PostgresDatastore) InsertRoute(ctx context.Context, route *models.Rout
memory,
type,
timeout,
idle_timeout,
headers,
config
)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10);`,
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11);`,
route.AppName,
route.Path,
route.Image,
Expand All @@ -286,6 +288,7 @@ func (ds *PostgresDatastore) InsertRoute(ctx context.Context, route *models.Rout
route.Memory,
route.Type,
route.Timeout,
route.IdleTimeout,
string(hbyte),
string(cbyte),
)
Expand Down Expand Up @@ -329,8 +332,9 @@ func (ds *PostgresDatastore) UpdateRoute(ctx context.Context, newroute *models.R
memory = $6,
type = $7,
timeout = $8,
headers = $9,
config = $10
idle_timeout = $9,
headers = $10,
config = $11
WHERE app_name = $1 AND path = $2;`,
route.AppName,
route.Path,
Expand All @@ -340,6 +344,7 @@ func (ds *PostgresDatastore) UpdateRoute(ctx context.Context, newroute *models.R
route.Memory,
route.Type,
route.Timeout,
route.IdleTimeout,
string(hbyte),
string(cbyte),
)
Expand Down Expand Up @@ -398,6 +403,7 @@ func scanRoute(scanner rowScanner, route *models.Route) error {
&route.Memory,
&route.Type,
&route.Timeout,
&route.IdleTimeout,
&headerStr,
&configStr,
)
Expand Down
5 changes: 5 additions & 0 deletions api/models/new_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ type NewTask struct {

*/
Timeout *int32 `json:"timeout,omitempty"`

/* Hot function idle timeout in seconds before termination.

*/
IdleTimeout *int32 `json:"idle_timeout,omitempty"`
}

// Validate validates this new task
Expand Down
14 changes: 14 additions & 0 deletions api/models/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

const (
defaultRouteTimeout = 30 // seconds
htfnScaleDownTimeout = 30 // seconds
)

var (
Expand Down Expand Up @@ -39,6 +40,7 @@ type Route struct {
Format string `json:"format"`
MaxConcurrency int `json:"max_concurrency"`
Timeout int32 `json:"timeout"`
IdleTimeout int32 `json:"idle_timeout"`
Config `json:"config"`
}

Expand All @@ -54,6 +56,7 @@ var (
ErrRoutesValidationMissingType = errors.New("Missing route Type")
ErrRoutesValidationPathMalformed = errors.New("Path malformed")
ErrRoutesValidationNegativeTimeout = errors.New("Negative timeout")
ErrRoutesValidationNegativeIdleTimeout = errors.New("Negative idle timeout")
ErrRoutesValidationNegativeMaxConcurrency = errors.New("Negative MaxConcurrency")
)

Expand Down Expand Up @@ -86,6 +89,10 @@ func (r *Route) SetDefaults() {
if r.Timeout == 0 {
r.Timeout = defaultRouteTimeout
}

//if r.IdleTimeout == 0 {
// r.IdleTimeout = htfnScaleDownTimeout
//}
}

// Validate validates field values, skipping zeroed fields if skipZero is true.
Expand Down Expand Up @@ -141,6 +148,10 @@ func (r *Route) Validate(skipZero bool) error {
res = append(res, ErrRoutesValidationNegativeTimeout)
}

if r.IdleTimeout < 0 {
res = append(res, ErrRoutesValidationNegativeIdleTimeout)
}

if len(res) > 0 {
return apiErrors.CompositeValidationError(res...)
}
Expand Down Expand Up @@ -171,6 +182,9 @@ func (r *Route) Update(new *Route) {
if new.Timeout != 0 {
r.Timeout = new.Timeout
}
if new.IdleTimeout != 0 {
r.IdleTimeout = new.IdleTimeout
}
if new.Format != "" {
r.Format = new.Format
}
Expand Down
6 changes: 5 additions & 1 deletion api/runner/async_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,18 @@ func getTask(ctx context.Context, url string) (*models.Task, error) {
}

func getCfg(t *models.Task) *task.Config {
timeout := int32(30)
if t.Timeout == nil {
timeout := int32(30)
t.Timeout = &timeout
}
if t.IdleTimeout == nil {
t.IdleTimeout = &timeout
}

cfg := &task.Config{
Image: *t.Image,
Timeout: time.Duration(*t.Timeout) * time.Second,
IdleTimeout: time.Duration(*t.IdleTimeout) * time.Second,
ID: t.ID,
AppName: t.AppName,
Env: t.EnvVars,
Expand Down
3 changes: 2 additions & 1 deletion api/runner/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ func (t *containerTask) Id() string { return t.cfg.ID }
func (t *containerTask) Route() string { return "" }
func (t *containerTask) Image() string { return t.cfg.Image }
func (t *containerTask) Timeout() time.Duration { return t.cfg.Timeout }
func (t *containerTask) Logger() (stdout, stderr io.Writer) { return t.cfg.Stdout, t.cfg.Stderr }
func (t *containerTask) IdleTimeout() time.Duration { return t.cfg.IdleTimeout }
func (t *containerTask) Logger() (io.Writer, io.Writer) { return t.cfg.Stdout, t.cfg.Stderr }
func (t *containerTask) Volumes() [][2]string { return [][2]string{} }
func (t *containerTask) WorkDir() string { return "" }

Expand Down
19 changes: 10 additions & 9 deletions api/runner/task/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,16 @@ import (
)

type Config struct {
ID string
Path string
Image string
Timeout time.Duration
AppName string
Memory uint64
Env map[string]string
Format string
MaxConcurrency int
ID string
Path string
Image string
Timeout time.Duration
IdleTimeout time.Duration
AppName string
Memory uint64
Env map[string]string
Format string
MaxConcurrency int

Stdin io.Reader
Stdout io.Writer
Expand Down
29 changes: 14 additions & 15 deletions api/runner/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,6 @@ import (
// Terminate
// (internal clock)

const (
// Terminate hot function after this timeout
htfnScaleDownTimeout = 30 * time.Second
)

// RunTask helps sending a task.Request into the common concurrency stream.
// Refer to StartWorkers() to understand what this is about.
Expand Down Expand Up @@ -264,17 +260,29 @@ func newhtfn(cfg *task.Config, proto protocol.Protocol, tasks <-chan task.Reques
func (hc *htfn) serve(ctx context.Context) {
lctx, cancel := context.WithCancel(ctx)
var wg sync.WaitGroup
cfg := *hc.cfg
logger := logrus.WithFields(logrus.Fields{
"app": cfg.AppName,
"route": cfg.Path,
"image": cfg.Image,
"memory": cfg.Memory,
"format": cfg.Format,
"max_concurrency": cfg.MaxConcurrency,
"idle_timeout": cfg.IdleTimeout,
})

wg.Add(1)
go func() {
defer wg.Done()
for {
inactivity := time.After(htfnScaleDownTimeout)
inactivity := time.After(cfg.IdleTimeout)

select {
case <-lctx.Done():
return

case <-inactivity:
logger.Info("Canceling inactive hot function")
cancel()

case t := <-hc.tasks:
Expand All @@ -295,7 +303,6 @@ func (hc *htfn) serve(ctx context.Context) {
}
}()

cfg := *hc.cfg
cfg.Env["FN_FORMAT"] = cfg.Format
cfg.Timeout = 0 // add a timeout to simulate ab.end. failure.
cfg.Stdin = hc.containerIn
Expand Down Expand Up @@ -324,22 +331,14 @@ func (hc *htfn) serve(ctx context.Context) {
defer wg.Done()
scanner := bufio.NewScanner(errr)
for scanner.Scan() {
logrus.WithFields(logrus.Fields{
"app": cfg.AppName,
"route": cfg.Path,
"image": cfg.Image,
"memory": cfg.Memory,
"format": cfg.Format,
"max_concurrency": cfg.MaxConcurrency,
}).Info(scanner.Text())
logger.Info(scanner.Text())
}
}()

result, err := hc.rnr.Run(lctx, &cfg)
if err != nil {
logrus.WithError(err).Error("hot function failure detected")
}
cancel()
errw.Close()
wg.Wait()
logrus.WithField("result", result).Info("hot function terminated")
Expand Down
23 changes: 12 additions & 11 deletions api/server/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,17 +190,18 @@ func (s *Server) serve(ctx context.Context, c *gin.Context, appName string, foun
}

cfg := &task.Config{
AppName: appName,
Path: found.Path,
Env: envVars,
Format: found.Format,
ID: reqID,
Image: found.Image,
MaxConcurrency: found.MaxConcurrency,
Memory: found.Memory,
Stdin: payload,
Stdout: &stdout,
Timeout: time.Duration(found.Timeout) * time.Second,
AppName: appName,
Path: found.Path,
Env: envVars,
Format: found.Format,
ID: reqID,
Image: found.Image,
MaxConcurrency: found.MaxConcurrency,
Memory: found.Memory,
Stdin: payload,
Stdout: &stdout,
Timeout: time.Duration(found.Timeout) * time.Second,
IdleTimeout: time.Duration(found.IdleTimeout) * time.Second,
}

s.Runner.Enqueue()
Expand Down
Loading