diff --git a/collector/ilm_indices.go b/collector/ilm_indices.go new file mode 100644 index 00000000..f3611017 --- /dev/null +++ b/collector/ilm_indices.go @@ -0,0 +1,184 @@ +// Copyright 2023 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package collector + +import ( + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "net/url" + "path" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/prometheus/client_golang/prometheus" +) + +type ilmMetric struct { + Type prometheus.ValueType + Desc *prometheus.Desc + Value func(timeMillis float64) float64 + Labels []string +} + +// Index Lifecycle Management information object +type IlmIndiciesCollector struct { + logger log.Logger + client *http.Client + url *url.URL + + up prometheus.Gauge + totalScrapes prometheus.Counter + jsonParseFailures prometheus.Counter + + ilmMetric ilmMetric +} + +type IlmResponse struct { + Indices map[string]IlmIndexResponse `json:"indices"` +} + +type IlmIndexResponse struct { + Index string `json:"index"` + Managed bool `json:"managed"` + Phase string `json:"phase"` + Action string `json:"action"` + Step string `json:"step"` + StepTimeMillis float64 `json:"step_time_millis"` +} + +var ( + defaultIlmIndicesMappingsLabels = []string{"index", "phase", "action", "step"} +) + +// NewIlmIndicies defines Index Lifecycle Management Prometheus metrics +func NewIlmIndicies(logger log.Logger, client *http.Client, url *url.URL) *IlmIndiciesCollector { + subsystem := "ilm_index" + + return &IlmIndiciesCollector{ + logger: logger, + client: client, + url: url, + + up: prometheus.NewGauge(prometheus.GaugeOpts{ + Name: prometheus.BuildFQName(namespace, subsystem, "up"), + Help: "Was the last scrape of the ElasticSearch ILM endpoint successful.", + }), + totalScrapes: prometheus.NewCounter(prometheus.CounterOpts{ + Name: prometheus.BuildFQName(namespace, subsystem, "total_scrapes"), + Help: "Current total ElasticSearch ILM scrapes.", + }), + jsonParseFailures: prometheus.NewCounter(prometheus.CounterOpts{ + Name: prometheus.BuildFQName(namespace, subsystem, "json_parse_failures"), + Help: "Number of errors while parsing JSON.", + }), + ilmMetric: ilmMetric{ + Type: prometheus.GaugeValue, + Desc: prometheus.NewDesc( + prometheus.BuildFQName(namespace, subsystem, "status"), + "Status of ILM policy for index", + defaultIlmIndicesMappingsLabels, nil), + Value: func(timeMillis float64) float64 { + return timeMillis + }, + }, + } +} + +// Describe adds metrics description +func (i *IlmIndiciesCollector) Describe(ch chan<- *prometheus.Desc) { + ch <- i.ilmMetric.Desc + ch <- i.up.Desc() + ch <- i.totalScrapes.Desc() + ch <- i.jsonParseFailures.Desc() +} + +func (i *IlmIndiciesCollector) fetchAndDecodeIlm() (IlmResponse, error) { + var ir IlmResponse + + u := *i.url + u.Path = path.Join(u.Path, "/_all/_ilm/explain") + + res, err := i.client.Get(u.String()) + if err != nil { + return ir, fmt.Errorf("failed to get index stats from %s://%s:%s%s: %s", + u.Scheme, u.Hostname(), u.Port(), u.Path, err) + } + + defer func() { + err = res.Body.Close() + if err != nil { + _ = level.Warn(i.logger).Log( + "msg", "failed to close http.Client", + "err", err, + ) + } + }() + + if res.StatusCode != http.StatusOK { + return ir, fmt.Errorf("HTTP Request failed with code %d", res.StatusCode) + } + + bts, err := ioutil.ReadAll(res.Body) + if err != nil { + i.jsonParseFailures.Inc() + return ir, err + } + + if err := json.Unmarshal(bts, &ir); err != nil { + i.jsonParseFailures.Inc() + return ir, err + } + + return ir, nil +} + +func bool2int(managed bool) float64 { + if managed { + return 1 + } + return 0 +} + +// Collect pulls metric values from Elasticsearch +func (i *IlmIndiciesCollector) Collect(ch chan<- prometheus.Metric) { + defer func() { + ch <- i.up + ch <- i.totalScrapes + ch <- i.jsonParseFailures + }() + + // indices + ilmResp, err := i.fetchAndDecodeIlm() + if err != nil { + i.up.Set(0) + _ = level.Warn(i.logger).Log( + "msg", "failed to fetch and decode ILM stats", + "err", err, + ) + return + } + i.totalScrapes.Inc() + i.up.Set(1) + + for indexName, indexIlm := range ilmResp.Indices { + ch <- prometheus.MustNewConstMetric( + i.ilmMetric.Desc, + i.ilmMetric.Type, + i.ilmMetric.Value(bool2int(indexIlm.Managed)), + indexName, indexIlm.Phase, indexIlm.Action, indexIlm.Step, + ) + } +} diff --git a/collector/ilm_indices_test.go b/collector/ilm_indices_test.go new file mode 100644 index 00000000..27218f51 --- /dev/null +++ b/collector/ilm_indices_test.go @@ -0,0 +1,117 @@ +// Copyright 2023 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package collector + +import ( + "fmt" + "net/http" + "net/http/httptest" + "net/url" + "testing" + + "github.com/go-kit/log" +) + +func TestILMMetrics(t *testing.T) { + // Testcases created using: + // docker run -d -p 9200:9200 elasticsearch:VERSION + // curl -XPUT http://localhost:9200/twitter + // curl -X PUT "localhost:9200/_ilm/policy/my_policy?pretty" -H 'Content-Type: application/json' -d' + // { + // "policy": { + // "phases": { + // "warm": { + // "min_age": "10d", + // "actions": { + // "forcemerge": { + // "max_num_segments": 1 + // } + // } + // }, + // "delete": { + // "min_age": "30d", + // "actions": { + // "delete": {} + // } + // } + // } + // } + // } + // ' + // curl -X PUT "localhost:9200/facebook?pretty" -H 'Content-Type: application/json' -d' + // { + // "settings": { + // "index": { + // "lifecycle": { + // "name": "my_policy" + // } + // } + // } + // } + // ' + // curl http://localhost:9200/_all/_ilm/explain + tcs := map[string]string{ + "6.6.0": `{ + "indices": { + "twitter": { "index": "twitter", "managed": false }, + "facebook": { + "index": "facebook", + "managed": true, + "policy": "my_policy", + "lifecycle_date_millis": 1660799138565, + "phase": "new", + "phase_time_millis": 1660799138651, + "action": "complete", + "action_time_millis": 1660799138651, + "step": "complete", + "step_time_millis": 1660799138651 + } + } + }`, + } + for ver, out := range tcs { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + fmt.Fprintln(w, out) + })) + defer ts.Close() + + u, err := url.Parse(ts.URL) + if err != nil { + t.Fatalf("Failed to parse URL: %s", err) + } + c := NewIlmIndicies(log.NewNopLogger(), http.DefaultClient, u) + chr, err := c.fetchAndDecodeIlm() + if err != nil { + t.Fatalf("Failed to fetch or decode indices ilm metrics: %s", err) + } + t.Logf("[%s] indices ilm metrics Response: %+v", ver, chr) + + if chr.Indices["twitter"].Managed != false { + t.Errorf("Invalid ilm metrics at twitter.managed") + } + if chr.Indices["facebook"].Managed != true { + t.Errorf("Invalid ilm metrics at facebook.managed") + } + if chr.Indices["facebook"].Phase != "new" { + t.Errorf("Invalid ilm metrics at facebook.phase") + } + if chr.Indices["facebook"].Action != "complete" { + t.Errorf("Invalid ilm metrics at facebook.action") + } + if chr.Indices["facebook"].Step != "complete" { + t.Errorf("Invalid ilm metrics at facebook.step") + } + + } +} diff --git a/collector/ilm_status.go b/collector/ilm_status.go new file mode 100644 index 00000000..3536b5be --- /dev/null +++ b/collector/ilm_status.go @@ -0,0 +1,167 @@ +// Copyright 2023 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package collector + +import ( + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "net/url" + "path" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/prometheus/client_golang/prometheus" +) + +var ( + ilmStatuses = []string{"STOPPED", "RUNNING", "STOPPING"} +) + +type ilmStatusMetric struct { + Type prometheus.ValueType + Desc *prometheus.Desc + Value func(ilm *IlmStatusResponse, status string) float64 + Labels func(status string) []string +} + +// IlmStatusCollector information struct +type IlmStatusCollector struct { + logger log.Logger + client *http.Client + url *url.URL + + up prometheus.Gauge + totalScrapes, jsonParseFailures prometheus.Counter + + metric ilmStatusMetric +} + +type IlmStatusResponse struct { + OperationMode string `json:"operation_mode"` +} + +// NewIlmStatus defines Indices IndexIlms Prometheus metrics +func NewIlmStatus(logger log.Logger, client *http.Client, url *url.URL) *IlmStatusCollector { + subsystem := "ilm" + + return &IlmStatusCollector{ + logger: logger, + client: client, + url: url, + + up: prometheus.NewGauge(prometheus.GaugeOpts{ + Name: prometheus.BuildFQName(namespace, subsystem, "up"), + Help: "Was the last scrape of the ElasticSearch Indices Ilms endpoint successful.", + }), + totalScrapes: prometheus.NewCounter(prometheus.CounterOpts{ + Name: prometheus.BuildFQName(namespace, subsystem, "scrapes_total"), + Help: "Current total ElasticSearch Indices Ilms scrapes.", + }), + jsonParseFailures: prometheus.NewCounter(prometheus.CounterOpts{ + Name: prometheus.BuildFQName(namespace, subsystem, "json_parse_failures_total"), + Help: "Number of errors while parsing JSON.", + }), + metric: ilmStatusMetric{ + Type: prometheus.GaugeValue, + Desc: prometheus.NewDesc( + prometheus.BuildFQName(namespace, subsystem, "status"), + "Current status of ilm. Status can be STOPPED, RUNNING, STOPPING.", + ilmStatuses, nil, + ), + Value: func(ilm *IlmStatusResponse, status string) float64 { + if ilm.OperationMode == status { + return 1 + } + return 0 + }, + }, + } +} + +// Describe add Snapshots metrics descriptions +func (im *IlmStatusCollector) Describe(ch chan<- *prometheus.Desc) { + ch <- im.metric.Desc + ch <- im.up.Desc() + ch <- im.totalScrapes.Desc() + ch <- im.jsonParseFailures.Desc() +} + +func (im *IlmStatusCollector) fetchAndDecodeIlm() (*IlmStatusResponse, error) { + u := *im.url + u.Path = path.Join(im.url.Path, "/_ilm/status") + + res, err := im.client.Get(u.String()) + if err != nil { + return nil, fmt.Errorf("failed to get from %s://%s:%s%s: %s", + u.Scheme, u.Hostname(), u.Port(), u.Path, err) + } + + if res.StatusCode != http.StatusOK { + return nil, fmt.Errorf("HTTP Request failed with code %d", res.StatusCode) + } + + body, err := ioutil.ReadAll(res.Body) + if err != nil { + _ = level.Warn(im.logger).Log("msg", "failed to read response body", "err", err) + return nil, err + } + + err = res.Body.Close() + if err != nil { + _ = level.Warn(im.logger).Log("msg", "failed to close response body", "err", err) + return nil, err + } + + var imr IlmStatusResponse + if err := json.Unmarshal(body, &imr); err != nil { + im.jsonParseFailures.Inc() + return nil, err + } + + return &imr, nil +} + +// Collect gets all indices Ilms metric values +func (im *IlmStatusCollector) Collect(ch chan<- prometheus.Metric) { + + im.totalScrapes.Inc() + defer func() { + ch <- im.up + ch <- im.totalScrapes + ch <- im.jsonParseFailures + }() + + indicesIlmsResponse, err := im.fetchAndDecodeIlm() + if err != nil { + im.up.Set(0) + _ = level.Warn(im.logger).Log( + "msg", "failed to fetch and decode cluster ilm status", + "err", err, + ) + return + } + im.up.Set(1) + + for _, status := range ilmStatuses { + ch <- prometheus.MustNewConstMetric( + im.metric.Desc, + im.metric.Type, + im.metric.Value(indicesIlmsResponse, status), + status, + ) + } + +} diff --git a/collector/ilm_status_test.go b/collector/ilm_status_test.go new file mode 100644 index 00000000..8744eec1 --- /dev/null +++ b/collector/ilm_status_test.go @@ -0,0 +1,53 @@ +// Copyright 2023 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package collector + +import ( + "fmt" + "net/http" + "net/http/httptest" + "net/url" + "testing" + + "github.com/go-kit/log" +) + +func TestILMStatus(t *testing.T) { + // Testcases created using: + // docker run -d -p 9200:9200 elasticsearch:VERSION + // curl http://localhost:9200/_ilm/status + tcs := map[string]string{ + "6.6.0": `{ "operation_mode": "RUNNING" }`, + } + for ver, out := range tcs { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + fmt.Fprintln(w, out) + })) + defer ts.Close() + + u, err := url.Parse(ts.URL) + if err != nil { + t.Fatalf("Failed to parse URL: %s", err) + } + c := NewIlmStatus(log.NewNopLogger(), http.DefaultClient, u) + chr, err := c.fetchAndDecodeIlm() + if err != nil { + t.Fatalf("Failed to fetch or decode ilm status: %s", err) + } + t.Logf("[%s] ILM Status Response: %+v", ver, chr) + if chr.OperationMode != "RUNNING" { + t.Errorf("Invalid ilm status") + } + } +} diff --git a/main.go b/main.go index 57db3aaa..857504d9 100644 --- a/main.go +++ b/main.go @@ -83,6 +83,9 @@ func main() { esExportClusterSettings = kingpin.Flag("es.cluster_settings", "Export stats for cluster settings."). Default("false").Bool() + esExportILM = kingpin.Flag("es.ilm", + "Export index lifecycle politics for indices in the cluster."). + Default("false").Bool() esExportShards = kingpin.Flag("es.shards", "Export stats for shards in the cluster (implies --es.indices)."). Default("false").Bool() @@ -235,6 +238,11 @@ func main() { prometheus.MustRegister(collector.NewIndicesMappings(logger, httpClient, esURL)) } + if *esExportILM { + prometheus.MustRegister(collector.NewIlmStatus(logger, httpClient, esURL)) + prometheus.MustRegister(collector.NewIlmIndicies(logger, httpClient, esURL)) + } + // create a http server server := &http.Server{}