|
17 | 17 | import org.elasticsearch.action.ActionRequestValidationException;
|
18 | 18 | import org.elasticsearch.action.ActionResponse;
|
19 | 19 | import org.elasticsearch.action.ActionType;
|
20 |
| -import org.elasticsearch.action.DelegatingActionListener; |
21 | 20 | import org.elasticsearch.action.LatchedActionListener;
|
22 | 21 | import org.elasticsearch.action.LegacyActionRequest;
|
23 | 22 | import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
|
|
27 | 26 | import org.elasticsearch.action.support.PlainActionFuture;
|
28 | 27 | import org.elasticsearch.client.internal.node.NodeClient;
|
29 | 28 | import org.elasticsearch.cluster.node.DiscoveryNode;
|
30 |
| -import org.elasticsearch.common.Strings; |
31 | 29 | import org.elasticsearch.common.io.stream.StreamInput;
|
32 | 30 | import org.elasticsearch.common.io.stream.StreamOutput;
|
33 | 31 | import org.elasticsearch.common.util.CollectionUtils;
|
|
36 | 34 | import org.elasticsearch.common.util.set.Sets;
|
37 | 35 | import org.elasticsearch.core.TimeValue;
|
38 | 36 | import org.elasticsearch.injection.guice.Inject;
|
39 |
| -import org.elasticsearch.logging.LogManager; |
40 |
| -import org.elasticsearch.logging.Logger; |
41 | 37 | import org.elasticsearch.plugins.ActionPlugin;
|
42 | 38 | import org.elasticsearch.plugins.Plugin;
|
43 | 39 | import org.elasticsearch.tasks.CancellableTask;
|
|
47 | 43 | import org.elasticsearch.tasks.TaskInfo;
|
48 | 44 | import org.elasticsearch.tasks.TaskManager;
|
49 | 45 | import org.elasticsearch.test.ESIntegTestCase;
|
50 |
| -import org.elasticsearch.test.junit.annotations.TestIssueLogging; |
51 | 46 | import org.elasticsearch.threadpool.ThreadPool;
|
52 | 47 | import org.elasticsearch.transport.ReceiveTimeoutTransportException;
|
53 | 48 | import org.elasticsearch.transport.SendRequestTransportException;
|
|
79 | 74 | @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST)
|
80 | 75 | public class CancellableTasksIT extends ESIntegTestCase {
|
81 | 76 |
|
82 |
| - // Temporary addition for investigation into https://github.com/elastic/elasticsearch/issues/123568 |
83 |
| - private static final Logger logger = LogManager.getLogger(CancellableTasksIT.class); |
84 |
| - |
85 | 77 | static int idGenerator = 0;
|
86 | 78 | static final Map<TestRequest, CountDownLatch> beforeSendLatches = ConcurrentCollections.newConcurrentMap();
|
87 | 79 | static final Map<TestRequest, CountDownLatch> arrivedLatches = ConcurrentCollections.newConcurrentMap();
|
@@ -374,42 +366,18 @@ public void testRemoveBanParentsOnDisconnect() throws Exception {
|
374 | 366 | }
|
375 | 367 | }
|
376 | 368 |
|
377 |
| - @TestIssueLogging( |
378 |
| - issueUrl = "https://github.com/elastic/elasticsearch/issues/123568", |
379 |
| - value = "org.elasticsearch.transport.TransportService.tracer:TRACE" |
380 |
| - + ",org.elasticsearch.tasks.TaskManager:TRACE" |
381 |
| - + ",org.elasticsearch.action.admin.cluster.node.tasks.CancellableTasksIT:DEBUG" |
382 |
| - ) |
383 | 369 | public void testChildrenTasksCancelledOnTimeout() throws Exception {
|
384 | 370 | Set<DiscoveryNode> nodes = clusterService().state().nodes().stream().collect(Collectors.toSet());
|
385 | 371 | final TestRequest rootRequest = generateTestRequest(nodes, 0, between(1, 4), true);
|
386 |
| - logger.info("generated request\n{}", buildTestRequestDescription(rootRequest, "", new StringBuilder()).toString()); |
387 | 372 | ActionFuture<TestResponse> rootTaskFuture = client().execute(TransportTestAction.ACTION, rootRequest);
|
388 |
| - logger.info("action executed"); |
389 | 373 | allowEntireRequest(rootRequest);
|
390 |
| - logger.info("execution released"); |
391 | 374 | waitForRootTask(rootTaskFuture, true);
|
392 |
| - logger.info("root task completed"); |
393 | 375 | ensureBansAndCancellationsConsistency();
|
394 |
| - logger.info("ensureBansAndCancellationsConsistency completed"); |
395 | 376 |
|
396 | 377 | // Make sure all descendent requests have completed
|
397 | 378 | for (TestRequest subRequest : rootRequest.descendants()) {
|
398 |
| - logger.info("awaiting completion of request {}", subRequest.getDescription()); |
399 | 379 | safeAwait(completedLatches.get(subRequest));
|
400 | 380 | }
|
401 |
| - logger.info("all requests completed"); |
402 |
| - } |
403 |
| - |
404 |
| - // Temporary addition for investigation into https://github.com/elastic/elasticsearch/issues/123568 |
405 |
| - static StringBuilder buildTestRequestDescription(TestRequest request, String prefix, StringBuilder stringBuilder) { |
406 |
| - stringBuilder.append(prefix) |
407 |
| - .append(Strings.format("id=%d [timeout=%s] %s", request.id, request.timeout, request.node.descriptionWithoutAttributes())) |
408 |
| - .append('\n'); |
409 |
| - for (TestRequest subRequest : request.subRequests) { |
410 |
| - buildTestRequestDescription(subRequest, prefix + " ", stringBuilder); |
411 |
| - } |
412 |
| - return stringBuilder; |
413 | 381 | }
|
414 | 382 |
|
415 | 383 | static TaskId getRootTaskId(TestRequest request) throws Exception {
|
@@ -538,8 +506,6 @@ public void writeTo(StreamOutput out) throws IOException {
|
538 | 506 |
|
539 | 507 | public static class TransportTestAction extends HandledTransportAction<TestRequest, TestResponse> {
|
540 | 508 |
|
541 |
| - private static final Logger logger = CancellableTasksIT.logger; |
542 |
| - |
543 | 509 | public static ActionType<TestResponse> ACTION = new ActionType<>("internal::test_action");
|
544 | 510 | private final TransportService transportService;
|
545 | 511 | private final NodeClient client;
|
@@ -599,22 +565,7 @@ protected void doExecute(Task task, TestRequest request, ActionListener<TestResp
|
599 | 565 | protected void startSubTask(TaskId parentTaskId, TestRequest subRequest, ActionListener<TestResponse> listener) {
|
600 | 566 | subRequest.setParentTask(parentTaskId);
|
601 | 567 | CountDownLatch completeLatch = completedLatches.get(subRequest);
|
602 |
| - ActionListener<TestResponse> latchedListener = new DelegatingActionListener<>( |
603 |
| - new LatchedActionListener<>(listener, completeLatch) |
604 |
| - ) { |
605 |
| - // Temporary logging addition for investigation into https://github.com/elastic/elasticsearch/issues/123568 |
606 |
| - @Override |
607 |
| - public void onResponse(TestResponse testResponse) { |
608 |
| - logger.debug("processing successful response to request [{}]", subRequest.getDescription()); |
609 |
| - delegate.onResponse(testResponse); |
610 |
| - } |
611 |
| - |
612 |
| - @Override |
613 |
| - public void onFailure(Exception e) { |
614 |
| - logger.debug("processing exceptional response to request [{}]: {}", subRequest.getDescription(), e.getMessage()); |
615 |
| - super.onFailure(e); |
616 |
| - } |
617 |
| - }; |
| 568 | + LatchedActionListener<TestResponse> latchedListener = new LatchedActionListener<>(listener, completeLatch); |
618 | 569 | transportService.getThreadPool().generic().execute(new AbstractRunnable() {
|
619 | 570 | @Override
|
620 | 571 | public void onFailure(Exception e) {
|
@@ -645,13 +596,6 @@ protected void doRun() throws Exception {
|
645 | 596 | TransportResponseHandler.TRANSPORT_WORKER
|
646 | 597 | )
|
647 | 598 | );
|
648 |
| - // Temporary addition for investigation into https://github.com/elastic/elasticsearch/issues/123568 |
649 |
| - logger.debug( |
650 |
| - "sent test request [{}] from [{}] to [{}]", |
651 |
| - subRequest.getDescription(), |
652 |
| - client.getLocalNodeId(), |
653 |
| - subRequest.node.descriptionWithoutAttributes() |
654 |
| - ); |
655 | 599 | }
|
656 | 600 | }
|
657 | 601 | });
|
|
0 commit comments