diff --git a/collector/pg_stat_statements.go b/collector/pg_stat_statements.go index d9a29ea65..824605190 100644 --- a/collector/pg_stat_statements.go +++ b/collector/pg_stat_statements.go @@ -24,11 +24,15 @@ import ( "github.com/prometheus/client_golang/prometheus" ) -const statStatementsSubsystem = "stat_statements" +const ( + statStatementsSubsystem = "stat_statements" + defaultStatementLimit = "100" +) var ( includeQueryFlag *bool = nil statementLengthFlag *uint = nil + statementLimitFlag *uint = nil ) func init() { @@ -47,12 +51,18 @@ func init() { "Maximum length of the statement text."). Default("120"). Uint() + statementLimitFlag = kingpin.Flag( + fmt.Sprint(collectorFlagPrefix, statStatementsSubsystem, ".limit"), + "Maximum number of statements to return."). + Default(defaultStatementLimit). + Uint() } type PGStatStatementsCollector struct { log *slog.Logger includeQueryStatement bool statementLength uint + statementLimit uint } func NewPGStatStatementsCollector(config collectorConfig) (Collector, error) { @@ -60,6 +70,7 @@ func NewPGStatStatementsCollector(config collectorConfig) (Collector, error) { log: config.logger, includeQueryStatement: *includeQueryFlag, statementLength: *statementLengthFlag, + statementLimit: *statementLimitFlag, }, nil } @@ -126,9 +137,9 @@ const ( FROM pg_stat_statements ) ORDER BY seconds_total DESC - LIMIT 100;` + LIMIT %s;` - pgStatStatementsNewQuery = `SELECT + pgStatStatementsQuery_PG13 = `SELECT pg_get_userbyid(userid) as user, pg_database.datname, pg_stat_statements.queryid, @@ -148,7 +159,7 @@ const ( FROM pg_stat_statements ) ORDER BY seconds_total DESC - LIMIT 100;` + LIMIT %s;` pgStatStatementsQuery_PG17 = `SELECT pg_get_userbyid(userid) as user, @@ -170,7 +181,7 @@ const ( FROM pg_stat_statements ) ORDER BY seconds_total DESC - LIMIT 100;` + LIMIT %s;` ) func (c PGStatStatementsCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error { @@ -179,20 +190,24 @@ func (c PGStatStatementsCollector) Update(ctx context.Context, instance *instanc case instance.version.GE(semver.MustParse("17.0.0")): queryTemplate = pgStatStatementsQuery_PG17 case instance.version.GE(semver.MustParse("13.0.0")): - queryTemplate = pgStatStatementsNewQuery + queryTemplate = pgStatStatementsQuery_PG13 default: queryTemplate = pgStatStatementsQuery } - var querySelect = "" + querySelect := "" if c.includeQueryStatement { querySelect = fmt.Sprintf(pgStatStatementQuerySelect, c.statementLength) } - query := fmt.Sprintf(queryTemplate, querySelect) + statementLimit := defaultStatementLimit + if c.statementLimit > 0 { + statementLimit = fmt.Sprintf("%d", c.statementLimit) + } + query := fmt.Sprintf(queryTemplate, querySelect, statementLimit) db := instance.getDB() rows, err := db.QueryContext(ctx, query) - var presentQueryIds = make(map[string]struct{}) + presentQueryIds := make(map[string]struct{}) if err != nil { return err diff --git a/collector/pg_stat_statements_test.go b/collector/pg_stat_statements_test.go index 0497ba380..8763f693d 100644 --- a/collector/pg_stat_statements_test.go +++ b/collector/pg_stat_statements_test.go @@ -24,7 +24,7 @@ import ( "github.com/smartystreets/goconvey/convey" ) -func TestPGStateStatementsCollector(t *testing.T) { +func TestPGStatStatementsCollector(t *testing.T) { db, mock, err := sqlmock.New() if err != nil { t.Fatalf("Error opening a stub db connection: %s", err) @@ -36,7 +36,7 @@ func TestPGStateStatementsCollector(t *testing.T) { columns := []string{"user", "datname", "queryid", "calls_total", "seconds_total", "rows_total", "block_read_seconds_total", "block_write_seconds_total"} rows := sqlmock.NewRows(columns). AddRow("postgres", "postgres", 1500, 5, 0.4, 100, 0.1, 0.2) - mock.ExpectQuery(sanitizeQuery(fmt.Sprintf(pgStatStatementsQuery, ""))).WillReturnRows(rows) + mock.ExpectQuery(sanitizeQuery(fmt.Sprintf(pgStatStatementsQuery, "", defaultStatementLimit))).WillReturnRows(rows) ch := make(chan prometheus.Metric) go func() { @@ -67,7 +67,7 @@ func TestPGStateStatementsCollector(t *testing.T) { } } -func TestPGStateStatementsCollectorWithStatement(t *testing.T) { +func TestPGStatStatementsCollectorWithStatement(t *testing.T) { db, mock, err := sqlmock.New() if err != nil { t.Fatalf("Error opening a stub db connection: %s", err) @@ -79,7 +79,7 @@ func TestPGStateStatementsCollectorWithStatement(t *testing.T) { columns := []string{"user", "datname", "queryid", "LEFT(pg_stat_statements.query, 100) as query", "calls_total", "seconds_total", "rows_total", "block_read_seconds_total", "block_write_seconds_total"} rows := sqlmock.NewRows(columns). AddRow("postgres", "postgres", 1500, "select 1 from foo", 5, 0.4, 100, 0.1, 0.2) - mock.ExpectQuery(sanitizeQuery(fmt.Sprintf(pgStatStatementsQuery, fmt.Sprintf(pgStatStatementQuerySelect, 100)))).WillReturnRows(rows) + mock.ExpectQuery(sanitizeQuery(fmt.Sprintf(pgStatStatementsQuery, fmt.Sprintf(pgStatStatementQuerySelect, 100), defaultStatementLimit))).WillReturnRows(rows) ch := make(chan prometheus.Metric) go func() { @@ -111,7 +111,51 @@ func TestPGStateStatementsCollectorWithStatement(t *testing.T) { } } -func TestPGStateStatementsCollectorNull(t *testing.T) { +func TestPGStatStatementsCollectorWithStatementAndLimit(t *testing.T) { + db, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("Error opening a stub db connection: %s", err) + } + defer db.Close() + + inst := &instance{db: db, version: semver.MustParse("12.0.0")} + + columns := []string{"user", "datname", "queryid", "LEFT(pg_stat_statements.query, 100) as query", "calls_total", "seconds_total", "rows_total", "block_read_seconds_total", "block_write_seconds_total"} + rows := sqlmock.NewRows(columns). + AddRow("postgres", "postgres", 1500, "select 1 from foo", 5, 0.4, 100, 0.1, 0.2) + mock.ExpectQuery(sanitizeQuery(fmt.Sprintf(pgStatStatementsQuery, fmt.Sprintf(pgStatStatementQuerySelect, 100), "10"))).WillReturnRows(rows) + + ch := make(chan prometheus.Metric) + go func() { + defer close(ch) + c := PGStatStatementsCollector{includeQueryStatement: true, statementLength: 100, statementLimit: 10} + + if err := c.Update(context.Background(), inst, ch); err != nil { + t.Errorf("Error calling PGStatStatementsCollector.Update: %s", err) + } + }() + + expected := []MetricResult{ + {labels: labelMap{"user": "postgres", "datname": "postgres", "queryid": "1500"}, metricType: dto.MetricType_COUNTER, value: 5}, + {labels: labelMap{"user": "postgres", "datname": "postgres", "queryid": "1500"}, metricType: dto.MetricType_COUNTER, value: 0.4}, + {labels: labelMap{"user": "postgres", "datname": "postgres", "queryid": "1500"}, metricType: dto.MetricType_COUNTER, value: 100}, + {labels: labelMap{"user": "postgres", "datname": "postgres", "queryid": "1500"}, metricType: dto.MetricType_COUNTER, value: 0.1}, + {labels: labelMap{"user": "postgres", "datname": "postgres", "queryid": "1500"}, metricType: dto.MetricType_COUNTER, value: 0.2}, + {labels: labelMap{"queryid": "1500", "query": "select 1 from foo"}, metricType: dto.MetricType_COUNTER, value: 1}, + } + + convey.Convey("Metrics comparison", t, func() { + for _, expect := range expected { + m := readMetric(<-ch) + convey.So(expect, convey.ShouldResemble, m) + } + }) + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("there were unfulfilled exceptions: %s", err) + } +} + +func TestPGStatStatementsCollectorNull(t *testing.T) { db, mock, err := sqlmock.New() if err != nil { t.Fatalf("Error opening a stub db connection: %s", err) @@ -123,7 +167,7 @@ func TestPGStateStatementsCollectorNull(t *testing.T) { columns := []string{"user", "datname", "queryid", "calls_total", "seconds_total", "rows_total", "block_read_seconds_total", "block_write_seconds_total"} rows := sqlmock.NewRows(columns). AddRow(nil, nil, nil, nil, nil, nil, nil, nil) - mock.ExpectQuery(sanitizeQuery(fmt.Sprintf(pgStatStatementsNewQuery, ""))).WillReturnRows(rows) + mock.ExpectQuery(sanitizeQuery(fmt.Sprintf(pgStatStatementsQuery_PG13, "", defaultStatementLimit))).WillReturnRows(rows) ch := make(chan prometheus.Metric) go func() { @@ -154,7 +198,7 @@ func TestPGStateStatementsCollectorNull(t *testing.T) { } } -func TestPGStateStatementsCollectorNullWithStatement(t *testing.T) { +func TestPGStatStatementsCollectorNullWithStatement(t *testing.T) { db, mock, err := sqlmock.New() if err != nil { t.Fatalf("Error opening a stub db connection: %s", err) @@ -166,7 +210,7 @@ func TestPGStateStatementsCollectorNullWithStatement(t *testing.T) { columns := []string{"user", "datname", "queryid", "LEFT(pg_stat_statements.query, 200) as query", "calls_total", "seconds_total", "rows_total", "block_read_seconds_total", "block_write_seconds_total"} rows := sqlmock.NewRows(columns). AddRow(nil, nil, nil, nil, nil, nil, nil, nil, nil) - mock.ExpectQuery(sanitizeQuery(fmt.Sprintf(pgStatStatementsNewQuery, fmt.Sprintf(pgStatStatementQuerySelect, 200)))).WillReturnRows(rows) + mock.ExpectQuery(sanitizeQuery(fmt.Sprintf(pgStatStatementsQuery_PG13, fmt.Sprintf(pgStatStatementQuerySelect, 200), defaultStatementLimit))).WillReturnRows(rows) ch := make(chan prometheus.Metric) go func() { @@ -198,7 +242,51 @@ func TestPGStateStatementsCollectorNullWithStatement(t *testing.T) { } } -func TestPGStateStatementsCollectorNewPG(t *testing.T) { +func TestPGStatStatementsCollectorNullWithStatementAndLimit(t *testing.T) { + db, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("Error opening a stub db connection: %s", err) + } + defer db.Close() + + inst := &instance{db: db, version: semver.MustParse("13.3.7")} + + columns := []string{"user", "datname", "queryid", "LEFT(pg_stat_statements.query, 200) as query", "calls_total", "seconds_total", "rows_total", "block_read_seconds_total", "block_write_seconds_total"} + rows := sqlmock.NewRows(columns). + AddRow(nil, nil, nil, nil, nil, nil, nil, nil, nil) + mock.ExpectQuery(sanitizeQuery(fmt.Sprintf(pgStatStatementsQuery_PG13, fmt.Sprintf(pgStatStatementQuerySelect, 200), "10"))).WillReturnRows(rows) + + ch := make(chan prometheus.Metric) + go func() { + defer close(ch) + c := PGStatStatementsCollector{includeQueryStatement: true, statementLength: 200, statementLimit: 10} + + if err := c.Update(context.Background(), inst, ch); err != nil { + t.Errorf("Error calling PGStatStatementsCollector.Update: %s", err) + } + }() + + expected := []MetricResult{ + {labels: labelMap{"user": "unknown", "datname": "unknown", "queryid": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0}, + {labels: labelMap{"user": "unknown", "datname": "unknown", "queryid": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0}, + {labels: labelMap{"user": "unknown", "datname": "unknown", "queryid": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0}, + {labels: labelMap{"user": "unknown", "datname": "unknown", "queryid": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0}, + {labels: labelMap{"user": "unknown", "datname": "unknown", "queryid": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0}, + {labels: labelMap{"queryid": "unknown", "query": "unknown"}, metricType: dto.MetricType_COUNTER, value: 1}, + } + + convey.Convey("Metrics comparison", t, func() { + for _, expect := range expected { + m := readMetric(<-ch) + convey.So(expect, convey.ShouldResemble, m) + } + }) + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("there were unfulfilled exceptions: %s", err) + } +} + +func TestPGStatStatementsCollector_PG13(t *testing.T) { db, mock, err := sqlmock.New() if err != nil { t.Fatalf("Error opening a stub db connection: %s", err) @@ -210,7 +298,7 @@ func TestPGStateStatementsCollectorNewPG(t *testing.T) { columns := []string{"user", "datname", "queryid", "calls_total", "seconds_total", "rows_total", "block_read_seconds_total", "block_write_seconds_total"} rows := sqlmock.NewRows(columns). AddRow("postgres", "postgres", 1500, 5, 0.4, 100, 0.1, 0.2) - mock.ExpectQuery(sanitizeQuery(fmt.Sprintf(pgStatStatementsNewQuery, ""))).WillReturnRows(rows) + mock.ExpectQuery(sanitizeQuery(fmt.Sprintf(pgStatStatementsQuery_PG13, "", defaultStatementLimit))).WillReturnRows(rows) ch := make(chan prometheus.Metric) go func() { @@ -241,7 +329,7 @@ func TestPGStateStatementsCollectorNewPG(t *testing.T) { } } -func TestPGStateStatementsCollectorNewPGWithStatement(t *testing.T) { +func TestPGStatStatementsCollector_PG13_WithStatement(t *testing.T) { db, mock, err := sqlmock.New() if err != nil { t.Fatalf("Error opening a stub db connection: %s", err) @@ -253,7 +341,7 @@ func TestPGStateStatementsCollectorNewPGWithStatement(t *testing.T) { columns := []string{"user", "datname", "queryid", "LEFT(pg_stat_statements.query, 300) as query", "calls_total", "seconds_total", "rows_total", "block_read_seconds_total", "block_write_seconds_total"} rows := sqlmock.NewRows(columns). AddRow("postgres", "postgres", 1500, "select 1 from foo", 5, 0.4, 100, 0.1, 0.2) - mock.ExpectQuery(sanitizeQuery(fmt.Sprintf(pgStatStatementsNewQuery, fmt.Sprintf(pgStatStatementQuerySelect, 300)))).WillReturnRows(rows) + mock.ExpectQuery(sanitizeQuery(fmt.Sprintf(pgStatStatementsQuery_PG13, fmt.Sprintf(pgStatStatementQuerySelect, 300), defaultStatementLimit))).WillReturnRows(rows) ch := make(chan prometheus.Metric) go func() { @@ -285,7 +373,51 @@ func TestPGStateStatementsCollectorNewPGWithStatement(t *testing.T) { } } -func TestPGStateStatementsCollector_PG17(t *testing.T) { +func TestPGStatStatementsCollector_PG13_WithStatementAndLimit(t *testing.T) { + db, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("Error opening a stub db connection: %s", err) + } + defer db.Close() + + inst := &instance{db: db, version: semver.MustParse("13.3.7")} + + columns := []string{"user", "datname", "queryid", "LEFT(pg_stat_statements.query, 300) as query", "calls_total", "seconds_total", "rows_total", "block_read_seconds_total", "block_write_seconds_total"} + rows := sqlmock.NewRows(columns). + AddRow("postgres", "postgres", 1500, "select 1 from foo", 5, 0.4, 100, 0.1, 0.2) + mock.ExpectQuery(sanitizeQuery(fmt.Sprintf(pgStatStatementsQuery_PG13, fmt.Sprintf(pgStatStatementQuerySelect, 300), "10"))).WillReturnRows(rows) + + ch := make(chan prometheus.Metric) + go func() { + defer close(ch) + c := PGStatStatementsCollector{includeQueryStatement: true, statementLength: 300, statementLimit: 10} + + if err := c.Update(context.Background(), inst, ch); err != nil { + t.Errorf("Error calling PGStatStatementsCollector.Update: %s", err) + } + }() + + expected := []MetricResult{ + {labels: labelMap{"user": "postgres", "datname": "postgres", "queryid": "1500"}, metricType: dto.MetricType_COUNTER, value: 5}, + {labels: labelMap{"user": "postgres", "datname": "postgres", "queryid": "1500"}, metricType: dto.MetricType_COUNTER, value: 0.4}, + {labels: labelMap{"user": "postgres", "datname": "postgres", "queryid": "1500"}, metricType: dto.MetricType_COUNTER, value: 100}, + {labels: labelMap{"user": "postgres", "datname": "postgres", "queryid": "1500"}, metricType: dto.MetricType_COUNTER, value: 0.1}, + {labels: labelMap{"user": "postgres", "datname": "postgres", "queryid": "1500"}, metricType: dto.MetricType_COUNTER, value: 0.2}, + {labels: labelMap{"queryid": "1500", "query": "select 1 from foo"}, metricType: dto.MetricType_COUNTER, value: 1}, + } + + convey.Convey("Metrics comparison", t, func() { + for _, expect := range expected { + m := readMetric(<-ch) + convey.So(expect, convey.ShouldResemble, m) + } + }) + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("there were unfulfilled exceptions: %s", err) + } +} + +func TestPGStatStatementsCollector_PG17(t *testing.T) { db, mock, err := sqlmock.New() if err != nil { t.Fatalf("Error opening a stub db connection: %s", err) @@ -297,7 +429,7 @@ func TestPGStateStatementsCollector_PG17(t *testing.T) { columns := []string{"user", "datname", "queryid", "calls_total", "seconds_total", "rows_total", "block_read_seconds_total", "block_write_seconds_total"} rows := sqlmock.NewRows(columns). AddRow("postgres", "postgres", 1500, 5, 0.4, 100, 0.1, 0.2) - mock.ExpectQuery(sanitizeQuery(fmt.Sprintf(pgStatStatementsQuery_PG17, ""))).WillReturnRows(rows) + mock.ExpectQuery(sanitizeQuery(fmt.Sprintf(pgStatStatementsQuery_PG17, "", defaultStatementLimit))).WillReturnRows(rows) ch := make(chan prometheus.Metric) go func() { @@ -328,7 +460,7 @@ func TestPGStateStatementsCollector_PG17(t *testing.T) { } } -func TestPGStateStatementsCollector_PG17_WithStatement(t *testing.T) { +func TestPGStatStatementsCollector_PG17_WithStatement(t *testing.T) { db, mock, err := sqlmock.New() if err != nil { t.Fatalf("Error opening a stub db connection: %s", err) @@ -340,7 +472,7 @@ func TestPGStateStatementsCollector_PG17_WithStatement(t *testing.T) { columns := []string{"user", "datname", "queryid", "LEFT(pg_stat_statements.query, 300) as query", "calls_total", "seconds_total", "rows_total", "block_read_seconds_total", "block_write_seconds_total"} rows := sqlmock.NewRows(columns). AddRow("postgres", "postgres", 1500, "select 1 from foo", 5, 0.4, 100, 0.1, 0.2) - mock.ExpectQuery(sanitizeQuery(fmt.Sprintf(pgStatStatementsQuery_PG17, fmt.Sprintf(pgStatStatementQuerySelect, 300)))).WillReturnRows(rows) + mock.ExpectQuery(sanitizeQuery(fmt.Sprintf(pgStatStatementsQuery_PG17, fmt.Sprintf(pgStatStatementQuerySelect, 300), defaultStatementLimit))).WillReturnRows(rows) ch := make(chan prometheus.Metric) go func() { @@ -371,3 +503,47 @@ func TestPGStateStatementsCollector_PG17_WithStatement(t *testing.T) { t.Errorf("there were unfulfilled exceptions: %s", err) } } + +func TestPGStatStatementsCollector_PG17_WithStatementAndLimit(t *testing.T) { + db, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("Error opening a stub db connection: %s", err) + } + defer db.Close() + + inst := &instance{db: db, version: semver.MustParse("17.0.0")} + + columns := []string{"user", "datname", "queryid", "LEFT(pg_stat_statements.query, 300) as query", "calls_total", "seconds_total", "rows_total", "block_read_seconds_total", "block_write_seconds_total"} + rows := sqlmock.NewRows(columns). + AddRow("postgres", "postgres", 1500, "select 1 from foo", 5, 0.4, 100, 0.1, 0.2) + mock.ExpectQuery(sanitizeQuery(fmt.Sprintf(pgStatStatementsQuery_PG17, fmt.Sprintf(pgStatStatementQuerySelect, 300), "10"))).WillReturnRows(rows) + + ch := make(chan prometheus.Metric) + go func() { + defer close(ch) + c := PGStatStatementsCollector{includeQueryStatement: true, statementLength: 300, statementLimit: 10} + + if err := c.Update(context.Background(), inst, ch); err != nil { + t.Errorf("Error calling PGStatStatementsCollector.Update: %s", err) + } + }() + + expected := []MetricResult{ + {labels: labelMap{"user": "postgres", "datname": "postgres", "queryid": "1500"}, metricType: dto.MetricType_COUNTER, value: 5}, + {labels: labelMap{"user": "postgres", "datname": "postgres", "queryid": "1500"}, metricType: dto.MetricType_COUNTER, value: 0.4}, + {labels: labelMap{"user": "postgres", "datname": "postgres", "queryid": "1500"}, metricType: dto.MetricType_COUNTER, value: 100}, + {labels: labelMap{"user": "postgres", "datname": "postgres", "queryid": "1500"}, metricType: dto.MetricType_COUNTER, value: 0.1}, + {labels: labelMap{"user": "postgres", "datname": "postgres", "queryid": "1500"}, metricType: dto.MetricType_COUNTER, value: 0.2}, + {labels: labelMap{"queryid": "1500", "query": "select 1 from foo"}, metricType: dto.MetricType_COUNTER, value: 1}, + } + + convey.Convey("Metrics comparison", t, func() { + for _, expect := range expected { + m := readMetric(<-ch) + convey.So(expect, convey.ShouldResemble, m) + } + }) + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("there were unfulfilled exceptions: %s", err) + } +}