{- |
Module      : GHC.Eventlog.Live.Machine.Source
Description : Machines for processing eventlog data.
Stability   : experimental
Portability : portable
-}
module GHC.Eventlog.Live.Machine.Source (
  -- * Eventlog source
  sourceHandleWait,
  sourceHandleBatch,
  defaultChunkSizeBytes,
) where

import Control.Exception (catch, throwIO)
import Control.Monad.IO.Class (MonadIO (..))
import Data.ByteString qualified as BS
import Data.Function (fix)
import Data.Machine (MachineT (..), construct, yield)
import Data.Word (Word64)
import GHC.Clock (getMonotonicTimeNSec)
import GHC.Eventlog.Live.Machine.Core (Tick (..))
import System.IO (Handle, hWaitForInput)
import System.IO.Error (isEOFError)

-------------------------------------------------------------------------------
-- Socket source

{- |
A source which reads chunks from a `Handle`.
When an input is available, it yields an v`Item`.
When the timeout is reached, it yields a v`Tick`.
-}
sourceHandleWait ::
  (MonadIO m) =>
  -- | The wait timeout in milliseconds.
  Int ->
  -- | The number of bytes to read.
  Int ->
  -- | The eventlog socket handle.
  Handle ->
  MachineT m k (Tick BS.ByteString)
sourceHandleWait :: forall (m :: * -> *) (k :: * -> *).
MonadIO m =>
Int -> Int -> Handle -> MachineT m k (Tick ByteString)
sourceHandleWait Int
timeoutMilli Int
chunkSizeBytes Handle
handle =
  PlanT k (Tick ByteString) m () -> MachineT m k (Tick ByteString)
forall (m :: * -> *) (k :: * -> *) o a.
Monad m =>
PlanT k o m a -> MachineT m k o
construct (PlanT k (Tick ByteString) m () -> MachineT m k (Tick ByteString))
-> PlanT k (Tick ByteString) m () -> MachineT m k (Tick ByteString)
forall a b. (a -> b) -> a -> b
$ (PlanT k (Tick ByteString) m () -> PlanT k (Tick ByteString) m ())
-> PlanT k (Tick ByteString) m ()
forall a. (a -> a) -> a
fix ((PlanT k (Tick ByteString) m () -> PlanT k (Tick ByteString) m ())
 -> PlanT k (Tick ByteString) m ())
-> (PlanT k (Tick ByteString) m ()
    -> PlanT k (Tick ByteString) m ())
-> PlanT k (Tick ByteString) m ()
forall a b. (a -> b) -> a -> b
$ \PlanT k (Tick ByteString) m ()
loop -> do
    Ready
ready <- IO Ready -> PlanT k (Tick ByteString) m Ready
forall a. IO a -> PlanT k (Tick ByteString) m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Ready -> PlanT k (Tick ByteString) m Ready)
-> IO Ready -> PlanT k (Tick ByteString) m Ready
forall a b. (a -> b) -> a -> b
$ Handle -> Int -> IO Ready
hWaitForInput' Handle
handle Int
timeoutMilli
    case Ready
ready of
      Ready
Ready -> do
        ByteString
bs <- IO ByteString -> PlanT k (Tick ByteString) m ByteString
forall a. IO a -> PlanT k (Tick ByteString) m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO ByteString -> PlanT k (Tick ByteString) m ByteString)
-> IO ByteString -> PlanT k (Tick ByteString) m ByteString
forall a b. (a -> b) -> a -> b
$ Handle -> Int -> IO ByteString
BS.hGetSome Handle
handle Int
chunkSizeBytes
        Tick ByteString -> Plan k (Tick ByteString) ()
forall o (k :: * -> *). o -> Plan k o ()
yield (ByteString -> Tick ByteString
forall a. a -> Tick a
Item ByteString
bs)
        PlanT k (Tick ByteString) m ()
loop
      Ready
NotReady -> do
        Tick ByteString -> Plan k (Tick ByteString) ()
forall o (k :: * -> *). o -> Plan k o ()
yield Tick ByteString
forall a. Tick a
Tick
        PlanT k (Tick ByteString) m ()
loop
      Ready
EOF ->
        () -> PlanT k (Tick ByteString) m ()
forall a. a -> PlanT k (Tick ByteString) m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()

-------------------------------------------------------------------------------
-- Socket source with batches

{- |
A source which reads chunks from a `Handle`.
When input is available, it yields an v`Item`.
It yields a v`Tick` at each increment of the batch interval.
-}
sourceHandleBatch ::
  (MonadIO m) =>
  -- | The batch interval in milliseconds.
  Int ->
  -- | The number of bytes to read.
  Int ->
  -- | The eventlog socket handle.
  Handle ->
  MachineT m k (Tick BS.ByteString)
sourceHandleBatch :: forall (m :: * -> *) (k :: * -> *).
MonadIO m =>
Int -> Int -> Handle -> MachineT m k (Tick ByteString)
sourceHandleBatch Int
batchIntervalMs Int
chunkSizeBytes Handle
handle = PlanT k (Tick ByteString) m () -> MachineT m k (Tick ByteString)
forall (m :: * -> *) (k :: * -> *) o a.
Monad m =>
PlanT k o m a -> MachineT m k o
construct PlanT k (Tick ByteString) m ()
start
 where
  start :: PlanT k (Tick ByteString) m ()
start = do
    Int
startTimeMs <- IO Int -> PlanT k (Tick ByteString) m Int
forall a. IO a -> PlanT k (Tick ByteString) m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO Int
getMonotonicTimeMilli
    Int -> PlanT k (Tick ByteString) m ()
