keter-rate-limiting-plugin-0.2.0.0: Simple Keter rate limiting plugin.
Copyright    (c) 2025 Oleksandr Zhabenko
License      MIT
Maintainer   oleksandr.zhabenko@yahoo.com
Stability    experimental
Portability  POSIX
Safe Haskell None
Language     Haskell2010

Keter.RateLimiter.TokenBucketWorker

Description

This module provides a concurrent worker thread implementation for the token bucket rate limiting algorithm. The worker processes incoming requests from a queue and atomically updates the bucket state using Software Transactional Memory (STM).

Algorithm Overview

The token bucket algorithm works as follows:

  1. Initialization: A bucket starts with a certain number of tokens (up to capacity)
  2. Token Refill: Tokens are added to the bucket at a constant rate over time
  3. Request Processing: Each request attempts to consume one token
  4. Rate Limiting: If no tokens are available, the request is denied
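The four numbered steps map almost directly onto a small pure function. The sketch below uses local stand-in types (BucketState and step are illustrative, not this module's API) and assumes integer POSIX timestamps, as in the examples later on:

-- Local stand-ins for illustration only; the library's own types may differ.
data BucketState = BucketState
  { bsTokens     :: Double  -- current token count
  , bsLastUpdate :: Int     -- POSIX second of the last update
  }

-- Steps 2-4 as one pure function: refill by elapsed time, then try to
-- consume a single token.
step :: Int          -- capacity
     -> Double       -- refill rate in tokens per second
     -> Int          -- current POSIX time in seconds
     -> BucketState
     -> (Bool, BucketState)
step capacity rate now (BucketState tokens lastUpdate) =
  let elapsed  = fromIntegral (max 0 (now - lastUpdate))
      refilled = min (fromIntegral capacity) (tokens + rate * elapsed)
  in if refilled >= 1
       then (True,  BucketState (refilled - 1) now)   -- consume one token
       else (False, BucketState refilled now)         -- deny, keep the refill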

Concurrency Model

The worker uses STM for atomic state updates and communicates via:

  • TQueue for receiving incoming requests
  • MVar for sending responses back to clients
  • TVar for maintaining bucket state
  • TMVar for signaling worker readiness

Example Usage

import Control.Concurrent.STM
import Control.Concurrent.MVar
import Data.Time.Clock.POSIX (getPOSIXTime)

-- Create initial bucket state (100 tokens, last updated now)
now <- floor <$> getPOSIXTime  
initialState <- newTVarIO $ TokenBucketState 100 now

-- Create communication channels
requestQueue <- newTQueueIO
readySignal <- newEmptyTMVarIO

-- Start worker: 100 token capacity, 10 tokens/second refill rate
startTokenBucketWorker initialState requestQueue 100 10.0 readySignal

-- Wait for worker to be ready
atomically $ takeTMVar readySignal

-- Send a request and wait for response
replyVar <- newEmptyMVar
atomically $ writeTQueue requestQueue replyVar  
allowed <- takeMVar replyVar  -- True if request allowed, False if denied

Performance Characteristics

  • Time Complexity: O(1) per request (constant time token calculation)
  • Space Complexity: O(1) (fixed bucket state size)
  • Concurrency: Lock-free using STM, supports high throughput
  • Precision: Uses POSIX timestamps for accurate time-based calculations

Thread Safety

All operations are thread-safe through STM. Multiple clients can safely send requests to the same worker concurrently.
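As an illustration of concurrent use, the hedged sketch below fires 50 simultaneous requests at an already-running worker and counts how many were allowed. It assumes the async package as an extra dependency; countAllowed is a hypothetical helper, and requestQueue is the same queue passed to startTokenBucketWorker:

import Control.Concurrent.Async (replicateConcurrently)
import Control.Concurrent.MVar
import Control.Concurrent.STM

-- Hypothetical helper: 50 concurrent clients sharing one worker.
countAllowed :: TQueue (MVar Bool) -> IO Int
countAllowed requestQueue = do
  results <- replicateConcurrently 50 $ do
    replyVar <- newEmptyMVar
    atomically $ writeTQueue requestQueue replyVar
    takeMVar replyVar
  pure (length (filter id results))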

Synopsis

Worker Thread Management

startTokenBucketWorker

Arguments

:: TVar TokenBucketState

Shared bucket state (tokens + last update time). This TVar is read and updated atomically by the worker.

-> TQueue (MVar Bool)

Request queue containing MVars for client responses. Clients place their response MVar in this queue and wait for the worker to write the allow/deny decision.

-> Int

Maximum bucket capacity (maximum tokens that can be stored). This sets the upper limit for burst traffic handling. Must be positive.

-> Double

Token refill rate in tokens per second. Determines the long-term sustainable request rate. Must be positive. Can be fractional (e.g., 0.5 = 1 token per 2 seconds).

-> TMVar ()

Synchronization variable to signal when worker is ready. The worker writes to this TMVar once startup is complete. Clients can wait on this to ensure the worker is operational.

-> IO ()

Returns immediately after forking the worker thread. The actual worker runs in the background indefinitely.
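Assembled from the argument list above, the full signature reads:

startTokenBucketWorker
  :: TVar TokenBucketState  -- shared bucket state
  -> TQueue (MVar Bool)     -- request queue with per-client reply MVars
  -> Int                    -- maximum bucket capacity
  -> Double                 -- refill rate in tokens per second
  -> TMVar ()               -- readiness signal
  -> IO ()                  -- returns after forking the worker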

Start a dedicated worker thread for processing token bucket requests.

The worker runs in an infinite loop, processing requests from the provided queue. Each request is handled atomically: the bucket state is read, tokens are refilled based on elapsed time, a token is consumed if available, and the new state is written back.

Worker Lifecycle

  1. Startup: Worker thread is forked and signals readiness via TMVar
  2. Processing Loop: Worker waits for requests, processes them atomically
  3. Response: Results are sent back to clients via MVar
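A simplified skeleton of this lifecycle might look like the sketch below. It is illustrative only, not the module's source; processOne stands in for the atomic request handling described next:

import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (MVar)
import Control.Concurrent.STM
import Control.Monad (forever, void)

-- Illustrative skeleton: fork, signal readiness, then loop forever.
workerSkeleton :: TQueue (MVar Bool) -> TMVar () -> (MVar Bool -> IO ()) -> IO ()
workerSkeleton queue readySignal processOne = void . forkIO $ do
  atomically $ putTMVar readySignal ()          -- 1. signal readiness
  forever $ do                                  -- 2. processing loop
    replyVar <- atomically $ readTQueue queue   --    block until a request arrives
    processOne replyVar                         -- 3. decide and reply via the MVar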

Token Refill Algorithm

Tokens are refilled using the formula:

newTokens = min capacity (currentTokens + refillRate * elapsedSeconds)

This ensures:

  • Tokens are added proportionally to elapsed time
  • Bucket capacity is never exceeded
  • Sub-second precision for refill calculations
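A quick GHCi check of the formula (the refill helper here is a throwaway definition for the session, not part of the library):

ghci> let refill cap rate tokens elapsed = min cap (tokens + rate * elapsed)
ghci> refill 100 10 75 3     -- 75 tokens, 3 s at 10 tokens/s: clamped at capacity
100
ghci> refill 100 0.5 0 1     -- fractional rate: half a token after one second
0.5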

Atomic Request Processing

Each request is processed in a single STM transaction that:

  1. Reads current bucket state (tokens, lastUpdate)
  2. Calculates elapsed time since last update
  3. Computes available tokens after refill
  4. Attempts to consume one token if available
  5. Updates bucket state with new token count and timestamp
  6. Returns allow/deny decision
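A hedged sketch of such a transaction is given below. It assumes TokenBucketState is the two-field constructor (tokens, last-update POSIX second) used in the examples above, and that the current time is sampled in IO before the transaction; it is illustrative, not the module's source:

import Control.Concurrent.MVar (MVar, putMVar)
import Control.Concurrent.STM

-- Illustrative request handler; TokenBucketState is assumed to be in scope.
handleRequest :: TVar TokenBucketState -> Int -> Double -> Int -> MVar Bool -> IO ()
handleRequest stateVar capacity refillRate now replyVar = do
  decision <- atomically $ do
    st <- readTVar stateVar                                      -- 1. read current state
    let TokenBucketState tokens lastUpdate = st
        elapsed   = max 0 (now - lastUpdate)                     -- 2. elapsed seconds, clamped at 0
        refilled  = min (fromIntegral capacity)
                        (fromIntegral tokens + refillRate * fromIntegral elapsed)  -- 3. refill
        ok        = refilled >= 1                                -- 4. is a whole token available?
        remaining = if ok then refilled - 1 else refilled
    writeTVar stateVar (TokenBucketState (floor remaining) now)  -- 5. write back (floor to Int)
    pure ok                                                      -- 6. allow/deny decision
  putMVar replyVar decision                                      -- reply to the waiting client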

Error Handling

The worker is designed to be resilient:

  • Fractional POSIX times are converted to integer timestamps with floor
  • Negative elapsed time (clock adjustments) results in no refill
  • Worker continues running even if individual requests fail

Examples

-- Create a bucket for API rate limiting: 1000 requests/hour = ~0.278 req/sec
let capacity = 100              -- Allow bursts up to 100 requests
    refillRate = 1000.0 / 3600.0 -- 1000 requests per hour

now <- floor <$> getPOSIXTime
initialState <- newTVarIO $ TokenBucketState capacity now
requestQueue <- newTQueueIO
readySignal <- newEmptyTMVarIO

startTokenBucketWorker initialState requestQueue capacity refillRate readySignal

Thread Safety: All state updates are atomic via STM transactions.

Resource Usage: Creates one background thread that runs indefinitely.