Skip to content

Commit af3cf3d

Browse files
committed
Change OutboundQueue flush api so that it flushes entire contents
The do_attempt_write_data() function would previously only flush one item at a time. This would cause non-optimal batching in cases where the queue was full and the RoutingMessageHandler was only asked for a single item. Instead, change the API to flush the entire contents of the queue.
1 parent fb3dcf0 commit af3cf3d

File tree

3 files changed

+61
-49
lines changed

3 files changed

+61
-49
lines changed

lightning/src/ln/peers/handler.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -104,10 +104,10 @@ pub(super) trait IOutboundQueue {
104104
// | _| | | |_| \__ \ | | | | | | | __/ |_| | | | (_) | (_| \__ \
105105
// |_| |_|\__,_|___/_| |_| |_| |_|\___|\__|_| |_|\___/ \__,_|___/
106106

107-
/// Write previously enqueued data to the SocketDescriptor. A return of false indicates the
108-
/// underlying SocketDescriptor could not fulfill the send_data() call and the blocked state
109-
/// has been set. Use unblock() when the SocketDescriptor may have more room.
110-
fn try_flush_one(&mut self, descriptor: &mut impl SocketDescriptor) -> bool;
107+
/// Attempts to write all previously enqueued data to the SocketDescriptor. A return of false
108+
/// indicates the underlying SocketDescriptor could not fulfill the send_data() calls and the
109+
/// blocked state has been set. Use unblock() when the SocketDescriptor may have more room.
110+
fn try_flush(&mut self, descriptor: &mut impl SocketDescriptor) -> bool;
111111

112112
/// Clear the blocked state caused when a previous write failed
113113
fn unblock(&mut self);
@@ -678,7 +678,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, TransportImpl
678678
break;
679679
}
680680

681-
outbound_queue.try_flush_one(descriptor);
681+
outbound_queue.try_flush(descriptor);
682682
}
683683
}
684684

lightning/src/ln/peers/outbound_queue.rs

Lines changed: 55 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -46,32 +46,37 @@ impl IOutboundQueue for OutboundQueue {
4646
self.soft_limit - cmp::min(self.soft_limit, self.buffer.len())
4747
}
4848

