PHPLIB-1237 Implement Parallel Multi File Export Bench #1169

Merged · 4 commits · Sep 22, 2023
5 changes: 4 additions & 1 deletion benchmark/composer.json
@@ -15,7 +15,7 @@
     "require": {
         "php": ">=8.1",
         "ext-pcntl": "*",
-        "amphp/parallel-functions": "^1.1",
+        "amphp/parallel": "^2.2",
         "mongodb/mongodb": "@dev",
         "phpbench/phpbench": "^1.2"
     },
@@ -26,5 +26,8 @@
     },
     "scripts": {
         "benchmark": "phpbench run --report=aggregate"
+    },
+    "config": {
+        "sort-packages": true
     }
 }
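
Context for the dependency swap above: amphp/parallel-functions 1.x exposed a closure-mapping helper, while amphp/parallel 2.x uses explicit Task objects submitted to a worker pool, which is the pattern the new benchmark classes below follow. A minimal sketch of the 2.x usage, mirroring the code in this PR (the pool size and file name are placeholders):

    use Amp\Parallel\Worker\ContextWorkerFactory;
    use Amp\Parallel\Worker\ContextWorkerPool;
    use MongoDB\Benchmark\DriverBench\Amp\ExportFileTask;

    // Create a pool of worker processes and submit one task to it
    $pool = new ContextWorkerPool(4, new ContextWorkerFactory());
    $future = $pool->submit(new ExportFileTask(files: ['/tmp/000.txt']))->getFuture();
    $future->await();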
25 changes: 25 additions & 0 deletions benchmark/src/DriverBench/Amp/ExportFileTask.php
@@ -0,0 +1,25 @@
<?php

namespace MongoDB\Benchmark\DriverBench\Amp;

use Amp\Cancellation;
use Amp\Parallel\Worker\Task;
use Amp\Sync\Channel;
use MongoDB\Benchmark\DriverBench\ParallelMultiFileExportBench;

final class ExportFileTask implements Task
{
public function __construct(
private string|array $files,
private array $filter = [],
private array $options = [],
) {
}

public function run(Channel $channel, Cancellation $cancellation): mixed
{
ParallelMultiFileExportBench::exportFile($this->files, $this->filter, $this->options);

return $this->files;
}
}
23 changes: 23 additions & 0 deletions benchmark/src/DriverBench/Amp/ImportFileTask.php
@@ -0,0 +1,23 @@
<?php

namespace MongoDB\Benchmark\DriverBench\Amp;

use Amp\Cancellation;
use Amp\Parallel\Worker\Task;
use Amp\Sync\Channel;
use MongoDB\Benchmark\DriverBench\ParallelMultiFileImportBench;

final class ImportFileTask implements Task
{
public function __construct(
private array $files,
) {
}

public function run(Channel $channel, Cancellation $cancellation): mixed
{
ParallelMultiFileImportBench::importFile($this->files);

return $this->files;
}
}
6 changes: 4 additions & 2 deletions benchmark/src/DriverBench/BSONMicroBench.php
@@ -16,6 +16,7 @@
  */
 final class BSONMicroBench
 {
+    /** @param array{document:Document} $params */
     #[ParamProviders('provideParams')]
     public function benchEncoding(array $params): void
     {
@@ -25,12 +26,13 @@ public function benchEncoding(array $params): void
         }
     }

+    /** @param array{bson:string} $params */
     #[ParamProviders('provideParams')]
     public function benchDecoding(array $params): void
     {
-        $document = $params['document'];
+        $bson = $params['bson'];
         for ($i = 0; $i < 10_000; $i++) {
-            Document::fromBSON($document);
+            Document::fromBSON($bson);
         }
     }

225 changes: 225 additions & 0 deletions benchmark/src/DriverBench/ParallelMultiFileExportBench.php
@@ -0,0 +1,225 @@
<?php

namespace MongoDB\Benchmark\DriverBench;

use Amp\Future;
use Amp\Parallel\Worker\ContextWorkerFactory;
use Amp\Parallel\Worker\ContextWorkerPool;
use Generator;
use MongoDB\Benchmark\DriverBench\Amp\ExportFileTask;
use MongoDB\Benchmark\Fixtures\Data;
use MongoDB\Benchmark\Utils;
use MongoDB\BSON\Document;
use PhpBench\Attributes\AfterClassMethods;
use PhpBench\Attributes\AfterMethods;
use PhpBench\Attributes\BeforeClassMethods;
use PhpBench\Attributes\Iterations;
use PhpBench\Attributes\ParamProviders;
use PhpBench\Attributes\Revs;
use RuntimeException;

use function array_chunk;
use function array_fill;
use function array_map;
use function ceil;
use function file_exists;
use function file_get_contents;
use function file_put_contents;
use function is_dir;
use function json_encode;
use function mkdir;
use function pcntl_fork;
use function pcntl_waitpid;
use function range;
use function sprintf;
use function sys_get_temp_dir;
use function unlink;

/**
* For accurate results, run benchmarks on a standalone server.
*
* @see https://github.com/mongodb/specifications/blob/ddfc8b583d49aaf8c4c19fa01255afb66b36b92e/source/benchmarking/benchmarking.rst#ldjson-multi-file-export
*/
#[BeforeClassMethods('beforeClass')]
#[AfterClassMethods('afterClass')]
#[AfterMethods('afterIteration')]
#[Iterations(1)]
#[Revs(1)]
final class ParallelMultiFileExportBench
{
public static function beforeClass(): void
{
// Resets the database to ensure that the collection is empty
Utils::getDatabase()->drop();

$doc = Document::fromJSON(file_get_contents(Data::LDJSON_FILE_PATH));
Utils::getCollection()->insertMany(array_fill(0, 500_000, $doc));
}

public static function afterClass(): void
{
Utils::getDatabase()->drop();
}

public function afterIteration(): void
{
foreach (self::getFileNames() as $file) {
if (file_exists($file)) {
unlink($file);
}
}
}

/**
* Using a single thread to export multiple files.
* By executing a single find command per chunk of files, we reduce the number of round trips to the server.
*
* @param array{chunk:int} $params
*/
#[ParamProviders(['provideChunkParams'])]
public function benchSequential(array $params): void
{
foreach (array_chunk(self::getFileNames(), $params['chunk']) as $i => $files) {
self::exportFile($files, [], [
'limit' => 5_000 * $params['chunk'],
'skip' => 5_000 * $params['chunk'] * $i,
]);
}
}

/**
* Using multiple forked processes
*
* @param array{chunk:int} $params
*/
#[ParamProviders(['provideChunkParams'])]
public function benchFork(array $params): void
{
$pids = [];

// Reset to ensure that the existing libmongoc client (via the Manager) is not re-used by the child
// process. When the child process constructs a new Manager, the differing PID will result in creation
// of a new libmongoc client.
Utils::reset();

// Create a child process for each chunk of files
foreach (array_chunk(self::getFileNames(), $params['chunk']) as $i => $files) {
$pid = pcntl_fork();
if ($pid === 0) {
self::exportFile($files, [], [
'limit' => 5_000 * $params['chunk'],
'skip' => 5_000 * $params['chunk'] * $i,
]);

// Exit the child process
exit(0);
}

if ($pid === -1) {
throw new RuntimeException('Failed to fork');
}

// Keep the forked process id to wait for it later
$pids[$pid] = true;
}

// Wait for all child processes to finish
while ($pids !== []) {
$pid = pcntl_waitpid(-1, $status);
unset($pids[$pid]);
}
}
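
// Note: Utils::reset() itself is not part of this diff. A minimal sketch of
// what it might do, assuming Utils caches a MongoDB\Driver\Manager in a
// static property (an assumption; the real helper is not shown here):
//
//     final class Utils
//     {
//         private static ?\MongoDB\Driver\Manager $manager = null;
//
//         public static function reset(): void
//         {
//             // Dropping the cached Manager forces a forked child to construct
//             // a new one; the differing PID then yields a fresh libmongoc client.
//             self::$manager = null;
//         }
//     }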

/**
* Using amphp/parallel with a worker pool
*
* @param array{chunk:int} $params
*/
#[ParamProviders(['provideChunkParams'])]
public function benchAmpWorkers(array $params): void
{
$workerPool = new ContextWorkerPool(ceil(100 / $params['chunk']), new ContextWorkerFactory());

$futures = [];
foreach (array_chunk(self::getFileNames(), $params['chunk']) as $i => $files) {
$futures[] = $workerPool->submit(
new ExportFileTask(
files: $files,
options: [
'limit' => 5_000 * $params['chunk'],
'skip' => 5_000 * $params['chunk'] * $i,
],
),
)->getFuture();
}

foreach (Future::iterate($futures) as $future) {
$future->await();
}
}

public static function provideChunkParams(): Generator
{
yield 'by 1' => ['chunk' => 1];
yield 'by 2' => ['chunk' => 2];
yield 'by 4' => ['chunk' => 4];
yield 'by 8' => ['chunk' => 8];
yield 'by 13' => ['chunk' => 13];
yield 'by 20' => ['chunk' => 20];
yield 'by 100' => ['chunk' => 100];
}

/**
* Export a query to a file
*/
public static function exportFile(array|string $files, array $filter = [], array $options = []): void
{
$options += [
// A BSON type map is faster for reading query results, but slower to JSON-encode
'typeMap' => ['root' => 'array'],
// Exclude the _id field so the output matches the fixture data
'projection' => ['_id' => 0],
'sort' => ['_id' => 1],
];
$cursor = Utils::getCollection()->find($filter, $options);
$cursor->rewind();

foreach ((array) $files as $file) {
// Aggregate file in memory to reduce filesystem operations
$data = '';
for ($i = 0; $i < 5_000; $i++) {
$document = $cursor->current();
// Cursor exhausted
if (! $document) {
break;
}

// We don't use MongoDB\BSON\Document::toCanonicalExtendedJSON() because
// it is slower than json_encode() on an array.
[Review comment on lines +197 to +198 — Member]

Note that json_encode does not produce the same result as toCanonicalExtendedJSON. That said, it does produce the result we wanted.

I also had to do a double-take on this, so I wrote a small benchmark:

    benchToJSONViaToCanonicalExtendedJSON...I2 - Mo48.200μs (±0.00%)
    benchToJSONViaToRelaxedExtendedJSON.....I2 - Mo48.591μs (±6.09%)
    benchToJSONViaJsonEncode................I2 - Mo12.410μs (±3.55%)

The last benchmark actually calls json_encode($document->toPHP(['root' => 'array'])), so I'm a bit surprised that is faster. This might provide an opportunity to revisit the JSON serialisation logic in libbson.

[Reply — Member Author]

Tracked in PHPC-2299

$data .= json_encode($document) . "\n";
$cursor->next();
}

// Write file in a single operation
file_put_contents($file, $data);
}
}

/**
* A method regenerates the file names because the result cannot be cached in a static property:
* the benchmark runner calls each method in a separate process, where the static property would
* not be populated.
*/
private static function getFileNames(): array
{
$tempDir = sys_get_temp_dir() . '/mongodb-php-benchmark';
if (! is_dir($tempDir)) {
mkdir($tempDir);
}

return array_map(
static fn (int $i) => sprintf('%s/%03d.txt', $tempDir, $i),
range(0, 99),
);
}
}
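
For reference, a minimal sketch of the kind of micro-benchmark described in the review comment above, comparing BSON-to-JSON strategies. The method names follow the reported output; the fixture document, revs count, and class name are assumptions, not part of this PR:

    <?php

    namespace MongoDB\Benchmark\DriverBench;

    use MongoDB\BSON\Document;
    use PhpBench\Attributes\BeforeMethods;
    use PhpBench\Attributes\Revs;

    use function json_encode;

    #[BeforeMethods('setUp')]
    final class BSONToJSONBench
    {
        private Document $document;

        public function setUp(): void
        {
            // Any representative document works; this literal is a stand-in
            $this->document = Document::fromJSON('{"x": 1, "y": "foo", "z": [1, 2, 3]}');
        }

        #[Revs(1000)]
        public function benchToJSONViaToCanonicalExtendedJSON(): void
        {
            $this->document->toCanonicalExtendedJSON();
        }

        #[Revs(1000)]
        public function benchToJSONViaToRelaxedExtendedJSON(): void
        {
            $this->document->toRelaxedExtendedJSON();
        }

        #[Revs(1000)]
        public function benchToJSONViaJsonEncode(): void
        {
            // Convert to a PHP array first, then JSON-encode it
            json_encode($this->document->toPHP(['root' => 'array']));
        }
    }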