3
3
<https://github.com/input-output-hk/cardano-shell/blob/develop/specs/CardanoShellSpec.pdf>
4
4
-}
5
5
6
- {-# LANGUAGE DeriveGeneric #-}
7
- {-# LANGUAGE LambdaCase #-}
8
- {-# LANGUAGE RankNTypes #-}
9
- {-# LANGUAGE ScopedTypeVariables #-}
6
+ {-# LANGUAGE DeriveGeneric #-}
7
+ {-# LANGUAGE LambdaCase #-}
8
+ {-# LANGUAGE RankNTypes #-}
9
+ {-# LANGUAGE ScopedTypeVariables #-}
10
10
11
11
module Cardano.Shell.NodeIPC.Lib
12
12
( startNodeJsIPC
13
13
, startIPC
14
14
, Port (.. )
15
15
, ProtocolDuration (.. )
16
+ , handleIPCProtocol
17
+ , clientIPCListener
18
+ , testStartNodeIPC
19
+ , ServerHandles (.. )
20
+ , ClientHandles (.. )
21
+ , closeFullDuplexAnonPipesHandles
22
+ , createFullDuplexAnonPipesHandles
23
+ , bracketFullDuplexAnonPipesHandles
24
+ , serverReadWrite
16
25
-- * Testing
17
26
, getIPCHandle
18
27
, MsgIn (.. )
@@ -38,10 +47,14 @@ import Data.Aeson.Types (Options, SumEncoding (ObjectWithSingleField),
38
47
sumEncoding )
39
48
import GHC.IO.Handle (hIsOpen , hIsReadable , hIsWritable )
40
49
import GHC.IO.Handle.FD (fdToHandle )
50
+
41
51
import System.Environment (lookupEnv )
42
- import System.IO (hClose , hFlush , hSetNewlineMode ,
43
- noNewlineTranslation )
52
+ import System.Process (createPipe )
53
+
54
+ import System.IO (BufferMode (.. ), hClose , hFlush , hSetBuffering ,
55
+ hSetNewlineMode , noNewlineTranslation )
44
56
import System.IO.Error (IOError , isEOFError )
57
+
45
58
import Test.QuickCheck (Arbitrary (.. ), Gen , arbitraryASCIIChar ,
46
59
choose , elements , listOf1 )
47
60
@@ -52,7 +65,26 @@ import Cardano.Shell.NodeIPC.Message (MessageException,
52
65
53
66
import qualified Prelude as P (Show (.. ))
54
67
55
- -- | The way the IPC protocol works.
68
+ -- | When using pipes, __the write doesn't block, but the read blocks__!
69
+ -- As a consequence, we eiter need to use IDs to keep track of the client/server pair,
70
+ -- or (read) block so we know which message pair arrived.
71
+ -- This might seems an overkill for this task, but it's actually required if we
72
+ -- want to reason about it and test it properly.
73
+ --
74
+ -- >>> (readEnd, writeEnd) <- createPipe
75
+ --
76
+ -- >>> replicateM 100 $ sendMessage (WriteHandle writeEnd) Cardano.Shell.NodeIPC.Ping
77
+ -- [(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),()]
78
+ --
79
+ -- >>> mesg <- replicateM 100 ((readMessage (ReadHandle readEnd)) :: IO MsgIn)
80
+ --
81
+ -- >>> mesg <- (readMessage (ReadHandle readEnd)) :: IO MsgIn
82
+ --
83
+ --
84
+ -- Blocked!
85
+
86
+ -- | The way the IPC protocol works - it either responds to a single
87
+ -- __IPC__ message or it remains in a loop responding to multiple messages.
56
88
data ProtocolDuration
57
89
= SingleMessage
58
90
-- ^ Responds to a single message and exits
@@ -186,8 +218,10 @@ startIPC protocolDuration readHandle writeHandle port = liftIO $ void $ ipcListe
186
218
startNodeJsIPC :: forall m . (MonadIO m ) => ProtocolDuration -> Port -> m ()
187
219
startNodeJsIPC protocolDuration port = do
188
220
handle <- liftIO $ getIPCHandle
221
+
189
222
let readHandle = ReadHandle handle
190
223
let writeHandle = WriteHandle handle
224
+
191
225
liftIO $ void $ ipcListener protocolDuration readHandle writeHandle port
192
226
193
227
-- | Function for handling the protocol
@@ -255,11 +289,127 @@ ipcListener protocolDuration readHandle@(ReadHandle rHndl) writeHandle@(WriteHan
255
289
checkHandle wHandle hIsOpen (HandleClosed wHandle)
256
290
checkHandle rHandle hIsReadable (UnreadableHandle rHandle)
257
291
checkHandle wHandle hIsWritable (UnwritableHandle wHandle)
258
-
259
- checkHandle :: Handle -> (Handle -> IO Bool ) -> NodeIPCException -> IO ()
260
- checkHandle handle pre exception = do
261
- result <- pre handle
262
- when (not result) $ throwM exception
292
+ where
293
+ -- | Utility function for checking a handle.
294
+ checkHandle :: Handle -> (Handle -> IO Bool ) -> NodeIPCException -> IO ()
295
+ checkHandle handle pre exception = do
296
+ result <- pre handle
297
+ when (not result) $ throwM exception
298
+
299
+ -- | Client side IPC protocol.
300
+ clientIPCListener
301
+ :: forall m . (MonadIO m , MonadMask m )
302
+ => ProtocolDuration
303
+ -> ClientHandles
304
+ -> Port
305
+ -- ^ This is really making things confusing. A Port is here,
306
+ -- but it's determined on the client side, not before.
307
+ -> m ()
308
+ clientIPCListener duration clientHandles port =
309
+ ipcListener
310
+ duration
311
+ (getClientReadHandle clientHandles)
312
+ (getClientWriteHandle clientHandles)
313
+ port
314
+
315
+ -- | The set of handles for the server, the halves of one pipe.
316
+ data ServerHandles = ServerHandles
317
+ { getServerReadHandle :: ! ReadHandle
318
+ , getServerWriteHandle :: ! WriteHandle
319
+ }
320
+
321
+ -- | The set of handles for the client, the halves of one pipe.
322
+ data ClientHandles = ClientHandles
323
+ { getClientReadHandle :: ! ReadHandle
324
+ , getClientWriteHandle :: ! WriteHandle
325
+ }
326
+
327
+ -- | This is a __blocking call__ that sends the message to the client
328
+ -- and returns it's response, __after the client response arrives__.
329
+ serverReadWrite :: ServerHandles -> MsgIn -> IO MsgOut
330
+ serverReadWrite serverHandles msgIn = do
331
+ sendMessage (getServerWriteHandle serverHandles) msgIn
332
+ readMessage (getServerReadHandle serverHandles)
333
+
334
+ -- | A bracket function that can be useful.
335
+ bracketFullDuplexAnonPipesHandles
336
+ :: ((ServerHandles , ClientHandles ) -> IO () )
337
+ -> IO ()
338
+ bracketFullDuplexAnonPipesHandles computationToRun =
339
+ bracket
340
+ createFullDuplexAnonPipesHandles
341
+ closeFullDuplexAnonPipesHandles
342
+ computationToRun
343
+
344
+ -- | Close the pipe handles.
345
+ closeFullDuplexAnonPipesHandles :: (ServerHandles , ClientHandles ) -> IO ()
346
+ closeFullDuplexAnonPipesHandles (serverHandles, clientHandles) = do
347
+ -- close the server side
348
+ hClose $ getReadHandle (getServerReadHandle serverHandles)
349
+ hClose $ getWriteHandle (getServerWriteHandle serverHandles)
350
+
351
+ -- close the client side
352
+ hClose $ getReadHandle (getClientReadHandle clientHandles)
353
+ hClose $ getWriteHandle (getClientWriteHandle clientHandles)
354
+
355
+ -- | Creation of a two-way communication between the server and the client.
356
+ -- Full-duplex (two-way) communication normally requires two anonymous pipes.
357
+ -- TODO(KS): Bracket this!
358
+ createFullDuplexAnonPipesHandles :: IO (ServerHandles , ClientHandles )
359
+ createFullDuplexAnonPipesHandles = do
360
+
361
+ (clientReadHandle, clientWriteHandle) <- getReadWriteHandles
362
+ (serverReadHandle, serverWriteHandle) <- getReadWriteHandles
363
+
364
+ let serverHandles = ServerHandles clientReadHandle serverWriteHandle
365
+ let clientHandles = ClientHandles serverReadHandle clientWriteHandle
366
+
367
+ return (serverHandles, clientHandles)
368
+
369
+ -- | Create a pipe for interprocess communication and return a
370
+ -- ('ReadHandle', 'WriteHandle') Handle pair.
371
+ getReadWriteHandles :: IO (ReadHandle , WriteHandle )
372
+ getReadWriteHandles = do
373
+ (readHndl, writeHndl) <- createPipe
374
+
375
+ hSetBuffering readHndl LineBuffering
376
+ hSetBuffering writeHndl LineBuffering
377
+
378
+ let readHandle = ReadHandle readHndl
379
+ let writeHandle = WriteHandle writeHndl
380
+
381
+ return (readHandle, writeHandle)
382
+
383
+
384
+ -- | Test 'startIPC'
385
+ testStartNodeIPC :: (ToJSON msg ) => Port -> msg -> IO (MsgOut , MsgOut )
386
+ testStartNodeIPC port msg = do
387
+ (clientReadHandle, clientWriteHandle) <- getReadWriteHandles
388
+ (serverReadHandle, serverWriteHandle) <- getReadWriteHandles
389
+
390
+ -- Start the server
391
+ (_, responses) <-
392
+ startIPC
393
+ SingleMessage
394
+ serverReadHandle
395
+ clientWriteHandle
396
+ port
397
+ `concurrently`
398
+ do
399
+ -- Use these functions so you don't pass the wrong handle by mistake
400
+ let readClientMessage :: IO MsgOut
401
+ readClientMessage = readMessage clientReadHandle
402
+
403
+ let sendServer :: (ToJSON msg ) => msg -> IO ()
404
+ sendServer = sendMessage serverWriteHandle
405
+
406
+ -- Communication starts here
407
+ started <- readClientMessage
408
+ sendServer msg
409
+ response <- readClientMessage
410
+ return (started, response)
411
+
412
+ return responses
263
413
264
414
--------------------------------------------------------------------------------
265
415
-- Placeholder
0 commit comments