Skip to content

PHPC-1152, PHPC-1161: Emulate implicit sessions for command cursors #803

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Apr 18, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 79 additions & 25 deletions php_phongo.c
Original file line number Diff line number Diff line change
Expand Up @@ -749,7 +749,7 @@ bool phongo_cursor_advance_and_check_for_error(mongoc_cursor_t *cursor TSRMLS_DC
return true;
} /* }}} */

int phongo_execute_query(mongoc_client_t *client, const char *namespace, zval *zquery, zval *options, uint32_t server_id, zval *return_value, int return_value_used TSRMLS_DC) /* {{{ */
bool phongo_execute_query(mongoc_client_t *client, const char *namespace, zval *zquery, zval *options, uint32_t server_id, zval *return_value, int return_value_used TSRMLS_DC) /* {{{ */
{
const php_phongo_query_t *query;
mongoc_cursor_t *cursor;
Expand Down Expand Up @@ -827,7 +827,29 @@ static bson_t *create_wrapped_command_envelope(const char *db, bson_t *reply)
return tmp;
}

int phongo_execute_command(mongoc_client_t *client, php_phongo_command_type_t type, const char *db, zval *zcommand, zval *options, uint32_t server_id, zval *return_value, int return_value_used TSRMLS_DC) /* {{{ */
static zval *phongo_create_implicit_session(mongoc_client_t *client TSRMLS_DC) /* {{{ */
{
mongoc_client_session_t *cs;
zval *zsession;

cs = mongoc_client_start_session(client, NULL, NULL);

if (!cs) {
return NULL;
}

#if PHP_VERSION_ID >= 70000
zsession = ecalloc(sizeof(zval), 1);
#else
ALLOC_INIT_ZVAL(zsession);
#endif

phongo_session_init(zsession, cs TSRMLS_CC);

return zsession;
} /* }}} */

