1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
|
-----------------------------------------------------------------------------
-- |
-- Module : Plugins.BufferedPipeReader
-- Copyright : (c) Jochen Keil
-- License : BSD-style (see LICENSE)
--
-- Maintainer : Jochen Keil <jochen dot keil at gmail dot com>
-- Stability : unstable
-- Portability : unportable
--
-- A plugin for reading (temporarily) from named pipes with reset
--
-----------------------------------------------------------------------------
module Plugins.BufferedPipeReader where
import Control.Monad(forM_, when, void)
import Control.Concurrent
import Control.Concurrent.STM
import System.IO
import System.IO.Unsafe(unsafePerformIO)
import Plugins
import Signal
-- | Configuration of a buffered pipe reader plugin.
--
-- The first field is the alias under which the plugin is registered.
-- The second field holds one triple per pipe to read from:
--
--   * 'Int'    - timeout in tenths of a second after which a message is
--                reset; @0@ marks the message as persistent (no reset).
--   * 'Bool'   - whether a message from this pipe should also post
--                reveal/hide signals.
--   * 'String' - path of the file (named pipe) that is opened for reading.
--
-- The positional (non-record) form is kept on purpose: the derived 'Read'
-- and 'Show' instances depend on it.
data BufferedPipeReader
  = BufferedPipeReader String [(Int, Bool, String)]
  deriving (Read, Show)
-- | Process-wide mailbox used to hand 'SignalType' values from the pipe
-- writer/reset threads (which 'putMVar' into it) to 'trigger' (which
-- 'takeMVar's from it).
--
-- It is created with 'unsafePerformIO', so it MUST NOT be inlined: without
-- the @NOINLINE@ pragma the compiler is free to duplicate the
-- 'newEmptyMVar' call at each use site, giving producers and the consumer
-- different 'MVar's that never communicate.
{-# NOINLINE signal #-}
signal :: MVar SignalType
signal = unsafePerformIO newEmptyMVar
-- | 'Exec' instance: one reader thread per configured pipe feeds a shared
-- channel; a single writer loop consumes the channel, passes each line to
-- the output callback and schedules resets for messages with a timeout.
instance Exec BufferedPipeReader where
  -- The alias is the first constructor field.
  alias (BufferedPipeReader a _) = a

  -- Wait for the next value put into the global 'signal' MVar, pass it
  -- (wrapped in 'Just') to the handler, and repeat forever.
  trigger br@(BufferedPipeReader _ _) sh =
    takeMVar signal >>= sh . Just >> trigger br sh

  -- Fork a reader per pipe, then run the writer loop in the calling thread.
  start (BufferedPipeReader _ ps) cb = do
    (chan, str, rst) <- initV
    forM_ ps $ \p -> forkIO $ reader p chan
    writer chan str rst
    where
      -- Shared state: the message channel, the last persistent message
      -- (initially none) and an initial, already-disabled reset flag.
      initV :: IO (TChan (Int, Bool, String), TVar (Maybe String), TVar Bool)
      initV = atomically $ do
        tc <- newTChan
        ts <- newTVar Nothing
        tb <- newTVar False
        return (tc, ts, tb)

      -- Read lines from the file at @fp@ forever and publish each one on
      -- the channel together with the pipe's timeout and trigger flag.
      --
      -- The file is opened exactly once and closed by 'withFile' if the
      -- loop is ever left by an exception. Re-opening it for every line
      -- (as was done before) leaked a handle per message and discarded any
      -- further lines already sitting in the dropped handle's buffer.
      reader :: (Int, Bool, FilePath) -> TChan (Int, Bool, String) -> IO ()
      reader (to, tg, fp) tc = withFile fp ReadWriteMode pump
        where
          pump :: Handle -> IO ()
          pump h = do
            dt <- hGetLineSafe h
            atomically $ writeTChan tc (to, tg, dt)
            pump h

      -- Consume the channel forever. @otb@ is the reset flag belonging to
      -- the previously shown message; it is disabled as soon as a newer
      -- message arrives so that a pending reset becomes a no-op.
      writer :: TChan (Int, Bool, String)
             -> TVar (Maybe String) -> TVar Bool -> IO ()
      writer tc ts otb = do
        (to, tg, dt, ntb) <- update
        cb dt
        when tg $ putMVar signal $ Reveal 0
        -- Only messages with a non-zero timeout get a reset scheduled.
        when (to /= 0) $ sfork $ reset to tg ts ntb
        writer tc ts ntb
        where
          -- Fork a thread and drop its 'ThreadId'.
          sfork :: IO () -> IO ()
          sfork f = void (forkIO f)

          -- In one transaction: take the next message, remember it as the
          -- persistent text when its timeout is 0, disable the previous
          -- reset flag and create a fresh, enabled flag for this message.
          update :: IO (Int, Bool, String, TVar Bool)
          update = atomically $ do
            (to, tg, dt) <- readTChan tc
            when (to == 0) $ writeTVar ts $ Just dt
            writeTVar otb False
            tb <- newTVar True
            return (to, tg, dt, tb)

      -- Sleep for @to@ tenths of a second ('threadDelay' takes
      -- microseconds). If the flag is still enabled afterwards, i.e. no
      -- newer message has arrived, post a hide signal (when @tg@ is set)
      -- and re-emit the last persistent message, if there is one.
      reset :: Int -> Bool -> TVar (Maybe String) -> TVar Bool -> IO ()
      reset to tg ts tb = do
        threadDelay (to * 100 * 1000)
        readTVarIO tb >>= \b -> when b $ do
          when tg $ putMVar signal $ Hide 0
          readTVarIO ts >>= maybe (return ()) cb
|