Skip to content
2 changes: 2 additions & 0 deletions cmd/influxd/run/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,8 @@ func (s *Server) Open() error {
return fmt.Errorf("open points writer: %s", err)
}

s.Monitor.WithCompactThroughputLimiter(s.TSDBStore.EngineOptions.CompactionThroughputLimiter)

for _, service := range s.Services {
if err := service.Open(); err != nil {
return fmt.Errorf("open service: %s", err)
Expand Down
21 changes: 20 additions & 1 deletion monitor/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/influxdata/influxdb/logger"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/monitor/diagnostics"
"github.com/influxdata/influxdb/pkg/limiter"
"github.com/influxdata/influxdb/services/meta"
"github.com/influxdata/influxdb/tsdb"
"go.uber.org/zap"
Expand Down Expand Up @@ -100,7 +101,8 @@ type Monitor struct {
// TSDB configuration for diagnostics
TSDBConfig *tsdb.Config

Logger *zap.Logger
Logger *zap.Logger
Limiter limiter.Rate
}

// PointsWriter is a simplified interface for writing the points the monitor gathers.
Expand Down Expand Up @@ -150,6 +152,15 @@ func (m *Monitor) Open() error {
m.RegisterDiagnosticsClient("runtime", &goRuntime{})
m.RegisterDiagnosticsClient("network", &network{})
m.RegisterDiagnosticsClient("system", &system{})

if m.Limiter != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need a lock around the accesses to m.limiter?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or are we assuming Open is safely single-threaded?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe Open should be single threaded. It's called during Server.Open

	for _, service := range s.Services {
		if err := service.Open(); err != nil {
			return fmt.Errorf("open service: %s", err)
		}
	}

func (s *Server) Open() error {

Which is called from within Main.Run.

if err := cmd.Run(args...); err != nil {
return fmt.Errorf("run: %s", err)
}

m.RegisterDiagnosticsClient("stats", &stats{
comp: compactThroughputStats{
limiter: m.Limiter,
},
})
}

if m.TSDBConfig != nil {
m.RegisterDiagnosticsClient("config", m.TSDBConfig)
}
Expand Down Expand Up @@ -200,6 +211,12 @@ func (m *Monitor) writePoints(p models.Points) error {
return nil
}

// WithCompactThroughputLimiter stores the compaction throughput limiter on the
// monitor. The assignment is made while holding m.mu.
func (m *Monitor) WithCompactThroughputLimiter(limiter limiter.Rate) {
	m.mu.Lock()
	m.Limiter = limiter
	m.mu.Unlock()
}

// Close closes the monitor system.
func (m *Monitor) Close() error {
if !m.open() {
Expand All @@ -222,6 +239,8 @@ func (m *Monitor) Close() error {
m.DeregisterDiagnosticsClient("runtime")
m.DeregisterDiagnosticsClient("network")
m.DeregisterDiagnosticsClient("system")
m.DeregisterDiagnosticsClient("stats")
m.DeregisterDiagnosticsClient("config")
Copy link
Contributor

@davidby-influx davidby-influx Aug 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was this missing from a previous PR?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes

return nil
}

Expand Down
34 changes: 34 additions & 0 deletions monitor/stats.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package monitor

import (
"math"

"github.com/influxdata/influxdb/monitor/diagnostics"
"github.com/influxdata/influxdb/pkg/limiter"
"golang.org/x/time/rate"
)

// stats is the diagnostics client that Monitor.Open registers under the
// "stats" key. It currently reports only compaction throughput usage.
type stats struct {
// comp holds the limiter consulted when computing compaction throughput usage.
comp compactThroughputStats
}

// compactThroughputStats carries the compaction throughput limiter whose
// Tokens and Limit readings are used by CompactThroughputUsage.
type compactThroughputStats struct {
// limiter is populated from Monitor.Limiter when the stats client is registered.
limiter limiter.Rate
}

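// CompactThroughputUsage reports compaction throughput usage as a percentage of the
// limiter's rate limit, computed as 100 * (1 - Tokens()/Limit()). The result may exceed
// 100 if the limiter reports a negative token count.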
func (s *stats) CompactThroughputUsage() float64 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This only calculates the burst percentage (maximum tokens that can be requested). But we also need the overall percentage used of the limit over time (Limiter.Limit), don't we?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can likely generate two fields

compact-throughput-burst-usage

compact-throughput-usage

And return the usage of our burst limit and our standard limit?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the burst limiter is much less interesting; it's going to be highly variable moment to moment, and what I think Support wants is to answer the question: are we, on average, running up against our compaction throughput limit? Check with the FR author on the requirements.

Copy link
Author

@devanbenz devanbenz Aug 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a comment on the FR.

The complex part of this would be tracking a time window that's meaningful for grabbing our bytes per second. Running curl and getting this data at any moment in time would likely be meaningless without a moving average as far as I'm aware.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will see what Andrew's input is

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe the correct computation on this is

100*(1-rate.Limit(l.Tokens())/l.Limit())

See this Go Playground for an example, and how token debt is handled.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we wanting the percentage to go over 100%, or be limited to 100%? The limiter should keep the actual usage more or less at or under 100%.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be okay to go over 100%.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm confused about how going above 100% should be interpreted. Is there an EAR or feature request that drove this feature? Maybe if I see that it will make more sense to me.

percentage := 100 * (1 - rate.Limit(s.comp.limiter.Tokens())/s.comp.limiter.Limit())
return float64(percentage)
}

// Diagnostics reports the current compaction throughput usage, rounded to two
// decimal places, under the "compact-throughput-usage-percentage" column. The
// returned error is always nil.
func (s *stats) Diagnostics() (*diagnostics.Diagnostics, error) {
	// Scale by 100, round to the nearest integer, then scale back so that
	// only two decimal places survive.
	rounded := math.Round(s.CompactThroughputUsage()*100.0) / 100.0

	row := make(map[string]interface{}, 1)
	row["compact-throughput-usage-percentage"] = rounded

	return diagnostics.RowFromMap(row), nil
}
31 changes: 31 additions & 0 deletions monitor/stats_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package monitor_test

import (
"testing"

"github.com/influxdata/influxdb/monitor"
"github.com/influxdata/influxdb/pkg/limiter"
"github.com/influxdata/influxdb/tsdb"
"github.com/stretchr/testify/require"
)

// TestDiagnostics_Stats checks that a monitor given a compaction throughput
// limiter exposes a "stats" diagnostics entry with the expected single column.
func TestDiagnostics_Stats(t *testing.T) {
	mon := monitor.New(nil, monitor.Config{}, &tsdb.Config{})
	mon.WithCompactThroughputLimiter(limiter.NewRate(100, 100))

	require.NoError(t, mon.Open(), "opening monitor")
	defer func() {
		require.NoError(t, mon.Close(), "closing monitor")
	}()

	all, err := mon.Diagnostics()
	require.NoError(t, err, "getting diagnostics")

	// The stats client is only registered when a limiter has been set.
	statsDiag, found := all["stats"]
	require.True(t, found, "expected stats diagnostic client to be registered")

	want := []string{"compact-throughput-usage-percentage"}
	require.Equal(t, want, statsDiag.Columns)
}
2 changes: 2 additions & 0 deletions pkg/limiter/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ type Writer struct {
// Rate is the rate-limiter abstraction returned by NewRate.
// NOTE(review): the method set appears to mirror golang.org/x/time/rate.Limiter;
// confirm against the concrete implementation.
type Rate interface {
// WaitN takes a context and a count n and returns an error.
// Presumably it blocks until n tokens are available or ctx is done — TODO confirm.
WaitN(ctx context.Context, n int) error
// Burst returns an int; presumably the limiter's maximum burst size — TODO confirm.
Burst() int
// Tokens returns a float64; the monitor uses it as the number of currently available tokens.
Tokens() float64
// Limit returns a rate.Limit; the monitor uses it as the denominator of its usage percentage.
Limit() rate.Limit
}

func NewRate(bytesPerSec, burstLimit int) Rate {
Expand Down
24 changes: 24 additions & 0 deletions services/httpd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2327,6 +2327,30 @@ func (h *Handler) serveExpvar(w http.ResponseWriter, r *http.Request) {
}
}

if val := diags["stats"]; val != nil {
if len(val.Rows) > 0 && len(val.Columns) > 0 {
// Create a map of column names to values
statsMap := make(map[string]interface{})
for i, col := range val.Columns {
if i < len(val.Rows[0]) {
statsMap[col] = val.Rows[0][i]
}
}

data, err := json.Marshal(statsMap)
if err != nil {
h.httpError(w, err.Error(), http.StatusInternalServerError)
return
}

if !first {
fmt.Fprintln(w, ",")
}
first = false
fmt.Fprintf(w, "\"stats\": %s", data)
}
}

// We're going to print some kind of crypto data, we just
// need to find the proper source for it.
{
Expand Down