Skip to content

Commit 1fc8a2d

Browse files
author
Divjot Arora
committed
GODRIVER-1592 Run CSE flow for change streams
1 parent 237abe9 commit 1fc8a2d

File tree

6 files changed

+193
-2
lines changed

6 files changed

+193
-2
lines changed
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
{
2+
"json_schema": {
3+
"properties": {
4+
"encrypted_string": {
5+
"encrypt": {
6+
"keyId": [
7+
{
8+
"$binary": {
9+
"base64": "AAAAAAAAAAAAAAAAAAAAAA==",
10+
"subType": "04"
11+
}
12+
}
13+
],
14+
"bsonType": "string",
15+
"algorithm": "AEAD_AES_256_CBC_HMAC_SHA_512-Deterministic"
16+
}
17+
}
18+
},
19+
"bsonType": "object"
20+
},
21+
"key_vault_data": [
22+
{
23+
"status": 1,
24+
"_id": {
25+
"$binary": {
26+
"base64": "AAAAAAAAAAAAAAAAAAAAAA==",
27+
"subType": "04"
28+
}
29+
},
30+
"masterKey": {
31+
"provider": "aws",
32+
"key": "arn:aws:kms:us-east-1:579766882180:key/89fcc2c4-08b0-4bd9-9f25-e30687b580d0",
33+
"region": "us-east-1"
34+
},
35+
"updateDate": {
36+
"$date": {
37+
"$numberLong": "1552949630483"
38+
}
39+
},
40+
"keyMaterial": {
41+
"$binary": {
42+
"base64": "AQICAHhQNmWG2CzOm1dq3kWLM+iDUZhEqnhJwH9wZVpuZ94A8gEqnsxXlR51T5EbEVezUqqKAAAAwjCBvwYJKoZIhvcNAQcGoIGxMIGuAgEAMIGoBgkqhkiG9w0BBwEwHgYJYIZIAWUDBAEuMBEEDHa4jo6yp0Z18KgbUgIBEIB74sKxWtV8/YHje5lv5THTl0HIbhSwM6EqRlmBiFFatmEWaeMk4tO4xBX65eq670I5TWPSLMzpp8ncGHMmvHqRajNBnmFtbYxN3E3/WjxmdbOOe+OXpnGJPcGsftc7cB2shRfA4lICPnE26+oVNXT6p0Lo20nY5XC7jyCO",
43+
"subType": "00"
44+
}
45+
},
46+
"creationDate": {
47+
"$date": {
48+
"$numberLong": "1552949630483"
49+
}
50+
},
51+
"keyAltNames": [
52+
"altname",
53+
"another_altname"
54+
]
55+
}
56+
],
57+
"encrypted_document": {
58+
"_id": 1,
59+
"encrypted_string": {
60+
"$binary": {
61+
"base64": "AQAAAAAAAAAAAAAAAAAAAAACwj+3zkv2VM+aTfk60RqhXq6a/77WlLwu/BxXFkL7EppGsju/m8f0x5kBDD3EZTtGALGXlym5jnpZAoSIkswHoA==",
62+
"subType": "06"
63+
}
64+
}
65+
},
66+
"decrytped_document": {
67+
"_id": 1,
68+
"encrypted_string": "string0"
69+
}
70+
}

mongo/change_stream.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ type changeStreamConfig struct {
6969
streamType StreamType
7070
collectionName string
7171
databaseName string
72+
crypt *driver.Crypt
7273
}
7374

