2
2
3
3
namespace MongoDB \Benchmark \DriverBench ;
4
4
5
- use Amp \Parallel \Worker \DefaultPool ;
5
+ use Amp \Future ;
6
+ use Amp \Parallel \Worker \ContextWorkerFactory ;
7
+ use Amp \Parallel \Worker \ContextWorkerPool ;
6
8
use Generator ;
9
+ use MongoDB \Benchmark \DriverBench \Amp \ImportFileTask ;
7
10
use MongoDB \Benchmark \Fixtures \Data ;
8
11
use MongoDB \Benchmark \Utils ;
9
12
use MongoDB \BSON \Document ;
16
19
use PhpBench \Attributes \Revs ;
17
20
use RuntimeException ;
18
21
19
- use function Amp \ParallelFunctions \parallelMap ;
20
- use function Amp \Promise \wait ;
21
22
use function array_map ;
22
23
use function count ;
23
24
use function fclose ;
@@ -107,7 +108,7 @@ public function benchMultiFileImportInsertMany(): void
107
108
/**
108
109
* Using multiple forked threads
109
110
*
110
- * @param array{processes:int, files:string[], batchSize:int } $params
111
+ * @param array{processes:int} $params
111
112
*/
112
113
#[ParamProviders(['provideProcessesParameter ' ])]
113
114
public function benchMultiFileImportFork (array $ params ): void
@@ -148,20 +149,25 @@ public function benchMultiFileImportFork(array $params): void
148
149
}
149
150
150
151
/**
151
- * Using amphp/parallel-functions with worker pool
152
+ * Using amphp/parallel with worker pool
152
153
*
153
- * @param array{processes:int, files:string[], batchSize:int } $params
154
+ * @param array{processes:int} $params
154
155
*/
155
156
#[ParamProviders(['provideProcessesParameter ' ])]
156
157
public function benchMultiFileImportAmp (array $ params ): void
157
158
{
158
- wait (parallelMap (
159
+ $ workerPool = new ContextWorkerPool ($ params ['processes ' ], new ContextWorkerFactory ());
160
+
161
+ $ futures = array_map (
162
+ fn ($ file ) => $ workerPool ->submit (new ImportFileTask ($ file ))->getFuture (),
159
163
self ::getFileNames (),
160
- // Uses array callable instead of closure to skip complex serialization
161
- [self ::class, 'importFile ' ],
162
- // The pool size is the number of processes
163
- new DefaultPool ($ params ['processes ' ]),
164
- ));
164
+ );
165
+
166
+ foreach (Future::iterate ($ futures ) as $ future ) {
167
+ $ future ->await ();
168
+ }
169
+
170
+ $ workerPool ->shutdown ();
165
171
}
166
172
167
173
public static function provideProcessesParameter (): Generator
@@ -172,7 +178,6 @@ public static function provideProcessesParameter(): Generator
172
178
yield '8 proc ' => ['processes ' => 8 ]; // 13 sequences
173
179
yield '13 proc ' => ['processes ' => 13 ]; // 8 sequences
174
180
yield '20 proc ' => ['processes ' => 20 ]; // 5 sequences
175
- yield '34 proc ' => ['processes ' => 34 ]; // 3 sequences
176
181
}
177
182
178
183
/**
0 commit comments