Skip to content

Adding segment, totalSegments parameters in dynamodb-enhanced client to support parallel scan #2614

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Oct 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/next-release/feature-AmazonDynamoDB-3c48c88.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"category": "Amazon DynamoDB",
"contributor": "asanthan-amazon",
"type": "feature",
"description": "Allow parallel scan via DynamoDBEnhanced client and modify ScanEnhancedRequest to support totalSegments and segment parameters.\nThe corresponding GitHub issue is https://github.com/aws/aws-sdk-java-v2/issues/1851"
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ public ScanRequest generateRequest(TableSchema<T> tableSchema,
ScanRequest.Builder scanRequest = ScanRequest.builder()
.tableName(operationContext.tableName())
.limit(this.request.limit())
.segment(this.request.segment())
.totalSegments(this.request.totalSegments())
.exclusiveStartKey(this.request.exclusiveStartKey())
.consistentRead(this.request.consistentRead())
.expressionAttributeValues(expressionValues)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,14 @@ public final class ScanEnhancedRequest {
private final Boolean consistentRead;
private final Expression filterExpression;
private final List<NestedAttributeName> attributesToProject;
private final Integer segment;
private final Integer totalSegments;

private ScanEnhancedRequest(Builder builder) {
this.exclusiveStartKey = builder.exclusiveStartKey;
this.limit = builder.limit;
this.segment = builder.segment;
this.totalSegments = builder.totalSegments;
this.consistentRead = builder.consistentRead;
this.filterExpression = builder.filterExpression;
this.attributesToProject = builder.attributesToProject != null
Expand Down Expand Up @@ -87,6 +91,20 @@ public Integer limit() {
return limit;
}

/**
* Returns the value of segment set on this request object, or null if it has not been set.
* <p>
* The value is returned exactly as it was supplied to the builder; this accessor applies no default.
*/
public Integer segment() {
return segment;
}

/**
* Returns the value of totalSegments set on this request object, or null if it has not been set.
* <p>
* The value is returned exactly as it was supplied to the builder; this accessor applies no default.
*/
public Integer totalSegments() {
return totalSegments;
}

/**
* Returns the value of consistent read, or false if it has not been set.
*/
Expand Down Expand Up @@ -141,6 +159,12 @@ public boolean equals(Object o) {
if (limit != null ? ! limit.equals(scan.limit) : scan.limit != null) {
return false;
}
if (segment != null ? ! segment.equals(scan.segment) : scan.segment != null) {
return false;
}
if (totalSegments != null ? ! totalSegments.equals(scan.totalSegments) : scan.totalSegments != null) {
return false;
}
if (consistentRead != null ? ! consistentRead.equals(scan.consistentRead) : scan.consistentRead != null) {
return false;
}
Expand All @@ -155,6 +179,8 @@ public boolean equals(Object o) {
public int hashCode() {
int result = exclusiveStartKey != null ? exclusiveStartKey.hashCode() : 0;
result = 31 * result + (limit != null ? limit.hashCode() : 0);
result = 31 * result + (segment != null ? segment.hashCode() : 0);
result = 31 * result + (totalSegments != null ? totalSegments.hashCode() : 0);
result = 31 * result + (consistentRead != null ? consistentRead.hashCode() : 0);
result = 31 * result + (filterExpression != null ? filterExpression.hashCode() : 0);
result = 31 * result + (attributesToProject != null ? attributesToProject.hashCode() : 0);
Expand All @@ -170,6 +196,8 @@ public static final class Builder {
private Boolean consistentRead;
private Expression filterExpression;
private List<NestedAttributeName> attributesToProject;
private Integer segment;
private Integer totalSegments;

/**
* Private so that code outside the enclosing class cannot construct a builder directly.
*/
private Builder() {
}
Expand Down Expand Up @@ -203,6 +231,45 @@ public Builder limit(Integer limit) {
return this;
}

/**
* For a parallel Scan request, Segment identifies an individual segment to be scanned by an application worker.
* <p>
* <b>Note:</b> Segment IDs are zero-based, so the first segment is always 0. For example, if you want to use four
* application threads to scan a table or an index, then the first thread specifies a Segment value of 0, the second
* thread specifies 1, and so on.
* <p>
* The value for Segment must be greater than or equal to 0, and less than the value provided for TotalSegments.
* This method stores the value as given and does not perform that range check itself.
* <p>
* If you provide Segment, you must also provide <em>TotalSegments</em>.
*
* @param segment identifies an individual segment to be scanned
* @return a builder of this type
*/
public Builder segment(Integer segment) {
this.segment = segment;
return this;
}

/**
* For a parallel Scan request, TotalSegments represents the total number of segments into
* which the Scan operation will be divided.
* <p>
* <b>Note:</b> If you do not specify this value, the TotalSegments is effectively 1 and the Scan operation
* will be sequential rather than parallel.
* <p>
* If you specify TotalSegments, you must also specify Segment.
* <p>
* If you specify TotalSegments of 2 and above, you can create a separate thread for each segment and scan items
* in parallel across segments from multiple threads.
*
* @param totalSegments the total number of segments to divide the table.
* @return a builder of this type
*/
public Builder totalSegments(Integer totalSegments) {
this.totalSegments = totalSegments;
return this;
}

/**
* Determines the read consistency model: If set to true, the operation uses strongly consistent reads; otherwise,
* the operation uses eventually consistent reads.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,21 @@ public void generateRequest_limit() {
assertThat(request, is(expectedRequest));
}

@Test
public void generateRequest_segment_totalSegments() {
    // Enhanced request that asks for segment 0 out of 5 total segments.
    ScanEnhancedRequest enhancedRequest = ScanEnhancedRequest.builder()
                                                             .segment(0)
                                                             .totalSegments(5)
                                                             .build();

    // The low-level request is expected to carry the same two values plus the table name, and nothing else.
    ScanRequest expectedRequest = ScanRequest.builder()
                                             .tableName(TABLE_NAME)
                                             .segment(0)
                                             .totalSegments(5)
                                             .build();

    ScanOperation<FakeItem> operation = ScanOperation.create(enhancedRequest);
    ScanRequest request = operation.generateRequest(FakeItem.getTableSchema(), PRIMARY_CONTEXT, null);

    assertThat(request, is(expectedRequest));
}

@Test
public void generateRequest_filterCondition_expressionAndValues() {
Map<String, AttributeValue> expressionValues = singletonMap(":test-key", stringValue("test-value"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ public void builder_minimal() {
assertThat(builtObject.filterExpression(), is(nullValue()));
assertThat(builtObject.attributesToProject(), is(nullValue()));
assertThat(builtObject.limit(), is(nullValue()));
assertThat(builtObject.segment(), is(nullValue()));
assertThat(builtObject.totalSegments(), is(nullValue()));
}

@Test
Expand All @@ -73,13 +75,17 @@ public void builder_maximal() {
.attributesToProject(attributesToProjectArray)
.addAttributeToProject(additionalElement)
.limit(3)
.segment(0)
.totalSegments(5)
.build();

assertThat(builtObject.exclusiveStartKey(), is(exclusiveStartKey));
assertThat(builtObject.consistentRead(), is(false));
assertThat(builtObject.filterExpression(), is(filterExpression));
assertThat(builtObject.attributesToProject(), is(attributesToProject));
assertThat(builtObject.limit(), is(3));
assertThat(builtObject.segment(), is(0));
assertThat(builtObject.totalSegments(), is(5));
}

@Test
Expand Down