Skip to content

Commit fc4e86d

Browse files
committed
Change OutboundQueue flush api so that it flushes entire contents
The do_attempt_write_data() function would previously only flush one item at a time. This would cause non-optimal batching in cases where the queue was full and the RoutingMessageHandler was only asked for a single item. Instead, change the API to flush the entire contents of the queue.
1 parent 9d455f1 commit fc4e86d

File tree

2 files changed

+60
-48
lines changed

2 files changed

+60
-48
lines changed

lightning/src/ln/peers/handler.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -82,10 +82,10 @@ pub(super) trait MessageQueuer {
8282
/// Trait representing a container that can try to flush data through a SocketDescriptor. Used by the
8383
/// PeerManager to handle flushing the outbound queue and flow control.
8484
pub(super) trait SocketDescriptorFlusher {
85-
/// Write previously enqueued data to the SocketDescriptor. A return of false indicates the
86-
/// underlying SocketDescriptor could not fulfill the send_data() call and the blocked state
87-
/// has been set. Use unblock() when the SocketDescriptor may have more room.
88-
fn try_flush_one(&mut self, descriptor: &mut impl SocketDescriptor) -> bool;
85+
/// Attempts to write all previously enqueued data to the SocketDescriptor. A return of false
86+
/// indicates the underlying SocketDescriptor could not fulfill the send_data() calls and the
87+
/// blocked state has been set. Use unblock() when the SocketDescriptor may have more room.
88+
fn try_flush(&mut self, descriptor: &mut impl SocketDescriptor) -> bool;
8989

9090
/// Clear the blocked state caused when a previous write failed
9191
fn unblock(&mut self);
@@ -656,7 +656,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, TransportImpl
656656
break;
657657
}
658658

659-
outbound_queue.try_flush_one(descriptor);
659+
outbound_queue.try_flush(descriptor);
660660
}
661661
}
662662

lightning/src/ln/peers/outbound_queue.rs

Lines changed: 55 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -40,32 +40,37 @@ impl PayloadQueuer for OutboundQueue {
4040
}
4141

