{-# LANGUAGE OverloadedStrings #-}
module GHC.Eventlog.Live.Socket (
EventlogSource (..),
Tick (..),
tryConnect,
runWithEventlogSource,
) where
import Control.Concurrent (threadDelay)
import Control.Exception (Exception (..))
import Control.Exception qualified as E
import Control.Monad.IO.Unlift (MonadUnliftIO (..))
import Data.Foldable (traverse_)
import Data.Machine (ProcessT, runT_, (~>))
import Data.Machine.Fanout (fanout)
import Data.Maybe (fromMaybe)
import Data.Text (Text)
import Data.Text qualified as T
import Data.Void (Void)
import GHC.Eventlog.Live.Logger (logDebug, logInfo)
import GHC.Eventlog.Live.Machine.Core
import GHC.Eventlog.Live.Machine.Decoder
import GHC.Eventlog.Live.Machine.Sink
import GHC.Eventlog.Live.Machine.Source
import GHC.Eventlog.Live.Options (EventlogSource (..))
import GHC.Eventlog.Live.Verbosity (Verbosity)
import GHC.RTS.Events (Event)
import Network.Socket qualified as S
import System.IO (Handle)
import System.IO qualified as IO
import Text.Printf (printf)
-- | Open the given eventlog source, read it in batches, decode the batches
-- into events, and feed those events to the supplied sink until the machine
-- stops.
--
-- When an output file is given, the raw (undecoded) batches are additionally
-- written to that file via 'fileSinkBatch' while decoding proceeds.
--
-- The two 'Double' arguments are forwarded positionally to
-- 'withEventlogSource': the first is the initial retry timeout and the second
-- is the factor the timeout is multiplied by after every failed attempt.
runWithEventlogSource ::
  (MonadUnliftIO m) =>
  -- | Logging verbosity.
  Verbosity ->
  -- | Where to read the eventlog from.
  EventlogSource ->
  -- | Initial retry timeout used when connecting to a Unix socket.
  Double ->
  -- | Multiplier applied to the retry timeout after each failed attempt.
  Double ->
  -- | Batch interval handed to 'sourceHandleBatch' (milliseconds, going by
  -- the name; confirm against 'sourceHandleBatch').
  Int ->
  -- | Optional chunk size in bytes; 'defaultChunkSizeBytes' when absent.
  Maybe Int ->
  -- | Optional file that receives a copy of the raw eventlog batches.
  Maybe FilePath ->
  -- | Sink consuming the decoded events.
  ProcessT m (Tick Event) Void ->
  m ()
runWithEventlogSource verbosity eventlogSource initialTimeoutMcs timeoutExponent batchIntervalMs maybeChunkSizeBytes maybeOutputFile toEventSink =
  withEventlogSource verbosity initialTimeoutMcs timeoutExponent eventlogSource $ \eventlogHandle -> do
    let chunkSizeBytes = fromMaybe defaultChunkSizeBytes maybeChunkSizeBytes
    let fromSource = sourceHandleBatch batchIntervalMs chunkSizeBytes eventlogHandle
    case maybeOutputFile of
      Nothing ->
        runT_ $
          fromSource ~> decodeEventBatch ~> toEventSink
      Just outputFile ->
        withRunInIO $ \runInIO ->
          -- The output is a raw byte copy of the eventlog, so open it in
          -- binary mode, matching how eventlog files are opened for reading.
          IO.withBinaryFile outputFile IO.WriteMode $ \outputHandle ->
            runInIO . runT_ $
              fromSource
                ~> fanout
                  [ fileSinkBatch outputHandle
                  , decodeEventBatch ~> toEventSink
                  ]
-- | Acquire a readable 'Handle' for the given eventlog source, run the action
-- with it, and release whatever was acquired afterwards.
--
-- * 'EventlogStdin': stdin is switched to binary mode for the duration of the
--   action; afterwards its previous text encoding (if it had one) is restored
--   and its newline mode is set to 'IO.nativeNewlineMode'.
-- * 'EventlogFile': the file is opened in binary read mode.
-- * 'EventlogSocketUnix': the socket is connected with 'connectRetry' and the
--   resulting handle is closed once the action finishes.
withEventlogSource ::
  (MonadUnliftIO m) =>
  -- | Logging verbosity.
  Verbosity ->
  -- | Initial retry timeout, passed on to 'connectRetry'.
  Double ->
  -- | Retry timeout multiplier, passed on to 'connectRetry'.
  Double ->
  -- | Where to read the eventlog from.
  EventlogSource ->
  -- | Action to run with the opened handle.
  (Handle -> m ()) ->
  m ()
withEventlogSource verbosity initialTimeoutMcs timeoutExponent eventlogSource action =
  withRunInIO $ \runInIO -> do
    let runAction :: Handle -> IO ()
        runAction = runInIO . action
    case eventlogSource of
      EventlogStdin -> do
        logInfo verbosity "Reading eventlog from stdin"
        E.bracket switchStdinToBinary restoreStdin (\_ -> runAction IO.stdin)
      EventlogFile eventlogFile -> do
        logInfo verbosity ("Reading eventlog from " <> T.pack eventlogFile)
        IO.withBinaryFile eventlogFile IO.ReadMode runAction
      EventlogSocketUnix eventlogSocketUnix -> do
        logInfo verbosity ("Waiting to connect on " <> prettyEventlogSocketUnix eventlogSocketUnix)
        E.bracket
          (connectRetry verbosity initialTimeoutMcs timeoutExponent eventlogSocketUnix)
          IO.hClose
          runAction
  where
    -- Remember stdin's current encoding, then put it into binary mode.
    switchStdinToBinary :: IO (Maybe IO.TextEncoding)
    switchStdinToBinary = do
      previousEncoding <- IO.hGetEncoding IO.stdin
      IO.hSetBinaryMode IO.stdin True
      pure previousEncoding

    -- Undo 'switchStdinToBinary'. The newline mode cannot be queried up
    -- front, so it is reset to the platform default.
    restoreStdin :: Maybe IO.TextEncoding -> IO ()
    restoreStdin previousEncoding = do
      traverse_ (IO.hSetEncoding IO.stdin) previousEncoding
      IO.hSetNewlineMode IO.stdin IO.nativeNewlineMode
-- | Connect to the Unix socket at the given path, retrying on every
-- 'E.IOException' until a connection succeeds.
--
-- After each failed attempt the current timeout is waited out and then
-- multiplied by the exponent for the next round. Exceptions other than
-- 'E.IOException' propagate to the caller.
connectRetry ::
  -- | Logging verbosity.
  Verbosity ->
  -- | Timeout used after the first failed attempt.
  Double ->
  -- | Factor the timeout is multiplied by after every failed attempt.
  Double ->
  -- | Path of the Unix socket.
  FilePath ->
  IO Handle
connectRetry verbosity initialTimeoutMcs timeoutExponent eventlogSocketUnix =
  connectLoop initialTimeoutMcs
  where
    -- NOTE(review): the value is scaled by 1_000_000 before being handed to
    -- 'threadDelay' (which takes microseconds), i.e. it is treated as seconds
    -- here, whereas 'prettyTimeoutMcs' and the "Mcs" naming treat the same
    -- value as microseconds. One of the two is off by a factor of 10^6;
    -- behaviour is kept as-is until the intended unit is confirmed.
    waitFor :: Double -> IO ()
    waitFor timeoutMcs = threadDelay $ round $ timeoutMcs * 1_000_000

    connectLoop :: Double -> IO Handle
    connectLoop timeoutMcs = do
      let connect = do
            logDebug verbosity $ "Trying to connect on " <> prettyEventlogSocketUnix eventlogSocketUnix
            handle <- tryConnect eventlogSocketUnix
            logInfo verbosity $ "Connected on " <> prettyEventlogSocketUnix eventlogSocketUnix
            pure handle
      -- 'E.try' rather than 'E.catch': a 'catch' handler runs with
      -- asynchronous exceptions masked, so sleeping and recursing inside it
      -- would keep every later attempt masked and nest one handler frame per
      -- retry. Handling the failure here keeps the back-off outside the
      -- handler.
      outcome <- E.try connect
      case outcome of
        Right handle -> pure handle
        Left (e :: E.IOException) -> do
          logDebug verbosity $
            "Failed to connect on "
              <> prettyEventlogSocketUnix eventlogSocketUnix
              <> ": "
              <> T.pack (displayException e)
          logDebug verbosity $ "Waiting " <> prettyTimeoutMcs timeoutMcs <> " to retry..."
          waitFor timeoutMcs
          connectLoop (timeoutMcs * timeoutExponent)
-- | Make a single attempt to connect to the Unix stream socket at the given
-- path and return it as an unbuffered, read-only 'Handle'.
--
-- If any step after creating the socket throws, the socket is closed before
-- the exception is re-raised.
tryConnect :: FilePath -> IO Handle
tryConnect eventlogSocketUnix =
  E.bracketOnError openSocket S.close connectAndWrap
  where
    openSocket = S.socket S.AF_UNIX S.Stream S.defaultProtocol

    connectAndWrap sock = do
      S.connect sock (S.SockAddrUnix eventlogSocketUnix)
      socketHandle <- S.socketToHandle sock IO.ReadMode
      socketHandle <$ IO.hSetBuffering socketHandle IO.NoBuffering
-- | Render a duration given in microseconds using the largest unit (days,
-- hours, minutes, seconds, milliseconds, microseconds) that the value
-- reaches, with two decimal places.
--
-- Unit boundaries are inclusive, so exactly @1e6@ renders as
-- @"1.00 seconds"@ rather than @"1000.00 milliseconds"@. Values below @1e3@
-- (including negative values and NaN) are rendered in microseconds.
prettyTimeoutMcs :: Double -> Text
prettyTimeoutMcs timeoutMcs
  | timeoutMcs >= 8.64e10 = render (timeoutMcs / 8.64e10) "days"
  | timeoutMcs >= 3.6e9 = render (timeoutMcs / 3.6e9) "hours"
  | timeoutMcs >= 6e7 = render (timeoutMcs / 6e7) "minutes"
  | timeoutMcs >= 1e6 = render (timeoutMcs / 1e6) "seconds"
  | timeoutMcs >= 1e3 = render (timeoutMcs / 1e3) "milliseconds"
  | otherwise = render timeoutMcs "microseconds"
  where
    -- Format a scaled value followed by its unit name.
    render :: Double -> String -> Text
    render value unit = T.pack (printf "%.2f %s" value unit)
-- | Describe a Unix socket path for log messages by prefixing it with
-- @"Unix socket "@.
prettyEventlogSocketUnix :: FilePath -> Text
prettyEventlogSocketUnix eventlogSocketUnix =
  T.pack ("Unix socket " ++ eventlogSocketUnix)