Skip to content

Commit 1274a7d

Browse files
committed
chore: custom exporter
1 parent e00b884 commit 1274a7d

File tree

5 files changed

+889
-2
lines changed

5 files changed

+889
-2
lines changed

google-cloud-spanner/pom.xml

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -418,12 +418,10 @@
418418
<dependency>
419419
<groupId>io.opentelemetry</groupId>
420420
<artifactId>opentelemetry-sdk-common</artifactId>
421-
<scope>test</scope>
422421
</dependency>
423422
<dependency>
424423
<groupId>io.opentelemetry</groupId>
425424
<artifactId>opentelemetry-sdk-metrics</artifactId>
426-
<scope>test</scope>
427425
</dependency>
428426
<dependency>
429427
<groupId>io.opentelemetry</groupId>
@@ -435,6 +433,16 @@
435433
<artifactId>opentelemetry-sdk-testing</artifactId>
436434
<scope>test</scope>
437435
</dependency>
436+
<dependency>
437+
<groupId>com.google.cloud</groupId>
438+
<artifactId>google-cloud-monitoring</artifactId>
439+
<version>3.38.0</version>
440+
</dependency>
441+
<dependency>
442+
<groupId>com.google.api.grpc</groupId>
443+
<artifactId>proto-google-cloud-monitoring-v3</artifactId>
444+
<version>3.38.0</version>
445+
</dependency>
438446
</dependencies>
439447
<profiles>
440448
<profile>
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
/*
2+
* Copyright 2024 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.spanner;
18+
19+
import com.google.common.collect.ImmutableSet;
20+
import io.opentelemetry.api.common.AttributeKey;
21+
import java.util.Set;
22+
import java.util.stream.Collectors;
23+
24+
public class BuiltInMetricsConstant {
25+
26+
public static final String METER_NAME = "spanner.googleapis.com/internal/client";
27+
28+
public static final String GAX_METER_NAME = "gax-java";
29+
30+
static final String OPERATION_LATENCIES_NAME = "operation_latencies";
31+
static final String ATTEMPT_LATENCIES_NAME = "attempt_latencies";
32+
static final String OPERATION_LATENCY_NAME = "operation_latency";
33+
static final String ATTEMPT_LATENCY_NAME = "attempt_latency";
34+
static final String OPERATION_COUNT_NAME = "operation_count";
35+
static final String ATTEMPT_COUNT_NAME = "attempt_count";
36+
37+
public static final Set<String> SPANNER_METRICS =
38+
ImmutableSet.of(
39+
OPERATION_LATENCIES_NAME,
40+
ATTEMPT_LATENCIES_NAME,
41+
OPERATION_COUNT_NAME,
42+
ATTEMPT_COUNT_NAME)
43+
.stream()
44+
.map(m -> METER_NAME + '/' + m)
45+
.collect(Collectors.toSet());
46+
47+
public static final String SPANNER_RESOURCE_TYPE = "spanner_instance_client";
48+
49+
public static final AttributeKey<String> PROJECT_ID_KEY = AttributeKey.stringKey("project_id");
50+
public static final AttributeKey<String> INSTANCE_ID_KEY = AttributeKey.stringKey("instance_id");
51+
public static final AttributeKey<String> LOCATION_ID_KEY = AttributeKey.stringKey("location");
52+
public static final AttributeKey<String> INSTANCE_CONFIG_ID_KEY =
53+
AttributeKey.stringKey("instance_config");
54+
55+
// These metric labels will be promoted to the spanner monitored resource fields
56+
public static final Set<AttributeKey<String>> SPANNER_PROMOTED_RESOURCE_LABELS =
57+
ImmutableSet.of(PROJECT_ID_KEY, INSTANCE_ID_KEY, INSTANCE_CONFIG_ID_KEY, LOCATION_ID_KEY);
58+
59+
public static final AttributeKey<String> DATABASE_KEY = AttributeKey.stringKey("database");
60+
public static final AttributeKey<String> CLIENT_UID_KEY = AttributeKey.stringKey("client_uid");
61+
public static final AttributeKey<String> CLIENT_NAME_KEY = AttributeKey.stringKey("client_name");
62+
public static final AttributeKey<String> METHOD_KEY = AttributeKey.stringKey("method");
63+
public static final AttributeKey<String> STATUS_KEY = AttributeKey.stringKey("status");
64+
public static final AttributeKey<String> DIRECT_PATH_ENABLED_KEY =
65+
AttributeKey.stringKey("directpath_enabled");
66+
public static final AttributeKey<String> DIRECT_PATH_USED_KEY =
67+
AttributeKey.stringKey("directpath_used");
68+
69+
public static final Set<AttributeKey> COMMON_ATTRIBUTES =
70+
ImmutableSet.of(
71+
PROJECT_ID_KEY,
72+
INSTANCE_ID_KEY,
73+
LOCATION_ID_KEY,
74+
INSTANCE_CONFIG_ID_KEY,
75+
CLIENT_UID_KEY,
76+
METHOD_KEY,
77+
STATUS_KEY,
78+
DATABASE_KEY,
79+
CLIENT_NAME_KEY,
80+
DIRECT_PATH_ENABLED_KEY,
81+
DIRECT_PATH_USED_KEY);
82+
}
Lines changed: 246 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,246 @@
1+
/*
2+
* Copyright 2024 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.spanner;
18+
19+
import static com.google.cloud.spanner.BuiltInMetricsConstant.SPANNER_METRICS;
20+
21+
import com.google.api.core.ApiFuture;
22+
import com.google.api.core.ApiFutureCallback;
23+
import com.google.api.core.ApiFutures;
24+
import com.google.api.gax.core.CredentialsProvider;
25+
import com.google.api.gax.core.FixedCredentialsProvider;
26+
import com.google.api.gax.core.NoCredentialsProvider;
27+
import com.google.api.gax.rpc.PermissionDeniedException;
28+
import com.google.auth.Credentials;
29+
import com.google.cloud.monitoring.v3.MetricServiceClient;
30+
import com.google.cloud.monitoring.v3.MetricServiceSettings;
31+
import com.google.common.annotations.VisibleForTesting;
32+
import com.google.common.base.MoreObjects;
33+
import com.google.common.collect.Iterables;
34+
import com.google.common.util.concurrent.MoreExecutors;
35+
import com.google.monitoring.v3.CreateTimeSeriesRequest;
36+
import com.google.monitoring.v3.ProjectName;
37+
import com.google.monitoring.v3.TimeSeries;
38+
import com.google.protobuf.Empty;
39+
import io.opentelemetry.sdk.common.CompletableResultCode;
40+
import io.opentelemetry.sdk.metrics.InstrumentType;
41+
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
42+
import io.opentelemetry.sdk.metrics.data.MetricData;
43+
import io.opentelemetry.sdk.metrics.export.MetricExporter;
44+
import java.io.IOException;
45+
import java.util.ArrayList;
46+
import java.util.Arrays;
47+
import java.util.Collection;
48+
import java.util.List;
49+
import java.util.concurrent.atomic.AtomicBoolean;
50+
import java.util.logging.Level;
51+
import java.util.logging.Logger;
52+
import java.util.stream.Collectors;
53+
import javax.annotation.Nullable;
54+
import org.threeten.bp.Duration;
55+
56+
/**
57+
* Spanner Cloud Monitoring OpenTelemetry Exporter.
58+
*
59+
* <p>The exporter will look for all spanner owned metrics under spanner.googleapis.com
60+
* instrumentation scope and upload it via the Google Cloud Monitoring API.
61+
*/
62+
class SpannerCloudMonitoringExporter implements MetricExporter {
63+
64+
private static final Logger logger =
65+
Logger.getLogger(SpannerCloudMonitoringExporter.class.getName());
66+
67+
// This system property can be used to override the monitoring endpoint
68+
// to a different environment. It's meant for internal testing only.
69+
private static final String MONITORING_ENDPOINT =
70+
MoreObjects.firstNonNull(
71+
System.getProperty("spanner.test-monitoring-endpoint"),
72+
MetricServiceSettings.getDefaultEndpoint());
73+
74+
// This the quota limit from Cloud Monitoring. More details in
75+
// https://cloud.google.com/monitoring/quotas#custom_metrics_quotas.
76+
private static final int EXPORT_BATCH_SIZE_LIMIT = 200;
77+
private final AtomicBoolean spannerExportFailureLogged = new AtomicBoolean(false);
78+
private CompletableResultCode lastExportCode;
79+
private final MetricServiceClient client;
80+
private final String spannerProjectId;
81+
private final AtomicBoolean isShutdown = new AtomicBoolean(false);
82+
83+
static SpannerCloudMonitoringExporter create(String projectId, @Nullable Credentials credentials)
84+
throws IOException {
85+
MetricServiceSettings.Builder settingsBuilder = MetricServiceSettings.newBuilder();
86+
CredentialsProvider credentialsProvider;
87+
if (credentials == null) {
88+
credentialsProvider = NoCredentialsProvider.create();
89+
} else {
90+
credentialsProvider = FixedCredentialsProvider.create(credentials);
91+
}
92+
settingsBuilder.setCredentialsProvider(credentialsProvider);
93+
settingsBuilder.setEndpoint(MONITORING_ENDPOINT);
94+
95+
org.threeten.bp.Duration timeout = Duration.ofMinutes(1);
96+
// TODO: createServiceTimeSeries needs special handling if the request failed. Leaving
97+
// it as not retried for now.
98+
settingsBuilder.createServiceTimeSeriesSettings().setSimpleTimeoutNoRetries(timeout);
99+
100+
return new SpannerCloudMonitoringExporter(
101+
projectId, MetricServiceClient.create(settingsBuilder.build()));
102+
}
103+
104+
@VisibleForTesting
105+
SpannerCloudMonitoringExporter(String projectId, MetricServiceClient client) {
106+
this.client = client;
107+
this.spannerProjectId = projectId;
108+
}
109+
110+
@Override
111+
public CompletableResultCode export(Collection<MetricData> collection) {
112+
if (isShutdown.get()) {
113+
logger.log(Level.WARNING, "Exporter is shutting down");
114+
return CompletableResultCode.ofFailure();
115+
}
116+
117+
lastExportCode = exportSpannerClientMetrics(collection);
118+
return lastExportCode;
119+
}
120+
121+
/** Export client built in metrics */
122+
private CompletableResultCode exportSpannerClientMetrics(Collection<MetricData> collection) {
123+
// Filter spanner metrics
124+
List<MetricData> spannerMetricData =
125+
collection.stream()
126+
.filter(md -> SPANNER_METRICS.contains(md.getName()))
127+
.collect(Collectors.toList());
128+
129+
// Skips exporting if there's none
130+
if (spannerMetricData.isEmpty()) {
131+
return CompletableResultCode.ofSuccess();
132+
}
133+
134+
// Verifies metrics project id are the same as the spanner project id set on this client
135+
if (!spannerMetricData.stream()
136+
.flatMap(metricData -> metricData.getData().getPoints().stream())
137+
.allMatch(
138+
pd -> spannerProjectId.equals(SpannerCloudMonitoringExporterUtils.getProjectId(pd)))) {
139+
logger.log(Level.WARNING, "Metric data has different a projectId. Skip exporting.");
140+
return CompletableResultCode.ofFailure();
141+
}
142+
143+
List<TimeSeries> spannerTimeSeries;
144+
try {
145+
spannerTimeSeries =
146+
SpannerCloudMonitoringExporterUtils.convertToSpannerTimeSeries(spannerMetricData);
147+
} catch (Throwable e) {
148+
logger.log(
149+
Level.WARNING,
150+
"Failed to convert spanner metric data to cloud monitoring timeseries.",
151+
e);
152+
return CompletableResultCode.ofFailure();
153+
}
154+
155+
ProjectName projectName = ProjectName.of(spannerProjectId);
156+
157+
ApiFuture<List<Empty>> futureList = exportTimeSeriesInBatch(projectName, spannerTimeSeries);
158+
159+
CompletableResultCode spannerExportCode = new CompletableResultCode();
160+
ApiFutures.addCallback(
161+
futureList,
162+
new ApiFutureCallback<List<Empty>>() {
163+
@Override
164+
public void onFailure(Throwable throwable) {
165+
if (spannerExportFailureLogged.compareAndSet(false, true)) {
166+
String msg = "createServiceTimeSeries request failed for spanner metrics.";
167+
if (throwable instanceof PermissionDeniedException) {
168+
// TODO: Add the link of public documentation when available in the log message.
169+
msg +=
170+
String.format(
171+
" Need monitoring metric writer permission on project=%s.",
172+
projectName.getProject());
173+
}
174+
logger.log(Level.WARNING, msg, throwable);
175+
}
176+
spannerExportCode.fail();
177+
}
178+
179+
@Override
180+
public void onSuccess(List<Empty> empty) {
181+
// When an export succeeded reset the export failure flag to false so if there's a
182+
// transient failure it'll be logged.
183+
spannerExportFailureLogged.set(false);
184+
spannerExportCode.succeed();
185+
}
186+
},
187+
MoreExecutors.directExecutor());
188+
189+
return spannerExportCode;
190+
}
191+
192+
private ApiFuture<List<Empty>> exportTimeSeriesInBatch(
193+
ProjectName projectName, List<TimeSeries> timeSeries) {
194+
List<ApiFuture<Empty>> batchResults = new ArrayList<>();
195+
196+
for (List<TimeSeries> batch : Iterables.partition(timeSeries, EXPORT_BATCH_SIZE_LIMIT)) {
197+
CreateTimeSeriesRequest req =
198+
CreateTimeSeriesRequest.newBuilder()
199+
.setName(projectName.toString())
200+
.addAllTimeSeries(batch)
201+
.build();
202+
ApiFuture<Empty> f = this.client.createServiceTimeSeriesCallable().futureCall(req);
203+
batchResults.add(f);
204+
}
205+
206+
return ApiFutures.allAsList(batchResults);
207+
}
208+
209+
@Override
210+
public CompletableResultCode flush() {
211+
if (lastExportCode != null) {
212+
return lastExportCode;
213+
}
214+
return CompletableResultCode.ofSuccess();
215+
}
216+
217+
@Override
218+
public CompletableResultCode shutdown() {
219+
if (!isShutdown.compareAndSet(false, true)) {
220+
logger.log(Level.WARNING, "shutdown is called multiple times");
221+
return CompletableResultCode.ofSuccess();
222+
}
223+
CompletableResultCode flushResult = flush();
224+
CompletableResultCode shutdownResult = new CompletableResultCode();
225+
flushResult.whenComplete(
226+
() -> {
227+
try {
228+
client.shutdown();
229+
shutdownResult.succeed();
230+
} catch (Throwable e) {
231+
logger.log(Level.WARNING, "failed to shutdown the monitoring client", e);
232+
shutdownResult.fail();
233+
}
234+
});
235+
return CompletableResultCode.ofAll(Arrays.asList(flushResult, shutdownResult));
236+
}
237+
238+
/**
239+
* For Google Cloud Monitoring always return CUMULATIVE to keep track of the cumulative value of a
240+
* metric over time.
241+
*/
242+
@Override
243+
public AggregationTemporality getAggregationTemporality(InstrumentType instrumentType) {
244+
return AggregationTemporality.CUMULATIVE;
245+
}
246+
}

0 commit comments

Comments
 (0)