1
- // Runtime support for pipes.
1
+ /*! Runtime support for message passing with protocol enforcement.
2
+
3
+
4
+ Pipes consist of two endpoints. One endpoint can send messages and
5
+ the other can receive messages. The set of legal messages and which
6
+ directions they can flow at any given point are determined by a
7
+ protocol. Below is an example protocol.
8
+
9
+ ~~~
10
+ proto! pingpong {
11
+ ping: send {
12
+ ping -> pong
13
+ }
14
+ pong: recv {
15
+ pong -> ping
16
+ }
17
+ }
18
+ ~~~
19
+
20
+ The `proto!` syntax extension will convert this into a module called
21
+ `pingpong`, which includes a set of types and functions that can be
22
+ used to write programs that follow the pingpong protocol.
23
+
24
+ */
25
+
26
+ /* IMPLEMENTATION NOTES
27
+
28
+ The initial design for this feature is available at:
29
+
30
+ https://github.com/eholk/rust/wiki/Proposal-for-channel-contracts
31
+
32
+ Much of the design in that document is still accurate. There are
33
+ several components for the pipe implementation. First of all is the
34
+ syntax extension. To see how that works, it is best see comments in
35
+ libsyntax/ext/pipes.rs.
36
+
37
+ This module includes two related pieces of the runtime
38
+ implementation. There is support for unbounded and bounded
39
+ protocols. The main difference between the two is the type of the
40
+ buffer that is carried along in the endpoint data structures.
41
+
42
+ FIXME (#3072) - This is still incomplete
43
+
44
+ */
2
45
3
46
import unsafe:: { forget, reinterpret_cast, transmute} ;
4
47
import either:: { either, left, right} ;
@@ -11,7 +54,7 @@ export send_packet_buffered, recv_packet_buffered;
11
54
export packet, mk_packet, entangle_buffer, has_buffer, buffer_header;
12
55
13
56
// export these so we can find them in the buffer_resource
14
- // destructor. This is probably another metadata bug .
57
+ // destructor. This is probably a symptom of #3005 .
15
58
export atomic_add_acq, atomic_sub_rel;
16
59
17
60
// User-level things
@@ -20,24 +63,22 @@ export select, select2, selecti, select2i, selectable;
20
63
export spawn_service, spawn_service_recv;
21
64
export stream, port, chan, shared_chan, port_set, channel;
22
65
66
+ #[ doc( hidden) ]
23
67
const SPIN_COUNT : uint = 0 ;
24
68
25
69
macro_rules! move_it {
26
70
{ $x: expr } => { unsafe { let y <- * ptr:: addr_of( $x) ; y } }
27
71
}
28
72
29
- // This is to help make sure we only move out of enums in safe
30
- // places. Once there is unary move, it can be removed.
31
- fn move_it < T > ( -x : T ) -> T { x }
32
-
73
+ #[ doc( hidden) ]
33
74
enum state {
34
75
empty,
35
76
full,
36
77
blocked,
37
78
terminated
38
79
}
39
80
40
- class buffer_header {
81
+ struct buffer_header {
41
82
// Tracks whether this buffer needs to be freed. We can probably
42
83
// get away with restricting it to 0 or 1, if we're careful.
43
84
let mut ref_count : int ;
@@ -49,12 +90,13 @@ class buffer_header {
49
90
}
50
91
51
92
// This is for protocols to associate extra data to thread around.
93
+ #[ doc( hidden) ]
52
94
type buffer < T : send > = {
53
95
header : buffer_header ,
54
96
data : T ,
55
97
} ;
56
98
57
- class packet_header {
99
+ struct packet_header {
58
100
let mut state : state ;
59
101
let mut blocked_task: option < * rust_task > ;
60
102
@@ -95,6 +137,7 @@ class packet_header {
95
137
}
96
138
}
97
139
140
+ #[ doc( hidden) ]
98
141
type packet < T : send > = {
99
142
header : packet_header ,
100
143
mut payload : option < T > ,
@@ -133,6 +176,7 @@ fn unibuffer<T: send>() -> ~buffer<packet<T>> {
133
176
b
134
177
}
135
178
179
+ #[ doc( hidden) ]
136
180
fn packet < T : send > ( ) -> * packet < T > {
137
181
let b = unibuffer ( ) ;
138
182
let p = ptr:: addr_of ( b. data ) ;
@@ -141,6 +185,7 @@ fn packet<T: send>() -> *packet<T> {
141
185
p
142
186
}
143
187
188
+ #[ doc( hidden) ]
144
189
fn entangle_buffer < T : send , Tstart : send > (
145
190
-buffer : ~buffer < T > ,
146
191
init : fn ( * libc:: c_void , x : & T ) -> * packet < Tstart > )
@@ -163,18 +208,22 @@ extern mod rusti {
163
208
164
209
// If I call the rusti versions directly from a polymorphic function,
165
210
// I get link errors. This is a bug that needs investigated more.
211
+ #[ doc( hidden) ]
166
212
fn atomic_xchng_rel ( & dst: int , src : int ) -> int {
167
213
rusti:: atomic_xchng_rel ( dst, src)
168
214
}
169
215
216
+ #[ doc( hidden) ]
170
217
fn atomic_add_acq ( & dst: int , src : int ) -> int {
171
218
rusti:: atomic_add_acq ( dst, src)
172
219
}
173
220
221
+ #[ doc( hidden) ]
174
222
fn atomic_sub_rel ( & dst: int , src : int ) -> int {
175
223
rusti:: atomic_sub_rel ( dst, src)
176
224
}
177
225
226
+ #[ doc( hidden) ]
178
227
type rust_task = libc:: c_void ;
179
228
180
229
extern mod rustrt {
@@ -188,6 +237,7 @@ extern mod rustrt {
188
237
pure fn task_signal_event ( target : * rust_task , event : * libc:: c_void ) ;
189
238
}
190
239
240
+ #[ doc( hidden) ]
191
241
fn wait_event ( this : * rust_task ) -> * libc:: c_void {
192
242
let mut event = ptr:: null ( ) ;
193
243
@@ -198,6 +248,7 @@ fn wait_event(this: *rust_task) -> *libc::c_void {
198
248
event
199
249
}
200
250
251
+ #[ doc( hidden) ]
201
252
fn swap_state_acq ( & dst: state , src : state ) -> state {
202
253
unsafe {
203
254
reinterpret_cast ( rusti:: atomic_xchng_acq (
@@ -206,6 +257,7 @@ fn swap_state_acq(&dst: state, src: state) -> state {
206
257
}
207
258
}
208
259
260
+ #[ doc( hidden) ]
209
261
fn swap_state_rel ( & dst: state , src : state ) -> state {
210
262
unsafe {
211
263
reinterpret_cast ( rusti:: atomic_xchng_rel (
@@ -214,11 +266,12 @@ fn swap_state_rel(&dst: state, src: state) -> state {
214
266
}
215
267
}
216
268
269
+ #[ doc( hidden) ]
217
270
unsafe fn get_buffer < T : send > ( p : * packet_header ) -> ~buffer < T > {
218
271
transmute ( ( * p) . buf_header ( ) )
219
272
}
220
273
221
- class buffer_resource<T : send> {
274
+ struct buffer_resource < T : send > {
222
275
let buffer : ~buffer < T > ;
223
276
new ( +b : ~buffer < T > ) {
224
277
//let p = ptr::addr_of(*b);
@@ -244,6 +297,7 @@ class buffer_resource<T: send> {
244
297
}
245
298
}
246
299
300
+ #[ doc( hidden) ]
247
301
fn send < T : send , Tbuffer : send > ( -p : send_packet_buffered < T , Tbuffer > ,
248
302
-payload : T ) {
249
303
let header = p. header ( ) ;
@@ -281,10 +335,21 @@ fn send<T: send, Tbuffer: send>(-p: send_packet_buffered<T, Tbuffer>,
281
335
}
282
336
}
283
337
338
+ /** Receives a message from a pipe.
339
+
340
+ Fails if the sender closes the connection.
341
+
342
+ */
284
343
fn recv < T : send , Tbuffer : send > ( -p : recv_packet_buffered < T , Tbuffer > ) -> T {
285
344
option:: unwrap ( try_recv ( p) )
286
345
}
287
346
347
+ /** Attempts to receive a message from a pipe.
348
+
349
+ Returns `none` if the sender has closed the connection without sending
350
+ a message, or `some(T)` if a message was received.
351
+
352
+ */
288
353
fn try_recv < T : send , Tbuffer : send > ( -p : recv_packet_buffered < T , Tbuffer > )
289
354
-> option < T >
290
355
{
@@ -351,6 +416,7 @@ impl peek<T: send, Tb: send> for recv_packet_buffered<T, Tb> {
351
416
}
352
417
}
353
418
419
+ #[ doc( hidden) ]
354
420
fn sender_terminate < T : send > ( p : * packet < T > ) {
355
421
let p = unsafe { & * p } ;
356
422
alt swap_state_rel ( p. header . state , terminated) {
@@ -377,6 +443,7 @@ fn sender_terminate<T: send>(p: *packet<T>) {
377
443
}
378
444
}
379
445
446
+ #[ doc( hidden) ]
380
447
fn receiver_terminate < T : send > ( p : * packet < T > ) {
381
448
let p = unsafe { & * p } ;
382
449
alt swap_state_rel ( p. header . state , terminated) {
@@ -394,8 +461,16 @@ fn receiver_terminate<T: send>(p: *packet<T>) {
394
461
}
395
462
}
396
463
397
- #[ doc = "Returns when one of the packet headers reports data is
398
- available." ]
464
+ /** Returns when one of the packet headers reports data is available.
465
+
466
+ This function is primarily intended for building higher level waiting
467
+ functions, such as `select`, `select2`, etc.
468
+
469
+ It takes a vector slice of packet_headers and returns an index into
470
+ that vector. The index points to an endpoint that has either been
471
+ closed by the sender or has a message waiting to be received.
472
+
473
+ */
399
474
fn wait_many ( pkts : & [ * packet_header ] ) -> uint {
400
475
let this = rustrt:: rust_get_task ( ) ;
401
476
@@ -447,6 +522,34 @@ fn wait_many(pkts: &[*packet_header]) -> uint {
447
522
ready_packet
448
523
}
449
524
525
+ /** Receives a message from one of two endpoints.
526
+
527
+ The return value is `left` if the first endpoint received something,
528
+ or `right` if the second endpoint receives something. In each case,
529
+ the result includes the other endpoint as well so it can be used
530
+ again. Below is an example of using `select2`.
531
+
532
+ ~~~
533
+ match select2(a, b) {
534
+ left((none, b)) {
535
+ // endpoint a was closed.
536
+ }
537
+ right((a, none)) {
538
+ // endpoint b was closed.
539
+ }
540
+ left((some(_), b)) {
541
+ // endpoint a received a message
542
+ }
543
+ right(a, some(_)) {
544
+ // endpoint b received a message.
545
+ }
546
+ }
547
+ ~~~
548
+
549
+ Sometimes messages will be available on both endpoints at once. In
550
+ this case, `select2` may return either `left` or `right`.
551
+
552
+ */
450
553
fn select2 < A : send , Ab : send , B : send , Bb : send > (
451
554
+a : recv_packet_buffered < A , Ab > ,
452
555
+b : recv_packet_buffered < B , Bb > )
@@ -500,13 +603,16 @@ fn select<T: send, Tb: send>(+endpoints: ~[recv_packet_buffered<T, Tb>])
500
603
( ready, result, remaining)
501
604
}
502
605
606
+ /// The sending end of a pipe. It can be used to send exactly one
607
+ /// message.
503
608
type send_packet < T : send > = send_packet_buffered < T , packet < T > > ;
504
609
610
+ #[ doc( hidden) ]
505
611
fn send_packet < T : send > ( p : * packet < T > ) -> send_packet < T > {
506
612
send_packet_buffered ( p)
507
613
}
508
614
509
- class send_packet_buffered<T : send, Tbuffer : send> {
615
+ struct send_packet_buffered < T : send , Tbuffer : send > {
510
616
let mut p: option < * packet < T > > ;
511
617
let mut buffer: option < buffer_resource < Tbuffer > > ;
512
618
new ( p: * packet<T >) {
@@ -560,13 +666,16 @@ class send_packet_buffered<T: send, Tbuffer: send> {
560
666
}
561
667
}
562
668
669
+ /// Represents the receive end of a pipe. It can receive exactly one
670
+ /// message.
563
671
type recv_packet <T : send> = recv_packet_buffered<T , packet<T >>;
564
672
673
+ #[ doc( hidden) ]
565
674
fn recv_packet < T : send > ( p : * packet < T > ) -> recv_packet < T > {
566
675
recv_packet_buffered ( p)
567
676
}
568
677
569
- class recv_packet_buffered<T : send, Tbuffer : send> : selectable {
678
+ struct recv_packet_buffered < T : send , Tbuffer : send > : selectable {
570
679
let mut p: option < * packet < T > > ;
571
680
let mut buffer: option < buffer_resource < Tbuffer > > ;
572
681
new ( p: * packet<T >) {
@@ -620,6 +729,7 @@ class recv_packet_buffered<T: send, Tbuffer: send> : selectable {
620
729
}
621
730
}
622
731
732
+ #[ doc( hidden) ]
623
733
fn entangle < T : send > ( ) -> ( send_packet < T > , recv_packet < T > ) {
624
734
let p = packet ( ) ;
625
735
( send_packet ( p) , recv_packet ( p) )
@@ -686,12 +796,14 @@ trait recv<T: send> {
686
796
pure fn peek ( ) -> bool ;
687
797
}
688
798
799
+ #[ doc( hidden) ]
689
800
type chan_ < T : send > = { mut endp : option < streamp:: client:: open < T > > } ;
690
801
691
802
enum chan < T : send > {
692
803
chan_( chan_ < T > )
693
804
}
694
805
806
+ #[ doc( hidden) ]
695
807
type port_ < T : send > = { mut endp : option < streamp:: server:: open < T > > } ;
696
808
697
809
enum port < T : send > {
@@ -725,7 +837,7 @@ impl port<T: send> of recv<T> for port<T> {
725
837
fn try_recv ( ) -> option < T > {
726
838
let mut endp = none;
727
839
endp <-> self . endp ;
728
- alt move_it ( pipes:: try_recv ( unwrap ( endp) ) ) {
840
+ alt move pipes:: try_recv ( unwrap ( endp) ) {
729
841
some ( streamp:: data ( x, endp) ) {
730
842
self . endp = some ( move_it ! { endp} ) ;
731
843
some ( move_it ! { x} )
@@ -749,7 +861,7 @@ impl port<T: send> of recv<T> for port<T> {
749
861
}
750
862
751
863
// Treat a whole bunch of ports as one.
752
- class port_set<T : send> : recv<T > {
864
+ struct port_set < T : send > : recv<T > {
753
865
let mut ports : ~[ pipes:: port < T > ] ;
754
866
755
867
new ( ) { self . ports = ~[ ] ; }
@@ -770,7 +882,7 @@ class port_set<T: send> : recv<T> {
770
882
let i = wait_many ( self . ports . map ( |p| p. header ( ) ) ) ;
771
883
// dereferencing an unsafe pointer nonsense to appease the
772
884
// borrowchecker.
773
- alt move_it ( unsafe { ( * ptr:: addr_of ( self . ports [ i] ) ) . try_recv ( ) } ) {
885
+ alt move unsafe { ( * ptr:: addr_of ( self . ports [ i] ) ) . try_recv ( ) } {
774
886
some ( m) {
775
887
result = some ( move_it ! { m} ) ;
776
888
}
0 commit comments