Copyright | (c) 2025 Oleksandr Zhabenko |
---|---|
License | MIT |
Maintainer | oleksandr.zhabenko@yahoo.com |
Stability | stable |
Portability | portable |
Safe Haskell | None |
Language | Haskell2010 |
Keter.RateLimiter.LeakyBucket
Description
This module implements the core logic for the Leaky Bucket rate-limiting algorithm. The primary goal of this algorithm is to smooth out bursts of requests into a steady, predictable flow. It is conceptually similar to a bucket with a hole in the bottom.
Incoming requests are like water being added to the bucket. The bucket has a finite capacity. If a request arrives when the bucket is full, it is rejected (it "spills over"). The hole in the bucket allows requests to be processed (or "leak out") at a constant leak rate.
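To make the arithmetic concrete, here is a minimal meter-style sketch of the leak rule. It is an approximation of the idea only: this module queues real requests and drains them with a worker thread, and leakStep with its Double-based level is illustrative, not part of the API.

```haskell
-- Meter-style sketch of the leak rule (illustrative only).
-- The level drains continuously at leakRate; a request is admitted while
-- the drained level still leaves room for one more unit of "water".
leakStep :: Double  -- ^ capacity
         -> Double  -- ^ leak rate, requests per second
         -> Double  -- ^ seconds since the last update
         -> Double  -- ^ current level
         -> (Bool, Double)
leakStep capacity leakRate elapsed level =
  let drained = max 0 (level - leakRate * elapsed)
  in if drained + 1 <= capacity
       then (True,  drained + 1)  -- admit: one unit added to the bucket
       else (False, drained)      -- full: reject, the level only drains
```

With capacity 10 and a leak rate of 1.0, a bucket that filled up 3 seconds ago has drained to level 7, so three more requests are admitted and a fourth arriving at the same instant spills over.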
This implementation uses a dedicated worker thread for each bucket (e.g., for each unique user or IP address) to process requests from a queue. This ensures that processing is serialized and state is managed safely under concurrent access. The first request for a given key will spawn the worker, which then serves all subsequent requests for that same key.
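The worker pattern itself can be sketched as follows; Item, startWorker, and the decide callback are hypothetical stand-ins for the module's internals, shown only to make the "one queue, one worker per key" shape visible.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (MVar, putMVar)
import Control.Concurrent.STM (TQueue, atomically, readTQueue)
import Control.Monad (forever)

-- One queued request: the caller blocks on replyVar until the worker answers.
newtype Item = Item { replyVar :: MVar Bool }

-- One worker per bucket: only this thread touches the bucket's state,
-- so request processing is serialized without extra locking.
startWorker :: TQueue Item -> IO Bool -> IO ()
startWorker queue decide = do
  _ <- forkIO $ forever $ do
    item    <- atomically (readTQueue queue)  -- block until a request arrives
    verdict <- decide                         -- apply the leaky-bucket rule
    putMVar (replyVar item) verdict           -- wake the waiting caller
  pure ()
```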
Synopsis
- allowRequest :: MonadIO m => Cache (InMemoryStore 'LeakyBucket) -> Text -> Text -> Text -> Int -> Double -> m Bool
Algorithm Logic
Arguments
:: MonadIO m | |
=> Cache (InMemoryStore 'LeakyBucket) | Leaky bucket cache instance |
-> Text | Throttle name (logical grouping identifier) |
-> Text | IP zone identifier for multi-tenant isolation |
-> Text | User key (unique client identifier) |
-> Int | Bucket capacity (maximum queued requests, must be > 0) |
-> Double | Leak rate in requests per second (must be positive) |
-> m Bool | True if the request is allowed, False if it is rejected |
Determines whether a request should be allowed based on the state of its corresponding leaky bucket.
This function is the primary entry point for the leaky bucket algorithm. When a request arrives, this function is called to check if it should be processed or rejected. It operates by finding or creating a bucket associated with the user's key. Every request is added to a queue for its bucket.
A dedicated worker thread is responsible for processing the queue for each bucket. This function ensures that a worker is started for a new bucket but avoids starting duplicate workers. The final result (True or False) is communicated back to the caller once the worker has processed the request.
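For orientation, here is how a call site might look inside a WAI middleware. The key derivation from the socket address and the 10-capacity/1.0-rate parameters are assumptions of the sketch, not requirements of this module.

```haskell
{-# LANGUAGE DataKinds, OverloadedStrings #-}
import qualified Data.Text as T
import Network.HTTP.Types.Status (status429)
import Network.Wai (Middleware, remoteHost, responseLBS)
-- Cache, InMemoryStore, LeakyBucket, and allowRequest come from this package.

-- Reject over-limit clients with 429 before the request reaches the app.
-- Keying on the socket address is a simplification; real deployments would
-- usually key on a forwarded-for header or an authenticated user id.
rateLimitMiddleware :: Cache (InMemoryStore 'LeakyBucket) -> Middleware
rateLimitMiddleware cache app req respond = do
  let clientKey = T.pack (show (remoteHost req))
  allowed <- allowRequest cache "api-throttle" "zone-1" clientKey 10 1.0
  if allowed
    then app req respond
    else respond (responseLBS status429 [] "Too Many Requests")
```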
Algorithm Flow
- Validation: Check if capacity is valid (> 0)
- Key Construction: Build composite cache key from throttle name, IP zone, and user key (see the sketch after this list)
- Bucket Management: Find existing bucket or create new one atomically
- Request Queuing: Add request to bucket's processing queue
- Worker Management: Start worker thread for new buckets (one-time operation)
- Response: Wait for worker to process request and return result
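The key-construction step can be pictured like this; the ":" separator and flat layout are an assumption of the sketch, not a documented format.

```haskell
{-# LANGUAGE OverloadedStrings #-}
import Data.Text (Text)

-- Hypothetical scheme: one flat cache key per (throttle, zone, user) triple,
-- so each tenant zone and user gets its own independent bucket.
compositeKey :: Text -> Text -> Text -> Text
compositeKey throttleName ipZone userKey =
  throttleName <> ":" <> ipZone <> ":" <> userKey
```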
Concurrency Model
- Each bucket has its own worker thread for serialized processing
- Multiple clients can safely call this function concurrently (see the sketch after this list)
- STM ensures atomic bucket creation and state management
- Workers are started lazily on first request per bucket
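A small stress sketch shows the guarantee in action; the counts, names, and the expectation in the comment are illustrative assumptions.

```haskell
{-# LANGUAGE DataKinds, OverloadedStrings #-}
import Control.Concurrent.Async (mapConcurrently)
-- Cache, InMemoryStore, LeakyBucket, and allowRequest come from this package.

-- Fire 50 concurrent requests for one user key. The bucket's single worker
-- serializes them, so roughly the first `capacity` pass (plus whatever
-- leaks out while the burst is being processed).
stressOneBucket :: Cache (InMemoryStore 'LeakyBucket) -> IO Int
stressOneBucket cache = do
  results <- mapConcurrently
    (\_ -> allowRequest cache "demo" "zone-1" "user-42" 10 1.0)
    [1 .. 50 :: Int]
  pure (length (filter id results))
```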
Examples
```haskell
-- Basic usage: 10 request capacity, 1 request per second leak rate
isAllowed <- allowRequest cache "api-throttle" "us-east-1" "user-123" 10 1.0
if isAllowed
  then putStrLn "Request is allowed."
  else putStrLn "Request is blocked (429 Too Many Requests)."
```

```haskell
-- High-throughput API with burst tolerance
let capacity = 100   -- Allow burst of up to 100 requests
    leakRate = 10.0  -- Process 10 requests per second steadily
result <- allowRequest cache "high-volume" "zone-premium" "client-456" capacity leakRate
when result $ processApiRequest
```

```haskell
-- Rate limiting for different service tiers
let (cap, rate) = case userTier of
      Premium  -> (50, 5.0)  -- 50 burst, 5/sec sustained
      Standard -> (20, 2.0)  -- 20 burst, 2/sec sustained
      Basic    -> (5, 0.5)   -- 5 burst, 1 per 2 seconds
allowed <- allowRequest cache "tiered-api" zone userId cap rate
```
Thread Safety: This function is fully thread-safe and can be called concurrently from multiple threads.
Performance: New buckets incur a one-time worker thread creation cost. Subsequent requests are queued with minimal overhead.