Skip to content

Commit af9bd99

Browse files
author
Gleb Pomykalov
committed
Address several change requests
1 parent b5e28b7 commit af9bd99

File tree

2 files changed

+89
-50
lines changed

2 files changed

+89
-50
lines changed

src/sys/socket/mod.rs

Lines changed: 47 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -783,7 +783,7 @@ pub fn sendmsg(fd: RawFd, iov: &[IoVec<&[u8]>], cmsgs: &[ControlMessage],
783783
// because subsequent code will not clear the padding bytes.
784784
let mut cmsg_buffer = vec![0u8; capacity];
785785

786-
unsafe { send_pack_mhdr(mhdr.as_mut_ptr(), &mut cmsg_buffer[..], &iov, &cmsgs, addr) };
786+
pack_mhdr(mhdr.as_mut_ptr(), &mut cmsg_buffer[..], &iov, &cmsgs, addr);
787787

788788
let mhdr = unsafe { mhdr.assume_init() };
789789

@@ -805,9 +805,9 @@ pub struct SendMmsgData<'a, I, C>
805805
pub _lt: std::marker::PhantomData<&'a I>,
806806
}
807807

808-
/// An extension of `sendmsg`` that allows the caller to transmit multiple
808+
/// An extension of [`sendmsg`] that allows the caller to transmit multiple
809809
/// messages on a socket using a single system call. (This has performance
810-
/// benefits for some applications.). Supported on Linux and FreeBSD
810+
/// benefits for some applications.).
811811
///
812812
/// Allocations are performed for cmsgs and to build `msghdr` buffer
813813
///
@@ -817,11 +817,23 @@ pub struct SendMmsgData<'a, I, C>
817817
/// * `data`: Struct that implements `IntoIterator` with `SendMmsgData` items
818818
/// * `flags`: Optional flags passed directly to the operating system.
819819
///
820-
/// # References
821-
/// [sendmmsg(2)](http://man7.org/linux/man-pages/man2/sendmmsg.2.html)
822-
#[cfg(any(target_os = "linux", target_os = "freebsd"))]
823-
pub fn sendmmsg<'a, I, C>(fd: RawFd, data: impl std::iter::IntoIterator<Item=&'a SendMmsgData<'a, I, C>>, flags: MsgFlags)
824-
-> Result<(usize, Vec<usize>)>
820+
/// Returns tuple, where the first value is number of sent messages, and the second one
821+
/// it a `Vec` with numbers of bytes sent on each appropriate message.
822+
///
823+
///# References
824+
/// [`sendmmsg`](fn.sendmsg.html)
825+
#[cfg(any(
826+
target_os = "linux",
827+
target_os = "android",
828+
target_os = "freebsd",
829+
target_os = "openbsd",
830+
target_os = "netbsd",
831+
))]
832+
pub fn sendmmsg<'a, I, C>(
833+
fd: RawFd,
834+
data: impl std::iter::IntoIterator<Item=&'a SendMmsgData<'a, I, C>>,
835+
flags: MsgFlags
836+
) -> Result<(usize, Vec<usize>)>
825837
where
826838
I: AsRef<[IoVec<&'a [u8]>]> + 'a,
827839
C: AsRef<[ControlMessage<'a>]> + 'a,
@@ -848,7 +860,7 @@ pub fn sendmmsg<'a, I, C>(fd: RawFd, data: impl std::iter::IntoIterator<Item=&'a
848860
cmsgs_buffer.resize(cmsgs_buffer_need_capacity, 0);
849861

850862
unsafe {
851-
send_pack_mhdr(
863+
pack_mhdr(
852864
&mut (*element.as_mut_ptr()).msg_hdr,
853865
&mut cmsgs_buffer[cmsgs_start..],
854866
&d.iov,
@@ -884,7 +896,7 @@ pub struct RecvMmsgData<'a, I>
884896
pub cmsg_buffer: Option<&'a mut Vec<u8>>,
885897
}
886898

887-
/// An extension of recvmsg(2) that allows the caller to receive multiple
899+
/// An extension of [`recvmsg`] that allows the caller to receive multiple
888900
/// messages from a socket using a single system call. (This has
889901
/// performance benefits for some applications.)
890902
///
@@ -905,8 +917,13 @@ pub struct RecvMmsgData<'a, I>
905917
/// [`cmsg_space!`](macro.cmsg_space.html)
906918
///
907919
/// # References
908-
/// [recvmmsg(2)](http://man7.org/linux/man-pages/man2/recvmmsg.2.html)
909-
#[cfg(any(target_os = "linux", target_os = "freebsd"))]
920+
/// [`recvmsg`](fn.recvmsg.html)
921+
#[cfg(any(
922+
target_os = "linux",
923+
target_os = "android",
924+
target_os = "freebsd",
925+
target_os = "netbsd",
926+
))]
910927
pub fn recvmmsg<'a, I>(
911928
fd: RawFd,
912929
data: impl std::iter::IntoIterator<Item=&'a mut RecvMmsgData<'a, I>>,
@@ -1034,7 +1051,7 @@ unsafe fn recv_pack_mhdr<'a, I>(
10341051
}
10351052

