Skip to content

Commit 5ad4d24

Browse files
pzhan9facebook-github-bot
authored andcommitted
Separate enqueue when reducer is none
Summary: When reducer is none, the split port's lambda is doing a simply forward. This diff separate this scenario into its own lambda so the code is easier to read and maintain. Reviewed By: mariusae Differential Revision: D75090318 fbshipit-source-id: 532182da291eebaa733e0c11f05816fcc0720a67
1 parent d3c37fb commit 5ad4d24

File tree

1 file changed

+39
-30
lines changed

1 file changed

+39
-30
lines changed

hyperactor/src/mailbox/mod.rs

Lines changed: 39 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1279,45 +1279,54 @@ impl cap::sealed::CanOpenPort for Mailbox {
12791279

12801280
impl cap::sealed::CanSplitPort for Mailbox {
12811281
fn split(&self, port_id: PortId, reducer_typehash: Option<u64>) -> PortId {
1282+
fn post(mailbox: &Mailbox, port_id: PortId, msg: Serialized) {
1283+
mailbox.post(
1284+
MessageEnvelope::new(mailbox.actor_id().clone(), port_id, msg),
1285+
// TODO(pzhang) figure out how to use upstream's return handle,
1286+
// instead of getting a new one like this.
1287+
// This is okay for now because upstream is currently also using
1288+
// the same handle singleton, but that could change in the future.
1289+
monitored_return_handle(),
1290+
);
1291+
}
1292+
12821293
let port_index = self.state.allocate_port();
12831294
let split_port = self.actor_id().port_id(port_index);
12841295
let mailbox = self.clone();
1285-
let buffer = Arc::new(Mutex::new(Vec::<Serialized>::new()));
12861296
let reducer = reducer_typehash.and_then(accum::resolve_reducer);
1287-
let enqueue = move |serialized: Serialized| {
1288-
// Hold the lock until messages are sent. This is to avoid another
1289-
// invocation of this method trying to send message concurrently and
1290-
// cause messages delivered out of order.
1291-
let mut buf = buffer.lock().unwrap();
1292-
buf.push(serialized);
1293-
// TODO(pzhang) add policy and use this buffer
1294-
let buffered = std::mem::take(&mut *buf);
1295-
let reduced = match &reducer {
1296-
Some(r) => vec![r.reduce_updates(buffered).map_err(|(e, mut b)| {
1297-
(
1298-
b.pop()
1299-
.expect("there should be at least one update from buffer"),
1300-
e,
1301-
)
1302-
})?],
1303-
None => buffered,
1304-
};
1305-
for msg in reduced {
1306-
mailbox.post(
1307-
MessageEnvelope::new(mailbox.actor_id().clone(), port_id.clone(), msg),
1308-
// TODO(pzhang) figure out how to use upstream's return handle,
1309-
// instead of getting a new one like this.
1310-
// This is okay for now because upstream is currently also using
1311-
// the same handle singleton, but that could change in the future.
1312-
monitored_return_handle(),
1313-
);
1297+
let enqueue: Box<
1298+
dyn Fn(Serialized) -> Result<(), (Serialized, anyhow::Error)> + Send + Sync,
1299+
> = match reducer {
1300+
None => Box::new(move |serialized: Serialized| {
1301+
post(&mailbox, port_id.clone(), serialized);
1302+
Ok(())
1303+
}),
1304+
Some(r) => {
1305+
let buffer = Arc::new(Mutex::new(Vec::<Serialized>::new()));
1306+
Box::new(move |serialized: Serialized| {
1307+
// Hold the lock until messages are sent. This is to avoid another
1308+
// invocation of this method trying to send message concurrently and
1309+
// cause messages delivered out of order.
1310+
let mut buf = buffer.lock().unwrap();
1311+
buf.push(serialized);
1312+
// TODO(pzhang) add policy and use this buffer
1313+
let buffered = std::mem::take(&mut *buf);
1314+
let reduced = r.reduce_updates(buffered).map_err(|(e, mut b)| {
1315+
(
1316+
b.pop()
1317+
.expect("there should be at least one update from buffer"),
1318+
e,
1319+
)
1320+
})?;
1321+
post(&mailbox, port_id.clone(), reduced);
1322+
Ok(())
1323+
})
13141324
}
1315-
Ok(())
13161325
};
13171326
self.bind_untyped(
13181327
&split_port,
13191328
UntypedUnboundedSender {
1320-
sender: Box::new(enqueue),
1329+
sender: enqueue,
13211330
port_id: split_port.clone(),
13221331
},
13231332
);

0 commit comments

Comments
 (0)