module GHC.Eventlog.Live.Machine.Source (
sourceHandleWait,
sourceHandleBatch,
defaultChunkSizeBytes,
) where
import Control.Exception (catch, throwIO)
import Control.Monad.IO.Class (MonadIO (..))
import Data.ByteString qualified as BS
import Data.Function (fix)
import Data.Machine (MachineT (..), construct, yield)
import Data.Word (Word64)
import GHC.Clock (getMonotonicTimeNSec)
import GHC.Eventlog.Live.Machine.Core (Tick (..))
import System.IO (Handle, hWaitForInput)
import System.IO.Error (isEOFError)
sourceHandleWait ::
(MonadIO m) =>
Int ->
Int ->
Handle ->
MachineT m k (Tick BS.ByteString)
sourceHandleWait :: forall (m :: * -> *) (k :: * -> *).
MonadIO m =>
Int -> Int -> Handle -> MachineT m k (Tick ByteString)
sourceHandleWait Int
timeoutMilli Int
chunkSizeBytes Handle
handle =
PlanT k (Tick ByteString) m () -> MachineT m k (Tick ByteString)
forall (m :: * -> *) (k :: * -> *) o a.
Monad m =>
PlanT k o m a -> MachineT m k o
construct (PlanT k (Tick ByteString) m () -> MachineT m k (Tick ByteString))
-> PlanT k (Tick ByteString) m () -> MachineT m k (Tick ByteString)
forall a b. (a -> b) -> a -> b
$ (PlanT k (Tick ByteString) m () -> PlanT k (Tick ByteString) m ())
-> PlanT k (Tick ByteString) m ()
forall a. (a -> a) -> a
fix ((PlanT k (Tick ByteString) m () -> PlanT k (Tick ByteString) m ())
-> PlanT k (Tick ByteString) m ())
-> (PlanT k (Tick ByteString) m ()
-> PlanT k (Tick ByteString) m ())
-> PlanT k (Tick ByteString) m ()
forall a b. (a -> b) -> a -> b
$ \PlanT k (Tick ByteString) m ()
loop -> do
Ready
ready <- IO Ready -> PlanT k (Tick ByteString) m Ready
forall a. IO a -> PlanT k (Tick ByteString) m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Ready -> PlanT k (Tick ByteString) m Ready)
-> IO Ready -> PlanT k (Tick ByteString) m Ready
forall a b. (a -> b) -> a -> b
$ Handle -> Int -> IO Ready
hWaitForInput' Handle
handle Int
timeoutMilli
case Ready
ready of
Ready
Ready -> do
ByteString
bs <- IO ByteString -> PlanT k (Tick ByteString) m ByteString
forall a. IO a -> PlanT k (Tick ByteString) m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO ByteString -> PlanT k (Tick ByteString) m ByteString)
-> IO ByteString -> PlanT k (Tick ByteString) m ByteString
forall a b. (a -> b) -> a -> b
$ Handle -> Int -> IO ByteString
BS.hGetSome Handle
handle Int
chunkSizeBytes
Tick ByteString -> Plan k (Tick ByteString) ()
forall o (k :: * -> *). o -> Plan k o ()
yield (ByteString -> Tick ByteString
forall a. a -> Tick a
Item ByteString
bs)
PlanT k (Tick ByteString) m ()
loop
Ready
NotReady -> do
Tick ByteString -> Plan k (Tick ByteString) ()
forall o (k :: * -> *). o -> Plan k o ()
yield Tick ByteString
forall a. Tick a
Tick
PlanT k (Tick ByteString) m ()
loop
Ready
EOF ->
() -> PlanT k (Tick ByteString) m ()
forall a. a -> PlanT k (Tick ByteString) m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
sourceHandleBatch ::
(MonadIO m) =>
Int ->
Int ->
Handle ->
MachineT m k (Tick BS.ByteString)
sourceHandleBatch :: forall (m :: * -> *) (k :: * -> *).
MonadIO m =>
Int -> Int -> Handle -> MachineT m k (Tick ByteString)
sourceHandleBatch Int
batchIntervalMs Int
chunkSizeBytes Handle
handle = PlanT k (Tick ByteString) m () -> MachineT m k (Tick ByteString)
forall (m :: * -> *) (k :: * -> *) o a.
Monad m =>
PlanT k o m a -> MachineT m k o
construct PlanT k (Tick ByteString) m ()
start
where
start :: PlanT k (Tick ByteString) m ()
start = do
Int
startTimeMs <- IO Int -> PlanT k (Tick ByteString) m Int
forall a. IO a -> PlanT k (Tick ByteString) m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO Int
getMonotonicTimeMilli
Int -> PlanT k (Tick ByteString) m ()
batch Int
startTimeMs
batch :: Int -> PlanT k (Tick ByteString) m ()
batch Int
startTimeMs = PlanT k (Tick ByteString) m ()
waitForInput
where
getRemainingTimeMilli :: PlanT k (Tick ByteString) m Int
getRemainingTimeMilli = do
Int
currentTimeMilli <- IO Int -> PlanT k (Tick ByteString) m Int
forall a. IO a -> PlanT k (Tick ByteString) m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO Int
getMonotonicTimeMilli
Int -> PlanT k (Tick ByteString) m Int
forall a. a -> PlanT k (Tick ByteString) m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Int -> PlanT k (Tick ByteString) m Int)
-> Int -> PlanT k (Tick ByteString) m Int
forall a b. (a -> b) -> a -> b
$ (Int
startTimeMs Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
batchIntervalMs) Int -> Int -> Int
forall a. Num a => a -> a -> a
- Int
currentTimeMilli
waitForInput :: PlanT k (Tick ByteString) m ()
waitForInput = do
Int
remainingTimeMilli <- PlanT k (Tick ByteString) m Int
getRemainingTimeMilli
if Int
remainingTimeMilli Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
<= Int
0
then do
Tick ByteString -> Plan k (Tick ByteString) ()
forall o (k :: * -> *). o -> Plan k o ()
yield Tick ByteString
forall a. Tick a
Tick
PlanT k (Tick ByteString) m ()
start
else do
Ready
ready <- IO Ready -> PlanT k (Tick ByteString) m Ready
forall a. IO a -> PlanT k (Tick ByteString) m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (Handle -> Int -> IO Ready
hWaitForInput' Handle
handle Int
remainingTimeMilli)
case Ready
ready of
Ready
Ready -> do
ByteString
chunk <- IO ByteString -> PlanT k (Tick ByteString) m ByteString
forall a. IO a -> PlanT k (Tick ByteString) m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO ByteString -> PlanT k (Tick ByteString) m ByteString)
-> IO ByteString -> PlanT k (Tick ByteString) m ByteString
forall a b. (a -> b) -> a -> b
$ Handle -> Int -> IO ByteString
BS.hGetSome Handle
handle Int
chunkSizeBytes
Tick ByteString -> Plan k (Tick ByteString) ()
forall o (k :: * -> *). o -> Plan k o ()
yield (ByteString -> Tick ByteString
forall a. a -> Tick a
Item ByteString
chunk) PlanT k (Tick ByteString) m ()
-> PlanT k (Tick ByteString) m () -> PlanT k (Tick ByteString) m ()
forall a b.
PlanT k (Tick ByteString) m a
-> PlanT k (Tick ByteString) m b -> PlanT k (Tick ByteString) m b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> PlanT k (Tick ByteString) m ()
waitForInput
Ready
NotReady -> PlanT k (Tick ByteString) m ()
waitForInput
Ready
EOF -> () -> PlanT k (Tick ByteString) m ()
forall a. a -> PlanT k (Tick ByteString) m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
defaultChunkSizeBytes :: Int
defaultChunkSizeBytes :: Int
defaultChunkSizeBytes = Int
4096
getMonotonicTimeMilli :: IO Int
getMonotonicTimeMilli :: IO Int
getMonotonicTimeMilli = Word64 -> Int
nanoToMilli (Word64 -> Int) -> IO Word64 -> IO Int
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IO Word64
getMonotonicTimeNSec
nanoToMilli :: Word64 -> Int
nanoToMilli :: Word64 -> Int
nanoToMilli = Word64 -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Word64 -> Int) -> (Word64 -> Word64) -> Word64 -> Int
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Word64 -> Word64 -> Word64
forall a. Integral a => a -> a -> a
`div` Word64
1_000_000)
data Ready = Ready | NotReady | EOF
hWaitForInput' ::
Handle ->
Int ->
IO Ready
hWaitForInput' :: Handle -> Int -> IO Ready
hWaitForInput' Handle
handle Int
timeoutMilli =
IO Ready -> (IOError -> IO Ready) -> IO Ready
forall e a. Exception e => IO a -> (e -> IO a) -> IO a
catch (Bool -> Ready
boolToReady (Bool -> Ready) -> IO Bool -> IO Ready
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Handle -> Int -> IO Bool
hWaitForInput Handle
handle Int
timeoutMilli) IOError -> IO Ready
handleEOFError
where
boolToReady :: Bool -> Ready
boolToReady Bool
True = Ready
Ready
boolToReady Bool
False = Ready
NotReady
handleEOFError :: IOError -> IO Ready
handleEOFError IOError
err
| IOError -> Bool
isEOFError IOError
err = Ready -> IO Ready
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Ready
EOF
| Bool
otherwise = IOError -> IO Ready
forall e a. Exception e => e -> IO a
throwIO IOError
err