@@ -18,6 +18,35 @@ import std::str;
18
18
import std:: vec;
19
19
import std:: map;
20
20
21
+ fn map ( str filename , map_reduce:: putter emit) {
22
+ auto f = io:: file_reader ( filename) ;
23
+
24
+ while ( true ) {
25
+ alt ( read_word ( f) ) {
26
+ case ( some ( ?w) ) {
27
+ emit ( w, "1" ) ;
28
+ }
29
+ case ( none) {
30
+ break ;
31
+ }
32
+ }
33
+ }
34
+ }
35
+
36
+ fn reduce ( str word , map_reduce:: getter get) {
37
+ auto count = 0 ;
38
+
39
+ while ( true ) {
40
+ alt ( get ( ) ) {
41
+ case ( some ( _) ) { count += 1 }
42
+ case ( none) { break }
43
+ }
44
+ }
45
+
46
+ auto out = io:: stdout ( ) ;
47
+ out. write_line ( #fmt ( "%s: %d" , word, count) ) ;
48
+ }
49
+
21
50
mod map_reduce {
22
51
export putter;
23
52
export getter;
@@ -33,84 +62,119 @@ mod map_reduce {
33
62
34
63
type reducer = fn ( str , getter ) ;
35
64
65
+ tag ctrl_proto {
66
+ find_reducer( str , chan[ chan[ reduce_proto] ] ) ;
67
+ mapper_done;
68
+ }
69
+
70
+ tag reduce_proto {
71
+ emit_val( str) ;
72
+ done;
73
+ }
74
+
75
+ fn start_mappers ( chan[ ctrl_proto] ctrl ,
76
+ vec[ str] inputs ) {
77
+ for ( str i in inputs) {
78
+ spawn map_task( ctrl, i) ;
79
+ }
80
+ }
81
+
82
+ fn map_task ( chan[ ctrl_proto] ctrl ,
83
+ str input ) {
36
84
37
- fn map_reduce ( vec[ str] inputs ,
38
- mapper f,
39
- reducer reduce) {
40
- auto intermediates = map:: new_str_hash[ vec[ str] ] ( ) ;
85
+ auto intermediates = map:: new_str_hash ( ) ;
41
86
42
- fn emit ( & map:: hashmap[ str, vec[ str] ] im ,
87
+ fn emit ( & map:: hashmap[ str, chan[ reduce_proto] ] im ,
88
+ chan[ ctrl_proto] ctrl ,
43
89
str key, str val) {
44
- auto old = [ ] ;
45
- alt ( im. remove ( key) ) {
46
- case ( some ( ?v) ) {
47
- old = v;
90
+ auto c;
91
+ alt ( im. find ( key) ) {
92
+ case ( some ( ?_c) ) {
93
+ c = _c
94
+ }
95
+ case ( none) {
96
+ auto p = port[ chan[ reduce_proto] ] ( ) ;
97
+ ctrl <| find_reducer( key, chan( p) ) ;
98
+ p |> c;
99
+ im. insert ( key, c) ;
48
100
}
49
- case ( none) { }
50
101
}
51
-
52
- im. insert ( key, old + [ val] ) ;
102
+ c <| emit_val( val) ;
53
103
}
54
104
55
- for ( str i in inputs ) {
56
- f ( i , bind emit ( intermediates , _ , _ ) ) ;
57
- }
105
+ map ( input , bind emit ( intermediates , ctrl , _ , _ ) ) ;
106
+ ctrl <| mapper_done ;
107
+ }
58
108
59
- fn get ( vec[ str] vals , & mutable uint i) -> option[ str ] {
60
- i += 1 u;
61
- if ( i <= vec:: len ( vals) ) {
62
- some ( vals. ( i - 1 u) )
63
- }
64
- else {
65
- none
109
+ fn reduce_task ( str key, chan[ chan[ reduce_proto] ] out ) {
110
+ auto p = port ( ) ;
111
+
112
+ out <| chan( p) ;
113
+
114
+ fn get ( port[ reduce_proto] p ) -> option[ str ] {
115
+ auto m;
116
+ p |> m;
117
+
118
+ alt ( m) {
119
+ case ( emit_val ( ?v) ) { ret some ( v) ; }
120
+ case ( done) { ret none; }
66
121
}
67
122
}
68
123
69
- for each ( @tup( str , vec[ str ] ) kv in intermediates. items( ) ) {
70
- auto i = 0 u;
71
- reduce( kv. _0, bind get( kv. _1, i) ) ;
72
- }
124
+ reduce ( key, bind get( p) ) ;
73
125
}
74
- }
75
126
76
- fn main ( vec[ str] argv ) {
77
- if ( vec:: len ( argv) < 2 u) {
78
- auto out = io:: stdout ( ) ;
127
+ fn map_reduce ( vec[ str] inputs ) {
128
+ auto ctrl = port[ ctrl_proto] ( ) ;
79
129
80
- out. write_line ( #fmt ( "Usage: %s <filename> ..." , argv. ( 0 ) ) ) ;
81
- fail;
82
- }
130
+ // This task becomes the master control task. It spawns others
131
+ // to do the rest.
83
132
84
- fn map ( str filename , map_reduce:: putter emit) {
85
- auto f = io:: file_reader ( filename) ;
133
+ let map:: hashmap[ str, chan[ reduce_proto] ] reducers;
86
134
87
- while ( true ) {
88
- alt ( read_word ( f) ) {
89
- case ( some ( ?w) ) {
90
- emit ( w, "1" ) ;
91
- }
92
- case ( none) {
93
- break ;
135
+ reducers = map:: new_str_hash ( ) ;
136
+
137
+ start_mappers ( chan ( ctrl) , inputs) ;
138
+
139
+ auto num_mappers = vec:: len ( inputs) as int ;
140
+
141
+ while ( num_mappers > 0 ) {
142
+ auto m;
143
+ ctrl |> m;
144
+
145
+ alt ( m) {
146
+ case ( mapper_done) { num_mappers -= 1 ; }
147
+ case ( find_reducer ( ?k, ?cc) ) {
148
+ auto c;
149
+ alt ( reducers. find ( k) ) {
150
+ case ( some ( ?_c) ) { c = _c; }
151
+ case ( none) {
152
+ auto p = port ( ) ;
153
+ spawn reduce_task ( k, chan ( p) ) ;
154
+ p |> c;
155
+ reducers. insert ( k, c) ;
156
+ }
157
+ }
158
+ cc <| c;
94
159
}
95
160
}
96
161
}
97
- }
98
162
99
- fn reduce ( str word , map_reduce:: getter get) {
100
- auto count = 0 ;
101
-
102
- while ( true ) {
103
- alt ( get ( ) ) {
104
- case ( some ( _) ) { count += 1 }
105
- case ( none) { break }
106
- }
163
+ for each( @tup( str, chan[ reduce_proto] ) kv in reducers. items ( ) ) {
164
+ kv. _1 <| done;
107
165
}
166
+ }
167
+ }
108
168
169
+ fn main ( vec[ str] argv ) {
170
+ if ( vec:: len ( argv) < 2 u) {
109
171
auto out = io:: stdout ( ) ;
110
- out. write_line ( #fmt ( "%s: %d" , word, count) ) ;
172
+
173
+ out. write_line ( #fmt ( "Usage: %s <filename> ..." , argv. ( 0 ) ) ) ;
174
+ fail;
111
175
}
112
176
113
- map_reduce:: map_reduce ( vec:: slice ( argv, 1 u, vec:: len ( argv) ) , map , reduce ) ;
177
+ map_reduce:: map_reduce ( vec:: slice ( argv, 1 u, vec:: len ( argv) ) ) ;
114
178
}
115
179
116
180
fn read_word ( io:: reader r) -> option[ str ] {
0 commit comments