Drop dep tokio's io-util feat as it broke MSRV and isn't useful #2537

Merged
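
For context on the approach the diff takes: tokio's io-util feature mainly supplies convenience wrappers such as AsyncReadExt, AsyncWriteExt, and io::split(), while the readiness-based methods on TcpStream itself (readable(), try_read(), writable(), try_write()) are available with the net feature alone, and since they take &self the stream can be shared via Arc<TcpStream> instead of being split into halves. Below is a minimal sketch of the read-side pattern; it is illustrative only and not taken from this PR (the function name read_once is made up):

```rust
use std::sync::Arc;
use tokio::net::TcpStream;

// Hypothetical example: read one chunk of bytes without AsyncReadExt (and thus
// without the "io-util" feature), using readiness polling plus try_read().
async fn read_once(stream: &Arc<TcpStream>) -> std::io::Result<Vec<u8>> {
	let mut buf = [0u8; 4096];
	loop {
		// Wait until the socket reports that it may be readable...
		stream.readable().await?;
		// ...but readable() can wake spuriously, so try_read() may still return
		// WouldBlock, in which case we simply go back to waiting.
		match stream.try_read(&mut buf) {
			// A read of zero bytes means the peer closed the connection.
			Ok(0) => return Err(std::io::ErrorKind::UnexpectedEof.into()),
			Ok(len) => return Ok(buf[..len].to_vec()),
			Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => continue,
			Err(e) => return Err(e),
		}
	}
}
```
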
ci/ci-tests.sh: 3 additions & 0 deletions
@@ -32,6 +32,9 @@ PIN_RELEASE_DEPS # pin the release dependencies in our main workspace
 # The proc-macro2 crate switched to Rust edition 2021 starting with v1.0.66, i.e., has MSRV of 1.56
 [ "$RUSTC_MINOR_VERSION" -lt 56 ] && cargo update -p proc-macro2 --precise "1.0.65" --verbose
 
+# The memchr crate switched to an MSRV of 1.60 starting with v2.6.0
+[ "$RUSTC_MINOR_VERSION" -lt 60 ] && cargo update -p memchr --precise "2.5.0" --verbose
+
 [ "$LDK_COVERAGE_BUILD" != "" ] && export RUSTFLAGS="-C link-dead-code"
 
 export RUST_BACKTRACE=1
lightning-net-tokio/Cargo.toml: 2 additions & 2 deletions
@@ -17,8 +17,8 @@ rustdoc-args = ["--cfg", "docsrs"]
 [dependencies]
 bitcoin = "0.29.0"
 lightning = { version = "0.0.116", path = "../lightning" }
-tokio = { version = "1.0", features = [ "io-util", "rt", "sync", "net", "time" ] }
+tokio = { version = "1.0", features = [ "rt", "sync", "net", "time" ] }
 
 [dev-dependencies]
-tokio = { version = "1.14", features = [ "io-util", "macros", "rt", "rt-multi-thread", "sync", "net", "time" ] }
+tokio = { version = "1.14", features = [ "macros", "rt", "rt-multi-thread", "sync", "net", "time" ] }
 lightning = { version = "0.0.116", path = "../lightning", features = ["_test_utils"] }
lightning-net-tokio/src/lib.rs: 35 additions & 40 deletions
@@ -31,10 +31,10 @@
 
 use bitcoin::secp256k1::PublicKey;
 
-use tokio::net::TcpStream;
+use tokio::net::{tcp, TcpStream};
 use tokio::{io, time};
 use tokio::sync::mpsc;
-use tokio::io::{AsyncReadExt, AsyncWrite, AsyncWriteExt};
+use tokio::io::AsyncWrite;
 
 use lightning::ln::peer_handler;
 use lightning::ln::peer_handler::SocketDescriptor as LnSocketTrait;
@@ -59,7 +59,7 @@ static ID_COUNTER: AtomicU64 = AtomicU64::new(0);
 // define a trivial two- and three- select macro with the specific types we need and just use that.
 
 pub(crate) enum SelectorOutput {
-	A(Option<()>), B(Option<()>), C(tokio::io::Result<usize>),
+	A(Option<()>), B(Option<()>), C(tokio::io::Result<()>),
 }
 
 pub(crate) struct TwoSelector<
@@ -87,15 +87,15 @@ impl<
 }
 
 pub(crate) struct ThreeSelector<
-	A: Future<Output=Option<()>> + Unpin, B: Future<Output=Option<()>> + Unpin, C: Future<Output=tokio::io::Result<usize>> + Unpin
+	A: Future<Output=Option<()>> + Unpin, B: Future<Output=Option<()>> + Unpin, C: Future<Output=tokio::io::Result<()>> + Unpin
 > {
 	pub a: A,
 	pub b: B,
 	pub c: C,
 }
 
 impl<
-	A: Future<Output=Option<()>> + Unpin, B: Future<Output=Option<()>> + Unpin, C: Future<Output=tokio::io::Result<usize>> + Unpin
+	A: Future<Output=Option<()>> + Unpin, B: Future<Output=Option<()>> + Unpin, C: Future<Output=tokio::io::Result<()>> + Unpin
 > Future for ThreeSelector<A, B, C> {
 	type Output = SelectorOutput;
 	fn poll(mut self: Pin<&mut Self>, ctx: &mut task::Context<'_>) -> Poll<SelectorOutput> {
@@ -119,7 +119,7 @@ impl<
 /// Connection object (in an Arc<Mutex<>>) in each SocketDescriptor we create as well as in the
 /// read future (which is returned by schedule_read).
 struct Connection {
-	writer: Option<io::WriteHalf<TcpStream>>,
+	writer: Option<Arc<TcpStream>>,
 	// Because our PeerManager is templated by user-provided types, and we can't (as far as I can
 	// tell) have a const RawWakerVTable built out of templated functions, we need some indirection
 	// between being woken up with write-ready and calling PeerManager::write_buffer_space_avail.
@@ -156,7 +156,7 @@ impl Connection {
 	async fn schedule_read<PM: Deref + 'static + Send + Sync + Clone>(
 		peer_manager: PM,
 		us: Arc<Mutex<Self>>,
-		mut reader: io::ReadHalf<TcpStream>,
+		reader: Arc<TcpStream>,
 		mut read_wake_receiver: mpsc::Receiver<()>,
 		mut write_avail_receiver: mpsc::Receiver<()>,
 	) where PM::Target: APeerManager<Descriptor = SocketDescriptor> {
@@ -200,7 +200,7 @@ impl Connection {
 				ThreeSelector {
 					a: Box::pin(write_avail_receiver.recv()),
 					b: Box::pin(read_wake_receiver.recv()),
-					c: Box::pin(reader.read(&mut buf)),
+					c: Box::pin(reader.readable()),
 				}.await
 			};
 			match select_result {
@@ -211,8 +211,9 @@
 					}
 				},
 				SelectorOutput::B(_) => {},
-				SelectorOutput::C(read) => {
-					match read {
+				SelectorOutput::C(res) => {
+					if res.is_err() { break Disconnect::PeerDisconnected; }
+					match reader.try_read(&mut buf) {
 						Ok(0) => break Disconnect::PeerDisconnected,
 						Ok(len) => {
 							let read_res = peer_manager.as_ref().read_event(&mut our_descriptor, &buf[0..len]);
@@ -226,7 +227,11 @@
 								Err(_) => break Disconnect::CloseConnection,
 							}
 						},
-						Err(_) => break Disconnect::PeerDisconnected,
+						Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
+							// readable() is allowed to spuriously wake, so we have to handle
+							// WouldBlock here.
+						},
+						Err(e) => break Disconnect::PeerDisconnected,
 					}
 				},
 			}
@@ -239,18 +244,14 @@
 			// here.
 			let _ = tokio::task::yield_now().await;
 		};
-		let writer_option = us.lock().unwrap().writer.take();
-		if let Some(mut writer) = writer_option {
-			// If the socket is already closed, shutdown() will fail, so just ignore it.
-			let _ = writer.shutdown().await;
-		}
+		us.lock().unwrap().writer.take();
 		if let Disconnect::PeerDisconnected = disconnect_type {
 			peer_manager.as_ref().socket_disconnected(&our_descriptor);
 			peer_manager.as_ref().process_events();
 		}
 	}
 
-	fn new(stream: StdTcpStream) -> (io::ReadHalf<TcpStream>, mpsc::Receiver<()>, mpsc::Receiver<()>, Arc<Mutex<Self>>) {
+	fn new(stream: StdTcpStream) -> (Arc<TcpStream>, mpsc::Receiver<()>, mpsc::Receiver<()>, Arc<Mutex<Self>>) {
 		// We only ever need a channel of depth 1 here: if we returned a non-full write to the
 		// PeerManager, we will eventually get notified that there is room in the socket to write
 		// new bytes, which will generate an event. That event will be popped off the queue before
@@ -262,11 +263,11 @@ impl Connection {
 		// false.
 		let (read_waker, read_receiver) = mpsc::channel(1);
 		stream.set_nonblocking(true).unwrap();
-		let (reader, writer) = io::split(TcpStream::from_std(stream).unwrap());
+		let tokio_stream = Arc::new(TcpStream::from_std(stream).unwrap());
 
-		(reader, write_receiver, read_receiver,
+		(Arc::clone(&tokio_stream), write_receiver, read_receiver,
 		Arc::new(Mutex::new(Self {
-			writer: Some(writer), write_avail, read_waker, read_paused: false,
+			writer: Some(tokio_stream), write_avail, read_waker, read_paused: false,
 			rl_requested_disconnect: false,
 			id: ID_COUNTER.fetch_add(1, Ordering::AcqRel)
 		})))
@@ -462,9 +463,9 @@ impl SocketDescriptor {
 }
 impl peer_handler::SocketDescriptor for SocketDescriptor {
 	fn send_data(&mut self, data: &[u8], resume_read: bool) -> usize {
-		// To send data, we take a lock on our Connection to access the WriteHalf of the TcpStream,
-		// writing to it if there's room in the kernel buffer, or otherwise create a new Waker with
-		// a SocketDescriptor in it which can wake up the write_avail Sender, waking up the
+		// To send data, we take a lock on our Connection to access the TcpStream, writing to it if
+		// there's room in the kernel buffer, or otherwise create a new Waker with a
+		// SocketDescriptor in it which can wake up the write_avail Sender, waking up the
 		// processing future which will call write_buffer_space_avail and we'll end up back here.
 		let mut us = self.conn.lock().unwrap();
 		if us.writer.is_none() {
@@ -484,24 +485,18 @@ impl peer_handler::SocketDescriptor for SocketDescriptor {
 		let mut ctx = task::Context::from_waker(&waker);
 		let mut written_len = 0;
 		loop {
-			match std::pin::Pin::new(us.writer.as_mut().unwrap()).poll_write(&mut ctx, &data[written_len..]) {
-				task::Poll::Ready(Ok(res)) => {
-					// The tokio docs *seem* to indicate this can't happen, and I certainly don't
-					// know how to handle it if it does (cause it should be a Poll::Pending
-					// instead):
-					assert_ne!(res, 0);
-					written_len += res;
-					if written_len == data.len() { return written_len; }
-				},
-				task::Poll::Ready(Err(e)) => {
-					// The tokio docs *seem* to indicate this can't happen, and I certainly don't
-					// know how to handle it if it does (cause it should be a Poll::Pending
-					// instead):
-					assert_ne!(e.kind(), io::ErrorKind::WouldBlock);
-					// Probably we've already been closed, just return what we have and let the
-					// read thread handle closing logic.
-					return written_len;
-				},
+			match us.writer.as_ref().unwrap().poll_write_ready(&mut ctx) {
+				task::Poll::Ready(Ok(())) => {
+					match us.writer.as_ref().unwrap().try_write(&data[written_len..]) {
+						Ok(res) => {
+							debug_assert_ne!(res, 0);
+							written_len += res;
+							if written_len == data.len() { return written_len; }
+						},
+						Err(e) => return written_len,
+					}
+				},
+				task::Poll::Ready(Err(e)) => return written_len,
 				task::Poll::Pending => {
 					// We're queued up for a write event now, but we need to make sure we also
 					// pause read given we're now waiting on the remote end to ACK (and in
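
For completeness, the write side of the same idea, roughly what send_data switches to above: poll_write_ready() plus try_write() in place of AsyncWrite::poll_write on a WriteHalf. A hypothetical helper sketch, not part of this PR:

```rust
use std::task::{Context, Poll};
use tokio::net::TcpStream;

// Hypothetical helper: attempt one non-blocking write from a manual poll. If the
// kernel buffer has no room, the waker in `ctx` is registered for a write-ready event.
fn try_send(stream: &TcpStream, ctx: &mut Context<'_>, data: &[u8]) -> Poll<std::io::Result<usize>> {
	loop {
		match stream.poll_write_ready(ctx) {
			Poll::Ready(Ok(())) => match stream.try_write(data) {
				Ok(written) => return Poll::Ready(Ok(written)),
				// Readiness can be spurious; loop so poll_write_ready() re-registers the waker.
				Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => continue,
				Err(e) => return Poll::Ready(Err(e)),
			},
			Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
			// Not writable yet; the task will be woken when it is.
			Poll::Pending => return Poll::Pending,
		}
	}
}
```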