Skip to content
This repository was archived by the owner on Feb 15, 2025. It is now read-only.

Commit fd6ae3e

Browse files
committed
Prioritise event listeners and allow for queue modifications
1 parent 5824399 commit fd6ae3e

File tree

7 files changed

+169
-47
lines changed

7 files changed

+169
-47
lines changed

src/Control/Distributed/Process/FSM.hs

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
module Control.Distributed.Process.FSM where
1515

16+
import Control.Distributed.Process (wrapMessage)
1617
import Control.Distributed.Process.Extras (ExitReason)
1718
import Control.Distributed.Process.Extras.Time
1819
( TimeInterval
@@ -21,23 +22,40 @@ import Control.Distributed.Process.ManagedProcess
2122
( processState
2223
, setProcessState
2324
, runAfter
25+
, Priority
2426
)
27+
import Control.Distributed.Process.ManagedProcess.Server.Priority (setPriority)
2528
import qualified Control.Distributed.Process.ManagedProcess.Internal.Types as MP (liftIO)
2629
import Control.Distributed.Process.FSM.Internal.Types
2730
import Control.Distributed.Process.Serializable (Serializable)
2831

2932
initState :: forall s d . s -> d -> Step s d
3033
initState = Yield
3134

35+
event :: (Serializable m) => Event m
36+
event = Wait
37+
38+
pevent :: (Serializable m) => Int -> Event m
39+
pevent = WaitP . setPriority
40+
3241
enter :: forall s d . s -> FSM s d (Transition s d)
3342
enter = return . Enter
3443

44+
postpone :: forall s d . FSM s d (Transition s d)
45+
postpone = return Postpone
46+
47+
putBack :: forall s d . FSM s d (Transition s d)
48+
putBack = return PutBack
49+
50+
nextEvent :: forall s d m . (Serializable m) => m -> FSM s d (Transition s d)
51+
nextEvent m = return $ Push (wrapMessage m)
52+
53+
publishEvent :: forall s d m . (Serializable m) => m -> FSM s d (Transition s d)
54+
publishEvent m = return $ Enqueue (wrapMessage m)
55+
3556
resume :: forall s d . FSM s d (Transition s d)
3657
resume = return Remain
3758

38-
event :: (Serializable m) => Event m
39-
event = Wait
40-
4159
reply :: forall s d r . (Serializable r) => FSM s d r -> Step s d
4260
reply = Reply
4361

@@ -105,6 +123,9 @@ infixr 9 ~@
105123
atState :: forall s d . (Eq s) => s -> FSM s d (Transition s d) -> Step s d
106124
atState = Perhaps
107125

126+
whenStateIs :: forall s d . (Eq s) => s -> Step s d
127+
whenStateIs s = s ~@ resume
128+
108129
allState :: forall s d m . (Serializable m) => (m -> FSM s d (Transition s d)) -> Step s d
109130
allState = Always
110131

src/Control/Distributed/Process/FSM/Internal/Process.hs

Lines changed: 30 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import Control.Distributed.Process
2121
, sendChan
2222
, spawnLocal
2323
, liftIO
24+
, handleMessage
2425
)
2526
import qualified Control.Distributed.Process as P
2627
( Message
@@ -38,14 +39,17 @@ import Control.Distributed.Process.ManagedProcess
3839
, prioritised
3940
)
4041
import qualified Control.Distributed.Process.ManagedProcess as MP (pserve)
41-
import Control.Distributed.Process.ManagedProcess.Server.Priority (safely)
42+
import Control.Distributed.Process.ManagedProcess.Server.Priority
43+
( safely
44+
)
4245
import Control.Distributed.Process.ManagedProcess.Server
4346
( handleRaw
4447
, handleInfo
4548
, continue
4649
)
4750
import Control.Distributed.Process.ManagedProcess.Internal.Types
4851
( ExitSignalDispatcher(..)
52+
, DispatchPriority(PrioritiseInfo)
4953
)
5054
import Data.Maybe (isJust)
5155
import qualified Data.Sequence as Q (empty)
@@ -55,15 +59,16 @@ import qualified Data.Sequence as Q (empty)
5559
-- import Data.Typeable (Typeable)
5660
-- import GHC.Generics
5761

58-
start :: forall s d . (Show s) => s -> d -> (Step s d) -> Process ProcessId
62+
start :: forall s d . (Show s, Eq s) => s -> d -> (Step s d) -> Process ProcessId
5963
start s d p = spawnLocal $ run s d p
6064

