@@ -4,7 +4,7 @@ High-level interface to libuv's TCP functionality
4
4
5
5
import ip = net_ip;
6
6
7
- export tcp_connect_result, tcp_write_result;
7
+ export tcp_connect_result, tcp_write_result, tcp_read_start_result ;
8
8
export connect, write;
9
9
10
10
resource tcp_socket( socket_data: @tcp_socket_data) unsafe {
@@ -36,6 +36,17 @@ enum tcp_write_result {
36
36
tcp_write_error( uv:: ll:: uv_err_data )
37
37
}
38
38
39
+ enum tcp_read_start_result {
40
+ tcp_read_start_success( comm:: port < tcp_read_result > ) ,
41
+ tcp_read_start_error( uv:: ll:: uv_err_data )
42
+ }
43
+
44
+ enum tcp_read_result {
45
+ tcp_read_data( [ u8 ] ) ,
46
+ tcp_read_done,
47
+ tcp_read_err( uv:: ll:: uv_err_data )
48
+ }
49
+
39
50
#[ doc="
40
51
Initiate a client connection over TCP/IP
41
52
@@ -58,8 +69,10 @@ fn connect(input_ip: ip::ip_addr, port: uint) -> tcp_connect_result unsafe {
58
69
} ;
59
70
let conn_data_ptr = ptr:: addr_of ( conn_data) ;
60
71
let hl_loop = uv:: global_loop:: get ( ) ;
72
+ let reader_po = comm:: port :: < tcp_read_result > ( ) ;
61
73
let socket_data = @{
62
- reader_port: comm:: port :: < [ u8 ] > ( ) ,
74
+ reader_po: reader_po,
75
+ reader_ch: comm:: chan ( reader_po) ,
63
76
stream_handle : uv:: ll:: tcp_t ( ) ,
64
77
connect_req : uv:: ll:: connect_t ( ) ,
65
78
write_req : uv:: ll:: write_t ( ) ,
@@ -183,9 +196,99 @@ fn write(sock: tcp_socket, raw_write_data: [[u8]]) -> tcp_write_result
183
196
comm:: recv ( result_po)
184
197
}
185
198
199
+ #[ doc="
200
+ " ]
201
+ fn read_start ( sock : tcp_socket ) -> tcp_read_start_result unsafe {
202
+ let stream_handle_ptr = ptr:: addr_of ( ( * * sock) . stream_handle ) ;
203
+ let start_po = comm:: port :: < option < uv:: ll:: uv_err_data > > ( ) ;
204
+ let start_ch = comm:: chan ( start_po) ;
205
+ uv:: hl:: interact ( ( * * sock) . hl_loop ) { |loop_ptr|
206
+ log ( debug, #fmt ( "in tcp::read_start interact cb %?" , loop_ptr) ) ;
207
+ alt uv:: ll:: read_start ( stream_handle_ptr as * uv:: ll:: uv_stream_t ,
208
+ on_alloc_cb,
209
+ on_tcp_read_cb) {
210
+ 0i32 {
211
+ log ( debug, "success doing uv_read_start" ) ;
212
+ comm:: send ( start_ch, none) ;
213
+ }
214
+ _ {
215
+ log( debug, "error attempting uv_read_start" ) ;
216
+ let err_data = uv:: ll:: get_last_err_data ( loop_ptr) ;
217
+ comm:: send ( start_ch, some ( err_data) ) ;
218
+ }
219
+ }
220
+ } ;
221
+ alt comm:: recv ( start_po) {
222
+ some ( err_data) {
223
+ tcp_read_start_error ( err_data)
224
+ }
225
+ none {
226
+ tcp_read_start_success( ( * * sock) . reader_po )
227
+ }
228
+ }
229
+ }
186
230
231
+ fn read_stop ( sock : tcp_socket ) -> option < uv:: ll:: uv_err_data > unsafe {
232
+ let stream_handle_ptr = ptr:: addr_of ( ( * * sock) . stream_handle ) ;
233
+ let stop_po = comm:: port :: < option < uv:: ll:: uv_err_data > > ( ) ;
234
+ let stop_ch = comm:: chan ( stop_po) ;
235
+ uv:: hl:: interact ( ( * * sock) . hl_loop ) { |loop_ptr|
236
+ log ( debug, "in interact cb for tcp::read_stop" ) ;
237
+ alt uv:: ll:: read_stop ( stream_handle_ptr as * uv:: ll:: uv_stream_t ) {
238
+ 0i32 {
239
+ log ( debug, "successfully called uv_read_stop" ) ;
240
+ comm:: send ( stop_ch, none) ;
241
+ }
242
+ _ {
243
+ log( debug, "failure in calling uv_read_stop" ) ;
244
+ let err_data = uv:: ll:: get_last_err_data ( loop_ptr) ;
245
+ comm:: send ( stop_ch, some ( err_data) ) ;
246
+ }
247
+ }
248
+ } ;
249
+ comm:: recv ( stop_po)
250
+ }
187
251
188
252
// INTERNAL API
253
+ crust fn on_tcp_read_cb ( stream : * uv:: ll:: uv_stream_t ,
254
+ nread : libc:: ssize_t ,
255
+ ++buf : uv:: ll:: uv_buf_t ) unsafe {
256
+ let loop_ptr = uv:: ll:: get_loop_for_uv_handle ( stream) ;
257
+ let socket_data_ptr = uv:: ll:: get_data_for_uv_handle ( stream)
258
+ as * tcp_socket_data ;
259
+ let reader_ch = ( * socket_data_ptr) . reader_ch ;
260
+ alt nread {
261
+ // incoming err.. probably eof
262
+ -1 {
263
+ let err_data = uv:: ll:: get_last_err_data ( loop_ptr) ;
264
+ comm:: send ( reader_ch, tcp_read_err ( err_data) ) ;
265
+ }
266
+ // do nothing .. unneeded buf
267
+ 0 { }
268
+ // have data
269
+ _ {
270
+ // we have data
271
+ log( debug, #fmt ( "tcp on_read_cb nread: %d" , nread) ) ;
272
+ let buf_base = uv:: ll:: get_base_from_buf ( buf) ;
273
+ let buf_len = uv:: ll:: get_len_from_buf ( buf) ;
274
+ let new_bytes = vec:: unsafe:: from_buf ( buf_base, buf_len) ;
275
+ comm:: send ( reader_ch, tcp_read_data ( new_bytes) ) ;
276
+ }
277
+ }
278
+ uv:: ll:: free_base_of_buf ( buf) ;
279
+ }
280
+
281
+ crust fn on_alloc_cb ( handle: * libc:: c_void,
282
+ ++suggested_size: libc:: size_t)
283
+ -> uv:: ll:: uv_buf_t unsafe {
284
+ log( debug, "tcp read on_alloc_cb!" ) ;
285
+ let char_ptr = uv:: ll:: malloc_buf_base_of ( suggested_size) ;
286
+ log ( debug, #fmt ( "tcp read on_alloc_cb h: %? char_ptr: %u sugsize: %u" ,
287
+ handle,
288
+ char_ptr as uint ,
289
+ suggested_size as uint ) ) ;
290
+ uv:: ll:: buf_init ( char_ptr, suggested_size)
291
+ }
189
292
190
293
type tcp_socket_close_data = {
191
294
closed_ch: comm:: chan<( ) >
@@ -272,9 +375,9 @@ enum conn_attempt {
272
375
conn_failure( uv:: ll:: uv_err_data )
273
376
}
274
377
275
-
276
378
type tcp_socket_data = {
277
- reader_port : comm:: port < [ u8 ] > ,
379
+ reader_po : comm:: port < tcp_read_result > ,
380
+ reader_ch : comm:: chan < tcp_read_result > ,
278
381
stream_handle : uv:: ll:: uv_tcp_t ,
279
382
connect_req : uv:: ll:: uv_connect_t ,
280
383
write_req : uv:: ll:: uv_write_t ,
@@ -310,30 +413,52 @@ mod test {
310
413
alt write( sock, write_data) {
311
414
tcp_write_success {
312
415
log( debug, "tcp::write successful" ) ;
313
- /*
314
416
let mut total_read_data: [ u8 ] = [ ] ;
315
- let reader_po = read_start(sock);
316
- loop {
317
- alt comm::recv(reader_po) {
318
- new_read_data(data) {
319
- total_read_data += data;
320
- // theoretically, we could keep iterating, here, if
321
- // we expect the server on the other end to keep
322
- // streaming/chunking data to us, but..
323
- read_stop(tcp_stream);
324
- break;
325
- }
326
- done_reading {
327
- break;
328
- }
329
- error {
330
- fail "erroring occured during read attempt.."
331
- + "FIXME need info";
332
- }
417
+ alt read_start( sock) {
418
+ tcp_read_start_success ( reader_po) {
419
+ loop {
420
+ alt comm:: recv ( reader_po) {
421
+ tcp_read_data ( new_data) {
422
+ total_read_data += new_data;
423
+ // theoretically, we could keep iterating, if
424
+ // we expect the server on the other end to keep
425
+ // streaming/chunking data to us, but..
426
+ alt read_stop( sock) {
427
+ some ( err_data) {
428
+ log ( debug, "error while calling read_stop" ) ;
429
+ log ( debug, #fmt ( "read_stop error: %? %?" ,
430
+ err_data. err_name ,
431
+ err_data. err_msg ) ) ;
432
+ assert false;
433
+ }
434
+ none {
435
+ // exiting the read loop
436
+ break;
437
+ }
438
+ }
439
+ }
440
+ tcp_read_done {
441
+ break;
442
+ }
443
+ tcp_read_err ( err_data) {
444
+ log ( debug, "read error data recv'd" ) ;
445
+ log ( debug, #fmt ( "read error: %? %?" ,
446
+ err_data. err_name ,
447
+ err_data. err_msg ) ) ;
448
+ assert false;
449
+ }
450
+ }
333
451
}
452
+ comm:: send ( data_ch, total_read_data) ;
453
+ }
454
+ tcp_read_start_error ( err_data) {
455
+ log ( debug, "tcp_read_start_error received.." ) ;
456
+ log ( debug, #fmt ( "tcp read_start error: %? %?" ,
457
+ err_data. err_name ,
458
+ err_data. err_msg ) ) ;
459
+ assert false;
460
+ }
334
461
}
335
- comm::send(data_ch, total_read_data);
336
- */
337
462
}
338
463
tcp_write_error ( err_data) {
339
464
log ( debug, "tcp_write_error received.." ) ;
0 commit comments