keter-rate-limiting-plugin-0.2.0.0: Simple Keter rate limiting plugin.
Copyright: (c) Oleksandr Zhabenko
License: MIT
Maintainer: oleksandr.zhabenko@yahoo.com
Stability: experimental
Portability: POSIX
Safe Haskell: None
Language: Haskell2010

Keter.RateLimiter.AutoPurge

Description

This module provides automatic purging and cleanup functionality for rate limiter caches and STM-based data structures. It implements background threads that periodically remove expired entries to prevent memory leaks and maintain optimal performance.

Purging Strategies

The module supports two main purging approaches:

  1. Generic Cache Purging: Works with any Cache instance
  2. Custom STM Map Purging: Specialized for rate limiter bucket entries

Architecture Overview

┌─────────────────┐    ┌──────────────────┐    ┌─────────────────┐
│   Cache/STM     │    │   Purge Thread   │    │   Timer Logic   │
│     Map         │◄───┤                  │◄───┤                 │
└─────────────────┘    │  • Check TTL     │    │ • Interval calc │
                       │  • Remove expired│    │ • Sleep mgmt    │
                       │  • Cleanup locks │    │ • Precise timing│
                       └──────────────────┘    └─────────────────┘

Performance Characteristics

  • Time Complexity: O(n) where n is the number of entries (full scan)
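  • Space Complexity: O(1) additional memory for timing state, plus the entry list captured during each custom STM map scan
  • Timing Precision: Sleep durations are computed in microseconds from a monotonic clock; actual wake-up precision depends on the runtime scheduler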
  • Thread Safety: All operations are atomic using STM

Example Usage

Basic Cache Purging

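import qualified Data.Cache as C
import Data.Text (Text)
import System.Clock (TimeSpec (..))
import Keter.RateLimiter.AutoPurge (startAutoPurge)

main :: IO ()
main = do
  -- Create a cache with a 1-hour default TTL (Data.Cache takes a TimeSpec)
  cache <- C.newCache (Just (TimeSpec 3600 0)) :: IO (C.Cache Text Int)

  -- Start auto-purge every 10 minutes (600 seconds)
  startAutoPurge cache 600

  -- The cache now removes expired entries in the background
  -- for as long as the program keeps running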

Token Bucket Purging

import qualified StmContainers.Map as StmMap

-- Create STM map for token buckets
tokenBuckets <- StmMap.newIO

-- Start purging inactive buckets every 5 minutes, TTL 1 hour
threadId <- startCustomPurgeTokenBucket tokenBuckets 300 3600

-- Purge thread runs in background, cleaning up unused buckets

Leaky Bucket Purging

-- Create STM map for leaky buckets  
leakyBuckets <- StmMap.newIO

-- Start purging inactive buckets every 2 minutes, TTL 30 minutes
threadId <- startCustomPurgeLeakyBucket leakyBuckets 120 1800

-- Thread automatically removes buckets not used for 30+ minutes

Thread Management

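startCustomPurge, startCustomPurgeTokenBucket, and startCustomPurgeLeakyBucket return ThreadId values that can be used for thread management (killing, monitoring, etc.). startAutoPurge returns (), so its thread cannot be stopped through the return value. All purge threads run indefinitely until they are killed or the program exits.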
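A background loop whose ThreadId is kept for later termination can be sketched with forkIO and killThread from base. The helper `startPeriodic` and the cycle counter below are hypothetical stand-ins for a purge thread, not part of this module's API.

```haskell
import Control.Concurrent (ThreadId, forkIO, killThread, threadDelay)
import Control.Concurrent.STM (atomically, modifyTVar', newTVarIO, readTVarIO)
import Control.Monad (forever)

-- Hypothetical stand-in for a purge thread: run `action`, sleep for
-- `intervalMicros`, and repeat until the returned thread is killed.
startPeriodic :: Int -> IO () -> IO ThreadId
startPeriodic intervalMicros action =
  forkIO $ forever $ do
    action
    threadDelay intervalMicros

-- Count cycles for about 50 ms, then stop the loop via its ThreadId.
runAndStop :: IO Int
runAndStop = do
  cycles <- newTVarIO (0 :: Int)
  tid <- startPeriodic 1000 (atomically (modifyTVar' cycles (+ 1)))
  threadDelay 50000   -- let the loop run for roughly 50 ms
  killThread tid      -- throws ThreadKilled to the loop thread
  readTVarIO cycles
```

Calling killThread on the ThreadId returned by startCustomPurgeTokenBucket or startCustomPurgeLeakyBucket stops purging in the same way.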

Memory Management

The purge system is designed to prevent memory leaks in long-running applications by:

  • Removing expired cache entries
  • Cleaning up unused worker threads and locks
  • Releasing STM resources for inactive buckets
  • Keeping memory usage proportional to the number of keys active within the TTL window, whatever the request pattern

Data Types

Token Bucket Entries

data TokenBucketEntry

Container for token bucket state and associated worker resources.

This type encapsulates all the resources needed for a single token bucket rate limiter, including the bucket state, request queue, and worker thread synchronization. It's designed to work with STM-based concurrent access patterns.

Resource Management

The entry includes a worker lock (tbeWorkerLock) that coordinates worker thread lifecycle:

  • Empty TMVar: No worker thread is currently running for this bucket
  • Full TMVar: A worker thread is active and processing requests

Cleanup Behavior

When purging expired entries, the system:

  1. Attempts to take the worker lock (non-blocking)
  2. If successful, terminates any associated worker thread
  3. Removes the entry from the map
  4. Releases all associated STM resources
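The cleanup steps can be sketched as one STM transaction. This is a minimal sketch under stated assumptions: a TVar holding a Data.Map stands in for StmContainers.Map, `Entry` is a hypothetical record with only the worker lock, and step 2 (terminating the worker) is left out because a TMVar () does not identify which thread to stop.

```haskell
{-# LANGUAGE OverloadedStrings #-}
import Control.Concurrent.STM
import qualified Data.Map.Strict as Map
import Data.Maybe (isJust)
import Data.Text (Text)

-- Hypothetical stand-in for TokenBucketEntry: only the worker lock is modelled.
newtype Entry = Entry { workerLock :: TMVar () }

-- Purge one key in a single transaction. Returns True if the worker lock
-- was full (a worker was marked active) when the entry was removed.
purgeEntry :: TVar (Map.Map Text Entry) -> Text -> STM Bool
purgeEntry mapVar key = do
  m <- readTVar mapVar
  case Map.lookup key m of
    Nothing -> pure False
    Just entry -> do
      -- Step 1: non-blocking take; Nothing means the lock was empty.
      held <- tryTakeTMVar (workerLock entry)
      -- Step 2 (terminating the worker) is omitted: TMVar () holds no ThreadId.
      -- Steps 3-4: drop the entry; its STM resources are reclaimed by the GC
      -- once no other thread references them.
      writeTVar mapVar (Map.delete key m)
      pure (isJust held)
```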

Example

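-- Create a token bucket entry (assumes `now` holds the current timestamp
-- in the form TokenBucketState expects)
state <- newTVarIO $ TokenBucketState 100 now
queue <- newTQueueIO      -- TQueue (MVar Bool); from Control.Concurrent.STM
lock  <- newEmptyTMVarIO  -- empty: no worker running yet

let entry = TokenBucketEntry
      { tbeState = state
      , tbeQueue = queue
      , tbeWorkerLock = lock
      }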

Constructors

TokenBucketEntry 

Fields

  • tbeState :: TVar TokenBucketState

    Atomic bucket state containing current token count and last update timestamp. Shared between worker thread and clients.

  • tbeQueue :: TQueue (MVar Bool)

    Request queue for client communication. Clients place response MVars here and wait for worker to process and respond.

  • tbeWorkerLock :: TMVar ()

    Worker thread synchronization lock. Empty when no worker exists, full when active. Used for coordinating worker lifecycle and cleanup.
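The client/worker exchange through tbeQueue can be sketched with stm and base alone. The admission rule here (one token per request, no refill) and the names `askWorker` and `worker` are hypothetical simplifications; a real token bucket worker would also refill tokens over time.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (MVar, newEmptyMVar, putMVar, takeMVar)
import Control.Concurrent.STM
import Control.Monad (forever)

-- Client side: enqueue a fresh response MVar, then block until the worker answers.
askWorker :: TQueue (MVar Bool) -> IO Bool
askWorker queue = do
  reply <- newEmptyMVar
  atomically (writeTQueue queue reply)
  takeMVar reply

-- Worker side (hypothetical rule): admit a request while tokens remain,
-- consuming one token per admitted request. No refill in this sketch.
worker :: TVar Int -> TQueue (MVar Bool) -> IO ()
worker tokens queue = forever $ do
  reply <- atomically (readTQueue queue)
  allowed <- atomically $ do
    n <- readTVar tokens
    if n > 0
      then writeTVar tokens (n - 1) >> pure True
      else pure False
  putMVar reply allowed
```

With two tokens, three sequential calls to askWorker against a running worker answer True, True, False.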

Leaky Bucket Entries

data LeakyBucketEntry

Container for leaky bucket state and associated worker resources.

Similar to TokenBucketEntry but designed for leaky bucket algorithm requirements. The leaky bucket uses continuous time calculations and different queue communication patterns.

Differences from Token Bucket

  • Uses 'TMVar Bool' instead of 'MVar Bool' for queue communication
  • State contains Double timestamps for sub-second precision
  • Worker threads implement continuous draining logic
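Continuous draining with fractional timestamps can be shown with a pure update step. This is a sketch of the textbook leaky bucket, not this library's code: the `LeakState` record, the leak `rate` (units per second), the `capacity`, and the one-unit-per-request rule are assumptions; the documentation only states that LeakyBucketState holds a level and a Double timestamp.

```haskell
-- Hypothetical mirror of LeakyBucketState: current level and the Double
-- timestamp (seconds) at which that level was last computed.
data LeakState = LeakState { level :: Double, lastTime :: Double }
  deriving (Show, Eq)

-- Drain the bucket for the time elapsed since lastTime, then try to add
-- one unit for the incoming request. Returns (admitted?, new state).
leak :: Double -> Double -> Double -> LeakState -> (Bool, LeakState)
leak rate capacity now (LeakState lvl t) =
  let elapsed = max 0 (now - t)                -- ignore backwards clock steps
      drained = max 0 (lvl - rate * elapsed)   -- fractional seconds allowed
  in if drained + 1 <= capacity
       then (True, LeakState (drained + 1) now)
       else (False, LeakState drained now)
```

With rate 2 and capacity 2, a full bucket that is checked again 0.5 s later has drained one unit and can admit one more request.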

Resource Lifecycle

The entry lifecycle mirrors token buckets:

  1. Creation: Entry is created and added to STM map
  2. Activation: First request triggers worker thread creation
  3. Processing: Worker handles requests and updates state
  4. Expiration: Entry is purged after TTL period of inactivity
  5. Cleanup: Worker is terminated and resources are released
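The activation step can be expressed with the worker-lock semantics of tbeWorkerLock (full while a worker runs): filling the empty TMVar claims the right to start the worker. A minimal sketch; `ensureWorker` is a hypothetical helper and the library's actual spawning logic may differ.

```haskell
import Control.Concurrent (ThreadId, forkIO)
import Control.Concurrent.STM

-- Claim the worker slot atomically: tryPutTMVar succeeds only if the lock
-- is empty, so at most one caller forks a worker for this bucket.
ensureWorker :: TMVar () -> IO () -> IO (Maybe ThreadId)
ensureWorker lock workerBody = do
  claimed <- atomically (tryPutTMVar lock ())
  if claimed
    then Just <$> forkIO workerBody
    else pure Nothing   -- lock already full: a worker is active
```

Because tryPutTMVar never blocks, concurrent first requests race safely: exactly one sees True and forks the worker, and the others see False.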

Example

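-- Create a leaky bucket entry (assumes `now` holds the current
-- Unix timestamp as a Double)
state <- newTVarIO $ LeakyBucketState 0.0 now
queue <- newTQueueIO      -- TQueue (TMVar Bool); from Control.Concurrent.STM
lock  <- newEmptyTMVarIO  -- empty: no worker running yet

let entry = LeakyBucketEntry
      { lbeState = state
      , lbeQueue = queue
      , lbeWorkerLock = lock
      }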

Constructors

LeakyBucketEntry 

Fields

  • lbeState :: TVar LeakyBucketState

    Atomic bucket state with current level and last update timestamp (Double precision). Shared between worker and clients.

  • lbeQueue :: TQueue (TMVar Bool)

    Request queue using TMVar for STM-based client communication. Enables atomic request queuing and response handling.

  • lbeWorkerLock :: TMVar ()

    Worker thread coordination lock, same semantics as token bucket but for leaky bucket worker lifecycle management.
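Using TMVar Bool for responses lets both sides of the exchange stay in STM: the client creates and enqueues its response slot in one transaction, and the worker can dequeue a request, consult bucket state, and answer in another. A minimal sketch; `submitRequest`, `awaitReply`, and `serveOne` are hypothetical names, and the admission decision is supplied by the caller.

```haskell
import Control.Concurrent.STM

-- Create the response slot and enqueue it atomically: both happen or neither.
submitRequest :: TQueue (TMVar Bool) -> STM (TMVar Bool)
submitRequest queue = do
  reply <- newEmptyTMVar
  writeTQueue queue reply
  pure reply

-- Wait for the worker's decision; takeTMVar retries until the slot is filled.
awaitReply :: TMVar Bool -> STM Bool
awaitReply = takeTMVar

-- One worker step: answer the oldest request using a caller-supplied decision,
-- which may read and update bucket state inside the same transaction.
serveOne :: TQueue (TMVar Bool) -> STM Bool -> STM ()
serveOne queue decide = do
  reply <- readTQueue queue
  decision <- decide
  putTMVar reply decision
```

The client must wait in a separate transaction from the one that enqueued the request; combining them would retry forever, because the worker could never see the uncommitted request.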

Purging Functions

Generic Cache Purging

startAutoPurge

Arguments

:: Cache Text v

The cache instance to purge. Can contain any value type v; expired entries are removed with Data.Cache's purgeExpired.

-> Integer

Purge interval in seconds. Determines how frequently expired entries are removed. Shorter intervals provide more responsive cleanup but use more CPU. Typical values: 60-3600 seconds.

-> IO ()

Returns immediately. The purge thread runs in background indefinitely.

Start a background thread that periodically purges expired entries from a generic cache.

This function creates a self-regulating purge thread that starts a new cycle once per interval, as long as each purge finishes within the interval. It measures elapsed time with a monotonic clock, so the schedule is not affected by wall-clock adjustments.

Timing Algorithm

The purge cycle works as follows:

  1. Start Timer: Record monotonic timestamp before purge
  2. Purge Operation: Call purgeExpired on the cache
  3. Calculate Remaining: Subtract elapsed time from target interval
  4. Precise Sleep: Wait for exactly the remaining time
  5. Repeat: Continue with next cycle

Timing Precision

The algorithm ensures that purge cycles happen at regular intervals:

Target interval:  |--------10s--------|--------10s--------|
Actual timing:    |-9s purge-|-1s wait|-8s purge-|-2s wait|
Result:           each cycle starts 10 s after the previous one
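The five-step cycle can be sketched with base's GHC.Clock.getMonotonicTimeNSec and threadDelay. This illustrates the algorithm described above and is not the module's source; `remainingMicros` and `purgeLoop` are hypothetical names, and the sketch assumes that a purge which overruns the interval is followed by the next cycle without sleeping (the remaining time is clamped at zero).

```haskell
import Control.Concurrent (threadDelay)
import Control.Monad (forever)
import Data.Word (Word64)
import GHC.Clock (getMonotonicTimeNSec)

-- Step 3: remaining sleep in microseconds, given the interval in seconds and
-- the monotonic start/end times of the purge in nanoseconds. Clamped at 0.
remainingMicros :: Integer -> Word64 -> Word64 -> Int
remainingMicros intervalSeconds startNs endNs =
  let elapsedUs = fromIntegral (endNs - startNs) `div` 1000 :: Integer
      targetUs  = intervalSeconds * 1000000
  in fromIntegral (max 0 (targetUs - elapsedUs))

-- Steps 1-5: time the purge, sleep for the rest of the interval, repeat.
purgeLoop :: Integer -> IO () -> IO ()
purgeLoop intervalSeconds purge = forever $ do   -- 5. repeat
  start <- getMonotonicTimeNSec                  -- 1. start timer
  purge                                          -- 2. purge operation
  end <- getMonotonicTimeNSec
  threadDelay (remainingMicros intervalSeconds start end)  -- 3-4. sleep remainder
```

For a 10-second interval, a 9-second purge leaves a 1-second sleep, matching the first cycle in the timing picture.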

Performance Considerations

  • Non-blocking: Returns immediately after starting background thread
  • Self-correcting: Adapts sleep time based on actual purge duration
  • Memory efficient: Only maintains minimal state for timing
  • CPU friendly: Sleeps for majority of time between purges

Example Usage

import qualified Data.Cache as C
import Data.Text (Text)
import System.Clock (TimeSpec (..))

-- Create cache with 30-minute TTL (1800 seconds)
cache <- C.newCache (Just (TimeSpec 1800 0)) :: IO (C.Cache Text Int)

-- Purge expired entries every 5 minutes (300 seconds)
startAutoPurge cache 300

-- Cache now automatically maintains itself: expired entries are
-- removed within roughly one purge interval of expiring

Thread Safety: Safe to call concurrently. Each call creates an independent purge thread.

Resource Usage: Creates one background thread with minimal memory footprint.

Error Handling: Thread continues running even if individual purge operations fail.

Custom STM Map Purging

startCustomPurge :: (entry -> STM Double) -> (Text -> entry -> STM ()) -> Map Text entry -> Integer -> Integer -> IO ThreadId

Generic purge loop implementation for STM-based data structures.

This is the foundational purge function that powers both token bucket and leaky bucket purging. It provides a flexible framework for implementing custom purge logic while maintaining consistent timing and cleanup behavior.

Algorithm Overview

The purge cycle consists of several phases:

  1. Scan Phase: Atomically list all entries in the STM map
  2. Filter Phase: Identify expired entries based on timestamp extraction
  3. Cleanup Phase: Execute custom delete actions for expired entries
  4. Timing Phase: Calculate sleep duration to maintain precise intervals
  5. Sleep Phase: Wait until next purge cycle should begin
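The scan, filter, and cleanup phases can be sketched as one STM transaction. A minimal sketch under stated assumptions: a TVar holding a Data.Map stands in for StmContainers.Map, the current time is passed in as a Double instead of being read from a clock, and `purgePass` is a hypothetical name. The getTimestamp and deleteAction arguments have the same shapes as in startCustomPurge's signature.

```haskell
{-# LANGUAGE OverloadedStrings #-}
import Control.Concurrent.STM
import Control.Monad (filterM, forM_)
import qualified Data.Map.Strict as Map
import Data.Text (Text)

-- One purge pass: list all entries (scan), keep those whose last activity
-- is at least ttlSeconds old (filter), run deleteAction on each (cleanup).
-- Returns the purged keys.
purgePass
  :: (entry -> STM Double)        -- getTimestamp
  -> (Text -> entry -> STM ())    -- deleteAction
  -> TVar (Map.Map Text entry)    -- stand-in for the STM map
  -> Integer                      -- ttlSeconds
  -> Double                       -- current time in seconds
  -> STM [Text]
purgePass getTimestamp deleteAction mapVar ttlSeconds now = do
  entries <- Map.toList <$> readTVar mapVar
  expired <- filterM isExpired entries
  forM_ expired (uncurry deleteAction)
  pure (map fst expired)
  where
    isExpired (_, e) = do
      t <- getTimestamp e
      pure (now - t >= fromIntegral ttlSeconds)
```

Wrapping `atomically (purgePass ...)` in a timed loop would give the full cycle, with the timing and sleep phases handled outside STM.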

Atomicity Guarantees

  • Entry Listing: All entries are captured in a single STM transaction
  • Expiration Check: Timestamp extraction is atomic per entry
  • Deletion: Custom delete actions are executed atomically
  • Consistency: Map state remains consistent throughout the process

Custom Delete Actions

The deleteAction parameter allows specialized cleanup logic:

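import Control.Concurrent.STM (STM)
import Control.Monad (when)
import Data.Text (Text)
import qualified StmContainers.Map as StmMap

-- Each action takes the map first; partially apply it to get the
-- Text -> entry -> STM () shape that deleteAction expects.

-- Simple deletion (the entry itself is not needed)
simpleDelete :: StmMap.Map Text entry -> Text -> entry -> STM ()
simpleDelete stmMap key _entry = StmMap.delete key stmMap

-- Cleanup with resource release (releaseResources is a hypothetical STM action)
deleteWithRelease stmMap key entry = do
  releaseResources entry
  StmMap.delete key stmMap

-- Conditional deletion (checkCondition is a hypothetical STM Bool action)
conditionalDelete stmMap key entry = do
  shouldDelete <- checkCondition entry
  when shouldDelete $ StmMap.delete key stmMap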

Error Handling

The function is designed to be resilient:

  • Individual entry failures don't stop the purge cycle
  • Timing calculations handle clock adjustments gracefully
  • Thread continues running even if STM transactions retry

Performance Optimization

  • Batch Processing: All deletions happen in a single STM transaction
  • Minimal Copying: Entries are processed in-place where possible
  • Efficient Filtering: Uses lazy evaluation for timestamp checks
  • Precise Timing: Avoids unnecessary CPU usage during sleep periods

Example Usage

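-- Custom purge for application-specific entries. customTimestamp,
-- releaseCustomResources, and logQueue (a TQueue Text drained by a
-- separate logging thread) are hypothetical. STM code cannot run IO,
-- so log messages are queued instead of written directly.
-- The Text literal requires OverloadedStrings.
threadId <- startCustomPurge
  -- Extract timestamp from custom entry type
  (\entry -> readTVar (customTimestamp entry))
  -- Custom cleanup with queued logging
  (\key entry -> do
      writeTQueue logQueue ("Purging entry: " <> key)
      releaseCustomResources entry
      StmMap.delete key customMap)
  customMap
  600    -- 10-minute intervals
  3600   -- 1-hour TTL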

Flexibility: Supports any entry type with extractable timestamps.

Extensibility: Custom delete actions enable complex cleanup scenarios.

Reliability: Robust error handling ensures continuous operation.

  • getTimestamp - Timestamp extraction function. Called for each entry to determine last activity time. Should be fast and side-effect free. Returns Unix timestamp as Double for sub-second precision.
  • deleteAction - Custom delete action executed for expired entries. Receives both key and entry for context. Should handle resource cleanup and map removal atomically. Can perform logging, resource release, etc.
  • stmMap - STM map to purge. Entries are identified by Text keys; values can be of any type entry. The map is scanned completely during each purge cycle for expired entries.
  • intervalSeconds - Purge interval in seconds. Controls how frequently the purge operation runs. Shorter intervals provide more responsive cleanup but increase CPU usage.
  • ttlSeconds - TTL (time-to-live) in seconds. Entries with timestamps older than (currentTime - ttlSeconds) are considered expired and eligible for removal.

Returns the thread ID of the purge thread. The thread runs indefinitely until killed; its ID can be used for thread management and monitoring.

startCustomPurgeTokenBucket :: Map Text TokenBucketEntry -> Integer -> Integer -> IO ThreadId

Start a specialized purge thread for token bucket entries in an STM map.

This function provides a convenient wrapper around startCustomPurge specifically configured for TokenBucketEntry cleanup. It handles the complexities of extracting timestamps from token bucket state and properly cleaning up worker threads.

Token Bucket Specific Behavior

  • Timestamp Extraction: Uses lastUpdate field from TokenBucketState
  • Worker Cleanup: Attempts to acquire worker lock and terminate threads
  • Resource Release: Removes entry from STM map and frees all resources

TTL Behavior

Token buckets are considered expired when:

currentTime - lastUpdateTime >= ttlSeconds

This means buckets are purged based on when they were last used for rate limiting, not when they were created. Active buckets are never purged regardless of age.
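The expiration rule is a one-line predicate. A sketch, assuming timestamps are Unix seconds stored as Double (the type startCustomPurge's getTimestamp returns) and using getPOSIXTime from the time package for the current time; `isExpired` and `isExpiredNow` are hypothetical names.

```haskell
import Data.Time.Clock.POSIX (getPOSIXTime)

-- currentTime - lastUpdateTime >= ttlSeconds, with the Integer TTL
-- converted so it can be compared with fractional timestamps.
isExpired :: Integer -> Double -> Double -> Bool
isExpired ttlSeconds currentTime lastUpdateTime =
  currentTime - lastUpdateTime >= fromIntegral ttlSeconds

-- The same check against the current wall-clock time.
isExpiredNow :: Integer -> Double -> IO Bool
isExpiredNow ttlSeconds lastUpdateTime = do
  now <- realToFrac <$> getPOSIXTime
  pure (isExpired ttlSeconds now lastUpdateTime)
```

The comparison is inclusive: a bucket last used exactly ttlSeconds ago is already eligible for removal.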

Example Scenarios

-- High-frequency API with 5-minute cleanup cycles
tokenBuckets <- StmMap.newIO
apiPurger <- startCustomPurgeTokenBucket tokenBuckets 300 3600        -- 5min interval, 1hr TTL

-- Low-frequency batch processing with daily cleanup
batchBuckets <- StmMap.newIO
batchPurger <- startCustomPurgeTokenBucket batchBuckets 86400 604800  -- 1day interval, 1week TTL

Thread Management: Returns ThreadId for thread control (e.g., killThread).

Memory Impact: Prevents unbounded memory growth in applications with dynamic rate limiting keys.

Performance: O(n) scan of all buckets per cycle, but the scan runs only once per interval, so its cost is spread over the whole interval.

  • stmMap - STM map containing token bucket entries keyed by identifier. Typically uses API keys, user IDs, or IP addresses as keys. Map is scanned atomically during each purge cycle.
  • intervalSeconds - Purge interval in seconds. How often to scan for expired buckets. Balance between cleanup responsiveness and CPU usage. Recommended: 300-3600 seconds depending on traffic patterns.
  • ttlSeconds - TTL (time-to-live) in seconds. Buckets unused for this duration are considered expired and eligible for removal. Should be much larger than typical request intervals. Recommended: 3600-86400 seconds.

Returns thread ID of the background purge thread. Can be used with killThread to stop purging.

startCustomPurgeLeakyBucket :: Map Text LeakyBucketEntry -> Integer -> Integer -> IO ThreadId

Start a specialized purge thread for leaky bucket entries in an STM map.

Similar to startCustomPurgeTokenBucket but configured for LeakyBucketEntry cleanup. Handles the specific requirements of leaky bucket timestamp extraction and worker thread management.

Leaky Bucket Specific Behavior

  • Timestamp Precision: Uses Double timestamp from LeakyBucketState for sub-second accuracy
  • Continuous Time Model: Supports fractional timestamps for precise drain calculations
  • Worker Cleanup: Terminates continuous drain worker threads properly

TTL Calculation

Leaky buckets are expired when:

currentTime - lastTime >= fromIntegral ttlSeconds

The lastTime field represents the most recent bucket level update, providing more precise expiration timing than integer-based systems.

Use Cases

  • Streaming Applications: Rate limiting for continuous data flows
  • Real-time APIs: Sub-second precision rate limiting
  • Traffic Shaping: Network bandwidth management with smooth rates

Example Configuration

-- Real-time streaming with frequent cleanup
streamBuckets <- StmMap.newIO
streamPurger <- startCustomPurgeLeakyBucket streamBuckets 60 900        -- 1min interval, 15min TTL

-- Network traffic shaping with moderate cleanup
trafficBuckets <- StmMap.newIO
trafficPurger <- startCustomPurgeLeakyBucket trafficBuckets 600 7200    -- 10min interval, 2hr TTL

Precision: Sub-second timestamp accuracy for fine-grained rate control.

Cleanup Behavior: Short intervals and TTLs, as in the streaming configuration, suit high-frequency, short-lived connections.

  • stmMap - STM map containing leaky bucket entries keyed by identifier. Keys typically represent connections, streams, or data flows. All entries are scanned atomically during purge operations.
  • intervalSeconds - Purge interval in seconds. Frequency of expired entry cleanup. For high-frequency applications, shorter intervals (60-600s) provide more responsive cleanup.
  • ttlSeconds - TTL in seconds. Buckets inactive for this duration are purged. Should account for typical connection/stream lifetime patterns. Shorter TTLs (900-3600s) are suitable for transient connections.

Returns thread ID for background purge thread management. Thread runs continuously until explicitly terminated.