Skip to content

Commit 5ba39e6

Browse files
authored
Harmonise error behaviour between the APIs that use BatchExecuteStatement. (#61)
1 parent 60161fa commit 5ba39e6

14 files changed

+283
-463
lines changed

README.md

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -420,20 +420,6 @@ let constraintList: [TestPolymorphicTransactionConstraintEntry] = [
420420
try await table.polymorphicTransactWrite(entryList, constraints: constraintList)
421421
```
422422

423-
Both the `PolymorphicWriteEntry` and `PolymorphicTransactionConstraintEntry` conforming types can
424-
optionally provide a `compositePrimaryKey` property that will allow the API to return more information
425-
about failed transactions. This is enabled by default when using the `@PolymorphicWriteEntry` and
426-
`@PolymorphicTransactionConstraintEntry` macros but can be disabled by setting the
427-
`passCompositePrimaryKey` argument.
428-
429-
```swift
430-
@PolymorphicWriteEntry(passCompositePrimaryKey: false)
431-
enum TestPolymorphicWriteEntry {
432-
case testTypeA(TestTypeAWriteEntry)
433-
case testTypeB(TestTypeBWriteEntry)
434-
}
435-
```
436-
437423
## Recording updates in a historical partition
438424

439425
This package contains a number of convenience functions for storing versions of a row in a historical partition

Sources/DynamoDBTables/AWSDynamoDBCompositePrimaryKeyTable+deleteItems.swift

Lines changed: 33 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,10 @@ private let maximumUpdatesPerExecuteStatement = 25
3333

3434
/// DynamoDBTable conformance updateItems function
3535
public extension AWSDynamoDBCompositePrimaryKeyTable {
36-
private func deleteChunkedItems(_ keys: [CompositePrimaryKey<some Any>]) async throws {
36+
private func deleteChunkedItems(_ keys: [CompositePrimaryKey<some Any>]) async throws -> [DynamoDBClientTypes.BatchStatementResponse] {
3737
// if there are no keys, there is nothing to update
3838
guard keys.count > 0 else {
39-
return
39+
return []
4040
}
4141

4242
let statements = try keys.map { existingKey -> DynamoDBClientTypes.BatchStatementRequest in
@@ -49,13 +49,15 @@ public extension AWSDynamoDBCompositePrimaryKeyTable {
4949
let executeInput = BatchExecuteStatementInput(statements: statements)
5050

5151
let response = try await self.dynamodb.batchExecuteStatement(input: executeInput)
52-
try throwOnBatchExecuteStatementErrors(response: response)
52+
return response.responses ?? []
5353
}
5454

55-
private func deleteChunkedItems(_ existingItems: [TypedTTLDatabaseItem<some Any, some Any, some Any>]) async throws {
55+
private func deleteChunkedItems(_ existingItems: [TypedTTLDatabaseItem<some Any, some Any, some Any>]) async throws
56+
-> [DynamoDBClientTypes.BatchStatementResponse]
57+
{
5658
// if there are no items, there is nothing to update
5759
guard existingItems.count > 0 else {
58-
return
60+
return []
5961
}
6062

6163
let statements = try existingItems.map { existingItem -> DynamoDBClientTypes.BatchStatementRequest in
@@ -68,24 +70,45 @@ public extension AWSDynamoDBCompositePrimaryKeyTable {
6870
let executeInput = BatchExecuteStatementInput(statements: statements)
6971

7072
let response = try await self.dynamodb.batchExecuteStatement(input: executeInput)
71-
try throwOnBatchExecuteStatementErrors(response: response)
73+
return response.responses ?? []
7274
}
7375

7476
func deleteItems(forKeys keys: [CompositePrimaryKey<some Any>]) async throws {
7577
// BatchExecuteStatement has a maximum of 25 statements
7678
// This function handles pagination internally.
7779
let chunkedKeys = keys.chunked(by: maximumUpdatesPerExecuteStatement)
78-
try await chunkedKeys.concurrentForEach { chunk in
79-
try await self.deleteChunkedItems(chunk)
80+
let zippedResponses = try await chunkedKeys.concurrentFlatMap { chunk in
81+
let responses = try await self.deleteChunkedItems(chunk)
82+
83+
return zip(responses, chunk)
84+
}
85+
86+
let errors = zippedResponses.compactMap { response, key in
87+
response.error?.asDynamoDBTableError(partitionKey: key.partitionKey, sortKey: key.sortKey, entryCount: keys.count)
88+
}
89+
90+
if !errors.isEmpty {
91+
throw DynamoDBTableError.batchFailures(errors: errors.removeDuplicates())
8092
}
8193
}
8294

8395
func deleteItems(existingItems: [TypedTTLDatabaseItem<some Any, some Any, some Any>]) async throws {
8496
// BatchExecuteStatement has a maximum of 25 statements
8597
// This function handles pagination internally.
8698
let chunkedItems = existingItems.chunked(by: maximumUpdatesPerExecuteStatement)
87-
try await chunkedItems.concurrentForEach { chunk in
88-
try await self.deleteChunkedItems(chunk)
99+
let zippedResponses = try await chunkedItems.concurrentFlatMap { chunk in
100+
let responses = try await self.deleteChunkedItems(chunk)
101+
102+
return zip(responses, chunk)
103+
}
104+
105+
let errors = zippedResponses.compactMap { response, item in
106+
response.error?.asDynamoDBTableError(partitionKey: item.compositePrimaryKey.partitionKey,
107+
sortKey: item.compositePrimaryKey.sortKey, entryCount: existingItems.count)
108+
}
109+
110+
if !errors.isEmpty {
111+
throw DynamoDBTableError.batchFailures(errors: errors.removeDuplicates())
89112
}
90113
}
91114
}

Sources/DynamoDBTables/AWSDynamoDBCompositePrimaryKeyTable+execute.swift

Lines changed: 91 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -74,8 +74,8 @@ public extension AWSDynamoDBCompositePrimaryKeyTable {
7474
// ExecuteStatement API has a maximum limit on the number of decomposed read operations per request.
7575
// Caller of this function needs to handle pagination on their side.
7676
guard partitionKeys.count <= maximumKeysPerExecuteStatement else {
77-
throw DynamoDBTableError.validationError(
78-
reason: "Execute API has a maximum limit of \(maximumKeysPerExecuteStatement) partition keys per request.")
77+
throw DynamoDBTableError.validation(partitionKey: nil, sortKey: nil,
78+
message: "Execute API has a maximum limit of \(maximumKeysPerExecuteStatement) partition keys per request.")
7979
}
8080

8181
let statement = self.getStatement(partitionKeys: partitionKeys,
@@ -143,8 +143,8 @@ public extension AWSDynamoDBCompositePrimaryKeyTable {
143143
// ExecuteStatement API has a maximum limit on the number of decomposed read operations per request.
144144
// Caller of this function needs to handle pagination on their side.
145145
guard partitionKeys.count <= maximumKeysPerExecuteStatement else {
146-
throw DynamoDBTableError.validationError(
147-
reason: "Execute API has a maximum limit of \(maximumKeysPerExecuteStatement) partition keys per request.")
146+
throw DynamoDBTableError.validation(partitionKey: nil, sortKey: nil,
147+
message: "Execute API has a maximum limit of \(maximumKeysPerExecuteStatement) partition keys per request.")
148148
}
149149

150150
let statement = self.getStatement(partitionKeys: partitionKeys,
@@ -256,3 +256,90 @@ public extension AWSDynamoDBCompositePrimaryKeyTable {
256256
}
257257
}
258258
}
259+
260+
extension DynamoDBClientTypes.BatchStatementError {
261+
func asDynamoDBTableError(partitionKey: String, sortKey: String, entryCount: Int) -> DynamoDBTableError? {
262+
guard let code = self.code else {
263+
return nil
264+
}
265+
266+
// https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_BatchStatementError.html
267+
return switch code {
268+
case .accessdenied:
269+
DynamoDBTableError.accessDenied(message: self.message)
270+
case .conditionalcheckfailed:
271+
DynamoDBTableError.conditionalCheckFailed(partitionKey: partitionKey,
272+
sortKey: sortKey,
273+
message: self.message)
274+
case .duplicateitem:
275+
DynamoDBTableError.duplicateItem(partitionKey: partitionKey, sortKey: sortKey,
276+
message: self.message)
277+
case .internalservererror:
278+
DynamoDBTableError.internalServerError(message: self.message)
279+
case .itemcollectionsizelimitexceeded:
280+
DynamoDBTableError.itemCollectionSizeLimitExceeded(attemptedSize: entryCount,
281+
maximumSize: AWSDynamoDBLimits.maximumUpdatesPerTransactionStatement)
282+
case .provisionedthroughputexceeded:
283+
DynamoDBTableError.provisionedThroughputExceeded(message: self.message)
284+
case .requestlimitexceeded:
285+
DynamoDBTableError.requestLimitExceeded(message: self.message)
286+
case .resourcenotfound:
287+
DynamoDBTableError.resourceNotFound(partitionKey: partitionKey, sortKey: sortKey,
288+
message: self.message)
289+
case .throttlingerror:
290+
DynamoDBTableError.throttling(message: self.message)
291+
case .transactionconflict:
292+
DynamoDBTableError.transactionConflict(message: self.message)
293+
case .validationerror:
294+
DynamoDBTableError.validation(partitionKey: partitionKey, sortKey: sortKey,
295+
message: self.message)
296+
case let .sdkUnknown(message):
297+
DynamoDBTableError.unknown(code: message, partitionKey: partitionKey,
298+
sortKey: sortKey, message: self.message)
299+
}
300+
}
301+
}
302+
303+
extension [DynamoDBTableError] {
304+
func removeDuplicates() -> [DynamoDBTableError] {
305+
var seenAccessDenied = false
306+
var seenInternalServerError = false
307+
var seenRequestLimitExceeded = false
308+
var seenStatementLengthExceeded = false
309+
var seenItemCollectionSizeLimitExceeded = false
310+
var seenProvisionedThroughputExceeded = false
311+
312+
func canPassThrough(state: inout Bool) -> Bool {
313+
if state {
314+
return false
315+
} else {
316+
state = true
317+
return true
318+
}
319+
}
320+
321+
// iterate through all errors
322+
return self.compactMap { error in
323+
return switch error {
324+
case .accessDenied:
325+
canPassThrough(state: &seenAccessDenied) ? error : nil
326+
case .internalServerError:
327+
canPassThrough(state: &seenInternalServerError) ? error : nil
328+
case .requestLimitExceeded:
329+
canPassThrough(state: &seenRequestLimitExceeded) ? error : nil
330+
case .statementLengthExceeded:
331+
canPassThrough(state: &seenStatementLengthExceeded) ? error : nil
332+
case .itemCollectionSizeLimitExceeded:
333+
canPassThrough(state: &seenItemCollectionSizeLimitExceeded) ? error : nil
334+
case .provisionedThroughputExceeded:
335+
canPassThrough(state: &seenProvisionedThroughputExceeded) ? error : nil
336+
case .conditionalCheckFailed, .duplicateItem, .concurrencyError, .validation, .throttling, .databaseError,
337+
.unexpectedError, .unexpectedResponse, .resourceNotFound, .typeMismatch, .batchAPIExceededRetries,
338+
.unexpectedType, .unableToUpdateError, .unrecognizedError, .multipleUnexpectedErrors, .transactionCanceled,
339+
.transactionConflict, .batchFailures, .unknown:
340+
// always pass through these errors
341+
error
342+
}
343+
}
344+
}
345+
}

0 commit comments

Comments
 (0)