Copyright | (c) Oleksandr Zhabenko |
---|---|
License | MIT |
Maintainer | oleksandr.zhabenko@yahoo.com |
Stability | experimental |
Portability | POSIX |
Safe Haskell | None |
Language | Haskell2010 |
Keter.RateLimiter.AutoPurge
Description
This module provides automatic purging and cleanup functionality for rate limiter caches and STM-based data structures. It implements background threads that periodically remove expired entries to prevent memory leaks and maintain optimal performance.
Purging Strategies
The module supports two main purging approaches:
- Generic Cache Purging: Works with any Cache instance
- Custom STM Map Purging: Specialized for rate limiter bucket entries
Architecture Overview
    ┌─────────────────┐    ┌──────────────────┐    ┌─────────────────┐
    │    Cache/STM    │    │   Purge Thread   │    │   Timer Logic   │
    │       Map       │◄───┤                  │◄───┤                 │
    └─────────────────┘    │ • Check TTL      │    │ • Interval calc │
                           │ • Remove expired │    │ • Sleep mgmt    │
                           │ • Cleanup locks  │    │ • Precise timing│
                           └──────────────────┘    └─────────────────┘
Performance Characteristics
- Time Complexity: O(n) where n is the number of entries (full scan)
- Space Complexity: O(1) additional memory usage
- Timing Precision: Microsecond-resolution scheduling based on a monotonic clock (actual wake-up accuracy depends on the runtime scheduler)
- Thread Safety: All operations are atomic using STM
Example Usage
Basic Cache Purging
    import qualified Data.Cache as C
    import Data.Text

    -- Create a cache with 1-hour TTL
    cache <- C.newCache (Just $ C.seconds 3600)

    -- Start auto-purge every 10 minutes (600 seconds)
    startAutoPurge cache 600

    -- Cache will now automatically remove expired entries
Token Bucket Purging
    import qualified StmContainers.Map as StmMap

    -- Create STM map for token buckets
    tokenBuckets <- StmMap.newIO

    -- Start purging inactive buckets every 5 minutes, TTL 1 hour
    threadId <- startCustomPurgeTokenBucket tokenBuckets 300 3600

    -- Purge thread runs in background, cleaning up unused buckets
Leaky Bucket Purging
    -- Create STM map for leaky buckets
    leakyBuckets <- StmMap.newIO

    -- Start purging inactive buckets every 2 minutes, TTL 30 minutes
    threadId <- startCustomPurgeLeakyBucket leakyBuckets 120 1800

    -- Thread automatically removes buckets not used for 30+ minutes
Thread Management
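The custom purge functions (startCustomPurge, startCustomPurgeTokenBucket and startCustomPurgeLeakyBucket) return ThreadId values that can be used for thread management (killing, monitoring, etc.). startAutoPurge returns () and therefore exposes no handle to its thread. The threads run indefinitely until explicitly terminated.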
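As a minimal sketch of what managing such a thread could look like, the example below ties a background loop to a scope with bracket and killThread. The function startFakePurge is a hypothetical stand-in (it only bumps a counter) so that the example runs without the rate limiter itself; withPurgeThread is likewise an illustrative helper, not part of this module.

```haskell
import Control.Concurrent (ThreadId, forkIO, killThread, threadDelay)
import Control.Exception (bracket)
import Control.Monad (forever)
import Data.IORef (IORef, atomicModifyIORef', newIORef, readIORef)

-- Hypothetical stand-in for a startCustomPurge* call: forks a loop that
-- "purges" (here it only bumps a counter) and then sleeps.
startFakePurge :: IORef Int -> Int -> IO ThreadId
startFakePurge counter intervalMicros =
  forkIO . forever $ do
    atomicModifyIORef' counter (\n -> (n + 1, ()))
    threadDelay intervalMicros

-- Run an action while the purge thread is alive; bracket guarantees that
-- killThread runs even if the action throws.
withPurgeThread :: IO ThreadId -> IO a -> IO a
withPurgeThread start action = bracket start killThread (const action)

main :: IO ()
main = do
  counter <- newIORef 0
  withPurgeThread (startFakePurge counter 10000) (threadDelay 100000)
  cycles <- readIORef counter
  putStrLn ("purge cycles before shutdown: " ++ show cycles)
```

With a real purge function, the first argument of withPurgeThread would be something like `startCustomPurgeTokenBucket tokenBuckets 300 3600`.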
Memory Management
The purge system is designed to prevent memory leaks in long-running applications by:
- Removing expired cache entries
- Cleaning up unused worker threads and locks
- Releasing STM resources for inactive buckets
- Maintaining bounded memory usage regardless of request patterns
Synopsis
- data TokenBucketEntry = TokenBucketEntry {
    - tbeState :: TVar TokenBucketState
    - tbeQueue :: TQueue (MVar Bool)
    - tbeWorkerLock :: TMVar ()
  }
- data LeakyBucketEntry = LeakyBucketEntry {
    - lbeState :: TVar LeakyBucketState
    - lbeQueue :: TQueue (TMVar Bool)
    - lbeWorkerLock :: TMVar ()
  }
- startAutoPurge :: Cache Text v -> Integer -> IO ()
- startCustomPurge :: (entry -> STM Double) -> (Text -> entry -> STM ()) -> Map Text entry -> Integer -> Integer -> IO ThreadId
- startCustomPurgeTokenBucket :: Map Text TokenBucketEntry -> Integer -> Integer -> IO ThreadId
- startCustomPurgeLeakyBucket :: Map Text LeakyBucketEntry -> Integer -> Integer -> IO ThreadId
Data Types
Token Bucket Entries
data TokenBucketEntry
Container for token bucket state and associated worker resources.
This type encapsulates all the resources needed for a single token bucket rate limiter, including the bucket state, request queue, and worker thread synchronization. It's designed to work with STM-based concurrent access patterns.
Resource Management
The entry includes a worker lock (tbeWorkerLock) that coordinates worker thread lifecycle:
- Empty TMVar: No worker thread is currently running for this bucket
- Full TMVar: A worker thread is active and processing requests
Cleanup Behavior
When purging expired entries, the system:
- Attempts to take the worker lock (non-blocking)
- If successful, terminates any associated worker thread
- Removes the entry from the map
- Releases all associated STM resources
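The lock handling in these steps can be sketched with tryTakeTMVar, which never blocks. This is an illustration under stated assumptions, not the module's implementation: a TVar holding an ordinary Map stands in for the stm-containers map, each entry is reduced to its worker lock, and the names Registry and purgeEntry are hypothetical. How the worker thread itself is stopped is not shown.

```haskell
import Control.Concurrent.STM
import qualified Data.Map.Strict as Map

-- Stand-in registry: a TVar holding an ordinary Map replaces the
-- stm-containers Map, and each entry is reduced to its worker lock.
type Registry = TVar (Map.Map String (TMVar ()))

-- Purge one key. Returns Nothing if the key is absent, otherwise
-- Just True when a worker held the lock (full TMVar) and Just False
-- when no worker was running (empty TMVar).
purgeEntry :: Registry -> String -> STM (Maybe Bool)
purgeEntry registry key = do
  entries <- readTVar registry
  case Map.lookup key entries of
    Nothing -> pure Nothing
    Just lock -> do
      taken <- tryTakeTMVar lock          -- never blocks or retries
      writeTVar registry (Map.delete key entries)
      pure (Just (taken /= Nothing))

main :: IO ()
main = do
  busyLock <- newTMVarIO ()               -- full: worker active
  idleLock <- newEmptyTMVarIO             -- empty: no worker
  registry <- newTVarIO (Map.fromList [("busy", busyLock), ("idle", idleLock)])
  results <- atomically (mapM (purgeEntry registry) ["busy", "idle", "missing"])
  print results                           -- [Just True,Just False,Nothing]
  remaining <- Map.size <$> readTVarIO registry
  print remaining                         -- 0
```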
Example
    -- Create a token bucket entry
    state <- newTVarIO $ TokenBucketState 100 now
    queue <- newTQueueIO
    lock <- newEmptyTMVarIO
    let entry = TokenBucketEntry
          { tbeState = state
          , tbeQueue = queue
          , tbeWorkerLock = lock
          }
Constructors
TokenBucketEntry
Fields
- tbeState :: TVar TokenBucketState
- tbeQueue :: TQueue (MVar Bool)
- tbeWorkerLock :: TMVar ()
Leaky Bucket Entries
data LeakyBucketEntry
Container for leaky bucket state and associated worker resources.
Similar to TokenBucketEntry but designed for leaky bucket algorithm requirements. The leaky bucket uses continuous time calculations and different queue communication patterns.
Differences from Token Bucket
- Uses TMVar Bool instead of MVar Bool for queue communication
- State contains Double timestamps for sub-second precision
- Worker threads implement continuous draining logic
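To illustrate what queue communication through TMVar Bool can look like, here is a minimal sketch in which each request is a one-shot reply slot that the worker fills. The source does not spell out the actual protocol, so the names Request, submit and worker, and the fixed-capacity decision rule, are assumptions made for the example. One property of this shape is that the worker can dequeue a request and answer it inside a single STM transaction.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.STM

-- A request is a one-shot reply slot; the worker fills it with the verdict.
type Request = TMVar Bool

-- Client side: enqueue a reply slot, then block until the worker answers.
submit :: TQueue Request -> IO Bool
submit queue = do
  reply <- newEmptyTMVarIO
  atomically (writeTQueue queue reply)
  atomically (takeTMVar reply)

-- Worker side: answer n requests, allowing only the first `capacity`.
-- (A real leaky bucket would consult its level and drain rate instead.)
worker :: TQueue Request -> Int -> Int -> IO ()
worker queue capacity n = mapM_ answer [1 .. n]
  where
    answer i = atomically $ do
      reply <- readTQueue queue
      putTMVar reply (i <= capacity)

main :: IO ()
main = do
  queue <- newTQueueIO
  _ <- forkIO (worker queue 2 3)
  verdicts <- mapM (const (submit queue)) [1 :: Int, 2, 3]
  print verdicts                          -- [True,True,False]
```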
Resource Lifecycle
The entry lifecycle mirrors token buckets:
- Creation: Entry is created and added to STM map
- Activation: First request triggers worker thread creation
- Processing: Worker handles requests and updates state
- Expiration: Entry is purged after TTL period of inactivity
- Cleanup: Worker is terminated and resources are released
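The activation step can be sketched with tryPutTMVar, following the convention that a full worker lock means a worker is active. This is an assumption-laden illustration: ensureWorker is a hypothetical helper, and the source does not say how the real implementation starts or stops workers.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Concurrent.STM
import Control.Exception (finally)

-- Start the worker only if the lock is currently empty. tryPutTMVar fills
-- the lock atomically, so at most one caller wins the race.
ensureWorker :: TMVar () -> IO () -> IO Bool
ensureWorker lock work = do
  claimed <- atomically (tryPutTMVar lock ())
  if claimed
    then do
      -- Empty the lock again when the worker ends, even on exceptions.
      _ <- forkIO (work `finally` atomically (tryTakeTMVar lock))
      pure True
    else pure False

main :: IO ()
main = do
  lock <- newEmptyTMVarIO
  gate <- newEmptyMVar
  first  <- ensureWorker lock (takeMVar gate)  -- worker parks on the gate
  second <- ensureWorker lock (pure ())        -- lock is full: refused
  print (first, second)                        -- (True,False)
  putMVar gate ()                              -- let the worker finish
```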
Example
    -- Create a leaky bucket entry
    state <- newTVarIO $ LeakyBucketState 0.0 now
    queue <- newTQueueIO
    lock <- newEmptyTMVarIO
    let entry = LeakyBucketEntry
          { lbeState = state
          , lbeQueue = queue
          , lbeWorkerLock = lock
          }
Constructors
LeakyBucketEntry
Fields
- lbeState :: TVar LeakyBucketState
- lbeQueue :: TQueue (TMVar Bool)
- lbeWorkerLock :: TMVar ()
Purging Functions
Generic Cache Purging
startAutoPurge
Arguments
:: Cache Text v | The cache instance to purge. Can contain any value type |
-> Integer | Purge interval in seconds. Determines how frequently expired entries are removed. Shorter intervals provide more responsive cleanup but use more CPU. Typical values: 60-3600 seconds. |
-> IO () | Returns immediately. The purge thread runs in background indefinitely. |
Start a background thread that periodically purges expired entries from a generic cache.
This function creates a self-regulating purge thread that keeps a steady purge interval as long as each purge finishes within that interval. It measures elapsed time with a monotonic clock, so scheduling is not disturbed by wall-clock adjustments.
Timing Algorithm
The purge cycle works as follows:
- Start Timer: Record monotonic timestamp before purge
- Purge Operation: Call purgeExpired on the cache
- Calculate Remaining: Subtract elapsed time from target interval
- Precise Sleep: Wait for exactly the remaining time
- Repeat: Continue with next cycle
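The cycle above can be sketched as follows. This is an illustration rather than the module's code: it uses getMonotonicTimeNSec from GHC.Clock in base as the monotonic clock, and the names remainingMicros and purgeLoop are hypothetical. Clamping the remaining time at zero covers the case where a purge takes longer than the interval.

```haskell
import Control.Concurrent (threadDelay)
import Control.Monad (forever)
import Data.Word (Word64)
import GHC.Clock (getMonotonicTimeNSec)

-- Microseconds left in the current cycle, clamped at zero for overruns.
remainingMicros :: Integer -> Word64 -> Word64 -> Integer
remainingMicros intervalSeconds startNs endNs =
  max 0 (intervalSeconds * 1000000 - elapsedMicros)
  where
    elapsedMicros = toInteger (endNs - startNs) `div` 1000

-- Timer, purge, compute remainder, sleep, repeat.
purgeLoop :: Integer -> IO () -> IO ()
purgeLoop intervalSeconds purge = forever $ do
  start <- getMonotonicTimeNSec
  purge
  end <- getMonotonicTimeNSec
  threadDelay (fromInteger (remainingMicros intervalSeconds start end))

main :: IO ()
main = do
  print (remainingMicros 10 0 9000000000)   -- 9 s purge  -> 1000000
  print (remainingMicros 10 0 12000000000)  -- 12 s purge -> 0
```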
Timing Precision
The algorithm ensures that purge cycles happen at regular intervals:
    Target Interval: |----10s----|----10s----|----10s----|
    Actual Timing:   |--9s purge-|1s wait|--8s purge--|2s wait|
    Result:          Consistent 10-second intervals maintained
Performance Considerations
- Non-blocking: Returns immediately after starting background thread
- Self-correcting: Adapts sleep time based on actual purge duration
- Memory efficient: Only maintains minimal state for timing
- CPU friendly: Sleeps for majority of time between purges
Example Usage
    import qualified Data.Cache as C
    import Data.Text

    -- Create cache with 30-minute TTL
    cache <- C.newCache (Just $ C.minutes 30)

    -- Purge expired entries every 5 minutes (300 seconds)
    startAutoPurge cache 300

    -- Cache now automatically maintains itself
    -- Memory usage remains bounded regardless of request patterns
Thread Safety: Safe to call concurrently. Each call creates independent purge thread.
Resource Usage: Creates one background thread with minimal memory footprint.
Error Handling: Thread continues running even if individual purge operations fail.
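One way a loop can survive a failing purge is to wrap each attempt in try. The sketch below shows that pattern only; whether the module does exactly this is an assumption, and guardedPurge is a hypothetical name.

```haskell
import Control.Exception (SomeException, try)

-- Run one purge attempt; report a failure instead of propagating it.
guardedPurge :: IO () -> IO (Either String ())
guardedPurge purge = do
  result <- try purge :: IO (Either SomeException ())
  pure (either (Left . show) Right result)

main :: IO ()
main = do
  ok  <- guardedPurge (pure ())
  bad <- guardedPurge (ioError (userError "cache unavailable"))
  print ok    -- a Right value: the purge succeeded
  print bad   -- a Left value carrying the error message
```

Catching SomeException also intercepts asynchronous exceptions such as the one delivered by killThread if it arrives during the purge, so a production loop would normally rethrow those.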
Custom STM Map Purging
startCustomPurge :: (entry -> STM Double) -> (Text -> entry -> STM ()) -> Map Text entry -> Integer -> Integer -> IO ThreadId
Generic purge loop implementation for STM-based data structures.
This is the foundational purge function that powers both token bucket and leaky bucket purging. It provides a flexible framework for implementing custom purge logic while maintaining consistent timing and cleanup behavior.
Algorithm Overview
The purge cycle consists of several phases:
- Scan Phase: Atomically list all entries in the STM map
- Filter Phase: Identify expired entries based on timestamp extraction
- Cleanup Phase: Execute custom delete actions for expired entries
- Timing Phase: Calculate sleep duration to maintain precise intervals
- Sleep Phase: Wait until next purge cycle should begin
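The scan, filter and cleanup phases can be sketched as a single STM action. The sketch makes several assumptions: a TVar holding an ordinary Map stands in for the stm-containers map, keys are String rather than Text, the whole pass runs in one transaction (the real implementation may split it), and purgeOnce is a hypothetical name. The timing and sleep phases are left out. Returning the purged keys lets the caller log them in IO after the transaction commits.

```haskell
import Control.Concurrent.STM
import qualified Data.Map.Strict as Map

-- One purge pass: scan all entries, keep the expired ones, delete them.
purgeOnce
  :: Double                        -- current time in seconds
  -> Integer                       -- TTL in seconds
  -> (entry -> STM Double)         -- timestamp extractor
  -> (String -> entry -> STM ())   -- delete action
  -> TVar (Map.Map String entry)   -- stand-in for the STM map
  -> STM [String]                  -- keys that were purged
purgeOnce now ttlSeconds getTimestamp deleteAction store = do
  entries <- Map.toList <$> readTVar store                        -- scan
  stamped <- mapM (\(k, e) -> (,) (k, e) <$> getTimestamp e) entries
  let expired = [ke | (ke, ts) <- stamped
                    , now - ts >= fromIntegral ttlSeconds]        -- filter
  mapM_ (uncurry deleteAction) expired                            -- cleanup
  pure (map fst expired)

main :: IO ()
main = do
  old   <- newTVarIO 100.0
  fresh <- newTVarIO 4000.0
  store <- newTVarIO (Map.fromList [("old", old), ("fresh", fresh)])
  let delete key _ = modifyTVar' store (Map.delete key)
  purged <- atomically (purgeOnce 4100.0 3600 readTVar delete store)
  -- IO such as logging happens here, after the transaction has committed.
  mapM_ (putStrLn . ("purged: " ++)) purged   -- prints "purged: old"
```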
Atomicity Guarantees
- Entry Listing: All entries are captured in a single STM transaction
- Expiration Check: Timestamp extraction is atomic per entry
- Deletion: Custom delete actions are executed atomically
- Consistency: Map state remains consistent throughout the process
Custom Delete Actions
The deleteAction
parameter allows specialized cleanup logic:
    -- Simple deletion
    deleteAction key entry = StmMap.delete key stmMap

    -- Cleanup with resource release
    deleteAction key entry = do
      releaseResources entry -- Custom cleanup
      StmMap.delete key stmMap

    -- Conditional deletion
    deleteAction key entry = do
      shouldDelete <- checkCondition entry
      when shouldDelete $ StmMap.delete key stmMap
Error Handling
The function is designed to be resilient:
- Individual entry failures don't stop the purge cycle
- Timing calculations handle clock adjustments gracefully
- Thread continues running even if STM transactions retry
Performance Optimization
- Batch Processing: All deletions happen in a single STM transaction
- Minimal Copying: Entries are processed in-place where possible
- Efficient Filtering: Uses lazy evaluation for timestamp checks
- Precise Timing: Avoids unnecessary CPU usage during sleep periods
Example Usage
    -- Custom purge for application-specific entries
    threadId <- startCustomPurge
      -- Extract timestamp from custom entry type
      (\entry -> readTVar (customTimestamp entry))
      -- Custom cleanup; the action runs in STM, so it cannot perform IO such as logging
      (\key entry -> do
          releaseCustomResources entry
          StmMap.delete key customMap)
      customMap
      600   -- 10-minute intervals
      3600  -- 1-hour TTL
Flexibility: Supports any entry type with extractable timestamps.
Extensibility: Custom delete actions enable complex cleanup scenarios.
Reliability: Robust error handling ensures continuous operation.
getTimestamp - Timestamp extraction function. Called for each entry to determine last activity time. Should be fast and side-effect free. Returns Unix timestamp as Double for sub-second precision.
deleteAction - Custom delete action executed for expired entries. Receives both key and entry for context. Should handle resource cleanup and map removal atomically. Because it runs in STM, it can release STM-managed resources but cannot perform IO such as logging directly.
stmMap - STM map to purge. Entries are identified by Text keys and can be any type entry. Map is scanned completely during each purge cycle for expired entries.
intervalSeconds - Purge interval in seconds. Controls how frequently the purge operation runs. Shorter intervals provide more responsive cleanup but increase CPU usage.
ttlSeconds - TTL (time-to-live) in seconds. Entries with timestamps older than (currentTime - ttlSeconds) are considered expired and eligible for removal.
Returns thread ID of the purge thread. Thread runs indefinitely until killed. Can be used for thread management and monitoring.
startCustomPurgeTokenBucket :: Map Text TokenBucketEntry -> Integer -> Integer -> IO ThreadId
Start a specialized purge thread for token bucket entries in an STM map.
This function provides a convenient wrapper around startCustomPurge specifically configured for TokenBucketEntry cleanup. It handles the complexities of extracting timestamps from token bucket state and properly cleaning up worker threads.
Token Bucket Specific Behavior
- Timestamp Extraction: Uses the lastUpdate field from TokenBucketState
- Worker Cleanup: Attempts to acquire worker lock and terminate threads
- Resource Release: Removes entry from STM map and frees all resources
TTL Behavior
Token buckets are considered expired when:
currentTime - lastUpdateTime >= ttlSeconds
This means buckets are purged based on when they were last used for rate limiting, not when they were created. Active buckets are never purged regardless of age.
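The expiry rule can be written as a small predicate. The sketch assumes timestamps in seconds as Double (the type returned by the getTimestamp argument of startCustomPurge); isExpired is a hypothetical name. Note that the comparison is inclusive, so a bucket idle for exactly the TTL already counts as expired.

```haskell
-- Expiry test following the stated rule
-- currentTime - lastUpdateTime >= ttlSeconds.
isExpired :: Double -> Integer -> Double -> Bool
isExpired currentTime ttlSeconds lastUpdateTime =
  currentTime - lastUpdateTime >= fromIntegral ttlSeconds

main :: IO ()
main = do
  let now = 10000.0
      ttl = 3600
  print (isExpired now ttl 1000.0)   -- idle 9000 s         -> True
  print (isExpired now ttl 6400.0)   -- idle exactly 3600 s -> True
  print (isExpired now ttl 9999.5)   -- idle 0.5 s          -> False
```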
Example Scenarios
    -- High-frequency API with 5-minute cleanup cycles
    tokenBuckets <- StmMap.newIO
    threadId <- startCustomPurgeTokenBucket tokenBuckets 300 3600 -- 5min interval, 1hr TTL

    -- Low-frequency batch processing with daily cleanup
    batchBuckets <- StmMap.newIO
    threadId <- startCustomPurgeTokenBucket batchBuckets 86400 604800 -- 1day interval, 1week TTL
Thread Management: Returns ThreadId for thread control (e.g., killThread).
Memory Impact: Prevents unbounded memory growth in applications with dynamic rate limiting keys.
Performance: O(n) scan of all buckets, but typically runs infrequently during low-traffic periods.
stmMap - STM map containing token bucket entries keyed by identifier. Typically uses API keys, user IDs, or IP addresses as keys. Map is scanned atomically during each purge cycle.
intervalSeconds - Purge interval in seconds. How often to scan for expired buckets. Balance between cleanup responsiveness and CPU usage. Recommended: 300-3600 seconds depending on traffic patterns.
ttlSeconds - TTL (time-to-live) in seconds. Buckets unused for this duration are considered expired and eligible for removal. Should be much larger than typical request intervals. Recommended: 3600-86400 seconds.
Returns thread ID of the background purge thread. Can be used with killThread to stop purging.
startCustomPurgeLeakyBucket :: Map Text LeakyBucketEntry -> Integer -> Integer -> IO ThreadId
Start a specialized purge thread for leaky bucket entries in an STM map.
Similar to startCustomPurgeTokenBucket but configured for LeakyBucketEntry cleanup. Handles the specific requirements of leaky bucket timestamp extraction and worker thread management.
Leaky Bucket Specific Behavior
- Timestamp Precision: Uses Double timestamp from LeakyBucketState for sub-second accuracy
- Continuous Time Model: Supports fractional timestamps for precise drain calculations
- Worker Cleanup: Terminates continuous drain worker threads properly
TTL Calculation
Leaky buckets are expired when:
currentTime - lastTime >= fromIntegral ttlSeconds
The lastTime field represents the most recent bucket level update, providing more precise expiration timing than integer-based systems.
Use Cases
- Streaming Applications: Rate limiting for continuous data flows
- Real-time APIs: Sub-second precision rate limiting
- Traffic Shaping: Network bandwidth management with smooth rates
Example Configuration
    -- Real-time streaming with frequent cleanup
    streamBuckets <- StmMap.newIO
    threadId <- startCustomPurgeLeakyBucket streamBuckets 60 900 -- 1min interval, 15min TTL

    -- Network traffic shaping with moderate cleanup
    trafficBuckets <- StmMap.newIO
    threadId <- startCustomPurgeLeakyBucket trafficBuckets 600 7200 -- 10min interval, 2hr TTL
Precision: Sub-second timestamp accuracy for fine-grained rate control.
Cleanup Behavior: More aggressive cleanup suitable for high-frequency, short-lived connections.
stmMap - STM map containing leaky bucket entries keyed by identifier. Keys typically represent connections, streams, or data flows. All entries are scanned atomically during purge operations.
intervalSeconds - Purge interval in seconds. Frequency of expired entry cleanup. For high-frequency applications, shorter intervals (60-600s) provide more responsive cleanup.
ttlSeconds - TTL in seconds. Buckets inactive for this duration are purged. Should account for typical connection/stream lifetime patterns. Shorter TTL (900-3600s) suitable for transient connections.
Returns thread ID for background purge thread management. Thread runs continuously until explicitly terminated.
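When choosing the two parameters, it helps to see how they combine: an idle bucket is removed by the first purge cycle that runs after its TTL has elapsed, so it can linger for up to one extra interval. The sketch below works this out under simplifying assumptions (cycles fire at exact multiples of the interval and take no time); removalTime is a hypothetical helper, not part of this module.

```haskell
-- Time of the first purge cycle that sees the bucket as expired, assuming
-- cycles fire at exact multiples of the interval and take no time.
removalTime :: Integer -> Integer -> Double -> Double
removalTime intervalSeconds ttlSeconds lastTime =
  interval * fromIntegral (ceiling ((lastTime + ttl) / interval) :: Integer)
  where
    interval = fromIntegral intervalSeconds
    ttl      = fromIntegral ttlSeconds

main :: IO ()
main = do
  -- 1-minute interval, 15-minute TTL, last update at t = 30.5 s
  print (removalTime 60 900 30.5)    -- 960.0
  -- 10-minute interval, 2-hour TTL, last update at t = 0 s
  print (removalTime 600 7200 0)     -- 7200.0
```

In the first case the bucket expires at 930.5 s but is only removed at 960 s, so under these assumptions the time between last use and removal falls between ttlSeconds and ttlSeconds + intervalSeconds.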