20
20
use MongoDB \Operation \FindOneAndUpdate ;
21
21
use Override ;
22
22
23
- use function date_default_timezone_get ;
24
23
use function is_string ;
25
24
use function serialize ;
26
25
use function unserialize ;
@@ -66,6 +65,9 @@ public function find(string $batchId): ?Batch
66
65
$ batch = $ this ->collection ->findOne (
67
66
['_id ' => $ batchId ],
68
67
[
68
+ // If the select query is executed faster than the database replication takes place,
69
+ // then no batch is found. In that case an exception is thrown because jobs are added
70
+ // to a null batch.
69
71
'readPreference ' => new ReadPreference (ReadPreference::PRIMARY ),
70
72
'typeMap ' => ['root ' => 'array ' , 'array ' => 'array ' , 'document ' => 'array ' ],
71
73
],
@@ -85,7 +87,7 @@ public function store(PendingBatch $batch): Batch
85
87
'failed_job_ids ' => [],
86
88
// Serialization is required for Closures
87
89
'options ' => serialize ($ batch ->options ),
88
- 'created_at ' => new UTCDateTime (Carbon:: now () ),
90
+ 'created_at ' => $ this -> getUTCDateTime ( ),
89
91
'cancelled_at ' => null ,
90
92
'finished_at ' => null ,
91
93
]);
@@ -161,7 +163,7 @@ public function markAsFinished(string $batchId): void
161
163
$ batchId = new ObjectId ($ batchId );
162
164
$ this ->collection ->updateOne (
163
165
['_id ' => $ batchId ],
164
- ['$set ' => ['finished_at ' => new UTCDateTime (Carbon:: now () )]],
166
+ ['$set ' => ['finished_at ' => $ this -> getUTCDateTime ( )]],
165
167
);
166
168
}
167
169
@@ -173,8 +175,8 @@ public function cancel(string $batchId): void
173
175
['_id ' => $ batchId ],
174
176
[
175
177
'$set ' => [
176
- 'cancelled_at ' => new UTCDateTime (Carbon:: now () ),
177
- 'finished_at ' => new UTCDateTime (Carbon:: now () ),
178
+ 'cancelled_at ' => $ this -> getUTCDateTime ( ),
179
+ 'finished_at ' => $ this -> getUTCDateTime ( ),
178
180
],
179
181
],
180
182
);
@@ -194,16 +196,14 @@ public function transaction(Closure $callback): mixed
194
196
return $ this ->connection ->transaction (fn () => $ callback ());
195
197
}
196
198
197
- /**
198
- * Rollback the last database transaction for the connection.
199
- */
199
+ /** Rollback the last database transaction for the connection. */
200
200
#[Override]
201
201
public function rollBack (): void
202
202
{
203
203
$ this ->connection ->rollBack ();
204
204
}
205
205
206
- /** Mark the batch that has the given ID as finished . */
206
+ /** Prune the entries older than the given date . */
207
207
#[Override]
208
208
public function prune (DateTimeInterface $ before ): int
209
209
{
@@ -258,13 +258,19 @@ protected function toBatch($batch): Batch
258
258
);
259
259
}
260
260
261
+ private function getUTCDateTime (): UTCDateTime
262
+ {
263
+ // Using Carbon so the current time can be modified for tests
264
+ return new UTCDateTime (Carbon::now ());
265
+ }
266
+
261
267
/** @return ($date is null ? null : CarbonImmutable) */
262
268
private function toCarbon (?UTCDateTime $ date ): ?CarbonImmutable
263
269
{
264
270
if ($ date === null ) {
265
271
return null ;
266
272
}
267
273
268
- return CarbonImmutable::createFromTimestamp ((string ) $ date, date_default_timezone_get () );
274
+ return CarbonImmutable::createFromTimestampMsUTC ((string ) $ date );
269
275
}
270
276
}
0 commit comments