1
1
import { MongoClient , Db , Collection , GridFSBucket , Document } from '../../../src/index' ;
2
+ import { ReadConcern } from '../../../src/read_concern' ;
3
+ import { WriteConcern } from '../../../src/write_concern' ;
4
+ import { ReadPreference } from '../../../src/read_preference' ;
2
5
import { ClientSession } from '../../../src/sessions' ;
3
6
import { ChangeStream } from '../../../src/change_stream' ;
4
7
import type { ClientEntity , EntityDescription } from './schema' ;
@@ -8,13 +11,17 @@ import type {
8
11
CommandSucceededEvent
9
12
} from '../../../src/cmap/events' ;
10
13
import { patchCollectionOptions , patchDbOptions } from './unified-utils' ;
11
- import { TestConfiguration } from './unified.test' ;
12
14
import { expect } from 'chai' ;
15
+ import { TestConfiguration } from './runner' ;
13
16
14
17
interface UnifiedChangeStream extends ChangeStream {
15
18
eventCollector : InstanceType < typeof import ( '../../tools/utils' ) [ 'EventCollector' ] > ;
16
19
}
17
20
21
+ interface UnifiedClientSession extends ClientSession {
22
+ client : UnifiedMongoClient ;
23
+ }
24
+
18
25
export type CommandEvent = CommandStartedEvent | CommandSucceededEvent | CommandFailedEvent ;
19
26
20
27
export class UnifiedMongoClient extends MongoClient {
@@ -85,7 +92,7 @@ export type Entity =
85
92
| UnifiedMongoClient
86
93
| Db
87
94
| Collection
88
- | ClientSession
95
+ | UnifiedClientSession
89
96
| UnifiedChangeStream
90
97
| GridFSBucket
91
98
| Document ; // Results from operations
@@ -112,7 +119,7 @@ export class EntitiesMap<E = Entity> extends Map<string, E> {
112
119
mapOf ( type : 'client' ) : EntitiesMap < UnifiedMongoClient > ;
113
120
mapOf ( type : 'db' ) : EntitiesMap < Db > ;
114
121
mapOf ( type : 'collection' ) : EntitiesMap < Collection > ;
115
- mapOf ( type : 'session' ) : EntitiesMap < ClientSession > ;
122
+ mapOf ( type : 'session' ) : EntitiesMap < UnifiedClientSession > ;
116
123
mapOf ( type : 'bucket' ) : EntitiesMap < GridFSBucket > ;
117
124
mapOf ( type : 'stream' ) : EntitiesMap < UnifiedChangeStream > ;
118
125
mapOf ( type : EntityTypeId ) : EntitiesMap < Entity > {
@@ -126,13 +133,13 @@ export class EntitiesMap<E = Entity> extends Map<string, E> {
126
133
getEntity ( type : 'client' , key : string , assertExists ?: boolean ) : UnifiedMongoClient ;
127
134
getEntity ( type : 'db' , key : string , assertExists ?: boolean ) : Db ;
128
135
getEntity ( type : 'collection' , key : string , assertExists ?: boolean ) : Collection ;
129
- getEntity ( type : 'session' , key : string , assertExists ?: boolean ) : ClientSession ;
136
+ getEntity ( type : 'session' , key : string , assertExists ?: boolean ) : UnifiedClientSession ;
130
137
getEntity ( type : 'bucket' , key : string , assertExists ?: boolean ) : GridFSBucket ;
131
138
getEntity ( type : 'stream' , key : string , assertExists ?: boolean ) : UnifiedChangeStream ;
132
139
getEntity ( type : EntityTypeId , key : string , assertExists = true ) : Entity {
133
140
const entity = this . get ( key ) ;
134
141
if ( ! entity ) {
135
- if ( assertExists ) throw new Error ( `Entity ${ key } does not exist` ) ;
142
+ if ( assertExists ) throw new Error ( `Entity ' ${ key } ' does not exist` ) ;
136
143
return ;
137
144
}
138
145
const ctor = ENTITY_CTORS . get ( type ) ;
@@ -163,7 +170,8 @@ export class EntitiesMap<E = Entity> extends Map<string, E> {
163
170
const map = new EntitiesMap ( ) ;
164
171
for ( const entity of entities ?? [ ] ) {
165
172
if ( 'client' in entity ) {
166
- const client = new UnifiedMongoClient ( config . url ( ) , entity . client ) ;
173
+ const uri = config . url ( { useMultipleMongoses : entity . client . useMultipleMongoses } ) ;
174
+ const client = new UnifiedMongoClient ( uri , entity . client ) ;
167
175
await client . connect ( ) ;
168
176
map . set ( entity . client . id , client ) ;
169
177
} else if ( 'database' in entity ) {
@@ -181,11 +189,60 @@ export class EntitiesMap<E = Entity> extends Map<string, E> {
181
189
) ;
182
190
map . set ( entity . collection . id , collection ) ;
183
191
} else if ( 'session' in entity ) {
184
- map . set ( entity . session . id , null ) ;
192
+ const client = map . getEntity ( 'client' , entity . session . client ) ;
193
+
194
+ const options = Object . create ( null ) ;
195
+
196
+ if ( entity . session . sessionOptions ?. causalConsistency ) {
197
+ options . causalConsistency = entity . session . sessionOptions ?. causalConsistency ;
198
+ }
199
+
200
+ if ( entity . session . sessionOptions ?. defaultTransactionOptions ) {
201
+ options . defaultTransactionOptions = Object . create ( null ) ;
202
+ const defaultOptions = entity . session . sessionOptions . defaultTransactionOptions ;
203
+ if ( defaultOptions . readConcern ) {
204
+ options . defaultTransactionOptions . readConcern = ReadConcern . fromOptions (
205
+ defaultOptions . readConcern
206
+ ) ;
207
+ }
208
+ if ( defaultOptions . writeConcern ) {
209
+ options . defaultTransactionOptions . writeConcern = WriteConcern . fromOptions (
210
+ defaultOptions
211
+ ) ;
212
+ }
213
+ if ( defaultOptions . readPreference ) {
214
+ options . defaultTransactionOptions . readPreference = ReadPreference . fromOptions (
215
+ defaultOptions . readPreference
216
+ ) ;
217
+ }
218
+ if ( typeof defaultOptions . maxCommitTimeMS === 'number' ) {
219
+ options . defaultTransactionOptions . maxCommitTimeMS = defaultOptions . maxCommitTimeMS ;
220
+ }
221
+ }
222
+
223
+ const session = client . startSession ( options ) as UnifiedClientSession ;
224
+ // targetedFailPoint operations need to access the client the session came from
225
+ session . client = client ;
226
+
227
+ map . set ( entity . session . id , session ) ;
185
228
} else if ( 'bucket' in entity ) {
186
- map . set ( entity . bucket . id , null ) ;
229
+ const db = map . getEntity ( 'db' , entity . bucket . database ) ;
230
+
231
+ const options = Object . create ( null ) ;
232
+
233
+ if ( entity . bucket . bucketOptions ?. bucketName ) {
234
+ options . bucketName = entity . bucket . bucketOptions ?. bucketName ;
235
+ }
236
+ if ( entity . bucket . bucketOptions ?. chunkSizeBytes ) {
237
+ options . chunkSizeBytes = entity . bucket . bucketOptions ?. chunkSizeBytes ;
238
+ }
239
+ if ( entity . bucket . bucketOptions ?. readPreference ) {
240
+ options . readPreference = entity . bucket . bucketOptions ?. readPreference ;
241
+ }
242
+
243
+ map . set ( entity . bucket . id , new GridFSBucket ( db , options ) ) ;
187
244
} else if ( 'stream' in entity ) {
188
- map . set ( entity . stream . id , null ) ;
245
+ throw new Error ( `Unsupported Entity ${ JSON . stringify ( entity ) } ` ) ;
189
246
} else {
190
247
throw new Error ( `Unsupported Entity ${ JSON . stringify ( entity ) } ` ) ;
191
248
}
0 commit comments