{-# LANGUAGE OverloadedRecordDot #-}
{-# LANGUAGE RecordWildCards #-}

{- |
File tailing library with multi-subscriber support.

This library provides a Haskell API for @tail -f@ style file streaming
using 'STM', 'Control.Concurrent.Async', and system processes.

Example usage:

@
import System.Tail

main :: IO ()
main = do
  tail <- 'tailFile' 100 \"\/var\/log\/app.log\"
  subscriber <- 'tailSubscribe' tail
  -- Read from subscriber...
  'tailStop' tail
@
-}
module System.Tail (
  -- * Core Types
  Tail,

  -- * Operations
  tailFile,
  tailStop,
  tailSubscribe,
) where

import Control.Concurrent (threadDelay)
import Control.Concurrent.Async (Async, async)
import Control.Concurrent.STM.CircularBuffer (CircularBuffer)
import Control.Concurrent.STM.CircularBuffer qualified as CB
import System.Directory (doesFileExist)
import System.IO (hGetLine)
import System.Process (CreateProcess (..), ProcessHandle, StdStream (..), createProcess, proc, terminateProcess, waitForProcess)

-- | Represent `tail -f`'ing a file in Haskell
data Tail = Tail
  { Tail -> String
filePath :: FilePath
  -- ^ The file being tailed
  , Tail -> TMVar ()
stop :: TMVar ()
  -- ^ Signal to stop tailing
  , Tail -> TVar (Maybe (ProcessHandle, Async ()))
tailProcess :: TVar (Maybe (ProcessHandle, Async ()))
  -- ^ The tail process handle and async reader
  , Tail -> TVar [CircularBuffer Text]
queues :: TVar [CircularBuffer Text]
  -- ^ Active subscriber queues
  , Tail -> CircularBuffer Text
ringBuffer :: CircularBuffer Text
  -- ^ Ring buffer storing last N lines for new subscribers
  }
  deriving stock ((forall x. Tail -> Rep Tail x)
-> (forall x. Rep Tail x -> Tail) -> Generic Tail
forall x. Rep Tail x -> Tail
forall x. Tail -> Rep Tail x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cfrom :: forall x. Tail -> Rep Tail x
from :: forall x. Tail -> Rep Tail x
$cto :: forall x. Rep Tail x -> Tail
to :: forall x. Rep Tail x -> Tail
Generic)

{- | Create a new 'Tail' handle for the given file path with specified buffer size.

The tail process starts immediately and begins reading from the file.
New subscribers will receive a ring buffer containing the last @bufferSize@ lines.
-}
tailFile :: (HasCallStack) => Int -> FilePath -> IO Tail
tailFile :: HasCallStack => Int -> String -> IO Tail
tailFile Int
bufferSize String
filePath = do
  IO Bool -> IO () -> IO ()
forall (m :: Type -> Type). Monad m => m Bool -> m () -> m ()
unlessM (String -> IO Bool
doesFileExist String
filePath) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ Text -> IO ()
forall a t. (HasCallStack, IsText t) => t -> a
error (Text -> IO ()) -> Text -> IO ()
forall a b. (a -> b) -> a -> b
$ Text
"File does not exist: " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> String -> Text
forall a. ToText a => a -> Text
toText String
filePath
  TVar [CircularBuffer Text]
queues <- [CircularBuffer Text] -> IO (TVar [CircularBuffer Text])
forall (m :: Type -> Type) a. MonadIO m => a -> m (TVar a)
newTVarIO [CircularBuffer Text]
forall a. Monoid a => a
mempty
  TMVar ()
stop <- IO (TMVar ())
forall (m :: Type -> Type) a. MonadIO m => m (TMVar a)
newEmptyTMVarIO
  TVar (Maybe (ProcessHandle, Async ()))
tailProcess <- Maybe (ProcessHandle, Async ())
-> IO (TVar (Maybe (ProcessHandle, Async ())))
forall (m :: Type -> Type) a. MonadIO m => a -> m (TVar a)
newTVarIO Maybe (ProcessHandle, Async ())
forall a. Maybe a
Nothing
  CircularBuffer Text
ringBuffer <- STM (CircularBuffer Text) -> IO (CircularBuffer Text)
forall (m :: Type -> Type) a. MonadIO m => STM a -> m a
atomically (STM (CircularBuffer Text) -> IO (CircularBuffer Text))
-> STM (CircularBuffer Text) -> IO (CircularBuffer Text)
forall a b. (a -> b) -> a -> b
$ Int -> STM (CircularBuffer Text)
forall a. Int -> STM (CircularBuffer a)
CB.new Int
bufferSize
  let t :: Tail
t = Tail {String
TVar [CircularBuffer Text]
TVar (Maybe (ProcessHandle, Async ()))
TMVar ()
CircularBuffer Text
filePath :: String
stop :: TMVar ()
tailProcess :: TVar (Maybe (ProcessHandle, Async ()))
queues :: TVar [CircularBuffer Text]
ringBuffer :: CircularBuffer Text
filePath :: String
queues :: TVar [CircularBuffer Text]
stop :: TMVar ()
tailProcess :: TVar (Maybe (ProcessHandle, Async ()))
ringBuffer :: CircularBuffer Text
..}
  -- Start the tail process immediately
  IO (Async ()) -> IO ()
forall (f :: Type -> Type) a. Functor f => f a -> f ()
void (IO (Async ()) -> IO ()) -> IO (Async ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ IO () -> IO (Async ())
forall a. IO a -> IO (Async a)
async (IO () -> IO (Async ())) -> IO () -> IO (Async ())
forall a b. (a -> b) -> a -> b
$ Tail -> IO ()
tailRun Tail
t
  Tail -> IO Tail
forall a. a -> IO a
forall (f :: Type -> Type) a. Applicative f => a -> f a
pure Tail
t

{- | Signal the tail process to stop reading the file.

This will terminate the underlying @tail@ process and close all subscriber queues.
-}
tailStop :: Tail -> IO ()
tailStop :: Tail -> IO ()
tailStop Tail
t = do
  STM () -> IO ()
forall (m :: Type -> Type) a. MonadIO m => STM a -> m a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TMVar () -> () -> STM ()
forall a. TMVar a -> a -> STM ()
putTMVar Tail
t.stop ()

tailRun :: Tail -> IO ()
tailRun :: Tail -> IO ()
tailRun Tail
t = do
  -- Start the tail -F process (show entire file from beginning)
  let createProc :: CreateProcess
createProc = (String -> [String] -> CreateProcess
proc String
"tail" [String
"-F", String
"-n", String
"+1", Tail
t.filePath]) {std_out = CreatePipe}
  (Maybe Handle
_, Just Handle
hout, Maybe Handle
_, ProcessHandle
ph) <- CreateProcess
-> IO (Maybe Handle, Maybe Handle, Maybe Handle, ProcessHandle)
createProcess CreateProcess
createProc

  -- Start async reader that reads from tail process and distributes to queues
  Async ()
readerAsync <- IO () -> IO (Async ())
forall a. IO a -> IO (Async a)
async (IO () -> IO (Async ())) -> IO () -> IO (Async ())
forall a b. (a -> b) -> a -> b
$ Handle -> IO ()
readAndDistribute Handle
hout

  -- Store the process and reader
  STM () -> IO ()
forall (m :: Type -> Type) a. MonadIO m => STM a -> m a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TVar (Maybe (ProcessHandle, Async ()))
-> Maybe (ProcessHandle, Async ()) -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar Tail
t.tailProcess ((ProcessHandle, Async ()) -> Maybe (ProcessHandle, Async ())
forall a. a -> Maybe a
Just (ProcessHandle
ph, Async ()
readerAsync))

  -- Wait for stop signal
  STM () -> IO ()
forall (m :: Type -> Type) a. MonadIO m => STM a -> m a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TMVar () -> STM ()
forall a. TMVar a -> STM a
takeTMVar Tail
t.stop

  -- Clean up: terminate process and wait for reader to finish at EOF
  Int -> IO ()
threadDelay Int
1_000_000 -- Give `tail -f` a second to flush any remaining lines
  ProcessHandle -> IO ()
terminateProcess ProcessHandle
ph
  IO ExitCode -> IO ()
forall (f :: Type -> Type) a. Functor f => f a -> f ()
void (IO ExitCode -> IO ()) -> IO ExitCode -> IO ()
forall a b. (a -> b) -> a -> b
$ ProcessHandle -> IO ExitCode
waitForProcess ProcessHandle
ph

  -- Reader will naturally stop when it reaches EOF

  -- Close all queues so readers can detect end of stream
  STM () -> IO ()
forall (m :: Type -> Type) a. MonadIO m => STM a -> m a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
    [CircularBuffer Text]
qs <- TVar [CircularBuffer Text] -> STM [CircularBuffer Text]
forall a. TVar a -> STM a
readTVar Tail
t.queues
    (CircularBuffer Text -> STM ()) -> [CircularBuffer Text] -> STM ()
forall (t :: Type -> Type) (m :: Type -> Type) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ CircularBuffer Text -> STM ()
forall a. CircularBuffer a -> STM ()
CB.close [CircularBuffer Text]
qs

  STM () -> IO ()
forall (m :: Type -> Type) a. MonadIO m => STM a -> m a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TVar (Maybe (ProcessHandle, Async ()))
-> Maybe (ProcessHandle, Async ()) -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar Tail
t.tailProcess Maybe (ProcessHandle, Async ())
forall a. Maybe a
Nothing
  where
    readAndDistribute :: Handle -> IO ()
    readAndDistribute :: Handle -> IO ()
readAndDistribute Handle
h = do
      let readLines :: IO ()
readLines = do
            Handle -> IO Bool
forall (m :: Type -> Type). MonadIO m => Handle -> m Bool
hIsEOF Handle
h IO Bool -> (Bool -> IO ()) -> IO ()
forall a b. IO a -> (a -> IO b) -> IO b
forall (m :: Type -> Type) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
              Bool
True -> IO ()
forall (f :: Type -> Type). Applicative f => f ()
pass -- EOF reached, stop reading
              Bool
False -> do
                Text
line <- String -> Text
forall a. ToText a => a -> Text
toText (String -> Text) -> IO String -> IO Text
forall (f :: Type -> Type) a b. Functor f => (a -> b) -> f a -> f b
<$> Handle -> IO String
hGetLine Handle
h
                -- Add to ring buffer and distribute to all queues
                STM () -> IO ()
forall (m :: Type -> Type) a. MonadIO m => STM a -> m a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
                  -- Update ring buffer
                  Text -> CircularBuffer Text -> STM ()
forall a. a -> CircularBuffer a -> STM ()
CB.add Text
line Tail
t.ringBuffer
                  -- Distribute to subscriber queues
                  [CircularBuffer Text]
qs <- TVar [CircularBuffer Text] -> STM [CircularBuffer Text]
forall a. TVar a -> STM a
readTVar Tail
t.queues
                  [CircularBuffer Text] -> (CircularBuffer Text -> STM ()) -> STM ()
forall (t :: Type -> Type) (m :: Type -> Type) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ [CircularBuffer Text]
qs ((CircularBuffer Text -> STM ()) -> STM ())
-> (CircularBuffer Text -> STM ()) -> STM ()
forall a b. (a -> b) -> a -> b
$ \CircularBuffer Text
q ->
                    Text -> CircularBuffer Text -> STM ()
forall a. a -> CircularBuffer a -> STM ()
CB.add Text
line CircularBuffer Text
q
                IO ()
readLines
      IO ()
readLines

{- | Subscribe to tail output and receive a 'CircularBuffer' for reading lines.

The returned buffer will contain any previously read lines from the ring buffer,
plus all new lines as they are read from the file.

Use 'Control.Concurrent.STM.CircularBuffer.drain' to read from the buffer.
-}
tailSubscribe :: Tail -> IO (CircularBuffer Text)
tailSubscribe :: Tail -> IO (CircularBuffer Text)
tailSubscribe Tail
t = STM (CircularBuffer Text) -> IO (CircularBuffer Text)
forall (m :: Type -> Type) a. MonadIO m => STM a -> m a
atomically (STM (CircularBuffer Text) -> IO (CircularBuffer Text))
-> STM (CircularBuffer Text) -> IO (CircularBuffer Text)
forall a b. (a -> b) -> a -> b
$ do
  -- Clone ring buffer as CircularBuffer with buffered lines
  CircularBuffer Text
queue <- CircularBuffer Text -> STM (CircularBuffer Text)
forall a.
HasCallStack =>
CircularBuffer a -> STM (CircularBuffer a)
CB.clone Tail
t.ringBuffer
  -- Add to active subscribers
  TVar [CircularBuffer Text]
-> ([CircularBuffer Text] -> [CircularBuffer Text]) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' Tail
t.queues (CircularBuffer Text
queue :)
  CircularBuffer Text -> STM (CircularBuffer Text)
forall a. a -> STM a
forall (f :: Type -> Type) a. Applicative f => a -> f a
pure CircularBuffer Text
queue