-
Notifications
You must be signed in to change notification settings - Fork 132
chore: custom exporter for Client Built-in Metrics #3164
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,82 @@ | ||
/* | ||
* Copyright 2024 Google LLC | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package com.google.cloud.spanner; | ||
|
||
import com.google.common.collect.ImmutableSet; | ||
import io.opentelemetry.api.common.AttributeKey; | ||
import java.util.Set; | ||
import java.util.stream.Collectors; | ||
|
||
public class BuiltInMetricsConstant { | ||
|
||
public static final String METER_NAME = "spanner.googleapis.com/internal/client"; | ||
|
||
public static final String GAX_METER_NAME = "gax-java"; | ||
|
||
static final String OPERATION_LATENCIES_NAME = "operation_latencies"; | ||
static final String ATTEMPT_LATENCIES_NAME = "attempt_latencies"; | ||
static final String OPERATION_LATENCY_NAME = "operation_latency"; | ||
static final String ATTEMPT_LATENCY_NAME = "attempt_latency"; | ||
static final String OPERATION_COUNT_NAME = "operation_count"; | ||
static final String ATTEMPT_COUNT_NAME = "attempt_count"; | ||
|
||
public static final Set<String> SPANNER_METRICS = | ||
ImmutableSet.of( | ||
OPERATION_LATENCIES_NAME, | ||
ATTEMPT_LATENCIES_NAME, | ||
OPERATION_COUNT_NAME, | ||
ATTEMPT_COUNT_NAME) | ||
.stream() | ||
.map(m -> METER_NAME + '/' + m) | ||
.collect(Collectors.toSet()); | ||
|
||
public static final String SPANNER_RESOURCE_TYPE = "spanner_instance_client"; | ||
|
||
public static final AttributeKey<String> PROJECT_ID_KEY = AttributeKey.stringKey("project_id"); | ||
public static final AttributeKey<String> INSTANCE_ID_KEY = AttributeKey.stringKey("instance_id"); | ||
public static final AttributeKey<String> LOCATION_ID_KEY = AttributeKey.stringKey("location"); | ||
public static final AttributeKey<String> INSTANCE_CONFIG_ID_KEY = | ||
AttributeKey.stringKey("instance_config"); | ||
|
||
// These metric labels will be promoted to the spanner monitored resource fields | ||
public static final Set<AttributeKey<String>> SPANNER_PROMOTED_RESOURCE_LABELS = | ||
ImmutableSet.of(PROJECT_ID_KEY, INSTANCE_ID_KEY, INSTANCE_CONFIG_ID_KEY, LOCATION_ID_KEY); | ||
|
||
public static final AttributeKey<String> DATABASE_KEY = AttributeKey.stringKey("database"); | ||
public static final AttributeKey<String> CLIENT_UID_KEY = AttributeKey.stringKey("client_uid"); | ||
public static final AttributeKey<String> CLIENT_NAME_KEY = AttributeKey.stringKey("client_name"); | ||
public static final AttributeKey<String> METHOD_KEY = AttributeKey.stringKey("method"); | ||
public static final AttributeKey<String> STATUS_KEY = AttributeKey.stringKey("status"); | ||
public static final AttributeKey<String> DIRECT_PATH_ENABLED_KEY = | ||
AttributeKey.stringKey("directpath_enabled"); | ||
public static final AttributeKey<String> DIRECT_PATH_USED_KEY = | ||
AttributeKey.stringKey("directpath_used"); | ||
|
||
public static final Set<AttributeKey> COMMON_ATTRIBUTES = | ||
ImmutableSet.of( | ||
PROJECT_ID_KEY, | ||
INSTANCE_ID_KEY, | ||
LOCATION_ID_KEY, | ||
INSTANCE_CONFIG_ID_KEY, | ||
CLIENT_UID_KEY, | ||
METHOD_KEY, | ||
STATUS_KEY, | ||
DATABASE_KEY, | ||
CLIENT_NAME_KEY, | ||
DIRECT_PATH_ENABLED_KEY, | ||
DIRECT_PATH_USED_KEY); | ||
} |
Original file line number | Diff line number | Diff line change | ||
---|---|---|---|---|
@@ -0,0 +1,236 @@ | ||||
/* | ||||
* Copyright 2024 Google LLC | ||||
* | ||||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||||
* you may not use this file except in compliance with the License. | ||||
* You may obtain a copy of the License at | ||||
* | ||||
* http://www.apache.org/licenses/LICENSE-2.0 | ||||
* | ||||
* Unless required by applicable law or agreed to in writing, software | ||||
* distributed under the License is distributed on an "AS IS" BASIS, | ||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
* See the License for the specific language governing permissions and | ||||
* limitations under the License. | ||||
*/ | ||||
|
||||
package com.google.cloud.spanner; | ||||
|
||||
import static com.google.cloud.spanner.BuiltInMetricsConstant.SPANNER_METRICS; | ||||
|
||||
import com.google.api.core.ApiFuture; | ||||
import com.google.api.core.ApiFutureCallback; | ||||
import com.google.api.core.ApiFutures; | ||||
import com.google.api.gax.core.CredentialsProvider; | ||||
import com.google.api.gax.core.FixedCredentialsProvider; | ||||
import com.google.api.gax.core.NoCredentialsProvider; | ||||
import com.google.api.gax.rpc.PermissionDeniedException; | ||||
import com.google.auth.Credentials; | ||||
import com.google.cloud.monitoring.v3.MetricServiceClient; | ||||
import com.google.cloud.monitoring.v3.MetricServiceSettings; | ||||
import com.google.common.annotations.VisibleForTesting; | ||||
import com.google.common.base.MoreObjects; | ||||
import com.google.common.collect.Iterables; | ||||
import com.google.common.util.concurrent.MoreExecutors; | ||||
import com.google.monitoring.v3.CreateTimeSeriesRequest; | ||||
import com.google.monitoring.v3.ProjectName; | ||||
import com.google.monitoring.v3.TimeSeries; | ||||
import com.google.protobuf.Empty; | ||||
import io.opentelemetry.sdk.common.CompletableResultCode; | ||||
import io.opentelemetry.sdk.metrics.InstrumentType; | ||||
import io.opentelemetry.sdk.metrics.data.AggregationTemporality; | ||||
import io.opentelemetry.sdk.metrics.data.MetricData; | ||||
import io.opentelemetry.sdk.metrics.export.MetricExporter; | ||||
import java.io.IOException; | ||||
import java.util.ArrayList; | ||||
import java.util.Collection; | ||||
import java.util.List; | ||||
import java.util.concurrent.atomic.AtomicBoolean; | ||||
import java.util.logging.Level; | ||||
import java.util.logging.Logger; | ||||
import java.util.stream.Collectors; | ||||
import javax.annotation.Nullable; | ||||
import org.threeten.bp.Duration; | ||||
|
||||
/** | ||||
* Spanner Cloud Monitoring OpenTelemetry Exporter. | ||||
* | ||||
* <p>The exporter will look for all spanner owned metrics under spanner.googleapis.com | ||||
* instrumentation scope and upload it via the Google Cloud Monitoring API. | ||||
*/ | ||||
class SpannerCloudMonitoringExporter implements MetricExporter { | ||||
|
||||
private static final Logger logger = | ||||
Logger.getLogger(SpannerCloudMonitoringExporter.class.getName()); | ||||
|
||||
// This system property can be used to override the monitoring endpoint | ||||
// to a different environment. It's meant for internal testing only. | ||||
private static final String MONITORING_ENDPOINT = | ||||
surbhigarg92 marked this conversation as resolved.
Show resolved
Hide resolved
|
||||
MoreObjects.firstNonNull( | ||||
System.getProperty("spanner.test-monitoring-endpoint"), | ||||
MetricServiceSettings.getDefaultEndpoint()); | ||||
|
||||
// This the quota limit from Cloud Monitoring. More details in | ||||
// https://cloud.google.com/monitoring/quotas#custom_metrics_quotas. | ||||
private static final int EXPORT_BATCH_SIZE_LIMIT = 200; | ||||
private final AtomicBoolean spannerExportFailureLogged = new AtomicBoolean(false); | ||||
private CompletableResultCode lastExportCode; | ||||
private final MetricServiceClient client; | ||||
private final String spannerProjectId; | ||||
|
||||
static SpannerCloudMonitoringExporter create(String projectId, @Nullable Credentials credentials) | ||||
throws IOException { | ||||
MetricServiceSettings.Builder settingsBuilder = MetricServiceSettings.newBuilder(); | ||||
CredentialsProvider credentialsProvider; | ||||
if (credentials == null) { | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Normally, when a user has not set any credentials, we will use the default credentials found in the environment. Is that what will normally happen here as well? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. My plan was to use getCredentials when I call There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do you mean this method: Line 1439 in bee79af
That method is only usable in case the user uses the Connection API (which no one uses directly, meaning it is only usable in combination with the JDBC driver). But you could use the same type of logic and/or move that logic to a place where it is usable directly in the client library. Would it maybe make more sense to make the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sorry, not getCredentials . We will use Keeping it Nullable for now, this was referred from the current user facing exporter. |
||||
credentialsProvider = NoCredentialsProvider.create(); | ||||
} else { | ||||
credentialsProvider = FixedCredentialsProvider.create(credentials); | ||||
} | ||||
settingsBuilder.setCredentialsProvider(credentialsProvider); | ||||
settingsBuilder.setEndpoint(MONITORING_ENDPOINT); | ||||
surbhigarg92 marked this conversation as resolved.
Show resolved
Hide resolved
|
||||
|
||||
org.threeten.bp.Duration timeout = Duration.ofMinutes(1); | ||||
// TODO: createServiceTimeSeries needs special handling if the request failed. Leaving | ||||
// it as not retried for now. | ||||
settingsBuilder.createServiceTimeSeriesSettings().setSimpleTimeoutNoRetries(timeout); | ||||
|
||||
return new SpannerCloudMonitoringExporter( | ||||
projectId, MetricServiceClient.create(settingsBuilder.build())); | ||||
} | ||||
|
||||
@VisibleForTesting | ||||
SpannerCloudMonitoringExporter(String projectId, MetricServiceClient client) { | ||||
this.client = client; | ||||
this.spannerProjectId = projectId; | ||||
} | ||||
|
||||
@Override | ||||
public CompletableResultCode export(Collection<MetricData> collection) { | ||||
if (client.isShutdown()) { | ||||
logger.log(Level.WARNING, "Exporter is shut down"); | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: "Exporter was already shut down" There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also I think it is an error to try to use an already shutdown exporter, hence perhaps change the log level to Error? |
||||
return CompletableResultCode.ofFailure(); | ||||
} | ||||
|
||||
this.lastExportCode = exportSpannerClientMetrics(collection); | ||||
return lastExportCode; | ||||
} | ||||
|
||||
/** Export client built in metrics */ | ||||
private CompletableResultCode exportSpannerClientMetrics(Collection<MetricData> collection) { | ||||
// Filter spanner metrics | ||||
List<MetricData> spannerMetricData = | ||||
collection.stream() | ||||
.filter(md -> SPANNER_METRICS.contains(md.getName())) | ||||
surbhigarg92 marked this conversation as resolved.
Show resolved
Hide resolved
|
||||
.collect(Collectors.toList()); | ||||
|
||||
// Skips exporting if there's none | ||||
if (spannerMetricData.isEmpty()) { | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Useful to perform an info log so that debugging will reveal truly that .isEmpty() and that data wasn't exported, instead of one scratching their head wondering why metrics weren't exported. |
||||
return CompletableResultCode.ofSuccess(); | ||||
} | ||||
|
||||
// Verifies metrics project id is the same as the spanner project id set on this client | ||||
if (!spannerMetricData.stream() | ||||
.flatMap(metricData -> metricData.getData().getPoints().stream()) | ||||
.allMatch( | ||||
pd -> spannerProjectId.equals(SpannerCloudMonitoringExporterUtils.getProjectId(pd)))) { | ||||
logger.log(Level.WARNING, "Metric data has a different projectId. Skipping export."); | ||||
return CompletableResultCode.ofFailure(); | ||||
} | ||||
|
||||
List<TimeSeries> spannerTimeSeries; | ||||
try { | ||||
spannerTimeSeries = | ||||
SpannerCloudMonitoringExporterUtils.convertToSpannerTimeSeries(spannerMetricData); | ||||
} catch (Throwable e) { | ||||
logger.log( | ||||
Level.WARNING, | ||||
"Failed to convert spanner metric data to cloud monitoring timeseries.", | ||||
e); | ||||
return CompletableResultCode.ofFailure(); | ||||
} | ||||
|
||||
ProjectName projectName = ProjectName.of(spannerProjectId); | ||||
|
||||
ApiFuture<List<Empty>> futureList = exportTimeSeriesInBatch(projectName, spannerTimeSeries); | ||||
|
||||
CompletableResultCode spannerExportCode = new CompletableResultCode(); | ||||
ApiFutures.addCallback( | ||||
futureList, | ||||
new ApiFutureCallback<List<Empty>>() { | ||||
@Override | ||||
public void onFailure(Throwable throwable) { | ||||
if (spannerExportFailureLogged.compareAndSet(false, true)) { | ||||
String msg = "createServiceTimeSeries request failed for spanner metrics."; | ||||
if (throwable instanceof PermissionDeniedException) { | ||||
surbhigarg92 marked this conversation as resolved.
Show resolved
Hide resolved
|
||||
// TODO: Add the link of public documentation when available in the log message. | ||||
msg += | ||||
String.format( | ||||
" Need monitoring metric writer permission on project=%s.", | ||||
projectName.getProject()); | ||||
} | ||||
logger.log(Level.WARNING, msg, throwable); | ||||
} | ||||
spannerExportCode.fail(); | ||||
} | ||||
|
||||
@Override | ||||
public void onSuccess(List<Empty> empty) { | ||||
// When an export succeeded reset the export failure flag to false so if there's a | ||||
// transient failure it'll be logged. | ||||
spannerExportFailureLogged.set(false); | ||||
spannerExportCode.succeed(); | ||||
} | ||||
}, | ||||
MoreExecutors.directExecutor()); | ||||
|
||||
return spannerExportCode; | ||||
} | ||||
|
||||
private ApiFuture<List<Empty>> exportTimeSeriesInBatch( | ||||
ProjectName projectName, List<TimeSeries> timeSeries) { | ||||
List<ApiFuture<Empty>> batchResults = new ArrayList<>(); | ||||
|
||||
for (List<TimeSeries> batch : Iterables.partition(timeSeries, EXPORT_BATCH_SIZE_LIMIT)) { | ||||
CreateTimeSeriesRequest req = | ||||
CreateTimeSeriesRequest.newBuilder() | ||||
.setName(projectName.toString()) | ||||
.addAllTimeSeries(batch) | ||||
.build(); | ||||
batchResults.add(this.client.createServiceTimeSeriesCallable().futureCall(req)); | ||||
} | ||||
|
||||
return ApiFutures.allAsList(batchResults); | ||||
} | ||||
|
||||
@Override | ||||
public CompletableResultCode flush() { | ||||
return CompletableResultCode.ofSuccess(); | ||||
} | ||||
|
||||
@Override | ||||
public CompletableResultCode shutdown() { | ||||
if (client.isShutdown()) { | ||||
logger.log(Level.WARNING, "shutdown is called multiple times"); | ||||
return CompletableResultCode.ofSuccess(); | ||||
} | ||||
CompletableResultCode shutdownResult = new CompletableResultCode(); | ||||
try { | ||||
client.shutdown(); | ||||
shutdownResult.succeed(); | ||||
} catch (Throwable e) { | ||||
logger.log(Level.WARNING, "failed to shutdown the monitoring client", e); | ||||
shutdownResult.fail(); | ||||
} | ||||
return shutdownResult; | ||||
} | ||||
|
||||
/** | ||||
* For Google Cloud Monitoring always return CUMULATIVE to keep track of the cumulative value of a | ||||
* metric over time. | ||||
*/ | ||||
@Override | ||||
public AggregationTemporality getAggregationTemporality(InstrumentType instrumentType) { | ||||
return AggregationTemporality.CUMULATIVE; | ||||
} | ||||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit, iirc the `project_id` MonitoredResource label must always match the project the metrics are being written to, so it is unnecessary to send it. We've taken this approach in our exporters since there is no benefit in sending it.