bool phongo_execute_command(mongoc_client_t *client, php_phongo_command_type_t type, const char *db, zval *zcommand, zval *options, uint32_t server_id, zval *return_value, int return_value_used TSRMLS_DC) /* {{{ */
{
const php_phongo_command_t *command;
bson_iter_t iter;
Expand All @@ -837,38 +859,50 @@ int phongo_execute_command(mongoc_client_t *client, php_phongo_command_type_t ty
mongoc_cursor_t *cmd_cursor;
zval *zreadPreference = NULL;
zval *zsession = NULL;
int result;
bool result = false;
bool free_reply = false;
bool free_zsession = false;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Before you merge this into master, you should run clang-format on this.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is in the v1.4 branch, which isn't formatted. I intend to run it through clang-format after (or during) the merge to master. Does that work?


command = Z_COMMAND_OBJ_P(zcommand);

if ((type & PHONGO_OPTION_READ_CONCERN) && !phongo_parse_read_concern(options, &opts TSRMLS_CC)) {
/* Exception should already have been thrown */
bson_destroy(&opts);
return false;
goto cleanup;
}

if ((type & PHONGO_OPTION_READ_PREFERENCE) && !phongo_parse_read_preference(options, &zreadPreference TSRMLS_CC)) {
/* Exception should already have been thrown */
bson_destroy(&opts);
return false;
goto cleanup;
}

if (!phongo_parse_session(options, client, &opts, &zsession TSRMLS_CC)) {
/* Exception should already have been thrown */
bson_destroy(&opts);
return false;
goto cleanup;
}

/* If an explicit session was not provided, attempt to create an implicit
* client session (ignoring any errors). */
if (!zsession) {
zsession = phongo_create_implicit_session(client TSRMLS_CC);

if (zsession) {
free_zsession = true;

if (!mongoc_client_session_append(Z_SESSION_OBJ_P(zsession)->client_session, &opts, NULL)) {
phongo_throw_exception(PHONGO_ERROR_INVALID_ARGUMENT TSRMLS_CC, "Error appending implicit \"sessionId\" option");
goto cleanup;
}
}
}

if ((type & PHONGO_OPTION_WRITE_CONCERN) && !phongo_parse_write_concern(options, &opts, NULL TSRMLS_CC)) {
/* Exception should already have been thrown */
bson_destroy(&opts);
return false;
goto cleanup;
}

if (!BSON_APPEND_INT32(&opts, "serverId", server_id)) {
phongo_throw_exception(PHONGO_ERROR_INVALID_ARGUMENT TSRMLS_CC, "Error appending \"serverId\" option");
bson_destroy(&opts);
return false;
goto cleanup;
}

/* Although "opts" already always includes the serverId option, the read
Expand All @@ -891,27 +925,25 @@ int phongo_execute_command(mongoc_client_t *client, php_phongo_command_type_t ty
default:
/* Should never happen, but if it does: exception */
phongo_throw_exception(PHONGO_ERROR_LOGIC TSRMLS_CC, "Type '%d' should never have been passed to phongo_execute_command, please file a bug report", type);
bson_destroy(&opts);
return false;
goto cleanup;
}

free_reply = true;

if (!result) {
phongo_throw_exception_from_bson_error_t(&error TSRMLS_CC);
bson_destroy(&reply);
bson_destroy(&opts);
return false;
goto cleanup;
}

bson_destroy(&opts);

if (!return_value_used) {
bson_destroy(&reply);
return true;
goto cleanup;
}

/* According to mongoc_cursor_new_from_command_reply(), the reply bson_t
* is ultimately destroyed on both success and failure. */
if (bson_iter_init_find(&iter, &reply, "cursor") && BSON_ITER_HOLDS_DOCUMENT(&iter)) {
bson_t initial_reply = BSON_INITIALIZER;
bson_error_t error = {0};

bson_copy_to(&reply, &initial_reply);

Expand All @@ -925,17 +957,39 @@ int phongo_execute_command(mongoc_client_t *client, php_phongo_command_type_t ty
bson_append_int64(&initial_reply, "batchSize", -1, command->batch_size);
}

if (zsession && !mongoc_client_session_append(Z_SESSION_OBJ_P(zsession)->client_session, &initial_reply, &error)) {
phongo_throw_exception_from_bson_error_t(&error TSRMLS_CC);
bson_destroy(&initial_reply);
result = false;
goto cleanup;
}

cmd_cursor = mongoc_cursor_new_from_command_reply(client, &initial_reply, server_id);
bson_destroy(&reply);
} else {
bson_t *wrapped_reply = create_wrapped_command_envelope(db, &reply);

cmd_cursor = mongoc_cursor_new_from_command_reply(client, wrapped_reply, server_id);
bson_destroy(&reply);
}

phongo_cursor_init_for_command(return_value, client, cmd_cursor, db, zcommand, zreadPreference, zsession TSRMLS_CC);
return true;

cleanup:
bson_destroy(&opts);

if (free_reply) {
bson_destroy(&reply);
}

if (free_zsession) {
#if PHP_VERSION_ID >= 70000
zval_ptr_dtor(zsession);
efree(zsession);
#else
zval_ptr_dtor(&zsession);
#endif
}

return result;
} /* }}} */
/* }}} */

