PYTHON-1592 Remove Collection.parallel_scan #547

Merged: 1 commit, Jan 15, 2021
1 change: 0 additions & 1 deletion doc/api/pymongo/collection.rst
@@ -68,7 +68,6 @@
.. automethod:: options
.. automethod:: map_reduce
.. automethod:: inline_map_reduce
.. automethod:: parallel_scan
.. automethod:: initialize_unordered_bulk_op
.. automethod:: initialize_ordered_bulk_op
.. automethod:: group
9 changes: 5 additions & 4 deletions doc/changelog.rst
@@ -15,13 +15,14 @@ Breaking Changes in 4.0
.......................

- Removed support for Python 2.7, 3.4, and 3.5. Python 3.6+ is now required.
- Removed :meth:`~pymongo.database.Database.eval`,
:data:`~pymongo.database.Database.system_js` and
:class:`~pymongo.database.SystemJS`.
- Removed :meth:`pymongo.database.Database.eval`,
:data:`pymongo.database.Database.system_js` and
:class:`pymongo.database.SystemJS`.
- Removed :meth:`pymongo.mongo_client.MongoClient.fsync`,
:meth:`pymongo.mongo_client.MongoClient.unlock`, and
:attr:`pymongo.mongo_client.MongoClient.is_locked`.
- Removed :mod:`~pymongo.thread_util`.
- Removed :meth:`pymongo.collection.Collection.parallel_scan`.
- Removed :mod:`pymongo.thread_util`.
Member: When did this happen?

@ShaneHarvey (Member, Author), Jan 15, 2021:
In https://jira.mongodb.org/browse/PYTHON-2462. pymongo.thread_util was undocumented so I doubt anyone is using it externally.


Notable improvements
....................
9 changes: 9 additions & 0 deletions doc/migrate-to-pymongo4.rst
@@ -116,3 +116,12 @@ can be changed to this::

>>> from bson.code import Code
>>> result = database.command('eval', Code('function (x) {return x;}'), args=[3]).get('retval')


Collection.parallel_scan is removed
...................................

Removed :meth:`~pymongo.collection.Collection.parallel_scan`. MongoDB 4.2
removed the `parallelCollectionScan command`_. There is no replacement.

.. _parallelCollectionScan command: https://docs.mongodb.com/manual/reference/command/parallelCollectionScan/
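
Though the command is gone and there is no drop-in replacement, an application that still needs to read a whole collection from several threads can partition the work itself. The sketch below is illustrative only, not part of this PR or of PyMongo's documented API; ``scan_range``, ``parallel_find``, and the caller-supplied split points are assumed names. It splits the ``_id`` space into ranges and runs an ordinary ``find()`` per range on its own thread, much like the threading example in the parallel_scan docstring removed below.

import threading


def scan_range(collection, lower, upper, process_document):
    # Hypothetical helper: an ordinary find() over one slice of the _id
    # space. A bound of None means the slice is unbounded on that side.
    query = {}
    if lower is not None:
        query.setdefault('_id', {})['$gte'] = lower
    if upper is not None:
        query.setdefault('_id', {})['$lt'] = upper
    for document in collection.find(query):
        process_document(document)


def parallel_find(collection, split_points, process_document):
    # Hypothetical helper: split_points is a sorted list of _id values
    # chosen by the caller (for example sampled from the collection);
    # n split points yield n + 1 ranges, each scanned on its own thread.
    bounds = [None] + list(split_points) + [None]
    threads = [
        threading.Thread(
            target=scan_range,
            args=(collection, bounds[i], bounds[i + 1], process_document))
        for i in range(len(bounds) - 1)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

As with the old parallel_scan example, process_document must be thread safe, and documents inserted, deleted, or moved while the scan runs may be missed or seen in more than one range.
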
87 changes: 0 additions & 87 deletions pymongo/collection.py
@@ -1559,93 +1559,6 @@ def find_raw_batches(self, *args, **kwargs):

return RawBatchCursor(self, *args, **kwargs)

def parallel_scan(self, num_cursors, session=None, **kwargs):
"""**DEPRECATED**: Scan this entire collection in parallel.

Returns a list of up to ``num_cursors`` cursors that can be iterated
concurrently. As long as the collection is not modified during
scanning, each document appears once in one of the cursors result
sets.

For example, to process each document in a collection using some
thread-safe ``process_document()`` function:

>>> def process_cursor(cursor):
... for document in cursor:
... # Some thread-safe processing function:
... process_document(document)
>>>
>>> # Get up to 4 cursors.
...
>>> cursors = collection.parallel_scan(4)
>>> threads = [
... threading.Thread(target=process_cursor, args=(cursor,))
... for cursor in cursors]
>>>
>>> for thread in threads:
... thread.start()
>>>
>>> for thread in threads:
... thread.join()
>>>
>>> # All documents have now been processed.

The :meth:`parallel_scan` method obeys the :attr:`read_preference` of
this :class:`Collection`.

:Parameters:
- `num_cursors`: the number of cursors to return
- `session` (optional): a
:class:`~pymongo.client_session.ClientSession`.
- `**kwargs`: additional options for the parallelCollectionScan
command can be passed as keyword arguments.

.. note:: Requires server version **>= 2.5.5**.

.. versionchanged:: 3.7
Deprecated.

.. versionchanged:: 3.6
Added ``session`` parameter.

.. versionchanged:: 3.4
Added back support for arbitrary keyword arguments. MongoDB 3.4
adds support for maxTimeMS as an option to the
parallelCollectionScan command.

.. versionchanged:: 3.0
Removed support for arbitrary keyword arguments, since
the parallelCollectionScan command has no optional arguments.
"""
warnings.warn("parallel_scan is deprecated. MongoDB 4.2 will remove "
"the parallelCollectionScan command.",
DeprecationWarning, stacklevel=2)
cmd = SON([('parallelCollectionScan', self.__name),
('numCursors', num_cursors)])
cmd.update(kwargs)

with self._socket_for_reads(session) as (sock_info, slave_ok):
# We call sock_info.command here directly, instead of
# calling self._command to avoid using an implicit session.
result = sock_info.command(
self.__database.name,
cmd,
slave_ok,
self._read_preference_for(session),
self.codec_options,
read_concern=self.read_concern,
parse_write_concern_error=True,
session=session,
client=self.__database.client)

cursors = []
for cursor in result['cursors']:
cursors.append(CommandCursor(
self, cursor['cursor'], sock_info.address,
session=session, explicit_session=session is not None))

return cursors

def _count(self, cmd, collation=None, session=None):
"""Internal count helper."""
# XXX: "ns missing" checks can be removed when we drop support for
46 changes: 0 additions & 46 deletions test/test_collection.py
@@ -1737,52 +1737,6 @@ def test_aggregation_cursor_alive(self):

self.assertTrue(cursor.alive)

@client_context.require_no_mongos
@client_context.require_version_max(4, 1, 0)
@ignore_deprecations
def test_parallel_scan(self):
db = self.db
db.drop_collection("test")
if client_context.has_secondaries:
# Test that getMore messages are sent to the right server.
db = self.client.get_database(
db.name,
read_preference=ReadPreference.SECONDARY,
write_concern=WriteConcern(w=self.w))

coll = db.test
coll.insert_many([{'_id': i} for i in range(8000)])
docs = []
threads = [threading.Thread(target=docs.extend, args=(cursor,))
for cursor in coll.parallel_scan(3)]
for t in threads:
t.start()
for t in threads:
t.join()

self.assertEqual(
set(range(8000)),
set(doc['_id'] for doc in docs))

@client_context.require_no_mongos
@client_context.require_version_min(3, 3, 10)
@client_context.require_version_max(4, 1, 0)
@client_context.require_test_commands
@ignore_deprecations
def test_parallel_scan_max_time_ms(self):
self.client.admin.command("configureFailPoint",
"maxTimeAlwaysTimeOut",
mode="alwaysOn")
try:
self.assertRaises(ExecutionTimeout,
self.db.test.parallel_scan,
3,
maxTimeMS=1)
finally:
self.client.admin.command("configureFailPoint",
"maxTimeAlwaysTimeOut",
mode="off")

def test_large_limit(self):
db = self.db
db.drop_collection("test_large_limit")
42 changes: 0 additions & 42 deletions test/test_session.py
@@ -323,40 +323,6 @@ def test_collection(self):

self._test_ops(client, *ops)

@client_context.require_no_mongos
@client_context.require_version_max(4, 1, 0)
@ignore_deprecations
def test_parallel_collection_scan(self):
listener = self.listener
client = self.client
coll = client.pymongo_test.collection
coll.insert_many([{'_id': i} for i in range(1000)])

listener.results.clear()

def scan(session=None):
cursors = coll.parallel_scan(4, session=session)
for c in cursors:
c.batch_size(2)
list(c)

listener.results.clear()
with client.start_session() as session:
scan(session)
cursor_lsids = {}
for event in listener.results['started']:
self.assertIn(
'lsid', event.command,
"parallel_scan sent no lsid with %s" % (event.command_name, ))

if event.command_name == 'getMore':
cursor_id = event.command['getMore']
if cursor_id in cursor_lsids:
self.assertEqual(cursor_lsids[cursor_id],
event.command['lsid'])
else:
cursor_lsids[cursor_id] = event.command['lsid']

def test_cursor_clone(self):
coll = self.client.pymongo_test.collection
# Ensure some batches.
@@ -874,14 +840,6 @@ def test_reads(self):
lambda coll, session: coll.inline_map_reduce(
'function() {}', 'function() {}', session=session),
exception=map_reduce_exc)
if (not client_context.is_mongos and
not client_context.version.at_least(4, 1, 0)):
def scan(coll, session):
cursors = coll.parallel_scan(1, session=session)
for cur in cursors:
list(cur)
self._test_reads(
lambda coll, session: scan(coll, session=session))

self.assertRaises(
ConfigurationError,