Skip to content

Commit f70407d

Browse files
committed
wip
1 parent 4e27620 commit f70407d

File tree

2 files changed

+136
-16
lines changed

2 files changed

+136
-16
lines changed

src/Bus/MongoBatchRepository.php

Lines changed: 83 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -3,32 +3,46 @@
33
namespace MongoDB\Laravel\Bus;
44

55
use BadMethodCallException;
6+
use Carbon\CarbonImmutable;
67
use Closure;
78
use DateTimeInterface;
9+
use Illuminate\Bus\Batch;
10+
use Illuminate\Bus\BatchFactory;
811
use Illuminate\Bus\DatabaseBatchRepository;
912
use Illuminate\Bus\PendingBatch;
1013
use Illuminate\Bus\PrunableBatchRepository;
1114
use Illuminate\Bus\UpdatedBatchJobCounts;
15+
use Illuminate\Database\Connection;
1216
use Illuminate\Support\Carbon;
1317
use MongoDB\BSON\ObjectId;
1418
use MongoDB\BSON\UTCDateTime;
1519
use MongoDB\Driver\ReadPreference;
1620
use MongoDB\Laravel\Collection;
1721
use Override;
1822

23+
use function assert;
24+
use function date_default_timezone_get;
1925
use function is_string;
2026

2127
// Extending DatabaseBatchRepository is necessary so methods pruneUnfinished and pruneCancelled
2228
// are called by PruneBatchesCommand
2329
class MongoBatchRepository extends DatabaseBatchRepository implements PrunableBatchRepository
2430
{
31+
private Collection $collection;
32+
2533
public function __construct(
26-
private Collection $collection,
34+
BatchFactory $factory,
35+
Connection $connection,
36+
string $collection,
2737
) {
38+
assert($connection instanceof \MongoDB\Laravel\Connection);
39+
$this->collection = $connection->getCollection($collection);
40+
41+
parent::__construct($factory, $connection, $collection);
2842
}
2943

3044
#[Override]
31-
public function get($limit = 50, $before = null)
45+
public function get($limit = 50, $before = null): array
3246
{
3347
if (is_string($before)) {
3448
$before = new ObjectId($before);
@@ -41,22 +55,36 @@ public function get($limit = 50, $before = null)
4155
'sort' => ['_id' => -1],
4256
'typeMap' => ['root' => 'array', 'document' => 'array', 'array' => 'array'],
4357
],
44-
);
58+
)->toArray();
4559
}
4660

