{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}

{-|
Module      : Keter.RateLimiter.TokenBucketWorker
Description : Worker thread implementation for token bucket rate limiting
Copyright   : (c) 2025 Oleksandr Zhabenko
License     : MIT
Maintainer  : oleksandr.zhabenko@yahoo.com
Stability   : experimental
Portability : POSIX

This module provides a concurrent worker thread implementation for the token
bucket rate limiting algorithm. The worker processes incoming requests from a
queue and atomically updates the bucket state using Software Transactional
Memory (STM).

== Algorithm Overview

The token bucket algorithm works as follows:

1. __Initialization__: A bucket starts with a certain number of tokens (up to capacity)
2. __Token Refill__: Tokens are added to the bucket at a constant rate over time
3. __Request Processing__: Each request attempts to consume one token
4. __Rate Limiting__: If no tokens are available, the request is denied

== Concurrency Model

The worker uses STM for atomic state updates and communicates via:

* 'TQueue' for receiving incoming requests
* 'MVar' for sending responses back to clients
* 'TVar' for maintaining bucket state
* 'TMVar' for signaling worker readiness

== Example Usage

@
import Control.Concurrent.MVar
import Control.Concurrent.STM
import Data.Time.Clock.POSIX (getPOSIXTime)

-- Create initial bucket state (100 tokens, last updated now)
now <- floor \<$\> getPOSIXTime
initialState <- newTVarIO $ TokenBucketState 100 now

-- Create communication channels
requestQueue <- newTQueueIO
readySignal <- newEmptyTMVarIO

-- Start worker: 100 token capacity, 10 tokens/second refill rate
startTokenBucketWorker initialState requestQueue 100 10.0 readySignal

-- Wait for worker to be ready
atomically $ takeTMVar readySignal

-- Send a request and wait for response
replyVar <- newEmptyMVar
atomically $ writeTQueue requestQueue replyVar
allowed <- takeMVar replyVar  -- True if request allowed, False if denied
@

== Performance Characteristics

* __Time Complexity__: O(1) per request (constant time token calculation)
* __Space Complexity__: O(1) (fixed bucket state size)
* __Concurrency__: A single worker thread serialises all bucket updates; each
  update runs inside one STM transaction
* __Precision__: Timestamps are POSIX times truncated to whole seconds, and the
  bucket stores whole tokens. Time that has not yet earned a whole token is
  carried over to later requests instead of being discarded, so fractional
  refill rates still converge on the configured long-term rate.

== Thread Safety

Bucket state is only modified inside STM transactions. Multiple clients can
safely enqueue requests for the same worker concurrently.
-}
module Keter.RateLimiter.TokenBucketWorker
  ( -- * Worker Thread Management
    startTokenBucketWorker
  ) where

import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (MVar, putMVar)
import Control.Concurrent.STM
import Control.Monad (void, forever)
import Control.Monad.IO.Class (liftIO)
import Data.Time.Clock.POSIX (getPOSIXTime)
import Keter.RateLimiter.Types (TokenBucketState(..))

-- | Start a dedicated worker thread for processing token bucket requests.
--
-- The worker runs in an infinite loop, processing requests from the provided queue.
-- Each request is handled atomically: the bucket state is read, tokens are refilled
-- based on elapsed time, a token is consumed if available, and the new state is
-- written back.
--
-- ==== Worker Lifecycle
--
-- 1. __Startup__: Worker thread is forked and signals readiness via 'TMVar'
-- 2. __Processing Loop__: Worker waits for requests, processes them atomically
-- 3. __Response__: Results are sent back to clients via 'MVar'
--
-- ==== Token Refill Algorithm
--
-- Tokens are refilled in whole units using the formula:
--
-- @
-- earned    = floor (refillRate * elapsedSeconds)
-- newTokens = min capacity (currentTokens + earned)
-- @
--
-- This ensures:
--
-- * Tokens are added proportionally to elapsed time
-- * Bucket capacity is never exceeded
-- * Seconds that have not yet produced a whole token stay \"on the clock\":
--   the stored timestamp only advances by the time the credited tokens
--   account for, so a rate such as @0.5@ really yields one token every two
--   seconds even when requests arrive every second
--
-- ==== Atomic Request Processing
--
-- Each request is processed in a single STM transaction that:
--
-- 1. Reads current bucket state ('tokens', 'lastUpdate')
-- 2. Calculates elapsed time since last update
-- 3. Computes available tokens after refill
-- 4. Attempts to consume one token if available
-- 5. Updates bucket state with new token count and timestamp
-- 6. Returns allow/deny decision
--
-- The current time is sampled once per request, just before the transaction.
--
-- ==== Error Handling
--
-- * Negative elapsed time (clock adjustments) results in no refill; the
--   stored timestamp is re-based to the current time so refilling resumes
--   immediately afterwards
-- * Non-positive, infinite or NaN refill amounts never reach 'floor'; they
--   are treated as \"no refill\" or \"bucket full\" respectively
-- * The worker installs no exception handler. The reply is delivered with
--   'putMVar', which blocks while the 'MVar' is full, so clients must enqueue
--   an empty 'MVar'.
--
-- ==== Example
--
-- @
-- -- Create a bucket for API rate limiting: 1000 requests/hour = ~0.278 req/sec
-- let capacity = 100                -- Allow bursts up to 100 requests
--     refillRate = 1000.0 / 3600.0  -- 1000 requests per hour
--
-- now <- floor \<$\> getPOSIXTime
-- initialState <- newTVarIO $ TokenBucketState capacity now
-- requestQueue <- newTQueueIO
-- readySignal <- newEmptyTMVarIO
--
-- startTokenBucketWorker initialState requestQueue capacity refillRate readySignal
-- @
--
-- __Thread Safety:__ All state updates are atomic via STM transactions.
--
-- __Resource Usage:__ Creates one background thread that runs indefinitely.
startTokenBucketWorker
  :: TVar TokenBucketState
  -- ^ Shared bucket state (tokens + last update time).
  --   This 'TVar' is read and updated atomically by the worker.
  -> TQueue (MVar Bool)
  -- ^ Request queue containing 'MVar's for client responses.
  --   Clients place their (empty) response 'MVar' in this queue and wait
  --   for the worker to write the allow/deny decision.
  -> Int
  -- ^ Maximum bucket capacity (maximum tokens that can be stored).
  --   This sets the upper limit for burst traffic handling.
  --   Must be positive.
  -> Double
  -- ^ Token refill rate in tokens per second.
  --   Determines the long-term sustainable request rate.
  --   Must be positive. Can be fractional (e.g., 0.5 = 1 token per 2 seconds).
  -> TMVar ()
  -- ^ Synchronization variable to signal when worker is ready.
  --   The worker writes to this 'TMVar' once it has started; it must be
  --   empty, otherwise the worker waits until it is emptied.
  -> IO ()
  -- ^ Returns immediately after forking the worker thread.
  --   The actual worker runs in the background indefinitely.
