Skip to content

Commit 5160481

Browse files
authored
Add metric to track data downloaded from S3 (#886)
* Add metric to track data downloaded from S3
1 parent f4d5ffb commit 5160481

26 files changed

+470
-46
lines changed

src/main/java/com/yelp/nrtsearch/server/config/NrtsearchConfig.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,7 @@ public class NrtsearchConfig {
115115
private final DirectoryFactory.MMapGrouping mmapGrouping;
116116
private final boolean requireIdField;
117117
private final IsolatedReplicaConfig isolatedReplicaConfig;
118+
private final boolean s3Metrics;
118119

119120
@Inject
120121
public NrtsearchConfig(InputStream yamlStream) {
@@ -196,6 +197,7 @@ public NrtsearchConfig(InputStream yamlStream) {
196197
useSeparateCommitExecutor = configReader.getBoolean("useSeparateCommitExecutor", false);
197198
requireIdField = configReader.getBoolean("requireIdField", false);
198199
isolatedReplicaConfig = IsolatedReplicaConfig.fromConfig(configReader);
200+
s3Metrics = configReader.getBoolean("s3Metrics", false);
199201

200202
List<String> indicesWithOverrides = configReader.getKeysOrEmpty("indexLiveSettingsOverrides");
201203
Map<String, IndexLiveSettings> liveSettingsMap = new HashMap<>();
@@ -401,6 +403,10 @@ public IsolatedReplicaConfig getIsolatedReplicaConfig() {
401403
return isolatedReplicaConfig;
402404
}
403405

406+
public boolean getS3Metrics() {
407+
return s3Metrics;
408+
}
409+
404410
/**
405411
* Substitute all sub strings of the form ${FOO} with the environment variable value env[FOO].
406412
* Variable names may only contain letters, numbers, and underscores. If a variable is not present

src/main/java/com/yelp/nrtsearch/server/grpc/NrtsearchServer.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@
9393
import com.yelp.nrtsearch.server.monitoring.NrtsearchMonitoringServerInterceptor;
9494
import com.yelp.nrtsearch.server.monitoring.ProcStatCollector;
9595
import com.yelp.nrtsearch.server.monitoring.QueryCacheCollector;
96+
import com.yelp.nrtsearch.server.monitoring.S3DownloadStreamWrapper;
9697
import com.yelp.nrtsearch.server.monitoring.SearchResponseCollector;
9798
import com.yelp.nrtsearch.server.monitoring.ThreadPoolCollector;
9899
import com.yelp.nrtsearch.server.monitoring.ThreadPoolCollector.RejectionCounterWrapper;
@@ -272,6 +273,7 @@ private void registerMetrics(GlobalState globalState) {
272273
prometheusRegistry.register(new SearchResponseCollector(globalState));
273274
// register Indexing metrics such as individual addDocument, updateDocValue latencies and qps
274275
IndexingMetrics.register(prometheusRegistry);
276+
S3DownloadStreamWrapper.register(prometheusRegistry);
275277
}
276278

277279
/** Main launches the server from the command line. */
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/*
2+
* Copyright 2025 Yelp Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.yelp.nrtsearch.server.monitoring;
17+
18+
import io.prometheus.metrics.core.datapoints.CounterDataPoint;
19+
import io.prometheus.metrics.core.metrics.Counter;
20+
import io.prometheus.metrics.model.registry.PrometheusRegistry;
21+
import java.io.InputStream;
22+
import org.apache.commons.io.input.ProxyInputStream;
23+
24+
/**
25+
* InputStream wrapper that counts the number of bytes read from an S3 download stream and updates a
26+
* Prometheus counter with the total bytes downloaded per index.
27+
*/
28+
public class S3DownloadStreamWrapper extends ProxyInputStream {
29+
public static final Counter nrtS3DownloadBytes =
30+
Counter.builder()
31+
.name("nrt_s3_download_bytes_total")
32+
.help("Total number of bytes downloaded from S3.")
33+
.labelNames("index")
34+
.build();
35+
36+
public static void register(PrometheusRegistry registry) {
37+
registry.register(nrtS3DownloadBytes);
38+
}
39+
40+
private final CounterDataPoint counterDataPoint;
41+
42+
public S3DownloadStreamWrapper(InputStream proxy, String indexName) {
43+
super(proxy);
44+
counterDataPoint = nrtS3DownloadBytes.labelValues(indexName);
45+
}
46+
47+
@Override
48+
protected void afterRead(int n) {
49+
if (n != -1) {
50+
counterDataPoint.inc(n);
51+
}
52+
}
53+
}

src/main/java/com/yelp/nrtsearch/server/remote/s3/S3Backend.java

Lines changed: 38 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,11 @@
3434
import com.amazonaws.services.s3.transfer.Upload;
3535
import com.google.common.annotations.VisibleForTesting;
3636
import com.yelp.nrtsearch.server.config.NrtsearchConfig;
37+
import com.yelp.nrtsearch.server.monitoring.S3DownloadStreamWrapper;
3738
import com.yelp.nrtsearch.server.nrt.state.NrtFileMetaData;
3839
import com.yelp.nrtsearch.server.nrt.state.NrtPointState;
3940
import com.yelp.nrtsearch.server.remote.RemoteBackend;
41+
import com.yelp.nrtsearch.server.state.BackendGlobalState;
4042
import com.yelp.nrtsearch.server.state.StateUtils;
4143
import com.yelp.nrtsearch.server.utils.TimeStringUtils;
4244
import com.yelp.nrtsearch.server.utils.ZipUtils;
@@ -88,6 +90,7 @@ public class S3Backend implements RemoteBackend {
8890
private final AmazonS3 s3;
8991
private final String serviceBucket;
9092
private final TransferManager transferManager;
93+
private final boolean s3Metrics;
9194

9295
/**
9396
* Pair of file names, one for the local file and one for the backend file.
@@ -104,17 +107,23 @@ record FileNamePair(String fileName, String backendFileName) {}
104107
* @param s3 s3 client
105108
*/
106109
public S3Backend(NrtsearchConfig configuration, AmazonS3 s3) {
107-
this(configuration.getBucketName(), configuration.getSavePluginBeforeUnzip(), s3);
110+
this(
111+
configuration.getBucketName(),
112+
configuration.getSavePluginBeforeUnzip(),
113+
configuration.getS3Metrics(),
114+
s3);
108115
}
109116

110117
/**
111118
* Constructor.
112119
*
113120
* @param serviceBucket bucket name
114121
* @param savePluginBeforeUnzip save plugin before unzipping
122+
* @param s3Metrics enable s3 download metrics
115123
* @param s3 s3 client
116124
*/
117-
public S3Backend(String serviceBucket, boolean savePluginBeforeUnzip, AmazonS3 s3) {
125+
public S3Backend(
126+
String serviceBucket, boolean savePluginBeforeUnzip, boolean s3Metrics, AmazonS3 s3) {
118127
this.s3 = s3;
119128
this.executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(NUM_S3_THREADS);
120129
this.saveBeforeUnzip = savePluginBeforeUnzip;
@@ -125,6 +134,7 @@ public S3Backend(String serviceBucket, boolean savePluginBeforeUnzip, AmazonS3 s
125134
.withExecutorFactory(() -> executor)
126135
.withShutDownThreadPools(false)
127136
.build();
137+
this.s3Metrics = s3Metrics;
128138
}
129139

130140
public AmazonS3 getS3() {
@@ -346,7 +356,7 @@ public void uploadGlobalState(String service, byte[] data) throws IOException {
346356
@Override
347357
public InputStream downloadGlobalState(String service) throws IOException {
348358
String prefix = getGlobalStateResourcePrefix(service);
349-
return downloadResource(prefix);
359+
return downloadResource(prefix, null);
350360
}
351361

352362
@Override
@@ -360,7 +370,7 @@ public void uploadIndexState(String service, String indexIdentifier, byte[] data
360370
@Override
361371
public InputStream downloadIndexState(String service, String indexIdentifier) throws IOException {
362372
String prefix = getIndexResourcePrefix(service, indexIdentifier, IndexResourceType.INDEX_STATE);
363-
return downloadResource(prefix);
373+
return downloadResource(prefix, indexIdentifier);
364374
}
365375

366376
@Override
@@ -377,7 +387,7 @@ public InputStream downloadWarmingQueries(String service, String indexIdentifier
377387
throws IOException {
378388
String prefix =
379389
getIndexResourcePrefix(service, indexIdentifier, IndexResourceType.WARMING_QUERIES);
380-
return downloadResource(prefix);
390+
return downloadResource(prefix, indexIdentifier);
381391
}
382392

383393
@Override
@@ -472,7 +482,8 @@ public InputStream downloadIndexFile(
472482
String backendFileName = getIndexBackendFileName(fileName, fileMetaData);
473483
String backendPrefix = getIndexDataPrefix(service, indexIdentifier);
474484
String backendKey = backendPrefix + backendFileName;
475-
return downloadFromS3Path(serviceBucket, backendKey, false);
485+
return wrapDownloadStream(
486+
downloadFromS3Path(serviceBucket, backendKey, false), s3Metrics, indexIdentifier);
476487
}
477488

478489
@VisibleForTesting
@@ -486,6 +497,17 @@ static List<FileNamePair> getFileNamePairs(Map<String, NrtFileMetaData> files) {
486497
return fileList;
487498
}
488499

500+
@VisibleForTesting
501+
static InputStream wrapDownloadStream(
502+
InputStream inputStream, boolean s3Metrics, String indexIdentifier) {
503+
if (s3Metrics) {
504+
String indexName = BackendGlobalState.getBaseIndexName(indexIdentifier);
505+
return new S3DownloadStreamWrapper(inputStream, indexName);
506+
} else {
507+
return inputStream;
508+
}
509+
}
510+
489511
@Override
490512
public void uploadPointState(
491513
String service, String indexIdentifier, NrtPointState nrtPointState, byte[] data)
@@ -521,7 +543,10 @@ public InputStreamWithTimestamp downloadPointState(
521543
return null;
522544
}
523545
return new InputStreamWithTimestamp(
524-
downloadFromS3Path(serviceBucket, backendKeyWithTimestamp.backendKey(), true),
546+
wrapDownloadStream(
547+
downloadFromS3Path(serviceBucket, backendKeyWithTimestamp.backendKey(), true),
548+
s3Metrics,
549+
indexIdentifier),
525550
backendKeyWithTimestamp.timestamp());
526551
}
527552

@@ -654,10 +679,14 @@ private void uploadResource(
654679
setCurrentResource(prefix, fileName);
655680
}
656681

657-
private InputStream downloadResource(String prefix) throws IOException {
682+
private InputStream downloadResource(String prefix, String indexIdentifier) throws IOException {
658683
String fileName = getCurrentResourceName(prefix);
659684
String backendKey = prefix + fileName;
660-
return downloadFromS3Path(serviceBucket, backendKey, true);
685+
InputStream inputStream = downloadFromS3Path(serviceBucket, backendKey, true);
686+
if (indexIdentifier != null) {
687+
inputStream = wrapDownloadStream(inputStream, s3Metrics, indexIdentifier);
688+
}
689+
return inputStream;
661690
}
662691

663692
@VisibleForTesting

src/main/java/com/yelp/nrtsearch/tools/nrt_utils/backup/CleanupSnapshotsCommand.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ public Integer call() throws Exception {
123123
s3Client =
124124
StateCommandUtils.createS3Client(bucketName, region, credsFile, credsProfile, maxRetry);
125125
}
126-
S3Backend s3Backend = new S3Backend(bucketName, false, s3Client);
126+
S3Backend s3Backend = new S3Backend(bucketName, false, false, s3Client);
127127

128128
long deleteAfterMs = BackupCommandUtils.getTimeIntervalMs(deleteAfter);
129129
long minTimestampMs = System.currentTimeMillis() - deleteAfterMs;

src/main/java/com/yelp/nrtsearch/tools/nrt_utils/backup/RestoreCommand.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ public Integer call() throws Exception {
147147
s3Client =
148148
StateCommandUtils.createS3Client(bucketName, region, credsFile, credsProfile, maxRetry);
149149
}
150-
S3Backend s3Backend = new S3Backend(bucketName, false, s3Client);
150+
S3Backend s3Backend = new S3Backend(bucketName, false, false, s3Client);
151151

152152
String resolvedSnapshotRoot =
153153
BackupCommandUtils.getSnapshotRoot(snapshotRoot, snapshotServiceName);

src/main/java/com/yelp/nrtsearch/tools/nrt_utils/backup/SnapshotCommand.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ public Integer call() throws Exception {
121121
s3Client =
122122
StateCommandUtils.createS3Client(bucketName, region, credsFile, credsProfile, maxRetry);
123123
}
124-
S3Backend s3Backend = new S3Backend(bucketName, false, s3Client);
124+
S3Backend s3Backend = new S3Backend(bucketName, false, false, s3Client);
125125

126126
String resolvedIndexResource =
127127
StateCommandUtils.getResourceName(s3Backend, serviceName, indexName, exactResourceName);

src/main/java/com/yelp/nrtsearch/tools/nrt_utils/cleanup/CleanupDataCommand.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ public Integer call() throws Exception {
128128
s3Client =
129129
StateCommandUtils.createS3Client(bucketName, region, credsFile, credsProfile, maxRetry);
130130
}
131-
S3Backend s3Backend = new S3Backend(bucketName, false, s3Client);
131+
S3Backend s3Backend = new S3Backend(bucketName, false, false, s3Client);
132132

133133
String resolvedIndexResource =
134134
StateCommandUtils.getResourceName(s3Backend, serviceName, indexName, exactResourceName);

src/main/java/com/yelp/nrtsearch/tools/nrt_utils/state/GetRemoteStateCommand.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ public Integer call() throws Exception {
9696
s3Client =
9797
StateCommandUtils.createS3Client(bucketName, region, credsFile, credsProfile, maxRetry);
9898
}
99-
S3Backend s3Backend = new S3Backend(bucketName, false, s3Client);
99+
S3Backend s3Backend = new S3Backend(bucketName, false, false, s3Client);
100100
String resolvedResourceName =
101101
StateCommandUtils.getResourceName(s3Backend, serviceName, resourceName, exactResourceName);
102102

src/main/java/com/yelp/nrtsearch/tools/nrt_utils/state/GetResourceVersionCommand.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ public Integer call() throws Exception {
9999
s3Client =
100100
StateCommandUtils.createS3Client(bucketName, region, credsFile, credsProfile, maxRetry);
101101
}
102-
S3Backend s3Backend = new S3Backend(bucketName, false, s3Client);
102+
S3Backend s3Backend = new S3Backend(bucketName, false, false, s3Client);
103103
String resolvedResourceName =
104104
StateCommandUtils.getResourceName(s3Backend, serviceName, resourceName, exactResourceName);
105105

0 commit comments

Comments
 (0)