1
- /*! Runtime support for message passing with protocol enforcement.
2
-
3
-
4
- Pipes consist of two endpoints. One endpoint can send messages and
5
- the other can receive messages. The set of legal messages and which
6
- directions they can flow at any given point are determined by a
7
- protocol. Below is an example protocol.
8
-
9
- ~~~
10
- proto! pingpong {
11
- ping: send {
12
- ping -> pong
13
- }
14
- pong: recv {
15
- pong -> ping
16
- }
17
- }
18
- ~~~
19
-
20
- The `proto!` syntax extension will convert this into a module called
21
- `pingpong`, which includes a set of types and functions that can be
22
- used to write programs that follow the pingpong protocol.
23
-
24
- */
25
-
26
- /* IMPLEMENTATION NOTES
27
-
28
- The initial design for this feature is available at:
29
-
30
- https://github.com/eholk/rust/wiki/Proposal-for-channel-contracts
31
-
32
- Much of the design in that document is still accurate. There are
33
- several components for the pipe implementation. First of all is the
34
- syntax extension. To see how that works, it is best see comments in
35
- libsyntax/ext/pipes.rs.
36
-
37
- This module includes two related pieces of the runtime
38
- implementation. There is support for unbounded and bounded
39
- protocols. The main difference between the two is the type of the
40
- buffer that is carried along in the endpoint data structures.
41
-
42
- FIXME (#3072) - This is still incomplete
43
-
44
- */
1
+ // Runtime support for pipes.
45
2
46
3
import unsafe:: { forget, reinterpret_cast, transmute} ;
47
4
import either:: { either, left, right} ;
@@ -54,7 +11,7 @@ export send_packet_buffered, recv_packet_buffered;
54
11
export packet, mk_packet, entangle_buffer, has_buffer, buffer_header;
55
12
56
13
// export these so we can find them in the buffer_resource
57
- // destructor. This is probably a symptom of #3005 .
14
+ // destructor. This is probably another metadata bug .
58
15
export atomic_add_acq, atomic_sub_rel;
59
16
60
17
// User-level things
@@ -63,22 +20,24 @@ export select, select2, selecti, select2i, selectable;
63
20
export spawn_service, spawn_service_recv;
64
21
export stream, port, chan, shared_chan, port_set, channel;
65
22
66
- #[ doc( hidden) ]
67
23
const SPIN_COUNT : uint = 0 ;
68
24
69
25
macro_rules! move_it {
70
26
{ $x: expr } => { unsafe { let y <- * ptr:: addr_of( $x) ; y } }
71
27
}
72
28
73
- #[ doc( hidden) ]
29
+ // This is to help make sure we only move out of enums in safe
30
+ // places. Once there is unary move, it can be removed.
31
+ fn move_it < T > ( -x : T ) -> T { x }
32
+
74
33
enum state {
75
34
empty,
76
35
full,
77
36
blocked,
78
37
terminated
79
38
}
80
39
81
- struct buffer_header {
40
+ class buffer_header {
82
41
// Tracks whether this buffer needs to be freed. We can probably
83
42
// get away with restricting it to 0 or 1, if we're careful.
84
43
let mut ref_count : int;
@@ -90,13 +49,12 @@ struct buffer_header {
90
49
}
91
50
92
51
// This is for protocols to associate extra data to thread around.
93
- #[ doc( hidden) ]
94
52
type buffer < T : send > = {
95
53
header : buffer_header ,
96
54
data : T ,
97
55
} ;
98
56
99
- struct packet_header {
57
+ class packet_header {
100
58
let mut state : state;
101
59
let mut blocked_task: option < * rust_task > ;
102
60
@@ -137,7 +95,6 @@ struct packet_header {
137
95
}
138
96
}
139
97
140
- #[ doc( hidden) ]
141
98
type packet< T : send > = {
142
99
header: packet_header,
143
100
mut payload: option<T >,
@@ -176,7 +133,6 @@ fn unibuffer<T: send>() -> ~buffer<packet<T>> {
176
133
b
177
134
}
178
135
179
- #[ doc( hidden) ]
180
136
fn packet < T : send > ( ) -> * packet < T > {
181
137
let b = unibuffer ( ) ;
182
138
let p = ptr:: addr_of ( b. data ) ;
@@ -185,7 +141,6 @@ fn packet<T: send>() -> *packet<T> {
185
141
p
186
142
}
187
143
188
- #[ doc( hidden) ]
189
144
fn entangle_buffer < T : send , Tstart : send > (
190
145
-buffer : ~buffer < T > ,
191
146
init : fn ( * libc:: c_void , x : & T ) -> * packet < Tstart > )
@@ -208,22 +163,18 @@ extern mod rusti {
208
163
209
164
// If I call the rusti versions directly from a polymorphic function,
210
165
// I get link errors. This is a bug that needs investigated more.
211
- #[ doc( hidden) ]
212
166
fn atomic_xchng_rel ( & dst: int , src : int ) -> int {
213
167
rusti:: atomic_xchng_rel ( dst, src)
214
168
}
215
169
216
- #[ doc( hidden) ]
217
170
fn atomic_add_acq ( & dst: int , src : int ) -> int {
218
171
rusti:: atomic_add_acq ( dst, src)
219
172
}
220
173
221
- #[ doc( hidden) ]
222
174
fn atomic_sub_rel ( & dst: int , src : int ) -> int {
223
175
rusti:: atomic_sub_rel ( dst, src)
224
176
}
225
177
226
- #[ doc( hidden) ]
227
178
type rust_task = libc:: c_void ;
228
179
229
180
extern mod rustrt {
@@ -237,7 +188,6 @@ extern mod rustrt {
237
188
pure fn task_signal_event ( target : * rust_task , event : * libc:: c_void ) ;
238
189
}
239
190
240
- #[ doc( hidden) ]
241
191
fn wait_event ( this : * rust_task ) -> * libc:: c_void {
242
192
let mut event = ptr:: null ( ) ;
243
193
@@ -248,7 +198,6 @@ fn wait_event(this: *rust_task) -> *libc::c_void {
248
198
event
249
199
}
250
200
251
- #[ doc( hidden) ]
252
201
fn swap_state_acq ( & dst: state , src : state ) -> state {
253
202
unsafe {
254
203
reinterpret_cast ( rusti:: atomic_xchng_acq (
@@ -257,7 +206,6 @@ fn swap_state_acq(&dst: state, src: state) -> state {
257
206
}
258
207
}
259
208
260
- #[ doc( hidden) ]
261
209
fn swap_state_rel ( & dst: state , src : state ) -> state {
262
210
unsafe {
263
211
reinterpret_cast ( rusti:: atomic_xchng_rel (
@@ -266,12 +214,11 @@ fn swap_state_rel(&dst: state, src: state) -> state {
266
214
}
267
215
}
268
216
269
- #[ doc( hidden) ]
270
217
unsafe fn get_buffer < T : send > ( p : * packet_header ) -> ~buffer < T > {
271
218
transmute ( ( * p) . buf_header ( ) )
272
219
}
273
220
274
- struct buffer_resource < T : send > {
221
+ class buffer_resource<T : send> {
275
222
let buffer: ~buffer < T > ;
276
223
new( +b: ~buffer<T >) {
277
224
//let p = ptr::addr_of(*b);
@@ -297,7 +244,6 @@ struct buffer_resource<T: send> {
297
244
}
298
245
}
299
246
300
- #[ doc( hidden) ]
301
247
fn send < T : send , Tbuffer : send > ( -p : send_packet_buffered < T , Tbuffer > ,
302
248
-payload : T ) {
303
249
let header = p. header ( ) ;
@@ -335,21 +281,10 @@ fn send<T: send, Tbuffer: send>(-p: send_packet_buffered<T, Tbuffer>,
335
281
}
336
282
}
337
283
338
- /** Receives a message from a pipe.
339
-
340
- Fails if the sender closes the connection.
341
-
342
- */
343
284
fn recv<T : send, Tbuffer : send>( -p: recv_packet_buffered<T , Tbuffer >) -> T {
344
285
option : : unwrap ( try_recv ( p) )
345
286
}
346
287
347
- /** Attempts to receive a message from a pipe.
348
-
349
- Returns `none` if the sender has closed the connection without sending
350
- a message, or `some(T)` if a message was received.
351
-
352
- */
353
288
fn try_recv<T : send, Tbuffer : send>( -p: recv_packet_buffered<T , Tbuffer >)
354
289
-> option<T >
355
290
{
@@ -416,7 +351,6 @@ impl peek<T: send, Tb: send> for recv_packet_buffered<T, Tb> {
416
351
}
417
352
}
418
353
419
- #[ doc( hidden) ]
420
354
fn sender_terminate < T : send > ( p : * packet < T > ) {
421
355
let p = unsafe { & * p } ;
422
356
alt swap_state_rel ( p. header . state , terminated) {
@@ -443,7 +377,6 @@ fn sender_terminate<T: send>(p: *packet<T>) {
443
377
}
444
378
}
445
379
446
- #[ doc( hidden) ]
447
380
fn receiver_terminate < T : send > ( p : * packet < T > ) {
448
381
let p = unsafe { & * p } ;
449
382
alt swap_state_rel ( p. header . state , terminated) {
@@ -461,16 +394,8 @@ fn receiver_terminate<T: send>(p: *packet<T>) {
461
394
}
462
395
}
463
396
464
- /** Returns when one of the packet headers reports data is available.
465
-
466
- This function is primarily intended for building higher level waiting
467
- functions, such as `select`, `select2`, etc.
468
-
469
- It takes a vector slice of packet_headers and returns an index into
470
- that vector. The index points to an endpoint that has either been
471
- closed by the sender or has a message waiting to be received.
472
-
473
- */
397
+ #[ doc = "Returns when one of the packet headers reports data is
398
+ available." ]
474
399
fn wait_many ( pkts : & [ * packet_header ] ) -> uint {
475
400
let this = rustrt:: rust_get_task ( ) ;
476
401
@@ -522,34 +447,6 @@ fn wait_many(pkts: &[*packet_header]) -> uint {
522
447
ready_packet
523
448
}
524
449
525
- /** Receives a message from one of two endpoints.
526
-
527
- The return value is `left` if the first endpoint received something,
528
- or `right` if the second endpoint receives something. In each case,
529
- the result includes the other endpoint as well so it can be used
530
- again. Below is an example of using `select2`.
531
-
532
- ~~~
533
- match select2(a, b) {
534
- left((none, b)) {
535
- // endpoint a was closed.
536
- }
537
- right((a, none)) {
538
- // endpoint b was closed.
539
- }
540
- left((some(_), b)) {
541
- // endpoint a received a message
542
- }
543
- right(a, some(_)) {
544
- // endpoint b received a message.
545
- }
546
- }
547
- ~~~
548
-
549
- Sometimes messages will be available on both endpoints at once. In
550
- this case, `select2` may return either `left` or `right`.
551
-
552
- */
553
450
fn select2 < A : send , Ab : send , B : send , Bb : send > (
554
451
+a : recv_packet_buffered < A , Ab > ,
555
452
+b : recv_packet_buffered < B , Bb > )
@@ -603,16 +500,13 @@ fn select<T: send, Tb: send>(+endpoints: ~[recv_packet_buffered<T, Tb>])
603
500
( ready, result, remaining)
604
501
}
605
502
606
- /// The sending end of a pipe. It can be used to send exactly one
607
- /// message.
608
503
type send_packet < T : send > = send_packet_buffered < T , packet < T > > ;
609
504
610
- #[ doc( hidden) ]
611
505
fn send_packet < T : send > ( p : * packet < T > ) -> send_packet < T > {
612
506
send_packet_buffered ( p)
613
507
}
614
508
615
- struct send_packet_buffered < T : send , Tbuffer : send > {
509
+ class send_packet_buffered<T : send, Tbuffer : send> {
616
510
let mut p: option < * packet < T > > ;
617
511
let mut buffer: option < buffer_resource < Tbuffer > > ;
618
512
new ( p: * packet<T >) {
@@ -666,16 +560,13 @@ struct send_packet_buffered<T: send, Tbuffer: send> {
666
560
}
667
561
}
668
562
669
- /// Represents the receive end of a pipe. It can receive exactly one
670
- /// message.
671
563
type recv_packet<T : send> = recv_packet_buffered<T , packet<T >>;
672
564
673
- #[ doc( hidden) ]
674
565
fn recv_packet < T : send > ( p : * packet < T > ) -> recv_packet < T > {
675
566
recv_packet_buffered ( p)
676
567
}
677
568
678
- struct recv_packet_buffered < T : send , Tbuffer : send > : selectable {
569
+ class recv_packet_buffered<T : send, Tbuffer : send> : selectable {
679
570
let mut p: option<* packet<T >>;
680
571
let mut buffer: option<buffer_resource<Tbuffer >>;
681
572
new ( p: * packet<T >) {
@@ -729,7 +620,6 @@ struct recv_packet_buffered<T: send, Tbuffer: send> : selectable {
729
620
}
730
621
}
731
622
732
- #[ doc( hidden) ]
733
623
fn entangle < T : send > ( ) -> ( send_packet < T > , recv_packet < T > ) {
734
624
let p = packet ( ) ;
735
625
( send_packet ( p) , recv_packet ( p) )
@@ -796,14 +686,12 @@ trait recv<T: send> {
796
686
pure fn peek ( ) -> bool ;
797
687
}
798
688
799
- #[ doc( hidden) ]
800
689
type chan_ < T : send > = { mut endp : option < streamp:: client:: open < T > > } ;
801
690
802
691
enum chan < T : send > {
803
692
chan_( chan_ < T > )
804
693
}
805
694
806
- #[ doc( hidden) ]
807
695
type port_ < T : send > = { mut endp : option < streamp:: server:: open < T > > } ;
808
696
809
697
enum port < T : send > {
@@ -837,7 +725,7 @@ impl port<T: send> of recv<T> for port<T> {
837
725
fn try_recv ( ) -> option < T > {
838
726
let mut endp = none;
839
727
endp <-> self . endp ;
840
- alt move pipes:: try_recv ( unwrap ( endp) ) {
728
+ alt move_it ( pipes:: try_recv ( unwrap ( endp) ) ) {
841
729
some ( streamp:: data ( x, endp) ) {
842
730
self . endp = some ( move_it ! { endp} ) ;
843
731
some ( move_it ! { x} )
@@ -861,7 +749,7 @@ impl port<T: send> of recv<T> for port<T> {
861
749
}
862
750
863
751
// Treat a whole bunch of ports as one.
864
- struct port_set < T : send > : recv<T > {
752
+ class port_set<T : send> : recv<T > {
865
753
let mut ports: ~[ pipes:: port < T > ] ;
866
754
867
755
new ( ) { self . ports = ~[ ] ; }
@@ -882,7 +770,7 @@ struct port_set<T: send> : recv<T> {
882
770
let i = wait_many ( self . ports . map ( |p| p. header ( ) ) ) ;
883
771
// dereferencing an unsafe pointer nonsense to appease the
884
772
// borrowchecker.
885
- alt move unsafe { ( * ptr:: addr_of ( self . ports [ i] ) ) . try_recv ( ) } {
773
+ alt move_it ( unsafe { ( * ptr:: addr_of ( self . ports [ i] ) ) . try_recv ( ) } ) {
886
774
some ( m) {
887
775
result = some ( move_it ! { m} ) ;
888
776
}
0 commit comments