Skip to content

Commit 3501167

Browse files
authored
Add transactWrite APIs to provide non-polymorphic build APIs within a transaction. (#23)
1 parent c578ef3 commit 3501167

9 files changed

+736
-22
lines changed

README.md

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -356,7 +356,12 @@ let entryList: [TestTypeAWriteEntry] = [
356356
]
357357

358358
try await table.bulkWrite(entryList)
359-
//try await table.transactWrite(entryList) <<-- When implemented
359+
```
360+
361+
Or alternatively executed within a DynamoDB transaction-
362+
363+
```swift
364+
try await table.transactWrite(entryList)
360365
```
361366

362367
and similarly for polymorphic queries-
@@ -384,20 +389,29 @@ let entryList: [TestPolymorphicWriteEntry] = [
384389
]
385390

386391
try await table.polymorphicBulkWrite(entryList)
392+
```
393+
394+
Or alternatively executed within a DynamoDB transaction-
395+
396+
```swift
387397
try await table.polymorphicTransactWrite(entryList)
388398
```
389399

390400
For transactions, you can additionally specify a set of constraints to be part of the transaction-
391401

392402
```swift
393-
typealias TestTypeAStandardTransactionConstraintEntry = StandardTransactionConstraintEntry<TestTypeA>
403+
let constraintList: [StandardTransactionConstraintEntry<TestTypeA>] = [
404+
.required(existing: databaseItem3),
405+
.required(existing: databaseItem4),
406+
]
394407

395-
// Update when `transactWrite` API implemented
408+
try await table.transactWrite(entryList, constraints: constraintList)
396409
```
397410

398411
and similarly for polymorphic queries-
399412

400413
```swift
414+
typealias TestTypeAStandardTransactionConstraintEntry = StandardTransactionConstraintEntry<TestTypeA>
401415
typealias TestTypeBStandardTransactionConstraintEntry = StandardTransactionConstraintEntry<TestTypeB>
402416

403417
enum TestPolymorphicTransactionConstraintEntry: PolymorphicTransactionConstraintEntry {

Sources/DynamoDBTables/AWSDynamoDBCompositePrimaryKeyTable+updateItems.swift

Lines changed: 147 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,31 @@ public extension AWSDynamoDBCompositePrimaryKeyTable {
150150
throw DynamoDBTableError.batchErrorsReturned(errorCount: errorCount, messageMap: errorMap)
151151
}
152152

153+
private func writeTransactionItems<AttributesType, ItemType>(
154+
_ entries: [WriteEntry<AttributesType, ItemType>], constraints: [TransactionConstraintEntry<AttributesType, ItemType>]) async throws
155+
{
156+
// if there are no items, there is nothing to update
157+
guard entries.count > 0 else {
158+
return
159+
}
160+
161+
let entryStatements = try entries.map { entry -> DynamoDBClientTypes.ParameterizedStatement in
162+
let statement = try self.entryToStatement(entry)
163+
164+
return DynamoDBClientTypes.ParameterizedStatement(statement: statement)
165+
}
166+
167+
let requiredItemsStatements = try constraints.map { entry -> DynamoDBClientTypes.ParameterizedStatement in
168+
let statement = try self.entryToStatement(entry)
169+
170+
return DynamoDBClientTypes.ParameterizedStatement(statement: statement)
171+
}
172+
173+
let transactionInput = ExecuteTransactionInput(transactStatements: entryStatements + requiredItemsStatements)
174+
175+
_ = try await dynamodb.executeTransaction(input: transactionInput)
176+
}
177+
153178
private func writeTransactionItems(
154179
_ entries: [some PolymorphicWriteEntry], constraints: [some PolymorphicTransactionConstraintEntry]) async throws
155180
{
@@ -179,6 +204,18 @@ public extension AWSDynamoDBCompositePrimaryKeyTable {
179204
_ = try await dynamodb.executeTransaction(input: transactionInput)
180205
}
181206

207+
func transactWrite(_ entries: [WriteEntry<some Any, some Any>]) async throws {
208+
try await self.transactWrite(entries, constraints: [],
209+
retriesRemaining: self.retryConfiguration.numRetries)
210+
}
211+
212+
func transactWrite<AttributesType, ItemType>(_ entries: [WriteEntry<AttributesType, ItemType>],
213+
constraints: [TransactionConstraintEntry<AttributesType, ItemType>]) async throws
214+
{
215+
try await self.transactWrite(entries, constraints: constraints,
216+
retriesRemaining: self.retryConfiguration.numRetries)
217+
}
218+
182219
func polymorphicTransactWrite(_ entries: [some PolymorphicWriteEntry]) async throws {
183220
let noConstraints: [EmptyPolymorphicTransactionConstraintEntry] = []
184221
return try await self.polymorphicTransactWrite(entries, constraints: noConstraints,
@@ -192,6 +229,113 @@ public extension AWSDynamoDBCompositePrimaryKeyTable {
192229
retriesRemaining: self.retryConfiguration.numRetries)
193230
}
194231

232+
private func transactWrite<AttributesType, ItemType>(
233+
_ entries: [WriteEntry<AttributesType, ItemType>], constraints: [TransactionConstraintEntry<AttributesType, ItemType>],
234+
retriesRemaining: Int) async throws
235+
{
236+
let entryCount = entries.count + constraints.count
237+
238+
if entryCount > AWSDynamoDBLimits.maximumUpdatesPerTransactionStatement {
239+
throw DynamoDBTableError.transactionSizeExceeded(attemptedSize: entryCount,
240+
maximumSize: AWSDynamoDBLimits.maximumUpdatesPerTransactionStatement)
241+
}
242+
243+
let result: Swift.Result<Void, DynamoDBTableError>
244+
do {
245+
try await self.writeTransactionItems(entries, constraints: constraints)
246+
247+
result = .success(())
248+
} catch let exception as TransactionCanceledException {
249+
guard let cancellationReasons = exception.properties.cancellationReasons else {
250+
throw DynamoDBTableError.transactionCanceled(reasons: [])
251+
}
252+
253+
let keys = entries.map(\.compositePrimaryKey) + constraints.map(\.compositePrimaryKey)
254+
255+
var isTransactionConflict = false
256+
let reasons = try zip(cancellationReasons, keys).compactMap { cancellationReason, entryKey -> DynamoDBTableError? in
257+
let key: StandardCompositePrimaryKey?
258+
if let item = cancellationReason.item {
259+
key = try DynamoDBDecoder().decode(.m(item))
260+
} else {
261+
key = nil
262+
}
263+
264+
let partitionKey = key?.partitionKey ?? entryKey.partitionKey
265+
let sortKey = key?.sortKey ?? entryKey.sortKey
266+
267+
// https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_ExecuteTransaction.html
268+
switch cancellationReason.code {
269+
case "None":
270+
return nil
271+
case "ConditionalCheckFailed":
272+
return DynamoDBTableError.transactionConditionalCheckFailed(partitionKey: partitionKey,
273+
sortKey: sortKey,
274+
message: cancellationReason.message)
275+
case "DuplicateItem":
276+
return DynamoDBTableError.duplicateItem(partitionKey: partitionKey, sortKey: sortKey,
277+
message: cancellationReason.message)
278+
case "ItemCollectionSizeLimitExceeded":
279+
return DynamoDBTableError.transactionSizeExceeded(attemptedSize: entryCount,
280+
maximumSize: AWSDynamoDBLimits.maximumUpdatesPerTransactionStatement)
281+
case "TransactionConflict":
282+
isTransactionConflict = true
283+
284+
return DynamoDBTableError.transactionConflict(message: cancellationReason.message)
285+
case "ProvisionedThroughputExceeded":
286+
return DynamoDBTableError.transactionProvisionedThroughputExceeded(message: cancellationReason.message)
287+
case "ThrottlingError":
288+
return DynamoDBTableError.transactionThrottling(message: cancellationReason.message)
289+
case "ValidationError":
290+
return DynamoDBTableError.transactionValidation(partitionKey: partitionKey, sortKey: sortKey,
291+
message: cancellationReason.message)
292+
default:
293+
return DynamoDBTableError.transactionUnknown(code: cancellationReason.code, partitionKey: partitionKey,
294+
sortKey: sortKey, message: cancellationReason.message)
295+
}
296+
}
297+
298+
if isTransactionConflict, retriesRemaining > 0 {
299+
return try await retryTransactWrite(entries, constraints: constraints, retriesRemaining: retriesRemaining)
300+
}
301+
302+
result = .failure(DynamoDBTableError.transactionCanceled(reasons: reasons))
303+
} catch let exception as TransactionConflictException {
304+
if retriesRemaining > 0 {
305+
return try await retryTransactWrite(entries, constraints: constraints, retriesRemaining: retriesRemaining)
306+
}
307+
308+
let reason = DynamoDBTableError.transactionConflict(message: exception.message)
309+
310+
result = .failure(DynamoDBTableError.transactionCanceled(reasons: [reason]))
311+
}
312+
313+
let retryCount = self.retryConfiguration.numRetries - retriesRemaining
314+
self.tableMetrics.transactWriteRetryCountRecorder?.record(retryCount)
315+
316+
switch result {
317+
case .success:
318+
return
319+
case let .failure(failure):
320+
throw failure
321+
}
322+
}
323+
324+
private func retryTransactWrite<AttributesType, ItemType>(
325+
_ entries: [WriteEntry<AttributesType, ItemType>], constraints: [TransactionConstraintEntry<AttributesType, ItemType>],
326+
retriesRemaining: Int) async throws
327+
{
328+
// determine the required interval
329+
let retryInterval = Int(self.retryConfiguration.getRetryInterval(retriesRemaining: retriesRemaining))
330+
331+
logger.warning(
332+
"Transaction retried due to conflict. Remaining retries: \(retriesRemaining). Retrying in \(retryInterval) ms.")
333+
try await Task.sleep(nanoseconds: UInt64(retryInterval) * millisecondsToNanoSeconds)
334+
335+
logger.trace("Reattempting request due to remaining retries: \(retryInterval)")
336+
return try await self.transactWrite(entries, constraints: constraints, retriesRemaining: retriesRemaining - 1)
337+
}
338+
195339
private func polymorphicTransactWrite(
196340
_ entries: [some PolymorphicWriteEntry], constraints: [some PolymorphicTransactionConstraintEntry],
197341
retriesRemaining: Int) async throws
@@ -259,13 +403,13 @@ public extension AWSDynamoDBCompositePrimaryKeyTable {
259403
}
260404

261405
if isTransactionConflict, retriesRemaining > 0 {
262-
return try await retryTransactWrite(entries, constraints: constraints, retriesRemaining: retriesRemaining)
406+
return try await retryPolymorphicTransactWrite(entries, constraints: constraints, retriesRemaining: retriesRemaining)
263407
}
264408

265409
result = .failure(DynamoDBTableError.transactionCanceled(reasons: reasons))
266410
} catch let exception as TransactionConflictException {
267411
if retriesRemaining > 0 {
268-
return try await retryTransactWrite(entries, constraints: constraints, retriesRemaining: retriesRemaining)
412+
return try await retryPolymorphicTransactWrite(entries, constraints: constraints, retriesRemaining: retriesRemaining)
269413
}
270414

271415
let reason = DynamoDBTableError.transactionConflict(message: exception.message)
@@ -284,7 +428,7 @@ public extension AWSDynamoDBCompositePrimaryKeyTable {
284428
}
285429
}
286430

287-
private func retryTransactWrite(
431+
private func retryPolymorphicTransactWrite(
288432
_ entries: [some PolymorphicWriteEntry], constraints: [some PolymorphicTransactionConstraintEntry],
289433
retriesRemaining: Int) async throws
290434
{

Sources/DynamoDBTables/DynamoDBCompositePrimaryKeyTable.swift

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,8 @@ public protocol DynamoDBCompositePrimaryKeyTable {
137137
* The transaction will comprise of the write entries specified in `entries`.
138138
* The transaction will fail if the number of entries is greater than 100.
139139
*/
140+
func transactWrite<AttributesType, ItemType>(_ entries: [WriteEntry<AttributesType, ItemType>]) async throws
141+
140142
func polymorphicTransactWrite<WriteEntryType: PolymorphicWriteEntry>(
141143
_ entries: [WriteEntryType]) async throws
142144

@@ -147,6 +149,9 @@ public protocol DynamoDBCompositePrimaryKeyTable {
147149
* with a specified version must exist regardless of if it will be written to by the transaction).
148150
* The transaction will fail if the number of entries and constraints combined is greater than 100.
149151
*/
152+
func transactWrite<AttributesType, ItemType>(
153+
_ entries: [WriteEntry<AttributesType, ItemType>], constraints: [TransactionConstraintEntry<AttributesType, ItemType>]) async throws
154+
150155
func polymorphicTransactWrite<WriteEntryType: PolymorphicWriteEntry, TransactionConstraintEntryType: PolymorphicTransactionConstraintEntry>(
151156
_ entries: [WriteEntryType], constraints: [TransactionConstraintEntryType]) async throws
152157

Sources/DynamoDBTables/InMemoryDynamoDBCompositePrimaryKeyTable.swift

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,16 @@ public typealias ExecuteItemFilterType = @Sendable (String, String, String, Poly
4545
-> Bool
4646

4747
public protocol InMemoryTransactionDelegate {
48+
/**
49+
Inject errors into a `transactWrite` call.
50+
*/
51+
func injectErrors<AttributesType, ItemType>(
52+
_ entries: [WriteEntry<AttributesType, ItemType>], constraints: [TransactionConstraintEntry<AttributesType, ItemType>],
53+
table: InMemoryDynamoDBCompositePrimaryKeyTable) async throws -> [DynamoDBTableError]
54+
55+
/**
56+
Inject errors into a `polymorphicTransactWrite` call.
57+
*/
4858
func injectErrors<WriteEntryType: PolymorphicWriteEntry,
4959
TransactionConstraintEntryType: PolymorphicTransactionConstraintEntry>(
5060
_ entries: [WriteEntryType], constraints: [TransactionConstraintEntryType],
@@ -98,6 +108,21 @@ public struct InMemoryDynamoDBCompositePrimaryKeyTable: DynamoDBCompositePrimary
98108
try await self.storeWrapper.updateItem(newItem: newItem, existingItem: existingItem)
99109
}
100110

111+
public func transactWrite(_ entries: [WriteEntry<some Any, some Any>]) async throws {
112+
try await self.transactWrite(entries, constraints: [])
113+
}
114+
115+
public func transactWrite<AttributesType, ItemType>(_ entries: [WriteEntry<AttributesType, ItemType>],
116+
constraints: [TransactionConstraintEntry<AttributesType, ItemType>]) async throws
117+
{
118+
// if there is a transaction delegate and it wants to inject errors
119+
if let errors = try await transactionDelegate?.injectErrors(entries, constraints: constraints, table: self), !errors.isEmpty {
120+
throw DynamoDBTableError.transactionCanceled(reasons: errors)
121+
}
122+
123+
return try await self.storeWrapper.bulkWrite(entries, constraints: constraints, isTransaction: true)
124+
}
125+
101126
public func polymorphicTransactWrite(_ entries: [some PolymorphicWriteEntry]) async throws {
102127
let noConstraints: [EmptyPolymorphicTransactionConstraintEntry] = []
103128
return try await self.polymorphicTransactWrite(entries, constraints: noConstraints)
@@ -120,7 +145,7 @@ public struct InMemoryDynamoDBCompositePrimaryKeyTable: DynamoDBCompositePrimary
120145
}
121146

122147
public func bulkWrite(_ entries: [WriteEntry<some Any, some Any>]) async throws {
123-
try await self.storeWrapper.bulkWrite(entries)
148+
try await self.storeWrapper.bulkWrite(entries, constraints: [], isTransaction: false)
124149
}
125150

126151
public func bulkWriteWithFallback(_ entries: [WriteEntry<some Any, some Any>]) async throws {

0 commit comments

Comments
 (0)