Skip to content

Commit 755aa1c

Browse files
authored
Avoiding failure when using frozen indices (#1842)
1 parent d015871 commit 755aa1c

File tree

6 files changed

+83
-2
lines changed

6 files changed

+83
-2
lines changed

mr/src/main/java/org/elasticsearch/hadoop/rest/RestRepository.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -311,14 +311,16 @@ Scroll scroll(String query, BytesArray body, ScrollReader reader) throws IOExcep
311311
InputStream scroll = client.execute(POST, query, body).body();
312312
try {
313313
Scroll scrollResult = reader.read(scroll);
314-
if (settings.getInternalVersionOrThrow().onOrBefore(EsMajorVersion.V_2_X)) {
314+
if (scrollResult == null) {
315+
log.info(String.format("No scroll for query [%s/%s], likely because the index is frozen", query, body));
316+
} else if (settings.getInternalVersionOrThrow().onOrBefore(EsMajorVersion.V_2_X)) {
315317
// On ES 2.X and before, a scroll response does not contain any hits to start with.
316318
// Another request will be needed.
317319
scrollResult = new Scroll(scrollResult.getScrollId(), scrollResult.getTotalHits(), false);
318320
}
319321
return scrollResult;
320322
} finally {
321-
if (scroll instanceof StatsAware) {
323+
if (scroll != null && scroll instanceof StatsAware) {
322324
stats.aggregate(((StatsAware) scroll).stats());
323325
}
324326
}

mr/src/main/java/org/elasticsearch/hadoop/rest/ScrollQuery.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,10 @@ public boolean hasNext() {
9292

9393
try {
9494
Scroll scroll = repository.scroll(query, body, reader);
95+
if (scroll == null) {
96+
finished = true;
97+
return false;
98+
}
9599
// size is passed as a limit (since we can't pass it directly into the request) - if it's not specified (<1) just scroll the whole index
96100
size = (size < 1 ? scroll.getTotalHits() : size);
97101
scrollId = scroll.getScrollId();
@@ -115,6 +119,10 @@ public boolean hasNext() {
115119

116120
try {
117121
Scroll scroll = repository.scroll(scrollId, reader);
122+
if (scroll == null) {
123+
finished = true;
124+
return false;
125+
}
118126
scrollId = scroll.getScrollId();
119127
batch = scroll.getHits();
120128
finished = scroll.isConcluded();

mr/src/main/java/org/elasticsearch/hadoop/serialization/ScrollReader.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -268,6 +268,12 @@ public Scroll read(InputStream content) throws IOException {
268268
private Scroll read(Parser parser, BytesArray input) {
269269
// get scroll_id
270270
Token token = ParsingUtils.seek(parser, SCROLL_ID);
271+
if (token == null) { // no scroll id is returned for frozen indices
272+
if (log.isTraceEnabled()) {
273+
log.info("No scroll id found, likely because the index is frozen");
274+
}
275+
return null;
276+
}
271277
Assert.isTrue(token == Token.VALUE_STRING, "invalid response");
272278
String scrollId = parser.text();
273279

mr/src/test/java/org/elasticsearch/hadoop/rest/ScrollQueryTest.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,35 @@ private RestRepository mockRepository(boolean firstScrollReturnsHits) throws Exc
9797

9898

9999

100+
return mocked;
101+
}
102+
103+
@Test
104+
public void testFrozen() throws Exception {
105+
// Frozen indices return a null scroll
106+
RestRepository repository = mockRepositoryFrozenIndex();
107+
ScrollReader scrollReader = Mockito.mock(ScrollReader.class);
108+
109+
String query = "/index/type/_search?scroll=10m&etc=etc";
110+
BytesArray body = new BytesArray("{}");
111+
long size = 100;
112+
113+
ScrollQuery scrollQuery = new ScrollQuery(repository, query, body, size, scrollReader);
114+
115+
Assert.assertFalse(scrollQuery.hasNext());
116+
scrollQuery.close();
117+
Mockito.verify(repository).close();
118+
Stats stats = scrollQuery.stats();
119+
Assert.assertEquals(0, stats.docsReceived);
120+
}
121+
122+
private RestRepository mockRepositoryFrozenIndex() throws Exception {
123+
RestRepository mocked = Mockito.mock(RestRepository.class);
124+
Mockito.doReturn(null).when(mocked).scroll(Matchers.anyString(), Matchers.any(BytesArray.class), Matchers.any(ScrollReader.class));
125+
RestClient mockClient = Mockito.mock(RestClient.class);
126+
Mockito.when(mockClient.deleteScroll(Matchers.eq("mnop"))).thenReturn(true);
127+
Mockito.when(mockClient.deleteScroll(Matchers.anyString())).thenReturn(false);
128+
Mockito.doReturn(mockClient).when(mocked).getRestClient();
100129
return mocked;
101130
}
102131
}

mr/src/test/java/org/elasticsearch/hadoop/serialization/ScrollReaderTest.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -604,6 +604,22 @@ public void testScrollWithHandlersThatCorrectsError() throws IOException {
604604
assertEquals(4L, JsonUtils.query("number").apply(scroll.getHits().get(0)[1]));
605605
}
606606

607+
@Test
608+
public void testNoScrollIdFromFrozenIndex() throws IOException {
609+
MappingSet mappings = getMappingSet("numbers-as-strings"); // The schema doesn't matter since there's no data
610+
InputStream stream = getClass().getResourceAsStream(scrollData("no-scroll-id"));
611+
Settings testSettings = new TestSettings();
612+
testSettings.setProperty(ConfigurationOptions.ES_READ_METADATA, "" + readMetadata);
613+
testSettings.setProperty(ConfigurationOptions.ES_READ_METADATA_FIELD, "" + metadataField);
614+
testSettings.setProperty(ConfigurationOptions.ES_OUTPUT_JSON, "" + readAsJson);
615+
testSettings.setProperty(DeserializationHandlerLoader.ES_READ_DATA_ERROR_HANDLERS , "fix");
616+
testSettings.setProperty(DeserializationHandlerLoader.ES_READ_DATA_ERROR_HANDLER + ".fix" , CorrectingHandler.class.getName());
617+
JdkValueReader valueReader = ObjectUtils.instantiate(JdkValueReader.class.getName(), testSettings);
618+
ScrollReader reader = new ScrollReader(ScrollReaderConfigBuilder.builder(valueReader, mappings.getResolvedView(), testSettings));
619+
ScrollReader.Scroll scroll = reader.read(stream);
620+
assertNull(scroll);
621+
}
622+
607623
/**
608624
* Case: Handler throws random Exceptions
609625
* Outcome: Processing fails fast.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
{
2+
"took":0,
3+
"timed_out":false,
4+
"_shards":{
5+
"total":0,
6+
"successful":0,
7+
"skipped":0,
8+
"failed":0
9+
},
10+
"hits":{
11+
"total":{
12+
"value":0,
13+
"relation":"eq"
14+
},
15+
"max_score":0.0,
16+
"hits":[
17+
18+
]
19+
}
20+
}

0 commit comments

Comments
 (0)