4242
impl SocketDescriptorFlusher for OutboundQueue {
43-
fn try_flush_one(&mut self, descriptor: &mut impl SocketDescriptor) -> bool {
43+
fn try_flush(&mut self, descriptor: &mut impl SocketDescriptor) -> bool {
4444
// Exit early if a previous full write failed and haven't heard that there may be more
4545
// room available
4646
if self.blocked {
4747
return false;
4848
}
4949

50-
let full_write_succeeded = match self.buffer.front() {
51-
None => true,
52-
Some(next_buff) => {
53-
let should_be_reading = self.buffer.len() < self.soft_limit;
54-
let pending = &next_buff[self.buffer_first_msg_offset..];
55-
let data_sent = descriptor.send_data(pending, should_be_reading);
56-
self.buffer_first_msg_offset += data_sent;
57-
self.buffer_first_msg_offset == next_buff.len()
50+
loop {
51+
if self.buffer.is_empty() {
52+
return true;
5853
}
59-
};
6054

61-
if full_write_succeeded {
62-
self.buffer_first_msg_offset = 0;
63-
self.buffer.pop_front();
64-
} else {
65-
self.blocked = true;
55+
let full_write_succeeded = match self.buffer.front() {
56+
None => true,
57+
Some(next_buff) => {
58+
let should_be_reading = self.buffer.len() < self.soft_limit;
59+
let pending = &next_buff[self.buffer_first_msg_offset..];
60+
let data_sent = descriptor.send_data(pending, should_be_reading);
61+
self.buffer_first_msg_offset += data_sent;
62+
self.buffer_first_msg_offset == next_buff.len()
63+
}
64+
};
65+
66+
if full_write_succeeded {
67+
self.buffer_first_msg_offset = 0;
68+
self.buffer.pop_front();
69+
} else {
70+
self.blocked = true;
71+
return false;
72+
}
6673
}
67-
68-
full_write_succeeded
6974
}
7075

7176
fn unblock(&mut self) {
@@ -96,40 +101,40 @@ mod tests {
96101
use super::*;
97102
use ln::peers::test_util::*;
98103

99-
// Test that a try_flush_one() call with no queued data doesn't write anything
104+
// Test that a try_flush() call with no queued data doesn't write anything
100105
#[test]
101106
fn empty_does_not_write() {
102107
let mut descriptor = SocketDescriptorMock::new();
103108
let mut empty = OutboundQueue::new(10);
104109

105-
assert!(empty.try_flush_one(&mut descriptor));
110+
assert!(empty.try_flush(&mut descriptor));
106111
descriptor.assert_called_with(vec![]);
107112

108113
}
109114

110-
// Test that try_flush_one() sends the push_back
115+
// Test that try_flush() sends the push_back
111116
#[test]
112117
fn push_back_drain() {
113118
let mut descriptor = SocketDescriptorMock::new();
114119
let mut queue = OutboundQueue::new(10);
115120

116121
queue.push_back(vec![1]);
117-
assert!(queue.try_flush_one(&mut descriptor));
122+
assert!(queue.try_flush(&mut descriptor));
118123

119124
descriptor.assert_called_with(vec![(vec![1], true)]);
120125
}
121126

122-
// Test that try_flush_one() sends just first push_back
127+
// Test that try_flush() sends all
123128
#[test]
124129
fn push_back_push_back_drain_drain() {
125130
let mut descriptor = SocketDescriptorMock::new();
126131
let mut queue = OutboundQueue::new(10);
127132

128133
queue.push_back(vec![1]);
129134
queue.push_back(vec![2]);
130-
assert!(queue.try_flush_one(&mut descriptor));
135+
assert!(queue.try_flush(&mut descriptor));
131136

132-
descriptor.assert_called_with(vec![(vec![1], true)]);
137+
descriptor.assert_called_with(vec![(vec![1], true), (vec![2], true)]);
133138
}
134139

135140
// Test that descriptor that can't write all bytes returns valid response
@@ -139,27 +144,40 @@ mod tests {
139144
let mut queue = OutboundQueue::new(10);
140145

141146
queue.push_back(vec![1, 2, 3]);
142-
assert!(!queue.try_flush_one(&mut descriptor));
147+
assert!(!queue.try_flush(&mut descriptor));
143148

144149
descriptor.assert_called_with(vec![(vec![1, 2, 3], true)]);
145150
}
146151

152+
// Test that descriptor that can't write all bytes (in second pushed item) returns valid response
153+
#[test]
154+
fn push_back_drain_partial_multiple_push() {
155+
let mut descriptor = SocketDescriptorMock::with_fixed_size(2);
156+
let mut queue = OutboundQueue::new(10);
157+
158+
queue.push_back(vec![1]);
159+
queue.push_back(vec![2, 3]);
160+
assert!(!queue.try_flush(&mut descriptor));
161+
162+
descriptor.assert_called_with(vec![(vec![1], true), (vec![2, 3], true)]);
163+
}
164+
147165
// Test the bookkeeping for multiple partial writes
148166
#[test]
149-
fn push_back_drain_partial_drain_partial_try_flush_one() {
167+
fn push_back_drain_partial_drain_partial_try_flush() {
150168
let mut descriptor = SocketDescriptorMock::with_fixed_size(1);
151169
let mut queue = OutboundQueue::new(10);
152170

153171
queue.push_back(vec![1, 2, 3]);
154-
assert!(!queue.try_flush_one(&mut descriptor));
172+
assert!(!queue.try_flush(&mut descriptor));
155173

156174
descriptor.make_room(1);
157175
queue.unblock();
158-
assert!(!queue.try_flush_one(&mut descriptor));
176+
assert!(!queue.try_flush(&mut descriptor));
159177

160178
descriptor.make_room(1);
161179
queue.unblock();
162-
assert!(queue.try_flush_one(&mut descriptor));
180+
assert!(queue.try_flush(&mut descriptor));
163181

164182
descriptor.assert_called_with(vec![(vec![1, 2, 3], true), (vec![2, 3], true), (vec![3], true)]);
165183
}
@@ -171,27 +189,27 @@ mod tests {
171189

172190
// Fail write and move to blocked state
173191
queue.push_back(vec![1, 2]);
174-
assert!(!queue.try_flush_one(&mut descriptor));
192+
assert!(!queue.try_flush(&mut descriptor));
175193
descriptor.assert_called_with(vec![(vec![1, 2], true)]);
176194

177195
// Make room but don't signal
178196
descriptor.make_room(1);
179-
assert!(!queue.try_flush_one(&mut descriptor));
197+
assert!(!queue.try_flush(&mut descriptor));
180198
assert!(queue.is_blocked());
181199
descriptor.assert_called_with(vec![(vec![1, 2], true)]);
182200

183201
// Unblock and try again
184202
queue.unblock();
185203

186204
// Partial write will succeed, but still move to blocked
187-
assert!(!queue.try_flush_one(&mut descriptor));
205+
assert!(!queue.try_flush(&mut descriptor));
188206
assert!(queue.is_blocked());
189207
descriptor.assert_called_with(vec![(vec![1, 2], true), (vec![1, 2], true)]);
190208

191209
// Make room and signal which will succeed in writing the final piece
192210
descriptor.make_room(1);
193211
queue.unblock();
194-
assert!(queue.try_flush_one(&mut descriptor));
212+
assert!(queue.try_flush(&mut descriptor));
195213
assert!(!queue.is_blocked());
196214
descriptor.assert_called_with(vec![(vec![1, 2], true), (vec![1, 2], true), (vec![2], true)]);
197215
}
@@ -203,7 +221,7 @@ mod tests {
203221
let mut queue = OutboundQueue::new(1);
204222

205223
queue.push_back(vec![1]);
206-
assert!(queue.try_flush_one(&mut descriptor));
224+
assert!(queue.try_flush(&mut descriptor));
207225
descriptor.assert_called_with(vec![(vec![1], false)]);
208226
}
209227

@@ -216,9 +234,7 @@ mod tests {
216234
queue.push_back(vec![1]);
217235
queue.push_back(vec![2]);
218236
queue.push_back(vec![3]);
219-
assert!(queue.try_flush_one(&mut descriptor));
220-
assert!(queue.try_flush_one(&mut descriptor));
221-
assert!(queue.try_flush_one(&mut descriptor));
237+
assert!(queue.try_flush(&mut descriptor));
222238
descriptor.assert_called_with(vec![(vec![1], false), (vec![2], false), (vec![3], true)]);
223239
}
224240

@@ -232,7 +248,7 @@ mod tests {
232248
queue.push_back(vec![1]);
233249
assert!(!queue.is_empty());
234250

235-
assert!(queue.try_flush_one(&mut descriptor));
251+
assert!(queue.try_flush(&mut descriptor));
236252
assert!(queue.is_empty());
237253
}
238254

@@ -253,12 +269,8 @@ mod tests {
253269
queue.push_back(vec![2]);
254270
assert_eq!(queue.queue_space(), 0);
255271

256-
// at soft limit
257-
assert!(queue.try_flush_one(&mut descriptor));
258-
assert_eq!(queue.queue_space(), 0);
259-
260272
// below soft limt
261-
assert!(queue.try_flush_one(&mut descriptor));
273+
assert!(queue.try_flush(&mut descriptor));
262274
assert_eq!(queue.queue_space(), 1);
263275
}
264276
}

0 commit comments

Comments
 (0)