Skip to content

Commit 10ef3f6

Browse files
committed
Merge pull request #619 from estolfo/RUBY-909-parallel-scan
RUBY-909 Add #parallel_scan back onto Collection::View
2 parents ff4fc0e + b6c2038 commit 10ef3f6

File tree

5 files changed

+212
-0
lines changed

5 files changed

+212
-0
lines changed

lib/mongo/collection/view/readable.rb

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,35 @@ def aggregate(pipeline, options = {})
5353
Aggregation.new(self, pipeline, options)
5454
end
5555

56+
# Execute a parallel scan on the collection view.
57+
# Returns a list of up to cursor_count cursors that can be iterated concurrently.
58+
# As long as the collection is not modified during scanning, each document appears once
59+
# in one of the cursors' result sets.
60+
#
61+
# @example Execute a parallel collection scan.
62+
# view.parallel_scan(2)
63+
#
64+
# @param [ Integer ] cursor_count The max number of cursors to return.
65+
#
66+
# @return [ Array<Cursor> ] An array of cursors.
67+
#
68+
# @since 2.1
69+
def parallel_scan(cursor_count)
70+
server = read.select_server(cluster)
71+
Operation::ParallelScan.new(
72+
:coll_name => collection.name,
73+
:db_name => database.name,
74+
:cursor_count => cursor_count
75+
).execute(server.context).cursor_ids.map do |cursor_id|
76+
result = Operation::Read::GetMore.new({ :to_return => 0,
77+
:cursor_id => cursor_id,
78+
:db_name => database.name,
79+
:coll_name => collection.name
80+
}).execute(server.context)
81+
Cursor.new(self, result, server)
82+
end
83+
end
84+
5685
# Allows the query to get partial results if some shards are down.
5786
#
5887
# @example Allow partial results.

lib/mongo/operation.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,3 +23,4 @@
2323
require 'mongo/operation/command'
2424
require 'mongo/operation/kill_cursors'
2525
require 'mongo/operation/map_reduce'
26+
require 'mongo/operation/parallel_scan'

lib/mongo/operation/parallel_scan.rb

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
# Copyright (C) 2009-2014 MongoDB, Inc.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
require 'mongo/operation/parallel_scan/result'
16+
17+
module Mongo
18+
module Operation
19+
20+
# A MongoDB parallel scan operation.
21+
#
22+
# @example Create the parallel scan operation.
23+
# ParallelScan.new({
24+
# :db_name => 'test_db',
25+
# :coll_name => 'test_collection',
26+
# :cursor_count => 5
27+
# })
28+
#
29+
# @param [ Hash ] spec The specifications for the operation.
30+
#
31+
# @option spec :db_name [ String ] The name of the database on which
32+
# the operation should be executed.
33+
# @option spec :coll_name [ String ] The collection to scan.
34+
# @option spec :cursor_count [ Integer ] The max number of cursors to return.
35+
# @option spec :options [ Hash ] Options for the command.
36+
#
37+
# @since 2.0.0
38+
class ParallelScan
39+
include Executable
40+
include Specifiable
41+
include Limited
42+
include ReadPreferrable
43+
44+
# Execute the parallel scan operation.
45+
#
46+
# @example Execute the operation.
47+
# operation.execute(context)
48+
#
49+
# @param [ Mongo::Server::Context ] context The context for this operation.
50+
#
51+
# @return [ Result ] The operation response, if there is one.
52+
#
53+
# @since 2.0.0
54+
def execute(context)
55+
execute_message(context)
56+
end
57+
58+
private
59+
60+
def execute_message(context)
61+
context.with_connection do |connection|
62+
Result.new(connection.dispatch([ message(context) ])).validate!
63+
end
64+
end
65+
66+
def selector
67+
{ :parallelCollectionScan => coll_name, :numCursors => cursor_count }
68+
end
69+
70+
def query_coll
71+
Database::COMMAND
72+
end
73+
end
74+
end
75+
end
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
2+
# Copyright (C) 2009-2014 MongoDB, Inc.
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License");
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
16+
module Mongo
17+
module Operation
18+
class ParallelScan
19+
20+
# Defines custom behaviour of results in a parallel scan.
21+
#
22+
# @since 2.0.0
23+
class Result < Operation::Result
24+
25+
# The name of the cursor field in the result.
26+
#
27+
# @since 2.0.0
28+
CURSOR = 'cursor'.freeze
29+
30+
# The name of the cursors field in the result.
31+
#
32+
# @since 2.0.0
33+
CURSORS = 'cursors'.freeze
34+
35+
# The name of the id field in the result.
36+
#
37+
# @since 2.0.0
38+
ID = 'id'.freeze
39+
40+
# Get all the cursor ids from the result.
41+
#
42+
# @example Get the cursor ids.
43+
# result.cursor_ids
44+
#
45+
# @return [ Array<Integer> ] The cursor ids.
46+
#
47+
# @since 2.0.0
48+
def cursor_ids
49+
documents.map{ |doc| doc[CURSOR][ID] }
50+
end
51+
52+
# Get the cursor documents from the parallel scan result.
53+
#
54+
# @example Get the documents.
55+
# result.documents
56+
#
57+
# @return [ Array<BSON::Document> ] The documents.
58+
#
59+
# @since 2.0.0
60+
def documents
61+
reply.documents[0][CURSORS]
62+
end
63+
64+
private
65+
66+
def first
67+
@first ||= reply.documents[0] || {}
68+
end
69+
end
70+
end
71+
end
72+
end

spec/mongo/collection/view/readable_spec.rb

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -434,6 +434,41 @@
434434
end
435435
end
436436

437+
describe '#parallel_scan' do
438+
439+
let(:documents) do
440+
(1..200).map do |i|
441+
{ name: "testing-scan-#{i}" }
442+
end
443+
end
444+
445+
before do
446+
authorized_collection.insert_many(documents)
447+
end
448+
449+
let(:cursors) do
450+
view.parallel_scan(2)
451+
end
452+
453+
it 'returns an array of cursors', if: write_command_enabled? do
454+
cursors.each do |cursor|
455+
expect(cursor.class).to be(Mongo::Cursor)
456+
end
457+
end
458+
459+
it 'returns the correct number of documents', if: write_command_enabled? do
460+
expect(
461+
cursors.reduce(0) { |total, cursor| total + cursor.to_a.size }
462+
).to eq(200)
463+
end
464+
465+
it 'raises an error', unless: write_command_enabled? do
466+
expect {
467+
cursors
468+
}.to raise_error(Mongo::Error::OperationFailure)
469+
end
470+
end
471+
437472
describe '#projection' do
438473

439474
let(:options) do

0 commit comments

Comments
 (0)