Skip to content

Commit 4e27620

Browse files
committed
PHPORM-81 implement mmongodb driver for batch
1 parent 294f103 commit 4e27620

File tree

1 file changed

+200
-0
lines changed

1 file changed

+200
-0
lines changed

src/Bus/MongoBatchRepository.php

Lines changed: 200 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,200 @@
1+
<?php
2+
3+
namespace MongoDB\Laravel\Bus;
4+
5+
use BadMethodCallException;
6+
use Closure;
7+
use DateTimeInterface;
8+
use Illuminate\Bus\DatabaseBatchRepository;
9+
use Illuminate\Bus\PendingBatch;
10+
use Illuminate\Bus\PrunableBatchRepository;
11+
use Illuminate\Bus\UpdatedBatchJobCounts;
12+
use Illuminate\Support\Carbon;
13+
use MongoDB\BSON\ObjectId;
14+
use MongoDB\BSON\UTCDateTime;
15+
use MongoDB\Driver\ReadPreference;
16+
use MongoDB\Laravel\Collection;
17+
use Override;
18+
19+
use function is_string;
20+
21+
// Extending DatabaseBatchRepository is necessary so methods pruneUnfinished and pruneCancelled
22+
// are called by PruneBatchesCommand
23+
class MongoBatchRepository extends DatabaseBatchRepository implements PrunableBatchRepository
24+
{
25+
public function __construct(
26+
private Collection $collection,
27+
) {
28+
}
29+
30+
#[Override]
31+
public function get($limit = 50, $before = null)
32+
{
33+
if (is_string($before)) {
34+
$before = new ObjectId($before);
35+
}
36+
37+
return $this->collection->find(
38+
$before ? ['_id' => ['$lt' => $before]] : [],
39+
[
40+
'limit' => $limit,
41+
'sort' => ['_id' => -1],
42+
'typeMap' => ['root' => 'array', 'document' => 'array', 'array' => 'array'],
43+
],
44+
);
45+
}
46+
47+
#[Override]
48+
public function find(string $batchId)
49+
{
50+
$batchId = new ObjectId($batchId);
51+
52+
return $this->collection->findOne(
53+
['_id' => $batchId],
54+
['readPreference' => ReadPreference::PRIMARY],
55+
);
56+
}
57+
58+
#[Override]
59+
public function store(PendingBatch $batch)
60+
{
61+
$this->collection->insertOne([
62+
'name' => $batch->name,
63+
'total_jobs' => 0,
64+
'pending_jobs' => 0,
65+
'failed_jobs' => 0,
66+
'failed_job_ids' => '[]',
67+
'options' => $this->serialize($batch->options),
68+
'created_at' => new UTCDateTime(Carbon::now()),
69+
'cancelled_at' => null,
70+
'finished_at' => null,
71+
]);
72+
}
73+
74+
#[Override]
75+
public function incrementTotalJobs(string $batchId, int $amount)
76+
{
77+
$batchId = new ObjectId($batchId);
78+
$this->collection->updateOne(
79+
['_id' => $batchId],
80+
[
81+
'$inc' => [
82+
'total_jobs' => $amount,
83+
'pending_jobs' => $amount,
84+
],
85+
'$set' => [
86+
'finished_at' => null,
87+
],
88+
],
89+
);
90+
}
91+
92+
#[Override]
93+
public function decrementPendingJobs(string $batchId, string $jobId)
94+
{
95+
$batchId = new ObjectId($batchId);
96+
$values = $this->collection->findOneAndUpdate(
97+
['_id' => $batchId],
98+
[
99+
'$dec' => ['pending_jobs' => 1],
100+
],
101+
[
102+
'projection' => ['pending_jobs' => 1, 'failed_jobs' => 1],
103+
],
104+
);
105+
106+
return new UpdatedBatchJobCounts(
107+
$values['pending_jobs'],
108+
$values['failed_jobs'],
109+
);
110+
}
111+
112+
#[Override]
113+
public function incrementFailedJobs(string $batchId, string $jobId)
114+
{
115+
// TODO: Implement incrementFailedJobs() method.
116+
}
117+
118+
#[Override]
119+
public function markAsFinished(string $batchId)
120+
{
121+
$batchId = new ObjectId($batchId);
122+
$this->collection->updateOne(
123+
['_id' => $batchId],
124+
['$set' => ['finished_at' => new UTCDateTime(Carbon::now())]],
125+
);
126+
}
127+
128+
#[Override]
129+
public function cancel(string $batchId)
130+
{
131+
$batchId = new ObjectId($batchId);
132+
$this->collection->updateOne(
133+
['_id' => $batchId],
134+
[
135+
'$set' => [
136+
'cancelled_at' => new UTCDateTime(Carbon::now()),
137+
'finished_at' => new UTCDateTime(Carbon::now()),
138+
],
139+
],
140+
);
141+
}
142+
143+
#[Override]
144+
public function delete(string $batchId)
145+
{
146+
$batchId = new ObjectId($batchId);
147+
$this->collection->deleteOne(['_id' => $batchId]);
148+
}
149+
150+
#[Override]
151+
public function transaction(Closure $callback)
152+
{
153+
// Transactions are not necessary
154+
return $callback();
155+
}
156+
157+
/** Update an atomic value within the batch. */
158+
#[Override]
159+
public function rollBack()
160+
{
161+
throw new BadMethodCallException('Not implemented');
162+
}
163+
164+
/** Mark the batch that has the given ID as finished. */
165+
#[Override]
166+
public function prune(DateTimeInterface $before): int
167+
{
168+
$result = $this->collection->deleteMany(
169+
['finished_at' => ['$ne' => null, '$lt' => new UTCDateTime($before)]],
170+
);
171+
172+
return $result->getDeletedCount();
173+
}
174+
175+
/** Prune all the unfinished entries older than the given date. */
176+
public function pruneUnfinished(DateTimeInterface $before): int
177+
{
178+
$result = $this->collection->deleteMany(
179+
[
180+
'finished_at' => null,
181+
'created_at' => ['$lt' => new UTCDateTime($before)],
182+
],
183+
);
184+
185+
return $result->getDeletedCount();
186+
}
187+
188+
/** Prune all the cancelled entries older than the given date. */
189+
public function pruneCancelled(DateTimeInterface $before): int
190+
{
191+
$result = $this->collection->deleteMany(
192+
[
193+
'cancelled_at' => ['$ne' => null],
194+
'created_at' => ['$lt' => new UTCDateTime($before)],
195+
],
196+
);
197+
198+
return $result->getDeletedCount();
199+
}
200+
}

0 commit comments

Comments
 (0)