Copyright | (c) 2025 Oleksandr Zhabenko |
---|---|
License | MIT |
Maintainer | oleksandr.zhabenko@yahoo.com |
Stability | experimental |
Portability | POSIX |
Safe Haskell | None |
Language | Haskell2010 |
Keter.RateLimiter.TokenBucketWorker
Description
This module provides a concurrent worker thread implementation for the token bucket rate limiting algorithm. The worker processes incoming requests from a queue and atomically updates the bucket state using Software Transactional Memory (STM).
Algorithm Overview
The token bucket algorithm works as follows:
- Initialization: A bucket starts with a certain number of tokens (up to capacity)
- Token Refill: Tokens are added to the bucket at a constant rate over time
- Request Processing: Each request attempts to consume one token
- Rate Limiting: If no tokens are available, the request is denied
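The four steps above can be modelled as a pure function. The following is a minimal sketch, not the module's implementation: Bucket, step, and the field names are hypothetical, time is assumed to be in whole seconds, and the token count is assumed to be an Int.

```haskell
-- Pure model of one token bucket step. All names here are hypothetical.
data Bucket = Bucket
  { bucketTokens :: Int  -- tokens currently in the bucket
  , bucketStamp  :: Int  -- time of the last update, in whole seconds
  } deriving (Eq, Show)

-- | Refill for the elapsed time, then try to consume one token.
-- Returns the allow/deny decision together with the new bucket.
step :: Int -> Double -> Int -> Bucket -> (Bool, Bucket)
step capacity rate now (Bucket toks stamp) =
  let elapsed  = max 0 (now - stamp)  -- a clock that moved backwards adds nothing
      added    = floor (rate * fromIntegral elapsed) :: Int
      refilled = min capacity (toks + added)
  in if refilled > 0
       then (True,  Bucket (refilled - 1) now)  -- consume one token
       else (False, Bucket 0 now)               -- bucket is empty: deny
```

For example, step 2 1.0 11 (Bucket 0 10) evaluates to (True, Bucket 0 11): one second has passed, one token is added, and the request consumes it. This simplified model discards the fractional part left over by floor, so with rates below one token per second and frequent requests it would refill more slowly than the nominal rate; a fuller implementation would carry the remainder.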
Concurrency Model
The worker uses STM for atomic state updates and communicates via:
- TQueue for receiving incoming requests
- MVar for sending responses back to clients
- TVar for maintaining bucket state
- TMVar for signaling worker readiness
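From the client side, this request/reply pattern might look like the sketch below. The helper requestToken and the stand-in constantWorker are hypothetical names introduced for illustration; constantWorker keeps no bucket state and only shows which side reads the queue and which side fills the MVar.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (MVar, newEmptyMVar, putMVar, takeMVar)
import Control.Concurrent.STM
  (TQueue, atomically, newTQueueIO, readTQueue, writeTQueue)
import Control.Monad (forever, void)

-- | Hypothetical client helper: enqueue a fresh reply slot, then block
-- until the worker fills it with the decision.
requestToken :: TQueue (MVar Bool) -> IO Bool
requestToken queue = do
  reply <- newEmptyMVar
  atomically (writeTQueue queue reply)
  takeMVar reply

-- | Stand-in worker for this sketch only: answers every request with a
-- fixed decision. It is not the module's worker.
constantWorker :: Bool -> TQueue (MVar Bool) -> IO ()
constantWorker decision queue = void . forkIO . forever $ do
  reply <- atomically (readTQueue queue)
  putMVar reply decision
```

Each request carries its own MVar, so replies cannot be mixed up between clients even when many of them write to the same queue.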
Example Usage
    import Control.Concurrent.MVar
    import Control.Concurrent.STM
    import Data.Time.Clock.POSIX (getPOSIXTime)

    -- Create initial bucket state (100 tokens, last updated now)
    now <- floor <$> getPOSIXTime
    initialState <- newTVarIO $ TokenBucketState 100 now

    -- Create communication channels
    requestQueue <- newTQueueIO
    readySignal <- newEmptyTMVarIO

    -- Start worker: 100 token capacity, 10 tokens/second refill rate
    startTokenBucketWorker initialState requestQueue 100 10.0 readySignal

    -- Wait for worker to be ready
    atomically $ takeTMVar readySignal

    -- Send a request and wait for response
    replyVar <- newEmptyMVar
    atomically $ writeTQueue requestQueue replyVar
    allowed <- takeMVar replyVar -- True if request allowed, False if denied
Performance Characteristics
- Time Complexity: O(1) per request (constant time token calculation)
- Space Complexity: O(1) (fixed bucket state size)
- Concurrency: No explicit locks; bucket state is updated through STM transactions by a single worker thread
- Precision: Uses POSIX timestamps for accurate time-based calculations
Thread Safety
All operations are thread-safe through STM. Multiple clients can safely send requests to the same worker concurrently.
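The atomicity that this guarantee rests on can be illustrated with a small experiment. The sketch below bypasses the worker and its queue and lets several threads take tokens directly from a TVar; tryConsume and countAllowed are hypothetical names, and there is no refill, so the number of allowed requests should never exceed the initial token count.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Concurrent.STM
  (STM, TVar, atomically, newTVarIO, readTVar, writeTVar)
import Control.Monad (forM, void)

-- | Hypothetical helper: take one token if any is left, in one transaction.
tryConsume :: TVar Int -> STM Bool
tryConsume tokensVar = do
  n <- readTVar tokensVar
  if n > 0
    then writeTVar tokensVar (n - 1) >> pure True
    else pure False

-- | Run the given number of client threads against a bucket that starts
-- with the given number of tokens, and count how many were allowed.
countAllowed :: Int -> Int -> IO Int
countAllowed initial clients = do
  tokensVar <- newTVarIO initial
  dones <- forM [1 .. clients] $ \_ -> do
    done <- newEmptyMVar
    void . forkIO $ atomically (tryConsume tokensVar) >>= putMVar done
    pure done
  results <- mapM takeMVar dones  -- wait for every client to finish
  pure (length (filter id results))
```

With 10 tokens and 50 clients, countAllowed 10 50 yields 10 regardless of how the threads interleave, because the read and the write of the counter happen in the same transaction.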
Synopsis
- startTokenBucketWorker :: TVar TokenBucketState -> TQueue (MVar Bool) -> Int -> Double -> TMVar () -> IO ()
Worker Thread Management
startTokenBucketWorker
Arguments
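:: TVar TokenBucketState | Shared bucket state (tokens + last update time). This TVar is read and updated by the worker in STM transactions. |
-> TQueue (MVar Bool) | Request queue containing one MVar Bool per request, which the worker fills with the allow/deny decision. |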
-> Int | Maximum bucket capacity (maximum tokens that can be stored). This sets the upper limit for burst traffic handling. Must be positive. |
-> Double | Token refill rate in tokens per second. Determines the long-term sustainable request rate. Must be positive. Can be fractional (e.g., 0.5 = 1 token per 2 seconds). |
-> TMVar () | Synchronization variable to signal when worker is ready. The worker writes to this TMVar once it has started. |
-> IO () | Returns immediately after forking the worker thread. The actual worker runs in the background indefinitely. |
Start a dedicated worker thread for processing token bucket requests.
The worker runs in an infinite loop, processing requests from the provided queue. Each request is handled atomically: the bucket state is read, tokens are refilled based on elapsed time, a token is consumed if available, and the new state is written back.
Worker Lifecycle
- Startup: Worker thread is forked and signals readiness via TMVar
- Processing Loop: Worker waits for requests, processes them atomically
- Response: Results are sent back to clients via MVar
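The three lifecycle phases could be laid out roughly as follows. This is a simplified sketch under stated assumptions, not the module's source: startSketchWorker is a hypothetical name, and the bucket is reduced to a plain token counter with no refill so that the control flow stays visible.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (MVar, newEmptyMVar, putMVar, takeMVar)
import Control.Concurrent.STM
  ( TMVar, TQueue, TVar, atomically, newEmptyTMVarIO, newTQueueIO, newTVarIO
  , putTMVar, readTQueue, readTVar, takeTMVar, writeTQueue, writeTVar )
import Control.Monad (forever, void)

-- | Hypothetical, simplified worker: a plain token counter with no refill.
startSketchWorker :: TVar Int -> TQueue (MVar Bool) -> TMVar () -> IO ()
startSketchWorker tokensVar queue ready = void . forkIO $ do
  -- Startup: tell the caller that the worker thread is running.
  atomically (putTMVar ready ())
  -- Processing loop: block until a request arrives, then decide.
  forever $ do
    reply <- atomically (readTQueue queue)
    allowed <- atomically $ do
      n <- readTVar tokensVar
      if n > 0
        then writeTVar tokensVar (n - 1) >> pure True
        else pure False
    -- Response: hand the decision back through the client's MVar.
    putMVar reply allowed
```

The function returns as soon as the thread is forked, which is why a caller would wait on the TMVar before sending the first request.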
Token Refill Algorithm
Tokens are refilled using the formula:
newTokens = min capacity (currentTokens + refillRate * elapsedSeconds)
This ensures:
- Tokens are added proportionally to elapsed time
- Bucket capacity is never exceeded
- Sub-second precision for refill calculations
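As a worked illustration of the formula, the sketch below writes it as a pure function over Double values so that fractional tokens and sub-second intervals are visible. The name refill and the choice of Double for every argument are assumptions made for this example.

```haskell
-- | The refill formula as a pure function (hypothetical helper).
refill
  :: Double  -- ^ capacity
  -> Double  -- ^ refill rate, in tokens per second
  -> Double  -- ^ current tokens
  -> Double  -- ^ elapsed seconds
  -> Double
refill capacity rate current elapsed =
  min capacity (current + rate * elapsed)
```

With a capacity of 100 and a rate of 10 tokens per second, refill 100 10 20 0.25 gives 22.5 (a quarter of a second adds 2.5 tokens), while refill 100 10 95 2 gives 100 because the result is capped at capacity.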
Atomic Request Processing
Each request is processed in a single STM transaction that:
- Reads current bucket state (tokens, lastUpdate)
- Calculates elapsed time since last update
- Computes available tokens after refill
- Attempts to consume one token if available
- Updates bucket state with new token count and timestamp
- Returns allow/deny decision
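The steps above can be sketched as one STM action. BucketState is a hypothetical stand-in for the module's state type that reuses the field names tokens and lastUpdate, and processRequest is a hypothetical name; the current time is passed in as an argument on the assumption that the clock is read in IO before the transaction starts.

```haskell
import Control.Concurrent.STM
  (STM, TVar, atomically, newTVarIO, readTVar, writeTVar)

-- Hypothetical stand-in for the module's bucket state.
data BucketState = BucketState
  { tokens     :: Int  -- tokens available after the last update
  , lastUpdate :: Int  -- time of the last update, in whole seconds
  } deriving (Eq, Show)

-- | One request handled as a single STM transaction.
processRequest :: TVar BucketState -> Int -> Double -> Int -> STM Bool
processRequest stateVar capacity rate now = do
  -- Read the current state.
  BucketState toks stamp <- readTVar stateVar
  let -- Elapsed time; a clock that moved backwards counts as zero.
      elapsed   = max 0 (now - stamp)
      -- Tokens available after the refill, capped at capacity.
      available = min capacity (toks + floor (rate * fromIntegral elapsed))
      -- Consume one token if there is one.
      allowed   = available > 0
      remaining = if allowed then available - 1 else available
  -- Write the new state back and return the decision.
  writeTVar stateVar (BucketState remaining now)
  pure allowed
```

Because the read and the write share one transaction, no other thread can observe or modify the bucket between the refill and the consumption.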
Error Handling
The worker is designed to be resilient:
- Time calculation errors are handled by using floor for integer conversion
- Negative elapsed time (clock adjustments) results in no refill
- Worker continues running even if individual requests fail
Examples
    -- Create a bucket for API rate limiting: 1000 requests/hour = ~0.278 req/sec
    let capacity = 100                -- Allow bursts up to 100 requests
        refillRate = 1000.0 / 3600.0  -- 1000 requests per hour

    -- Current time in whole seconds (getPOSIXTime is from Data.Time.Clock.POSIX)
    now <- floor <$> getPOSIXTime
    initialState <- newTVarIO $ TokenBucketState capacity now
    requestQueue <- newTQueueIO
    readySignal <- newEmptyTMVarIO
    startTokenBucketWorker initialState requestQueue capacity refillRate readySignal
Thread Safety: All state updates are atomic via STM transactions.
Resource Usage: Creates one background thread that runs indefinitely.