Skip to content

Commit 99d742e

Browse files
committed
implement pipe and pipe2
1 parent 78dfb8a commit 99d742e

File tree

6 files changed

+223
-31
lines changed

6 files changed

+223
-31
lines changed

src/tools/miri/src/shims/unix/foreign_items.rs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -288,14 +288,25 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
288288
this.write_scalar(result, dest)?;
289289
}
290290

291-
// Sockets
291+
// Unnamed sockets and pipes
292292
"socketpair" => {
293293
let [domain, type_, protocol, sv] =
294294
this.check_shim(abi, Abi::C { unwind: false }, link_name, args)?;
295-
296295
let result = this.socketpair(domain, type_, protocol, sv)?;
297296
this.write_scalar(result, dest)?;
298297
}
298+
"pipe" => {
299+
let [pipefd] =
300+
this.check_shim(abi, Abi::C { unwind: false }, link_name, args)?;
301+
let result = this.pipe2(pipefd, /*flags*/ None)?;
302+
this.write_scalar(result, dest)?;
303+
}
304+
"pipe2" => {
305+
let [pipefd, flags] =
306+
this.check_shim(abi, Abi::C { unwind: false }, link_name, args)?;
307+
let result = this.pipe2(pipefd, Some(flags))?;
308+
this.write_scalar(result, dest)?;
309+
}
299310

