Skip to content

Commit 0da2a4f

Browse files
more unit tests
1 parent f76d7f9 commit 0da2a4f

File tree

4 files changed

+47
-38
lines changed

4 files changed

+47
-38
lines changed

test/unit/test_crypto_authentication_signer.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,11 +94,14 @@ def test_GIVEN_no_encoding_WHEN_signer_from_key_bytes_THEN_load_der_private_key(
9494
with patch.object(cryptography.hazmat.primitives, "serialization"):
9595
# Mock the `serialization.load_der_private_key`
9696
with patch.object(aws_encryption_sdk.internal.crypto.authentication.serialization, "load_der_private_key") as mock_der:
97+
# When: from_key_bytes
9798
Signer.from_key_bytes(
9899
algorithm=_algorithm,
99100
key_bytes=sentinel.key_bytes,
101+
# Given: No encoding provided => default arg
100102
)
101103

104+
# Then: calls load_der_private_key
102105
mock_der.assert_called_once_with(
103106
data=sentinel.key_bytes, password=None, backend=patch_default_backend.return_value
104107
)
@@ -113,12 +116,15 @@ def test_GIVEN_PEM_encoding_WHEN_signer_from_key_bytes_THEN_load_pem_private_key
113116
mock_algorithm_info = MagicMock(return_value=sentinel.algorithm_info, spec=patch_ec.EllipticCurve)
114117
_algorithm = MagicMock(signing_algorithm_info=mock_algorithm_info)
115118

119+
# When: from_key_bytes
116120
signer = Signer.from_key_bytes(
117121
algorithm=_algorithm,
118122
key_bytes=sentinel.key_bytes,
123+
# Given: PEM encoding
119124
encoding=patch_serialization.Encoding.PEM
120125
)
121126

127+
# Then: calls load_pem_private_key
122128
patch_serialization.load_pem_private_key.assert_called_once_with(
123129
data=sentinel.key_bytes, password=None, backend=patch_default_backend.return_value
124130
)
@@ -136,10 +142,13 @@ def test_GIVEN_unrecognized_encoding_WHEN_signer_from_key_bytes_THEN_raise_Value
136142
mock_algorithm_info = MagicMock(return_value=sentinel.algorithm_info, spec=patch_ec.EllipticCurve)
137143
_algorithm = MagicMock(signing_algorithm_info=mock_algorithm_info)
138144

145+
# Then: Raises ValueError
139146
with pytest.raises(ValueError):
147+
# When: from_key_bytes
140148
signer = Signer.from_key_bytes(
141149
algorithm=_algorithm,
142150
key_bytes=sentinel.key_bytes,
151+
# Given: Invalid encoding
143152
encoding="not an encoding"
144153
)
145154

test/unit/test_streaming_client_configs.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -193,22 +193,28 @@ def test_client_config_converts(kwargs, stream_type):
193193
assert isinstance(test.materials_manager, DefaultCryptoMaterialsManager)
194194

195195

196+
# Given: no MPL
196197
@pytest.mark.skipif(HAS_MPL, reason="Test should only be executed without MPL in installation")
197198
@patch.object(_ClientConfig, "_no_mpl_attrs_post_init")
198199
def test_GIVEN_no_mpl_WHEN_attrs_post_init_THEN_calls_no_mpl_method(
199200
mock_no_mpl_attrs_post_init,
200201
):
202+
# When: attrs_post_init
201203
_ClientConfig(**BASE_KWARGS)
204+
# Then: calls _no_mpl_attrs_post_init
202205
mock_no_mpl_attrs_post_init.assert_called_once_with()
203206

204207

208+
# Given: has MPL
205209
@pytest.mark.skipif(not HAS_MPL, reason="Test should only be executed with MPL in installation")
206210
@patch.object(_ClientConfig, "_has_mpl_attrs_post_init")
207211
def test_GIVEN_has_mpl_WHEN_attrs_post_init_THEN_calls_no_mpl_method(
208-
_has_mpl_attrs_post_init,
212+
mock_has_mpl_attrs_post_init,
209213
):
214+
# When: attrs_post_init
210215
_ClientConfig(**BASE_KWARGS)
211-
_has_mpl_attrs_post_init.assert_called_once_with()
216+
# Then: calls _has_mpl_attrs_post_init
217+
mock_has_mpl_attrs_post_init.assert_called_once_with()
212218

213219

214220
@pytest.mark.skipif(not HAS_MPL, reason="Test should only be executed with MPL in installation")

test/unit/test_streaming_client_stream_decryptor.py

Lines changed: 16 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -37,14 +37,11 @@
3737
# Ideally, this logic would be based on mocking imports and testing logic,
3838
# but doing that introduces errors that cause other tests to fail.
3939
try:
40-
from aws_cryptographic_materialproviders.mpl.references import (
41-
IKeyring,
42-
)
43-
HAS_MPL = True
44-
4540
from aws_encryption_sdk.materials_managers.mpl.cmm import (
4641
CryptoMaterialsManagerFromMPL,
4742
)
43+
HAS_MPL = True
44+
4845
except ImportError:
4946
HAS_MPL = False
5047

@@ -238,12 +235,14 @@ def test_read_header(self, mock_derive_datakey, mock_decrypt_materials_request,
238235
@patch("aws_encryption_sdk.streaming_client.DecryptionMaterialsRequest")
239236
@patch("aws_encryption_sdk.streaming_client.derive_data_encryption_key")
240237
@patch("aws_encryption_sdk.streaming_client.Verifier")
238+
# Given: no MPL
241239
@pytest.mark.skipif(HAS_MPL, reason="Test should only be executed without MPL in installation")
242240
def test_GIVEN_verification_key_AND_no_mpl_WHEN_read_header_THEN_calls_from_key_bytes(
243241
self,
244242
mock_verifier,
245243
*_,
246244
):
245+
# Given: verification key
247246
mock_verifier_instance = MagicMock()
248247
mock_verifier.from_key_bytes.return_value = mock_verifier_instance
249248
ct_stream = io.BytesIO(VALUES["data_128"])
@@ -256,35 +255,42 @@ def test_GIVEN_verification_key_AND_no_mpl_WHEN_read_header_THEN_calls_from_key_
256255
test_decryptor.source_stream = ct_stream
257256
test_decryptor._stream_length = len(VALUES["data_128"])
258257

258+
# When: read header
259259
test_decryptor._read_header()
260260

261+
# Then: calls from_key_bytes
261262
mock_verifier.from_key_bytes.assert_called_once_with(
262263
algorithm=self.mock_header.algorithm, key_bytes=sentinel.verification_key
263264
)
264265

265266
@patch("aws_encryption_sdk.streaming_client.DecryptionMaterialsRequest")
266267
@patch("aws_encryption_sdk.streaming_client.derive_data_encryption_key")
267268
@patch("aws_encryption_sdk.streaming_client.Verifier")
269+
# Given: has MPL
268270
@pytest.mark.skipif(not HAS_MPL, reason="Test should only be executed with MPL in installation")
269271
def test_GIVEN_verification_key_AND_has_mpl_AND_not_MPLCMM_WHEN_read_header_THEN_calls_from_key_bytes(
270272
self,
271273
mock_verifier,
272274
*_,
273275
):
276+
# Given: verification key
274277
mock_verifier_instance = MagicMock()
275278
mock_verifier.from_key_bytes.return_value = mock_verifier_instance
276279
ct_stream = io.BytesIO(VALUES["data_128"])
277280
mock_commitment_policy = MagicMock(__class__=CommitmentPolicy)
278281
test_decryptor = StreamDecryptor(
282+
# Given: native CMM
279283
materials_manager=self.mock_materials_manager,
280284
source=ct_stream,
281285
commitment_policy=mock_commitment_policy,
282286
)
283287
test_decryptor.source_stream = ct_stream
284288
test_decryptor._stream_length = len(VALUES["data_128"])
285289

290+
# When: read_header
286291
test_decryptor._read_header()
287292

293+
# Then: calls from_key_bytess
288294
mock_verifier.from_key_bytes.assert_called_once_with(
289295
algorithm=self.mock_header.algorithm, key_bytes=sentinel.verification_key
290296
)
@@ -293,56 +299,36 @@ def test_GIVEN_verification_key_AND_has_mpl_AND_not_MPLCMM_WHEN_read_header_THEN
293299
@patch("aws_encryption_sdk.streaming_client.derive_data_encryption_key")
294300
@patch("aws_encryption_sdk.streaming_client.Verifier")
295301
@patch("base64.b64encode")
302+
# Given: has MPL
296303
@pytest.mark.skipif(not HAS_MPL, reason="Test should only be executed with MPL in installation")
297304
def test_GIVEN_verification_key_AND_has_mpl_AND_has_MPLCMM_WHEN_read_header_THEN_calls_from_encoded_point(
298305
self,
299306
mock_b64encoding,
300307
mock_verifier,
301308
*_,
302309
):
310+
# Given: Verification key
303311
mock_verifier_instance = MagicMock()
304312
mock_verifier.from_key_bytes.return_value = mock_verifier_instance
305313
ct_stream = io.BytesIO(VALUES["data_128"])
306314
mock_commitment_policy = MagicMock(__class__=CommitmentPolicy)
307315
test_decryptor = StreamDecryptor(
316+
# Given: MPL CMM
308317
materials_manager=self.mock_mpl_materials_manager,
309318
source=ct_stream,
310319
commitment_policy=mock_commitment_policy,
311320
)
312321
test_decryptor.source_stream = ct_stream
313322
test_decryptor._stream_length = len(VALUES["data_128"])
314323

324+
# When: read header
315325
test_decryptor._read_header()
316326

327+
# Then: calls from_encoded_point
317328
mock_verifier.from_encoded_point.assert_called_once_with(
318329
algorithm=self.mock_header.algorithm, encoded_point=mock_b64encoding()
319330
)
320331

321-
# @patch("aws_encryption_sdk.streaming_client.Verifier")
322-
# @pytest.mark.skipif(not HAS_MPL, reason="Test should only be executed with MPL in installation")
323-
# def test_GIVEN_verification_key_AND_has_mpl_AND_not_MPLCMM_WHEN_read_header_THEN_calls_from_key_bytes(
324-
# self,
325-
# mock_verifier,
326-
# ):
327-
# mock_verifier_instance = MagicMock()
328-
# mock_verifier.from_key_bytes.return_value = mock_verifier_instance
329-
# ct_stream = io.BytesIO(VALUES["data_128"])
330-
# mock_commitment_policy = MagicMock(__class__=CommitmentPolicy)
331-
# test_decryptor = StreamDecryptor(
332-
# materials_manager=self.mock_materials_manager,
333-
# source=ct_stream,
334-
# commitment_policy=mock_commitment_policy,
335-
# )
336-
# test_decryptor.source_stream = ct_stream
337-
# test_decryptor._stream_length = len(VALUES["data_128"])
338-
339-
# test_decryptor._read_header()
340-
341-
# mock_verifier.from_key_bytes.assert_called_once_with(
342-
# algorithm=self.mock_header.algorithm, key_bytes=sentinel.verification_key
343-
# )
344-
345-
346332
@patch("aws_encryption_sdk.streaming_client.derive_data_encryption_key")
347333
def test_read_header_frame_too_large(self, mock_derive_datakey):
348334
self.mock_header.content_type = ContentType.FRAMED_DATA

test/unit/test_streaming_client_stream_encryptor.py

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -42,14 +42,11 @@
4242
# Ideally, this logic would be based on mocking imports and testing logic,
4343
# but doing that introduces errors that cause other tests to fail.
4444
try:
45-
from aws_cryptographic_materialproviders.mpl.references import (
46-
IKeyring,
47-
)
48-
HAS_MPL = True
49-
5045
from aws_encryption_sdk.materials_managers.mpl.cmm import (
5146
CryptoMaterialsManagerFromMPL,
5247
)
48+
HAS_MPL = True
49+
5350
except ImportError:
5451
HAS_MPL = False
5552

@@ -387,6 +384,7 @@ def test_prep_message_non_framed_message(self, mock_write_header, mock_prep_non_
387384
test_encryptor._prep_message()
388385
mock_prep_non_framed.assert_called_once_with()
389386

387+
# Given: no MPL
390388
@pytest.mark.skipif(HAS_MPL, reason="Test should only be executed without MPL in installation")
391389
def test_GIVEN_no_mpl_AND_uses_signer_WHEN_prep_message_THEN_signer_uses_default_encoding(self):
392390
self.mock_encryption_materials.algorithm = Algorithm.AES_128_GCM_IV12_TAG16
@@ -400,17 +398,21 @@ def test_GIVEN_no_mpl_AND_uses_signer_WHEN_prep_message_THEN_signer_uses_default
400398
)
401399
test_encryptor.content_type = ContentType.FRAMED_DATA
402400
with patch.object(self.mock_signer, "from_key_bytes"):
401+
# When: prep message
403402
test_encryptor._prep_message()
403+
# Then: calls from_key_bytes with default encoding
404404
self.mock_signer.from_key_bytes.assert_called_once_with(
405405
algorithm=self.mock_encryption_materials.algorithm,
406406
key_bytes=self.mock_encryption_materials.signing_key
407407
)
408408

409+
# Given: has MPL
409410
@pytest.mark.skipif(not HAS_MPL, reason="Test should only be executed with MPL in installation")
410411
def test_GIVEN_has_mpl_AND_not_MPLCMM_AND_uses_signer_WHEN_prep_message_THEN_signer_uses_default_encoding(self):
411412
self.mock_encryption_materials.algorithm = Algorithm.AES_128_GCM_IV12_TAG16
412413
test_encryptor = StreamEncryptor(
413414
source=VALUES["data_128"],
415+
# Given: native CMM
414416
materials_manager=self.mock_materials_manager,
415417
frame_length=self.mock_frame_length,
416418
algorithm=Algorithm.AES_128_GCM_IV12_TAG16,
@@ -419,17 +421,21 @@ def test_GIVEN_has_mpl_AND_not_MPLCMM_AND_uses_signer_WHEN_prep_message_THEN_sig
419421
)
420422
test_encryptor.content_type = ContentType.FRAMED_DATA
421423
with patch.object(self.mock_signer, "from_key_bytes"):
424+
# When: prep_message
422425
test_encryptor._prep_message()
426+
# Then: calls from_key_bytes with default encoding
423427
self.mock_signer.from_key_bytes.assert_called_once_with(
424428
algorithm=self.mock_encryption_materials.algorithm,
425429
key_bytes=self.mock_encryption_materials.signing_key
426430
)
427431

432+
# Given: has MPL
428433
@pytest.mark.skipif(not HAS_MPL, reason="Test should only be executed with MPL in installation")
429-
def test_GIVEN_has_mpl_AND_has_MPLCMM_AND_uses_signer_WHEN_prep_message_THEN_signer_uses_default_encoding(self):
434+
def test_GIVEN_has_mpl_AND_has_MPLCMM_AND_uses_signer_WHEN_prep_message_THEN_signer_uses_PEM_encoding(self):
430435
self.mock_encryption_materials.algorithm = Algorithm.AES_128_GCM_IV12_TAG16
431436
test_encryptor = StreamEncryptor(
432437
source=VALUES["data_128"],
438+
# Given: MPL CMM
433439
materials_manager=self.mock_mpl_materials_manager,
434440
frame_length=self.mock_frame_length,
435441
algorithm=Algorithm.AES_128_GCM_IV12_TAG16,
@@ -438,10 +444,12 @@ def test_GIVEN_has_mpl_AND_has_MPLCMM_AND_uses_signer_WHEN_prep_message_THEN_sig
438444
)
439445
test_encryptor.content_type = ContentType.FRAMED_DATA
440446
with patch.object(self.mock_signer, "from_key_bytes"):
447+
# When: prep_message
441448
test_encryptor._prep_message()
442449
self.mock_signer.from_key_bytes.assert_called_once_with(
443450
algorithm=self.mock_encryption_materials.algorithm,
444451
key_bytes=self.mock_encryption_materials.signing_key,
452+
# Then: calls from_key_bytes with PEM encoding
445453
encoding=serialization.Encoding.PEM
446454
)
447455

0 commit comments

Comments
 (0)