Expand Down
4 changes: 2 additions & 2 deletions php_phongo.h
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,8 @@ void phongo_readconcern_init (zval *return_value, const
void phongo_readpreference_init (zval *return_value, const mongoc_read_prefs_t *read_prefs TSRMLS_DC);
void phongo_writeconcern_init (zval *return_value, const mongoc_write_concern_t *write_concern TSRMLS_DC);
bool phongo_execute_bulk_write (mongoc_client_t *client, const char *namespace, php_phongo_bulkwrite_t *bulk_write, zval *zwriteConcern, uint32_t server_id, zval *return_value, int return_value_used TSRMLS_DC);
int phongo_execute_command (mongoc_client_t *client, php_phongo_command_type_t type, const char *db, zval *zcommand, zval *zreadPreference, uint32_t server_id, zval *return_value, int return_value_used TSRMLS_DC);
int phongo_execute_query (mongoc_client_t *client, const char *namespace, zval *zquery, zval *zreadPreference, uint32_t server_id, zval *return_value, int return_value_used TSRMLS_DC);
bool phongo_execute_command (mongoc_client_t *client, php_phongo_command_type_t type, const char *db, zval *zcommand, zval *zreadPreference, uint32_t server_id, zval *return_value, int return_value_used TSRMLS_DC);
bool phongo_execute_query (mongoc_client_t *client, const char *namespace, zval *zquery, zval *zreadPreference, uint32_t server_id, zval *return_value, int return_value_used TSRMLS_DC);

bool phongo_cursor_advance_and_check_for_error(mongoc_cursor_t *cursor TSRMLS_DC);

Expand Down
22 changes: 21 additions & 1 deletion src/MongoDB/Cursor.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,22 @@

zend_class_entry *php_phongo_cursor_ce;

/* Check if the cursor is exhausted (i.e. ID is zero) and free any reference to
* the session. Calling this function during iteration will allow an implicit
* session to return to the pool immediately after a getMore indicates that the
* server has no more results to return. */
static void php_phongo_cursor_free_session_if_exhausted(php_phongo_cursor_t *cursor) /* {{{ */
{
if (mongoc_cursor_get_id(cursor->cursor)) {
return;
}

if (!Z_ISUNDEF(cursor->session)) {
zval_ptr_dtor(&cursor->session);
ZVAL_UNDEF(&cursor->session);
}
} /* }}} */

static void php_phongo_cursor_free_current(php_phongo_cursor_t *cursor) /* {{{ */
{
if (!Z_ISUNDEF(cursor->visitor_data.zchild)) {
Expand Down Expand Up @@ -117,6 +133,8 @@ static void php_phongo_cursor_iterator_move_forward(zend_object_iterator *iter T
phongo_throw_exception_from_bson_error_t(&error TSRMLS_CC);
}
}

php_phongo_cursor_free_session_if_exhausted(cursor);
} /* }}} */

static void php_phongo_cursor_iterator_rewind(zend_object_iterator *iter TSRMLS_DC) /* {{{ */
Expand Down Expand Up @@ -147,6 +165,8 @@ static void php_phongo_cursor_iterator_rewind(zend_object_iterator *iter TSRMLS_
if (doc) {
php_phongo_bson_to_zval_ex(bson_get_data(doc), doc->len, &cursor->visitor_data);
}

php_phongo_cursor_free_session_if_exhausted(cursor);
} /* }}} */

static zend_object_iterator_funcs php_phongo_cursor_iterator_funcs = {
Expand Down Expand Up @@ -440,7 +460,7 @@ static HashTable *php_phongo_cursor_get_debug_info(zval *object, int *is_temp TS
*is_temp = 1;
intern = Z_CURSOR_OBJ_P(object);

array_init_size(&retval, 9);
array_init_size(&retval, 10);

if (intern->database) {
ADD_ASSOC_STRING(&retval, "database", intern->database);
Expand Down
133 changes: 133 additions & 0 deletions tests/cursor/bug1152-001.phpt
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
--TEST--
PHPC-1152: Command cursors should use the same session for getMore and killCursors (implicit)
--SKIPIF--
<?php if (PHP_INT_SIZE !== 8) { die("skip Can't represent 64-bit ints on a 32-bit platform"); } ?>
<?php require __DIR__ . "/../utils/basic-skipif.inc"; ?>
<?php NEEDS_CRYPTO(); ?>
<?php NEEDS('STANDALONE'); ?>
<?php NEEDS_ATLEAST_MONGODB_VERSION(STANDALONE, "3.6"); ?>
<?php CLEANUP(STANDALONE); ?>
--FILE--
<?php
require_once __DIR__ . "/../utils/basic.inc";

class Test implements MongoDB\Driver\Monitoring\CommandSubscriber
{
private $lsidByCursorId = [];
private $lsidByRequestId = [];

public function executeCommand()
{
$manager = new MongoDB\Driver\Manager(STANDALONE);

$bulk = new MongoDB\Driver\BulkWrite;
$bulk->insert(['_id' => 1]);
$bulk->insert(['_id' => 2]);
$bulk->insert(['_id' => 3]);
$manager->executeBulkWrite(NS, $bulk);

$command = new MongoDB\Driver\Command([
'aggregate' => COLLECTION_NAME,
'pipeline' => [['$match' => new stdClass]],
'cursor' => ['batchSize' => 2],
]);

MongoDB\Driver\Monitoring\addSubscriber($this);

/* By creating two cursors with the same name, PHP's reference counting
* will destroy the first after the second is created. Note that
* mongoc_cursor_destroy also destroys implicit sessions and returns
* them to the LIFO pool. This sequencing allows us to test that getMore
* and killCursors use the session ID corresponding to the original
* aggregate command. */
$cursor = $manager->executeCommand(DATABASE_NAME, $command);
$cursor->toArray();

$cursor = $manager->executeCommand(DATABASE_NAME, $command);
$cursor->toArray();

$cursor = $manager->executeCommand(DATABASE_NAME, $command);
$cursor = $manager->executeCommand(DATABASE_NAME, $command);
unset($cursor);

MongoDB\Driver\Monitoring\removeSubscriber($this);

/* We should expect two unique session IDs over the course of the test,
* since at most two implicit sessions would have been in use at any
* given time. */
printf("Unique session IDs used: %d\n", count(array_unique($this->lsidByRequestId)));
}

public function commandStarted(MongoDB\Driver\Monitoring\CommandStartedEvent $event)
{
$requestId = $event->getRequestId();
$sessionId = bin2hex((string) $event->getCommand()->lsid->id);

printf("%s session ID: %s\n", $event->getCommandName(), $sessionId);

if ($event->getCommandName() === 'aggregate') {
if (isset($this->lsidByRequestId[$requestId])) {
throw new UnexpectedValueException('Previous command observed for request ID: ' . $requestId);
}

$this->lsidByRequestId[$requestId] = $sessionId;
}

if ($event->getCommandName() === 'getMore') {
$cursorId = $event->getCommand()->getMore;

if ( ! isset($this->lsidByCursorId[$cursorId])) {
throw new UnexpectedValueException('No previous command observed for cursor ID: ' . $cursorId);
}

printf("getMore used same session as aggregate: %s\n", $sessionId === $this->lsidByCursorId[$cursorId] ? 'yes' : 'no');
}

if ($event->getCommandName() === 'killCursors') {
$cursorId = $event->getCommand()->cursors[0];

if ( ! isset($this->lsidByCursorId[$cursorId])) {
throw new UnexpectedValueException('No previous command observed for cursor ID: ' . $cursorId);
}

printf("killCursors used same session as aggregate: %s\n", $sessionId === $this->lsidByCursorId[$cursorId] ? 'yes' : 'no');
}
}

public function commandSucceeded(MongoDB\Driver\Monitoring\CommandSucceededEvent $event)
{
/* Associate the aggregate's session ID with its cursor ID so it can be
* looked up by the subsequent getMore or killCursors */
if ($event->getCommandName() === 'aggregate') {
$cursorId = $event->getReply()->cursor->id;
$requestId = $event->getRequestId();

$this->lsidByCursorId[$cursorId] = $this->lsidByRequestId[$requestId];
}
}

public function commandFailed(MongoDB\Driver\Monitoring\CommandFailedEvent $event)
{
}
}

(new Test)->executeCommand();

?>
===DONE===
<?php exit(0); ?>
--EXPECTF--
aggregate session ID: %x
getMore session ID: %x
getMore used same session as aggregate: yes
aggregate session ID: %x
getMore session ID: %x
getMore used same session as aggregate: yes
aggregate session ID: %x
aggregate session ID: %x
killCursors session ID: %x
killCursors used same session as aggregate: yes
killCursors session ID: %x
killCursors used same session as aggregate: yes
Unique session IDs used: 2
===DONE===
Loading