batch Int
startTimeMs
  batch :: Int -> PlanT k (Tick ByteString) m ()
batch Int
startTimeMs = PlanT k (Tick ByteString) m ()
waitForInput
   where
    getRemainingTimeMilli :: PlanT k (Tick ByteString) m Int
getRemainingTimeMilli = do
      Int
currentTimeMilli <- IO Int -> PlanT k (Tick ByteString) m Int
forall a. IO a -> PlanT k (Tick ByteString) m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO Int
getMonotonicTimeMilli
      Int -> PlanT k (Tick ByteString) m Int
forall a. a -> PlanT k (Tick ByteString) m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Int -> PlanT k (Tick ByteString) m Int)
-> Int -> PlanT k (Tick ByteString) m Int
forall a b. (a -> b) -> a -> b
$ (Int
startTimeMs Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
batchIntervalMs) Int -> Int -> Int
forall a. Num a => a -> a -> a
- Int
currentTimeMilli
    waitForInput :: PlanT k (Tick ByteString) m ()
waitForInput = do
      Int
remainingTimeMilli <- PlanT k (Tick ByteString) m Int
getRemainingTimeMilli
      if Int
remainingTimeMilli Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
<= Int
0
        then do
          Tick ByteString -> Plan k (Tick ByteString) ()
forall o (k :: * -> *). o -> Plan k o ()
yield Tick ByteString
forall a. Tick a
Tick
          PlanT k (Tick ByteString) m ()
start
        else do
          Ready
ready <- IO Ready -> PlanT k (Tick ByteString) m Ready
forall a. IO a -> PlanT k (Tick ByteString) m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (Handle -> Int -> IO Ready
hWaitForInput' Handle
handle Int
remainingTimeMilli)
          case Ready
ready of
            Ready
Ready -> do
              ByteString
chunk <- IO ByteString -> PlanT k (Tick ByteString) m ByteString
forall a. IO a -> PlanT k (Tick ByteString) m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO ByteString -> PlanT k (Tick ByteString) m ByteString)
-> IO ByteString -> PlanT k (Tick ByteString) m ByteString
forall a b. (a -> b) -> a -> b
$ Handle -> Int -> IO ByteString
BS.hGetSome Handle
handle Int
chunkSizeBytes
              Tick ByteString -> Plan k (Tick ByteString) ()
forall o (k :: * -> *). o -> Plan k o ()
yield (ByteString -> Tick ByteString
forall a. a -> Tick a
Item ByteString
chunk) PlanT k (Tick ByteString) m ()
-> PlanT k (Tick ByteString) m () -> PlanT k (Tick ByteString) m ()
forall a b.
PlanT k (Tick ByteString) m a
-> PlanT k (Tick ByteString) m b -> PlanT k (Tick ByteString) m b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> PlanT k (Tick ByteString) m ()
waitForInput
            Ready
NotReady -> PlanT k (Tick ByteString) m ()
waitForInput
            Ready
EOF -> () -> PlanT k (Tick ByteString) m ()
forall a. a -> PlanT k (Tick ByteString) m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()

{- |
Eventlog chunk size in bytes.
This should be equal to the page size.
-}
defaultChunkSizeBytes :: Int
defaultChunkSizeBytes :: Int
defaultChunkSizeBytes = Int
4096

{- |
Internal helper.
Return monotonic time in milliseconds, since some unspecified starting point
-}
getMonotonicTimeMilli :: IO Int
getMonotonicTimeMilli :: IO Int
getMonotonicTimeMilli = Word64 -> Int
nanoToMilli (Word64 -> Int) -> IO Word64 -> IO Int
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IO Word64
getMonotonicTimeNSec

{- |
Internal helper.
Convert nanoseconds to milliseconds.
The conversion from 'Word64' to 'Int' is safe.
It cannot overflow due to the division by 1_000_000.
-}
nanoToMilli :: Word64 -> Int
nanoToMilli :: Word64 -> Int
nanoToMilli = Word64 -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Word64 -> Int) -> (Word64 -> Word64) -> Word64 -> Int
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Word64 -> Word64 -> Word64
forall a. Integral a => a -> a -> a
`div` Word64
1_000_000)

{- |
Internal helper.
Type to represent the state of a handle.
-}
data Ready = Ready | NotReady | EOF

{- |
Internal helper.
Wait for input from a `Handle` for a given number of milliseconds.
-}
hWaitForInput' ::
  -- | The handle.
  Handle ->
  -- | The timeout in milliseconds.
  Int ->
  IO Ready
hWaitForInput' :: Handle -> Int -> IO Ready
hWaitForInput' Handle
handle Int
timeoutMilli =
  IO Ready -> (IOError -> IO Ready) -> IO Ready
forall e a. Exception e => IO a -> (e -> IO a) -> IO a
catch (Bool -> Ready
boolToReady (Bool -> Ready) -> IO Bool -> IO Ready
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Handle -> Int -> IO Bool
hWaitForInput Handle
handle Int
timeoutMilli) IOError -> IO Ready
handleEOFError
 where
  boolToReady :: Bool -> Ready
boolToReady Bool
True = Ready
Ready
  boolToReady Bool
False = Ready
NotReady
  handleEOFError :: IOError -> IO Ready
handleEOFError IOError
err
    | IOError -> Bool
isEOFError IOError
err = Ready -> IO Ready
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Ready
EOF
    | Bool
otherwise = IOError -> IO Ready
forall e a. Exception e => e -> IO a
throwIO IOError
err