Skip to content

add driver mongodb job batching #2551

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
349 changes: 349 additions & 0 deletions src/Bus/MongodbBatchRepository.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,349 @@
<?php

namespace Jenssegers\Mongodb\Bus;

use Carbon\CarbonImmutable;
use Closure;
use DateTimeInterface;
use Illuminate\Bus\BatchFactory;
use Illuminate\Bus\PendingBatch;
use Illuminate\Bus\PrunableBatchRepository;
use Illuminate\Bus\UpdatedBatchJobCounts;
use Illuminate\Database\Connection;
use Illuminate\Database\PostgresConnection;
use Illuminate\Support\Str;

class MongodbBatchRepository implements PrunableBatchRepository
{
/**
* The batch factory instance.
*
* @var \Illuminate\Bus\BatchFactory
*/
protected $factory;

/**
* The database connection instance.
*
* @var \Illuminate\Database\Connection
*/
protected $connection;

/**
* The database table to use to store batch information.
*
* @var string
*/
protected $table;

/**
* Create a new batch repository instance.
*
* @param \Illuminate\Bus\BatchFactory $factory
* @param \Illuminate\Database\Connection $connection
* @param string $table
*/
public function __construct(BatchFactory $factory, Connection $connection, string $table)
{
$this->factory = $factory;
$this->connection = $connection;
$this->table = $table;
}

/**
* Retrieve a list of batches.
*
* @param int $limit
* @param mixed $before
* @return \Illuminate\Bus\Batch[]
*/
public function get($limit = 50, $before = null)
{
return $this->connection->table($this->table)
->orderByDesc('_id')
->take($limit)
->when($before, function ($q) use ($before) {
return $q->where('_id', '<', $before);
})
->get()
->map(function ($batch) {
return $this->toBatch($batch);
})
->all();
}

/**
* Retrieve information about an existing batch.
*
* @param string $batchId
* @return \Illuminate\Bus\Batch|null
*/
public function find(string $batchId)
{
$batch = $this->connection->table($this->table)
->where('_id', $batchId)
->first();

if ($batch) {
return $this->toBatch($batch);
}
}

/**
* Store a new pending batch.
*
* @param \Illuminate\Bus\PendingBatch $batch
* @return \Illuminate\Bus\Batch
*/
public function store(PendingBatch $batch)
{
$id = $this->connection->table($this->table)->insertGetId([
'name' => $batch->name,
'total_jobs' => 0,
'pending_jobs' => 0,
'failed_jobs' => 0,
'failed_job_ids' => '[]',
'options' => $this->serialize($batch->options),
'created_at' => time(),
'cancelled_at' => null,
'finished_at' => null,
]);

return $this->find($id);
}

/**
* Increment the total number of jobs within the batch.
*
* @param string $batchId
* @param int $amount
* @return void
*/
public function incrementTotalJobs(string $batchId, int $amount)
{
$batch = $this->connection->table($this->table)->where('_id', $batchId)->first();

$this->connection->table($this->table)->where('_id', $batchId)->update([
'total_jobs' => $batch['total_jobs'] + $amount,
'pending_jobs' => $batch['pending_jobs'] + $amount,
'finished_at' => null,
]);
}

/**
* Decrement the total number of pending jobs for the batch.
*
* @param string $batchId
* @param string $jobId
* @return \Illuminate\Bus\UpdatedBatchJobCounts
*/
public function decrementPendingJobs(string $batchId, string $jobId)
{
$values = $this->updateAtomicValues($batchId, function ($batch) use ($jobId) {
return [
'pending_jobs' => $batch['pending_jobs'] - 1,
'failed_jobs' => $batch['failed_jobs'],
'failed_job_ids' => json_encode(array_values(array_diff(json_decode($batch['failed_job_ids'], true), [$jobId]))),
];
});

return new UpdatedBatchJobCounts(
$values['pending_jobs'],
$values['failed_jobs']
);
}

/**
* Increment the total number of failed jobs for the batch.
*
* @param string $batchId
* @param string $jobId
* @return \Illuminate\Bus\UpdatedBatchJobCounts
*/
public function incrementFailedJobs(string $batchId, string $jobId)
{
$values = $this->updateAtomicValues($batchId, function ($batch) use ($jobId) {
return [
'pending_jobs' => $batch['pending_jobs'],
'failed_jobs' => $batch['failed_jobs'] + 1,
'failed_job_ids' => json_encode(array_values(array_unique(array_merge(json_decode($batch['failed_job_ids'], true), [$jobId])))),
];
});

return new UpdatedBatchJobCounts(
$values['pending_jobs'],
$values['failed_jobs']
);
}

/**
* Update an atomic value within the batch.
*
* @param string $batchId
* @param Closure $callback
* @return int|null
*/
protected function updateAtomicValues(string $batchId, Closure $callback)
{
return $this->connection->transaction(function () use ($batchId, $callback) {
$batch = $this->connection->table($this->table)->where('_id', $batchId)
->lockForUpdate()
->first();

return is_null($batch) ? [] : tap($callback($batch), function ($values) use ($batchId) {
$this->connection->table($this->table)->where('_id', $batchId)->update($values);
});
});
}

/**
* Mark the batch that has the given ID as finished.
*
* @param string $batchId
* @return void
*/
public function markAsFinished(string $batchId)
{
$this->connection->table($this->table)->where('_id', $batchId)->update([
'finished_at' => time(),
]);
}

/**
* Cancel the batch that has the given ID.
*
* @param string $batchId
* @return void
*/
public function cancel(string $batchId)
{
$this->connection->table($this->table)->where('_id', $batchId)->update([
'cancelled_at' => time(),
'finished_at' => time(),
]);
}

/**
* Delete the batch that has the given ID.
*
* @param string $batchId
* @return void
*/
public function delete(string $batchId)
{
$this->connection->table($this->table)->where('_id', $batchId)->delete();
}

/**
* Prune all the entries older than the given date.
*
* @param \DateTimeInterface $before
* @return int
*/
public function prune(DateTimeInterface $before)
{
$query = $this->connection->table($this->table)
->whereNotNull('finished_at')
->where('finished_at', '<', $before->getTimestamp());

$totalDeleted = 0;

do {
$deleted = $query->take(1000)->delete();

$totalDeleted += $deleted;
} while ($deleted !== 0);

return $totalDeleted;
}

/**
* Prune all the unfinished entries older than the given date.
*
* @param \DateTimeInterface $before
* @return int
*/
public function pruneUnfinished(DateTimeInterface $before)
{
$query = $this->connection->table($this->table)
->whereNull('finished_at')
->where('created_at', '<', $before->getTimestamp());

$totalDeleted = 0;

do {
$deleted = $query->take(1000)->delete();

$totalDeleted += $deleted;
} while ($deleted !== 0);

return $totalDeleted;
}

/**
* Execute the given Closure within a storage specific transaction.
*
* @param Closure $callback
*
*/
public function transaction(Closure $callback)
{
return $this->connection->transaction(function () use ($callback) {
return $callback();
});
}

/**
* Serialize the given value.
*
* @param mixed $value
* @return string
*/
protected function serialize($value)
{
$serialized = serialize($value);

return $this->connection instanceof PostgresConnection
? base64_encode($serialized)
: $serialized;
}

/**
* Unserialize the given value.
*
* @param string $serialized
* @return mixed
*/
protected function unserialize($serialized)
{
if ($this->connection instanceof PostgresConnection &&
!Str::contains($serialized, [':', ';'])) {
$serialized = base64_decode($serialized);
}

return unserialize($serialized);
}

/**
* Convert the given raw batch to a Batch object.
*
* @param object $batch
* @return \Illuminate\Bus\Batch
*/
protected function toBatch($batch)
{
return $this->factory->make(
$this,
$batch['_id'],
$batch['name'],
(int)$batch['total_jobs'],
(int)$batch['pending_jobs'],
(int)$batch['failed_jobs'],
json_decode($batch['failed_job_ids'], true),
$this->unserialize($batch['options']),
CarbonImmutable::createFromTimestamp($batch['created_at']),
$batch['cancelled_at'] ? CarbonImmutable::createFromTimestamp($batch['cancelled_at']) : $batch['cancelled_at'],
$batch['finished_at'] ? CarbonImmutable::createFromTimestamp($batch['finished_at']) : $batch['finished_at']
);
}
}
4 changes: 2 additions & 2 deletions src/Concerns/ManagesTransactions.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
*/
trait ManagesTransactions
{
protected ?Session $session = null;
protected $session = null;

protected $transactions = 0;

Expand Down Expand Up @@ -80,7 +80,7 @@ public function rollBack($toLevel = null): void
*
* @param int $attempts
*/
public function transaction(Closure $callback, $attempts = 1, array $options = []): mixed
public function transaction(Closure $callback, $attempts = 1, array $options = [])
{
$attemptsLeft = $attempts;
$callbackResult = null;
Expand Down
2 changes: 1 addition & 1 deletion src/Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ class Connection extends BaseConnection
{
use ManagesTransactions;

private static ?string $version = null;
private static $version = null;

/**
* The MongoDB database handler.
Expand Down
Loading