300311
// Time
301312
"gettimeofday" => {

src/tools/miri/src/shims/unix/linux/epoll.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,9 +62,10 @@ pub struct EpollEventInterest {
6262

6363
/// EpollReadyEvents reflects the readiness of a file description.
6464
pub struct EpollReadyEvents {
65-
/// The associated file is available for read(2) operations.
65+
/// The associated file is available for read(2) operations, in the sense that a read will not block.
66+
/// (I.e., returning EOF is considered "ready".)
6667
pub epollin: bool,
67-
/// The associated file is available for write(2) operations.
68+
/// The associated file is available for write(2) operations, in the sense that a write will not block.
6869
pub epollout: bool,
6970
/// Stream socket peer closed connection, or shut down writing
7071
/// half of connection.

src/tools/miri/src/shims/unix/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,9 @@ mod env;
44
mod fd;
55
mod fs;
66
mod mem;
7-
mod socket;
87
mod sync;
98
mod thread;
9+
mod unnamed_socket;
1010

1111
mod android;
1212
mod freebsd;
@@ -23,9 +23,9 @@ pub use env::EvalContextExt as _;
2323
pub use fd::EvalContextExt as _;
2424
pub use fs::EvalContextExt as _;
2525
pub use mem::EvalContextExt as _;
26-
pub use socket::EvalContextExt as _;
2726
pub use sync::EvalContextExt as _;
2827
pub use thread::EvalContextExt as _;
28+
pub use unnamed_socket::EvalContextExt as _;
2929

3030
// Make up some constants.
3131
const UID: u32 = 1000;

src/tools/miri/src/shims/unix/socket.rs renamed to src/tools/miri/src/shims/unix/unnamed_socket.rs

Lines changed: 85 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
//! This implements "anonymous" sockets, that do not correspond to anything on the host system and
2+
//! are entirely implemented inside Miri.
3+
//! We also use the same infrastructure to implement unnamed pipes.
4+
15
use std::cell::{OnceCell, RefCell};
26
use std::collections::VecDeque;
37
use std::io;
@@ -16,8 +20,9 @@ const MAX_SOCKETPAIR_BUFFER_CAPACITY: usize = 212992;
1620
/// Pair of connected sockets.
1721
#[derive(Debug)]
1822
struct SocketPair {
19-
/// The buffer we are reading from.
20-
readbuf: RefCell<Buffer>,
23+
/// The buffer we are reading from, or `None` if this is the writing end of a pipe.
24+
/// (In that case, the peer FD will be the reading end of that pipe.)
25+
readbuf: Option<RefCell<Buffer>>,
2126
/// The `SocketPair` file descriptor that is our "peer", and that holds the buffer we are
2227
/// writing to. This is a weak reference because the other side may be closed before us; all
2328
/// future writes will then trigger EPIPE.
@@ -55,17 +60,25 @@ impl FileDescription for SocketPair {
5560
let mut epoll_ready_events = EpollReadyEvents::new();
5661

5762
// Check if it is readable.
58-
let readbuf = self.readbuf.borrow();
59-
if !readbuf.buf.is_empty() {
63+
if let Some(readbuf) = &self.readbuf {
64+
if !readbuf.borrow().buf.is_empty() {
65+
epoll_ready_events.epollin = true;
66+
}
67+
} else {
68+
// Without a read buffer, reading never blocks, so we are always ready.
6069
epoll_ready_events.epollin = true;
6170
}
6271

6372
// Check if is writable.
6473
if let Some(peer_fd) = self.peer_fd().upgrade() {
65-
let writebuf = &peer_fd.downcast::<SocketPair>().unwrap().readbuf.borrow();
66-
let data_size = writebuf.buf.len();
67-
let available_space = MAX_SOCKETPAIR_BUFFER_CAPACITY.strict_sub(data_size);
68-
if available_space != 0 {
74+
if let Some(writebuf) = &peer_fd.downcast::<SocketPair>().unwrap().readbuf {
75+
let data_size = writebuf.borrow().buf.len();
76+
let available_space = MAX_SOCKETPAIR_BUFFER_CAPACITY.strict_sub(data_size);
77+
if available_space != 0 {
78+
epoll_ready_events.epollout = true;
79+
}
80+
} else {
81+
// Without a write buffer, writing never blocks.
6982
epoll_ready_events.epollout = true;
7083
}
7184
} else {
@@ -108,7 +121,12 @@ impl FileDescription for SocketPair {
108121
return Ok(Ok(0));
109122
}
110123

111-
let mut readbuf = self.readbuf.borrow_mut();
124+
let Some(readbuf) = &self.readbuf else {
125+
// FIXME: This should return EBADF, but there's no nice way to do that as there's no
126+
// corresponding ErrorKind variant.
127+
throw_unsup_format!("reading from the write end of a pipe");
128+
};
129+
let mut readbuf = readbuf.borrow_mut();
112130
if readbuf.buf.is_empty() {
113131
if self.peer_fd().upgrade().is_none() {
114132
// Socketpair with no peer and empty buffer.
@@ -176,7 +194,13 @@ impl FileDescription for SocketPair {
176194
// closed.
177195
return Ok(Err(Error::from(ErrorKind::BrokenPipe)));
178196
};
179-
let mut writebuf = peer_fd.downcast::<SocketPair>().unwrap().readbuf.borrow_mut();
197+
198+
let Some(writebuf) = &peer_fd.downcast::<SocketPair>().unwrap().readbuf else {
199+
// FIXME: This should return EBADF, but there's no nice way to do that as there's no
200+
// corresponding ErrorKind variant.
201+
throw_unsup_format!("writing to the reading end of a pipe");
202+
};
203+
let mut writebuf = writebuf.borrow_mut();
180204
let data_size = writebuf.buf.len();
181205
let available_space = MAX_SOCKETPAIR_BUFFER_CAPACITY.strict_sub(data_size);
182206
if available_space == 0 {
@@ -227,12 +251,7 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
227251

228252
let mut is_sock_nonblock = false;
229253

230-
// Parse and remove the type flags that we support. If type != 0 after removing,
231-
// unsupported flags are used.
232-
if type_ & this.eval_libc_i32("SOCK_STREAM") == this.eval_libc_i32("SOCK_STREAM") {
233-
type_ &= !(this.eval_libc_i32("SOCK_STREAM"));
234-
}
235-
254+
// Parse and remove the type flags that we support.
236255
// SOCK_NONBLOCK only exists on Linux.
237256
if this.tcx.sess.target.os == "linux" {
238257
if type_ & this.eval_libc_i32("SOCK_NONBLOCK") == this.eval_libc_i32("SOCK_NONBLOCK") {
@@ -253,7 +272,7 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
253272
and AF_LOCAL are allowed",
254273
domain
255274
);
256-
} else if type_ != 0 {
275+
} else if type_ != this.eval_libc_i32("SOCK_STREAM") {
257276
throw_unsup_format!(
258277
"socketpair: type {:#x} is unsupported, only SOCK_STREAM, \
259278
SOCK_CLOEXEC and SOCK_NONBLOCK are allowed",
@@ -269,12 +288,12 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
269288
// Generate file descriptions.
270289
let fds = &mut this.machine.fds;
271290
let fd0 = fds.new_ref(SocketPair {
272-
readbuf: RefCell::new(Buffer::new()),
291+
readbuf: Some(RefCell::new(Buffer::new())),
273292
peer_fd: OnceCell::new(),
274293
is_nonblock: is_sock_nonblock,
275294
});
276295
let fd1 = fds.new_ref(SocketPair {
277-
readbuf: RefCell::new(Buffer::new()),
296+
readbuf: Some(RefCell::new(Buffer::new())),
278297
peer_fd: OnceCell::new(),
279298
is_nonblock: is_sock_nonblock,
280299
});
@@ -295,4 +314,51 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
295314

296315
Ok(Scalar::from_i32(0))
297316
}
317+
318+
fn pipe2(
319+
&mut self,
320+
pipefd: &OpTy<'tcx>,
321+
flags: Option<&OpTy<'tcx>>,
322+
) -> InterpResult<'tcx, Scalar> {
323+
let this = self.eval_context_mut();
324+
325+
let pipefd = this.deref_pointer(pipefd)?;
326+
let flags = match flags {
327+
Some(flags) => this.read_scalar(flags)?.to_i32()?,
328+
None => 0,
329+
};
330+
331+
// As usual we ignore CLOEXEC.
332+
let cloexec = this.eval_libc_i32("O_CLOEXEC");
333+
if flags != 0 && flags != cloexec {
334+
throw_unsup_format!("unsupported flags in `pipe2`");
335+
}
336+
337+
// Generate file descriptions.
338+
// pipefd[0] refers to the read end of the pipe.
339+
let fds = &mut this.machine.fds;
340+
let fd0 = fds.new_ref(SocketPair {
341+
readbuf: Some(RefCell::new(Buffer::new())),
342+
peer_fd: OnceCell::new(),
343+
is_nonblock: false,
344+
});
345+
let fd1 =
346+
fds.new_ref(SocketPair { readbuf: None, peer_fd: OnceCell::new(), is_nonblock: false });
347+
348+
// Make the file descriptions point to each other.
349+
fd0.downcast::<SocketPair>().unwrap().peer_fd.set(fd1.downgrade()).unwrap();
350+
fd1.downcast::<SocketPair>().unwrap().peer_fd.set(fd0.downgrade()).unwrap();
351+
352+
// Insert the file description to the fd table, generating the file descriptors.
353+
let pipefd0 = fds.insert(fd0);
354+
let pipefd1 = fds.insert(fd1);
355+
356+
// Return file descriptors to the caller.
357+
let pipefd0 = Scalar::from_int(pipefd0, pipefd.layout.size);
358+
let pipefd1 = Scalar::from_int(pipefd1, pipefd.layout.size);
359+
this.write_scalar(pipefd0, &pipefd)?;
360+
this.write_scalar(pipefd1, &pipefd.offset(pipefd.layout.size, pipefd.layout, this)?)?;
361+
362+
Ok(Scalar::from_i32(0))
363+
}
298364
}
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
//@ignore-target-windows: No libc pipe on Windows
2+
// test_race depends on a deterministic schedule.
3+
//@compile-flags: -Zmiri-preemption-rate=0
4+
use std::thread;
5+
fn main() {
6+
test_pipe();
7+
test_pipe_threaded();
8+
test_race();
9+
}
10+
11+
fn test_pipe() {
12+
let mut fds = [-1, -1];
13+
let mut res = unsafe { libc::pipe(fds.as_mut_ptr()) };
14+
assert_eq!(res, 0);
15+
16+
// Read size == data available in buffer.
17+
let data = "12345".as_bytes().as_ptr();
18+
res = unsafe { libc::write(fds[1], data as *const libc::c_void, 5).try_into().unwrap() };
19+
assert_eq!(res, 5);
20+
let mut buf3: [u8; 5] = [0; 5];
21+
res = unsafe {
22+
libc::read(fds[0], buf3.as_mut_ptr().cast(), buf3.len() as libc::size_t).try_into().unwrap()
23+
};
24+
assert_eq!(res, 5);
25+
assert_eq!(buf3, "12345".as_bytes());
26+
27+
// Read size > data available in buffer.
28+
let data = "123".as_bytes().as_ptr();
29+
res = unsafe { libc::write(fds[1], data as *const libc::c_void, 3).try_into().unwrap() };
30+
assert_eq!(res, 3);
31+
let mut buf4: [u8; 5] = [0; 5];
32+
res = unsafe {
33+
libc::read(fds[0], buf4.as_mut_ptr().cast(), buf4.len() as libc::size_t).try_into().unwrap()
34+
};
35+
assert_eq!(res, 3);
36+
assert_eq!(&buf4[0..3], "123".as_bytes());
37+
}
38+
39+
fn test_pipe_threaded() {
40+
let mut fds = [-1, -1];
41+
let mut res = unsafe { libc::pipe(fds.as_mut_ptr()) };
42+
assert_eq!(res, 0);
43+
44+
let thread1 = thread::spawn(move || {
45+
let mut buf: [u8; 5] = [0; 5];
46+
let res: i64 = unsafe {
47+
libc::read(fds[0], buf.as_mut_ptr().cast(), buf.len() as libc::size_t)
48+
.try_into()
49+
.unwrap()
50+
};
51+
assert_eq!(res, 5);
52+
assert_eq!(buf, "abcde".as_bytes());
53+
});
54+
// FIXME: we should yield here once blocking is implemented.
55+
//thread::yield_now();
56+
let data = "abcde".as_bytes().as_ptr();
57+
res = unsafe { libc::write(fds[1], data as *const libc::c_void, 5).try_into().unwrap() };
58+
assert_eq!(res, 5);
59+
thread1.join().unwrap();
60+
61+
// Read and write from different direction
62+
let thread2 = thread::spawn(move || {
63+
// FIXME: we should yield here once blocking is implemented.
64+
//thread::yield_now();
65+
let data = "12345".as_bytes().as_ptr();
66+
let res: i64 =
67+
unsafe { libc::write(fds[1], data as *const libc::c_void, 5).try_into().unwrap() };
68+
assert_eq!(res, 5);
69+
});
70+
// FIXME: we should not yield here once blocking is implemented.
71+
thread::yield_now();
72+
let mut buf: [u8; 5] = [0; 5];
73+
res = unsafe {
74+
libc::read(fds[0], buf.as_mut_ptr().cast(), buf.len() as libc::size_t).try_into().unwrap()
75+
};
76+
assert_eq!(res, 5);
77+
assert_eq!(buf, "12345".as_bytes());
78+
thread2.join().unwrap();
79+
}
80+
81+
fn test_race() {
82+
static mut VAL: u8 = 0;
83+
let mut fds = [-1, -1];
84+
let mut res = unsafe { libc::pipe(fds.as_mut_ptr()) };
85+
assert_eq!(res, 0);
86+
let thread1 = thread::spawn(move || {
87+
let mut buf: [u8; 1] = [0; 1];
88+
// write() from the main thread will occur before the read() here
89+
// because preemption is disabled and the main thread yields after write().
90+
let res: i32 = unsafe {
91+
libc::read(fds[0], buf.as_mut_ptr().cast(), buf.len() as libc::size_t)
92+
.try_into()
93+
.unwrap()
94+
};
95+
assert_eq!(res, 1);
96+
assert_eq!(buf, "a".as_bytes());
97+
// The read above establishes a happens-before so it is now safe to access this global variable.
98+
unsafe { assert_eq!(VAL, 1) };
99+
});
100+
unsafe { VAL = 1 };
101+
let data = "a".as_bytes().as_ptr();
102+
res = unsafe { libc::write(fds[1], data as *const libc::c_void, 1).try_into().unwrap() };
103+
assert_eq!(res, 1);
104+
thread::yield_now();
105+
thread1.join().unwrap();
106+
}

src/tools/miri/tests/pass-dep/libc/libc-socketpair.rs

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -66,9 +66,6 @@ fn test_socketpair_threaded() {
6666
unsafe { libc::socketpair(libc::AF_UNIX, libc::SOCK_STREAM, 0, fds.as_mut_ptr()) };
6767
assert_eq!(res, 0);
6868

69-
let data = "abcde".as_bytes().as_ptr();
70-
res = unsafe { libc::write(fds[0], data as *const libc::c_void, 5).try_into().unwrap() };
71-
assert_eq!(res, 5);
7269
let thread1 = thread::spawn(move || {
7370
let mut buf: [u8; 5] = [0; 5];
7471
let res: i64 = unsafe {
@@ -79,23 +76,33 @@ fn test_socketpair_threaded() {
7976
assert_eq!(res, 5);
8077
assert_eq!(buf, "abcde".as_bytes());
8178
});
79+
// FIXME: we should yield here once blocking is implemented.
80+
//thread::yield_now();
81+
let data = "abcde".as_bytes().as_ptr();
82+
res = unsafe { libc::write(fds[0], data as *const libc::c_void, 5).try_into().unwrap() };
83+
assert_eq!(res, 5);
8284
thread1.join().unwrap();
8385

8486
// Read and write from different direction
8587
let thread2 = thread::spawn(move || {
88+
// FIXME: we should yield here once blocking is implemented.
89+
//thread::yield_now();
8690
let data = "12345".as_bytes().as_ptr();
8791
let res: i64 =
88-
unsafe { libc::write(fds[0], data as *const libc::c_void, 5).try_into().unwrap() };
92+
unsafe { libc::write(fds[1], data as *const libc::c_void, 5).try_into().unwrap() };
8993
assert_eq!(res, 5);
9094
});
91-
thread2.join().unwrap();
95+
// FIXME: we should not yield here once blocking is implemented.
96+
thread::yield_now();
9297
let mut buf: [u8; 5] = [0; 5];
9398
res = unsafe {
94-
libc::read(fds[1], buf.as_mut_ptr().cast(), buf.len() as libc::size_t).try_into().unwrap()
99+
libc::read(fds[0], buf.as_mut_ptr().cast(), buf.len() as libc::size_t).try_into().unwrap()
95100
};
96101
assert_eq!(res, 5);
97102
assert_eq!(buf, "12345".as_bytes());
103+
thread2.join().unwrap();
98104
}
105+
99106
fn test_race() {
100107
static mut VAL: u8 = 0;
101108
let mut fds = [-1, -1];
@@ -113,6 +120,7 @@ fn test_race() {
113120
};
114121
assert_eq!(res, 1);
115122
assert_eq!(buf, "a".as_bytes());
123+
// The read above establishes a happens-before so it is now safe to access this global variable.
116124
unsafe { assert_eq!(VAL, 1) };
117125
});
118126
unsafe { VAL = 1 };

0 commit comments

Comments
 (0)