Skip to content

Run TransportGetMappingsAction on local node #122921

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Mar 15, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/122921.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 122921
summary: Run `TransportGetMappingsAction` on local node
area: Indices APIs
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.elasticsearch.action.admin.cluster.settings.ClusterGetSettingsAction;
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesAction;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsAction;
import org.elasticsearch.action.admin.indices.recovery.RecoveryAction;
import org.elasticsearch.action.admin.indices.template.get.GetComponentTemplateAction;
import org.elasticsearch.action.admin.indices.template.get.GetComposableIndexTemplateAction;
Expand Down Expand Up @@ -108,6 +109,11 @@ public void testGetPipelineCancellation() {
runRestActionCancellationTest(new Request(HttpGet.METHOD_NAME, "/_ingest/pipeline"), GetPipelineAction.NAME);
}

public void testGetMappingsCancellation() {
createIndex("test");
runRestActionCancellationTest(new Request(HttpGet.METHOD_NAME, "/test/_mappings"), GetMappingsAction.NAME);
}

private void runRestActionCancellationTest(Request request, String actionName) {
final var node = usually() ? internalCluster().getRandomNodeName() : internalCluster().startCoordinatingOnlyNode(Settings.EMPTY);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@

import org.apache.http.client.methods.HttpGet;
import org.elasticsearch.action.admin.indices.get.GetIndexAction;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsAction;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.Cancellable;
Expand Down Expand Up @@ -41,10 +40,6 @@
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, numClientNodes = 0)
public class RestClusterInfoActionCancellationIT extends HttpSmokeTestCase {

public void testGetMappingsCancellation() throws Exception {
runTest(GetMappingsAction.NAME, "/test/_mappings");
}

public void testGetIndicesCancellation() throws Exception {
runTest(GetIndexAction.NAME, "/test");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,12 @@
},
"master_timeout":{
"type":"time",
"description":"Specify timeout for connection to master"
"description":"Timeout for waiting for new cluster state in case it is blocked"
},
"local":{
"type":"boolean",
"description":"Return local information, do not retrieve the state from master node (default: false)",
"deprecated":{
"version":"7.8.0",
"description":"This parameter is a no-op and field mappings are always retrieved locally."
}
"deprecated":true
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was there a specific reason to remove the description and version?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tbh, the reason is that I didn't add a description and version to any of the actions I converted similarly already because I didn't know they existed. It didn't seem worth it to go back and update all the previous ones to include a description and version, so I went for consistency here. Do you think it is worth to go back and update the previous ones to specify a description and version?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No that's fine, especially as we're trying to move away from manually editing rest-api-spec. Thanks for the explanation!

}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
import org.elasticsearch.action.admin.indices.mapping.get.GetFieldMappingsAction;
import org.elasticsearch.action.admin.indices.mapping.get.GetFieldMappingsRequest;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsAction;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.action.admin.indices.mapping.put.TransportPutMappingAction;
import org.elasticsearch.action.admin.indices.open.OpenIndexAction;
Expand Down Expand Up @@ -516,16 +514,6 @@ public void testDeleteIndex() {
assertSameIndices(deleteIndexRequest, TransportDeleteIndexAction.TYPE.name());
}

public void testGetMappings() {
interceptTransportActions(GetMappingsAction.NAME);

GetMappingsRequest getMappingsRequest = new GetMappingsRequest(TEST_REQUEST_TIMEOUT).indices(randomIndicesOrAliases());
internalCluster().coordOnlyNodeClient().admin().indices().getMappings(getMappingsRequest).actionGet();

clearInterceptedActions();
assertSameIndices(getMappingsRequest, GetMappingsAction.NAME);
}

public void testPutMapping() {
interceptTransportActions(TransportPutMappingAction.TYPE.name());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,6 @@ public void testLargeClusterStatePublishing() throws Exception {
MappingMetadata mappingMetadata = client.admin()
.indices()
.prepareGetMappings(TEST_REQUEST_TIMEOUT, "test")
.setLocal(true)
.get()
.getMappings()
.get("test");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,26 +9,44 @@

package org.elasticsearch.action.admin.indices.mapping.get;

import org.elasticsearch.TransportVersions;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.info.ClusterInfoRequest;
import org.elasticsearch.action.support.local.LocalClusterStateRequest;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.UpdateForV10;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;

import java.io.IOException;
import java.util.Map;

public class GetMappingsRequest extends ClusterInfoRequest<GetMappingsRequest> {
public class GetMappingsRequest extends LocalClusterStateRequest implements IndicesRequest.Replaceable {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar to #122885, this also gets rid of the ClusterInfo abstraction.


private String[] indices = Strings.EMPTY_ARRAY;
private IndicesOptions indicesOptions;

public GetMappingsRequest(TimeValue masterTimeout) {
super(masterTimeout, IndicesOptions.strictExpandOpen());
super(masterTimeout);
indicesOptions = IndicesOptions.strictExpandOpen();
}

/**
* NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC we must remain able to read these requests until
* we no longer need to support calling this action remotely.
*/
@UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT)
public GetMappingsRequest(StreamInput in) throws IOException {
super(in);
indices = in.readStringArray();
if (in.getTransportVersion().before(TransportVersions.V_8_0_0)) {
in.readStringArray();
}
indicesOptions = IndicesOptions.readIndicesOptions(in);
}

@Override
Expand All @@ -40,4 +58,30 @@ public ActionRequestValidationException validate() {
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
return new CancellableTask(id, type, action, "", parentTaskId, headers);
}

@Override
public GetMappingsRequest indices(String... indices) {
this.indices = indices;
return this;
}

public GetMappingsRequest indicesOptions(IndicesOptions indicesOptions) {
this.indicesOptions = indicesOptions;
return this;
}

@Override
public String[] indices() {
return indices;
}

@Override
public IndicesOptions indicesOptions() {
return indicesOptions;
}

@Override
public boolean includeDataStreams() {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,30 @@

package org.elasticsearch.action.admin.indices.mapping.get;

import org.elasticsearch.action.support.master.info.ClusterInfoRequestBuilder;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.internal.ElasticsearchClient;
import org.elasticsearch.common.util.ArrayUtils;
import org.elasticsearch.core.TimeValue;

public class GetMappingsRequestBuilder extends ClusterInfoRequestBuilder<
GetMappingsRequest,
GetMappingsResponse,
GetMappingsRequestBuilder> {
public class GetMappingsRequestBuilder extends ActionRequestBuilder<GetMappingsRequest, GetMappingsResponse> {

public GetMappingsRequestBuilder(ElasticsearchClient client, TimeValue masterTimeout, String... indices) {
super(client, GetMappingsAction.INSTANCE, new GetMappingsRequest(masterTimeout).indices(indices));
}

public GetMappingsRequestBuilder setIndices(String... indices) {
request.indices(indices);
return this;
}

public GetMappingsRequestBuilder addIndices(String... indices) {
request.indices(ArrayUtils.concat(request.indices(), indices));
return this;
}

public GetMappingsRequestBuilder setIndicesOptions(IndicesOptions indicesOptions) {
request.indicesOptions(indicesOptions);
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,13 @@

package org.elasticsearch.action.admin.indices.mapping.get;

import org.elasticsearch.TransportVersions;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.cluster.metadata.MappingMetadata;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ChunkedToXContentObject;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.core.UpdateForV10;
import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.xcontent.ToXContent;

Expand All @@ -35,21 +33,6 @@ public GetMappingsResponse(Map<String, MappingMetadata> mappings) {
this.mappings = mappings;
}

GetMappingsResponse(StreamInput in) throws IOException {
super(in);
mappings = in.readImmutableMap(in.getTransportVersion().before(TransportVersions.V_8_0_0) ? i -> {
int mappingCount = i.readVInt();
assert mappingCount == 1 || mappingCount == 0 : "Expected 0 or 1 mappings but got " + mappingCount;
if (mappingCount == 1) {
String type = i.readString();
assert MapperService.SINGLE_MAPPING_NAME.equals(type) : "Expected type [_doc] but got [" + type + "]";
return new MappingMetadata(i);
} else {
return MappingMetadata.EMPTY_MAPPINGS;
}
} : i -> i.readBoolean() ? new MappingMetadata(i) : MappingMetadata.EMPTY_MAPPINGS);
}

public Map<String, MappingMetadata> mappings() {
return mappings;
}
Expand All @@ -58,6 +41,11 @@ public Map<String, MappingMetadata> getMappings() {
return mappings();
}

/**
* NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC we must remain able to write these responses until
* we no longer need to support calling this action remotely.
*/
@UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT)
@Override
public void writeTo(StreamOutput out) throws IOException {
MappingMetadata.writeMappingMetadata(out, mappings);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,16 @@
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.info.TransportClusterInfoAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.action.support.ChannelActionListener;
import org.elasticsearch.action.support.local.TransportLocalProjectMetadataAction;
import org.elasticsearch.cluster.ProjectState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MappingMetadata;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.core.UpdateForV10;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.injection.guice.Inject;
import org.elasticsearch.tasks.CancellableTask;
Expand All @@ -28,12 +32,19 @@

import java.util.Map;

public class TransportGetMappingsAction extends TransportClusterInfoAction<GetMappingsRequest, GetMappingsResponse> {
public class TransportGetMappingsAction extends TransportLocalProjectMetadataAction<GetMappingsRequest, GetMappingsResponse> {

private static final Logger logger = LogManager.getLogger(TransportGetMappingsAction.class);

private final IndicesService indicesService;
private final IndexNameExpressionResolver indexNameExpressionResolver;

/**
* NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC it must be registered with the TransportService until
* we no longer need to support calling this action remotely.
*/
@UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT)
@SuppressWarnings("this-escape")
@Inject
public TransportGetMappingsAction(
TransportService transportService,
Expand All @@ -46,28 +57,46 @@ public TransportGetMappingsAction(
) {
super(
GetMappingsAction.NAME,
transportService,
clusterService,
threadPool,
actionFilters,
GetMappingsRequest::new,
indexNameExpressionResolver,
GetMappingsResponse::new,
transportService.getTaskManager(),
clusterService,
threadPool.executor(ThreadPool.Names.MANAGEMENT),
projectResolver
);
this.indicesService = indicesService;
this.indexNameExpressionResolver = indexNameExpressionResolver;

transportService.registerRequestHandler(
actionName,
executor,
false,
true,
GetMappingsRequest::new,
(request, channel, task) -> executeDirect(task, request, new ChannelActionListener<>(channel))
);
}

@Override
protected ClusterBlockException checkBlock(GetMappingsRequest request, ProjectState state) {
return state.blocks()
.indicesBlockedException(
state.projectId(),
ClusterBlockLevel.METADATA_READ,
indexNameExpressionResolver.concreteIndexNames(state.metadata(), request)
);
}

@Override
protected void doMasterOperation(
protected void localClusterStateOperation(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one is missing a call to ((CancellableTask) task).ensureNotCancelled(); somewhere

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(and perhaps add a test for it)

I already added a test:

public void testGetMappingsCancellation() {
createIndex("test");
runRestActionCancellationTest(new Request(HttpGet.METHOD_NAME, "/test/_mappings"), GetMappingsAction.NAME);
}

I think the reason the test isn't failing is that we already check for the task cancellation right before we execute this method:
if (task instanceof CancellableTask cancellableTask && cancellableTask.notifyIfCancelled(listener)) {
return;
}

In other words, all the ((CancellableTask) task).ensureNotCancelled()s I've added in the other classes aren't super valuable (in most places). They're not hurting anything, but they're not super valuable either.

I'll add it to this method too, for consistency, but let me know what you think about all this.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was viewing the ((CancellableTask) task).ensureNotCancelled() call as a way to do a check prior to some (semi-)expensive work, to avoid doing it if it was going to be thrown away anyway. It seems like it would still have some value there, though only a little, as you say.

I'm fine either way, so I'll leave it to you whether to include them or not.

Task task,
final GetMappingsRequest request,
String[] concreteIndices,
final ClusterState state,
final ProjectState state,
final ActionListener<GetMappingsResponse> listener
) {
logger.trace("serving getMapping request based on version {}", state.version());
final Map<String, MappingMetadata> mappings = projectResolver.getProjectMetadata(state)
((CancellableTask) task).ensureNotCancelled();
logger.trace("serving getMapping request based on version {}", state.cluster().version());
String[] concreteIndices = indexNameExpressionResolver.concreteIndexNames(state.metadata(), request);
final Map<String, MappingMetadata> mappings = state.metadata()
.findMappings(concreteIndices, indicesService.getFieldFilter(), () -> checkCancellation(task));
listener.onResponse(new GetMappingsResponse(mappings));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.elasticsearch.http.HttpChannel;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestUtils;
import org.elasticsearch.rest.Scope;
import org.elasticsearch.rest.ServerlessScope;
import org.elasticsearch.rest.action.RestCancellableNodeClient;
Expand All @@ -25,7 +26,6 @@
import java.util.List;

import static org.elasticsearch.rest.RestRequest.Method.GET;
import static org.elasticsearch.rest.RestUtils.getMasterNodeTimeout;

@ServerlessScope(Scope.PUBLIC)
public class RestGetMappingAction extends BaseRestHandler {
Expand All @@ -50,10 +50,10 @@ public String getName() {
@Override
public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
final String[] indices = Strings.splitStringByCommaToArray(request.param("index"));
final GetMappingsRequest getMappingsRequest = new GetMappingsRequest(getMasterNodeTimeout(request));
final GetMappingsRequest getMappingsRequest = new GetMappingsRequest(RestUtils.getMasterNodeTimeout(request));
getMappingsRequest.indices(indices);
getMappingsRequest.indicesOptions(IndicesOptions.fromRequest(request, getMappingsRequest.indicesOptions()));
getMappingsRequest.local(request.paramAsBoolean("local", getMappingsRequest.local()));
RestUtils.consumeDeprecatedLocalParameter(request);
final HttpChannel httpChannel = request.getHttpChannel();
return channel -> new RestCancellableNodeClient(client, httpChannel).admin()
.indices()
Expand Down
Loading