|
20 | 20 | import traceback
|
21 | 21 | import socket
|
22 | 22 | import sys
|
| 23 | +import textwrap |
23 | 24 | import uuid
|
24 | 25 |
|
25 | 26 | sys.path[0:0] = [""]
|
|
52 | 53 | from test.utils import (TestCreator,
|
53 | 54 | camel_to_snake_args,
|
54 | 55 | OvertCommandListener,
|
| 56 | + WhiteListEventListener, |
55 | 57 | rs_or_single_client,
|
56 | 58 | wait_until)
|
57 | 59 | from test.utils_spec_runner import SpecRunner
|
@@ -1105,5 +1107,141 @@ def test_05_endpoint_invalid_host(self):
|
1105 | 1107 | 'aws', master_key=master_key)
|
1106 | 1108 |
|
1107 | 1109 |
|
| 1110 | +class AzureGCPEncryptionTestMixin(object): |
| 1111 | + KMS_PROVIDER_MAP = None |
| 1112 | + KEYVAULT_DB = 'keyvault' |
| 1113 | + KEYVAULT_COLL = 'datakeys' |
| 1114 | + |
| 1115 | + def read_extjson(self, filename): |
| 1116 | + with open(os.path.join(BASE, 'custom', filename), 'r') as fp: |
| 1117 | + return json_util.loads(fp.read(), json_options=JSON_OPTS) |
| 1118 | + |
| 1119 | + def setup_keyvault(self, DEK): |
| 1120 | + keyvault = self.client.get_database(self.KEYVAULT_DB).get_collection( |
| 1121 | + self.KEYVAULT_COLL, codec_options=OPTS, |
| 1122 | + write_concern=WriteConcern('majority')) |
| 1123 | + keyvault.drop() |
| 1124 | + keyvault.insert_one(DEK) |
| 1125 | + |
| 1126 | + def _test_explicit(self, expectation): |
| 1127 | + client_encryption = ClientEncryption( |
| 1128 | + self.KMS_PROVIDER_MAP, |
| 1129 | + '.'.join([self.KEYVAULT_DB, self.KEYVAULT_COLL]), |
| 1130 | + client_context.client, |
| 1131 | + OPTS) |
| 1132 | + |
| 1133 | + ciphertext = client_encryption.encrypt( |
| 1134 | + 'test', |
| 1135 | + algorithm=Algorithm.AEAD_AES_256_CBC_HMAC_SHA_512_Deterministic, |
| 1136 | + key_id=Binary.from_uuid(self.DEK['_id'], STANDARD)) |
| 1137 | + |
| 1138 | + self.assertEqual(bytes(ciphertext), base64.b64decode(expectation)) |
| 1139 | + self.assertEqual(client_encryption.decrypt(ciphertext), 'test') |
| 1140 | + |
| 1141 | + def _test_automatic(self, expectation_extjson, payload): |
| 1142 | + encrypted_db = "db" |
| 1143 | + encrypted_coll = "coll" |
| 1144 | + |
| 1145 | + encryption_opts = AutoEncryptionOpts( |
| 1146 | + self.KMS_PROVIDER_MAP, |
| 1147 | + '.'.join([self.KEYVAULT_DB, self.KEYVAULT_COLL]), |
| 1148 | + schema_map=self.SCHEMA_MAP) |
| 1149 | + |
| 1150 | + insert_listener = WhiteListEventListener('insert') |
| 1151 | + client = rs_or_single_client( |
| 1152 | + auto_encryption_opts=encryption_opts, |
| 1153 | + event_listeners=[insert_listener]) |
| 1154 | + |
| 1155 | + coll = client.get_database(encrypted_db).get_collection( |
| 1156 | + encrypted_coll, codec_options=OPTS, |
| 1157 | + write_concern=WriteConcern("majority")) |
| 1158 | + coll.drop() |
| 1159 | + |
| 1160 | + expected_document = json_util.loads( |
| 1161 | + expectation_extjson, json_options=JSON_OPTS) |
| 1162 | + |
| 1163 | + coll.insert_one(payload) |
| 1164 | + event = insert_listener.results['started'][0] |
| 1165 | + inserted_doc = event.command['documents'][0] |
| 1166 | + |
| 1167 | + for key, value in expected_document.items(): |
| 1168 | + self.assertEqual(value, inserted_doc[key]) |
| 1169 | + |
| 1170 | + output_doc = coll.find_one({}) |
| 1171 | + for key, value in payload.items(): |
| 1172 | + self.assertEqual(output_doc[key], value) |
| 1173 | + |
| 1174 | + |
| 1175 | +AZURE_CREDS = { |
| 1176 | + 'tenantId': os.environ.get('FLE_AZURE_TENANTID', ''), |
| 1177 | + 'clientId': os.environ.get('FLE_AZURE_CLIENTID', ''), |
| 1178 | + 'clientSecret': os.environ.get('FLE_AZURE_CLIENTSECRET', '')} |
| 1179 | + |
| 1180 | + |
| 1181 | +class TestAzureEncryption(AzureGCPEncryptionTestMixin, |
| 1182 | + EncryptionIntegrationTest): |
| 1183 | + @classmethod |
| 1184 | + @unittest.skipUnless(_HAVE_PYMONGOCRYPT, 'pymongocrypt is not installed') |
| 1185 | + @unittest.skipUnless(any(AZURE_CREDS.values()), |
| 1186 | + 'Azure environment credentials are not set') |
| 1187 | + def setUpClass(cls): |
| 1188 | + cls.KMS_PROVIDER_MAP = {'azure': AZURE_CREDS} |
| 1189 | + cls.DEK = json_data(BASE, 'custom', 'azure-dek.json') |
| 1190 | + cls.SCHEMA_MAP = json_data(BASE, 'custom', 'azure-gcp-schema.json') |
| 1191 | + super(TestAzureEncryption, cls).setUpClass() |
| 1192 | + |
| 1193 | + def setUp(self): |
| 1194 | + self.setup_keyvault(self.DEK) |
| 1195 | + |
| 1196 | + def test_explicit(self): |
| 1197 | + return self._test_explicit( |
| 1198 | + 'AQLN1ERNY0XMhzj42i1hzlwC8/OSU9bHfaQRmmRF5l7d5ZpqJX13qF5zSyExo8N9c1b6uS/LoKrHNzcEMKNrkpi3jf2HiShTFRF0xi8AOD9yfw==') |
| 1199 | + |
| 1200 | + def test_automatic(self): |
| 1201 | + expected_document_extjson = textwrap.dedent(""" |
| 1202 | + {"secret_azure": { |
| 1203 | + "$binary": { |
| 1204 | + "base64": "AQLN1ERNY0XMhzj42i1hzlwC8/OSU9bHfaQRmmRF5l7d5ZpqJX13qF5zSyExo8N9c1b6uS/LoKrHNzcEMKNrkpi3jf2HiShTFRF0xi8AOD9yfw==", |
| 1205 | + "subType": "06"} |
| 1206 | + }}""") |
| 1207 | + return self._test_automatic( |
| 1208 | + expected_document_extjson, {"secret_azure": "test"}) |
| 1209 | + |
| 1210 | + |
| 1211 | +GCP_CREDS = { |
| 1212 | + 'email': os.environ.get('FLE_GCP_EMAIL', ''), |
| 1213 | + 'privateKey': os.environ.get('FLE_GCP_PRIVATEKEY', '')} |
| 1214 | + |
| 1215 | + |
| 1216 | +class TestGCPEncryption(AzureGCPEncryptionTestMixin, |
| 1217 | + EncryptionIntegrationTest): |
| 1218 | + @classmethod |
| 1219 | + @unittest.skipUnless(_HAVE_PYMONGOCRYPT, 'pymongocrypt is not installed') |
| 1220 | + @unittest.skipUnless(any(GCP_CREDS.values()), |
| 1221 | + 'GCP environment credentials are not set') |
| 1222 | + def setUpClass(cls): |
| 1223 | + cls.KMS_PROVIDER_MAP = {'gcp': GCP_CREDS} |
| 1224 | + cls.DEK = json_data(BASE, 'custom', 'gcp-dek.json') |
| 1225 | + cls.SCHEMA_MAP = json_data(BASE, 'custom', 'azure-gcp-schema.json') |
| 1226 | + super(TestGCPEncryption, cls).setUpClass() |
| 1227 | + |
| 1228 | + def setUp(self): |
| 1229 | + self.setup_keyvault(self.DEK) |
| 1230 | + |
| 1231 | + def test_explicit(self): |
| 1232 | + return self._test_explicit( |
| 1233 | + 'AaLFPEi8SURzjW5fDoeaPnoCGcOFAmFOPpn5584VPJJ8iXIgml3YDxMRZD9IWv5otyoft8fBzL1LsDEp0lTeB32cV1gOj0IYeAKHhGIleuHZtA==') |
| 1234 | + |
| 1235 | + def test_automatic(self): |
| 1236 | + expected_document_extjson = textwrap.dedent(""" |
| 1237 | + {"secret_gcp": { |
| 1238 | + "$binary": { |
| 1239 | + "base64": "AaLFPEi8SURzjW5fDoeaPnoCGcOFAmFOPpn5584VPJJ8iXIgml3YDxMRZD9IWv5otyoft8fBzL1LsDEp0lTeB32cV1gOj0IYeAKHhGIleuHZtA==", |
| 1240 | + "subType": "06"} |
| 1241 | + }}""") |
| 1242 | + return self._test_automatic( |
| 1243 | + expected_document_extjson, {"secret_gcp": "test"}) |
| 1244 | + |
| 1245 | + |
1108 | 1246 | if __name__ == "__main__":
|
1109 | 1247 | unittest.main()
|
0 commit comments