10
10
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11
11
// See the License for the specific language governing permissions and
12
12
// limitations under the License.
13
-
14
13
package collector
15
14
16
15
import (
17
16
"encoding/json"
18
17
"fmt"
19
- "github.com/prometheus-community/elasticsearch_exporter/pkg/clusterinfo"
20
18
"net/http"
21
19
"net/url"
22
20
"path"
@@ -26,104 +24,66 @@ import (
26
24
"github.com/prometheus/client_golang/prometheus"
27
25
)
28
26
27
+ var (
28
+ defaultNodeShardLabels = []string {"node" }
29
+
30
+ defaultNodeShardLabelValues = func (node string ) []string {
31
+ return []string {
32
+ node ,
33
+ }
34
+ }
35
+ )
36
+
29
37
// ShardResponse has shard's node and index info
30
38
type ShardResponse struct {
31
39
Index string `json:"index"`
32
40
Shard string `json:"shard"`
33
- State string `json:"state"`
34
41
Node string `json:"node"`
35
42
}
36
43
37
44
// Shards information struct
38
45
type Shards struct {
39
- logger log.Logger
40
- client * http.Client
41
- url * url.URL
42
- clusterInfoCh chan * clusterinfo.Response
43
- lastClusterInfo * clusterinfo.Response
46
+ logger log.Logger
47
+ client * http.Client
48
+ url * url.URL
44
49
45
50
nodeShardMetrics []* nodeShardMetric
46
51
jsonParseFailures prometheus.Counter
47
52
}
48
53
49
- // ClusterLabelUpdates returns a pointer to a channel to receive cluster info updates. It implements the
50
- // (not exported) clusterinfo.consumer interface
51
- func (s * Shards ) ClusterLabelUpdates () * chan * clusterinfo.Response {
52
- return & s .clusterInfoCh
53
- }
54
-
55
- // String implements the stringer interface. It is part of the clusterinfo.consumer interface
56
- func (s * Shards ) String () string {
57
- return namespace + "shards"
58
- }
59
-
60
54
type nodeShardMetric struct {
61
55
Type prometheus.ValueType
62
56
Desc * prometheus.Desc
63
57
Value func (shards float64 ) float64
64
- Labels labels
58
+ Labels func ( node string ) [] string
65
59
}
66
60
67
61
// NewShards defines Shards Prometheus metrics
68
62
func NewShards (logger log.Logger , client * http.Client , url * url.URL ) * Shards {
69
-
70
- nodeLabels := labels {
71
- keys : func (... string ) []string {
72
- return []string {"node" , "cluster" }
73
- },
74
- values : func (lastClusterinfo * clusterinfo.Response , s ... string ) []string {
75
- if lastClusterinfo != nil {
76
- return append (s , lastClusterinfo .ClusterName )
77
- }
78
- // this shouldn't happen, as the clusterinfo Retriever has a blocking
79
- // Run method. It blocks until the first clusterinfo call has succeeded
80
- return append (s , "unknown_cluster" )
81
- },
82
- }
83
-
84
- shards := & Shards {
63
+ return & Shards {
85
64
logger : logger ,
86
65
client : client ,
87
66
url : url ,
88
67
89
- clusterInfoCh : make (chan * clusterinfo.Response ),
90
- lastClusterInfo : & clusterinfo.Response {
91
- ClusterName : "unknown_cluster" ,
92
- },
93
-
94
68
nodeShardMetrics : []* nodeShardMetric {
95
69
{
96
70
Type : prometheus .GaugeValue ,
97
71
Desc : prometheus .NewDesc (
98
72
prometheus .BuildFQName (namespace , "node_shards" , "total" ),
99
73
"Total shards per node" ,
100
- nodeLabels . keys () , nil ,
74
+ defaultNodeShardLabels , nil ,
101
75
),
102
76
Value : func (shards float64 ) float64 {
103
77
return shards
104
78
},
105
- Labels : nodeLabels ,
79
+ Labels : defaultNodeShardLabelValues ,
106
80
}},
107
81
108
82
jsonParseFailures : prometheus .NewCounter (prometheus.CounterOpts {
109
83
Name : prometheus .BuildFQName (namespace , "node_shards" , "json_parse_failures" ),
110
84
Help : "Number of errors while parsing JSON." ,
111
85
}),
112
86
}
113
-
114
- // start go routine to fetch clusterinfo updates and save them to lastClusterinfo
115
- go func () {
116
- level .Debug (logger ).Log ("msg" , "starting cluster info receive loop" )
117
- for ci := range shards .clusterInfoCh {
118
- if ci != nil {
119
- level .Debug (logger ).Log ("msg" , "received cluster info update" , "cluster" , ci .ClusterName )
120
- shards .lastClusterInfo = ci
121
- }
122
- }
123
- level .Debug (logger ).Log ("msg" , "exiting cluster info receive loop" )
124
- }()
125
-
126
- return shards
127
87
}
128
88
129
89
// Describe Shards
@@ -177,7 +137,7 @@ func (s *Shards) fetchAndDecodeShards() ([]ShardResponse, error) {
177
137
return sfr , err
178
138
}
179
139
180
- // Collect number of shards on each node
140
+ // Collect number of shards on each nodes
181
141
func (s * Shards ) Collect (ch chan <- prometheus.Metric ) {
182
142
183
143
defer func () {
@@ -196,8 +156,10 @@ func (s *Shards) Collect(ch chan<- prometheus.Metric) {
196
156
nodeShards := make (map [string ]float64 )
197
157
198
158
for _ , shard := range sr {
199
- if shard .State == "STARTED" {
200
- nodeShards [shard .Node ]++
159
+ if val , ok := nodeShards [shard .Node ]; ok {
160
+ nodeShards [shard .Node ] = val + 1
161
+ } else {
162
+ nodeShards [shard .Node ] = 1
201
163
}
202
164
}
203
165
@@ -207,7 +169,7 @@ func (s *Shards) Collect(ch chan<- prometheus.Metric) {
207
169
metric .Desc ,
208
170
metric .Type ,
209
171
metric .Value (shards ),
210
- metric .Labels . values ( s . lastClusterInfo , node )... ,
172
+ metric .Labels ( node )... ,
211
173
)
212
174
}
213
175
}
0 commit comments