keter-rate-limiting-plugin-0.2.0.0: Simple Keter rate limiting plugin.
Copyright: (c) 2025 Oleksandr Zhabenko
License: MIT
Maintainer: oleksandr.zhabenko@yahoo.com
Stability: stable
Portability: portable
Safe Haskell: None
Language: Haskell2010

Keter.RateLimiter.LeakyBucket

Description

This module implements the core logic for the Leaky Bucket rate-limiting algorithm. The primary goal of this algorithm is to smooth out bursts of requests into a steady, predictable flow. It is conceptually similar to a bucket with a hole in the bottom.

Incoming requests are like water being added to the bucket. The bucket has a finite capacity. If a request arrives when the bucket is full, it is rejected (it "spills over"). The hole in the bucket allows requests to be processed (or "leak out") at a constant leak rate.

This implementation uses a dedicated worker thread for each bucket (e.g., for each unique user or IP address) to process requests from a queue. This ensures that processing is serialized and state is managed safely under concurrent access. The first request for a given key will spawn the worker, which then serves all subsequent requests for that same key.
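The shape of that worker model can be sketched as follows. This is an illustrative, simplified sketch only (not the module's actual internals): callers enqueue a reply slot, and the bucket's worker drains the queue at the leak rate. In the real module the capacity check (rejecting when the bucket is full) happens at enqueue time, before a request ever reaches the worker.

import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.STM
import Control.Monad (forever, void)

-- One queued request: the caller blocks on the TMVar until the worker replies.
type Pending = TMVar Bool

-- Drain one bucket's queue at a fixed rate (requests per second),
-- answering each waiting caller in arrival order.
spawnWorker :: Double -> TQueue Pending -> IO ()
spawnWorker leakRate queue =
  void $ forkIO $ forever $ do
    pending <- atomically (readTQueue queue)    -- next request, in arrival order
    atomically (putTMVar pending True)          -- tell the caller it is admitted
    threadDelay (round (1000000 / leakRate))    -- pace processing at the leak rate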

Synopsis

Algorithm Logic

allowRequest

Arguments

  :: MonadIO m
  => Cache (InMemoryStore 'LeakyBucket)  -- Leaky bucket cache instance
  -> Text                                -- Throttle name (logical grouping identifier)
  -> Text                                -- IP zone identifier for multi-tenant isolation
  -> Text                                -- User key (unique client identifier)
  -> Int                                 -- Bucket capacity (maximum queued requests, must be > 0)
  -> Double                              -- Leak rate in requests per second (must be positive)
  -> m Bool                              -- True if request is allowed, False if bucket is full

Determines whether a request should be allowed based on the state of its corresponding leaky bucket.

This function is the primary entry point for the leaky bucket algorithm. When a request arrives, this function is called to check whether it should be processed or rejected. It operates by finding or creating the bucket associated with the request's composite key (throttle name, IP zone, and user key); every request is then added to that bucket's queue.

A dedicated worker thread is responsible for processing the queue for each bucket. This function ensures that a worker is started for a new bucket but avoids starting duplicate workers. The final result (True or False) is communicated back to the caller once the worker has processed the request.

Algorithm Flow

  1. Validation: Check if capacity is valid (> 0)
  2. Key Construction: Build composite cache key from throttle name, IP zone, and user key (see the sketch after this list)
  3. Bucket Management: Find existing bucket or create new one atomically
  4. Request Queuing: Add request to bucket's processing queue
  5. Worker Management: Start worker thread for new buckets (one-time operation)
  6. Response: Wait for worker to process request and return result
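For step 2 above, one plausible shape for the composite key is a simple delimited concatenation. The separator and exact layout here are assumptions for illustration; the real key format is an internal detail of the module.

{-# LANGUAGE OverloadedStrings #-}
import Data.Text (Text)
import qualified Data.Text as T

-- Hypothetical key layout: "<throttle>:<ipZone>:<userKey>" (separator assumed).
compositeKey :: Text -> Text -> Text -> Text
compositeKey throttle zone userKey = T.intercalate ":" [throttle, zone, userKey]

-- compositeKey "api-throttle" "us-east-1" "user-123" == "api-throttle:us-east-1:user-123"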

Concurrency Model

  • Each bucket has its own worker thread for serialized processing
  • Multiple clients can safely call this function concurrently
  • STM ensures atomic bucket creation and state management
  • Workers are started lazily on first request per bucket
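A short sketch of concurrent callers, assuming the async package and a cache value built elsewhere with the package's store-creation function (the client names and numbers are illustrative):

{-# LANGUAGE OverloadedStrings #-}
import Control.Concurrent.Async (forConcurrently)
import qualified Data.Text as T
import Keter.RateLimiter.LeakyBucket (allowRequest)

-- 50 distinct clients hit the same throttle at once; each user key gets its own
-- bucket and worker, so all calls are safe to run concurrently.
simulateClients cache = do
  results <- forConcurrently [1 .. 50 :: Int] $ \i ->
    allowRequest cache "api-throttle" "us-east-1" (T.pack ("user-" <> show i)) 10 2.0
  print (length (filter id results))   -- number of admitted requests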

Examples

-- Basic usage: 10 request capacity, 1 request per second leak rate
isAllowed <- allowRequest cache "api-throttle" "us-east-1" "user-123" 10 1.0
if isAllowed
  then putStrLn "Request is allowed."
  else putStrLn "Request is blocked (429 Too Many Requests)."

-- High-throughput API with burst tolerance
let capacity = 100        -- Allow burst of up to 100 requests
    leakRate = 10.0       -- Process 10 requests per second steadily

result <- allowRequest cache "high-volume" "zone-premium" "client-456" capacity leakRate
when result $ processApiRequest

-- Rate limiting for different service tiers
let (cap, rate) = case userTier of
      Premium -> (50, 5.0)   -- 50 burst, 5/sec sustained
      Standard -> (20, 2.0)  -- 20 burst, 2/sec sustained  
      Basic -> (5, 0.5)      -- 5 burst, 1 per 2 seconds

allowed <- allowRequest cache "tiered-api" zone userId cap rate

Thread Safety: This function is fully thread-safe and can be called concurrently from multiple threads.

Performance: New buckets incur a one-time worker thread creation cost. Subsequent requests are queued with minimal overhead.
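As a final illustration, the check can be wired into a WAI middleware. This is a hedged sketch: the cache construction, the keying by peer address, the zone name, and the capacity/rate numbers are assumptions, and the wai and http-types packages are assumed to be available.

{-# LANGUAGE OverloadedStrings #-}
import qualified Data.Text as T
import Network.HTTP.Types (status429)
import Network.Wai (remoteHost, responseLBS)
import Keter.RateLimiter.LeakyBucket (allowRequest)

-- Reject with 429 when the client's bucket is full; pass the request through otherwise.
rateLimitMiddleware cache app req respond = do
  let clientKey = T.pack (show (remoteHost req))   -- assumed keying: one bucket per peer address
  allowed <- allowRequest cache "http" "default-zone" clientKey 20 5.0
  if allowed
    then app req respond
    else respond (responseLBS status429 [] "Too Many Requests")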