{- |
Circular buffer implementation using STM TBMQueue with drop-oldest semantics.

This module provides a circular buffer built on top of "Control.Concurrent.STM.TBMQueue"
that automatically drops the oldest items when the buffer becomes full.

Example usage:

@
import Control.Concurrent.STM.CircularBuffer qualified as CB

main :: IO ()
main = atomically $ do
  buf <- CB.'new' 3
  CB.'add' "first" buf
  CB.'add' "second" buf
  CB.'add' "third" buf
  CB.'add' "fourth" buf  -- Drops "first"
  items <- CB.'drain' buf
  -- items will be Just ("second" :| ["third", "fourth"])
@
-}
module Control.Concurrent.STM.CircularBuffer (
  CircularBuffer,
  new,
  add,
  clone,
  drain,
  close,
) where

import Control.Concurrent.STM.TBMQueue (TBMQueue, closeTBMQueue, isFullTBMQueue, newTBMQueue, readTBMQueue, tryReadTBMQueue, writeTBMQueue)

-- | Circular buffer using TBMQueue - when full, oldest items are dropped
data CircularBuffer a = CircularBuffer (TBMQueue a) Int

{- | Create a new circular buffer with the given capacity.

The buffer will hold at most @capacity@ items. When full, adding new items
will cause the oldest items to be dropped.
-}
new :: Int -> STM (CircularBuffer a)
new :: forall a. Int -> STM (CircularBuffer a)
new Int
cap = TBMQueue a -> Int -> CircularBuffer a
forall a. TBMQueue a -> Int -> CircularBuffer a
CircularBuffer (TBMQueue a -> Int -> CircularBuffer a)
-> STM (TBMQueue a) -> STM (Int -> CircularBuffer a)
forall (f :: Type -> Type) a b. Functor f => (a -> b) -> f a -> f b
<$> Int -> STM (TBMQueue a)
forall a. Int -> STM (TBMQueue a)
newTBMQueue Int
cap STM (Int -> CircularBuffer a) -> STM Int -> STM (CircularBuffer a)
forall a b. STM (a -> b) -> STM a -> STM b
forall (f :: Type -> Type) a b.
Applicative f =>
f (a -> b) -> f a -> f b
<*> Int -> STM Int
forall a. a -> STM a
forall (f :: Type -> Type) a. Applicative f => a -> f a
pure Int
cap

{- | Add an element to the circular buffer.

If the buffer is full, the oldest item will be dropped to make room
for the new item.
-}
add :: a -> CircularBuffer a -> STM ()
add :: forall a. a -> CircularBuffer a -> STM ()
add a
item (CircularBuffer TBMQueue a
queue Int
_) = do
  STM Bool -> STM () -> STM ()
forall (m :: Type -> Type). Monad m => m Bool -> m () -> m ()
whenM (TBMQueue a -> STM Bool
forall a. TBMQueue a -> STM Bool
isFullTBMQueue TBMQueue a
queue) (STM () -> STM ()) -> STM () -> STM ()
forall a b. (a -> b) -> a -> b
$ do
    STM (Maybe a) -> STM ()
forall (f :: Type -> Type) a. Functor f => f a -> f ()
void (STM (Maybe a) -> STM ()) -> STM (Maybe a) -> STM ()
forall a b. (a -> b) -> a -> b
$ TBMQueue a -> STM (Maybe a)
forall a. TBMQueue a -> STM (Maybe a)
readTBMQueue TBMQueue a
queue -- Drop oldest item
  TBMQueue a -> a -> STM ()
forall a. TBMQueue a -> a -> STM ()
writeTBMQueue TBMQueue a
queue a
item

{- | Clone the contents of a circular buffer into a new buffer with the same capacity.

This operation drains all items from the source buffer, copies them to a new buffer,
and then puts them back into the source buffer. The items maintain their order.

Throws an error if the source buffer is closed.
-}
clone :: (HasCallStack) => CircularBuffer a -> STM (CircularBuffer a)
clone :: forall a.
HasCallStack =>
CircularBuffer a -> STM (CircularBuffer a)
clone sourceBuffer :: CircularBuffer a
sourceBuffer@(CircularBuffer TBMQueue a
_ Int
capacity) = do
  CircularBuffer a
newBuffer <- Int -> STM (CircularBuffer a)
forall a. Int -> STM (CircularBuffer a)
new Int
capacity
  -- Use drain to get all items without blocking if empty
  CircularBuffer a -> STM (Maybe (NonEmpty a))
forall a. CircularBuffer a -> STM (Maybe (NonEmpty a))
drain CircularBuffer a
sourceBuffer STM (Maybe (NonEmpty a))
-> (Maybe (NonEmpty a) -> STM (CircularBuffer a))
-> STM (CircularBuffer a)
forall a b. STM a -> (a -> STM b) -> STM b
forall (m :: Type -> Type) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
    Maybe (NonEmpty a)
