{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE NamedFieldPuns #-}

{-|
Module      : Keter.RateLimiter.TokenBucketWorker
Description : Worker thread implementation for token bucket rate limiting
Copyright   : (c) 2025 Oleksandr Zhabenko
License     : MIT
Maintainer  : oleksandr.zhabenko@yahoo.com
Stability   : experimental
Portability : POSIX

This module provides a concurrent worker thread implementation for the token bucket
rate limiting algorithm. The worker processes incoming requests from a queue and
atomically updates the bucket state using Software Transactional Memory (STM).

== Algorithm Overview

The token bucket algorithm works as follows:

1. __Initialization__: A bucket starts with a certain number of tokens (up to capacity)
2. __Token Refill__: Tokens are added to the bucket at a constant rate over time
3. __Request Processing__: Each request attempts to consume one token
4. __Rate Limiting__: If no tokens are available, the request is denied

== Concurrency Model

The worker uses STM for atomic state updates and communicates via:

* 'TQueue' for receiving incoming requests  
* 'MVar' for sending responses back to clients
* 'TVar' for maintaining bucket state
* 'TMVar' for signaling worker readiness

== Example Usage

@
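import Control.Concurrent.STM
import Control.Concurrent.MVar
import Data.Time.Clock.POSIX (getPOSIXTime)
import Keter.RateLimiter.Types (TokenBucketState(..))
import Keter.RateLimiter.TokenBucketWorker (startTokenBucketWorker)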

-- Create initial bucket state (100 tokens, last updated now)
now <- floor \<$\> getPOSIXTime  
initialState <- newTVarIO $ TokenBucketState 100 now

-- Create communication channels
requestQueue <- newTQueueIO
readySignal <- newEmptyTMVarIO

-- Start worker: 100 token capacity, 10 tokens\/second refill rate
startTokenBucketWorker initialState requestQueue 100 10.0 readySignal

-- Wait for worker to be ready
atomically $ takeTMVar readySignal

-- Send a request and wait for response
replyVar <- newEmptyMVar
atomically $ writeTQueue requestQueue replyVar  
allowed <- takeMVar replyVar  -- True if request allowed, False if denied
@

== Performance Characteristics

* __Time Complexity__: O(1) per request (constant time token calculation)
* __Space Complexity__: O(1) (fixed bucket state size)
* __Concurrency__: Requests are queued through STM; a single worker thread per bucket decides them one at a time, in arrival order
* __Precision__: Time is POSIX time truncated to whole seconds, so refills are computed in one-second steps

== Thread Safety

All operations are thread-safe through STM. Multiple clients can safely
send requests to the same worker concurrently.
-}
module Keter.RateLimiter.TokenBucketWorker
  ( -- * Worker Thread Management
    startTokenBucketWorker
  ) where

import Control.Concurrent.STM
import Control.Monad (void, forever)
import Control.Concurrent (forkIO)
import Control.Monad.IO.Class (liftIO)
import Data.Time.Clock.POSIX (getPOSIXTime)
import Keter.RateLimiter.Types (TokenBucketState(..))
import Control.Concurrent.MVar (MVar, putMVar)

-- | Start a dedicated worker thread for processing token bucket requests.
--
-- The worker is forked with 'forkIO', and this function returns immediately.
-- The worker first fills the readiness 'TMVar'. It then loops forever:
--
-- 1. Take a reply 'MVar' from the queue.
-- 2. Refill the bucket for the time elapsed since 'lastUpdate'.
-- 3. Consume one token if at least one is available.
-- 4. Store the new state.
-- 5. Answer 'True' (allowed) or 'False' (denied).
--
-- === Worker Lifecycle
--
-- 1. __Startup__: The worker thread is forked and signals readiness via the 'TMVar'.
-- 2. __Processing Loop__: The worker blocks on the queue and handles one request at a time.
-- 3. __Response__: Each decision is written to the client's 'MVar' with 'putMVar'.
--
-- === Token Refill Algorithm
--
-- Time is measured in whole POSIX seconds and token counts are whole numbers,
-- so only whole tokens are ever credited:
--
-- @
-- earned    = floor (refillRate * (now - lastUpdate))
-- newTokens = min capacity (tokens + earned)
-- @
--
-- When less than one token has been earned, 'lastUpdate' is left unchanged.
-- This lets partial progress carry over to the next request.
--
-- When tokens are credited, 'lastUpdate' advances only by the seconds those
-- tokens cost, so the fractional remainder is not discarded. Slow rates
-- (e.g. @0.5@ tokens\/second) therefore keep refilling even under a steady
-- stream of denied requests.
--
-- Once the bucket is full, 'lastUpdate' is set to the current time. Idle time
-- beyond a full bucket is not banked.
--
-- === Atomic Request Processing
--
-- For each request, the read, refill, consume and write steps run in a single
-- STM transaction, so the stored state is never seen half-updated. The
-- current time is read just before that transaction starts.
--
-- === Edge Cases
--
-- * If the clock moves backwards, no tokens are credited and 'lastUpdate' is
--   reset to the current time.
-- * A stored token count above @capacity@ is clamped down to @capacity@.
-- * The loop installs no exception handler. If it throws, the worker thread
--   ends and later requests receive no reply.
-- * 'putMVar' blocks while a reply 'MVar' is full. Each request should
--   therefore enqueue a fresh, empty 'MVar'.
--
-- ==== __Examples__
--
-- @
-- -- Create a bucket for API rate limiting: 1000 requests\/hour = ~0.278 req\/sec
-- let capacity   = 100              -- Allow bursts up to 100 requests
--     refillRate = 1000.0 \/ 3600.0 -- 1000 requests per hour
--
-- now <- floor \<$\> getPOSIXTime
-- initialState <- newTVarIO $ TokenBucketState capacity now
-- requestQueue <- newTQueueIO
-- readySignal  <- newEmptyTMVarIO
--
-- startTokenBucketWorker initialState requestQueue capacity refillRate readySignal
-- atomically $ takeTMVar readySignal
-- @
--
-- /Thread Safety:/ All state updates are atomic via STM transactions.
--
-- /Resource Usage:/ Creates one background thread that runs indefinitely.
startTokenBucketWorker
  :: TVar TokenBucketState  -- ^ Shared bucket state (tokens + last update time, in whole POSIX seconds).
                            --   This 'TVar' is read and updated atomically by the worker.
  -> TQueue (MVar Bool)     -- ^ Request queue containing 'MVar's for client responses.
                            --   Clients place an empty response 'MVar' in this queue and wait
                            --   for the worker to write the allow\/deny decision.
  -> Int                    -- ^ Maximum bucket capacity (maximum tokens that can be stored).
                            --   This sets the upper limit for burst traffic handling.
                            --   Must be positive.
  -> Double                 -- ^ Token refill rate in tokens per second.
                            --   Determines the long-term sustainable request rate.
                            --   Must be positive. Can be fractional (e.g., 0.5 = 1 token per 2 seconds).
  -> TMVar ()               -- ^ Synchronization variable to signal when worker is ready.
                            --   The worker puts @()@ into this 'TMVar' before serving requests.
                            --   Clients can wait on this to ensure the worker is operational.
  -> IO ()                  -- ^ Returns immediately after forking the worker thread.
                            --   The actual worker runs in the background indefinitely.
startTokenBucketWorker stateVar queue capacity refillRate readyVar =
  void . forkIO $ do
    -- Signal that the worker is ready before entering the request loop.
    atomically $ putTMVar readyVar ()
    forever $ do
      -- Block until a client enqueues its reply slot.
      replyVar <- atomically $ readTQueue queue
      -- Whole-second POSIX time; the bucket state stores Int timestamps.
      now <- floor <$> getPOSIXTime
      allowed <- atomically $ do
        bucket <- readTVar stateVar
        let (decision, bucket') = consumeToken capacity refillRate now bucket
        -- Store the new state in WHNF so unevaluated refills cannot pile up
        -- inside the TVar across requests.
        writeTVar stateVar $! bucket'
        pure decision
      -- Send the response back to the waiting client.
      putMVar replyVar allowed

-- | Refill the bucket up to @now@, then try to take one token.
--
-- Returns the allow\/deny decision together with the state to store.
consumeToken
  :: Int               -- ^ Bucket capacity.
  -> Double            -- ^ Refill rate in tokens per second.
  -> Int               -- ^ Current time in whole POSIX seconds.
  -> TokenBucketState  -- ^ Stored state.
  -> (Bool, TokenBucketState)
consumeToken capacity refillRate now bucket
  | tokens refilled >= 1 = (True, refilled { tokens = tokens refilled - 1 })
  | otherwise            = (False, refilled)
  where
    refilled = refillBucket capacity refillRate now bucket

-- | Credit the whole tokens earned since 'lastUpdate', never exceeding
-- @capacity@.
--
-- Timestamps are whole seconds and token counts are 'Int'. A refill that has
-- not yet produced a whole token must therefore not reset the clock.
-- 'lastUpdate' is either kept, or advanced only by the seconds actually
-- "spent" on the credited tokens, so fractional progress keeps accumulating.
refillBucket
  :: Int               -- ^ Bucket capacity.
  -> Double            -- ^ Refill rate in tokens per second.
  -> Int               -- ^ Current time in whole POSIX seconds.
  -> TokenBucketState  -- ^ Stored state.
  -> TokenBucketState
refillBucket capacity refillRate now TokenBucketState{tokens, lastUpdate}
  -- Clock moved backwards: resynchronise the timestamp and credit nothing.
  | elapsed < 0 = TokenBucketState current now
  -- Enough time has passed to fill the bucket; time spent full is not banked.
  | earned >= fromIntegral (capacity - current) = TokenBucketState capacity now
  -- Not yet a whole token: keep the old timestamp so progress accumulates.
  | whole <= 0 = TokenBucketState current lastUpdate
  | otherwise = TokenBucketState (current + whole) (lastUpdate + paidSeconds)
  where
    current = min capacity tokens
    elapsed = now - lastUpdate
    earned = fromIntegral elapsed * refillRate :: Double
    whole = floor earned :: Int
    -- Seconds needed to earn 'whole' tokens; clamped to 'elapsed' to guard
    -- against floating-point rounding pushing the timestamp past @now@.
    paidSeconds = min elapsed (ceiling (fromIntegral whole / refillRate))