@@ -81,7 +81,7 @@ use std::os::windows::io::{AsSocket, BorrowedSocket, OwnedSocket};
81
81
use futures_lite:: io:: { AsyncRead , AsyncWrite } ;
82
82
use futures_lite:: stream:: { self , Stream } ;
83
83
use futures_lite:: { future, pin, ready} ;
84
- use socket2 :: { Domain , Protocol , SockAddr , Socket , Type } ;
84
+ use rustix :: net :: { AddressFamily , Protocol , SocketAddrAny , SocketFlags , SocketType } ;
85
85
86
86
use crate :: reactor:: { Reactor , Source } ;
87
87
@@ -636,23 +636,7 @@ impl<T: AsRawFd> Async<T> {
636
636
// depend on Rust >= 1.63, where `AsFd` is stabilized, and when
637
637
// `TimerFd` implements it, we can remove this unsafe and simplify this.
638
638
let fd = unsafe { rustix:: fd:: BorrowedFd :: borrow_raw ( raw) } ;
639
- cfg_if:: cfg_if! {
640
- // ioctl(FIONBIO) sets the flag atomically, but we use this only on Linux
641
- // for now, as with the standard library, because it seems to behave
642
- // differently depending on the platform.
643
- // https://github.com/rust-lang/rust/commit/efeb42be2837842d1beb47b51bb693c7474aba3d
644
- // https://github.com/libuv/libuv/blob/e9d91fccfc3e5ff772d5da90e1c4a24061198ca0/src/unix/poll.c#L78-L80
645
- // https://github.com/tokio-rs/mio/commit/0db49f6d5caf54b12176821363d154384357e70a
646
- if #[ cfg( target_os = "linux" ) ] {
647
- rustix:: io:: ioctl_fionbio( fd, true ) ?;
648
- } else {
649
- let previous = rustix:: fs:: fcntl_getfl( fd) ?;
650
- let new = previous | rustix:: fs:: OFlags :: NONBLOCK ;
651
- if new != previous {
652
- rustix:: fs:: fcntl_setfl( fd, new) ?;
653
- }
654
- }
655
- }
639
+ set_nonblocking ( fd) ?;
656
640
657
641
Ok ( Async {
658
642
source : Reactor :: get ( ) . insert_io ( raw) ?,
@@ -728,7 +712,7 @@ impl<T: AsRawSocket> Async<T> {
728
712
// Safety: We assume `as_raw_socket()` returns a valid fd. When we can
729
713
// depend on Rust >= 1.63, where `AsFd` is stabilized, and when
730
714
// `TimerFd` implements it, we can remove this unsafe and simplify this.
731
- rustix :: io :: ioctl_fionbio ( borrowed , true ) ?;
715
+ set_nonblocking ( fd ) ?;
732
716
733
717
Ok ( Async {
734
718
source : Reactor :: get ( ) . insert_io ( sock) ?,
@@ -1363,10 +1347,26 @@ impl Async<TcpStream> {
1363
1347
/// # std::io::Result::Ok(()) });
1364
1348
/// ```
1365
1349
pub async fn connect < A : Into < SocketAddr > > ( addr : A ) -> io:: Result < Async < TcpStream > > {
1350
+ use rustix:: net:: { SocketAddrV4 , SocketAddrV6 } ;
1351
+
1366
1352
// Begin async connect.
1367
1353
let addr = addr. into ( ) ;
1368
- let domain = Domain :: for_address ( addr) ;
1369
- let socket = connect ( addr. into ( ) , domain, Some ( Protocol :: TCP ) ) ?;
1354
+ let ( addr, domain) = match addr {
1355
+ SocketAddr :: V4 ( sv4) => (
1356
+ SocketAddrAny :: V4 ( SocketAddrV4 :: new ( sv4. ip ( ) . octets ( ) . into ( ) , sv4. port ( ) ) ) ,
1357
+ AddressFamily :: INET ,
1358
+ ) ,
1359
+ SocketAddr :: V6 ( sv6) => (
1360
+ SocketAddrAny :: V6 ( SocketAddrV6 :: new (
1361
+ sv6. ip ( ) . octets ( ) . into ( ) ,
1362
+ sv6. port ( ) ,
1363
+ sv6. flowinfo ( ) ,
1364
+ sv6. scope_id ( ) ,
1365
+ ) ) ,
1366
+ AddressFamily :: INET6 ,
1367
+ ) ,
1368
+ } ;
1369
+ let socket = connect ( & addr, domain, Some ( Protocol :: TCP ) ) ?;
1370
1370
let stream = Async :: new ( TcpStream :: from ( socket) ) ?;
1371
1371
1372
1372
// The stream becomes writable when connected.
@@ -1694,8 +1694,14 @@ impl Async<UnixStream> {
1694
1694
/// # std::io::Result::Ok(()) });
1695
1695
/// ```
1696
1696
pub async fn connect < P : AsRef < Path > > ( path : P ) -> io:: Result < Async < UnixStream > > {
1697
+ use rustix:: net:: SocketAddrUnix ;
1698
+
1697
1699
// Begin async connect.
1698
- let socket = connect ( SockAddr :: unix ( path) ?, Domain :: UNIX , None ) ?;
1700
+ let socket = connect (
1701
+ & SocketAddrAny :: Unix ( SocketAddrUnix :: new ( path. as_ref ( ) ) ?) ,
1702
+ AddressFamily :: UNIX ,
1703
+ None ,
1704
+ ) ?;
1699
1705
let stream = Async :: new ( UnixStream :: from ( socket) ) ?;
1700
1706
1701
1707
// The stream becomes writable when connected.
@@ -1905,8 +1911,14 @@ async fn optimistic(fut: impl Future<Output = io::Result<()>>) -> io::Result<()>
1905
1911
. await
1906
1912
}
1907
1913
1908
- fn connect ( addr : SockAddr , domain : Domain , protocol : Option < Protocol > ) -> io:: Result < Socket > {
1909
- let sock_type = Type :: STREAM ;
1914
+ fn connect (
1915
+ addr : & SocketAddrAny ,
1916
+ domain : AddressFamily ,
1917
+ protocol : Option < Protocol > ,
1918
+ ) -> io:: Result < OwnedFd > {
1919
+ let sock_type = SocketType :: STREAM ;
1920
+ let sock_flags = SocketFlags :: empty ( ) ;
1921
+
1910
1922
#[ cfg( any(
1911
1923
target_os = "android" ,
1912
1924
target_os = "dragonfly" ,
@@ -1918,9 +1930,14 @@ fn connect(addr: SockAddr, domain: Domain, protocol: Option<Protocol>) -> io::Re
1918
1930
target_os = "openbsd"
1919
1931
) ) ]
1920
1932
// If we can, set nonblocking at socket creation for unix
1921
- let sock_type = sock_type. nonblocking ( ) ;
1922
- // This automatically handles cloexec on unix, no_inherit on windows and nosigpipe on macos
1923
- let socket = Socket :: new ( domain, sock_type, protocol) ?;
1933
+ let sock_flags = sock_flags | SocketFlags :: NONBLOCK ;
1934
+
1935
+ // Create the socket.
1936
+ let socket =
1937
+ rustix:: net:: socket_with ( domain, sock_type, sock_flags, protocol. unwrap_or_default ( ) ) ?;
1938
+
1939
+ // TODO: Set cloexec on Unix, nosigpipe on macos and no_inherit on windows
1940
+
1924
1941
#[ cfg( not( any(
1925
1942
target_os = "android" ,
1926
1943
target_os = "dragonfly" ,
@@ -1932,13 +1949,37 @@ fn connect(addr: SockAddr, domain: Domain, protocol: Option<Protocol>) -> io::Re
1932
1949
target_os = "openbsd"
1933
1950
) ) ) ]
1934
1951
// If the current platform doesn't support nonblocking at creation, enable it after creation
1935
- socket. set_nonblocking ( true ) ?;
1936
- match socket. connect ( & addr) {
1952
+ set_nonblocking ( rustix:: fd:: AsFd :: as_fd ( & socket) ) ?;
1953
+
1954
+ match rustix:: net:: connect_any ( & socket, addr) {
1937
1955
Ok ( _) => { }
1938
1956
#[ cfg( unix) ]
1939
- Err ( err) if err. raw_os_error ( ) == Some ( rustix:: io:: Errno :: INPROGRESS . raw_os_error ( ) ) => { }
1957
+ Err ( err) if err. raw_os_error ( ) == rustix:: io:: Errno :: INPROGRESS . raw_os_error ( ) => { }
1940
1958
Err ( err) if err. kind ( ) == io:: ErrorKind :: WouldBlock => { }
1941
- Err ( err) => return Err ( err) ,
1959
+ Err ( err) => return Err ( err. into ( ) ) ,
1942
1960
}
1961
+
1943
1962
Ok ( socket)
1944
1963
}
1964
+
1965
+ fn set_nonblocking ( fd : rustix:: fd:: BorrowedFd < ' _ > ) -> io:: Result < ( ) > {
1966
+ cfg_if:: cfg_if! {
1967
+ // ioctl(FIONBIO) sets the flag atomically, but we use this only on Linux
1968
+ // for now, as with the standard library, because it seems to behave
1969
+ // differently depending on the platform.
1970
+ // https://github.com/rust-lang/rust/commit/efeb42be2837842d1beb47b51bb693c7474aba3d
1971
+ // https://github.com/libuv/libuv/blob/e9d91fccfc3e5ff772d5da90e1c4a24061198ca0/src/unix/poll.c#L78-L80
1972
+ // https://github.com/tokio-rs/mio/commit/0db49f6d5caf54b12176821363d154384357e70a
1973
+ if #[ cfg( any( target_os = "linux" , target_os = "windows" ) ) ] {
1974
+ rustix:: io:: ioctl_fionbio( fd, true ) ?;
1975
+ } else {
1976
+ let previous = rustix:: fs:: fcntl_getfl( fd) ?;
1977
+ let new = previous | rustix:: fs:: OFlags :: NONBLOCK ;
1978
+ if new != previous {
1979
+ rustix:: fs:: fcntl_setfl( fd, new) ?;
1980
+ }
1981
+ }
1982
+ }
1983
+
1984
+ Ok ( ( ) )
1985
+ }
0 commit comments