diff --git a/cmd/influxd/run/server.go b/cmd/influxd/run/server.go index 8d4af811e2e..e6e38b0c3b1 100644 --- a/cmd/influxd/run/server.go +++ b/cmd/influxd/run/server.go @@ -513,6 +513,8 @@ func (s *Server) Open() error { return fmt.Errorf("open points writer: %s", err) } + s.Monitor.WithCompactThroughputLimiter(s.TSDBStore.EngineOptions.CompactionThroughputLimiter) + for _, service := range s.Services { if err := service.Open(); err != nil { return fmt.Errorf("open service: %s", err) diff --git a/monitor/service.go b/monitor/service.go index 56aa821a235..0283e796a72 100644 --- a/monitor/service.go +++ b/monitor/service.go @@ -16,6 +16,7 @@ import ( "github.com/influxdata/influxdb/logger" "github.com/influxdata/influxdb/models" "github.com/influxdata/influxdb/monitor/diagnostics" + "github.com/influxdata/influxdb/pkg/limiter" "github.com/influxdata/influxdb/services/meta" "github.com/influxdata/influxdb/tsdb" "go.uber.org/zap" @@ -100,7 +101,8 @@ type Monitor struct { // TSDB configuration for diagnostics TSDBConfig *tsdb.Config - Logger *zap.Logger + Logger *zap.Logger + Limiter limiter.Rate } // PointsWriter is a simplified interface for writing the points the monitor gathers. @@ -150,6 +152,15 @@ func (m *Monitor) Open() error { m.RegisterDiagnosticsClient("runtime", &goRuntime{}) m.RegisterDiagnosticsClient("network", &network{}) m.RegisterDiagnosticsClient("system", &system{}) + + if m.Limiter != nil { + m.RegisterDiagnosticsClient("stats", &stats{ + comp: compactThroughputStats{ + limiter: m.Limiter, + }, + }) + } + if m.TSDBConfig != nil { m.RegisterDiagnosticsClient("config", m.TSDBConfig) } @@ -200,6 +211,12 @@ func (m *Monitor) writePoints(p models.Points) error { return nil } +func (m *Monitor) WithCompactThroughputLimiter(limiter limiter.Rate) { + m.mu.Lock() + defer m.mu.Unlock() + m.Limiter = limiter +} + // Close closes the monitor system. func (m *Monitor) Close() error { if !m.open() { @@ -222,6 +239,8 @@ func (m *Monitor) Close() error { m.DeregisterDiagnosticsClient("runtime") m.DeregisterDiagnosticsClient("network") m.DeregisterDiagnosticsClient("system") + m.DeregisterDiagnosticsClient("stats") + m.DeregisterDiagnosticsClient("config") return nil } diff --git a/monitor/stats.go b/monitor/stats.go new file mode 100644 index 00000000000..b49eb87078f --- /dev/null +++ b/monitor/stats.go @@ -0,0 +1,34 @@ +package monitor + +import ( + "math" + + "github.com/influxdata/influxdb/monitor/diagnostics" + "github.com/influxdata/influxdb/pkg/limiter" + "golang.org/x/time/rate" +) + +// stats captures statistics +type stats struct { + comp compactThroughputStats +} + +type compactThroughputStats struct { + limiter limiter.Rate +} + +// CompactThroughputUsage calculates the percentage of burst capacity currently consumed by compaction. +func (s *stats) CompactThroughputUsage() float64 { + percentage := 100 * (1 - rate.Limit(s.comp.limiter.Tokens())/s.comp.limiter.Limit()) + return float64(percentage) +} + +func (s *stats) Diagnostics() (*diagnostics.Diagnostics, error) { + compactThroughputUsage := s.CompactThroughputUsage() + compactThroughputUsageTrunc := math.Round(compactThroughputUsage*100.0) / 100.0 + d := map[string]interface{}{ + "compact-throughput-usage-percentage": compactThroughputUsageTrunc, + } + + return diagnostics.RowFromMap(d), nil +} diff --git a/monitor/stats_test.go b/monitor/stats_test.go new file mode 100644 index 00000000000..1971fa3f42b --- /dev/null +++ b/monitor/stats_test.go @@ -0,0 +1,31 @@ +package monitor_test + +import ( + "testing" + + "github.com/influxdata/influxdb/monitor" + "github.com/influxdata/influxdb/pkg/limiter" + "github.com/influxdata/influxdb/tsdb" + "github.com/stretchr/testify/require" +) + +func TestDiagnostics_Stats(t *testing.T) { + s := monitor.New(nil, monitor.Config{}, &tsdb.Config{}) + compactLimiter := limiter.NewRate(100, 100) + + s.WithCompactThroughputLimiter(compactLimiter) + + require.NoError(t, s.Open(), "opening monitor") + defer func() { + require.NoError(t, s.Close(), "closing monitor") + }() + + d, err := s.Diagnostics() + require.NoError(t, err, "getting diagnostics") + + diags, ok := d["stats"] + require.True(t, ok, "expected stats diagnostic client to be registered") + + got, exp := diags.Columns, []string{"compact-throughput-usage-percentage"} + require.Equal(t, exp, got) +} diff --git a/pkg/limiter/writer.go b/pkg/limiter/writer.go index 4beb0dfe370..39816550aaf 100644 --- a/pkg/limiter/writer.go +++ b/pkg/limiter/writer.go @@ -18,6 +18,8 @@ type Writer struct { type Rate interface { WaitN(ctx context.Context, n int) error Burst() int + Tokens() float64 + Limit() rate.Limit } func NewRate(bytesPerSec, burstLimit int) Rate { diff --git a/services/httpd/handler.go b/services/httpd/handler.go index d12e9dab857..59cc9fce870 100644 --- a/services/httpd/handler.go +++ b/services/httpd/handler.go @@ -2327,6 +2327,30 @@ func (h *Handler) serveExpvar(w http.ResponseWriter, r *http.Request) { } } + if val := diags["stats"]; val != nil { + if len(val.Rows) > 0 && len(val.Columns) > 0 { + // Create a map of column names to values + statsMap := make(map[string]interface{}) + for i, col := range val.Columns { + if i < len(val.Rows[0]) { + statsMap[col] = val.Rows[0][i] + } + } + + data, err := json.Marshal(statsMap) + if err != nil { + h.httpError(w, err.Error(), http.StatusInternalServerError) + return + } + + if !first { + fmt.Fprintln(w, ",") + } + first = false + fmt.Fprintf(w, "\"stats\": %s", data) + } + } + // We're going to print some kind of crypto data, we just // need to find the proper source for it. {