4761
#[Override]
48-
public function find(string $batchId)
62+
public function find(string $batchId): ?Batch
4963
{
5064
$batchId = new ObjectId($batchId);
5165

52-
return $this->collection->findOne(
66+
$batch = $this->collection->findOne(
5367
['_id' => $batchId],
5468
['readPreference' => ReadPreference::PRIMARY],
5569
);
70+
71+
return $this->factory->make(
72+
$this,
73+
$batch['id'],
74+
$batch['name'],
75+
$batch['total_jobs'],
76+
$batch['pending_jobs'],
77+
$batch['failed_jobs'],
78+
$batch['failed_job_ids'],
79+
$batch['options'],
80+
CarbonImmutable::createFromTimestamp($batch['created_at']->getTimestamp(), date_default_timezone_get()),
81+
$batch['cancelled_at'] ? CarbonImmutable::createFromTimestamp($batch['cancelled_at']->getTimestamp(), date_default_timezone_get()) : null,
82+
$batch['finished_at'] ? CarbonImmutable::createFromTimestamp($batch['finished_at']->getTimestamp(), date_default_timezone_get()) : null,
83+
);
5684
}
5785

5886
#[Override]
59-
public function store(PendingBatch $batch)
87+
public function store(PendingBatch $batch): Batch
6088
{
6189
$this->collection->insertOne([
6290
'name' => $batch->name,
@@ -72,7 +100,7 @@ public function store(PendingBatch $batch)
72100
}
73101

74102
#[Override]
75-
public function incrementTotalJobs(string $batchId, int $amount)
103+
public function incrementTotalJobs(string $batchId, int $amount): void
76104
{
77105
$batchId = new ObjectId($batchId);
78106
$this->collection->updateOne(
@@ -90,13 +118,14 @@ public function incrementTotalJobs(string $batchId, int $amount)
90118
}
91119

92120
#[Override]
93-
public function decrementPendingJobs(string $batchId, string $jobId)
121+
public function decrementPendingJobs(string $batchId, string $jobId): UpdatedBatchJobCounts
94122
{
95123
$batchId = new ObjectId($batchId);
96124
$values = $this->collection->findOneAndUpdate(
97125
['_id' => $batchId],
98126
[
99127
'$dec' => ['pending_jobs' => 1],
128+
'$pull' => ['failed_job_ids' => $jobId],
100129
],
101130
[
102131
'projection' => ['pending_jobs' => 1, 'failed_jobs' => 1],
@@ -110,13 +139,28 @@ public function decrementPendingJobs(string $batchId, string $jobId)
110139
}
111140

112141
#[Override]
113-
public function incrementFailedJobs(string $batchId, string $jobId)
142+
public function incrementFailedJobs(string $batchId, string $jobId): UpdatedBatchJobCounts
114143
{
115-
// TODO: Implement incrementFailedJobs() method.
144+
$batchId = new ObjectId($batchId);
145+
$values = $this->collection->findOneAndUpdate(
146+
['_id' => $batchId],
147+
[
148+
'$inc' => ['pending_jobs' => 1],
149+
'$push' => ['failed_job_ids' => $jobId],
150+
],
151+
[
152+
'projection' => ['pending_jobs' => 1, 'failed_jobs' => 1],
153+
],
154+
);
155+
156+
return new UpdatedBatchJobCounts(
157+
$values['pending_jobs'],
158+
$values['failed_jobs'],
159+
);
116160
}
117161

118162
#[Override]
119-
public function markAsFinished(string $batchId)
163+
public function markAsFinished(string $batchId): void
120164
{
121165
$batchId = new ObjectId($batchId);
122166
$this->collection->updateOne(
@@ -126,7 +170,7 @@ public function markAsFinished(string $batchId)
126170
}
127171

128172
#[Override]
129-
public function cancel(string $batchId)
173+
public function cancel(string $batchId): void
130174
{
131175
$batchId = new ObjectId($batchId);
132176
$this->collection->updateOne(
@@ -141,22 +185,27 @@ public function cancel(string $batchId)
141185
}
142186

143187
#[Override]
144-
public function delete(string $batchId)
188+
public function delete(string $batchId): void
145189
{
146190
$batchId = new ObjectId($batchId);
147191
$this->collection->deleteOne(['_id' => $batchId]);
148192
}
149193

194+
/** Execute the given Closure within a storage specific transaction. */
150195
#[Override]
151-
public function transaction(Closure $callback)
196+
public function transaction(Closure $callback): mixed
152197
{
153198
// Transactions are not necessary
154199
return $callback();
155200
}
156201

157-
/** Update an atomic value within the batch. */
202+
/**
203+
* Rollback the last database transaction for the connection.
204+
*
205+
* Not implemented.
206+
*/
158207
#[Override]
159-
public function rollBack()
208+
public function rollBack(): void
160209
{
161210
throw new BadMethodCallException('Not implemented');
162211
}
@@ -197,4 +246,22 @@ public function pruneCancelled(DateTimeInterface $before): int
197246

198247
return $result->getDeletedCount();
199248
}
249+
250+
#[Override]
251+
protected function toBatch($batch): Batch
252+
{
253+
return $this->factory->make(
254+
$this,
255+
$batch->id,
256+
$batch->name,
257+
$batch->total_jobs,
258+
$batch->pending_jobs,
259+
$batch->failed_jobs,
260+
$batch->failed_job_ids,
261+
$batch->options,
262+
CarbonImmutable::createFromTimestamp($batch->created_at->getTimestamp(), date_default_timezone_get()),
263+
$batch->cancelled_at ? CarbonImmutable::createFromTimestamp($batch->cancelled_at->getTimestamp(), date_default_timezone_get()) : null,
264+
$batch->finished_at ? CarbonImmutable::createFromTimestamp($batch->finished_at->getTimestamp(), date_default_timezone_get()) : null,
265+
);
266+
}
200267
}

src/MongoDBBusServiceProvider.php

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
<?php
2+
3+
namespace MongoDB\Laravel;
4+
5+
use Illuminate\Bus\BatchFactory;
6+
use Illuminate\Bus\BatchRepository;
7+
use Illuminate\Bus\BusServiceProvider;
8+
use Illuminate\Container\Container;
9+
use Illuminate\Contracts\Support\DeferrableProvider;
10+
use Illuminate\Support\ServiceProvider;
11+
use MongoDB\Laravel\Bus\MongoBatchRepository;
12+
13+
class MongoDBBusServiceProvider extends ServiceProvider implements DeferrableProvider
14+
{
15+
/**
16+
* Register the service provider.
17+
*/
18+
public function register()
19+
{
20+
$this->app->singleton(MongoBatchRepository::class, function (Container $app) {
21+
return new MongoBatchRepository(
22+
$app->make(BatchFactory::class),
23+
$app->make('db')->connection($app->config->get('queue.batching.database')),
24+
$app->config->get('queue.batching.collection', 'job_batches'),
25+
);
26+
});
27+
28+
/** @see BusServiceProvider::registerBatchServices() */
29+
$this->app->extend(BatchRepository::class, function (BatchRepository $repository, Container $app) {
30+
$driver = $app->config->get('queue.batching.driver');
31+
32+
return $driver === 'mongodb'
33+
? $app->make(MongoBatchRepository::class)
34+
: $repository;
35+
});
36+
// Add database driver.
37+
$this->app->resolving('db', function ($db) {
38+
$db->extend('mongodb', function ($config, $name) {
39+
$config['name'] = $name;
40+
41+
return new Connection($config);
42+
});
43+
});
44+
}
45+
46+
public function provides()
47+
{
48+
return [
49+
BatchRepository::class,
50+
MongoBatchRepository::class,
51+
];
52+
}
53+
}

0 commit comments

Comments
 (0)