10361053

1037-
unsafe fn send_pack_mhdr<'a, I, C>(
1054+
fn pack_mhdr<'a, I, C>(
10381055
out: *mut msghdr,
10391056
cmsg_buffer: &mut [u8],
10401057
iov: I,
@@ -1050,7 +1067,7 @@ unsafe fn send_pack_mhdr<'a, I, C>(
10501067
// Next encode the sending address, if provided
10511068
let (name, namelen) = match addr {
10521069
Some(addr) => {
1053-
let (x, y) = addr.as_ffi_pair();
1070+
let (x, y) = unsafe { addr.as_ffi_pair() };
10541071
(x as *const _, y)
10551072
},
10561073
None => (ptr::null(), 0),
@@ -1065,27 +1082,31 @@ unsafe fn send_pack_mhdr<'a, I, C>(
10651082

10661083
// Musl's msghdr has private fields, so this is the only way to
10671084
// initialize it.
1068-
(*out).msg_name = name as *mut _;
1069-
(*out).msg_namelen = namelen;
1070-
// transmute iov into a mutable pointer. sendmsg doesn't really mutate
1071-
// the buffer, but the standard says that it takes a mutable pointer
1072-
(*out).msg_iov = iov.as_ref().as_ptr() as *mut _;
1073-
(*out).msg_iovlen = iov.as_ref().len() as _;
1074-
(*out).msg_control = cmsg_ptr;
1075-
(*out).msg_controllen = cmsg_capacity as _;
1076-
(*out).msg_flags = 0;
1085+
{
1086+
let mut hdr = unsafe { &mut *out };
1087+
1088+
hdr.msg_name = name as *mut _;
1089+
hdr.msg_namelen = namelen;
1090+
// transmute iov into a mutable pointer. sendmsg doesn't really mutate
1091+
// the buffer, but the standard says that it takes a mutable pointer
1092+
hdr.msg_iov = iov.as_ref().as_ptr() as *mut _;
1093+
hdr.msg_iovlen = iov.as_ref().len() as _;
1094+
hdr.msg_control = cmsg_ptr;
1095+
hdr.msg_controllen = cmsg_capacity as _;
1096+
hdr.msg_flags = 0;
1097+
}
10771098

10781099
// Encode each cmsg. This must happen after initializing the header because
10791100
// CMSG_NEXT_HDR and friends read the msg_control and msg_controllen fields.
10801101
// CMSG_FIRSTHDR is always safe
1081-
let mut pmhdr: *mut cmsghdr = CMSG_FIRSTHDR(out);
1102+
let mut pmhdr: *mut cmsghdr = unsafe { CMSG_FIRSTHDR(out) };
10821103
for cmsg in cmsgs.as_ref() {
10831104
assert_ne!(pmhdr, ptr::null_mut());
10841105
// Safe because we know that pmhdr is valid, and we initialized it with
10851106
// sufficient space
1086-
cmsg.encode_into(pmhdr);
1107+
unsafe { cmsg.encode_into(pmhdr); }
10871108
// Safe because mhdr is valid
1088-
pmhdr = CMSG_NXTHDR(out, pmhdr);
1109+
pmhdr = unsafe { CMSG_NXTHDR(out, pmhdr) };
10891110
}
10901111
}
10911112

test/sys/test_socket.rs

Lines changed: 42 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -227,7 +227,13 @@ mod recvfrom {
227227
assert_eq!(AddressFamily::Inet, from.unwrap().family());
228228
}
229229

