@@ -17,34 +17,49 @@ import std::option::none;
17
17
import std:: str;
18
18
import std:: vec;
19
19
import std:: map;
20
+ import std:: ivec;
21
+
22
+ import std:: time;
23
+ import std:: u64;
24
+
25
+ import std:: task;
26
+ import clone = std:: task:: clone_chan;
20
27
21
28
fn map ( str filename , map_reduce:: putter emit) {
29
+ // log_err "mapping " + filename;
22
30
auto f = io:: file_reader ( filename) ;
23
31
24
32
while ( true ) {
25
33
alt ( read_word ( f) ) {
26
34
case ( some ( ?w) ) {
27
- emit ( w, "1" ) ;
35
+ emit ( w, 1 ) ;
28
36
}
29
37
case ( none) {
30
38
break ;
31
39
}
32
40
}
33
41
}
42
+ // log_err "done mapping " + filename;
34
43
}
35
44
36
45
fn reduce ( str word , map_reduce:: getter get) {
46
+ // log_err "reducing " + word;
37
47
auto count = 0 ;
38
48
39
49
while ( true ) {
40
50
alt ( get ( ) ) {
41
- case ( some ( _) ) { count += 1 }
42
- case ( none) { break }
51
+ some ( _) {
52
+ // log_err "received word " + word;
53
+ count += 1 ;
54
+ }
55
+ none { break }
43
56
}
44
57
}
45
58
46
- auto out = io:: stdout ( ) ;
47
- out. write_line ( #fmt ( "%s: %d" , word, count) ) ;
59
+ // auto out = io::stdout();
60
+ // out.write_line(#fmt("%s: %d", word, count));
61
+
62
+ // log_err "reduce " + word + " done.";
48
63
}
49
64
50
65
mod map_reduce {
@@ -54,74 +69,115 @@ mod map_reduce {
54
69
export reducer;
55
70
export map_reduce;
56
71
57
- type putter = fn ( str , str ) -> ( ) ;
72
+ type putter = fn ( str , int ) -> ( ) ;
58
73
59
74
type mapper = fn ( str , putter ) ;
60
75
61
- type getter = fn ( ) -> option[ str ] ;
76
+ type getter = fn ( ) -> option[ int ] ;
62
77
63
78
type reducer = fn ( str , getter ) ;
64
79
65
80
tag ctrl_proto {
66
- find_reducer( str , chan[ chan[ reduce_proto] ] ) ;
81
+ find_reducer( u8 [ ] , chan[ chan[ reduce_proto] ] ) ;
67
82
mapper_done;
68
83
}
69
84
70
85
tag reduce_proto {
71
- emit_val( str ) ;
86
+ emit_val( int ) ;
72
87
done;
88
+ ref;
89
+ release;
73
90
}
74
91
75
92
fn start_mappers ( chan[ ctrl_proto] ctrl ,
76
- vec[ str] inputs ) {
93
+ vec[ str] inputs ) -> task [ ] {
94
+ auto tasks = ~[ ] ;
95
+ // log_err "starting mappers";
77
96
for ( str i in inputs) {
78
- spawn map_task( ctrl, i) ;
97
+ // log_err "starting mapper for " + i;
98
+ tasks += ~[ spawn map_task ( ctrl, i) ] ;
79
99
}
100
+ // log_err "done starting mappers";
101
+ ret tasks;
80
102
}
81
103
82
104
fn map_task ( chan[ ctrl_proto] ctrl ,
83
105
str input ) {
84
-
106
+ // log_err "map_task " + input;
85
107
auto intermediates = map:: new_str_hash ( ) ;
86
108
87
109
fn emit ( & map:: hashmap[ str, chan[ reduce_proto] ] im ,
88
110
chan[ ctrl_proto] ctrl ,
89
- str key, str val) {
111
+ str key, int val) {
112
+ // log_err "emitting " + key;
90
113
auto c;
91
114
alt ( im. find ( key) ) {
92
- case ( some ( ?_c) ) {
115
+ some ( ?_c) {
116
+ // log_err "reusing saved channel for " + key;
93
117
c = _c
94
118
}
95
- case ( none) {
119
+ none {
120
+ // log_err "fetching new channel for " + key;
96
121
auto p = port[ chan[ reduce_proto] ] ( ) ;
97
- ctrl <| find_reducer( key, chan( p) ) ;
122
+ auto keyi = str:: bytes_ivec ( key) ;
123
+ ctrl <| find_reducer( keyi, chan( p) ) ;
98
124
p |> c;
99
- im. insert ( key, c) ;
125
+ im. insert ( key, clone ( c) ) ;
126
+ c <| ref ;
100
127
}
101
128
}
102
129
c <| emit_val( val) ;
103
130
}
104
131
105
132
map ( input, bind emit ( intermediates, ctrl, _, _) ) ;
133
+
134
+ for each( @tup( str, chan[ reduce_proto] ) kv in intermediates. items ( ) ) {
135
+ // log_err "sending done to reducer for " + kv._0;
136
+ kv. _1 <| release;
137
+ }
138
+
106
139
ctrl <| mapper_done;
140
+
141
+ // log_err "~map_task " + input;
107
142
}
108
143
109
144
fn reduce_task( str key, chan[ chan[ reduce_proto] ] out) {
145
+ // log_err "reduce_task " + key;
110
146
auto p = port ( ) ;
111
147
112
148
out <| chan( p) ;
113
149
114
- fn get ( port[ reduce_proto] p ) -> option[ str ] {
115
- auto m;
116
- p |> m;
150
+ auto ref_count = 0 ;
151
+ auto is_done = false ;
117
152
118
- alt ( m) {
119
- case ( emit_val ( ?v) ) { ret some ( v) ; }
120
- case ( done) { ret none; }
153
+ fn get ( & port[ reduce_proto] p , & mutable int ref_count ,
154
+ & mutable bool is_done) -> option[ int ] {
155
+ while ( !is_done || ref_count > 0 ) {
156
+ auto m;
157
+ p |> m;
158
+
159
+ alt ( m) {
160
+ emit_val ( ?v) {
161
+ // log_err #fmt("received %d", v);
162
+ ret some ( v) ;
163
+ }
164
+ done {
165
+ // log_err "all done";
166
+ is_done = true ;
167
+ }
168
+ ref {
169
+ ref_count += 1 ;
170
+ }
171
+ release {
172
+ ref_count -= 1 ;
173
+ }
174
+ }
121
175
}
176
+ ret none;
122
177
}
123
178
124
- reduce ( key, bind get( p) ) ;
179
+ reduce ( key, bind get( p, ref_count, is_done) ) ;
180
+ // log_err "~reduce_task " + key;
125
181
}
126
182
127
183
fn map_reduce ( vec[ str] inputs ) {
@@ -134,7 +190,7 @@ mod map_reduce {
134
190
135
191
reducers = map:: new_str_hash ( ) ;
136
192
137
- start_mappers ( chan ( ctrl) , inputs) ;
193
+ auto tasks = start_mappers ( chan ( ctrl) , inputs) ;
138
194
139
195
auto num_mappers = vec:: len ( inputs) as int ;
140
196
@@ -143,26 +199,42 @@ mod map_reduce {
143
199
ctrl |> m;
144
200
145
201
alt ( m) {
146
- case ( mapper_done) { num_mappers -= 1 ; }
147
- case ( find_reducer ( ?k, ?cc) ) {
202
+ mapper_done {
203
+ // log_err "received mapper terminated.";
204
+ num_mappers -= 1 ;
205
+ }
206
+ find_reducer ( ?ki, ?cc) {
148
207
auto c;
208
+ auto k = str:: unsafe_from_bytes_ivec ( ki) ;
209
+ // log_err "finding reducer for " + k;
149
210
alt ( reducers. find ( k) ) {
150
- case ( some ( ?_c) ) { c = _c; }
151
- case ( none) {
211
+ some ( ?_c) {
212
+ // log_err "reusing existing reducer for " + k;
213
+ c = _c;
214
+ }
215
+ none {
216
+ // log_err "creating new reducer for " + k;
152
217
auto p = port ( ) ;
153
- spawn reduce_task ( k, chan ( p) ) ;
218
+ tasks += ~ [ spawn reduce_task ( k, chan ( p) ) ] ;
154
219
p |> c;
155
220
reducers. insert ( k, c) ;
156
221
}
157
222
}
158
- cc <| c ;
223
+ cc <| clone ( c ) ;
159
224
}
160
225
}
161
226
}
162
227
163
228
for each( @tup( str, chan[ reduce_proto] ) kv in reducers. items ( ) ) {
229
+ // log_err "sending done to reducer for " + kv._0;
164
230
kv. _1 <| done;
165
231
}
232
+
233
+ // log_err #fmt("joining %u tasks", ivec::len(tasks));
234
+ for ( task t in tasks) {
235
+ task:: join ( t) ;
236
+ }
237
+ // log_err "control task done.";
166
238
}
167
239
}
168
240
@@ -174,7 +246,14 @@ fn main(vec[str] argv) {
174
246
fail;
175
247
}
176
248
249
+ auto start = time:: precise_time_ns ( ) ;
177
250
map_reduce:: map_reduce ( vec:: slice ( argv, 1 u, vec:: len ( argv) ) ) ;
251
+ auto stop = time:: precise_time_ns ( ) ;
252
+
253
+ auto elapsed = stop - start;
254
+ elapsed /= 1000000u64 ;
255
+
256
+ log_err "MapReduce completed in " + u64:: str ( elapsed) + "ms" ;
178
257
}
179
258
180
259
fn read_word ( io:: reader r) -> option[ str ] {
0 commit comments