Skip to content

Commit 321e99d

Browse files
olsonjefferybrson
authored andcommitted
---
yaml --- r: 15993 b: refs/heads/try c: c7656f6 h: refs/heads/master i: 15991: df8cbc6 v: v3
1 parent 53041c8 commit 321e99d

File tree

2 files changed

+220
-91
lines changed

2 files changed

+220
-91
lines changed

[refs]

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,5 +2,5 @@
22
refs/heads/master: 61b1875c16de39c166b0f4d54bba19f9c6777d1a
33
refs/heads/snap-stage1: e33de59e47c5076a89eadeb38f4934f58a3618a6
44
refs/heads/snap-stage3: 4a81779abd786ff22d71434c6d9a5917ea4cdfff
5-
refs/heads/try: c2ae062e90105b4230981211f2979d3b093c3bcb
5+
refs/heads/try: c7656f67ad6c6ee687332f613a6074f76107c1b8
66
refs/tags/release-0.1: 1f5c5126e96c79d22cb7862f75304136e204f105

branches/try/src/libstd/net_tcp.rs

Lines changed: 219 additions & 90 deletions
Original file line numberDiff line numberDiff line change
@@ -217,7 +217,7 @@ Write binary data to tcp stream; Returns a `future::future` value immediately
217217
This function can produce unsafe results if the call to `write_future` is
218218
made, the `future::future` value returned is never resolved via
219219
`future::get`, and then the `tcp_socket` passed in to `write_future` leaves
220-
scope and is destructured before the task that runs the libuv write
220+
scope and is destructed before the task that runs the libuv write
221221
operation completes.
222222
223223
As such: If using `write_future`, always be sure to resolve the returned
@@ -261,34 +261,7 @@ fn read_start(sock: tcp_socket)
261261
-> result::result<comm::port<
262262
result::result<[u8], tcp_err_data>>, tcp_err_data> unsafe {
263263
let socket_data = ptr::addr_of(**sock);
264-
let stream_handle_ptr = (*socket_data).stream_handle_ptr;
265-
let start_po = comm::port::<option<uv::ll::uv_err_data>>();
266-
let start_ch = comm::chan(start_po);
267-
log(debug, "in tcp::read_start before interact loop");
268-
uv::hl::interact((*socket_data).hl_loop) {|loop_ptr|
269-
log(debug, #fmt("in tcp::read_start interact cb %?", loop_ptr));
270-
alt uv::ll::read_start(stream_handle_ptr as *uv::ll::uv_stream_t,
271-
on_alloc_cb,
272-
on_tcp_read_cb) {
273-
0i32 {
274-
log(debug, "success doing uv_read_start");
275-
comm::send(start_ch, none);
276-
}
277-
_ {
278-
log(debug, "error attempting uv_read_start");
279-
let err_data = uv::ll::get_last_err_data(loop_ptr);
280-
comm::send(start_ch, some(err_data));
281-
}
282-
}
283-
};
284-
alt comm::recv(start_po) {
285-
some(err_data) {
286-
result::err(err_data.to_tcp_err())
287-
}
288-
none {
289-
result::ok((*socket_data).reader_po)
290-
}
291-
}
264+
read_start_common_impl(socket_data)
292265
}
293266

294267
#[doc="
@@ -301,30 +274,62 @@ Stop reading from an open TCP connection; used with `read_start`
301274
fn read_stop(sock: tcp_socket) ->
302275
result::result<(), tcp_err_data> unsafe {
303276
let socket_data = ptr::addr_of(**sock);
304-
let stream_handle_ptr = (*socket_data).stream_handle_ptr;
305-
let stop_po = comm::port::<option<tcp_err_data>>();
306-
let stop_ch = comm::chan(stop_po);
307-
uv::hl::interact((*socket_data).hl_loop) {|loop_ptr|
308-
log(debug, "in interact cb for tcp::read_stop");
309-
alt uv::ll::read_stop(stream_handle_ptr as *uv::ll::uv_stream_t) {
310-
0i32 {
311-
log(debug, "successfully called uv_read_stop");
312-
comm::send(stop_ch, none);
313-
}
314-
_ {
315-
log(debug, "failure in calling uv_read_stop");
316-
let err_data = uv::ll::get_last_err_data(loop_ptr);
317-
comm::send(stop_ch, some(err_data.to_tcp_err()));
318-
}
319-
}
320-
};
321-
alt comm::recv(stop_po) {
322-
some(err_data) {
323-
result::err(err_data.to_tcp_err())
324-
}
325-
none {
326-
result::ok(())
327-
}
277+
read_stop_common_impl(socket_data)
278+
}
279+
280+
#[doc="
281+
Reads a single chunk of data from `tcp_socket`; block until data/error recv'd
282+
283+
Does a blocking read operation for a single chunk of data from a `tcp_socket`
284+
until a data arrives or an error is received. The provided `timeout_msecs`
285+
value is used to raise an error if the timeout period passes without any
286+
data received.
287+
288+
# Arguments
289+
290+
* `sock` - a `net::tcp::tcp_socket` that you wish to read from
291+
* `timeout_msecs` - a `uint` value, in msecs, to wait before dropping the
292+
read attempt. Pass `0u` to wait indefinitely
293+
"]
294+
fn read(sock: tcp_socket, timeout_msecs: uint)
295+
-> result::result<[u8],tcp_err_data> {
296+
let socket_data = ptr::addr_of(**sock);
297+
read_common_impl(socket_data, timeout_msecs)
298+
}
299+
300+
#[doc="
301+
Reads a single chunk of data; returns a `future::future<[u8]>` immediately
302+
303+
Does a non-blocking read operation for a single chunk of data from a
304+
`tcp_socket` and immediately returns a `future` value representing the
305+
result. When resolving the returned `future`, it will block until data
306+
arrives or an error is received. The provided `timeout_msecs`
307+
value is used to raise an error if the timeout period passes without any
308+
data received.
309+
310+
# Safety
311+
312+
This function can produce unsafe results if the call to `read_future` is
313+
made, the `future::future` value returned is never resolved via
314+
`future::get`, and then the `tcp_socket` passed in to `read_future` leaves
315+
scope and is destructed before the task that runs the libuv read
316+
operation completes.
317+
318+
As such: If using `read_future`, always be sure to resolve the returned
319+
`future` so as to ensure libuv doesn't try to access a released read handle.
320+
Otherwise, use the blocking `tcp::read` function instead.
321+
322+
# Arguments
323+
324+
* `sock` - a `net::tcp::tcp_socket` that you wish to read from
325+
* `timeout_msecs` - a `uint` value, in msecs, to wait before dropping the
326+
read attempt. Pass `0u` to wait indefinitely
327+
"]
328+
fn read_future(sock: tcp_socket, timeout_msecs: uint)
329+
-> future::future<result::result<[u8],tcp_err_data>> {
330+
let socket_data = ptr::addr_of(**sock);
331+
future::spawn {||
332+
read_common_impl(socket_data, timeout_msecs)
328333
}
329334
}
330335

@@ -778,13 +783,166 @@ impl sock_methods for tcp_socket {
778783
result::result<(), tcp_err_data> {
779784
read_stop(self)
780785
}
781-
fn write(raw_write_data: [[u8]])
786+
fn read(timeout_msecs: uint) ->
787+
result::result<[u8], tcp_err_data> {
788+
read(self, timeout_msecs)
789+
}
790+
fn read_future(timeout_msecs: uint) ->
791+
future::future<result::result<[u8], tcp_err_data>> {
792+
read_future(self, timeout_msecs)
793+
}
794+
fn write(raw_write_data: [u8])
782795
-> result::result<(), tcp_err_data> {
783796
write(self, raw_write_data)
784797
}
798+
fn write_future(raw_write_data: [u8])
799+
-> future::future<result::result<(), tcp_err_data>> {
800+
write_future(self, raw_write_data)
801+
}
785802
}
786803
// INTERNAL API
787804

805+
// shared implementation for tcp::read
806+
fn read_common_impl(socket_data: *tcp_socket_data, timeout_msecs: uint)
807+
-> result::result<[u8],tcp_err_data> {
808+
log(debug, "starting tcp::read");
809+
let rs_result = read_start_common_impl(socket_data);
810+
if result::is_failure(rs_result) {
811+
let err_data = result::get_err(rs_result);
812+
result::err(err_data)
813+
}
814+
else {
815+
log(debug, "tcp::read before recv_timeout");
816+
let read_result = if timeout_msecs > 0u {
817+
timer::recv_timeout(
818+
timeout_msecs, result::get(rs_result))
819+
} else {
820+
some(comm::recv(result::get(rs_result)))
821+
};
822+
log(debug, "tcp::read after recv_timeout");
823+
alt read_result {
824+
none {
825+
log(debug, "tcp::read: timed out..");
826+
let err_data = {
827+
err_name: "TIMEOUT",
828+
err_msg: "req timed out"
829+
};
830+
read_stop_common_impl(socket_data);
831+
result::err(err_data)
832+
}
833+
some(data_result) {
834+
log(debug, "tcp::read got data");
835+
read_stop_common_impl(socket_data);
836+
data_result
837+
}
838+
}
839+
}
840+
}
841+
842+
// shared impl for read_stop
843+
fn read_stop_common_impl(socket_data: *tcp_socket_data) ->
844+
result::result<(), tcp_err_data> unsafe {
845+
let stream_handle_ptr = (*socket_data).stream_handle_ptr;
846+
let stop_po = comm::port::<option<tcp_err_data>>();
847+
let stop_ch = comm::chan(stop_po);
848+
uv::hl::interact((*socket_data).hl_loop) {|loop_ptr|
849+
log(debug, "in interact cb for tcp::read_stop");
850+
alt uv::ll::read_stop(stream_handle_ptr as *uv::ll::uv_stream_t) {
851+
0i32 {
852+
log(debug, "successfully called uv_read_stop");
853+
comm::send(stop_ch, none);
854+
}
855+
_ {
856+
log(debug, "failure in calling uv_read_stop");
857+
let err_data = uv::ll::get_last_err_data(loop_ptr);
858+
comm::send(stop_ch, some(err_data.to_tcp_err()));
859+
}
860+
}
861+
};
862+
alt comm::recv(stop_po) {
863+
some(err_data) {
864+
result::err(err_data.to_tcp_err())
865+
}
866+
none {
867+
result::ok(())
868+
}
869+
}
870+
}
871+
872+
// shared impl for read_start
873+
fn read_start_common_impl(socket_data: *tcp_socket_data)
874+
-> result::result<comm::port<
875+
result::result<[u8], tcp_err_data>>, tcp_err_data> unsafe {
876+
let stream_handle_ptr = (*socket_data).stream_handle_ptr;
877+
let start_po = comm::port::<option<uv::ll::uv_err_data>>();
878+
let start_ch = comm::chan(start_po);
879+
log(debug, "in tcp::read_start before interact loop");
880+
uv::hl::interact((*socket_data).hl_loop) {|loop_ptr|
881+
log(debug, #fmt("in tcp::read_start interact cb %?", loop_ptr));
882+
alt uv::ll::read_start(stream_handle_ptr as *uv::ll::uv_stream_t,
883+
on_alloc_cb,
884+
on_tcp_read_cb) {
885+
0i32 {
886+
log(debug, "success doing uv_read_start");
887+
comm::send(start_ch, none);
888+
}
889+
_ {
890+
log(debug, "error attempting uv_read_start");
891+
let err_data = uv::ll::get_last_err_data(loop_ptr);
892+
comm::send(start_ch, some(err_data));
893+
}
894+
}
895+
};
896+
alt comm::recv(start_po) {
897+
some(err_data) {
898+
result::err(err_data.to_tcp_err())
899+
}
900+
none {
901+
result::ok((*socket_data).reader_po)
902+
}
903+
}
904+
}
905+
906+
// shared implementation used by write and write_future
907+
fn write_common_impl(socket_data_ptr: *tcp_socket_data,
908+
raw_write_data: [u8])
909+
-> result::result<(), tcp_err_data> unsafe {
910+
let write_req_ptr = ptr::addr_of((*socket_data_ptr).write_req);
911+
let stream_handle_ptr =
912+
(*socket_data_ptr).stream_handle_ptr;
913+
let write_buf_vec = [ uv::ll::buf_init(
914+
vec::unsafe::to_ptr(raw_write_data),
915+
vec::len(raw_write_data)) ];
916+
let write_buf_vec_ptr = ptr::addr_of(write_buf_vec);
917+
let result_po = comm::port::<tcp_write_result>();
918+
let write_data = {
919+
result_ch: comm::chan(result_po)
920+
};
921+
let write_data_ptr = ptr::addr_of(write_data);
922+
uv::hl::interact((*socket_data_ptr).hl_loop) {|loop_ptr|
923+
log(debug, #fmt("in interact cb for tcp::write %?", loop_ptr));
924+
alt uv::ll::write(write_req_ptr,
925+
stream_handle_ptr,
926+
write_buf_vec_ptr,
927+
tcp_write_complete_cb) {
928+
0i32 {
929+
log(debug, "uv_write() invoked successfully");
930+
uv::ll::set_data_for_req(write_req_ptr, write_data_ptr);
931+
}
932+
_ {
933+
log(debug, "error invoking uv_write()");
934+
let err_data = uv::ll::get_last_err_data(loop_ptr);
935+
comm::send((*write_data_ptr).result_ch,
936+
tcp_write_error(err_data.to_tcp_err()));
937+
}
938+
}
939+
};
940+
alt comm::recv(result_po) {
941+
tcp_write_success { result::ok(()) }
942+
tcp_write_error(err_data) { result::err(err_data.to_tcp_err()) }
943+
}
944+
}
945+
788946
// various recv_* can use a tcp_conn_port can re-use this..
789947
fn conn_port_new_tcp_socket(
790948
stream_handle_ptr: *uv::ll::uv_tcp_t,
@@ -1266,8 +1424,7 @@ mod test {
12661424
let sock = result::unwrap(accept_result);
12671425
log(debug, "SERVER: successfully accepted"+
12681426
"connection!");
1269-
let received_req_bytes =
1270-
tcp_read_single(sock);
1427+
let received_req_bytes = sock.read(2000u);
12711428
alt received_req_bytes {
12721429
result::ok(data) {
12731430
server_ch.send(
@@ -1278,6 +1435,8 @@ mod test {
12781435
comm::send(kill_ch, none);
12791436
}
12801437
result::err(err_data) {
1438+
log(debug, #fmt("SERVER: error recvd: %s %s",
1439+
err_data.err_name, err_data.err_msg));
12811440
comm::send(kill_ch, some(err_data));
12821441
server_ch.send("");
12831442
}
@@ -1333,7 +1492,7 @@ mod test {
13331492
log(debug, "SERVER: successfully accepted"+
13341493
"connection!");
13351494
let received_req_bytes =
1336-
tcp_read_single(sock);
1495+
sock.read(2000u);
13371496
alt received_req_bytes {
13381497
result::ok(data) {
13391498
server_ch.send(
@@ -1370,7 +1529,7 @@ mod test {
13701529
let sock = result::unwrap(connect_result);
13711530
let resp_bytes = str::bytes(resp);
13721531
tcp_write_single(sock, resp_bytes);
1373-
let read_result = tcp_read_single(sock);
1532+
let read_result = sock.read(2000u);
13741533
if read_result.is_failure() {
13751534
log(debug, "CLIENT: failure to read");
13761535
""
@@ -1385,39 +1544,9 @@ mod test {
13851544
}
13861545
}
13871546

1388-
fn tcp_read_single(sock: tcp_socket)
1389-
-> result::result<[u8],tcp_err_data> {
1390-
log(debug, "starting tcp_read_single");
1391-
let rs_result = sock.read_start();
1392-
if result::is_failure(rs_result) {
1393-
let err_data = result::get_err(rs_result);
1394-
result::err(err_data)
1395-
}
1396-
else {
1397-
log(debug, "before recv_timeout");
1398-
let read_result = timer::recv_timeout(
1399-
2000u, result::get(rs_result));
1400-
log(debug, "after recv_timeout");
1401-
alt read_result {
1402-
none {
1403-
log(debug, "tcp_read_single: timed out..");
1404-
let err_data = {
1405-
err_name: "TIMEOUT",
1406-
err_msg: "req timed out"
1407-
};
1408-
result::err(err_data)
1409-
}
1410-
some(data_result) {
1411-
log(debug, "tcp_read_single: got data");
1412-
sock.read_stop();
1413-
data_result
1414-
}
1415-
}
1416-
}
1417-
}
1418-
14191547
fn tcp_write_single(sock: tcp_socket, val: [u8]) {
1420-
let write_result = sock.write([val]);
1548+
let write_result_future = sock.write_future(val);
1549+
let write_result = write_result_future.get();
14211550
if result::is_failure(write_result) {
14221551
log(debug, "tcp_write_single: write failed!");
14231552
let err_data = result::get_err(write_result);

0 commit comments

Comments
 (0)