Skip to content

Implement Unix domain sockets in libnative #11935

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/libnative/io/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,10 +198,10 @@ impl rtio::IoFactory for IoFactory {
net::UdpSocket::bind(addr).map(|u| ~u as ~RtioUdpSocket)
}
fn unix_bind(&mut self, _path: &CString) -> IoResult<~RtioUnixListener> {
Err(unimpl())
net::UnixListener::bind(_path).map(|s| ~s as ~RtioUnixListener)
}
fn unix_connect(&mut self, _path: &CString) -> IoResult<~RtioPipe> {
Err(unimpl())
net::UnixStream::connect(_path, libc::SOCK_STREAM).map(|s| ~s as ~RtioPipe)
}
fn get_host_addresses(&mut self, host: Option<&str>, servname: Option<&str>,
hint: Option<ai::Hint>) -> IoResult<~[ai::Info]> {
Expand Down
355 changes: 355 additions & 0 deletions src/libnative/io/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use std::libc;
use std::mem;
use std::rt::rtio;
use std::unstable::intrinsics;
use std::c_str::CString;

use super::{IoResult, retry};
use super::file::keep_going;
Expand Down Expand Up @@ -88,6 +89,30 @@ fn addr_to_sockaddr(addr: ip::SocketAddr) -> (libc::sockaddr_storage, uint) {
}
}

