Skip to content

Commit 443b77a

Browse files
olsonjefferybrson
authored andcommitted
---
yaml --- r: 12968 b: refs/heads/master c: 565c5d6 h: refs/heads/master v: v3
1 parent bbac0c2 commit 443b77a

File tree

2 files changed

+151
-26
lines changed

2 files changed

+151
-26
lines changed

[refs]

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
---
2-
refs/heads/master: 7e114b200abccb7dcbc03c47585828716dfc0f4a
2+
refs/heads/master: 565c5d694a51882fbbe6f1ebba370682c15bfbe8
33
refs/heads/snap-stage1: e33de59e47c5076a89eadeb38f4934f58a3618a6
44
refs/heads/snap-stage3: 4a81779abd786ff22d71434c6d9a5917ea4cdfff
55
refs/heads/try: 2898dcc5d97da9427ac367542382b6239d9c0bbf

trunk/src/libstd/net_tcp.rs

Lines changed: 150 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ High-level interface to libuv's TCP functionality
44

55
import ip = net_ip;
66

7-
export tcp_connect_result, tcp_write_result;
7+
export tcp_connect_result, tcp_write_result, tcp_read_start_result;
88
export connect, write;
99

1010
resource tcp_socket(socket_data: @tcp_socket_data) unsafe {
@@ -36,6 +36,17 @@ enum tcp_write_result {
3636
tcp_write_error(uv::ll::uv_err_data)
3737
}
3838

39+
enum tcp_read_start_result {
40+
tcp_read_start_success(comm::port<tcp_read_result>),
41+
tcp_read_start_error(uv::ll::uv_err_data)
42+
}
43+
44+
enum tcp_read_result {
45+
tcp_read_data([u8]),
46+
tcp_read_done,
47+
tcp_read_err(uv::ll::uv_err_data)
48+
}
49+
3950
#[doc="
4051
Initiate a client connection over TCP/IP
4152
@@ -58,8 +69,10 @@ fn connect(input_ip: ip::ip_addr, port: uint) -> tcp_connect_result unsafe {
5869
};
5970
let conn_data_ptr = ptr::addr_of(conn_data);
6071
let hl_loop = uv::global_loop::get();
72+
let reader_po = comm::port::<tcp_read_result>();
6173
let socket_data = @{
62-
reader_port: comm::port::<[u8]>(),
74+
reader_po: reader_po,
75+
reader_ch: comm::chan(reader_po),
6376
stream_handle : uv::ll::tcp_t(),
6477
connect_req : uv::ll::connect_t(),
6578
write_req : uv::ll::write_t(),
@@ -183,9 +196,99 @@ fn write(sock: tcp_socket, raw_write_data: [[u8]]) -> tcp_write_result
183196
comm::recv(result_po)
184197
}
185198

199+
#[doc="
200+
"]
201+
fn read_start(sock: tcp_socket) -> tcp_read_start_result unsafe {
202+
let stream_handle_ptr = ptr::addr_of((**sock).stream_handle);
203+
let start_po = comm::port::<option<uv::ll::uv_err_data>>();
204+
let start_ch = comm::chan(start_po);
205+
uv::hl::interact((**sock).hl_loop) {|loop_ptr|
206+
log(debug, #fmt("in tcp::read_start interact cb %?", loop_ptr));
207+
alt uv::ll::read_start(stream_handle_ptr as *uv::ll::uv_stream_t,
208+
on_alloc_cb,
209+
on_tcp_read_cb) {
210+
0i32 {
211+
log(debug, "success doing uv_read_start");
212+
comm::send(start_ch, none);
213+
}
214+
_ {
215+
log(debug, "error attempting uv_read_start");
216+
let err_data = uv::ll::get_last_err_data(loop_ptr);
217+
comm::send(start_ch, some(err_data));
218+
}
219+
}
220+
};
221+
alt comm::recv(start_po) {
222+
some(err_data) {
223+
tcp_read_start_error(err_data)
224+
}
225+
none {
226+
tcp_read_start_success((**sock).reader_po)
227+
}
228+
}
229+
}
186230

231+
fn read_stop(sock: tcp_socket) -> option<uv::ll::uv_err_data> unsafe {
232+
let stream_handle_ptr = ptr::addr_of((**sock).stream_handle);
233+
let stop_po = comm::port::<option<uv::ll::uv_err_data>>();
234+
let stop_ch = comm::chan(stop_po);
235+
uv::hl::interact((**sock).hl_loop) {|loop_ptr|
236+
log(debug, "in interact cb for tcp::read_stop");
237+
alt uv::ll::read_stop(stream_handle_ptr as *uv::ll::uv_stream_t) {
238+
0i32 {
239+
log(debug, "successfully called uv_read_stop");
240+
comm::send(stop_ch, none);
241+
}
242+
_ {
243+
log(debug, "failure in calling uv_read_stop");
244+
let err_data = uv::ll::get_last_err_data(loop_ptr);
245+
comm::send(stop_ch, some(err_data));
246+
}
247+
}
248+
};
249+
comm::recv(stop_po)
250+
}
187251

188252
// INTERNAL API
253+
crust fn on_tcp_read_cb(stream: *uv::ll::uv_stream_t,
254+
nread: libc::ssize_t,
255+
++buf: uv::ll::uv_buf_t) unsafe {
256+
let loop_ptr = uv::ll::get_loop_for_uv_handle(stream);
257+
let socket_data_ptr = uv::ll::get_data_for_uv_handle(stream)
258+
as *tcp_socket_data;
259+
let reader_ch = (*socket_data_ptr).reader_ch;
260+
alt nread {
261+
// incoming err.. probably eof
262+
-1 {
263+
let err_data = uv::ll::get_last_err_data(loop_ptr);
264+
comm::send(reader_ch, tcp_read_err(err_data));
265+
}
266+
// do nothing .. unneeded buf
267+
0 {}
268+
// have data
269+
_ {
270+
// we have data
271+
log(debug, #fmt("tcp on_read_cb nread: %d", nread));
272+
let buf_base = uv::ll::get_base_from_buf(buf);
273+
let buf_len = uv::ll::get_len_from_buf(buf);
274+
let new_bytes = vec::unsafe::from_buf(buf_base, buf_len);
275+
comm::send(reader_ch, tcp_read_data(new_bytes));
276+
}
277+
}
278+
uv::ll::free_base_of_buf(buf);
279+
}
280+
281+
crust fn on_alloc_cb(handle: *libc::c_void,
282+
++suggested_size: libc::size_t)
283+
-> uv::ll::uv_buf_t unsafe {
284+
log(debug, "tcp read on_alloc_cb!");
285+
let char_ptr = uv::ll::malloc_buf_base_of(suggested_size);
286+
log(debug, #fmt("tcp read on_alloc_cb h: %? char_ptr: %u sugsize: %u",
287+
handle,
288+
char_ptr as uint,
289+
suggested_size as uint));
290+
uv::ll::buf_init(char_ptr, suggested_size)
291+
}
189292

190293
type tcp_socket_close_data = {
191294
closed_ch: comm::chan<()>
@@ -272,9 +375,9 @@ enum conn_attempt {
272375
conn_failure(uv::ll::uv_err_data)
273376
}
274377

275-
276378
type tcp_socket_data = {
277-
reader_port: comm::port<[u8]>,
379+
reader_po: comm::port<tcp_read_result>,
380+
reader_ch: comm::chan<tcp_read_result>,
278381
stream_handle: uv::ll::uv_tcp_t,
279382
connect_req: uv::ll::uv_connect_t,
280383
write_req: uv::ll::uv_write_t,
@@ -310,30 +413,52 @@ mod test {
310413
alt write(sock, write_data) {
311414
tcp_write_success {
312415
log(debug, "tcp::write successful");
313-
/*
314416
let mut total_read_data: [u8] = [];
315-
let reader_po = read_start(sock);
316-
loop {
317-
alt comm::recv(reader_po) {
318-
new_read_data(data) {
319-
total_read_data += data;
320-
// theoretically, we could keep iterating, here, if
321-
// we expect the server on the other end to keep
322-
// streaming/chunking data to us, but..
323-
read_stop(tcp_stream);
324-
break;
325-
}
326-
done_reading {
327-
break;
328-
}
329-
error {
330-
fail "erroring occured during read attempt.."
331-
+ "FIXME need info";
332-
}
417+
alt read_start(sock) {
418+
tcp_read_start_success(reader_po) {
419+
loop {
420+
alt comm::recv(reader_po) {
421+
tcp_read_data(new_data) {
422+
total_read_data += new_data;
423+
// theoretically, we could keep iterating, if
424+
// we expect the server on the other end to keep
425+
// streaming/chunking data to us, but..
426+
alt read_stop(sock) {
427+
some(err_data) {
428+
log(debug, "error while calling read_stop");
429+
log(debug, #fmt("read_stop error: %? %?",
430+
err_data.err_name,
431+
err_data.err_msg));
432+
assert false;
433+
}
434+
none {
435+
// exiting the read loop
436+
break;
437+
}
438+
}
439+
}
440+
tcp_read_done {
441+
break;
442+
}
443+
tcp_read_err(err_data) {
444+
log(debug, "read error data recv'd");
445+
log(debug, #fmt("read error: %? %?",
446+
err_data.err_name,
447+
err_data.err_msg));
448+
assert false;
449+
}
450+
}
333451
}
452+
comm::send(data_ch, total_read_data);
453+
}
454+
tcp_read_start_error(err_data) {
455+
log(debug, "tcp_read_start_error received..");
456+
log(debug, #fmt("tcp read_start error: %? %?",
457+
err_data.err_name,
458+
err_data.err_msg));
459+
assert false;
460+
}
334461
}
335-
comm::send(data_ch, total_read_data);
336-
*/
337462
}
338463
tcp_write_error(err_data) {
339464
log(debug, "tcp_write_error received..");

0 commit comments

Comments
 (0)