1
1
import { MongoClient , Db , Collection , GridFSBucket , Document } from '../../../src/index' ;
2
+ import { ReadConcern } from '../../../src/read_concern' ;
3
+ import { WriteConcern } from '../../../src/write_concern' ;
4
+ import { ReadPreference } from '../../../src/read_preference' ;
2
5
import { ClientSession } from '../../../src/sessions' ;
3
6
import { ChangeStream } from '../../../src/change_stream' ;
4
7
import type { ClientEntity , EntityDescription } from './schema' ;
@@ -10,11 +13,16 @@ import type {
10
13
import { patchCollectionOptions , patchDbOptions } from './unified-utils' ;
11
14
import { TestConfiguration } from './unified.test' ;
12
15
import { expect } from 'chai' ;
16
+ import { parseURI } from '../../../src/connection_string' ;
13
17
14
18
interface UnifiedChangeStream extends ChangeStream {
15
19
eventCollector : InstanceType < typeof import ( '../../tools/utils' ) [ 'EventCollector' ] > ;
16
20
}
17
21
22
+ interface UnifiedClientSession extends ClientSession {
23
+ client : UnifiedMongoClient ;
24
+ }
25
+
18
26
export type CommandEvent = CommandStartedEvent | CommandSucceededEvent | CommandFailedEvent ;
19
27
20
28
export class UnifiedMongoClient extends MongoClient {
@@ -85,7 +93,7 @@ export type Entity =
85
93
| UnifiedMongoClient
86
94
| Db
87
95
| Collection
88
- | ClientSession
96
+ | UnifiedClientSession
89
97
| UnifiedChangeStream
90
98
| GridFSBucket
91
99
| Document ; // Results from operations
@@ -112,7 +120,7 @@ export class EntitiesMap<E = Entity> extends Map<string, E> {
112
120
mapOf ( type : 'client' ) : EntitiesMap < UnifiedMongoClient > ;
113
121
mapOf ( type : 'db' ) : EntitiesMap < Db > ;
114
122
mapOf ( type : 'collection' ) : EntitiesMap < Collection > ;
115
- mapOf ( type : 'session' ) : EntitiesMap < ClientSession > ;
123
+ mapOf ( type : 'session' ) : EntitiesMap < UnifiedClientSession > ;
116
124
mapOf ( type : 'bucket' ) : EntitiesMap < GridFSBucket > ;
117
125
mapOf ( type : 'stream' ) : EntitiesMap < UnifiedChangeStream > ;
118
126
mapOf ( type : EntityTypeId ) : EntitiesMap < Entity > {
@@ -126,13 +134,13 @@ export class EntitiesMap<E = Entity> extends Map<string, E> {
126
134
getEntity ( type : 'client' , key : string , assertExists ?: boolean ) : UnifiedMongoClient ;
127
135
getEntity ( type : 'db' , key : string , assertExists ?: boolean ) : Db ;
128
136
getEntity ( type : 'collection' , key : string , assertExists ?: boolean ) : Collection ;
129
- getEntity ( type : 'session' , key : string , assertExists ?: boolean ) : ClientSession ;
137
+ getEntity ( type : 'session' , key : string , assertExists ?: boolean ) : UnifiedClientSession ;
130
138
getEntity ( type : 'bucket' , key : string , assertExists ?: boolean ) : GridFSBucket ;
131
139
getEntity ( type : 'stream' , key : string , assertExists ?: boolean ) : UnifiedChangeStream ;
132
140
getEntity ( type : EntityTypeId , key : string , assertExists = true ) : Entity {
133
141
const entity = this . get ( key ) ;
134
142
if ( ! entity ) {
135
- if ( assertExists ) throw new Error ( `Entity ${ key } does not exist` ) ;
143
+ if ( assertExists ) throw new Error ( `Entity ' ${ key } ' does not exist` ) ;
136
144
return ;
137
145
}
138
146
const ctor = ENTITY_CTORS . get ( type ) ;
@@ -163,7 +171,17 @@ export class EntitiesMap<E = Entity> extends Map<string, E> {
163
171
const map = new EntitiesMap ( ) ;
164
172
for ( const entity of entities ?? [ ] ) {
165
173
if ( 'client' in entity ) {
166
- const client = new UnifiedMongoClient ( config . url ( ) , entity . client ) ;
174
+ let uri = config . url ( ) ;
175
+ const { hosts, url } = parseURI ( uri ) ;
176
+
177
+ if ( entity . client . useMultipleMongoses ) {
178
+ expect ( hosts ) . to . have . length . greaterThan ( 1 ) ;
179
+ } else if ( entity . client . useMultipleMongoses === false ) {
180
+ url . host = hosts [ 0 ] ;
181
+ uri = url . toString ( ) ;
182
+ }
183
+
184
+ const client = new UnifiedMongoClient ( uri , entity . client ) ;
167
185
await client . connect ( ) ;
168
186
map . set ( entity . client . id , client ) ;
169
187
} else if ( 'database' in entity ) {
@@ -181,11 +199,60 @@ export class EntitiesMap<E = Entity> extends Map<string, E> {
181
199
) ;
182
200
map . set ( entity . collection . id , collection ) ;
183
201
} else if ( 'session' in entity ) {
184
- map . set ( entity . session . id , null ) ;
202
+ const client = map . getEntity ( 'client' , entity . session . client ) ;
203
+
204
+ const options = Object . create ( null ) ;
205
+
206
+ if ( entity . session . sessionOptions ?. causalConsistency ) {
207
+ options . causalConsistency = entity . session . sessionOptions ?. causalConsistency ;
208
+ }
209
+
210
+ if ( entity . session . sessionOptions ?. defaultTransactionOptions ) {
211
+ options . defaultTransactionOptions = Object . create ( null ) ;
212
+ const defaultOptions = entity . session . sessionOptions . defaultTransactionOptions ;
213
+ if ( defaultOptions . readConcern ) {
214
+ options . defaultTransactionOptions . readConcern = ReadConcern . fromOptions (
215
+ defaultOptions . readConcern
216
+ ) ;
217
+ }
218
+ if ( defaultOptions . writeConcern ) {
219
+ options . defaultTransactionOptions . writeConcern = WriteConcern . fromOptions (
220
+ defaultOptions
221
+ ) ;
222
+ }
223
+ if ( defaultOptions . readPreference ) {
224
+ options . defaultTransactionOptions . readPreference = ReadPreference . fromOptions (
225
+ defaultOptions . readPreference
226
+ ) ;
227
+ }
228
+ if ( typeof defaultOptions . maxCommitTimeMS === 'number' ) {
229
+ options . defaultTransactionOptions . maxCommitTimeMS = defaultOptions . maxCommitTimeMS ;
230
+ }
231
+ }
232
+
233
+ const session = client . startSession ( options ) as UnifiedClientSession ;
234
+ // targetedFailPoint operations need to access the client the session came from
235
+ session . client = client ;
236
+
237
+ map . set ( entity . session . id , session ) ;
185
238
} else if ( 'bucket' in entity ) {
186
- map . set ( entity . bucket . id , null ) ;
239
+ const db = map . getEntity ( 'db' , entity . bucket . database ) ;
240
+
241
+ const options = Object . create ( null ) ;
242
+
243
+ if ( entity . bucket . bucketOptions ?. bucketName ) {
244
+ options . bucketName = entity . bucket . bucketOptions ?. bucketName ;
245
+ }
246
+ if ( entity . bucket . bucketOptions ?. chunkSizeBytes ) {
247
+ options . chunkSizeBytes = entity . bucket . bucketOptions ?. chunkSizeBytes ;
248
+ }
249
+ if ( entity . bucket . bucketOptions ?. readPreference ) {
250
+ options . readPreference = entity . bucket . bucketOptions ?. readPreference ;
251
+ }
252
+
253
+ map . set ( entity . bucket . id , new GridFSBucket ( db , options ) ) ;
187
254
} else if ( 'stream' in entity ) {
188
- map . set ( entity . stream . id , null ) ;
255
+ throw new Error ( `Unsupported Entity ${ JSON . stringify ( entity ) } ` ) ;
189
256
} else {
190
257
throw new Error ( `Unsupported Entity ${ JSON . stringify ( entity ) } ` ) ;
191
258
}
0 commit comments