Skip to content

Commit c65596b

Browse files
authored
Run TransportGetWatcherSettingsAction on local node (#122857)
This action solely needs the cluster state, it can run on any node. Additionally, it needs to be cancellable to avoid doing unnecessary work after a client failure or timeout. Relates #101805
1 parent 449cc29 commit c65596b

File tree

4 files changed

+56
-23
lines changed

4 files changed

+56
-23
lines changed

docs/changelog/122857.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 122857
2+
summary: Run `TransportGetWatcherSettingsAction` on local node
3+
area: Watcher
4+
type: enhancement
5+
issues: []

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/transport/actions/put/GetWatcherSettingsAction.java

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -11,15 +11,20 @@
1111
import org.elasticsearch.action.ActionRequestValidationException;
1212
import org.elasticsearch.action.ActionResponse;
1313
import org.elasticsearch.action.ActionType;
14-
import org.elasticsearch.action.support.master.MasterNodeReadRequest;
14+
import org.elasticsearch.action.support.local.LocalClusterStateRequest;
1515
import org.elasticsearch.common.io.stream.StreamInput;
1616
import org.elasticsearch.common.io.stream.StreamOutput;
1717
import org.elasticsearch.common.settings.Settings;
1818
import org.elasticsearch.core.TimeValue;
19+
import org.elasticsearch.core.UpdateForV10;
20+
import org.elasticsearch.tasks.CancellableTask;
21+
import org.elasticsearch.tasks.Task;
22+
import org.elasticsearch.tasks.TaskId;
1923
import org.elasticsearch.xcontent.ToXContentObject;
2024
import org.elasticsearch.xcontent.XContentBuilder;
2125

2226
import java.io.IOException;
27+
import java.util.Map;
2328

2429
public class GetWatcherSettingsAction extends ActionType<GetWatcherSettingsAction.Response> {
2530

@@ -30,12 +35,17 @@ public GetWatcherSettingsAction() {
3035
super(NAME);
3136
}
3237

33-
public static class Request extends MasterNodeReadRequest<Request> {
38+
public static class Request extends LocalClusterStateRequest {
3439

3540
public Request(TimeValue masterNodeTimeout) {
3641
super(masterNodeTimeout);
3742
}
3843

44+
/**
45+
* NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC we must remain able to read these requests until
46+
* we no longer need to support calling this action remotely.
47+
*/
48+
@UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT)
3949
public static Request readFrom(StreamInput in) throws IOException {
4050
if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_15_0)) {
4151
return new Request(in);
@@ -49,15 +59,13 @@ private Request(StreamInput in) throws IOException {
4959
}
5060

5161
@Override
52-
public void writeTo(StreamOutput out) throws IOException {
53-
if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_15_0)) {
54-
super.writeTo(out);
55-
}
62+
public ActionRequestValidationException validate() {
63+
return null;
5664
}
5765

5866
@Override
59-
public ActionRequestValidationException validate() {
60-
return null;
67+
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
68+
return new CancellableTask(id, type, action, "", parentTaskId, headers);
6169
}
6270
}
6371

@@ -69,10 +77,11 @@ public Response(Settings settings) {
6977
this.settings = settings;
7078
}
7179

72-
public Response(StreamInput in) throws IOException {
73-
this.settings = Settings.readSettingsFromStream(in);
74-
}
75-
80+
/**
81+
* NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC we must remain able to write these responses until
82+
* we no longer need to support calling this action remotely.
83+
*/
84+
@UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT)
7685
@Override
7786
public void writeTo(StreamOutput out) throws IOException {
7887
this.settings.writeTo(out);

x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/rest/action/RestGetWatcherSettingsAction.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import org.elasticsearch.rest.BaseRestHandler;
1212
import org.elasticsearch.rest.RestRequest;
1313
import org.elasticsearch.rest.RestUtils;
14+
import org.elasticsearch.rest.action.RestCancellableNodeClient;
1415
import org.elasticsearch.rest.action.RestToXContentListener;
1516
import org.elasticsearch.xpack.core.watcher.transport.actions.put.GetWatcherSettingsAction;
1617

@@ -37,6 +38,10 @@ public List<Route> routes() {
3738
@Override
3839
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
3940
GetWatcherSettingsAction.Request req = new GetWatcherSettingsAction.Request(RestUtils.getMasterNodeTimeout(request));
40-
return channel -> client.execute(GetWatcherSettingsAction.INSTANCE, req, new RestToXContentListener<>(channel));
41+
return channel -> new RestCancellableNodeClient(client, request.getHttpChannel()).execute(
42+
GetWatcherSettingsAction.INSTANCE,
43+
req,
44+
new RestToXContentListener<>(channel)
45+
);
4146
}
4247
}

