Skip to content

Commit ef5a797

Browse files
committed
Fixing docsReceived counter (elastic#1840)
This commit fixes a bug that had been introduced to the "Documents Received" mapreduce counter. It had been missing counting the first set of hits that were returned by each scroll. Closes elastic#1470
1 parent 425a3ac commit ef5a797

File tree

2 files changed

+22
-7
lines changed

2 files changed

+22
-7
lines changed

mr/src/main/java/org/elasticsearch/hadoop/rest/ScrollQuery.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,8 @@ public boolean hasNext() {
100100
} catch (IOException ex) {
101101
throw new EsHadoopIllegalStateException(String.format("Cannot create scroll for query [%s/%s]", query, body), ex);
102102
}
103-
103+
read += batch.size();
104+
stats.docsReceived += batch.size();
104105
// no longer needed
105106
body = null;
106107
query = null;

mr/src/test/java/org/elasticsearch/hadoop/rest/ScrollQueryTest.java

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,8 @@
3636

3737
public class ScrollQueryTest {
3838

39-
@Test
40-
public void test() throws Exception {
41-
RestRepository repository = mockRepository();
39+
public void test(boolean firstScrollReturnsHits) throws Exception {
40+
RestRepository repository = mockRepository(firstScrollReturnsHits);
4241
ScrollReader scrollReader = Mockito.mock(ScrollReader.class);
4342

4443
String query = "/index/type/_search?scroll=10m&etc=etc";
@@ -54,23 +53,38 @@ public void test() throws Exception {
5453
Mockito.verify(repository).close();
5554
Stats stats = scrollQuery.stats();
5655
Assert.assertEquals(1, stats.docsReceived);
56+
Assert.assertEquals(1, scrollQuery.getRead());
57+
}
58+
59+
@Test
60+
public void testWithEmptyFirstScroll() throws Exception {
61+
test(false);
62+
}
63+
64+
@Test
65+
public void testWithNonEmptyFirstScroll() throws Exception {
66+
test(true);
5767
}
5868

59-
private RestRepository mockRepository() throws Exception {
69+
private RestRepository mockRepository(boolean firstScrollReturnsHits) throws Exception {
6070
Map<String, Object> data = new HashMap<String, Object>();
6171
data.put("field", "value");
6272
String id = "1";
6373
Object[] hit = new Object[]{id, data};
6474

6575
RestRepository mocked = Mockito.mock(RestRepository.class);
6676

67-
ScrollReader.Scroll start = new ScrollReader.Scroll("abcd", 10, Collections.<Object[]>emptyList(), 5, 5);
77+
ScrollReader.Scroll start = new ScrollReader.Scroll("abcd", 10,
78+
firstScrollReturnsHits ? Collections.singletonList(hit) : Collections.<Object[]>emptyList(),
79+
5, 5);
6880

6981
Mockito.doReturn(start).when(mocked).scroll(Matchers.anyString(), Matchers.any(BytesArray.class), Matchers.any(ScrollReader.class));
7082

7183
ScrollReader.Scroll middle = new ScrollReader.Scroll("efgh", 10, Collections.<Object[]>emptyList(), 3, 3);
7284
Mockito.doReturn(middle).when(mocked).scroll(Matchers.eq("abcd"), Matchers.any(ScrollReader.class));
73-
ScrollReader.Scroll end = new ScrollReader.Scroll("ijkl", 10, Collections.singletonList(hit), 2, 1);
85+
ScrollReader.Scroll end = new ScrollReader.Scroll("ijkl", 10,
86+
firstScrollReturnsHits ? Collections.<Object[]>emptyList() : Collections.singletonList(hit),
87+
2, 1);
7488
Mockito.doReturn(end).when(mocked).scroll(Matchers.eq("efgh"), Matchers.any(ScrollReader.class));
7589
ScrollReader.Scroll finalScroll = new ScrollReader.Scroll("mnop", 10, true);
7690
Mockito.doReturn(finalScroll).when(mocked).scroll(Matchers.eq("ijkl"), Matchers.any(ScrollReader.class));

0 commit comments

Comments
 (0)