Skip to content

PHPLIB-247: Use strings instead of memory stream for GridFS download buffering #323

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jan 12, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 21 additions & 20 deletions src/GridFS/CollectionWrapper.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
use MongoDB\Driver\Cursor;
use MongoDB\Driver\Manager;
use MongoDB\Driver\ReadPreference;
use IteratorIterator;
use stdClass;

/**
Expand Down Expand Up @@ -87,6 +86,27 @@ public function dropCollections()
$this->chunksCollection->drop(['typeMap' => []]);
}

/**
 * Queries the chunks collection for the chunk documents of one GridFS file.
 *
 * Only chunks whose sequence number "n" is greater than or equal to
 * $fromChunk are matched. Results are sorted by ascending "n" and the root
 * of each document is mapped to stdClass.
 *
 * @param mixed   $id        File ID
 * @param integer $fromChunk Starting chunk (inclusive)
 * @return Cursor
 */
public function findChunksByFileId($id, $fromChunk = 0)
{
    // Match this file's chunks, beginning at the requested chunk number.
    $filter = [
        'files_id' => $id,
        'n' => ['$gte' => $fromChunk],
    ];

    // Ascending "n" so callers can consume the chunks sequentially.
    $options = [
        'sort' => ['n' => 1],
        'typeMap' => ['root' => 'stdClass'],
    ];

    return $this->chunksCollection->find($filter, $options);
}

/**
* Finds a GridFS file document for a given filename and revision.
*
Expand Down Expand Up @@ -177,25 +197,6 @@ public function getBucketName()
return $this->bucketName;
}

/**
 * Builds an iterator over the chunk documents that belong to a file.
 *
 * Chunks are matched on "files_id", sorted by ascending "n", and the root
 * of each document is mapped to stdClass. The resulting cursor is wrapped
 * in an IteratorIterator.
 *
 * @param mixed $id
 * @return IteratorIterator
 */
public function getChunksIteratorByFilesId($id)
{
    $filter = ['files_id' => $id];

    $options = [
        'sort' => ['n' => 1],
        'typeMap' => ['root' => 'stdClass'],
    ];

    return new IteratorIterator($this->chunksCollection->find($filter, $options));
}

/**
* Return the database name.
*
Expand Down
188 changes: 102 additions & 86 deletions src/GridFS/ReadableStream.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

use MongoDB\Exception\InvalidArgumentException;
use MongoDB\GridFS\Exception\CorruptFileException;
use IteratorIterator;
use stdClass;

/**
Expand All @@ -29,18 +30,15 @@
class ReadableStream
{
private $buffer;
private $bufferEmpty;
private $bufferFresh;
private $bytesSeen = 0;
private $bufferOffset = 0;
private $chunkSize;
private $chunkOffset = 0;
private $chunksIterator;
private $collectionWrapper;
private $expectedLastChunkSize = 0;
private $file;
private $firstCheck = true;
private $iteratorEmpty = false;
private $length;
private $numChunks;
private $numChunks = 0;

/**
* Constructs a readable GridFS stream.
Expand All @@ -64,13 +62,15 @@ public function __construct(CollectionWrapper $collectionWrapper, stdClass $file
}

$this->file = $file;
$this->chunkSize = $file->chunkSize;
$this->length = $file->length;
$this->chunkSize = (integer) $file->chunkSize;
$this->length = (integer) $file->length;

$this->chunksIterator = $collectionWrapper->getChunksIteratorByFilesId($file->_id);
$this->collectionWrapper = $collectionWrapper;
$this->numChunks = ceil($this->length / $this->chunkSize);
$this->initEmptyBuffer();

if ($this->length > 0) {
$this->numChunks = (integer) ceil($this->length / $this->chunkSize);
$this->expectedLastChunkSize = ($this->length - (($this->numChunks - 1) * $this->chunkSize));
}
}

/**
Expand All @@ -90,56 +90,7 @@ public function __debugInfo()

public function close()
{
fclose($this->buffer);
}

/**
* Read bytes from the stream.
*
* Note: this method may return a string smaller than the requested length
* if data is not available to be read.
*
* @param integer $numBytes Number of bytes to read
* @return string
* @throws InvalidArgumentException if $numBytes is negative
*/
public function downloadNumBytes($numBytes)
{
if ($numBytes < 0) {
throw new InvalidArgumentException(sprintf('$numBytes must be >= zero; given: %d', $numBytes));
}

if ($numBytes == 0) {
return '';
}

if ($this->bufferFresh) {
rewind($this->buffer);
$this->bufferFresh = false;
}

// TODO: Should we be checking for fread errors here?
$output = fread($this->buffer, $numBytes);

if (strlen($output) == $numBytes) {
return $output;
}

$this->initEmptyBuffer();

$bytesLeft = $numBytes - strlen($output);

while (strlen($output) < $numBytes && $this->advanceChunks()) {
$bytesLeft = $numBytes - strlen($output);
$output .= substr($this->chunksIterator->current()->data->getData(), 0, $bytesLeft);
}

if ( ! $this->iteratorEmpty && $this->length > 0 && $bytesLeft < strlen($this->chunksIterator->current()->data->getData())) {
fwrite($this->buffer, substr($this->chunksIterator->current()->data->getData(), $bytesLeft));
$this->bufferEmpty = false;
}

return $output;
// Nothing to do
}