x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/TransportGetWatcherSettingsAction.java

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,8 @@
99

1010
import org.elasticsearch.action.ActionListener;
1111
import org.elasticsearch.action.support.ActionFilters;
12-
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
12+
import org.elasticsearch.action.support.ChannelActionListener;
13+
import org.elasticsearch.action.support.local.TransportLocalClusterStateAction;
1314
import org.elasticsearch.cluster.ClusterState;
1415
import org.elasticsearch.cluster.block.ClusterBlockException;
1516
import org.elasticsearch.cluster.block.ClusterBlockLevel;
@@ -18,9 +19,10 @@
1819
import org.elasticsearch.cluster.service.ClusterService;
1920
import org.elasticsearch.common.settings.Settings;
2021
import org.elasticsearch.common.util.concurrent.EsExecutors;
22+
import org.elasticsearch.core.UpdateForV10;
2123
import org.elasticsearch.injection.guice.Inject;
24+
import org.elasticsearch.tasks.CancellableTask;
2225
import org.elasticsearch.tasks.Task;
23-
import org.elasticsearch.threadpool.ThreadPool;
2426
import org.elasticsearch.transport.TransportService;
2527
import org.elasticsearch.xpack.core.watcher.transport.actions.put.GetWatcherSettingsAction;
2628

@@ -30,40 +32,52 @@
3032
import static org.elasticsearch.xpack.watcher.transport.actions.TransportUpdateWatcherSettingsAction.WATCHER_INDEX_NAME;
3133
import static org.elasticsearch.xpack.watcher.transport.actions.TransportUpdateWatcherSettingsAction.WATCHER_INDEX_REQUEST;
3234

33-
public class TransportGetWatcherSettingsAction extends TransportMasterNodeAction<
35+
public class TransportGetWatcherSettingsAction extends TransportLocalClusterStateAction<
3436
GetWatcherSettingsAction.Request,
3537
GetWatcherSettingsAction.Response> {
3638

3739
private final IndexNameExpressionResolver indexNameExpressionResolver;
3840

41+
/**
42+
* NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC it must be registered with the TransportService until
43+
* we no longer need to support calling this action remotely.
44+
*/
45+
@UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT)
46+
@SuppressWarnings("this-escape")
3947
@Inject
4048
public TransportGetWatcherSettingsAction(
4149
TransportService transportService,
4250
ClusterService clusterService,
43-
ThreadPool threadPool,
4451
ActionFilters actionFilters,
4552
IndexNameExpressionResolver indexNameExpressionResolver
4653
) {
4754
super(
4855
GetWatcherSettingsAction.NAME,
49-
transportService,
50-
clusterService,
51-
threadPool,
5256
actionFilters,
53-
GetWatcherSettingsAction.Request::readFrom,
54-
GetWatcherSettingsAction.Response::new,
57+
transportService.getTaskManager(),
58+
clusterService,
5559
EsExecutors.DIRECT_EXECUTOR_SERVICE
5660
);
5761
this.indexNameExpressionResolver = indexNameExpressionResolver;
62+
63+
transportService.registerRequestHandler(
64+
actionName,
65+
executor,
66+
false,
67+
true,
68+
GetWatcherSettingsAction.Request::readFrom,
69+
(request, channel, task) -> executeDirect(task, request, new ChannelActionListener<>(channel))
70+
);
5871
}
5972

6073
@Override
61-
protected void masterOperation(
74+
protected void localClusterStateOperation(
6275
Task task,
6376
GetWatcherSettingsAction.Request request,
6477
ClusterState state,
6578
ActionListener<GetWatcherSettingsAction.Response> listener
6679
) {
80+
((CancellableTask) task).ensureNotCancelled();
6781
IndexMetadata metadata = state.metadata().index(WATCHER_INDEX_NAME);
6882
if (metadata == null) {
6983
listener.onResponse(new GetWatcherSettingsAction.Response(Settings.EMPTY));

0 commit comments

Comments
 (0)