Skip to content

Commit f489645

Browse files
Adding segment, totalSegments parameters in dynamodb-enhanced client to support parallel scan (#2614)
* Adding segment,totalSegments parameters in dynamodb-enhanced client to support parallel scan Note: DynamoDB local does not support parallel scan. So, this change was tested manually using integration test. * Fix documentation for totalSegment field. Co-authored-by: Matthew Miller <[email protected]>
1 parent 32f6f1b commit f489645

File tree

5 files changed

+96
-0
lines changed

5 files changed

+96
-0
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"category": "Amazon DynamoDB",
3+
"contributor": "asanthan-amazon",
4+
"type": "feature",
5+
"description": "Allow parallel scan via DynamoDBEnhanced client and modify ScanEnhancedRequest to support totalSegments and segment parameters.\nThe corresponding github issue is https://github.com/aws/aws-sdk-java-v2/issues/1851"
6+
}

services-custom/dynamodb-enhanced/src/main/java/software/amazon/awssdk/enhanced/dynamodb/internal/operations/ScanOperation.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,8 @@ public ScanRequest generateRequest(TableSchema<T> tableSchema,
7272
ScanRequest.Builder scanRequest = ScanRequest.builder()
7373
.tableName(operationContext.tableName())
7474
.limit(this.request.limit())
75+
.segment(this.request.segment())
76+
.totalSegments(this.request.totalSegments())
7577
.exclusiveStartKey(this.request.exclusiveStartKey())
7678
.consistentRead(this.request.consistentRead())
7779
.expressionAttributeValues(expressionValues)

services-custom/dynamodb-enhanced/src/main/java/software/amazon/awssdk/enhanced/dynamodb/model/ScanEnhancedRequest.java

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,10 +44,14 @@ public final class ScanEnhancedRequest {
4444
private final Boolean consistentRead;
4545
private final Expression filterExpression;
4646
private final List<NestedAttributeName> attributesToProject;
47+
private final Integer segment;
48+
private final Integer totalSegments;
4749

4850
private ScanEnhancedRequest(Builder builder) {
4951
this.exclusiveStartKey = builder.exclusiveStartKey;
5052
this.limit = builder.limit;
53+
this.segment = builder.segment;
54+
this.totalSegments = builder.totalSegments;
5155
this.consistentRead = builder.consistentRead;
5256
this.filterExpression = builder.filterExpression;
5357
this.attributesToProject = builder.attributesToProject != null
@@ -87,6 +91,20 @@ public Integer limit() {
8791
return limit;
8892
}
8993

94+
/**
95+
* Returns the value of segment set on this request object, or null if it doesn't exist.
96+
*/
97+
public Integer segment() {
98+
return segment;
99+
}
100+
101+
/**
102+
* Returns the value of totalSegments set on this request object, or null if it doesn't exist.
103+
*/
104+
public Integer totalSegments() {
105+
return totalSegments;
106+
}
107+
90108
/**
91109
* Returns the value of consistent read, or false if it has not been set.
92110
*/
@@ -141,6 +159,12 @@ public boolean equals(Object o) {
141159
if (limit != null ? ! limit.equals(scan.limit) : scan.limit != null) {
142160
return false;
143161
}
162+
if (segment != null ? ! segment.equals(scan.segment) : scan.segment != null) {
163+
return false;
164+
}
165+
if (totalSegments != null ? ! totalSegments.equals(scan.totalSegments) : scan.totalSegments != null) {
166+
return false;
167+
}
144168
if (consistentRead != null ? ! consistentRead.equals(scan.consistentRead) : scan.consistentRead != null) {
145169
return false;
146170
}
@@ -155,6 +179,8 @@ public boolean equals(Object o) {
155179
public int hashCode() {
156180
int result = exclusiveStartKey != null ? exclusiveStartKey.hashCode() : 0;
157181
result = 31 * result + (limit != null ? limit.hashCode() : 0);
182+
result = 31 * result + (segment != null ? segment.hashCode() : 0);
183+
result = 31 * result + (totalSegments != null ? totalSegments.hashCode() : 0);
158184
result = 31 * result + (consistentRead != null ? consistentRead.hashCode() : 0);
159185
result = 31 * result + (filterExpression != null ? filterExpression.hashCode() : 0);
160186
result = 31 * result + (attributesToProject != null ? attributesToProject.hashCode() : 0);
@@ -170,6 +196,8 @@ public static final class Builder {
170196
private Boolean consistentRead;
171197
private Expression filterExpression;
172198
private List<NestedAttributeName> attributesToProject;
199+
private Integer segment;
200+
private Integer totalSegments;
173201

174202
private Builder() {
175203
}
@@ -203,6 +231,45 @@ public Builder limit(Integer limit) {
203231
return this;
204232
}
205233

234+
/**
235+
* For a parallel Scan request, Segment identifies an individual segment to be scanned by an application worker.
236+
* <p>
237+
* <b>Note:</b>Segment IDs are zero-based, so the first segment is always 0. For example, if you want to use four
238+
* application threads to scan a table or an index, then the first thread specifies a Segment value of 0, the second
239+
* thread specifies 1, and so on.
240+
*
241+
* The value for Segment must be greater than or equal to 0, and less than the value provided for TotalSegments.
242+
*
243+
* If you provide Segment, you must also provide <em>TotalSegments</em>.
244+
*
245+
* @param segment identifies an individual segment to be scanned
246+
* @return a builder of this type
247+
*/
248+
public Builder segment(Integer segment) {
249+
this.segment = segment;
250+
return this;
251+
}
252+
253+
/**
254+
* For a parallel Scan request, TotalSegments represents the total number of segments into
255+
* which the Scan operation will be divided.
256+
* <p>
257+
* <b>Note:</b>If you do not specify this value, the TotalSegements is effectively 1 and Scan operation
258+
* will be sequential rather than parallel.
259+
*
260+
* If you specify TotalSegments, you must also specify Segment.
261+
*
262+
* If you specify TotalSegments of 2 and above, you can create separate thread for each segment and scan items
263+
* in parallel across segments from multiple threads.
264+
*
265+
* @param totalSegments the total number of segments to divide the table.
266+
* @return a builder of this type
267+
*/
268+
public Builder totalSegments(Integer totalSegments) {
269+
this.totalSegments = totalSegments;
270+
return this;
271+
}
272+
206273
/**
207274
* Determines the read consistency model: If set to true, the operation uses strongly consistent reads; otherwise,
208275
* the operation uses eventually consistent reads.

services-custom/dynamodb-enhanced/src/test/java/software/amazon/awssdk/enhanced/dynamodb/internal/operations/ScanOperationTest.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,21 @@ public void generateRequest_limit() {
141141
assertThat(request, is(expectedRequest));
142142
}
143143

144+
@Test
145+
public void generateRequest_segment_totalSegments() {
146+
ScanOperation<FakeItem> operation = ScanOperation.create(ScanEnhancedRequest.builder().segment(0).totalSegments(5).build());
147+
ScanRequest request = operation.generateRequest(FakeItem.getTableSchema(),
148+
PRIMARY_CONTEXT,
149+
null);
150+
151+
ScanRequest expectedRequest = ScanRequest.builder()
152+
.tableName(TABLE_NAME)
153+
.segment(0)
154+
.totalSegments(5)
155+
.build();
156+
assertThat(request, is(expectedRequest));
157+
}
158+
144159
@Test
145160
public void generateRequest_filterCondition_expressionAndValues() {
146161
Map<String, AttributeValue> expressionValues = singletonMap(":test-key", stringValue("test-value"));

services-custom/dynamodb-enhanced/src/test/java/software/amazon/awssdk/enhanced/dynamodb/model/ScanEnhancedRequestTest.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ public void builder_minimal() {
4747
assertThat(builtObject.filterExpression(), is(nullValue()));
4848
assertThat(builtObject.attributesToProject(), is(nullValue()));
4949
assertThat(builtObject.limit(), is(nullValue()));
50+
assertThat(builtObject.segment(), is(nullValue()));
51+
assertThat(builtObject.totalSegments(), is(nullValue()));
5052
}
5153

5254
@Test
@@ -73,13 +75,17 @@ public void builder_maximal() {
7375
.attributesToProject(attributesToProjectArray)
7476
.addAttributeToProject(additionalElement)
7577
.limit(3)
78+
.segment(0)
79+
.totalSegments(5)
7680
.build();
7781

7882
assertThat(builtObject.exclusiveStartKey(), is(exclusiveStartKey));
7983
assertThat(builtObject.consistentRead(), is(false));
8084
assertThat(builtObject.filterExpression(), is(filterExpression));
8185
assertThat(builtObject.attributesToProject(), is(attributesToProject));
8286
assertThat(builtObject.limit(), is(3));
87+
assertThat(builtObject.segment(), is(0));
88+
assertThat(builtObject.totalSegments(), is(5));
8389
}
8490

8591
@Test

0 commit comments

Comments
 (0)