61-
run :: forall s d . (Show s) => s -> d -> (Step s d) -> Process ()
65+
run :: forall s d . (Show s, Eq s) => s -> d -> (Step s d) -> Process ()
6266
run s d p = MP.pserve (s, d, p) fsmInit (processDefinition p)
6367

64-
fsmInit :: forall s d . (Show s) => InitHandler (s, d, Step s d) (State s d)
68+
fsmInit :: forall s d . (Show s, Eq s) => InitHandler (s, d, Step s d) (State s d)
6569
fsmInit (st, sd, prog) =
66-
return $ InitOk (State st sd prog prog Nothing (const $ return ()) Q.empty) Infinity
70+
let st' = State st sd prog Nothing (const $ return ()) Q.empty Q.empty
71+
in return $ InitOk st' Infinity
6772

6873
processDefinition :: forall s d . (Show s) => Step s d -> PrioritisedProcessDefinition (State s d)
6974
processDefinition prog =
@@ -75,10 +80,12 @@ processDefinition prog =
7580
]
7681
, exitHandlers = [ ExitSignalDispatcher (\s _ m -> handleAllRawInputs s m >>= return . Just)
7782
]
78-
} []) { filters = (walkFSM prog []) }
83+
} (walkPFSM prog [])) { filters = (walkFSM prog []) }
7984

80-
-- we should probably make a Foldable (Step s d) for this
81-
walkFSM :: forall s d . Step s d -> [DispatchFilter (State s d)] -> [DispatchFilter (State s d)]
85+
-- we should probably make a Foldable (Step s d) for these
86+
walkFSM :: forall s d . Step s d
87+
-> [DispatchFilter (State s d)]
88+
-> [DispatchFilter (State s d)]
8289
walkFSM st acc
8390
| SafeWait evt act <- st = walkFSM act $ safely (\_ m -> isJust $ decodeToEvent evt m) : acc
8491
| Await _ act <- st = walkFSM act acc
@@ -87,6 +94,20 @@ walkFSM st acc
8794
| Alternate ac1 ac2 <- st = walkFSM ac1 $ walkFSM ac2 acc -- both branches need filter defs
8895
| otherwise = acc
8996

97+
walkPFSM :: forall s d . Step s d
98+
-> [DispatchPriority (State s d)]
99+
-> [DispatchPriority (State s d)]
100+
walkPFSM st acc
101+
| SafeWait evt act <- st = walkPFSM act (checkPrio evt acc)
102+
| Await evt act <- st = walkPFSM act (checkPrio evt acc)
103+
| Sequence ac1 ac2 <- st = walkPFSM ac1 $ walkPFSM ac2 acc
104+
| Init ac1 ac2 <- st = walkPFSM ac1 $ walkPFSM ac2 acc
105+
| Alternate ac1 ac2 <- st = walkPFSM ac1 $ walkPFSM ac2 acc -- both branches need filter defs
106+
| otherwise = acc
107+
where
108+
checkPrio ev acc = (mkPrio ev):acc
109+
mkPrio ev' = PrioritiseInfo $ \s m -> handleMessage m (resolveEvent ev' m s)
110+
90111
handleRpcRawInputs :: forall s d . (Show s) => State s d
91112
-> (P.Message, SendPort P.Message)
92113
-> Action (State s d)
@@ -108,7 +129,7 @@ handleInput :: forall s d . (Show s)
108129
-> Action (State s d)
109130
handleInput msg st@State{..} = do
110131
liftIO $ putStrLn $ "handleInput: " ++ (show stName)
111-
liftIO $ putStrLn $ "apply " ++ (show stProg)
132+
liftIO $ putStrLn $ "apply: " ++ (show stProg)
112133
res <- apply st msg stProg
113134
liftIO $ putStrLn $ "got a result: " ++ (show res)
114135
case res of

src/Control/Distributed/Process/FSM/Internal/Types.hs

