{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE NamedFieldPuns #-}

{-|
Module      : Keter.RateLimiter.TokenBucketWorker
Description : Worker thread implementation for token bucket rate limiting
Copyright   : (c) 2025 Oleksandr Zhabenko
License     : MIT
Maintainer  : oleksandr.zhabenko@yahoo.com
Stability   : experimental
Portability : POSIX

This module provides a concurrent worker thread implementation for the token
bucket rate limiting algorithm. The worker processes incoming requests from a
queue and atomically updates the bucket state using Software Transactional
Memory (STM).

== Algorithm Overview

The token bucket algorithm works as follows:

1. __Initialization__: A bucket starts with a certain number of tokens (up to capacity)
2. __Token Refill__: Tokens are added to the bucket at a constant rate over time
3. __Request Processing__: Each request attempts to consume one token
4. __Rate Limiting__: If no tokens are available, the request is denied

== Concurrency Model

The worker uses STM for atomic state updates and communicates via:

* 'TQueue' for receiving incoming requests
* 'MVar' for sending responses back to clients
* 'TVar' for maintaining bucket state
* 'TMVar' for signaling worker readiness

== Example Usage

@
import Control.Concurrent.STM
import Control.Concurrent.MVar
import Data.Time.Clock.POSIX (getPOSIXTime)

-- Create initial bucket state (100 tokens, last updated now)
now <- floor \<$\> getPOSIXTime
initialState <- newTVarIO $ TokenBucketState 100 now

-- Create communication channels
requestQueue <- newTQueueIO
readySignal <- newEmptyTMVarIO

-- Start worker: 100 token capacity, 10 tokens\/second refill rate
startTokenBucketWorker initialState requestQueue 100 10.0 readySignal

-- Wait for worker to be ready
atomically $ takeTMVar readySignal

-- Send a request and wait for response
replyVar <- newEmptyMVar
atomically $ writeTQueue requestQueue replyVar
allowed <- takeMVar replyVar  -- True if request allowed, False if denied
@

== Performance Characteristics

* __Time Complexity__: O(1) per request (constant time token calculation)
* __Space Complexity__: O(1) (fixed bucket state size)
* __Concurrency__: A single worker thread serialises all requests taken from the queue
* __Precision__: Timestamps are POSIX time truncated to whole seconds; token
  counts are whole numbers. Time that has not yet paid for a whole token stays
  on the clock and is credited by a later request.

== Thread Safety

Bucket state is only modified inside STM transactions. Multiple clients can
safely enqueue requests for the same worker concurrently.
-}
module Keter.RateLimiter.TokenBucketWorker
  ( -- * Worker Thread Management
    startTokenBucketWorker
  ) where

import Control.Concurrent.STM
import Control.Monad (void, forever)
import Control.Concurrent (forkIO)
import Control.Monad.IO.Class (liftIO)
import Data.Time.Clock.POSIX (getPOSIXTime)
import Keter.RateLimiter.Types (TokenBucketState(..))
import Control.Concurrent.MVar (MVar, putMVar)

-- | Start a dedicated worker thread for processing token bucket requests.
--
-- The worker runs in an infinite loop, processing requests from the provided queue.
-- Each request is handled atomically: the bucket state is read, tokens are refilled
-- based on elapsed time, a token is consumed if available, and the new state is written back.
--
-- === Worker Lifecycle
--
-- 1. __Startup__: Worker thread is forked and signals readiness via 'TMVar'
-- 2. __Processing Loop__: Worker waits for requests, processes them atomically
-- 3. __Response__: Results are sent back to clients via 'MVar'
--
-- === Token Refill Algorithm
--
-- Tokens are refilled using the formula:
--
-- @
-- gained    = floor (refillRate * elapsedSeconds)
-- newTokens = min capacity (currentTokens + gained)
-- @
--
-- This ensures:
--
-- * Tokens are added proportionally to elapsed time
-- * Bucket capacity is never exceeded
-- * Fractional rates work under steady traffic: 'lastUpdate' only moves
--   forward by the time that paid for the whole tokens credited, so a request
--   that arrives before a full token has accrued does not reset the clock
--
-- === Atomic Request Processing
--
-- Each request is processed in a single STM transaction that:
--
-- 1. Reads current bucket state ('tokens', 'lastUpdate')
-- 2. Calculates elapsed time since last update
-- 3. Computes available tokens after refill
-- 4. Attempts to consume one token if available
-- 5. Updates bucket state with new token count and timestamp
-- 6. Returns allow\/deny decision
--
-- === Edge Cases
--
-- * The current time is truncated to whole seconds with 'floor'
-- * Negative elapsed time (clock adjustments) results in no refill; the
--   stored timestamp is re-anchored to the current time
-- * A non-positive or NaN refill rate results in no refill
-- * A capacity below 1 results in every request being denied
--
-- ==== __Examples__
--
-- @
-- -- Create a bucket for API rate limiting: 1000 requests\/hour = ~0.278 req\/sec
-- let capacity = 100                 -- Allow bursts up to 100 requests
--     refillRate = 1000.0 \/ 3600.0   -- 1000 requests per hour
--
-- initialState <- newTVarIO $ TokenBucketState capacity now
-- requestQueue <- newTQueueIO
-- readySignal <- newEmptyTMVarIO
--
-- startTokenBucketWorker initialState requestQueue capacity refillRate readySignal
-- @
--
-- /Thread Safety:/ All state updates are atomic via STM transactions.
--
-- /Resource Usage:/ Creates one background thread that runs indefinitely.
startTokenBucketWorker
  :: TVar TokenBucketState
  -- ^ Shared bucket state (tokens + last update time).
  -- This 'TVar' is read and updated atomically by the worker.
  -> TQueue (MVar Bool)
  -- ^ Request queue containing 'MVar's for client responses.
  -- Clients place their response 'MVar' in this queue and wait
  -- for the worker to write the allow\/deny decision.
  -> Int
  -- ^ Maximum bucket capacity (maximum tokens that can be stored).
  -- This sets the upper limit for burst traffic handling.
  -- Must be positive.
  -> Double
  -- ^ Token refill rate in tokens per second.
  -- Determines the long-term sustainable request rate.
  -- Must be positive. Can be fractional (e.g., 0.5 = 1 token per 2 seconds).
  -> TMVar ()
  -- ^ Synchronization variable to signal when worker is ready.
  -- The worker writes to this 'TMVar' once it has been forked.
  -- Clients can wait on this to ensure the worker is operational.
  -> IO ()
  -- ^ Returns immediately after forking the worker thread.
  -- The actual worker runs in the background indefinitely.
startTokenBucketWorker stateVar queue capacity refillRate readyVar = void . forkIO $ do
  -- Signal that the worker is ready
  atomically $ putTMVar readyVar ()

  forever $ do
    -- Wait for a request to arrive in the queue
    replyVar <- atomically $ readTQueue queue
    -- Whole POSIX seconds; read outside the transaction so a retry of the
    -- STM block never re-reads the clock.
    now <- liftIO $ floor <$> getPOSIXTime

    -- Atomically process the request: read state, apply the pure bucket
    -- transition, and write the new state back.
    allowed <- atomically $ do
      TokenBucketState { tokens, lastUpdate } <- readTVar stateVar
      let (ok, tokens', lastUpdate') =
            refillAndConsume capacity refillRate now tokens lastUpdate
      -- Force the new fields before storing them so the TVar never holds a
      -- chain of unevaluated arithmetic from earlier requests.
      writeTVar stateVar $!
        (tokens' `seq` lastUpdate' `seq` TokenBucketState tokens' lastUpdate')
      return $! ok

    -- Send the response back to the waiting client.
    liftIO $ putMVar replyVar allowed

-- | Pure token bucket transition for one request.
--
-- Refills the bucket for the time elapsed since the stored timestamp, then
-- tries to take one token. Returns @(allowed, newTokens, newLastUpdate)@.
--
-- The returned timestamp is the point up to which elapsed time has been
-- converted into tokens, which is not necessarily the current time:
--
-- * If the refill fills the bucket, surplus time is discarded and the
--   timestamp becomes @now@.
-- * If no whole token accrued, the timestamp is left where it was so the
--   partial accrual keeps growing. Overwriting it with @now@ here would make
--   a client polling faster than @1 \/ refillRate@ seconds starve forever.
-- * Otherwise the timestamp advances by the whole seconds needed to pay for
--   the credited tokens (rounded up, so the bucket never over-credits).
-- * If the clock moved backwards, nothing is refilled and the timestamp is
--   re-anchored to @now@.
refillAndConsume
  :: Int              -- ^ Bucket capacity
  -> Double           -- ^ Refill rate in tokens per second
  -> Int              -- ^ Current time in whole POSIX seconds
  -> Int              -- ^ Stored token count
  -> Int              -- ^ Stored last-update timestamp
  -> (Bool, Int, Int) -- ^ (allowed, new token count, new timestamp)
refillAndConsume capacity refillRate now tokens lastUpdate =
  (allowed, remaining, lastUpdate')
  where
    -- A timestamp in the future (clock stepped back) is treated as "now".
    base :: Int
    base = min now lastUpdate

    elapsed :: Double
    elapsed = fromIntegral (now - base)

    -- 'max 0' also neutralises a negative or NaN product.
    accrued :: Double
    accrued = max 0 (elapsed * refillRate)

    -- Only evaluated once the bucket is known not to fill up, so the value
    -- being floored is bounded by the capacity.
    gained :: Int
    gained = floor accrued

    (available, lastUpdate')
      -- Compare in Double before flooring so a huge accrual cannot overflow.
      | fromIntegral tokens + accrued >= fromIntegral capacity = (capacity, now)
      | gained == 0 = (tokens, base)
      | otherwise =
          ( tokens + gained
          , min now (base + ceiling (fromIntegral gained / refillRate))
          )

    allowed :: Bool
    allowed = available >= 1

    remaining :: Int
    remaining = if allowed then available - 1 else available