7475
func newChangeStream(ctx context.Context, config changeStreamConfig, pipeline interface{},
@@ -100,8 +101,12 @@ func newChangeStream(ctx context.Context, config changeStreamConfig, pipeline in
100101
cs.aggregate = operation.NewAggregate(nil).
101102
ReadPreference(config.readPreference).ReadConcern(config.readConcern).
102103
Deployment(cs.client.deployment).ClusterClock(cs.client.clock).
103-
CommandMonitor(cs.client.monitor).Session(cs.sess).ServerSelector(cs.selector).Retry(driver.RetryNone)
104+
CommandMonitor(cs.client.monitor).Session(cs.sess).ServerSelector(cs.selector).Retry(driver.RetryNone).
105+
Crypt(config.crypt)
104106

107+
if config.crypt != nil {
108+
cs.cursorOptions.Crypt = config.crypt
109+
}
105110
if cs.options.Collation != nil {
106111
cs.aggregate.Collation(bsoncore.Document(cs.options.Collation.ToDocument()))
107112
}

mongo/client.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -812,6 +812,7 @@ func (c *Client) Watch(ctx context.Context, pipeline interface{},
812812
client: c,
813813
registry: c.registry,
814814
streamType: ClientStream,
815+
crypt: c.crypt,
815816
}
816817

817818
return newChangeStream(ctx, csConfig, pipeline, opts...)

mongo/collection.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1553,6 +1553,7 @@ func (coll *Collection) Watch(ctx context.Context, pipeline interface{},
15531553
streamType: CollectionStream,
15541554
collectionName: coll.Name(),
15551555
databaseName: coll.db.Name(),
1556+
crypt: coll.client.crypt,
15561557
}
15571558
return newChangeStream(ctx, csConfig, pipeline, opts...)
15581559
}

mongo/database.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -429,6 +429,7 @@ func (db *Database) Watch(ctx context.Context, pipeline interface{},
429429
registry: db.registry,
430430
streamType: DatabaseStream,
431431
databaseName: db.Name(),
432+
crypt: db.client.crypt,
432433
}
433434
return newChangeStream(ctx, csConfig, pipeline, opts...)
434435
}

mongo/integration/client_side_encryption_prose_test.go

Lines changed: 114 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -708,6 +708,105 @@ func TestClientSideEncryptionProse(t *testing.T) {
708708
})
709709
}
710710
})
711+
changeStreamOpts := mtest.NewOptions().
712+
CreateClient(false).
713+
Topologies(mtest.ReplicaSet)
714+
mt.RunOpts("change streams", changeStreamOpts, func(mt *mtest.T) {
715+
// Change streams can't easily fit into the spec test format because of their tailable nature, so there are two
716+
// prose tests for them instead:
717+
//
718+
// 1. Auto-encryption errors for Watch operations. Collection-level change streams error because the
719+
// $changeStream aggregation stage is not valid for encryption. Client and database-level streams error because
720+
// only collection-level operations are valid for encryption.
721+
//
722+
// 2. Events are automatically decrypted: If the Watch() is done with BypassAutoEncryption=true, the Watch
723+
// should succeed and subsequent getMore calls should decrypt documents when necessary.
724+
725+
var testConfig struct {
726+
JSONSchema bson.Raw `bson:"json_schema"`
727+
KeyVaultData []bson.Raw `bson:"key_vault_data"`
728+
EncryptedDocument bson.Raw `bson:"encrypted_document"`
729+
DecryptedDocument bson.Raw `bson:"decrytped_document"`
730+
}
731+
decodeJSONFile(mt, "change-streams-test.json", &testConfig)
732+
733+
schemaMap := map[string]interface{}{
734+
"db.coll": testConfig.JSONSchema,
735+
}
736+
kmsProviders := map[string]map[string]interface{}{
737+
"aws": {
738+
"accessKeyId": keyID,
739+
"secretAccessKey": secretAccessKey,
740+
},
741+
}
742+
743+
testCases := []struct {
744+
name string
745+
streamType mongo.StreamType
746+
}{
747+
{"client", mongo.ClientStream},
748+
{"database", mongo.DatabaseStream},
749+
{"collection", mongo.CollectionStream},
750+
}
751+
mt.RunOpts("auto encryption errors", noClientOpts, func(mt *mtest.T) {
752+
for _, tc := range testCases {
753+
mt.Run(tc.name, func(mt *mtest.T) {
754+
autoEncryptionOpts := options.AutoEncryption().
755+
SetKmsProviders(kmsProviders).
756+
SetKeyVaultNamespace(kvNamespace).
757+
SetSchemaMap(schemaMap)
758+
cpt := setup(mt, autoEncryptionOpts, nil, nil)
759+
defer cpt.teardown(mt)
760+
761+
_, err := getWatcher(mt, tc.streamType, cpt).Watch(mtest.Background, mongo.Pipeline{})
762+
assert.NotNil(mt, err, "expected Watch error: %v", err)
763+
})
764+
}
765+
})
766+
mt.RunOpts("events are automatically decrypted", noClientOpts, func(mt *mtest.T) {
767+
for _, tc := range testCases {
768+
mt.Run(tc.name, func(mt *mtest.T) {
769+
autoEncryptionOpts := options.AutoEncryption().
770+
SetKmsProviders(kmsProviders).
771+
SetKeyVaultNamespace(kvNamespace).
772+
SetSchemaMap(schemaMap).
773+
SetBypassAutoEncryption(true)
774+
cpt := setup(mt, autoEncryptionOpts, nil, nil)
775+
defer cpt.teardown(mt)
776+
777+
// Insert key vault data so the key can be accessed when starting the change stream.
778+
insertDocuments(mt, cpt.keyVaultColl, testConfig.KeyVaultData)
779+
780+
stream, err := getWatcher(mt, tc.streamType, cpt).Watch(mtest.Background, mongo.Pipeline{})
781+
assert.Nil(mt, err, "Watch error: %v", err)
782+
defer stream.Close(mtest.Background)
783+
784+
// Insert already encrypted data and verify that it is automatically decrypted by Next().
785+
insertDocuments(mt, cpt.coll, []bson.Raw{testConfig.EncryptedDocument})
786+
assert.True(mt, stream.Next(mtest.Background), "expected Next to return true, got false")
787+
gotDocument := stream.Current.Lookup("fullDocument").Document()
788+
err = compareDocs(mt, testConfig.DecryptedDocument, gotDocument)
789+
assert.Nil(mt, err, "compareDocs error: %v", err)
790+
})
791+
}
792+
})
793+
})
794+
}
795+
796+
func getWatcher(mt *mtest.T, streamType mongo.StreamType, cpt *cseProseTest) watcher {
797+
mt.Helper()
798+
799+
switch streamType {
800+
case mongo.ClientStream:
801+
return cpt.cseClient
802+
case mongo.DatabaseStream:
803+
return cpt.cseColl.Database()
804+
case mongo.CollectionStream:
805+
return cpt.cseColl
806+
default:
807+
mt.Fatalf("unknown stream type %v", streamType)
808+
}
809+
return nil
711810
}
712811