Lines changed: 57 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,15 @@ import Control.Distributed.Process.ManagedProcess
3535
, setProcessState
3636
, processState
3737
)
38-
import qualified Control.Distributed.Process.ManagedProcess.Internal.Types as MP (lift, liftIO)
39-
import Control.Distributed.Process.ManagedProcess.Server.Priority
40-
( push
41-
, act
38+
import qualified Control.Distributed.Process.ManagedProcess.Internal.GenProcess as Gen (enqueue, push)
39+
import Control.Distributed.Process.ManagedProcess.Internal.Types
40+
( Priority(..)
4241
)
42+
import qualified Control.Distributed.Process.ManagedProcess.Internal.Types as MP
43+
( lift
44+
, liftIO
45+
)
46+
import Control.Distributed.Process.ManagedProcess.Server.Priority (act)
4347
import Control.Distributed.Process.Serializable (Serializable)
4448
import Control.Monad.Fix (MonadFix)
4549
import Control.Monad.IO.Class (MonadIO)
@@ -65,13 +69,14 @@ import Data.Typeable (Typeable, typeOf)
6569
import Data.Tuple (swap, uncurry)
6670
-- import GHC.Generics
6771

68-
data State s d = State { stName :: s
72+
data State s d = (Show s, Eq s) =>
73+
State { stName :: s
6974
, stData :: d
7075
, stProg :: Step s d -- original program
71-
, stInstr :: Step s d -- current step in the program
7276
, stInput :: Maybe P.Message
7377
, stReply :: (P.Message -> Process ())
7478
, stTrans :: Seq (Transition s d)
79+
, stQueue :: Seq P.Message
7580
}
7681

7782
instance forall s d . (Show s) => Show (State s d) where
@@ -80,24 +85,42 @@ instance forall s d . (Show s) => Show (State s d) where
8085

8186
data Transition s d = Remain
8287
| PutBack
88+
| Push P.Message
89+
| Enqueue P.Message
90+
| Postpone
8391
| Enter s
8492
| Stop ExitReason
8593
| Eval (GenProcess (State s d) ())
8694

8795
instance forall s d . (Show s) => Show (Transition s d) where
8896
show Remain = "Remain"
8997
show PutBack = "PutBack"
98+
show Postpone = "Postpone"
99+
show (Push m) = "Push " ++ (show m)
100+
show (Enqueue m) = "Enqueue " ++ (show m)
90101
show (Enter s) = "Enter " ++ (show s)
91102
show (Stop er) = "Stop " ++ (show er)
92103
show (Eval _) = "Eval"
93104

94105
data Event m where
95-
Wait :: (Serializable m) => Event m
96-
Event :: (Serializable m) => m -> Event m
106+
Wait :: (Serializable m) => Event m
107+
WaitP :: (Serializable m) => Priority () -> Event m
108+
Event :: (Serializable m) => m -> Event m
109+
110+
resolveEvent :: forall s d m . (Serializable m)
111+
=> Event m
112+
-> P.Message
113+
-> State s d
114+
-> m
115+
-> Process (Int, P.Message)
116+
resolveEvent ev m _ _
117+
| WaitP p <- ev = return (getPrio p, m)
118+
| otherwise = return (0, m)
97119

98120
instance forall m . (Typeable m) => Show (Event m) where
99-
show ev@Wait = show $ "Wait::" ++ (show $ typeOf ev)
100-
show ev = show $ typeOf ev
121+
show ev@Wait = show $ "Wait::" ++ (show $ typeOf ev)
122+
show ev@(WaitP _) = show $ "WaitP::" ++ (show $ typeOf ev)
123+
show ev = show $ typeOf ev
101124

102125
data Step s d where
103126
Init :: Step s d -> Step s d -> Step s d
@@ -171,9 +194,13 @@ seqEnqueue s a = a <| s
171194
seqPush :: Seq a -> a -> Seq a
172195
seqPush s a = s |> a
173196

197+
{-# INLINE seqPop #-}
198+
seqPop :: Seq a -> Maybe (a, Seq a)
199+
seqPop s = maybe Nothing (\(s' :> a) -> Just (a, s')) $ getR s
200+
174201
{-# INLINE seqDequeue #-}
175202
seqDequeue :: Seq a -> Maybe (a, Seq a)
176-
seqDequeue s = maybe Nothing (\(s' :> a) -> Just (a, s')) $ getR s
203+
seqDequeue = seqPop
177204

178205
{-# INLINE peek #-}
179206
peek :: Seq a -> Maybe a
@@ -198,7 +225,7 @@ apply st msg step
198225
P.liftIO $ putStrLn "Init _ _"
199226
st' <- apply st msg is
200227
case st' of
201-
Just s -> apply (s { stProg = ns, stInstr = ns }) msg ns
228+
Just s -> apply (s { stProg = ns }) msg ns
202229
Nothing -> die $ ExitOther $ baseErr ++ ":InitFailed"
203230
| Yield sn sd <- step = do
204231
P.liftIO $ putStrLn "Yield s d"
@@ -255,28 +282,38 @@ applyTransitions st@State{..} evals
255282
MP.liftIO $ putStrLn $ "ProcessState: " ++ (show stName)
256283
mapM_ id evals
257284
| (tr, st2) <- next
258-
, Enter s <- tr = let st' = st2 { stName = s }
259-
in do P.liftIO $ putStrLn $ "NEWSTATE: " ++ (show st')
260-
applyTransitions st' evals
285+
, PutBack <- tr = applyTransitions st2 ((Gen.enqueue $ fromJust stInput) : evals)
286+
| isJust stInput
287+
, input <- fromJust stInput
288+
, (tr, st2) <- next
289+
, Postpone <- tr = applyTransitions (st2 { stQueue = seqEnqueue stQueue input }) evals
261290
| (tr, st2) <- next
262-
, PutBack <- tr = applyTransitions st2 ((push $ fromJust stInput) : evals)
263-
{- let act' = setProcessState $ fsmSt { fsmName = stName, fsmData = stData }
264-
push stInput -}
291+
, Enqueue m <- tr = applyTransitions st2 ((Gen.enqueue m):evals)
292+
| (tr, st2) <- next
293+
, Push m <- tr = applyTransitions st2 ((Gen.push m):evals)
265294
| (tr, st2) <- next
266295
, Eval proc <- tr = applyTransitions st2 (proc:evals)
267296
| (tr, st2) <- next
268297
, Remain <- tr = applyTransitions st2 evals
269298
| (tr, _) <- next
270299
, Stop er <- tr = stopWith st er
271-
| otherwise = error $ baseErr ++ ".Internal.Process.applyTransitions:InvalidState"
300+
| (tr, st2) <- next
301+
, Enter s <- tr =
302+
if s == stName then applyTransitions st2 evals
303+
else do let st' = st2 { stName = s }
304+
let evals' = if Q.null stQueue then evals
305+
else (mapM_ Gen.push stQueue) : evals
306+
applyTransitions st' evals'
307+
| otherwise = error $ baseErr ++ ".Internal.Process.applyTransitions:InvalidTransition"
272308
where
273309
-- don't call if Q.null!
274-
next = let (t, q) = fromJust $ seqDequeue stTrans
310+
next = let (t, q) = fromJust $ seqPop stTrans
275311
in (t, st { stTrans = q })
276312

277313
baseErr :: String
278314
baseErr = "Control.Distributed.Process.FSM"
279315

280316
decodeToEvent :: Serializable m => Event m -> P.Message -> Maybe (Event m)
281317
decodeToEvent Wait msg = unwrapMessage msg >>= fmap Event
318+
decodeToEvent (WaitP _) msg = unwrapMessage msg >>= fmap Event
282319
decodeToEvent ev@(Event _) _ = Just ev -- it's a bit odd that we'd end up here....

stack-ghc-7.10.3.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ packages:
44
- '.'
55
- location:
66
git: https://github.com/haskell-distributed/distributed-process-client-server.git
7-
commit: 5589d9ef5a50b86d489797f78c0118f75e53659e
7+
commit: ceaf6b48a6cc90e2bc726025ce3a28b40899440c
88
extra-dep: true
99

1010
extra-deps:

stack-ghc-8.0.1.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ packages:
44
- '.'
55
- location:
66
git: https://github.com/haskell-distributed/distributed-process-client-server.git
7-
commit: 5589d9ef5a50b86d489797f78c0118f75e53659e
7+
commit: ceaf6b48a6cc90e2bc726025ce3a28b40899440c
88
extra-dep: true
99

1010
extra-deps:

stack.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ packages:
44
- '.'
55
- location:
66
git: https://github.com/haskell-distributed/distributed-process-client-server.git
7-
commit: 5589d9ef5a50b86d489797f78c0118f75e53659e
7+
commit: ceaf6b48a6cc90e2bc726025ce3a28b40899440c
88
extra-dep: true
99

1010
extra-deps:

0 commit comments

Comments
 (0)