Skip to content

Commit 3a087e4

Browse files
authored
Enhance the DSL error store to capture more error info (#101475)
This enhances the DSL error store to caputre more information about each error, namely: the initial encounter timestamp, the current occurrence timestamp and the retry count. This also introduces a new setting, `data_streams.lifecycle.signalling.error_retry_interval` to control when we emit an `error` log entry. It defaults to 10 retry counts, namely, if the error for an index remains the same for 10 consecutive DSL runs, we log a message at level `error`. This also exposes all the new error information as part of the `_lifecycle/explain` API.
1 parent 93b69a9 commit 3a087e4

File tree

16 files changed

+365
-59
lines changed

16 files changed

+365
-59
lines changed

modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceIT.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.elasticsearch.action.datastreams.CreateDataStreamAction;
2828
import org.elasticsearch.action.datastreams.GetDataStreamAction;
2929
import org.elasticsearch.action.datastreams.ModifyDataStreamsAction;
30+
import org.elasticsearch.action.datastreams.lifecycle.ErrorEntry;
3031
import org.elasticsearch.action.datastreams.lifecycle.ExplainIndexDataStreamLifecycle;
3132
import org.elasticsearch.action.index.IndexRequest;
3233
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
@@ -405,7 +406,7 @@ public void testErrorRecordingOnRollover() throws Exception {
405406

406407
assertBusy(() -> {
407408
String writeIndexName = getBackingIndices(dataStreamName).get(1);
408-
String writeIndexRolloverError = null;
409+
ErrorEntry writeIndexRolloverError = null;
409410
Iterable<DataStreamLifecycleService> lifecycleServices = internalCluster().getInstances(DataStreamLifecycleService.class);
410411

411412
for (DataStreamLifecycleService lifecycleService : lifecycleServices) {
@@ -416,7 +417,7 @@ public void testErrorRecordingOnRollover() throws Exception {
416417
}
417418

418419
assertThat(writeIndexRolloverError, is(notNullValue()));
419-
assertThat(writeIndexRolloverError, containsString("maximum normal shards open"));
420+
assertThat(writeIndexRolloverError.error(), containsString("maximum normal shards open"));
420421
});
421422

422423
// let's reset the cluster max shards per node limit to allow rollover to proceed and check the error store is empty
@@ -497,7 +498,7 @@ public void testErrorRecordingOnRetention() throws Exception {
497498
String writeIndex = backingIndices.get(1).getName();
498499
assertThat(writeIndex, backingIndexEqualTo(dataStreamName, 2));
499500

500-
String recordedRetentionExecutionError = null;
501+
ErrorEntry recordedRetentionExecutionError = null;
501502
Iterable<DataStreamLifecycleService> lifecycleServices = internalCluster().getInstances(DataStreamLifecycleService.class);
502503

503504
for (DataStreamLifecycleService lifecycleService : lifecycleServices) {
@@ -508,7 +509,7 @@ public void testErrorRecordingOnRetention() throws Exception {
508509
}
509510

510511
assertThat(recordedRetentionExecutionError, is(notNullValue()));
511-
assertThat(recordedRetentionExecutionError, containsString("blocked by: [FORBIDDEN/5/index read-only (api)"));
512+
assertThat(recordedRetentionExecutionError.error(), containsString("blocked by: [FORBIDDEN/5/index read-only (api)"));
512513
});
513514

514515
// let's mark the index as writeable and make sure it's deleted and the error store is empty

modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/lifecycle/ExplainDataStreamLifecycleIT.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import static org.elasticsearch.indices.ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_NODE;
5050
import static org.hamcrest.Matchers.containsString;
5151
import static org.hamcrest.Matchers.equalTo;
52+
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
5253
import static org.hamcrest.Matchers.is;
5354
import static org.hamcrest.Matchers.notNullValue;
5455
import static org.hamcrest.Matchers.nullValue;
@@ -252,7 +253,9 @@ public void testExplainLifecycleForIndicesWithErrors() throws Exception {
252253
// index has not been rolled over yet
253254
assertThat(explainIndex.getGenerationTime(System::currentTimeMillis), nullValue());
254255

255-
assertThat(explainIndex.getError(), containsString("maximum normal shards open"));
256+
assertThat(explainIndex.getError(), notNullValue());
257+
assertThat(explainIndex.getError().error(), containsString("maximum normal shards open"));
258+
assertThat(explainIndex.getError().retryCount(), greaterThanOrEqualTo(1));
256259
}
257260
});
258261

modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamsPlugin.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,7 @@ public List<Setting<?>> getSettings() {
141141
pluginSettings.add(DataStreamLifecycleService.DATA_STREAM_LIFECYCLE_POLL_INTERVAL_SETTING);
142142
pluginSettings.add(DataStreamLifecycleService.DATA_STREAM_MERGE_POLICY_TARGET_FLOOR_SEGMENT_SETTING);
143143
pluginSettings.add(DataStreamLifecycleService.DATA_STREAM_MERGE_POLICY_TARGET_FACTOR_SETTING);
144+
pluginSettings.add(DataStreamLifecycleService.DATA_STREAM_SIGNALLING_ERROR_RETRY_INTERVAL_SETTING);
144145
return pluginSettings;
145146
}
146147

@@ -155,7 +156,7 @@ public Collection<?> createComponents(PluginServices services) {
155156
);
156157
this.updateTimeSeriesRangeService.set(updateTimeSeriesRangeService);
157158
components.add(this.updateTimeSeriesRangeService.get());
158-
errorStoreInitialisationService.set(new DataStreamLifecycleErrorStore());
159+
errorStoreInitialisationService.set(new DataStreamLifecycleErrorStore(services.threadPool()::absoluteTimeInMillis));
159160
dataLifecycleInitialisationService.set(
160161
new DataStreamLifecycleService(
161162
settings,

modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleErrorStore.java

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,14 @@
99
package org.elasticsearch.datastreams.lifecycle;
1010

1111
import org.elasticsearch.ElasticsearchException;
12+
import org.elasticsearch.action.datastreams.lifecycle.ErrorEntry;
1213
import org.elasticsearch.common.Strings;
1314
import org.elasticsearch.core.Nullable;
1415

1516
import java.util.List;
1617
import java.util.concurrent.ConcurrentHashMap;
1718
import java.util.concurrent.ConcurrentMap;
19+
import java.util.function.LongSupplier;
1820

1921
import static org.elasticsearch.xcontent.ToXContent.EMPTY_PARAMS;
2022

@@ -26,7 +28,12 @@
2628
public class DataStreamLifecycleErrorStore {
2729

2830
public static final int MAX_ERROR_MESSAGE_LENGTH = 1000;
29-
private final ConcurrentMap<String, String> indexNameToError = new ConcurrentHashMap<>();
31+
private final ConcurrentMap<String, ErrorEntry> indexNameToError = new ConcurrentHashMap<>();
32+
private final LongSupplier nowSupplier;
33+
34+
public DataStreamLifecycleErrorStore(LongSupplier nowSupplier) {
35+
this.nowSupplier = nowSupplier;
36+
}
3037

3138
/**
3239
* Records a string representation of the provided exception for the provided index.
@@ -35,13 +42,24 @@ public class DataStreamLifecycleErrorStore {
3542
* Returns the previously recorded error for the provided index, or null otherwise.
3643
*/
3744
@Nullable
38-
public String recordError(String indexName, Exception e) {
45+
public ErrorEntry recordError(String indexName, Exception e) {
3946
String exceptionToString = Strings.toString((builder, params) -> {
4047
ElasticsearchException.generateThrowableXContent(builder, EMPTY_PARAMS, e);
4148
return builder;
4249
});
43-
String recordedError = Strings.substring(exceptionToString, 0, MAX_ERROR_MESSAGE_LENGTH);
44-
return indexNameToError.put(indexName, recordedError);
50+
String newError = Strings.substring(exceptionToString, 0, MAX_ERROR_MESSAGE_LENGTH);
51+
ErrorEntry existingError = indexNameToError.get(indexName);
52+
long recordedTimestamp = nowSupplier.getAsLong();
53+
if (existingError == null) {
54+
indexNameToError.put(indexName, new ErrorEntry(recordedTimestamp, newError, recordedTimestamp, 0));
55+
} else {
56+
if (existingError.error().equals(newError)) {
57+
indexNameToError.put(indexName, ErrorEntry.incrementRetryCount(existingError, nowSupplier));
58+
} else {
59+
indexNameToError.put(indexName, new ErrorEntry(recordedTimestamp, newError, recordedTimestamp, 0));
60+
}
61+
}
62+
return existingError;
4563
}
4664

4765
/**
@@ -62,7 +80,7 @@ public void clearStore() {
6280
* Retrieves the recorded error for the provided index.
6381
*/
6482
@Nullable
65-
public String getError(String indexName) {
83+
public ErrorEntry getError(String indexName) {
6684
return indexNameToError.get(indexName);
6785
}
6886

0 commit comments

Comments
 (0)