/**
Expand All @@ -162,58 +113,123 @@ public function getSize()
return $this->length;
}

/**
* Return whether the current read position is at the end of the stream.
*
* @return boolean
*/
public function isEOF()
{
return ($this->iteratorEmpty && $this->bufferEmpty);
if ($this->chunkOffset === $this->numChunks - 1) {
return $this->bufferOffset >= $this->expectedLastChunkSize;
}

return $this->chunkOffset >= $this->numChunks;
}

private function advanceChunks()
/**
* Read bytes from the stream.
*
* Note: this method may return a string smaller than the requested length
* if data is not available to be read.
*
* @param integer $length Number of bytes to read
* @return string
* @throws InvalidArgumentException if $length is negative
*/
public function readBytes($length)
{
if ($this->chunkOffset >= $this->numChunks) {
$this->iteratorEmpty = true;
if ($length < 0) {
throw new InvalidArgumentException(sprintf('$length must be >= 0; given: %d', $length));
}

return false;
if ($this->chunksIterator === null) {
$this->initChunksIterator();
}

if ($this->buffer === null && ! $this->initBufferFromCurrentChunk()) {
return '';
}

if ($this->firstCheck) {
$this->chunksIterator->rewind();
$this->firstCheck = false;
} else {
$this->chunksIterator->next();
$data = '';

while (strlen($data) < $length) {
if ($this->bufferOffset >= strlen($this->buffer) && ! $this->initBufferFromNextChunk()) {
break;
}

$initialDataLength = strlen($data);
$data .= substr($this->buffer, $this->bufferOffset, $length - $initialDataLength);
$this->bufferOffset += strlen($data) - $initialDataLength;
}

return $data;
}

/**
* Initialize the buffer to the current chunk's data.
*
* @return boolean Whether there was a current chunk to read
* @throws CorruptFileException if an expected chunk could not be read successfully
*/
private function initBufferFromCurrentChunk()
{
if ($this->chunkOffset === 0 && $this->numChunks === 0) {
return false;
}

if ( ! $this->chunksIterator->valid()) {
throw CorruptFileException::missingChunk($this->chunkOffset);
}

if ($this->chunksIterator->current()->n != $this->chunkOffset) {
throw CorruptFileException::unexpectedIndex($this->chunksIterator->current()->n, $this->chunkOffset);
$currentChunk = $this->chunksIterator->current();

if ($currentChunk->n !== $this->chunkOffset) {
throw CorruptFileException::unexpectedIndex($currentChunk->n, $this->chunkOffset);
}

$actualChunkSize = strlen($this->chunksIterator->current()->data->getData());
$this->buffer = $currentChunk->data->getData();

$expectedChunkSize = ($this->chunkOffset == $this->numChunks - 1)
? ($this->length - $this->bytesSeen)
$actualChunkSize = strlen($this->buffer);

$expectedChunkSize = ($this->chunkOffset === $this->numChunks - 1)
? $this->expectedLastChunkSize
: $this->chunkSize;

if ($actualChunkSize != $expectedChunkSize) {
if ($actualChunkSize !== $expectedChunkSize) {
throw CorruptFileException::unexpectedSize($actualChunkSize, $expectedChunkSize);
}

$this->bytesSeen += $actualChunkSize;
$this->chunkOffset++;

return true;
}

private function initEmptyBuffer()
/**
* Advance to the next chunk and initialize the buffer to its data.
*
* @return boolean Whether there was a next chunk to read
* @throws CorruptFileException if an expected chunk could not be read successfully
*/
private function initBufferFromNextChunk()
{
if (isset($this->buffer)) {
fclose($this->buffer);
if ($this->chunkOffset === $this->numChunks - 1) {
return false;
}

$this->buffer = fopen("php://memory", "w+b");
$this->bufferEmpty = true;
$this->bufferFresh = true;
$this->bufferOffset = 0;
$this->chunkOffset++;
$this->chunksIterator->next();

return $this->initBufferFromCurrentChunk();
}

/**
 * Creates the chunks iterator for this file, starting at the current chunk
 * offset, and rewinds it.
 */
private function initChunksIterator()
{
    // Request only the chunks from the current offset onward and wrap the
    // resulting cursor so it can be advanced manually.
    $this->chunksIterator = new IteratorIterator(
        $this->collectionWrapper->findChunksByFileId($this->file->_id, $this->chunkOffset)
    );

    $this->chunksIterator->rewind();
}
}
6 changes: 3 additions & 3 deletions src/GridFS/StreamWrapper.php
Original file line number Diff line number Diff line change
Expand Up @@ -119,17 +119,17 @@ public function stream_open($path, $mode, $options, &$openedPath)
* if data is not available to be read.
*
* @see http://php.net/manual/en/streamwrapper.stream-read.php
* @param integer $count Number of bytes to read
* @param integer $length Number of bytes to read
* @return string
*/
public function stream_read($count)
public function stream_read($length)
{
if ( ! $this->stream instanceof ReadableStream) {
return '';
}

try {
return $this->stream->downloadNumBytes($count);
return $this->stream->readBytes($length);
} catch (Exception $e) {
trigger_error(sprintf('%s: %s', get_class($e), $e->getMessage()), \E_USER_WARNING);
return false;
Expand Down
Loading