230-
#[cfg(any(target_os = "linux", target_os = "freebsd"))]
230+
#[cfg(any(
231+
target_os = "linux",
232+
target_os = "android",
233+
target_os = "freebsd",
234+
target_os = "openbsd",
235+
target_os = "netbsd",
236+
))]
231237
#[test]
232238
pub fn udp_sendmmsg() {
233239
use nix::sys::uio::IoVec;
@@ -262,7 +268,10 @@ mod recvfrom {
262268
addr: Some(sock_addr),
263269
_lt: Default::default(),
264270
});
265-
for _ in 0..15 {
271+
272+
let batch_size = 15;
273+
274+
for _ in 0..batch_size {
266275
msgs.push(
267276
SendMmsgData {
268277
iov: &iov,
@@ -286,12 +295,20 @@ mod recvfrom {
286295
assert_eq!(AddressFamily::Inet, from.unwrap().family());
287296
}
288297

289-
#[cfg(any(target_os = "linux", target_os = "freebsd"))]
298+
#[cfg(any(
299+
target_os = "linux",
300+
target_os = "android",
301+
target_os = "freebsd",
302+
target_os = "netbsd",
303+
))]
290304
#[test]
291305
pub fn udp_recvmmsg() {
292306
use nix::sys::uio::IoVec;
293307
use nix::sys::socket::{MsgFlags, recvmmsg};
294308

309+
const NUM_MESSAGES_SENT: usize = 2;
310+
const DATA: [u8; 2] = [1,2];
311+
295312
let std_sa = SocketAddr::from_str("127.0.0.1:6798").unwrap();
296313
let inet_addr = InetAddr::from_std(&std_sa);
297314
let sock_addr = SockAddr::new_inet(inet_addr);
@@ -309,38 +326,39 @@ mod recvfrom {
309326
None,
310327
).expect("send socket failed");
311328

329+
312330
let send_thread = thread::spawn(move || {
313-
for _ in 0..2 {
314-
sendto(ssock, &b"12"[..], &sock_addr, MsgFlags::empty()).unwrap();
331+
for _ in 0..NUM_MESSAGES_SENT {
332+
sendto(ssock, &DATA[..], &sock_addr, MsgFlags::empty()).unwrap();
315333
}
316334
});
317335

318336
let mut msgs = std::collections::LinkedList::new();
319-
let mut rec_buf1 = [0u8; 32];
320-
let mut rec_buf2 = [0u8; 32];
321-
let iov1 = [IoVec::from_mut_slice(&mut rec_buf1[..])];
322-
let iov2 = [IoVec::from_mut_slice(&mut rec_buf2[..])];
323-
msgs.push_back(
324-
RecvMmsgData {
325-
iov: &iov1,
326-
cmsg_buffer: None,
327-
}
328-
);
329-
msgs.push_back(
330-
RecvMmsgData {
331-
iov: &iov2,
337+
338+
// Buffers to receive exactly `NUM_MESSAGES_SENT` messages
339+
let mut receive_buffers = [[0u8; 32]; NUM_MESSAGES_SENT];
340+
let iovs: Vec<_> = receive_buffers.iter_mut().map(|buf| {
341+
[IoVec::from_mut_slice(&mut buf[..])]
342+
}).collect();
343+
344+
for iov in &iovs {
345+
msgs.push_back(RecvMmsgData {
346+
iov: iov,
332347
cmsg_buffer: None,
333-
}
334-
);
348+
})
349+
};
350+
335351
let res = recvmmsg(rsock, &mut msgs, MsgFlags::empty(), None).expect("recvmmsg");
336-
assert_eq!(res.len(), 2);
352+
assert_eq!(res.len(), DATA.len());
337353

338354
for RecvMsg { address, bytes, .. } in res.into_iter() {
339355
assert_eq!(AddressFamily::Inet, address.unwrap().family());
340-
assert_eq!(2, bytes);
356+
assert_eq!(DATA.len(), bytes);
357+
}
358+
359+
for buf in &receive_buffers {
360+
assert_eq!(&buf[..DATA.len()], DATA);
341361
}
342-
assert_eq!(&rec_buf1[..2], b"12");
343-
assert_eq!(&rec_buf2[..2], b"12");
344362

345363
send_thread.join().unwrap();
346364
}

0 commit comments

Comments
 (0)