startTokenBucketWorker stateVar queue capacity refillRate readyVar =
  void . forkIO $ do
    -- Signal that the worker is ready.
    atomically $ putTMVar readyVar ()
    forever $ do
      -- Wait for a request to arrive in the queue.
      replyVar <- atomically $ readTQueue queue
      -- Whole-second POSIX timestamp; sampled outside STM because it is IO.
      now <- liftIO $ floor <$> getPOSIXTime
      -- Atomically process the request: read state, refill, try to consume
      -- one token, and write the new state back.
      allowed <- atomically $ do
        current <- readTVar stateVar
        let (decision, next) = refillAndConsume capacity refillRate now current
        -- Force the new state so no thunk chain builds up inside the TVar.
        writeTVar stateVar $! next
        return decision
      -- Send the response back to the waiting client.
      liftIO $ putMVar replyVar allowed

-- | Pure core of the worker: refill the bucket for the time elapsed up to
-- @now@, then try to consume one token.
--
-- Returns the allow/deny decision together with the bucket state to store.
-- Only whole tokens are credited. The returned 'lastUpdate' advances just far
-- enough to pay for the credited tokens (rounded up to whole seconds, so the
-- bucket is never over-credited); it jumps straight to @now@ when the bucket
-- is full or when the clock moved backwards.
refillAndConsume
  :: Int               -- ^ Bucket capacity.
  -> Double            -- ^ Refill rate in tokens per second.
  -> Int               -- ^ Current POSIX time in whole seconds.
  -> TokenBucketState  -- ^ State read from the shared 'TVar'.
  -> (Bool, TokenBucketState)
refillAndConsume capacity refillRate now TokenBucketState { tokens, lastUpdate } =
  (allowed, nextState)
  where
    -- A clock that stepped backwards must not drain the bucket.
    elapsed :: Int
    elapsed = max 0 (now - lastUpdate)

    rawRefill :: Double
    rawRefill = fromIntegral elapsed * refillRate

    -- Whole tokens earned since 'lastUpdate'. Comparing against the capacity
    -- first keeps huge or infinite values away from 'floor'; NaN and
    -- non-positive values fail both comparisons and earn nothing.
    earned :: Int
    earned
      | rawRefill >= fromIntegral capacity = max 0 capacity
      | rawRefill >= 1                     = floor rawRefill
      | otherwise                          = 0

    available :: Int
    available = min capacity (tokens + earned)

    allowed :: Bool
    allowed = available >= 1

    remaining :: Int
    remaining = if allowed then available - 1 else available

    nextUpdate :: Int
    nextUpdate
      -- Clock went backwards: re-base so refilling resumes from now.
      | now < lastUpdate      = now
      -- Bucket (was) full: any surplus time is discarded by design.
      | available >= capacity = now
      -- Nothing credited yet: keep accumulating from the old timestamp.
      | earned == 0           = lastUpdate
      -- Advance only by the seconds the credited tokens account for. The
      -- 'min' guards against floating-point rounding pushing past @now@.
      | otherwise             =
          lastUpdate + min elapsed (ceiling (fromIntegral earned / refillRate))

    nextState :: TokenBucketState
    nextState =
      remaining `seq` nextUpdate `seq` TokenBucketState remaining nextUpdate