{-# LANGUAGE BlockArguments #-}
{-# LANGUAGE OverloadedRecordDot #-}
{-# LANGUAGE NoLinearTypes #-}

module Control.Concurrent.STM.TMDequeRingBuffer (
  -- * The TMDeque type
  TMDeque,

  -- * Construction
  newTMDeque,
  newTMDequeIO,

  -- * Push operations
  pushFrontTMDeque,

  -- * Pop operations (blocking)
  popFrontTMDeque,
  popBackTMDeque,

  -- * Pop operations (non-blocking)
  tryPopFrontTMDeque,
  tryPopBackTMDeque,

  -- * Closing & queries
  closeTMDeque,
  isClosedTMDeque,
  isClosedTMDequeIO,
  isEmptyTMDeque,
  estimateSizeTMDequeIO,
  sizeTMDeque,
) where

import Control.Concurrent.STM
import Control.Monad (when)
import Data.Array.Base (newArray_, readArray, writeArray)
import Data.Function (fix, (&))

{- | A closable double-ended queue for use in 'STM', backed by a ring buffer
stored in a 'TArray'.

The @front@ and @back@ fields hold running counters rather than array
positions: the slot belonging to a counter value is obtained by reducing it
modulo the current @capacity@.  The live elements are the ones at counters
@[back, front)@.

> | 0 | 1 | 2 | ... | i | ... | N - 1 |
>   ^                 ^
>   |                 |
>  back             front
-}
data TMDeque a = TMDeque
  { closed :: TVar Bool
  -- ^ 'True' once the deque has been closed.
  , ringBuffer :: TVar (TArray Int a)
  -- ^ Backing storage; swapped for a larger array when the deque grows.
  , capacity :: TVar Int
  -- ^ Number of slots in the current 'ringBuffer'.
  , front :: TVar Int
  -- ^ Counter of the next slot a front push writes to.
  , back :: TVar Int
  -- ^ Counter of the element a back pop takes next.
  }

-- | A ring-buffer slot index: a counter value already reduced modulo the
-- capacity, kept distinct from the raw @front@ / @back@ counter values.
newtype UniqIx = UniqIx Int

-- | Number of slots allocated for the ring buffer of a freshly created deque.
initialCapacity :: Int
initialCapacity = 64

{- | Create a new empty 'TMDeque'.

The deque starts open, with a ring buffer of 'initialCapacity' slots whose
contents are not yet initialised, and with both counters at zero.
-}
newTMDeque :: STM (TMDeque a)
newTMDeque = do
  closedVar <- newTVar False
  bufferVar <- newTVar =<< newArray_ (0, initialCapacity - 1)
  capacityVar <- newTVar initialCapacity
  frontVar <- newTVar 0
  backVar <- newTVar 0
  -- Record syntax keeps the three 'TVar Int' fields from being mixed up.
  pure
    TMDeque
      { closed = closedVar
      , ringBuffer = bufferVar
      , capacity = capacityVar
      , front = frontVar
      , back = backVar
      }

{- | 'IO' variant of 'newTMDeque'.

Allocates every variable directly in 'IO', avoiding the overhead of running
an STM transaction.  The resulting deque is open and empty, with a ring
buffer of 'initialCapacity' slots.
-}
newTMDequeIO :: IO (TMDeque a)
newTMDequeIO = do
  closedVar <- newTVarIO False
  bufferVar <- newTVarIO =<< newArray_ (0, initialCapacity - 1)
  capacityVar <- newTVarIO initialCapacity
  frontVar <- newTVarIO 0
  backVar <- newTVarIO 0
  -- Record syntax keeps the three 'TVar Int' fields from being mixed up.
  pure
    TMDeque
      { closed = closedVar
      , ringBuffer = bufferVar
      , capacity = capacityVar
      , front = frontVar
      , back = backVar
      }

{- | Free-slot threshold that triggers growth: the ring buffer is doubled
once @capacity - size - 1@ drops to this value or below.
-}
growThreshold :: Int
growThreshold = 16

{- | Push an element to the front of the deque.  Silently ignored if the
deque is closed.

The ring buffer is grown beforehand when it is running out of free slots, so
a push never blocks.
-}
pushFrontTMDeque :: TMDeque a -> a -> STM ()
pushFrontTMDeque deq v = do
  isClosed <- readTVar deq.closed
  -- Honour the documented contract: a closed deque accepts no new elements.
  if isClosed
    then pure ()
    else do
      growIfNeeded deq
      -- Read the capacity only after growing, because growth changes it.
      capa <- readTVar deq.capacity
      -- Claim the slot of the current front counter, then advance the counter.
      UniqIx dest <- stateTVar deq.front \i ->
        let !j = i + 1
         in (UniqIx (i `rem` capa), j)
      buf <- readTVar deq.ringBuffer
      writeArray buf dest v

{- | Double the ring buffer when the number of free slots is running low.

Growth is triggered once @capacity - size - 1 <= 'growThreshold'@.  The live
elements are copied into a buffer of twice the capacity by 'doubleDeq', after
which both the stored buffer and the recorded capacity are replaced.
-}
growIfNeeded :: TMDeque a -> STM ()
{-# INLINE growIfNeeded #-}
growIfNeeded deq = do
  capa <- readTVar deq.capacity
  size <- sizeTMDeque deq
  when (capa - size - 1 <= growThreshold) do
    ring <- doubleDeq capa deq
    writeTVar deq.ringBuffer ring
    writeTVar deq.capacity (capa * 2)

{- | Number of elements currently stored in the deque, computed as the
difference between the front and back counters.
-}
sizeTMDeque :: TMDeque a -> STM Int
sizeTMDeque deq = do
  frontCount <- readTVar deq.front
  backCount <- readTVar deq.back
  pure (frontCount - backCount)

{- | Copy the live elements into a freshly allocated buffer of twice the size.

@oldSize@ is the current capacity.  The returned array has @2 * oldSize@
slots; installing it and updating the stored capacity is left to the caller.

The front and back fields are running counters that are reduced modulo the
capacity only when a slot is accessed, and they are not rewritten on growth.
An element at counter @p@ is therefore read from slot @p `rem` oldSize@ and
must be found at slot @p `rem` (2 * oldSize)@ afterwards.  Copying by counter
rather than by old slot position places every element exactly where later
pushes and pops will look for it, whether or not the live range wraps around
the end of the old buffer.
-}
doubleDeq :: Int -> TMDeque a -> STM (TArray Int a)
{-# INLINE doubleDeq #-}
doubleDeq oldSize deq = do
  let !newSize = oldSize * 2
  backCount <- readTVar deq.back
  frontCount <- readTVar deq.front
  arr <- readTVar deq.ringBuffer
  dest <- newArray_ (0, newSize - 1)
  -- Walk the live counters [back, front) and relocate each element.
  backCount & fix \go !p -> when (p < frontCount) do
    e <- readArray arr (p `rem` oldSize)
    writeArray dest (p `rem` newSize) e
    go (p + 1)
  pure dest

{- | Pop an element from the front.  Blocks if the deque is open and empty.
Returns @Nothing@ when the deque is closed and empty (end-of-stream).
-}
popFrontTMDeque :: TMDeque a -> STM (Maybe a)
popFrontTMDeque deq = do
  outcome <- tryPopFrontTMDeque deq
  case outcome of
    -- Closed and drained: report end-of-stream instead of blocking forever.
    Nothing -> pure Nothing
    -- Open but empty: wait until another transaction changes the deque.
    Just Nothing -> retry
    Just (Just e) -> pure (Just e)

{- | Pop an element from the back.  Blocks if the deque is open and empty.
Returns @Nothing@ when the deque is closed and empty (end-of-stream).
-}
popBackTMDeque :: TMDeque a -> STM (Maybe a)
popBackTMDeque deq = do
  outcome <- tryPopBackTMDeque deq
  case outcome of
    -- Closed and drained: report end-of-stream instead of blocking forever.
    Nothing -> pure Nothing
    -- Open but empty: wait until another transaction changes the deque.
    Just Nothing -> retry
    Just (Just e) -> pure (Just e)

{- | Non-blocking pop from the front.

  * @Nothing@         — closed and empty (end-of-stream)
  * @Just Nothing@    — open and empty (would block)
  * @Just (Just a)@   — got an element

The element returned is the one most recently pushed to the front.
-}
tryPopFrontTMDeque :: TMDeque a -> STM (Maybe (Maybe a))
tryPopFrontTMDeque deq = do
  size <- sizeTMDeque deq
  if size == 0
    then do
      isClosed <- readTVar deq.closed
      pure (if isClosed then Nothing else Just Nothing)
    else do
      capa <- readTVar deq.capacity
      -- Step the front counter back by one; the slot it now designates holds
      -- the element being removed.
      UniqIx src <- stateTVar deq.front \i ->
        let !j = i - 1
         in (UniqIx (j `rem` capa), j)
      buf <- readTVar deq.ringBuffer
      e <- readArray buf src
      pure (Just (Just e))

{- | Non-blocking pop from the back.

  * @Nothing@         — closed and empty (end-of-stream)
  * @Just Nothing@    — open and empty (would block)
  * @Just (Just a)@   — got an element

The element returned is the oldest one still stored in the deque.
-}
tryPopBackTMDeque :: TMDeque a -> STM (Maybe (Maybe a))
tryPopBackTMDeque deq = do
  backCount <- readTVar deq.back
  frontCount <- readTVar deq.front
  if backCount == frontCount
    then do
      isClosed <- readTVar deq.closed
      pure (if isClosed then Nothing else Just Nothing)
    else do
      capa <- readTVar deq.capacity
      -- The slot of the current back counter holds the element being
      -- removed; the counter then moves one step towards the front.
      UniqIx src <- stateTVar deq.back \i ->
        let !j = i + 1
         in (UniqIx (i `rem` capa), j)
      buf <- readTVar deq.ringBuffer
      e <- readArray buf src
      pure (Just (Just e))

{- | Close the deque.  After this, all push operations will be ignored, and
all pop operations will return @Nothing@ once the deque is empty.
-}
closeTMDeque :: TMDeque a -> STM ()
closeTMDeque deq = writeTVar deq.closed True

-- | Check if the deque is closed.
isClosedTMDeque :: TMDeque a -> STM Bool
isClosedTMDeque deq = readTVar deq.closed

{- | 'IO' variant of 'isClosedTMDeque'.  Reads the closed flag directly,
without running an STM transaction.
-}
isClosedTMDequeIO :: TMDeque a -> IO Bool
isClosedTMDequeIO deq = readTVarIO deq.closed

{- | Check if the deque is empty, i.e. whether the front and back counters
coincide.  Note that an open deque may become non-empty after this returns.
-}
isEmptyTMDeque :: TMDeque a -> STM Bool
isEmptyTMDeque deq = (==) <$> readTVar deq.front <*> readTVar deq.back

{- | 'IO' variant of 'sizeTMDeque'.

The front and back counters are read one after the other outside of any
transaction, so under concurrent modification the result is only an estimate.
-}
estimateSizeTMDequeIO :: TMDeque a -> IO Int
estimateSizeTMDequeIO deq =
  (-) <$> readTVarIO deq.front <*> readTVarIO deq.back