Skip to content

Commit bd4f3fd

Browse files
committed
Merge pull request #138 from ruby-ldap/read-queue-by-message_id
Prioritize reads from queue by message ID
2 parents 49fd5c9 + 1e59169 commit bd4f3fd

File tree

2 files changed

+61
-7
lines changed

2 files changed

+61
-7
lines changed

lib/net/ldap/connection.rb

Lines changed: 54 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,45 @@ def close
111111
@conn = nil
112112
end
113113

114+
# Internal: Reads messages by ID from a queue, falling back to reading from
115+
# the connected socket until a message matching the ID is read. Any messages
116+
# with mismatched IDs gets queued for subsequent reads by the origin of that
117+
# message ID.
118+
#
119+
# Returns a Net::LDAP::PDU object or nil.
120+
def queued_read(message_id)
121+
if pdu = message_queue[message_id].shift
122+
return pdu
123+
end
124+
125+
# read messages until we have a match for the given message_id
126+
while pdu = read
127+
if pdu.message_id == message_id
128+
return pdu
129+
else
130+
message_queue[pdu.message_id].push pdu
131+
next
132+
end
133+
end
134+
135+
pdu
136+
end
137+
138+
# Internal: The internal queue of messages, read from the socket, grouped by
139+
# message ID.
140+
#
141+
# Used by `queued_read` to return messages sent by the server with the given
142+
# ID. If no messages are queued for that ID, `queued_read` will `read` from
143+
# the socket and queue messages that don't match the given ID for other
144+
# readers.
145+
#
146+
# Returns the message queue Hash.
147+
def message_queue
148+
@message_queue ||= Hash.new do |hash, key|
149+
hash[key] = []
150+
end
151+
end
152+
114153
# Internal: Reads and parses data from the configured connection.
115154
#
116155
# - syntax: the BER syntax to use to parse the read data with
@@ -146,9 +185,9 @@ def read(syntax = Net::LDAP::AsnSyntax)
146185
#
147186
# Returns the return value from writing to the connection, which in some
148187
# cases is the Integer number of bytes written to the socket.
149-
def write(request, controls = nil)
188+
def write(request, controls = nil, message_id = next_msgid)
150189
instrument "write.net_ldap_connection" do |payload|
151-
packet = [next_msgid.to_ber, request, controls].compact.to_ber_sequence
190+
packet = [message_id.to_ber, request, controls].compact.to_ber_sequence
152191
payload[:content_length] = @conn.write(packet)
153192
end
154193
end
@@ -356,7 +395,10 @@ def search(args = {})
356395
result_pdu = nil
357396
n_results = 0
358397

398+
message_id = next_msgid
399+
359400
instrument "search.net_ldap_connection",
401+
:message_id => message_id,
360402
:filter => search_filter,
361403
:base => search_base,
362404
:scope => scope,
@@ -403,12 +445,12 @@ def search(args = {})
403445
controls << sort_control if sort_control
404446
controls = controls.empty? ? nil : controls.to_ber_contextspecific(0)
405447

406-
write(request, controls)
448+
write(request, controls, message_id)
407449

408450
result_pdu = nil
409451
controls = []
410452

411-
while pdu = read
453+
while pdu = queued_read(message_id)
412454
case pdu.app_tag
413455
when Net::LDAP::PDU::SearchReturnedData
414456
n_results += 1
@@ -476,6 +518,14 @@ def search(args = {})
476518

477519
result_pdu || OpenStruct.new(:status => :failure, :result_code => 1, :message => "Invalid search")
478520
end # instrument
521+
ensure
522+
# clean up message queue for this search
523+
messages = message_queue.delete(message_id)
524+
525+
unless messages.empty?
526+
instrument "search_messages_unread.net_ldap_connection",
527+
message_id: message_id, messages: messages
528+
end
479529
end
480530

481531
MODIFY_OPERATIONS = { #:nodoc:

test/test_ldap_connection.rb

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -185,20 +185,21 @@ def test_bind_net_ldap_connection_event
185185

186186
def test_search_net_ldap_connection_event
187187
# search data
188-
search_data_ber = Net::BER::BerIdentifiedArray.new([2, [
188+
search_data_ber = Net::BER::BerIdentifiedArray.new([1, [
189189
"uid=user1,ou=OrgUnit2,ou=OrgUnitTop,dc=openldap,dc=ghe,dc=local",
190190
[ ["uid", ["user1"]] ]
191191
]])
192192
search_data_ber.ber_identifier = Net::LDAP::PDU::SearchReturnedData
193-
search_data = [2, search_data_ber]
193+
search_data = [1, search_data_ber]
194194
# search result (end of results)
195195
search_result_ber = Net::BER::BerIdentifiedArray.new([0, "", ""])
196196
search_result_ber.ber_identifier = Net::LDAP::PDU::SearchResult
197-
search_result = [2, search_result_ber]
197+
search_result = [1, search_result_ber]
198198
@tcp_socket.should_receive(:read_ber).and_return(search_data).
199199
and_return(search_result)
200200

201201
events = @service.subscribe "search.net_ldap_connection"
202+
unread = @service.subscribe "search_messages_unread.net_ldap_connection"
202203

203204
result = @connection.search(filter: "(uid=user1)")
204205
assert result.success?, "should be success"
@@ -209,5 +210,8 @@ def test_search_net_ldap_connection_event
209210
assert payload.has_key?(:filter)
210211
assert_equal "(uid=user1)", payload[:filter].to_s
211212
assert result
213+
214+
# ensure no unread
215+
assert unread.empty?, "should not have any leftover unread messages"
212216
end
213217
end

0 commit comments

Comments
 (0)