streamly-core
Copyright(c) 2025 Composewell Technologies
LicenseBSD3
Maintainerstreamly@composewell.com
Stabilityreleased
PortabilityGHC
Safe HaskellNone
LanguageHaskell2010

Streamly.Data.RingArray

Description

This module provides APIs to create and use unboxed, mutable ring arrays of fixed size. Ring arrays are useful to keep a circular buffer or a sliding window of elements.

RingArrays are of fixed size but there is a way to expand the size of the ring, you can copy the ring to a MutArray, expand the MutArray and the cast it back to RingArray.

This module is designed to be imported qualified:

>>> import qualified Streamly.Data.RingArray as Ring

Please refer to Streamly.Internal.Data.RingArray for functions that have not yet been released.

Synopsis

Documentation

data RingArray a Source #

A ring buffer is a circular buffer. A new element is inserted at a position called the ring head which points to the oldest element in the ring, an insert overwrites the oldest element. After inserting, the head is moved to point to the next element which is now the oldest element.

Elements in the ring are indexed relative to the head. RingArray head is designated as the index 0 of the ring buffer, it points to the oldest or the first element in the buffer. Higher positive indices point to the newer elements in the buffer. Index -1 points to the newest or the last element in the buffer. Higher negative indices point to older elements.

The ring is of fixed size and cannot be expanded or reduced after creation. Creation of zero sized rings is not allowed.

This module provides an unboxed implementation of ring buffers for best performance.

Construction

createOfLast :: forall a (m :: Type -> Type). (Unbox a, MonadIO m) => Int -> Fold m a (RingArray a) Source #

createOfLast n returns the last n elements of the stream in a ring array. n must be non-zero.

castMutArray :: Unbox a => MutArray a -> Maybe (RingArray a) Source #

Cast a MutArray to a ring sharing the same memory without copying. The ring head is positioned at index 0 of the array. The size of the ring is equal to the MutArray length.

See castMutArrayWith for failure scenarios.

>>> castMutArray = RingArray.castMutArrayWith 0

castMutArrayWith :: Unbox a => Int -> MutArray a -> Maybe (RingArray a) Source #

castMutArrayWith index arr casts a mutable array to a ring array, and positions the ring head at the given index in the array.

A MutArray can be a slice which means its memory starts from some offset in the underlying MutableByteArray, and not from 0 offset. RingArray always uses the memory from offset zero in the MutableByteArray, therefore, it refuses to cast if it finds the array does not start from offset zero i.e. if the array was created from some slicing operation over another array. In such cases it returns Nothing.

To create a RingArray from a sliced MutArray use createOfLast, or clone the MutArray and then cast it.

This operation throws an error if the index is not within the array bounds.

Moving the Head

moveForward :: Unbox a => RingArray a -> RingArray a Source #

Advance the ring head forward by 1 slot, the ring head will now point to the next (newer) item, and the old ring head position will become the latest or the newest item position.

>>> moveForward = RingArray.moveBy 1

moveReverse :: Unbox a => RingArray a -> RingArray a Source #

Move the ring head backward by 1 slot, the ring head will now point to the prev (older) item, when the ring head is at the oldest item it will move to the newest item.

>>> moveForward = RingArray.moveBy (-1)

In-place Mutation

insert :: RingArray a -> a -> m (RingArray a) Source #

Insert a new element without replacing an old one. Expands the size of the ring. This is similar to the snoc operation for MutArray.

Unimplemented

replace :: (MonadIO m, Unbox a) => RingArray a -> a -> m (RingArray a, a) Source #

Replace the oldest item in the ring (the item at the ring head) with a new item and move the ring head to the remaining oldest item.

Throws an error if the ring is empty.

replace_ :: (MonadIO m, Unbox a) => RingArray a -> a -> m (RingArray a) Source #

Like replace but does not return the old value of overwritten element.

Same as:

>>> replace_ rb x = RingArray.putIndex 0 rb x >> pure (RingArray.moveForward rb)

putIndex :: (MonadIO m, Unbox a) => Int -> RingArray a -> a -> m () Source #

O(1) Write the given element at the given index relative to the current position of the ring head. Index starts at 0, could be positive or negative.

Throws an error if the index is more than or equal to the size of the ring.

Performs in-place mutation of the array.

modifyIndex :: Int -> RingArray a -> (a -> (a, b)) -> m b Source #

Modify a given index of a ring array using a modifier function.

Unimplemented

Random Access