Nothing -> Text -> STM (CircularBuffer a)
forall a t. (HasCallStack, IsText t) => t -> a
error Text
"Cannot clone a closed CircularBuffer"
    Just (NonEmpty a -> [a]
forall a. NonEmpty a -> [a]
forall (t :: Type -> Type) a. Foldable t => t a -> [a]
toList -> [a]
items) -> do
      -- Add all items to new buffer and put them back in source
      (a -> STM ()) -> [a] -> STM ()
forall (t :: Type -> Type) (m :: Type -> Type) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (a -> CircularBuffer a -> STM ()
forall a. a -> CircularBuffer a -> STM ()
`add` CircularBuffer a
newBuffer) [a]
items
      -- Put items back in source buffer (FIFO order)
      (a -> STM ()) -> [a] -> STM ()
forall (t :: Type -> Type) (m :: Type -> Type) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (a -> CircularBuffer a -> STM ()
forall a. a -> CircularBuffer a -> STM ()
`add` CircularBuffer a
sourceBuffer) [a]
items
      CircularBuffer a -> STM (CircularBuffer a)
forall a. a -> STM a
forall (f :: Type -> Type) a. Applicative f => a -> f a
pure CircularBuffer a
newBuffer

{- | Read all currently available items, blocking for the first item.

This function will block until at least one item is available, then drain
all remaining available items without blocking.

Returns 'Nothing' if the queue is closed, or @'Just' items@ if at least
one item is available.
-}
drain :: CircularBuffer a -> STM (Maybe (NonEmpty a))
drain :: forall a. CircularBuffer a -> STM (Maybe (NonEmpty a))
drain (CircularBuffer TBMQueue a
queue Int
_) = do
  -- First, block for one item (same behavior as readTBMQueue)
  TBMQueue a -> STM (Maybe a)
forall a. TBMQueue a -> STM (Maybe a)
readTBMQueue TBMQueue a
queue STM (Maybe a)
-> (Maybe a -> STM (Maybe (NonEmpty a)))
-> STM (Maybe (NonEmpty a))
forall a b. STM a -> (a -> STM b) -> STM b
forall (m :: Type -> Type) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
    Maybe a
Nothing -> Maybe (NonEmpty a) -> STM (Maybe (NonEmpty a))
forall a. a -> STM a
forall (f :: Type -> Type) a. Applicative f => a -> f a
pure Maybe (NonEmpty a)
forall a. Maybe a
Nothing -- Queue is closed
    Just a
x -> do
      -- Got first item, now collect all other available items without blocking
      let go :: [a] -> STM [a]
go [a]
acc = do
            Maybe (Maybe a)
item <- TBMQueue a -> STM (Maybe (Maybe a))
forall a. TBMQueue a -> STM (Maybe (Maybe a))
tryReadTBMQueue TBMQueue a
queue
            case Maybe (Maybe a)
item of
              Maybe (Maybe a)
Nothing -> [a] -> STM [a]
forall a. a -> STM a
forall (f :: Type -> Type) a. Applicative f => a -> f a
pure ([a] -> [a]
forall a. [a] -> [a]
reverse [a]
acc) -- Queue is closed
              Just Maybe a
Nothing -> [a] -> STM [a]
forall a. a -> STM a
forall (f :: Type -> Type) a. Applicative f => a -> f a
pure ([a] -> [a]
forall a. [a] -> [a]
reverse [a]
acc) -- No more items available
              Just (Just a
y) -> [a] -> STM [a]
go (a
y a -> [a] -> [a]
forall a. a -> [a] -> [a]
: [a]
acc) -- Got another item
      [a]
rest <- [a] -> STM [a]
go []
      Maybe (NonEmpty a) -> STM (Maybe (NonEmpty a))
forall a. a -> STM a
forall (f :: Type -> Type) a. Applicative f => a -> f a
pure (Maybe (NonEmpty a) -> STM (Maybe (NonEmpty a)))
-> Maybe (NonEmpty a) -> STM (Maybe (NonEmpty a))
forall a b. (a -> b) -> a -> b
$ NonEmpty a -> Maybe (NonEmpty a)
forall a. a -> Maybe a
Just (a
x a -> [a] -> NonEmpty a
forall a. a -> [a] -> NonEmpty a
:| [a]
rest)

{- | Close the buffer to signal end-of-stream.

Once closed, no more items can be added to the buffer, and 'drain'
will return 'Nothing' after all remaining items have been drained.
-}
close :: CircularBuffer a -> STM ()
close :: forall a. CircularBuffer a -> STM ()
close (CircularBuffer TBMQueue a
queue Int
_) = TBMQueue a -> STM ()
forall a. TBMQueue a -> STM ()
closeTBMQueue TBMQueue a
queue