713812
type cseProseTest struct {
@@ -729,10 +828,12 @@ func setup(mt *mtest.T, aeo *options.AutoEncryptionOptions, kvClientOpts *option
729828
cpt.coll = mt.CreateCollection(mtest.Collection{
730829
Name: "coll",
731830
DB: "db",
831+
Opts: options.Collection().SetWriteConcern(mtest.MajorityWc),
732832
}, false)
733833
cpt.keyVaultColl = mt.CreateCollection(mtest.Collection{
734834
Name: "datakeys",
735-
DB: "admin",
835+
DB: "keyvault",
836+
Opts: options.Collection().SetWriteConcern(mtest.MajorityWc),
736837
}, false)
737838

738839
if aeo != nil {
@@ -781,6 +882,18 @@ func readJSONFile(mt *mtest.T, file string) bson.Raw {
781882
return doc
782883
}
783884

885+
func decodeJSONFile(mt *mtest.T, file string, val interface{}) bson.Raw {
886+
mt.Helper()
887+
888+
content, err := ioutil.ReadFile(filepath.Join(clientEncryptionProseDir, file))
889+
assert.Nil(mt, err, "ReadFile error for %v: %v", file, err)
890+
891+
var doc bson.Raw
892+
err = bson.UnmarshalExtJSON(content, true, val)
893+
assert.Nil(mt, err, "UnmarshalExtJSON error for file %v: %v", file, err)
894+
return doc
895+
}
896+
784897
func rawValueToCoreValue(rv bson.RawValue) bsoncore.Value {
785898
return bsoncore.Value{Type: rv.Type, Data: rv.Value}
786899
}

0 commit comments

Comments
 (0)