2
2
3
3
namespace MongoDB \Benchmark \DriverBench ;
4
4
5
+ use Amp \Parallel \Worker \DefaultPool ;
5
6
use Generator ;
6
7
use MongoDB \Benchmark \Fixtures \Data ;
7
8
use MongoDB \Benchmark \Utils ;
14
15
use PhpBench \Attributes \Revs ;
15
16
use RuntimeException ;
16
17
17
- use function array_chunk ;
18
+ use function Amp \ParallelFunctions \parallelMap ;
19
+ use function Amp \Promise \wait ;
18
20
use function array_map ;
19
21
use function ceil ;
20
22
use function count ;
@@ -68,8 +70,6 @@ public static function afterClass(): void
68
70
/**
69
71
* Parallel: LDJSON multi-file import
70
72
* Using single thread
71
- *
72
- * @see https://github.com/mongodb/specifications/blob/ddfc8b583d49aaf8c4c19fa01255afb66b36b92e/source/benchmarking/benchmarking.rst#ldjson-multi-file-import
73
73
*/
74
74
#[BeforeMethods('beforeMultiFileImport ' )]
75
75
#[Revs(1 )]
@@ -85,16 +85,15 @@ public function benchMultiFileImport(): void
85
85
* Parallel: LDJSON multi-file import
86
86
* Using multiple forked threads
87
87
*
88
- * @see https://github.com/mongodb/specifications/blob/ddfc8b583d49aaf8c4c19fa01255afb66b36b92e/source/benchmarking/benchmarking.rst#ldjson-multi-file-import
89
88
* @param array{processes:int, files:string[], batchSize:int} $params
90
89
*/
91
90
#[BeforeMethods('beforeMultiFileImport ' )]
92
- #[ParamProviders(['provideProcessesParameter ' , ' provideMultiFileImportParameters ' ])]
91
+ #[ParamProviders(['provideProcessesParameter ' ])]
93
92
#[Revs(1 )]
94
93
public function benchMultiFileImportFork (array $ params ): void
95
94
{
96
95
$ pids = [];
97
- foreach ($ params [ ' files ' ] as $ files ) {
96
+ foreach (self :: getFileNames () as $ file ) {
98
97
// Wait for a child process to finish if we have reached the maximum number of processes
99
98
if (count ($ pids ) >= $ params ['processes ' ]) {
100
99
$ pid = pcntl_waitpid (-1 , $ status );
@@ -107,11 +106,7 @@ public function benchMultiFileImportFork(array $params): void
107
106
// If we don't reset, we will get the same manager client_zval in the child process
108
107
// and share the libmongoc client.
109
108
Utils::reset ();
110
- $ collection = Utils::getCollection ();
111
-
112
- foreach ($ files as $ file ) {
113
- self ::importFile ($ file , $ collection );
114
- }
109
+ self ::importFile ($ file , Utils::getCollection ());
115
110
116
111
// Exit the child process
117
112
exit (0 );
@@ -132,21 +127,31 @@ public function benchMultiFileImportFork(array $params): void
132
127
}
133
128
}
134
129
135
- public static function provideProcessesParameter (): Generator
130
+ /**
131
+ * Parallel: LDJSON multi-file import
132
+ * Using amphp/parallel-functions with worker pool
133
+ *
134
+ * @param array{processes:int, files:string[], batchSize:int} $params
135
+ */
136
+ #[BeforeMethods('beforeMultiFileImport ' )]
137
+ #[ParamProviders(['provideProcessesParameter ' ])]
138
+ #[Revs(1 )]
139
+ public function benchMultiFileImportAmp (array $ params ): void
136
140
{
137
- // Max number of forked processes
138
- for ($ i = 1 ; $ i <= 30 ; $ i = (int ) ceil ($ i * 1.25 )) {
139
- yield $ i . 'fork ' => ['processes ' => $ i ];
140
- }
141
+ wait (parallelMap (
142
+ self ::getFileNames (),
143
+ // Uses array callable instead of closure to skip complex serialization
144
+ [self ::class, 'importFile ' ],
145
+ // The pool size is the number of processes
146
+ new DefaultPool ($ params ['processes ' ]),
147
+ ));
141
148
}
142
149
143
- public static function provideMultiFileImportParameters (): Generator
150
+ public static function provideProcessesParameter (): Generator
144
151
{
145
- $ files = self ::getFileNames ();
146
-
147
- // Chunk of file names to be handled by each processes
148
- for ($ i = 1 ; $ i <= 10 ; $ i += 3 ) {
149
- yield 'by ' . $ i => ['files ' => array_chunk ($ files , $ i )];
152
+ // Max number of forked processes
153
+ for ($ i = 1 ; $ i <= 30 ; $ i = (int ) ceil ($ i * 1.25 )) {
154
+ yield $ i . ' proc ' => ['processes ' => $ i ];
150
155
}
151
156
}
152
157
@@ -166,8 +171,10 @@ public function afterMultiFileImport(): void
166
171
unset($ this ->files );
167
172
}
168
173
169
- private static function importFile (string $ file , Collection $ collection ): void
174
+ public static function importFile (string $ file , ? Collection $ collection = null ): void
170
175
{
176
+ $ collection ??= Utils::getCollection ();
177
+
171
178
// Read file contents into BSON documents
172
179
$ docs = array_map (
173
180
static fn (string $ line ) => Document::fromJSON ($ line ),
@@ -186,8 +193,7 @@ private static function getFileNames(): array
186
193
187
194
return array_map (
188
195
static fn (int $ i ) => sprintf ('%s/%03d.txt ' , $ tempDir , $ i ),
189
- //range(0, 99),
190
- range (0 , 5 ),
196
+ range (0 , 99 ),
191
197
);
192
198
}
193
199
}
0 commit comments