-
Notifications
You must be signed in to change notification settings - Fork 16
feat: implement BufferedCipherSubscriber to enforce buffered decrypti… #99
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
19 commits
Select commit
Hold shift + click to select a range
f5a6723
feat: implement BufferedCipherSubscriber to enforce buffered decrypti…
1023e92
look within runtime exception for AEADBagTagException
358e317
add some simple logging to StreamTest
9c90565
turn off all tests except decrypt stream, add more logging
6394888
signal onComplete from onNext when all data has been read
7560520
nuke test coverage
f1d78c8
add a guard against duplicate onComplete calls
c559d0c
a little more logging...
e69e35a
move to concurrentLinkedQueue
29fa14e
try passing an empty BB
13918cb
run test 100 times for moar samples
e576f63
move bounded zeros inputstream to 12 to avoid any weirdness with 0 by…
3a940b5
cleanup: reactivate other tests, remove logging
cd36329
reinstate test coverage
c9d0e27
remove printStackTrace from BufferedCipherSubscriber
346c57b
add Nonnull annotation to contentLength, also remove some dead code a…
62654e6
Revert "add Nonnull annotation to contentLength, also remove some dea…
92dbd72
Merge branch 'main' of github.com:aws/aws-s3-encryption-client-java i…
ea75319
separate out null check on contentLength
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
35 changes: 35 additions & 0 deletions
35
src/main/java/software/amazon/encryption/s3/internal/BufferedCipherPublisher.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,35 @@ | ||
package software.amazon.encryption.s3.internal; | ||
|
||
import org.reactivestreams.Subscriber; | ||
import software.amazon.awssdk.core.async.SdkPublisher; | ||
import software.amazon.encryption.s3.legacy.internal.RangedGetUtils; | ||
|
||
import javax.crypto.Cipher; | ||
import java.nio.ByteBuffer; | ||
|
||
public class BufferedCipherPublisher implements SdkPublisher<ByteBuffer> { | ||
|
||
private final SdkPublisher<ByteBuffer> wrappedPublisher; | ||
private final Cipher cipher; | ||
private final Long contentLength; | ||
private final long[] range; | ||
private final String contentRange; | ||
private final int cipherTagLengthBits; | ||
|
||
public BufferedCipherPublisher(final Cipher cipher, final SdkPublisher<ByteBuffer> wrappedPublisher, final Long contentLength, long[] range, String contentRange, int cipherTagLengthBits) { | ||
this.wrappedPublisher = wrappedPublisher; | ||
this.cipher = cipher; | ||
this.contentLength = contentLength; | ||
this.range = range; | ||
this.contentRange = contentRange; | ||
this.cipherTagLengthBits = cipherTagLengthBits; | ||
} | ||
|
||
@Override | ||
public void subscribe(Subscriber<? super ByteBuffer> subscriber) { | ||
// Wrap the (customer) subscriber in a CipherSubscriber, then subscribe it | ||
// to the wrapped (ciphertext) publisher | ||
Subscriber<? super ByteBuffer> wrappedSubscriber = RangedGetUtils.adjustToDesiredRange(subscriber, range, contentRange, cipherTagLengthBits); | ||
wrappedPublisher.subscribe(new BufferedCipherSubscriber(wrappedSubscriber, cipher, contentLength)); | ||
} | ||
} |
129 changes: 129 additions & 0 deletions
129
src/main/java/software/amazon/encryption/s3/internal/BufferedCipherSubscriber.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,129 @@ | ||
package software.amazon.encryption.s3.internal; | ||
|
||
import org.reactivestreams.Subscriber; | ||
import org.reactivestreams.Subscription; | ||
import software.amazon.awssdk.utils.BinaryUtils; | ||
import software.amazon.encryption.s3.S3EncryptionClientException; | ||
import software.amazon.encryption.s3.S3EncryptionClientSecurityException; | ||
|
||
import javax.crypto.Cipher; | ||
import java.nio.ByteBuffer; | ||
import java.security.GeneralSecurityException; | ||
import java.util.Queue; | ||
import java.util.concurrent.ConcurrentLinkedQueue; | ||
import java.util.concurrent.atomic.AtomicBoolean; | ||
import java.util.concurrent.atomic.AtomicInteger; | ||
|
||
/** | ||
* A subscriber which decrypts data by buffering the object's contents | ||
* so that authentication can be done before any plaintext is released. | ||
* This prevents "release of unauthenticated plaintext" at the cost of | ||
* allocating a large buffer. | ||
*/ | ||
public class BufferedCipherSubscriber implements Subscriber<ByteBuffer> { | ||
|
||
// 64MiB ought to be enough for most usecases | ||
private static final long BUFFERED_MAX_CONTENT_LENGTH_MiB = 64; | ||
private static final long BUFFERED_MAX_CONTENT_LENGTH_BYTES = 1024 * 1024 * BUFFERED_MAX_CONTENT_LENGTH_MiB; | ||
|
||
private final AtomicInteger contentRead = new AtomicInteger(0); | ||
private final AtomicBoolean doneFinal = new AtomicBoolean(false); | ||
private final Subscriber<? super ByteBuffer> wrappedSubscriber; | ||
private final Cipher cipher; | ||
private final int contentLength; | ||
|
||
private byte[] outputBuffer; | ||
private final Queue<ByteBuffer> buffers = new ConcurrentLinkedQueue<>(); | ||
|
||
BufferedCipherSubscriber(Subscriber<? super ByteBuffer> wrappedSubscriber, Cipher cipher, Long contentLength) { | ||
this.wrappedSubscriber = wrappedSubscriber; | ||
this.cipher = cipher; | ||
if (contentLength == null) { | ||
throw new S3EncryptionClientException("contentLength cannot be null in buffered mode. To enable unbounded " + | ||
"streaming, reconfigure the S3 Encryption Client with Delayed Authentication mode enabled."); | ||
} | ||
if (contentLength > BUFFERED_MAX_CONTENT_LENGTH_BYTES) { | ||
throw new S3EncryptionClientException(String.format("The object you are attempting to decrypt exceeds the maximum content " + | ||
"length allowed in default mode. Please enable Delayed Authentication mode to decrypt objects larger" + | ||
"than %d", BUFFERED_MAX_CONTENT_LENGTH_MiB)); | ||
} | ||
this.contentLength = Math.toIntExact(contentLength); | ||
} | ||
|
||
@Override | ||
public void onSubscribe(Subscription s) { | ||
wrappedSubscriber.onSubscribe(s); | ||
} | ||
|
||
@Override | ||
public void onNext(ByteBuffer byteBuffer) { | ||
int amountToReadFromByteBuffer = getAmountToReadFromByteBuffer(byteBuffer); | ||
|
||
if (amountToReadFromByteBuffer > 0) { | ||
byte[] buf = BinaryUtils.copyBytesFrom(byteBuffer, amountToReadFromByteBuffer); | ||
outputBuffer = cipher.update(buf, 0, amountToReadFromByteBuffer); | ||
|
||
if (outputBuffer == null && amountToReadFromByteBuffer < cipher.getBlockSize()) { | ||
// The underlying data is too short to fill in the block cipher | ||
// This is true at the end of the file, so complete to get the final | ||
// bytes | ||
this.onComplete(); | ||
} | ||
|
||
// Enqueue the buffer until all data is read | ||
buffers.add(ByteBuffer.wrap(outputBuffer)); | ||
|
||
// Sometimes, onComplete won't be called, so we check if all | ||
// data is read to avoid hanging indefinitely | ||
if (contentRead.get() == contentLength) { | ||
this.onComplete(); | ||
} | ||
// This avoids the subscriber waiting indefinitely for more data | ||
// without actually releasing any plaintext before it can be authenticated | ||
wrappedSubscriber.onNext(ByteBuffer.allocate(0)); | ||
} | ||
|
||
} | ||
|
||
private int getAmountToReadFromByteBuffer(ByteBuffer byteBuffer) { | ||
|
||
long amountReadSoFar = contentRead.getAndAdd(byteBuffer.remaining()); | ||
long amountRemaining = Math.max(0, contentLength - amountReadSoFar); | ||
|
||
if (amountRemaining > byteBuffer.remaining()) { | ||
return byteBuffer.remaining(); | ||
} else { | ||
return Math.toIntExact(amountRemaining); | ||
} | ||
} | ||
|
||
@Override | ||
public void onError(Throwable t) { | ||
wrappedSubscriber.onError(t); | ||
} | ||
|
||
@Override | ||
public void onComplete() { | ||
if (doneFinal.get()) { | ||
// doFinal has already been called, bail out | ||
return; | ||
} | ||
try { | ||
outputBuffer = cipher.doFinal(); | ||
doneFinal.set(true); | ||
// Once doFinal is called, then we can release the plaintext | ||
if (contentRead.get() == contentLength) { | ||
while (!buffers.isEmpty()) { | ||
wrappedSubscriber.onNext(buffers.remove()); | ||
} | ||
} | ||
// Send the final bytes to the wrapped subscriber | ||
wrappedSubscriber.onNext(ByteBuffer.wrap(outputBuffer)); | ||
} catch (final GeneralSecurityException exception) { | ||
// Forward error, else the wrapped subscriber waits indefinitely | ||
wrappedSubscriber.onError(exception); | ||
throw new S3EncryptionClientSecurityException(exception.getMessage(), exception); | ||
} | ||
wrappedSubscriber.onComplete(); | ||
} | ||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Praise: I love clarifying comments.