module Control.Concurrent.STM.CircularBuffer (
CircularBuffer,
new,
add,
clone,
drain,
close,
) where
import Control.Concurrent.STM.TBMQueue (TBMQueue, closeTBMQueue, isClosedTBMQueue, isFullTBMQueue, newTBMQueue, readTBMQueue, tryReadTBMQueue, writeTBMQueue)
-- | A bounded buffer backed by a 'TBMQueue'.
--
-- The first field is the underlying queue; the second is the capacity the
-- queue was created with (see 'new'). The capacity is kept alongside the
-- queue so that 'clone' can allocate a new buffer of the same size.
data CircularBuffer a = CircularBuffer (TBMQueue a) Int
-- | Create an empty buffer whose underlying queue is bounded by @cap@.
--
-- The same @cap@ is recorded in the resulting 'CircularBuffer'.
--
-- NOTE(review): behaviour for a non-positive @cap@ is whatever
-- 'newTBMQueue' does with it; confirm callers always pass a positive value.
new :: Int -> STM (CircularBuffer a)
new cap = do
  queue <- newTBMQueue cap
  pure (CircularBuffer queue cap)
-- | Append an item to the buffer, evicting the oldest item first when the
-- buffer is full.
--
-- Adding to a closed buffer is a no-op. Writes to a closed 'TBMQueue' are
-- silently discarded (per the stm-chans documentation), so without the
-- closed check a full, closed buffer would lose its oldest item on every
-- call while never storing the new one.
add :: a -> CircularBuffer a -> STM ()
add item (CircularBuffer queue _) =
  isClosedTBMQueue queue >>= \case
    True -> pure ()
    False -> do
      -- Make room by dropping the oldest element; the evicted value is
      -- intentionally ignored.
      whenM (isFullTBMQueue queue) $
        void $
          readTBMQueue queue
      writeTBMQueue queue item
-- | Produce a new buffer with the same capacity and contents as the source.
--
-- The source is drained, every drained item is added to the new buffer, and
-- the items are then added back to the source in the same order.
--
-- Calls 'error' with @"Cannot clone a closed CircularBuffer"@ when the source
-- is closed. The closed check happens before draining: a closed queue that
-- still holds items can be drained, but writes back to it are discarded (per
-- the stm-chans documentation), which would silently empty the source.
--
-- NOTE(review): because this goes through 'drain', cloning an open but empty
-- buffer presumably retries until an item arrives; confirm that is intended.
clone :: (HasCallStack) => CircularBuffer a -> STM (CircularBuffer a)
clone sourceBuffer@(CircularBuffer sourceQueue capacity) =
  isClosedTBMQueue sourceQueue >>= \case
    True -> error "Cannot clone a closed CircularBuffer"
    False -> do
      newBuffer <- new capacity
      drain sourceBuffer >>= \case
        -- Not expected after the closed check above; kept so the match
        -- stays total.
        Nothing -> error "Cannot clone a closed CircularBuffer"
        Just items -> do
          mapM_ (`add` newBuffer) items
          -- Restore the source, which 'drain' emptied.
          mapM_ (`add` sourceBuffer) items
          pure newBuffer
-- | Remove and return everything currently held in the buffer, oldest first.
--
-- The first element is taken with 'readTBMQueue'; when that yields 'Nothing'
-- the result is 'Nothing' ('clone' treats this as the buffer being closed).
-- Otherwise the remaining elements are collected with 'tryReadTBMQueue'
-- until it stops producing items, and the result is a 'NonEmpty' list in
-- the order the items were read.
drain :: CircularBuffer a -> STM (Maybe (NonEmpty a))
drain (CircularBuffer queue _) =
  readTBMQueue queue >>= \case
    Nothing -> pure Nothing
    Just firstItem -> Just . (firstItem :|) <$> collectAvailable []
  where
    -- Accumulates in reverse and flips once at the end so the result is in
    -- read order.
    collectAvailable acc =
      tryReadTBMQueue queue >>= \case
        Just (Just next) -> collectAvailable (next : acc)
        -- Either no item is available right now or the queue reported
        -- nothing further: stop collecting.
        _ -> pure (reverse acc)
-- | Close the buffer by closing its underlying 'TBMQueue'.
--
-- The stored capacity is not used.
close :: CircularBuffer a -> STM ()
close (CircularBuffer underlying _capacity) =
  closeTBMQueue underlying