{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE NamedFieldPuns #-}

{-|
Module      : Keter.RateLimiter.TokenBucketWorker
Description : Worker thread implementation for token bucket rate limiting
Copyright   : (c) 2025 Oleksandr Zhabenko
License     : MIT
Maintainer  : oleksandr.zhabenko@yahoo.com
Stability   : experimental
Portability : POSIX

This module provides a concurrent worker thread implementation for the token bucket
rate limiting algorithm. The worker processes incoming requests from a queue and
atomically updates the bucket state using Software Transactional Memory (STM).

== Algorithm Overview

The token bucket algorithm works as follows:

1. __Initialization__: A bucket starts with a certain number of tokens (up to capacity)
2. __Token Refill__: Tokens are added to the bucket at a constant rate over time
3. __Request Processing__: Each request attempts to consume one token
4. __Rate Limiting__: If no tokens are available, the request is denied

== Concurrency Model

The worker uses STM for atomic state updates and communicates via:

* 'TQueue' for receiving incoming requests  
* 'MVar' for sending responses back to clients
* 'TVar' for maintaining bucket state
* 'TMVar' for signaling worker readiness

== Example Usage

@
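import Control.Concurrent.STM
import Control.Concurrent.MVar
import Data.Text
import Data.Time.Clock.POSIX (getPOSIXTime)
import Keter.RateLimiter.Types (TokenBucketState(..))
import Keter.RateLimiter.TokenBucketWorker (startTokenBucketWorker)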

-- Create initial bucket state (100 tokens, last updated now)
now <- floor \<$\> getPOSIXTime  
initialState <- newTVarIO $ TokenBucketState 100 now

-- Create communication channels
requestQueue <- newTQueueIO
readySignal <- newEmptyTMVarIO

-- Start worker: 100 token capacity, 10 tokens/second refill rate
startTokenBucketWorker initialState requestQueue 100 10.0 readySignal

-- Wait for worker to be ready
atomically $ takeTMVar readySignal

-- Send a request and wait for response
replyVar <- newEmptyMVar
atomically $ writeTQueue requestQueue replyVar  
allowed <- takeMVar replyVar  -- True if request allowed, False if denied
@

== Performance Characteristics

* __Time Complexity__: O(1) per request (constant time token calculation)
* __Space Complexity__: O(1) (fixed bucket state size)
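* __Concurrency__: Bucket state is updated in STM transactions; requests are processed one at a time by a single worker thread
* __Precision__: Uses POSIX timestamps truncated to whole seconds, so refill is computed at one-second granularity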

== Thread Safety

All operations are thread-safe through STM. Multiple clients can safely
send requests to the same worker concurrently.
-}
module Keter.RateLimiter.TokenBucketWorker
  ( -- * Worker Thread Management
    startTokenBucketWorker
  ) where

import Control.Concurrent.STM
import Control.Monad (void, forever)
import Control.Concurrent (forkIO)
import Control.Monad.IO.Class (liftIO)
import Data.Time.Clock.POSIX (getPOSIXTime)
import Keter.RateLimiter.Types (TokenBucketState(..))
import Control.Concurrent.MVar (MVar, putMVar)

-- | Start a dedicated worker thread for processing token bucket requests.
--
-- The worker runs in an infinite loop, taking reply 'MVar's from the queue.
-- For each request it reads the current POSIX time (truncated to whole
-- seconds). In a single STM transaction it then refills the bucket for the
-- elapsed time, consumes one token if available, and writes the new state
-- back. Finally it puts the decision ('True' = allowed) into the client's
-- reply 'MVar'.
--
-- ==== Worker Lifecycle
--
-- 1. __Startup__: the worker thread is forked and puts @()@ into the ready 'TMVar'.
-- 2. __Processing Loop__: the worker blocks on the queue and handles one request at a time.
-- 3. __Response__: the decision is written to the request's reply 'MVar'.
--
-- ==== Token Refill Algorithm
--
-- 'TokenBucketState' stores whole tokens and whole-second timestamps, so the
-- refill for a request is computed as
--
-- @
-- accrued = refillRate * elapsedSeconds
-- gained  = floor accrued
-- tokens' = min capacity (tokens + gained)
-- @
--
-- Only the seconds actually needed to mint those @gained@ tokens are deducted
-- from the elapsed time, by advancing 'lastUpdate' by that amount. A partial
-- token therefore keeps accruing across requests instead of being discarded.
-- This is what makes fractional rates such as @0.5@ work when requests arrive
-- faster than one token is refilled.
--
-- This ensures:
--
-- * Tokens are added in proportion to elapsed time.
-- * A refill never raises the bucket above @capacity@ tokens.
-- * While the bucket is full, 'lastUpdate' is reset to the current time, so
--   idle time spent full does not bank extra tokens.
-- * If the clock moves backwards, no tokens are added or removed, and
--   'lastUpdate' is re-anchored to the current time.
--
-- Refill granularity is one second. The consumed time is rounded up, so the
-- limiter errs on the strict side: it may under-credit by less than one
-- second's worth of refill per refill event.
--
-- ==== Atomic Request Processing
--
-- Each request is processed in a single STM transaction that:
--
-- 1. Reads the current bucket state ('tokens', 'lastUpdate').
-- 2. Computes the elapsed time since 'lastUpdate'.
-- 3. Applies the refill described above.
-- 4. Consumes one token if at least one is available.
-- 5. Writes the new token count and anchor timestamp.
-- 6. Returns the decision.
--
-- ==== Failure Behaviour
--
-- The worker installs no exception handler. If the worker thread dies,
-- clients waiting on their reply 'MVar' (and any later clients) receive no
-- reply. The ready 'TMVar' should be empty when the worker is started;
-- otherwise the worker blocks at startup until another thread empties it.
--
-- ==== Example
--
-- @
-- -- Create a bucket for API rate limiting: 1000 requests per hour = ~0.278 req per sec
-- let capacity = 100                -- Allow bursts up to 100 requests
--     refillRate = 1000.0 \/ 3600.0 -- 1000 requests per hour
--
-- now <- floor \<$\> getPOSIXTime
-- initialState <- newTVarIO $ TokenBucketState capacity now
-- requestQueue <- newTQueueIO
-- readySignal <- newEmptyTMVarIO
--
-- startTokenBucketWorker initialState requestQueue capacity refillRate readySignal
-- @
--
-- __Thread Safety:__ All state updates are atomic via STM transactions.
--
-- __Resource Usage:__ Creates one background thread that runs indefinitely.
startTokenBucketWorker
  :: TVar TokenBucketState  -- ^ Shared bucket state (tokens + last update time).
                            --   This 'TVar' is read and updated atomically by the worker.
  -> TQueue (MVar Bool)     -- ^ Request queue containing 'MVar's for client responses.
                            --   Clients place their response 'MVar' in this queue and wait
                            --   for the worker to write the decision ('True' = allowed).
  -> Int                    -- ^ Maximum bucket capacity (maximum tokens that can be stored).
                            --   This sets the upper limit for burst traffic handling.
                            --   Must be positive.
  -> Double                 -- ^ Token refill rate in tokens per second.
                            --   Determines the long-term sustainable request rate.
                            --   Must be positive. Can be fractional (e.g., 0.5 = 1 token per 2 seconds).
                            --   A non-positive or NaN rate never adds tokens.
  -> TMVar ()               -- ^ Synchronization variable to signal when worker is ready.
                            --   The worker puts @()@ into this 'TMVar' before processing requests.
                            --   Clients can wait on this to ensure the worker is operational.
  -> IO ()                  -- ^ Returns immediately after forking the worker thread.
                            --   The actual worker runs in the background indefinitely.
startTokenBucketWorker stateVar queue capacity refillRate readyVar =
  void . forkIO $ do
    -- Signal that the worker is ready.
    atomically $ putTMVar readyVar ()
    forever $ do
      -- Wait for a request to arrive in the queue.
      replyVar <- atomically $ readTQueue queue
      -- Whole POSIX seconds, matching the Int timestamp stored in the state.
      now <- floor <$> getPOSIXTime
      -- Read, transition and write the bucket in one transaction.
      allowed <- atomically $ do
        (decision, newState) <-
          stepTokenBucket capacity refillRate now <$> readTVar stateVar
        writeTVar stateVar newState
        pure decision
      -- Send the response back to the waiting client.
      putMVar replyVar allowed

-- | Pure state transition for a single request: refill the bucket for the
-- time elapsed since 'lastUpdate', then try to take one token.
--
-- Returns the decision ('True' = allowed) together with the state to store.
stepTokenBucket
  :: Int               -- ^ Bucket capacity.
  -> Double            -- ^ Refill rate in tokens per second.
  -> Int               -- ^ Current time in whole POSIX seconds.
  -> TokenBucketState  -- ^ State before the request.
  -> (Bool, TokenBucketState)
stepTokenBucket capacity refillRate now TokenBucketState { tokens, lastUpdate }
  | available >= 1 = (True, TokenBucketState (available - 1) anchor)
  | otherwise      = (False, TokenBucketState available anchor)
  where
    elapsed :: Int
    elapsed = now - lastUpdate

    -- Fractional tokens earned since the anchor. Nothing accrues when the
    -- clock has not advanced (or moved backwards). A non-positive or NaN
    -- rate never accrues or drains tokens.
    accrued :: Double
    accrued
      | elapsed > 0, refillRate > 0 = fromIntegral elapsed * refillRate
      | otherwise                   = 0

    -- Token count after refill, and the timestamp from which the next
    -- refill is measured.
    available, anchor :: Int
    (available, anchor)
      | elapsed < 0 =
          -- Clock moved backwards: no refill; re-anchor at the new time.
          (min capacity tokens, now)
      | fromIntegral tokens + accrued >= fromIntegral capacity =
          -- Bucket is full: excess refill is discarded, nothing is carried.
          (capacity, now)
      | otherwise =
          let gained = floor accrued :: Int
              -- Seconds consumed to mint 'gained' whole tokens. This is rounded
              -- up so the carried remainder errs toward fewer tokens, and is
              -- capped at 'elapsed' to guard against floating-point overshoot.
              -- 'gained > 0' implies 'refillRate > 0', so the division is safe.
              used
                | gained > 0 =
                    min elapsed (ceiling (fromIntegral gained / refillRate))
                | otherwise = 0
          in (tokens + gained, lastUpdate + used)