fn addr_to_sockaddr_un(addr: &CString) -> IoResult<(libc::sockaddr_storage, uint)> {
// the sun_path length is limited to SUN_LEN (with null)
if addr.len() > libc::sun_len -1 {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the only use-case of sun_len, and it doesn't need to be a static:

let temp: libc::sockaddr_un = intrinsics::init();
let sun_len = temp.sun_path.len()

return Err(io::IoError {
kind: io::OtherIoError,
desc: "path must be smaller than SUN_LEN",
detail: None,
})
}
unsafe {
let storage: libc::sockaddr_storage = intrinsics::init();
let s: *mut libc::sockaddr_un = cast::transmute(&storage);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could transmute to &mut libc::sockaddr_un to make things "safer" below.

(*s).sun_family = libc::AF_UNIX as libc::sa_family_t;
let mut i = 0;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't i at the end just addr.len() - 1 ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably. I don't remember why I added that. There was probably some concern about null terminating sun_path, but it should be ok now.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, I should fix that function, since the structure size is false (some version of sockaddr_un have a sun_len field). Returning mem::size_of::libc::sockaddr_un() for the len will be better.

for c in addr.iter() {
(*s).sun_path[i] = c;
i += 1;
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This loop could be expressed as

for (slot, byte) in s.sun_path.mut_iter().zip(addr.iter()) {
    *slot = byte;
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, be sure to use 4-space tabs


let len = mem::size_of::<libc::sa_family_t>() + i + 1; //count the null terminator
return Ok((storage, len));
}
}

fn socket(addr: ip::SocketAddr, ty: libc::c_int) -> IoResult<sock_t> {
unsafe {
let fam = match addr.ip {
Expand All @@ -101,6 +126,15 @@ fn socket(addr: ip::SocketAddr, ty: libc::c_int) -> IoResult<sock_t> {
}
}

fn unix_socket(ty: libc::c_int) -> IoResult<sock_t> {
unsafe {
match libc::socket(libc::AF_UNIX, ty, 0) {
-1 => Err(super::last_error()),
fd => Ok(fd)
}
}
}

fn setsockopt<T>(fd: sock_t, opt: libc::c_int, val: libc::c_int,
payload: T) -> IoResult<()> {
unsafe {
Expand Down Expand Up @@ -176,6 +210,24 @@ pub fn sockaddr_to_addr(storage: &libc::sockaddr_storage,
}
}

fn sockaddr_to_unix(storage: &libc::sockaddr_storage,
len: uint) -> IoResult<CString> {
match storage.ss_family as libc::c_int {
libc::AF_UNIX => {
assert!(len as uint <= mem::size_of::<libc::sockaddr_un>());
let storage: &libc::sockaddr_un = unsafe {
cast::transmute(storage)
};
unsafe {
Ok(CString::new(storage.sun_path.to_owned().as_ptr(), false))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a use-after-free because you're taking an unsafe pointer into an array which is immediately deallocated.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't uderstantd. Isn't to_owned copying the data in a new buffer? Is there a better way? CString seems to only accept a raw vector. Maybe I should return something else than a CString.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's fine to return a CString but the data needs to be manually copied into a malloc'd buffer.

}
}
_ => {
Err(io::standard_error(io::OtherIoError))
}
}
}

#[cfg(unix)]
pub fn init() {}

Expand Down Expand Up @@ -595,3 +647,306 @@ impl rtio::RtioUdpSocket for UdpSocket {
impl Drop for UdpSocket {
fn drop(&mut self) { unsafe { close(self.fd) } }
}


#[cfg(not(windows))]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can get removed.

////////////////////////////////////////////////////////////////////////////////
// Unix
////////////////////////////////////////////////////////////////////////////////


////////////////////////////////////////////////////////////////////////////////
// Unix streams
////////////////////////////////////////////////////////////////////////////////

pub struct UnixStream {
priv fd: sock_t,
}

impl UnixStream {
pub fn connect(addr: &CString, ty: libc::c_int) -> IoResult<UnixStream> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ty parameter should always be libc::SOCK_STREAM, so let's not take it as a parameter.

unsafe {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you contain this unsafe block to just the relevant parts?

unix_socket(ty).and_then(|fd| {
match addr_to_sockaddr_un(addr) {
Err(e) => return Err(e),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This case leaks fd

Ok((addr, len)) => {
let ret = UnixStream{ fd: fd };
let addrp = &addr as *libc::sockaddr_storage;
match retry(|| {
libc::connect(fd, addrp as *libc::sockaddr,
len as libc::socklen_t)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should use 4-space tabs.

}) {
-1 => return Err(super::last_error()),
_ => return Ok(ret)
}
}
}
})
}
}

pub fn fd(&self) -> sock_t { self.fd }
}

impl rtio::RtioPipe for UnixStream {
fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
let ret = retry(|| {
unsafe {
libc::recv(self.fd,
buf.as_ptr() as *mut libc::c_void,
buf.len() as wrlen,
0) as libc::c_int
}
});
if ret == 0 {
Err(io::standard_error(io::EndOfFile))
} else if ret < 0 {
Err(super::last_error())
} else {
Ok(ret as uint)
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you refactor this and the TcpStream's read/write methods into global helpers? Just something like sock_read and sock_write

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yup, this is a good idea

}
fn write(&mut self, buf: &[u8]) -> IoResult<()> {
let ret = keep_going(buf, |buf, len| {
unsafe {
libc::send(self.fd,
buf as *mut libc::c_void,
len as wrlen,
0) as i64
}
});
if ret < 0 {
Err(super::last_error())
} else {
Ok(())
}
}
}

impl Drop for UnixStream {
fn drop(&mut self) { unsafe { close(self.fd); } }
}

////////////////////////////////////////////////////////////////////////////////
// Unix Datagram
////////////////////////////////////////////////////////////////////////////////

pub struct UnixDatagram {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't tested by std::io::net::unix, so this will need tests in this module before we integrate this with the rest of green I/O.

priv fd: sock_t,
}

impl UnixDatagram {
pub fn connect(addr: &CString, ty: libc::c_int) -> IoResult<UnixDatagram> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As above, the ty parameter probably shouldn't exist, I believe it's always going to be libc::SOCK_DGRAM

unsafe {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As with above, let's contain the unsafety of this block to just where necessary.

unix_socket(ty).and_then(|fd| {
match addr_to_sockaddr_un(addr) {
Err(e) => return Err(e),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This case also leaks fd

Ok((addr, len)) => {
let ret = UnixDatagram{ fd: fd };
let addrp = &addr as *libc::sockaddr_storage;
match retry(|| {
libc::connect(fd, addrp as *libc::sockaddr,
len as libc::socklen_t)
}) {
-1 => return Err(super::last_error()),
_ => return Ok(ret)
}
}
}
})
}
}

pub fn bind(addr: &CString) -> IoResult<UnixDatagram> {
unsafe {
unix_socket(libc::SOCK_DGRAM).and_then(|fd| {
match addr_to_sockaddr_un(addr) {
Err(e) => return Err(e),
Ok((addr, len)) => {
let ret = UnixDatagram{ fd: fd };
let addrp = &addr as *libc::sockaddr_storage;
match libc::bind(fd, addrp as *libc::sockaddr,
len as libc::socklen_t) {
-1 => return Err(super::last_error()),
_ => return Ok(ret)
}
}
}
})
}
}

pub fn fd(&self) -> sock_t { self.fd }
}

impl rtio::RtioPipe for UnixDatagram {
fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
let ret = retry(|| {
unsafe {
libc::recv(self.fd,
buf.as_ptr() as *mut libc::c_void,
buf.len() as wrlen,
0) as libc::c_int
}
});
if ret == 0 {
Err(io::standard_error(io::EndOfFile))
} else if ret < 0 {
Err(super::last_error())
} else {
Ok(ret as uint)
}
}
fn write(&mut self, buf: &[u8]) -> IoResult<()> {
let ret = keep_going(buf, |buf, len| {
unsafe {
libc::send(self.fd,
buf as *mut libc::c_void,
len as wrlen,
0) as i64
}
});
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think datagram sockets should keep going in the sense of sending many packets. I think for now this should mirror UDP in that it only sends one packet and returns an error if everything wasn't sent.

Additionally, it'd be nice to refactor the methods with less duplication between udp and unix datagram sockets.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On second though, this should drop the RtioPipe implementation for a UnixDatagram

if ret < 0 {
Err(super::last_error())
} else {
Ok(())
}
}
}

impl rtio::RtioDatagramPipe for UnixDatagram {
fn recvfrom(&mut self, buf: &mut [u8]) -> IoResult<(uint, CString)> {
unsafe {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's contain the unsafety to just where necessary.

let mut storage: libc::sockaddr_storage = intrinsics::init();
let storagep = &mut storage as *mut libc::sockaddr_storage;
let mut addrlen: libc::socklen_t =
mem::size_of::<libc::sockaddr_storage>() as libc::socklen_t;
let ret = retry(|| {
libc::recvfrom(self.fd,
buf.as_ptr() as *mut libc::c_void,
buf.len() as msglen_t,
0,
storagep as *mut libc::sockaddr,
&mut addrlen) as libc::c_int
});
if ret < 0 { return Err(super::last_error()) }
sockaddr_to_unix(&storage, addrlen as uint).and_then(|addr| {
Ok((ret as uint, addr))
})
}
}

fn sendto(&mut self, buf: &[u8], dst: &CString) -> IoResult<()> {
match addr_to_sockaddr_un(dst) {
Err(e) => Err(e),
Ok((dst, len)) => {
let dstp = &dst as *libc::sockaddr_storage;
unsafe {
let ret = retry(|| {
libc::sendto(self.fd,
buf.as_ptr() as *libc::c_void,
buf.len() as msglen_t,
0,
dstp as *libc::sockaddr,
len as libc::socklen_t) as libc::c_int
});
match ret {
-1 => Err(super::last_error()),
n if n as uint != buf.len() => {
Err(io::IoError {
kind: io::OtherIoError,
desc: "couldn't send entire packet at once",
detail: None,
})
}
_ => Ok(())
}
}
}
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These two methods could have less duplication with the UDP methods.

}

impl Drop for UnixDatagram {
fn drop(&mut self) { unsafe { close(self.fd); } }
}
////////////////////////////////////////////////////////////////////////////////
// Unix Listener
////////////////////////////////////////////////////////////////////////////////

pub struct UnixListener {
priv fd: sock_t,
}

impl UnixListener {
pub fn bind(addr: &CString) -> IoResult<UnixListener> {
unsafe {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's contain this unsafe block.

unix_socket(libc::SOCK_STREAM).and_then(|fd| {
match addr_to_sockaddr_un(addr) {
Err(e) => return Err(e),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This leaks fd

Ok((addr, len)) => {
let ret = UnixListener{ fd: fd };
let addrp = &addr as *libc::sockaddr_storage;
match libc::bind(fd, addrp as *libc::sockaddr,
len as libc::socklen_t) {
-1 => return Err(super::last_error()),
_ => return Ok(ret)
}
}
}
})
}
}

pub fn fd(&self) -> sock_t { self.fd }

pub fn native_listen(self, backlog: int) -> IoResult<UnixAcceptor> {
match unsafe { libc::listen(self.fd, backlog as libc::c_int) } {
-1 => Err(super::last_error()),
_ => Ok(UnixAcceptor { listener: self })
}
}
}

impl rtio::RtioUnixListener for UnixListener {
fn listen(~self) -> IoResult<~rtio::RtioUnixAcceptor> {
self.native_listen(128).map(|a| ~a as ~rtio::RtioUnixAcceptor)
}
}

impl Drop for UnixListener {
fn drop(&mut self) { unsafe { close(self.fd); } }
}

pub struct UnixAcceptor {
priv listener: UnixListener,
}

impl UnixAcceptor {
pub fn fd(&self) -> sock_t { self.listener.fd }

pub fn native_accept(&mut self) -> IoResult<UnixStream> {
unsafe {
let mut storage: libc::sockaddr_storage = intrinsics::init();
let storagep = &mut storage as *mut libc::sockaddr_storage;
let size = mem::size_of::<libc::sockaddr_storage>();
let mut size = size as libc::socklen_t;
match retry(|| {
libc::accept(self.fd(),
storagep as *mut libc::sockaddr,
&mut size as *mut libc::socklen_t) as libc::c_int
}) as sock_t {
-1 => Err(super::last_error()),
fd => Ok(UnixStream { fd: fd })
}
}
}
}

impl rtio::RtioUnixAcceptor for UnixAcceptor {
fn accept(&mut self) -> IoResult<~rtio::RtioPipe> {
self.native_accept().map(|s| ~s as ~rtio::RtioPipe)
}
}


Loading