-
Notifications
You must be signed in to change notification settings - Fork 1.5k
Add transaction support #1904
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add transaction support #1904
Changes from 9 commits
e54408a
9a01d23
64b4156
34b1a98
84850a9
bedaaa8
b030914
163b41d
92c1e07
caa61ea
7d96c3e
776492d
10bda05
85a0be8
b39251c
fe61056
3029e36
4ce523f
44cb7a7
7221417
1b90047
a5d0858
add9516
14b3ad7
0c22e4a
0b840db
cf88a03
2664bd5
44bd081
850d034
83f45c9
5eb6f42
84186f1
7741acb
72dbdcb
c14bd44
5773597
9e0cfd2
b2d1740
8cc01d5
dafee61
0fd27ba
6e89c8b
5645fc7
9cd8bb2
551c184
49307e2
59020fb
97f9b4b
9c90125
041d02b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
FROM mongo | ||
|
||
ADD mongo-replset-init.sh /usr/local/bin/ | ||
|
||
RUN chmod +x /usr/local/bin/mongo-replset-init.sh |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
#!/bin/bash | ||
|
||
echo "prepare rs initiating" | ||
|
||
check_db_status() { | ||
mongo1=$(mongo --host mongo1 --port 27017 --eval "db.stats().ok" | tail -n1 | grep -E '(^|\s)1($|\s)') | ||
mongo2=$(mongo --host mongo2 --port 27017 --eval "db.stats().ok" | tail -n1 | grep -E '(^|\s)1($|\s)') | ||
mongo3=$(mongo --host mongo3 --port 27017 --eval "db.stats().ok" | tail -n1 | grep -E '(^|\s)1($|\s)') | ||
if [[ $mongo1 == 1 ]] && [[ $mongo2 == 1 ]] && [[ $mongo3 == 1 ]]; then | ||
init_rs | ||
else | ||
check_db_status | ||
fi | ||
} | ||
|
||
init_rs() { | ||
ret=$(mongo --host mongo1 --port 27017 --eval "rs.initiate({ _id: 'rs0', members: [{ _id: 0, host: 'mongo1:27017' }, { _id: 1, host: 'mongo2:27017' }, { _id: 2, host: 'mongo3:27017' } ] })" > /dev/null 2>&1) | ||
} | ||
|
||
check_db_status > /dev/null 2>&1 | ||
|
||
echo "rs initiating finished" | ||
exit 0 |
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -5,6 +5,9 @@ | |||||
use Illuminate\Database\Connection as BaseConnection; | ||||||
use Illuminate\Support\Arr; | ||||||
use MongoDB\Client; | ||||||
use MongoDB\Driver\ReadConcern; | ||||||
use MongoDB\Driver\ReadPreference; | ||||||
use MongoDB\Driver\WriteConcern; | ||||||
|
||||||
class Connection extends BaseConnection | ||||||
{ | ||||||
|
@@ -20,6 +23,9 @@ class Connection extends BaseConnection | |||||
*/ | ||||||
protected $connection; | ||||||
|
||||||
protected $session_key; // sessions会话列表当前会话数组key 随机生成 | ||||||
divine marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
protected $sessions = []; // 会话列表 | ||||||
divine marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
|
||||||
/** | ||||||
* Create a new database connection instance. | ||||||
* @param array $config | ||||||
|
@@ -254,4 +260,81 @@ public function __call($method, $parameters) | |||||
{ | ||||||
return call_user_func_array([$this->db, $method], $parameters); | ||||||
} | ||||||
|
||||||
/** | ||||||
* create a session and start a transaction in session | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
* | ||||||
* In version 4.0, MongoDB supports multi-document transactions on replica sets. | ||||||
* In version 4.2, MongoDB introduces distributed transactions, which adds support for multi-document transactions on sharded clusters and incorporates the existing support for multi-document transactions on replica sets. | ||||||
* To use transactions on MongoDB 4.2 deployments(replica sets and sharded clusters), clients must use MongoDB drivers updated for MongoDB 4.2. | ||||||
* | ||||||
* @see https://docs.mongodb.com/manual/core/transactions/ | ||||||
* @author klinson <[email protected]> | ||||||
divine marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
*/ | ||||||
public function beginTransaction() | ||||||
{ | ||||||
$this->session_key = uniqid(); | ||||||
$this->sessions[$this->session_key] = $this->connection->startSession(); | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Given that the session from the connection instance is applied automatically to all operations sent through the query builder, I'd refrain from starting multiple sessions. Looking at the transaction logic used for PDO, the first call to |
||||||
|
||||||
$this->sessions[$this->session_key]->startTransaction([ | ||||||
'readPreference' => new ReadPreference(ReadPreference::RP_PRIMARY), | ||||||
'writeConcern' => new WriteConcern(1), | ||||||
'readConcern' => new ReadConcern(ReadConcern::LOCAL) | ||||||
Comment on lines
+311
to
+313
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would avoid hardcoding these options this way, especially with the given values. There is no prior art for passing options (as most methods in the query builder don't accept any options), but an optional |
||||||
]); | ||||||
} | ||||||
|
||||||
/** | ||||||
* commit transaction in this session and close this session | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
* @author klinson <[email protected]> | ||||||
divine marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
*/ | ||||||
public function commit() | ||||||
{ | ||||||
if ($session = $this->getSession()) { | ||||||
$session->commitTransaction(); | ||||||
$this->setLastSession(); | ||||||
} | ||||||
} | ||||||
|
||||||
/** | ||||||
* rollback transaction in this session and close this session | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
* @author klinson <[email protected]> | ||||||
divine marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
*/ | ||||||
public function rollBack($toLevel = null) | ||||||
{ | ||||||
if ($session = $this->getSession()) { | ||||||
$session->abortTransaction(); | ||||||
$this->setLastSession(); | ||||||
} | ||||||
} | ||||||
|
||||||
/** | ||||||
* close this session and get last session key to session_key | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
* Why do it ? Because nested transactions | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
* @author klinson <[email protected]> | ||||||
divine marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
*/ | ||||||
protected function setLastSession() | ||||||
{ | ||||||
if ($session = $this->getSession()) { | ||||||
$session->endSession(); | ||||||
unset($this->sessions[$this->session_key]); | ||||||
if (empty($this->sessions)) { | ||||||
$this->session_key = null; | ||||||
} else { | ||||||
end($this->sessions); | ||||||
$this->session_key = key($this->sessions); | ||||||
} | ||||||
} | ||||||
} | ||||||
|
||||||
/** | ||||||
* get now session if it has session | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
* @return \MongoDB\Driver\Session|null | ||||||
* @author klinson <[email protected]> | ||||||
divine marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
*/ | ||||||
public function getSession() | ||||||
{ | ||||||
return ($this->session_key && isset($this->sessions[$this->session_key])) | ||||||
klinson marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
? $this->sessions[$this->session_key] | ||||||
divine marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
: null; | ||||||
divine marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
} | ||||||
} |
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -294,7 +294,7 @@ public function getFresh($columns = []) | |||||
} | ||||||
} | ||||||
} | ||||||
|
||||||
// The _id field is mandatory when using grouping. | ||||||
if ($group && empty($group['_id'])) { | ||||||
$group['_id'] = null; | ||||||
|
@@ -338,6 +338,11 @@ public function getFresh($columns = []) | |||||
$options = array_merge($options, $this->options); | ||||||
} | ||||||
|
||||||
// if transaction in session | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
if ($session = $this->connection->getSession()) { | ||||||
$options['session'] = $session; | ||||||
} | ||||||
|
||||||
// Execute aggregation | ||||||
$results = iterator_to_array($this->collection->aggregate($pipeline, $options)); | ||||||
|
||||||
|
@@ -348,13 +353,15 @@ public function getFresh($columns = []) | |||||
// Return distinct results directly | ||||||
$column = isset($this->columns[0]) ? $this->columns[0] : '_id'; | ||||||
|
||||||
// Execute distinct | ||||||
if ($wheres) { | ||||||
$result = $this->collection->distinct($column, $wheres); | ||||||
} else { | ||||||
$result = $this->collection->distinct($column); | ||||||
$options = []; | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
Please remove. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @klinson can you remove this line? |
||||||
// if transaction in session | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
if ($session = $this->connection->getSession()) { | ||||||
$options['session'] = $session; | ||||||
} | ||||||
|
||||||
// Execute distinct | ||||||
$result = $this->collection->distinct($column, $wheres ?: [], $options); | ||||||
|
||||||
return $this->useCollections ? new Collection($result) : $result; | ||||||
} // Normal query | ||||||
else { | ||||||
|
@@ -397,6 +404,11 @@ public function getFresh($columns = []) | |||||
$options = array_merge($options, $this->options); | ||||||
} | ||||||
|
||||||
// if transaction in session | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
if ($session = $this->connection->getSession()) { | ||||||
$options['session'] = $session; | ||||||
} | ||||||
|
||||||
// Execute query and get MongoCursor | ||||||
$cursor = $this->collection->find($wheres, $options); | ||||||
|
||||||
|
@@ -561,7 +573,12 @@ public function insert(array $values) | |||||
} | ||||||
|
||||||
// Batch insert | ||||||
$result = $this->collection->insertMany($values); | ||||||
$options = []; | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
Please remove. |
||||||
// if transaction in session | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
if ($session = $this->connection->getSession()) { | ||||||
$options['session'] = $session; | ||||||
} | ||||||
$result = $this->collection->insertMany($values, $options); | ||||||
|
||||||
return (1 == (int) $result->isAcknowledged()); | ||||||
} | ||||||
|
@@ -571,7 +588,12 @@ public function insert(array $values) | |||||
*/ | ||||||
public function insertGetId(array $values, $sequence = null) | ||||||
{ | ||||||
$result = $this->collection->insertOne($values); | ||||||
$options = []; | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Common code for many methods, better move this logic in special method used in many places There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @klinson see on this comment, please There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
Please remove. |
||||||
// if transaction in session | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
if ($session = $this->connection->getSession()) { | ||||||
$options['session'] = $session; | ||||||
} | ||||||
$result = $this->collection->insertOne($values, $options); | ||||||
|
||||||
if (1 == (int) $result->isAcknowledged()) { | ||||||
if ($sequence === null) { | ||||||
|
@@ -592,6 +614,10 @@ public function update(array $values, array $options = []) | |||||
if (!Str::startsWith(key($values), '$')) { | ||||||
$values = ['$set' => $values]; | ||||||
} | ||||||
// if transaction in session | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
if ($session = $this->connection->getSession()) { | ||||||
$options['session'] = $session; | ||||||
} | ||||||
|
||||||
return $this->performUpdate($values, $options); | ||||||
} | ||||||
|
@@ -614,6 +640,11 @@ public function increment($column, $amount = 1, array $extra = [], array $option | |||||
$query->orWhereNotNull($column); | ||||||
}); | ||||||
|
||||||
// if transaction in session | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
if ($session = $this->connection->getSession()) { | ||||||
$options['session'] = $session; | ||||||
} | ||||||
|
||||||
return $this->performUpdate($query, $options); | ||||||
} | ||||||
|
||||||
|
@@ -673,7 +704,14 @@ public function delete($id = null) | |||||
} | ||||||
|
||||||
$wheres = $this->compileWheres(); | ||||||
$result = $this->collection->DeleteMany($wheres); | ||||||
|
||||||
$options = []; | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
Please remove. |
||||||
// if transaction in session | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
if ($session = $this->connection->getSession()) { | ||||||
$options['session'] = $session; | ||||||
} | ||||||
|
||||||
$result = $this->collection->DeleteMany($wheres, $options); | ||||||
if (1 == (int) $result->isAcknowledged()) { | ||||||
return $result->getDeletedCount(); | ||||||
} | ||||||
|
@@ -698,7 +736,12 @@ public function from($collection, $as = null) | |||||
*/ | ||||||
public function truncate() | ||||||
{ | ||||||
$result = $this->collection->drop(); | ||||||
$options = []; | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
Please remove. |
||||||
// if transaction in session | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||||||
if ($session = $this->connection->getSession()) { | ||||||
$options['session'] = $session; | ||||||
} | ||||||
$result = $this->collection->drop($options); | ||||||
|
||||||
return (1 == (int) $result->ok); | ||||||
} | ||||||
|
@@ -826,6 +869,11 @@ protected function performUpdate($query, array $options = []) | |||||
$options['multiple'] = true; | ||||||
} | ||||||
|
||||||
// if transaction in session | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
if ($session = $this->connection->getSession()) { | ||||||
$options['session'] = $session; | ||||||
} | ||||||
|
||||||
$wheres = $this->compileWheres(); | ||||||
$result = $this->collection->UpdateMany($wheres, $query, $options); | ||||||
if (1 == (int) $result->isAcknowledged()) { | ||||||
|
Uh oh!
There was an error while loading. Please reload this page.