7
7
class rust_uvtmp_thread ;
8
8
9
9
struct connect_data {
10
+ uint32_t req_id;
10
11
rust_uvtmp_thread *thread;
12
+ char * ip_addr;
11
13
uv_connect_t connect;
12
14
uv_tcp_t tcp;
13
15
chan_handle chan;
@@ -60,12 +62,13 @@ send(rust_task *task, chan_handle chan, void *data) {
60
62
class rust_uvtmp_thread : public rust_thread {
61
63
62
64
private:
65
+ std::map<int , connect_data *> req_map;
63
66
rust_task *task;
64
67
uv_loop_t *loop;
65
68
uv_idle_t idle;
66
69
lock_and_signal lock;
67
70
bool stop_flag;
68
- std::queue<std::pair<std::string , chan_handle> > connect_queue;
71
+ std::queue<std::pair<connect_data * , chan_handle> > connect_queue;
69
72
std::queue<connect_data*> close_connection_queue;
70
73
std::queue<write_data*> write_queue;
71
74
std::queue<read_start_data*> read_start_queue;
@@ -90,40 +93,50 @@ class rust_uvtmp_thread : public rust_thread {
90
93
stop_flag = true ;
91
94
}
92
95
93
- void connect (char *ip, chan_handle chan) {
96
+ connect_data * connect (uint32_t req_id, char *ip, chan_handle chan) {
94
97
scoped_lock with (lock);
95
- connect_queue.push (std::pair<std::string, chan_handle>
96
- (std::string (ip), chan));
98
+ if (req_map.count (req_id)) return NULL ;
99
+ connect_data *cd = new connect_data ();
100
+ req_map[req_id] = cd;
101
+ cd->req_id = req_id;
102
+ cd->ip_addr = ip;
103
+ connect_queue.push (
104
+ std::pair<connect_data *, chan_handle>(cd, chan));
105
+ return cd;
97
106
}
98
107
99
108
void
100
- close_connection (connect_data *cd) {
101
- scoped_lock with (lock);
102
- close_connection_queue.push (cd);
109
+ close_connection (uint32_t req_id) {
110
+ scoped_lock with (lock);
111
+ connect_data *cd = req_map[req_id];
112
+ close_connection_queue.push (cd);
113
+ req_map.erase (req_id);
103
114
}
104
115
105
116
void
106
- write (connect_data *cd, uint8_t *buf, size_t len, chan_handle chan) {
107
- scoped_lock with (lock);
108
- write_data *wd = new write_data ();
109
- wd->cd = cd;
110
- wd->buf = new uint8_t [len];
111
- wd->len = len;
112
- wd->chan = chan;
113
-
114
- memcpy (wd->buf , buf, len);
115
-
116
- write_queue.push (wd);
117
+ write (uint32_t req_id, uint8_t *buf, size_t len, chan_handle chan) {
118
+ scoped_lock with (lock);
119
+ connect_data *cd = req_map[req_id];
120
+ write_data *wd = new write_data ();
121
+ wd->cd = cd;
122
+ wd->buf = new uint8_t [len];
123
+ wd->len = len;
124
+ wd->chan = chan;
125
+
126
+ memcpy (wd->buf , buf, len);
127
+
128
+ write_queue.push (wd);
117
129
}
118
130
119
131
void
120
- read_start (connect_data *cd, chan_handle chan) {
121
- scoped_lock with (lock);
122
- read_start_data *rd = new read_start_data ();
123
- rd->cd = cd;
124
- rd->chan = chan;
125
-
126
- read_start_queue.push (rd);
132
+ read_start (uint32_t req_id, chan_handle chan) {
133
+ scoped_lock with (lock);
134
+ connect_data *cd = req_map[req_id];
135
+ read_start_data *rd = new read_start_data ();
136
+ rd->cd = cd;
137
+ rd->chan = chan;
138
+
139
+ read_start_queue.push (rd);
127
140
}
128
141
129
142
private:
@@ -153,12 +166,12 @@ class rust_uvtmp_thread : public rust_thread {
153
166
make_new_connections () {
154
167
assert (lock.lock_held_by_current_thread ());
155
168
while (!connect_queue.empty ()) {
156
- std::pair<std::string , chan_handle> pair = connect_queue.front ();
169
+ std::pair<connect_data * , chan_handle> pair = connect_queue.front ();
157
170
connect_queue.pop ();
171
+ connect_data *cd = pair.first ;
158
172
struct sockaddr_in client_addr = uv_ip4_addr (" 0.0.0.0" , 0 );
159
- struct sockaddr_in server_addr = uv_ip4_addr (pair. first . c_str () , 80 );
173
+ struct sockaddr_in server_addr = uv_ip4_addr (cd-> ip_addr , 80 );
160
174
161
- connect_data *cd = new connect_data ();
162
175
cd->thread = this ;
163
176
cd->chan = pair.second ;
164
177
cd->connect .data = cd;
@@ -318,29 +331,36 @@ rust_uvtmp_delete_thread(rust_uvtmp_thread *thread) {
318
331
delete thread;
319
332
}
320
333
321
- extern " C" void
322
- rust_uvtmp_connect (rust_uvtmp_thread *thread, char *ip, chan_handle *chan) {
323
- thread->connect (ip, *chan);
334
+ extern " C" connect_data *
335
+ rust_uvtmp_connect (rust_uvtmp_thread *thread, uint32_t req_id, char *ip, chan_handle *chan) {
336
+ return thread->connect (req_id, ip, *chan);
324
337
}
325
338
326
339
extern " C" void
327
- rust_uvtmp_close_connection (rust_uvtmp_thread *thread, connect_data *cd ) {
328
- thread->close_connection (cd );
340
+ rust_uvtmp_close_connection (rust_uvtmp_thread *thread, uint32_t req_id ) {
341
+ thread->close_connection (req_id );
329
342
}
330
343
331
344
extern " C" void
332
- rust_uvtmp_write (rust_uvtmp_thread *thread, connect_data *cd ,
345
+ rust_uvtmp_write (rust_uvtmp_thread *thread, uint32_t req_id ,
333
346
uint8_t *buf, size_t len, chan_handle *chan) {
334
- thread->write (cd , buf, len, *chan);
347
+ thread->write (req_id , buf, len, *chan);
335
348
}
336
349
337
350
extern " C" void
338
- rust_uvtmp_read_start (rust_uvtmp_thread *thread, connect_data *cd ,
351
+ rust_uvtmp_read_start (rust_uvtmp_thread *thread, uint32_t req_id ,
339
352
chan_handle *chan) {
340
- thread->read_start (cd , *chan);
353
+ thread->read_start (req_id , *chan);
341
354
}
342
355
343
356
extern " C" void
344
357
rust_uvtmp_delete_buf (uint8_t *buf) {
345
358
delete [] buf;
346
359
}
360
+
361
+ extern " C" uint32_t
362
+ rust_uvtmp_get_req_id (connect_data *cd) {
363
+ return cd->req_id ;
364
+ }
365
+
366
+
0 commit comments