Skip to content

Commit 48a57dd

Browse files
authored
chore: implement automatic cleanup on failure in ParallelCompositeUploadWritableByteChannel (#2238)
1 parent 0ae060b commit 48a57dd

File tree

4 files changed

+171
-56
lines changed

4 files changed

+171
-56
lines changed

google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java

Lines changed: 34 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -534,36 +534,47 @@ public boolean delete(String bucket, String blob, BlobSourceOption... options) {
534534

535535
@Override
536536
public boolean delete(BlobId blob, BlobSourceOption... options) {
537-
Opts<ObjectSourceOpt> opts = Opts.unwrap(options).resolveFrom(blob).prepend(defaultOpts);
538-
GrpcCallContext grpcCallContext =
539-
opts.grpcMetadataMapper().apply(GrpcCallContext.createDefault());
540-
DeleteObjectRequest.Builder builder =
541-
DeleteObjectRequest.newBuilder()
542-
.setBucket(bucketNameCodec.encode(blob.getBucket()))
543-
.setObject(blob.getName());
544-
ifNonNull(blob.getGeneration(), builder::setGeneration);
545-
DeleteObjectRequest req = opts.deleteObjectsRequest().apply(builder).build();
546-
GrpcCallContext merge = Utils.merge(grpcCallContext, Retrying.newCallContext());
547-
return Boolean.TRUE.equals(
548-
Retrying.run(
549-
getOptions(),
550-
retryAlgorithmManager.getFor(req),
551-
() -> {
552-
try {
553-
storageClient.deleteObjectCallable().call(req, merge);
554-
return true;
555-
} catch (NotFoundException e) {
556-
return false;
557-
}
558-
},
559-
Decoder.identity()));
537+
Opts<ObjectSourceOpt> opts = Opts.unwrap(options);
538+
try {
539+
internalObjectDelete(blob, opts);
540+
return true;
541+
} catch (NotFoundException e) {
542+
return false;
543+
} catch (StorageException e) {
544+
if (e.getCode() == 404) {
545+
return false;
546+
}
547+
throw e;
548+
}
560549
}
561550

562551
@Override
563552
public boolean delete(BlobId blob) {
564553
return delete(blob, new BlobSourceOption[0]);
565554
}
566555

556+
@Override
557+
public Void internalObjectDelete(BlobId id, Opts<ObjectSourceOpt> opts) {
558+
Opts<ObjectSourceOpt> finalOpts = opts.resolveFrom(id).prepend(defaultOpts);
559+
GrpcCallContext grpcCallContext =
560+
finalOpts.grpcMetadataMapper().apply(GrpcCallContext.createDefault());
561+
DeleteObjectRequest.Builder builder =
562+
DeleteObjectRequest.newBuilder()
563+
.setBucket(bucketNameCodec.encode(id.getBucket()))
564+
.setObject(id.getName());
565+
ifNonNull(id.getGeneration(), builder::setGeneration);
566+
DeleteObjectRequest req = finalOpts.deleteObjectsRequest().apply(builder).build();
567+
GrpcCallContext merge = Utils.merge(grpcCallContext, Retrying.newCallContext());
568+
return Retrying.run(
569+
getOptions(),
570+
retryAlgorithmManager.getFor(req),
571+
() -> {
572+
storageClient.deleteObjectCallable().call(req, merge);
573+
return null;
574+
},
575+
Decoder.identity());
576+
}
577+
567578
@Override
568579
public Blob compose(ComposeRequest composeRequest) {
569580
Opts<ObjectTargetOpt> opts = composeRequest.getTargetOpts().prepend(defaultOpts);

google-cloud-storage/src/main/java/com/google/cloud/storage/ParallelCompositeUploadWritableByteChannel.java

Lines changed: 127 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.google.api.core.SettableApiFuture;
2323
import com.google.api.gax.grpc.GrpcStatusCode;
2424
import com.google.api.gax.rpc.ApiExceptionFactory;
25+
import com.google.api.gax.rpc.NotFoundException;
2526
import com.google.cloud.BaseServiceException;
2627
import com.google.cloud.storage.ApiFutureUtils.OnFailureApiFutureCallback;
2728
import com.google.cloud.storage.ApiFutureUtils.OnSuccessApiFutureCallback;
@@ -117,6 +118,7 @@ final class ParallelCompositeUploadWritableByteChannel implements BufferedWritab
117118

118119
// immutable bootstrapped state
119120
private final Opts<ObjectTargetOpt> partOpts;
121+
private final Opts<ObjectSourceOpt> srcOpts;
120122
private final AsyncAppendingQueue<BlobInfo> queue;
121123
private final FailureForwarder failureForwarder;
122124
// mutable running state
@@ -154,6 +156,7 @@ final class ParallelCompositeUploadWritableByteChannel implements BufferedWritab
154156
this.totalObjectOffset = 0;
155157

156158
this.partOpts = getPartOpts(opts);
159+
this.srcOpts = partOpts.transformTo(ObjectSourceOpt.class);
157160
this.cumulativeHasher = Hashing.crc32c().newHasher();
158161
this.failureForwarder = new FailureForwarder();
159162
}
@@ -250,14 +253,7 @@ public synchronized void close() throws IOException {
250253
if (partCleanupStrategy.isDeleteOnError()) {
251254
ApiFuture<BlobInfo> cleaningFuture =
252255
ApiFutures.catchingAsync(
253-
validatingTransform,
254-
Throwable.class,
255-
t -> {
256-
// todo:
257-
return ApiFutures.immediateFailedFuture(t);
258-
},
259-
exec);
260-
// todo: verify this gets the first failure and not one from cleaning
256+
validatingTransform, Throwable.class, this::asyncCleanupAfterFailure, exec);
261257
ApiFutures.addCallback(cleaningFuture, failureForwarder, exec);
262258
} else {
263259
ApiFutures.addCallback(validatingTransform, failureForwarder, exec);
@@ -288,7 +284,6 @@ private void internalFlush(ByteBuffer buf) {
288284
// a precondition failure usually means the part was created, but we didn't get the
289285
// response. And when we tried to retry the object already exists.
290286
if (e.getCode() == 412) {
291-
Opts<ObjectSourceOpt> srcOpts = partOpts.transformTo(ObjectSourceOpt.class);
292287
return storage.internalObjectGet(info.getBlobId(), srcOpts);
293288
} else {
294289
throw e;
@@ -320,23 +315,46 @@ private void internalFlush(ByteBuffer buf) {
320315
}
321316

322317
Throwable cause = e.getCause();
323-
// create our exception containing information about the upload context
324-
ParallelCompositeUploadException pcue =
325-
buildParallelCompositeUploadException(cause, exec, pendingParts, successfulParts);
326-
BaseServiceException storageException = StorageException.coalesce(pcue);
327-
328-
// asynchronously fail the finalObject future
329-
CancellationException cancellationException =
330-
new CancellationException(storageException.getMessage());
331-
cancellationException.initCause(storageException);
332-
ApiFutures.addCallback(
333-
ApiFutures.immediateFailedFuture(cancellationException),
334-
(OnFailureApiFutureCallback<BlobInfo>) failureForwarder::onFailure,
335-
exec);
318+
BaseServiceException storageException;
336319
if (partCleanupStrategy.isDeleteOnError()) {
337-
// TODO: cleanup
320+
storageException = StorageException.coalesce(cause);
321+
ApiFuture<Object> cleanupFutures = asyncCleanupAfterFailure(storageException);
322+
// asynchronously fail the finalObject future
323+
CancellationException cancellationException =
324+
new CancellationException(storageException.getMessage());
325+
cancellationException.initCause(storageException);
326+
ApiFutures.addCallback(
327+
cleanupFutures,
328+
new ApiFutureCallback<Object>() {
329+
@Override
330+
public void onFailure(Throwable throwable) {
331+
cancellationException.addSuppressed(throwable);
332+
failureForwarder.onFailure(cancellationException);
333+
}
334+
335+
@Override
336+
public void onSuccess(Object o) {
337+
failureForwarder.onFailure(cancellationException);
338+
}
339+
},
340+
exec);
341+
// this will throw out if anything fails
342+
ApiFutureUtils.await(cleanupFutures);
343+
} else {
344+
// create our exception containing information about the upload context
345+
ParallelCompositeUploadException pcue =
346+
buildParallelCompositeUploadException(cause, exec, pendingParts, successfulParts);
347+
storageException = StorageException.coalesce(pcue);
348+
// asynchronously fail the finalObject future
349+
CancellationException cancellationException =
350+
new CancellationException(storageException.getMessage());
351+
cancellationException.initCause(storageException);
352+
ApiFutures.addCallback(
353+
ApiFutures.immediateFailedFuture(cancellationException),
354+
(OnFailureApiFutureCallback<BlobInfo>) failureForwarder::onFailure,
355+
exec);
356+
throw storageException;
338357
}
339-
throw storageException;
340358
} finally {
341359
current = null;
342360
}
@@ -383,13 +401,16 @@ private ApiFuture<BlobInfo> cleanupParts(BlobInfo finalInfo) {
383401
successfulParts.stream()
384402
// make sure we don't delete the object we're wanting to create
385403
.filter(id -> !id.equals(finalInfo.getBlobId()))
386-
.map(ApiFutures::immediateFuture)
387-
.map(f -> ApiFutures.transform(f, storage::delete, exec))
404+
.map(this::deleteAsync)
388405
.collect(Collectors.toList());
389406

390407
ApiFuture<List<Boolean>> deletes2 = ApiFutureUtils.quietAllAsList(deletes);
391408

392-
return ApiFutures.transform(deletes2, ignore -> finalInfo, exec);
409+
return ApiFutures.catchingAsync(
410+
ApiFutures.transform(deletes2, ignore -> finalInfo, exec),
411+
Throwable.class,
412+
cause -> ApiFutures.immediateFailedFuture(StorageException.coalesce(cause)),
413+
exec);
393414
}
394415

395416
private BlobInfo definePart(BlobInfo ultimateObject, PartRange partRange, long offset) {
@@ -409,6 +430,75 @@ private BlobInfo definePart(BlobInfo ultimateObject, PartRange partRange, long o
409430
return b.build();
410431
}
411432

433+
private <R> ApiFuture<R> asyncCleanupAfterFailure(Throwable originalFailure) {
434+
ApiFuture<ImmutableList<BlobId>> pendingAndSuccessfulBlobIds =
435+
getPendingAndSuccessfulBlobIds(exec, pendingParts, successfulParts);
436+
return ApiFutures.transformAsync(
437+
pendingAndSuccessfulBlobIds,
438+
blobIds -> {
439+
ImmutableList<ApiFuture<Boolean>> pendingDeletes =
440+
blobIds.stream().map(this::deleteAsync).collect(ImmutableList.toImmutableList());
441+
442+
ApiFuture<List<Boolean>> futureDeleteResults =
443+
ApiFutures.successfulAsList(pendingDeletes);
444+
445+
return ApiFutures.transformAsync(
446+
futureDeleteResults,
447+
deleteResults -> {
448+
List<BlobId> failedDeletes = new ArrayList<>();
449+
for (int i = 0; i < blobIds.size(); i++) {
450+
BlobId id = blobIds.get(i);
451+
Boolean deleteResult = deleteResults.get(i);
452+
// deleteResult not equal to true means the request completed but was
453+
// unsuccessful
454+
// deleteResult being null means the future failed
455+
if (!Boolean.TRUE.equals(deleteResult)) {
456+
failedDeletes.add(id);
457+
}
458+
}
459+
460+
if (!failedDeletes.isEmpty()) {
461+
String failedGsUris =
462+
failedDeletes.stream()
463+
.map(BlobId::toGsUtilUriWithGeneration)
464+
.collect(Collectors.joining(",\n", "[\n", "\n]"));
465+
466+
String message =
467+
String.format(
468+
"Incomplete parallel composite upload cleanup after previous error. Unknown object ids: %s",
469+
failedGsUris);
470+
StorageException storageException = new StorageException(0, message, null);
471+
originalFailure.addSuppressed(storageException);
472+
}
473+
return ApiFutures.immediateFailedFuture(originalFailure);
474+
},
475+
exec);
476+
},
477+
exec);
478+
}
479+
480+
@NonNull
481+
private ApiFuture<Boolean> deleteAsync(BlobId id) {
482+
return ApiFutures.transform(
483+
ApiFutures.immediateFuture(id),
484+
v -> {
485+
try {
486+
storage.internalObjectDelete(v, srcOpts);
487+
return true;
488+
} catch (NotFoundException e) {
489+
// not found means the part doesn't exist, which is what we want
490+
return true;
491+
} catch (StorageException e) {
492+
if (e.getCode() == 404) {
493+
return true;
494+
} else {
495+
throw e;
496+
}
497+
}
498+
},
499+
exec);
500+
}
501+
412502
@VisibleForTesting
413503
@NonNull
414504
static Opts<ObjectTargetOpt> getPartOpts(Opts<ObjectTargetOpt> opts) {
@@ -468,6 +558,15 @@ static ParallelCompositeUploadException buildParallelCompositeUploadException(
468558
Executor exec,
469559
List<ApiFuture<BlobInfo>> pendingParts,
470560
List<BlobId> successfulParts) {
561+
ApiFuture<ImmutableList<BlobId>> fCreatedObjects =
562+
getPendingAndSuccessfulBlobIds(exec, pendingParts, successfulParts);
563+
564+
return ParallelCompositeUploadException.of(cause, fCreatedObjects);
565+
}
566+
567+
@NonNull
568+
private static ApiFuture<ImmutableList<BlobId>> getPendingAndSuccessfulBlobIds(
569+
Executor exec, List<ApiFuture<BlobInfo>> pendingParts, List<BlobId> successfulParts) {
471570
ApiFuture<List<BlobInfo>> successfulList = ApiFutures.successfulAsList(pendingParts);
472571
// suppress any failure that might happen when waiting for any pending futures to resolve
473572
ApiFuture<List<BlobInfo>> catching =
@@ -490,7 +589,6 @@ static ParallelCompositeUploadException buildParallelCompositeUploadException(
490589
.distinct()
491590
.collect(ImmutableList.toImmutableList()),
492591
exec);
493-
494-
return ParallelCompositeUploadException.of(cause, fCreatedObjects);
592+
return fCreatedObjects;
495593
}
496594
}

google-cloud-storage/src/main/java/com/google/cloud/storage/StorageInternal.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,9 @@ default BlobInfo internalDirectUpload(BlobInfo info, Opts<ObjectTargetOpt> opts,
3939
throw new UnsupportedOperationException("not implemented");
4040
}
4141

42-
default boolean delete(BlobId id) {
42+
// Void to allow easier mapping/use within streams and other mapping contexts
43+
@SuppressWarnings("UnusedReturnValue")
44+
default Void internalObjectDelete(BlobId id, Opts<ObjectSourceOpt> opts) {
4345
throw new UnsupportedOperationException("not implemented");
4446
}
4547

google-cloud-storage/src/test/java/com/google/cloud/storage/ParallelCompositeUploadWritableByteChannelTest.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -460,7 +460,7 @@ public void badServerCrc32cResultsInException() throws Exception {
460460
bufferHandlePool,
461461
MoreExecutors.directExecutor(),
462462
partNamingStrategy,
463-
PartCleanupStrategy.never(),
463+
PartCleanupStrategy.always(),
464464
3,
465465
finalObject,
466466
new FakeStorageInternal() {
@@ -839,9 +839,13 @@ public BlobInfo compose(ComposeRequest composeRequest) {
839839
}
840840

841841
@Override
842-
public boolean delete(BlobId id) {
842+
public Void internalObjectDelete(BlobId id, Opts<ObjectSourceOpt> opts) {
843843
deleteRequests.add(id);
844-
return addedObjects.containsKey(id);
844+
boolean containsKey = addedObjects.containsKey(id);
845+
if (!containsKey) {
846+
throw ApiExceptionFactory.createException(null, GrpcStatusCode.of(Code.NOT_FOUND), false);
847+
}
848+
return null;
845849
}
846850

847851
@Override

0 commit comments

Comments
 (0)