@@ -4,18 +4,38 @@ High-level interface to libuv's TCP functionality
4
4
5
5
import ip = net_ip;
6
6
7
- export tcp_connect_result;
8
- export connect;
7
+ export tcp_connect_result, tcp_write_result ;
8
+ export connect, write ;
9
9
10
- enum tcp_socket {
11
- valid_tcp_socket( @tcp_socket_data )
10
+ resource tcp_socket( socket_data: @tcp_socket_data) unsafe {
11
+ let closed_po = comm:: port :: < ( ) > ( ) ;
12
+ let closed_ch = comm:: chan ( closed_po) ;
13
+ let close_data = {
14
+ closed_ch: closed_ch
15
+ } ;
16
+ let close_data_ptr = ptr:: addr_of ( close_data) ;
17
+ let stream_handle_ptr = ptr:: addr_of ( ( * socket_data) . stream_handle ) ;
18
+ uv:: hl:: interact ( ( * socket_data) . hl_loop ) { |loop_ptr|
19
+ log ( debug, #fmt ( "interact dtor for tcp_socket stream %? loop %?" ,
20
+ stream_handle_ptr, loop_ptr) ) ;
21
+ uv:: ll:: set_data_for_uv_handle ( stream_handle_ptr,
22
+ close_data_ptr) ;
23
+ uv:: ll:: close ( stream_handle_ptr, tcp_socket_dtor_close_cb) ;
24
+ } ;
25
+ comm:: recv ( closed_po) ;
26
+ log ( debug, "exiting dtor for tcp_socket" ) ;
12
27
}
13
28
14
29
enum tcp_connect_result {
15
30
tcp_connected( tcp_socket ) ,
16
31
tcp_connect_error( uv:: ll:: uv_err_data )
17
32
}
18
33
34
+ enum tcp_write_result {
35
+ tcp_write_success,
36
+ tcp_write_error( uv:: ll:: uv_err_data )
37
+ }
38
+
19
39
#[ doc="
20
40
Initiate a client connection over TCP/IP
21
41
@@ -37,19 +57,19 @@ fn connect(input_ip: ip::ip_addr, port: uint) -> tcp_connect_result unsafe {
37
57
closed_signal_ch: comm:: chan ( closed_signal_po)
38
58
} ;
39
59
let conn_data_ptr = ptr:: addr_of ( conn_data) ;
60
+ let hl_loop = uv:: global_loop:: get ( ) ;
40
61
let socket_data = @{
41
62
reader_port: comm:: port :: < [ u8 ] > ( ) ,
42
63
stream_handle : uv:: ll:: tcp_t ( ) ,
43
64
connect_req : uv:: ll:: connect_t ( ) ,
44
- write_req : uv:: ll:: write_t ( )
65
+ write_req : uv:: ll:: write_t ( ) ,
66
+ hl_loop: hl_loop
45
67
} ;
46
68
log ( debug, #fmt ( "tcp_connect result_ch %?" , conn_data. result_ch ) ) ;
47
69
// get an unsafe representation of our stream_handle_ptr that
48
70
// we can send into the interact cb to be handled in libuv..
49
71
let socket_data_ptr: * tcp_socket_data =
50
72
ptr:: addr_of ( * socket_data) ;
51
- // in we go!
52
- let hl_loop = uv:: global_loop:: get ( ) ;
53
73
log ( debug, #fmt ( "stream_handl_ptr outside interact %?" ,
54
74
ptr:: addr_of ( ( * socket_data_ptr) . stream_handle ) ) ) ;
55
75
uv:: hl:: interact ( hl_loop) { |loop_ptr|
@@ -113,7 +133,7 @@ fn connect(input_ip: ip::ip_addr, port: uint) -> tcp_connect_result unsafe {
113
133
alt comm:: recv ( result_po) {
114
134
conn_success {
115
135
log( debug, "tcp::connect - received success on result_po" ) ;
116
- tcp_connected ( valid_tcp_socket ( socket_data) )
136
+ tcp_connected ( tcp_socket ( socket_data) )
117
137
}
118
138
conn_failure ( err_data) {
119
139
comm:: recv ( closed_signal_po) ;
@@ -122,7 +142,87 @@ fn connect(input_ip: ip::ip_addr, port: uint) -> tcp_connect_result unsafe {
122
142
}
123
143
}
124
144
}
145
+
146
+ #[ doc="
147
+ Write binary data to a tcp stream
148
+ " ]
149
+ fn write ( sock : tcp_socket , raw_write_data : [ [ u8 ] ] ) -> tcp_write_result
150
+ unsafe {
151
+ let socket_data_ptr = ptr:: addr_of ( * * sock) ;
152
+ let write_req_ptr = ptr:: addr_of ( ( * socket_data_ptr) . write_req ) ;
153
+ let stream_handle_ptr =
154
+ ptr:: addr_of ( ( * socket_data_ptr) . stream_handle ) ;
155
+ let write_buf_vec = iter:: map_to_vec ( raw_write_data) { |raw_bytes|
156
+ uv:: ll:: buf_init ( vec:: unsafe:: to_ptr ( raw_bytes) ,
157
+ vec:: len ( raw_bytes) )
158
+ } ;
159
+ let write_buf_vec_ptr = ptr:: addr_of ( write_buf_vec) ;
160
+ let result_po = comm:: port :: < tcp_write_result > ( ) ;
161
+ let write_data = {
162
+ result_ch: comm:: chan ( result_po)
163
+ } ;
164
+ let write_data_ptr = ptr:: addr_of ( write_data) ;
165
+ uv:: hl:: interact ( ( * socket_data_ptr) . hl_loop ) { |loop_ptr|
166
+ log ( debug, #fmt ( "in interact cb for tcp::write %?" , loop_ptr) ) ;
167
+ alt uv:: ll:: write ( write_req_ptr,
168
+ stream_handle_ptr,
169
+ write_buf_vec_ptr,
170
+ tcp_write_complete_cb) {
171
+ 0i32 {
172
+ log ( debug, "uv_write() invoked successfully" ) ;
173
+ uv:: ll:: set_data_for_req ( write_req_ptr, write_data_ptr) ;
174
+ }
175
+ _ {
176
+ log( debug, "error invoking uv_write()" ) ;
177
+ let err_data = uv:: ll:: get_last_err_data ( loop_ptr) ;
178
+ comm:: send ( ( * write_data_ptr) . result_ch ,
179
+ tcp_write_error ( err_data) ) ;
180
+ }
181
+ }
182
+ } ;
183
+ comm:: recv ( result_po)
184
+ }
185
+
186
+
187
+
125
188
// INTERNAL API
189
+
190
+ type tcp_socket_close_data = {
191
+ closed_ch : comm:: chan < ( ) >
192
+ } ;
193
+
194
+ crust fn tcp_socket_dtor_close_cb ( handle : * uv:: ll:: uv_tcp_t ) unsafe {
195
+ let data = uv:: ll:: get_data_for_uv_handle ( handle)
196
+ as * tcp_socket_close_data ;
197
+ let closed_ch = ( * data) . closed_ch ;
198
+ comm:: send ( closed_ch, ( ) ) ;
199
+ log ( debug, "tcp_socket_dtor_close_cb exiting.." ) ;
200
+ }
201
+
202
+ crust fn tcp_write_complete_cb ( write_req : * uv:: ll:: uv_write_t ,
203
+ status : libc:: c_int ) unsafe {
204
+ let write_data_ptr = uv:: ll:: get_data_for_req ( write_req)
205
+ as * write_req_data ;
206
+ alt status {
207
+ 0i32 {
208
+ log( debug, "successful write complete" ) ;
209
+ comm:: send ( ( * write_data_ptr) . result_ch , tcp_write_success) ;
210
+ }
211
+ _ {
212
+ let stream_handle_ptr = uv:: ll:: get_stream_handle_from_write_req (
213
+ write_req) ;
214
+ let loop_ptr = uv:: ll:: get_loop_for_uv_handle ( stream_handle_ptr) ;
215
+ let err_data = uv:: ll:: get_last_err_data ( loop_ptr) ;
216
+ log ( debug, "failure to write" ) ;
217
+ comm:: send ( ( * write_data_ptr) . result_ch , tcp_write_error ( err_data) ) ;
218
+ }
219
+ }
220
+ }
221
+
222
+ type write_req_data = {
223
+ result_ch: comm:: chan<tcp_write_result>
224
+ } ;
225
+
126
226
type connect_req_data = {
127
227
result_ch : comm:: chan < conn_attempt > ,
128
228
closed_signal_ch : comm:: chan < ( ) >
@@ -177,7 +277,8 @@ type tcp_socket_data = {
177
277
reader_port : comm:: port < [ u8 ] > ,
178
278
stream_handle : uv:: ll:: uv_tcp_t ,
179
279
connect_req : uv:: ll:: uv_connect_t ,
180
- write_req : uv:: ll:: uv_write_t
280
+ write_req : uv:: ll:: uv_write_t ,
281
+ hl_loop : uv:: hl:: high_level_loop
181
282
} ;
182
283
183
284
// convert rust ip_addr to libuv's native representation
@@ -190,10 +291,10 @@ fn ipv4_ip_addr_to_sockaddr_in(input: ip::ip_addr,
190
291
mod test {
191
292
#[ test]
192
293
fn test_gl_tcp_ipv4_request ( ) {
193
- let ip_str = "127.0.0.1 " ;
294
+ let ip_str = "173.194.79.99 " ;
194
295
let port = 80 u;
195
296
let expected_read_msg = "foo" ;
196
- let actual_write_msg = "bar " ;
297
+ let actual_write_msg = "GET / HTTP/1.1 \r \n \r \n " ;
197
298
let host_ip = ip:: v4:: parse_addr ( ip_str) ;
198
299
199
300
let data_po = comm:: port :: < [ u8 ] > ( ) ;
@@ -202,12 +303,16 @@ mod test {
202
303
alt connect( host_ip, port) {
203
304
tcp_connected ( sock) {
204
305
log ( debug, "successful tcp connect" ) ;
205
- /*
206
- let write_data = str::as_buf(actual_write_msg);
207
- alt write(sock, [write_data]) {
306
+ let mut write_data: [ [ u8 ] ] = [ ] ;
307
+ let write_data = [ str:: as_bytes ( actual_write_msg) { |str_bytes|
308
+ str_bytes
309
+ } ] ;
310
+ alt write( sock, write_data) {
208
311
tcp_write_success {
312
+ log( debug, "tcp::write successful" ) ;
313
+ /*
209
314
let mut total_read_data: [u8] = [];
210
- let reader_po = read_start(sock);nyw
315
+ let reader_po = read_start(sock);
211
316
loop {
212
317
alt comm::recv(reader_po) {
213
318
new_read_data(data) {
@@ -228,12 +333,15 @@ mod test {
228
333
}
229
334
}
230
335
comm::send(data_ch, total_read_data);
336
+ */
231
337
}
232
- tcp_write_error {
233
- fail "error during write attempt.. FIXME need err info";
338
+ tcp_write_error ( err_data) {
339
+ log ( debug, "tcp_write_error received.." ) ;
340
+ log ( debug, #fmt ( "tcp write error: %? %?" , err_data. err_name ,
341
+ err_data. err_msg ) ) ;
342
+ assert false;
234
343
}
235
344
}
236
- */
237
345
}
238
346
tcp_connect_error ( err_data) {
239
347
log ( debug, "tcp_connect_error received.." ) ;
0 commit comments