Skip to content

Commit f394dde

Browse files
authored
chore: custom exporter for Client Built-in Metrics (#3164)
1 parent 92b1e07 commit f394dde

File tree

5 files changed

+889
-10
lines changed

5 files changed

+889
-10
lines changed

google-cloud-spanner/pom.xml

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -255,6 +255,24 @@
255255
<groupId>io.opentelemetry</groupId>
256256
<artifactId>opentelemetry-context</artifactId>
257257
</dependency>
258+
<dependency>
259+
<groupId>io.opentelemetry</groupId>
260+
<artifactId>opentelemetry-sdk-common</artifactId>
261+
</dependency>
262+
<dependency>
263+
<groupId>io.opentelemetry</groupId>
264+
<artifactId>opentelemetry-sdk-metrics</artifactId>
265+
</dependency>
266+
<dependency>
267+
<groupId>com.google.cloud</groupId>
268+
<artifactId>google-cloud-monitoring</artifactId>
269+
<version>3.38.0</version>
270+
</dependency>
271+
<dependency>
272+
<groupId>com.google.api.grpc</groupId>
273+
<artifactId>proto-google-cloud-monitoring-v3</artifactId>
274+
<version>3.38.0</version>
275+
</dependency>
258276
<dependency>
259277
<groupId>com.google.auth</groupId>
260278
<artifactId>google-auth-library-oauth2-http</artifactId>
@@ -415,16 +433,6 @@
415433
<artifactId>opentelemetry-sdk</artifactId>
416434
<scope>test</scope>
417435
</dependency>
418-
<dependency>
419-
<groupId>io.opentelemetry</groupId>
420-
<artifactId>opentelemetry-sdk-common</artifactId>
421-
<scope>test</scope>
422-
</dependency>
423-
<dependency>
424-
<groupId>io.opentelemetry</groupId>
425-
<artifactId>opentelemetry-sdk-metrics</artifactId>
426-
<scope>test</scope>
427-
</dependency>
428436
<dependency>
429437
<groupId>io.opentelemetry</groupId>
430438
<artifactId>opentelemetry-sdk-trace</artifactId>
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
/*
2+
* Copyright 2024 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.spanner;
18+
19+
import com.google.common.collect.ImmutableSet;
20+
import io.opentelemetry.api.common.AttributeKey;
21+
import java.util.Set;
22+
import java.util.stream.Collectors;
23+
24+
public class BuiltInMetricsConstant {
25+
26+
public static final String METER_NAME = "spanner.googleapis.com/internal/client";
27+
28+
public static final String GAX_METER_NAME = "gax-java";
29+
30+
static final String OPERATION_LATENCIES_NAME = "operation_latencies";
31+
static final String ATTEMPT_LATENCIES_NAME = "attempt_latencies";
32+
static final String OPERATION_LATENCY_NAME = "operation_latency";
33+
static final String ATTEMPT_LATENCY_NAME = "attempt_latency";
34+
static final String OPERATION_COUNT_NAME = "operation_count";
35+
static final String ATTEMPT_COUNT_NAME = "attempt_count";
36+
37+
public static final Set<String> SPANNER_METRICS =
38+
ImmutableSet.of(
39+
OPERATION_LATENCIES_NAME,
40+
ATTEMPT_LATENCIES_NAME,
41+
OPERATION_COUNT_NAME,
42+
ATTEMPT_COUNT_NAME)
43+
.stream()
44+
.map(m -> METER_NAME + '/' + m)
45+
.collect(Collectors.toSet());
46+
47+
public static final String SPANNER_RESOURCE_TYPE = "spanner_instance_client";
48+
49+
public static final AttributeKey<String> PROJECT_ID_KEY = AttributeKey.stringKey("project_id");
50+
public static final AttributeKey<String> INSTANCE_ID_KEY = AttributeKey.stringKey("instance_id");
51+
public static final AttributeKey<String> LOCATION_ID_KEY = AttributeKey.stringKey("location");
52+
public static final AttributeKey<String> INSTANCE_CONFIG_ID_KEY =
53+
AttributeKey.stringKey("instance_config");
54+
55+
// These metric labels will be promoted to the spanner monitored resource fields
56+
public static final Set<AttributeKey<String>> SPANNER_PROMOTED_RESOURCE_LABELS =
57+
ImmutableSet.of(PROJECT_ID_KEY, INSTANCE_ID_KEY, INSTANCE_CONFIG_ID_KEY, LOCATION_ID_KEY);
58+
59+
public static final AttributeKey<String> DATABASE_KEY = AttributeKey.stringKey("database");
60+
public static final AttributeKey<String> CLIENT_UID_KEY = AttributeKey.stringKey("client_uid");
61+
public static final AttributeKey<String> CLIENT_NAME_KEY = AttributeKey.stringKey("client_name");
62+
public static final AttributeKey<String> METHOD_KEY = AttributeKey.stringKey("method");
63+
public static final AttributeKey<String> STATUS_KEY = AttributeKey.stringKey("status");
64+
public static final AttributeKey<String> DIRECT_PATH_ENABLED_KEY =
65+
AttributeKey.stringKey("directpath_enabled");
66+
public static final AttributeKey<String> DIRECT_PATH_USED_KEY =
67+
AttributeKey.stringKey("directpath_used");
68+
69+
public static final Set<AttributeKey> COMMON_ATTRIBUTES =
70+
ImmutableSet.of(
71+
PROJECT_ID_KEY,
72+
INSTANCE_ID_KEY,
73+
LOCATION_ID_KEY,
74+
INSTANCE_CONFIG_ID_KEY,
75+
CLIENT_UID_KEY,
76+
METHOD_KEY,
77+
STATUS_KEY,
78+
DATABASE_KEY,
79+
CLIENT_NAME_KEY,
80+
DIRECT_PATH_ENABLED_KEY,
81+
DIRECT_PATH_USED_KEY);
82+
}
Lines changed: 236 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,236 @@
1+
/*
2+
* Copyright 2024 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.spanner;
18+
19+
import static com.google.cloud.spanner.BuiltInMetricsConstant.SPANNER_METRICS;
20+
21+
import com.google.api.core.ApiFuture;
22+
import com.google.api.core.ApiFutureCallback;
23+
import com.google.api.core.ApiFutures;
24+
import com.google.api.gax.core.CredentialsProvider;
25+
import com.google.api.gax.core.FixedCredentialsProvider;
26+
import com.google.api.gax.core.NoCredentialsProvider;
27+
import com.google.api.gax.rpc.PermissionDeniedException;
28+
import com.google.auth.Credentials;
29+
import com.google.cloud.monitoring.v3.MetricServiceClient;
30+
import com.google.cloud.monitoring.v3.MetricServiceSettings;
31+
import com.google.common.annotations.VisibleForTesting;
32+
import com.google.common.base.MoreObjects;
33+
import com.google.common.collect.Iterables;
34+
import com.google.common.util.concurrent.MoreExecutors;
35+
import com.google.monitoring.v3.CreateTimeSeriesRequest;
36+
import com.google.monitoring.v3.ProjectName;
37+
import com.google.monitoring.v3.TimeSeries;
38+
import com.google.protobuf.Empty;
39+
import io.opentelemetry.sdk.common.CompletableResultCode;
40+
import io.opentelemetry.sdk.metrics.InstrumentType;
41+
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
42+
import io.opentelemetry.sdk.metrics.data.MetricData;
43+
import io.opentelemetry.sdk.metrics.export.MetricExporter;
44+
import java.io.IOException;
45+
import java.util.ArrayList;
46+
import java.util.Collection;
47+
import java.util.List;
48+
import java.util.concurrent.atomic.AtomicBoolean;
49+
import java.util.logging.Level;
50+
import java.util.logging.Logger;
51+
import java.util.stream.Collectors;
52+
import javax.annotation.Nullable;
53+
import org.threeten.bp.Duration;
54+
55+
/**
56+
* Spanner Cloud Monitoring OpenTelemetry Exporter.
57+
*
58+
* <p>The exporter will look for all spanner owned metrics under spanner.googleapis.com
59+
* instrumentation scope and upload it via the Google Cloud Monitoring API.
60+
*/
61+
class SpannerCloudMonitoringExporter implements MetricExporter {
62+
63+
private static final Logger logger =
64+
Logger.getLogger(SpannerCloudMonitoringExporter.class.getName());
65+
66+
// This system property can be used to override the monitoring endpoint
67+
// to a different environment. It's meant for internal testing only.
68+
private static final String MONITORING_ENDPOINT =
69+
MoreObjects.firstNonNull(
70+
System.getProperty("spanner.test-monitoring-endpoint"),
71+
MetricServiceSettings.getDefaultEndpoint());
72+
73+
// This the quota limit from Cloud Monitoring. More details in
74+
// https://cloud.google.com/monitoring/quotas#custom_metrics_quotas.
75+
private static final int EXPORT_BATCH_SIZE_LIMIT = 200;
76+
private final AtomicBoolean spannerExportFailureLogged = new AtomicBoolean(false);
77+
private CompletableResultCode lastExportCode;
78+
private final MetricServiceClient client;
79+
private final String spannerProjectId;
80+
81+
static SpannerCloudMonitoringExporter create(String projectId, @Nullable Credentials credentials)
82+
throws IOException {
83+
MetricServiceSettings.Builder settingsBuilder = MetricServiceSettings.newBuilder();
84+
CredentialsProvider credentialsProvider;
85+
if (credentials == null) {
86+
credentialsProvider = NoCredentialsProvider.create();
87+
} else {
88+
credentialsProvider = FixedCredentialsProvider.create(credentials);
89+
}
90+
settingsBuilder.setCredentialsProvider(credentialsProvider);
91+
settingsBuilder.setEndpoint(MONITORING_ENDPOINT);
92+
93+
org.threeten.bp.Duration timeout = Duration.ofMinutes(1);
94+
// TODO: createServiceTimeSeries needs special handling if the request failed. Leaving
95+
// it as not retried for now.
96+
settingsBuilder.createServiceTimeSeriesSettings().setSimpleTimeoutNoRetries(timeout);
97+
98+
return new SpannerCloudMonitoringExporter(
99+
projectId, MetricServiceClient.create(settingsBuilder.build()));
100+
}
101+
102+
@VisibleForTesting
103+
SpannerCloudMonitoringExporter(String projectId, MetricServiceClient client) {
104+
this.client = client;
105+
this.spannerProjectId = projectId;
106+
}
107+
108+
@Override
109+
public CompletableResultCode export(Collection<MetricData> collection) {
110+
if (client.isShutdown()) {
111+
logger.log(Level.WARNING, "Exporter is shut down");
112+
return CompletableResultCode.ofFailure();
113+
}
114+
115+
this.lastExportCode = exportSpannerClientMetrics(collection);
116+
return lastExportCode;
117+
}
118+
119+
/** Export client built in metrics */
120+
private CompletableResultCode exportSpannerClientMetrics(Collection<MetricData> collection) {
121+
// Filter spanner metrics
122+
List<MetricData> spannerMetricData =
123+
collection.stream()
124+
.filter(md -> SPANNER_METRICS.contains(md.getName()))
125+
.collect(Collectors.toList());
126+
127+
// Skips exporting if there's none
128+
if (spannerMetricData.isEmpty()) {
129+
return CompletableResultCode.ofSuccess();
130+
}
131+
132+
// Verifies metrics project id is the same as the spanner project id set on this client
133+
if (!spannerMetricData.stream()
134+
.flatMap(metricData -> metricData.getData().getPoints().stream())
135+
.allMatch(
136+
pd -> spannerProjectId.equals(SpannerCloudMonitoringExporterUtils.getProjectId(pd)))) {
137+
logger.log(Level.WARNING, "Metric data has a different projectId. Skipping export.");
138+
return CompletableResultCode.ofFailure();
139+
}
140+
141+
List<TimeSeries> spannerTimeSeries;
142+
try {
143+
spannerTimeSeries =
144+
SpannerCloudMonitoringExporterUtils.convertToSpannerTimeSeries(spannerMetricData);
145+
} catch (Throwable e) {
146+
logger.log(
147+
Level.WARNING,
148+
"Failed to convert spanner metric data to cloud monitoring timeseries.",
149+
e);
150+
return CompletableResultCode.ofFailure();
151+
}
152+
153+
ProjectName projectName = ProjectName.of(spannerProjectId);
154+
155+
ApiFuture<List<Empty>> futureList = exportTimeSeriesInBatch(projectName, spannerTimeSeries);
156+
157+
CompletableResultCode spannerExportCode = new CompletableResultCode();
158+
ApiFutures.addCallback(
159+
futureList,
160+
new ApiFutureCallback<List<Empty>>() {
161+
@Override
162+
public void onFailure(Throwable throwable) {
163+
if (spannerExportFailureLogged.compareAndSet(false, true)) {
164+
String msg = "createServiceTimeSeries request failed for spanner metrics.";
165+
if (throwable instanceof PermissionDeniedException) {
166+
// TODO: Add the link of public documentation when available in the log message.
167+
msg +=
168+
String.format(
169+
" Need monitoring metric writer permission on project=%s.",
170+
projectName.getProject());
171+
}
172+
logger.log(Level.WARNING, msg, throwable);
173+
}
174+
spannerExportCode.fail();
175+
}
176+
177+
@Override
178+
public void onSuccess(List<Empty> empty) {
179+
// When an export succeeded reset the export failure flag to false so if there's a
180+
// transient failure it'll be logged.
181+
spannerExportFailureLogged.set(false);
182+
spannerExportCode.succeed();
183+
}
184+
},
185+
MoreExecutors.directExecutor());
186+
187+
return spannerExportCode;
188+
}
189+
190+
private ApiFuture<List<Empty>> exportTimeSeriesInBatch(
191+
ProjectName projectName, List<TimeSeries> timeSeries) {
192+
List<ApiFuture<Empty>> batchResults = new ArrayList<>();
193+
194+
for (List<TimeSeries> batch : Iterables.partition(timeSeries, EXPORT_BATCH_SIZE_LIMIT)) {
195+
CreateTimeSeriesRequest req =
196+
CreateTimeSeriesRequest.newBuilder()
197+
.setName(projectName.toString())
198+
.addAllTimeSeries(batch)
199+
.build();
200+
batchResults.add(this.client.createServiceTimeSeriesCallable().futureCall(req));
201+
}
202+
203+
return ApiFutures.allAsList(batchResults);
204+
}
205+
206+
@Override
207+
public CompletableResultCode flush() {
208+
return CompletableResultCode.ofSuccess();
209+
}
210+
211+
@Override
212+
public CompletableResultCode shutdown() {
213+
if (client.isShutdown()) {
214+
logger.log(Level.WARNING, "shutdown is called multiple times");
215+
return CompletableResultCode.ofSuccess();
216+
}
217+
CompletableResultCode shutdownResult = new CompletableResultCode();
218+
try {
219+
client.shutdown();
220+
shutdownResult.succeed();
221+
} catch (Throwable e) {
222+
logger.log(Level.WARNING, "failed to shutdown the monitoring client", e);
223+
shutdownResult.fail();
224+
}
225+
return shutdownResult;
226+
}
227+
228+
/**
229+
* For Google Cloud Monitoring always return CUMULATIVE to keep track of the cumulative value of a
230+
* metric over time.
231+
*/
232+
@Override
233+
public AggregationTemporality getAggregationTemporality(InstrumentType instrumentType) {
234+
return AggregationTemporality.CUMULATIVE;
235+
}
236+
}

0 commit comments

Comments
 (0)