getIndex :: (MonadIO m, Unbox a) => Int -> RingArray a -> m (Maybe a) Source #

O(1) Lookup the element at the given index relative to the ring head. Index starts from 0, could be positive or negative. Returns Nothing if the index is more than or equal to the size of the ring.

unsafeGetIndex :: (MonadIO m, Unbox a) => Int -> RingArray a -> m a Source #

Like getIndex but does not check the bounds. Unpredictable behavior occurs if the index is more than or equal to the ring size.

unsafeGetHead :: (MonadIO m, Unbox a) => RingArray a -> m a Source #

O(1) Lookup the element at the head position.

Prefer this over unsafeGetIndex 0 as it does not have have to perform an index rollover check.

Conversion

toList :: (MonadIO m, Unbox a) => RingArray a -> m [a] Source #

Copy the ring to a list, the first element of the list is the oldest element of the ring (i.e. ring head) and the last is the newest.

>>> toList = Stream.toList . RingArray.read

toMutArray :: (MonadIO m, Unbox a) => RingArray a -> m (MutArray a) Source #

Copy the ring to a MutArray, the first element of the MutArray is the oldest element of the ring (i.e. ring head) and the last is the newest.

>>> toMutArray rb = Stream.fold (MutArray.createOf (RingArray.length rb)) $ RingArray.read rb

Streams

read :: forall (m :: Type -> Type) a. (MonadIO m, Unbox a) => RingArray a -> Stream m a Source #

Read the entire ring as a stream, starting at the ring head i.e. from oldest to newest.

readRev :: forall (m :: Type -> Type) a. (MonadIO m, Unbox a) => RingArray a -> Stream m a Source #

Read the entire ring as a stream, starting from newest to oldest elements.

Unfolds

reader :: forall (m :: Type -> Type) a. (MonadIO m, Unbox a) => Unfold m (RingArray a) a Source #

Read the entire ring, starting at the ring head i.e. from oldest to newest.

readerRev :: forall (m :: Type -> Type) a. (MonadIO m, Unbox a) => Unfold m (RingArray a) a Source #

Read the entire ring in reverse order, starting at the item before the ring head i.e. from newest to oldest

Size

length :: Unbox a => RingArray a -> Int Source #

O(1) Get the length of the ring. i.e. the number of elements in the ring.

byteLength :: RingArray a -> Int Source #

O(1) Get the byte length of the ring.

Casting

cast :: forall a b. Unbox b => RingArray a -> Maybe (RingArray b) Source #

Cast a ring having elements of type a into a ring having elements of type b. The length of the ring should be a multiple of the size of the target element otherwise Nothing is returned.

asBytes :: RingArray a -> RingArray Word8 Source #

Cast a RingArray a into a RingArray Word8.

asMutArray :: RingArray a -> (MutArray a, Int) Source #

Cast the ring to a mutable array. Return the mutable array as well as the current position of the ring head. Note that the array does not start with the current ring head. The array refers to the same memory as the ring.

Folds

fold :: (MonadIO m, Unbox a) => Fold m a b -> RingArray a -> m b Source #

Fold the entire length of a ring buffer starting at the current ring head.

Stream of Rings

ringsOf :: forall (m :: Type -> Type) a. (MonadIO m, Unbox a) => Int -> Stream m a -> Stream m (RingArray a) Source #

ringsOf n stream groups the input stream into a stream of ring arrays of size up to n. See scanRingsOf for more details.

scanRingsOf :: forall (m :: Type -> Type) a. (MonadIO m, Unbox a) => Int -> Scanl m a (RingArray a) Source #

scanRingsOf n groups the input stream into a stream of ring arrays of size up to n. The first ring would be of size 1, then 2, and so on up to size n, when size n is reached the ring starts sliding out the oldest elements and keeps the newest n elements.

Note that the ring emitted is a mutable reference, therefore, should not be retained without copying otherwise the contents will change in the next iteration of the stream.

Fast Byte Comparisons

eqArray :: RingArray a -> Array a -> IO Bool Source #

Byte compare the entire length of ringBuffer with the given array, starting at the supplied ring head index. Returns true if the Array and the ring have identical contents. If the array is bigger checks only up to the ring length. If array is shorter than then ring, it is treated as an error.

eqArrayN :: RingArray a -> Array a -> Int -> IO Bool Source #

Like eqArray but compares only N bytes instead of entire length of the ring buffer. If N is bigger than the ring or array size, it is treated as an error.