49-
fn try_flush_one(&mut self, descriptor: &mut impl SocketDescriptor) -> bool {
49+
fn try_flush(&mut self, descriptor: &mut impl SocketDescriptor) -> bool {
5050
// Exit early if a previous full write failed and haven't heard that there may be more
5151
// room available
5252
if self.blocked {
5353
return false;
5454
}
5555

56-
let full_write_succeeded = match self.buffer.front() {
57-
None => true,
58-
Some(next_buff) => {
59-
let should_be_reading = self.buffer.len() < self.soft_limit;
60-
let pending = &next_buff[self.buffer_first_msg_offset..];
61-
let data_sent = descriptor.send_data(pending, should_be_reading);
62-
self.buffer_first_msg_offset += data_sent;
63-
self.buffer_first_msg_offset == next_buff.len()
56+
loop {
57+
if self.buffer.is_empty() {
58+
return true;
6459
}
65-
};
6660

67-
if full_write_succeeded {
68-
self.buffer_first_msg_offset = 0;
69-
self.buffer.pop_front();
70-
} else {
71-
self.blocked = true;
61+
let full_write_succeeded = match self.buffer.front() {
62+
None => true,
63+
Some(next_buff) => {
64+
let should_be_reading = self.buffer.len() < self.soft_limit;
65+
let pending = &next_buff[self.buffer_first_msg_offset..];
66+
let data_sent = descriptor.send_data(pending, should_be_reading);
67+
self.buffer_first_msg_offset += data_sent;
68+
self.buffer_first_msg_offset == next_buff.len()
69+
}
70+
};
71+
72+
if full_write_succeeded {
73+
self.buffer_first_msg_offset = 0;
74+
self.buffer.pop_front();
75+
} else {
76+
self.blocked = true;
77+
return false;
78+
}
7279
}
73-
74-
full_write_succeeded
7580
}
7681

7782
fn unblock(&mut self) {
@@ -88,40 +93,40 @@ mod tests {
8893
use super::*;
8994
use ln::peers::test_util::*;
9095

91-
// Test that a try_flush_one() call with no queued data doesn't write anything
96+
// Test that a try_flush() call with no queued data doesn't write anything
9297
#[test]
9398
fn empty_does_not_write() {
9499
let mut descriptor = SocketDescriptorMock::new();
95100
let mut empty = OutboundQueue::new(10);
96101

97-
assert!(empty.try_flush_one(&mut descriptor));
102+
assert!(empty.try_flush(&mut descriptor));
98103
descriptor.assert_called_with(vec![]);
99104

100105
}
101106

102-
// Test that try_flush_one() sends the push_back
107+
// Test that try_flush() sends the push_back
103108
#[test]
104109
fn push_back_drain() {
105110
let mut descriptor = SocketDescriptorMock::new();
106111
let mut queue = OutboundQueue::new(10);
107112

108113
queue.push_back(vec![1]);
109-
assert!(queue.try_flush_one(&mut descriptor));
114+
assert!(queue.try_flush(&mut descriptor));
110115

111116
descriptor.assert_called_with(vec![(vec![1], true)]);
112117
}
113118

114-
// Test that try_flush_one() sends just first push_back
119+
// Test that try_flush() sends all
115120
#[test]
116121
fn push_back_push_back_drain_drain() {
117122
let mut descriptor = SocketDescriptorMock::new();
118123
let mut queue = OutboundQueue::new(10);
119124

120125
queue.push_back(vec![1]);
121126
queue.push_back(vec![2]);
122-
assert!(queue.try_flush_one(&mut descriptor));
127+
assert!(queue.try_flush(&mut descriptor));
123128

124-
descriptor.assert_called_with(vec![(vec![1], true)]);
129+
descriptor.assert_called_with(vec![(vec![1], true), (vec![2], true)]);
125130
}
126131

127132
// Test that descriptor that can't write all bytes returns valid response
@@ -131,27 +136,40 @@ mod tests {
131136
let mut queue = OutboundQueue::new(10);
132137

133138
queue.push_back(vec![1, 2, 3]);
134-
assert!(!queue.try_flush_one(&mut descriptor));
139+
assert!(!queue.try_flush(&mut descriptor));
135140

136141
descriptor.assert_called_with(vec![(vec![1, 2, 3], true)]);
137142
}
138143

144+
// Test that descriptor that can't write all bytes (in second pushed item) returns valid response
145+
#[test]
146+
fn push_back_drain_partial_multiple_push() {
147+
let mut descriptor = SocketDescriptorMock::with_fixed_size(2);
148+
let mut queue = OutboundQueue::new(10);
149+
150+
queue.push_back(vec![1]);
151+
queue.push_back(vec![2, 3]);
152+
assert!(!queue.try_flush(&mut descriptor));
153+
154+
descriptor.assert_called_with(vec![(vec![1], true), (vec![2, 3], true)]);
155+
}
156+
139157
// Test the bookkeeping for multiple partial writes
140158
#[test]
141-
fn push_back_drain_partial_drain_partial_try_flush_one() {
159+
fn push_back_drain_partial_drain_partial_try_flush() {
142160
let mut descriptor = SocketDescriptorMock::with_fixed_size(1);
143161
let mut queue = OutboundQueue::new(10);
144162

145163
queue.push_back(vec![1, 2, 3]);
146-
assert!(!queue.try_flush_one(&mut descriptor));
164+
assert!(!queue.try_flush(&mut descriptor));
147165

148166
descriptor.make_room(1);
149167
queue.unblock();
150-
assert!(!queue.try_flush_one(&mut descriptor));
168+
assert!(!queue.try_flush(&mut descriptor));
151169

152170
descriptor.make_room(1);
153171
queue.unblock();
154-
assert!(queue.try_flush_one(&mut descriptor));
172+
assert!(queue.try_flush(&mut descriptor));
155173

156174
descriptor.assert_called_with(vec![(vec![1, 2, 3], true), (vec![2, 3], true), (vec![3], true)]);
157175
}
@@ -163,27 +181,27 @@ mod tests {
163181

164182
// Fail write and move to blocked state
165183
queue.push_back(vec![1, 2]);
166-
assert!(!queue.try_flush_one(&mut descriptor));
184+
assert!(!queue.try_flush(&mut descriptor));
167185
descriptor.assert_called_with(vec![(vec![1, 2], true)]);
168186

169187
// Make room but don't signal
170188
descriptor.make_room(1);
171-
assert!(!queue.try_flush_one(&mut descriptor));
189+
assert!(!queue.try_flush(&mut descriptor));
172190
assert!(queue.is_blocked());
173191
descriptor.assert_called_with(vec![(vec![1, 2], true)]);
174192

175193
// Unblock and try again
176194
queue.unblock();
177195

178196
// Partial write will succeed, but still move to blocked
179-
assert!(!queue.try_flush_one(&mut descriptor));
197+
assert!(!queue.try_flush(&mut descriptor));
180198
assert!(queue.is_blocked());
181199
descriptor.assert_called_with(vec![(vec![1, 2], true), (vec![1, 2], true)]);
182200

183201
// Make room and signal which will succeed in writing the final piece
184202
descriptor.make_room(1);
185203
queue.unblock();
186-
assert!(queue.try_flush_one(&mut descriptor));
204+
assert!(queue.try_flush(&mut descriptor));
187205
assert!(!queue.is_blocked());
188206
descriptor.assert_called_with(vec![(vec![1, 2], true), (vec![1, 2], true), (vec![2], true)]);
189207
}
@@ -195,7 +213,7 @@ mod tests {
195213
let mut queue = OutboundQueue::new(1);
196214

197215
queue.push_back(vec![1]);
198-
assert!(queue.try_flush_one(&mut descriptor));
216+
assert!(queue.try_flush(&mut descriptor));
199217
descriptor.assert_called_with(vec![(vec![1], false)]);
200218
}
201219

@@ -208,9 +226,7 @@ mod tests {
208226
queue.push_back(vec![1]);
209227
queue.push_back(vec![2]);
210228
queue.push_back(vec![3]);
211-
assert!(queue.try_flush_one(&mut descriptor));
212-
assert!(queue.try_flush_one(&mut descriptor));
213-
assert!(queue.try_flush_one(&mut descriptor));
229+
assert!(queue.try_flush(&mut descriptor));
214230
descriptor.assert_called_with(vec![(vec![1], false), (vec![2], false), (vec![3], true)]);
215231
}
216232

@@ -224,7 +240,7 @@ mod tests {
224240
queue.push_back(vec![1]);
225241
assert!(!queue.is_empty());
226242

227-
assert!(queue.try_flush_one(&mut descriptor));
243+
assert!(queue.try_flush(&mut descriptor));
228244
assert!(queue.is_empty());
229245
}
230246

@@ -245,12 +261,8 @@ mod tests {
245261
queue.push_back(vec![2]);
246262
assert_eq!(queue.queue_space(), 0);
247263

248-
// at soft limit
249-
assert!(queue.try_flush_one(&mut descriptor));
250-
assert_eq!(queue.queue_space(), 0);
251-
252264
// below soft limt
253-
assert!(queue.try_flush_one(&mut descriptor));
265+
assert!(queue.try_flush(&mut descriptor));
254266
assert_eq!(queue.queue_space(), 1);
255267
}
256268
}

lightning/src/ln/peers/test_util.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -245,7 +245,7 @@ impl IOutboundQueue for Vec<Vec<u8>> {
245245
unimplemented!()
246246
}
247247

248-
fn try_flush_one(&mut self, _descriptor: &mut impl SocketDescriptor) -> bool {
248+
fn try_flush(&mut self, _descriptor: &mut impl SocketDescriptor) -> bool {
249249
unimplemented!()
250250
}
251251

0 commit comments

Comments
 (0)