Copyright | (c) 2017 Composewell Technologies |
---|---|
License | BSD3 |
Maintainer | streamly@composewell.com |
Stability | experimental |
Portability | GHC |
Safe Haskell | None |
Language | Haskell2010 |
Streamly.Internal.Data.StreamK
Description
Synopsis
- foldr :: Monad m => (a -> b -> b) -> b -> StreamK m a -> m b
- repeat :: forall a (m :: Type -> Type). a -> StreamK m a
- foldl' :: Monad m => (b -> a -> b) -> b -> StreamK m a -> m b
- mfix :: Monad m => (m a -> StreamK m a) -> StreamK m a
- unfoldr :: forall b a (m :: Type -> Type). (b -> Maybe (a, b)) -> b -> StreamK m a
- build :: forall (m :: Type -> Type) a. (forall b. (a -> b -> b) -> b -> b) -> StreamK m a
- map :: forall a b (m :: Type -> Type). (a -> b) -> StreamK m a -> StreamK m b
- fromList :: forall a (m :: Type -> Type). [a] -> StreamK m a
- uncons :: Applicative m => StreamK m a -> m (Maybe (a, StreamK m a))
- tail :: Applicative m => StreamK m a -> m (Maybe (StreamK m a))
- init :: Applicative m => StreamK m a -> m (Maybe (StreamK m a))
- null :: Monad m => StreamK m a -> m Bool
- reverse :: forall (m :: Type -> Type) a. StreamK m a -> StreamK m a
- concatMap :: forall a (m :: Type -> Type) b. (a -> StreamK m b) -> StreamK m a -> StreamK m b
- foldrM :: (a -> m b -> m b) -> m b -> StreamK m a -> m b
- type Stream = StreamK
- cons :: forall a (m :: Type -> Type). a -> StreamK m a -> StreamK m a
- append :: forall (m :: Type -> Type) a. StreamK m a -> StreamK m a -> StreamK m a
- fromPure :: forall a (m :: Type -> Type). a -> StreamK m a
- nil :: forall (m :: Type -> Type) a. StreamK m a
- newtype StreamK (m :: Type -> Type) a = MkStream (forall r. State StreamK m a -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
- nilM :: Applicative m => m b -> StreamK m a
- consM :: Monad m => m a -> StreamK m a -> StreamK m a
- before :: Monad m => m b -> StreamK m a -> StreamK m a
- mkStream :: (forall r. State StreamK m a -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r) -> StreamK m a
- drain :: Monad m => StreamK m a -> m ()
- foldlM' :: Monad m => (b -> a -> m b) -> m b -> StreamK m a -> m b
- fromEffect :: Monad m => m a -> StreamK m a
- unfoldrM :: Monad m => (b -> m (Maybe (a, b))) -> b -> StreamK m a
- interleave :: forall (m :: Type -> Type) a. StreamK m a -> StreamK m a -> StreamK m a
- crossWith :: forall (m :: Type -> Type) a b c. Monad m => (a -> b -> c) -> StreamK m a -> StreamK m b -> StreamK m c
- cross :: forall (m :: Type -> Type) a b. Monad m => StreamK m a -> StreamK m b -> StreamK m (a, b)
- concatEffect :: Monad m => m (StreamK m a) -> StreamK m a
- fairConcatMap :: forall a (m :: Type -> Type) b. (a -> StreamK m b) -> StreamK m a -> StreamK m b
- concatFor :: forall (m :: Type -> Type) a b. StreamK m a -> (a -> StreamK m b) -> StreamK m b
- fairConcatFor :: forall (m :: Type -> Type) a b. StreamK m a -> (a -> StreamK m b) -> StreamK m b
- concatForM :: Monad m => StreamK m a -> (a -> m (StreamK m b)) -> StreamK m b
- fairConcatForM :: Monad m => StreamK m a -> (a -> m (StreamK m b)) -> StreamK m b
- fromFoldable :: forall f a (m :: Type -> Type). Foldable f => f a -> StreamK m a
- bfsConcatMap :: forall a (m :: Type -> Type) b. (a -> StreamK m b) -> StreamK m a -> StreamK m b
- concatMapWith :: forall (m :: Type -> Type) b a. (StreamK m b -> StreamK m b -> StreamK m b) -> (a -> StreamK m b) -> StreamK m a -> StreamK m b
- bfsConcatFor :: forall (m :: Type -> Type) a b. StreamK m a -> (a -> StreamK m b) -> StreamK m b
- bfsConcatForM :: Monad m => StreamK m a -> (a -> m (StreamK m b)) -> StreamK m b
- mergeMapWith :: forall (m :: Type -> Type) b a. (StreamK m b -> StreamK m b -> StreamK m b) -> (a -> StreamK m b) -> StreamK m a -> StreamK m b
- newtype Nested (m :: Type -> Type) a = Nested {}
- fromFoldableM :: (Foldable f, Monad m) => f (m a) -> StreamK m a
- interleaveEndBy' :: forall (m :: Type -> Type) a. StreamK m a -> StreamK m a -> StreamK m a
- interleaveSepBy :: forall (m :: Type -> Type) a. StreamK m a -> StreamK m a -> StreamK m a
- interleaveMin :: forall (m :: Type -> Type) a. StreamK m a -> StreamK m a -> StreamK m a
- interleaveFst :: forall (m :: Type -> Type) a. StreamK m a -> StreamK m a -> StreamK m a
- foldrS :: forall a (m :: Type -> Type) b. (a -> StreamK m b -> StreamK m b) -> StreamK m b -> StreamK m a -> StreamK m b
- foldlS :: forall (m :: Type -> Type) b a. (StreamK m b -> a -> StreamK m b) -> StreamK m b -> StreamK m a -> StreamK m b
- initNonEmpty :: forall (m :: Type -> Type) a. Stream m a -> Stream m a
- tailNonEmpty :: forall (m :: Type -> Type) a. StreamK m a -> StreamK m a
- foldlx' :: forall m a b x. Monad m => (x -> a -> x) -> x -> (x -> b) -> StreamK m a -> m b
- foldlMx' :: Monad m => (x -> a -> m x) -> m x -> (x -> m b) -> StreamK m a -> m b
- crossApply :: forall (m :: Type -> Type) a b. StreamK m (a -> b) -> StreamK m a -> StreamK m b
- crossApplyFst :: forall (m :: Type -> Type) a b. StreamK m a -> StreamK m b -> StreamK m a
- crossApplySnd :: forall (m :: Type -> Type) a b. StreamK m a -> StreamK m b -> StreamK m b
- mkCross :: forall (m :: Type -> Type) a. StreamK m a -> Nested m a
- unCross :: forall (m :: Type -> Type) a. CrossStreamK m a -> StreamK m a
- newtype FairNested (m :: Type -> Type) a = FairNested {
- unFairNested :: StreamK m a
- foldStream :: State StreamK m a -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> StreamK m a -> m r
- foldStreamShared :: State StreamK m a -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> StreamK m a -> m r
- foldrSShared :: forall a (m :: Type -> Type) b. (a -> StreamK m b -> StreamK m b) -> StreamK m b -> StreamK m a -> StreamK m b
- foldrSM :: Monad m => (m a -> StreamK m b -> StreamK m b) -> StreamK m b -> StreamK m a -> StreamK m b
- buildS :: forall a (m :: Type -> Type). ((a -> StreamK m a -> StreamK m a) -> StreamK m a -> StreamK m a) -> StreamK m a
- buildM :: Monad m => (forall r. (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r) -> StreamK m a
- buildSM :: Monad m => ((m a -> StreamK m a -> StreamK m a) -> StreamK m a -> StreamK m a) -> StreamK m a
- augmentS :: forall a (m :: Type -> Type). ((a -> StreamK m a -> StreamK m a) -> StreamK m a -> StreamK m a) -> StreamK m a -> StreamK m a
- augmentSM :: Monad m => ((m a -> StreamK m a -> StreamK m a) -> StreamK m a -> StreamK m a) -> StreamK m a -> StreamK m a
- unShare :: forall (m :: Type -> Type) a. StreamK m a -> StreamK m a
- fromStopK :: forall (m :: Type -> Type) a. StopK m -> StreamK m a
- fromYieldK :: forall (m :: Type -> Type) a. YieldK m a -> StreamK m a
- consK :: forall (m :: Type -> Type) a. YieldK m a -> StreamK m a -> StreamK m a
- (.:) :: forall a (m :: Type -> Type). a -> StreamK m a -> StreamK m a
- consMBy :: Monad m => (StreamK m a -> StreamK m a -> StreamK m a) -> m a -> StreamK m a -> StreamK m a
- unfoldrMWith :: Monad m => (m a -> StreamK m a -> StreamK m a) -> (b -> m (Maybe (a, b))) -> b -> StreamK m a
- repeatMWith :: (m a -> t m a -> t m a) -> m a -> t m a
- replicateMWith :: (m a -> StreamK m a -> StreamK m a) -> Int -> m a -> StreamK m a
- fromIndicesMWith :: (m a -> StreamK m a -> StreamK m a) -> (Int -> m a) -> StreamK m a
- iterateMWith :: Monad m => (m a -> StreamK m a -> StreamK m a) -> (a -> m a) -> m a -> StreamK m a
- headNonEmpty :: Monad m => StreamK m a -> m a
- mapMWith :: (m b -> StreamK m b -> StreamK m b) -> (a -> m b) -> StreamK m a -> StreamK m b
- mapMSerial :: Monad m => (a -> m b) -> StreamK m a -> StreamK m b
- mapMAccum :: (s -> a -> m (s, b)) -> m s -> StreamK m a -> StreamK m b
- conjoin :: forall (m :: Type -> Type) a. Monad m => StreamK m a -> StreamK m a -> StreamK m a
- crossApplyWith :: forall (m :: Type -> Type) b a. (StreamK m b -> StreamK m b -> StreamK m b) -> StreamK m (a -> b) -> StreamK m a -> StreamK m b
- concatMapEffect :: Monad m => (b -> StreamK m a) -> m b -> StreamK m a
- concatMapMAccum :: (StreamK m b -> StreamK m b -> StreamK m b) -> (s -> a -> m (s, StreamK m b)) -> m s -> StreamK m a -> StreamK m b
- concatForWith :: forall (m :: Type -> Type) b a. (StreamK m b -> StreamK m b -> StreamK m b) -> StreamK m a -> (a -> StreamK m b) -> StreamK m b
- concatForWithM :: Monad m => (StreamK m b -> StreamK m b -> StreamK m b) -> StreamK m a -> (a -> m (StreamK m b)) -> StreamK m b
- concatIterateWith :: forall (m :: Type -> Type) a. (StreamK m a -> StreamK m a -> StreamK m a) -> (a -> StreamK m a) -> StreamK m a -> StreamK m a
- concatIterateLeftsWith :: forall b a c (m :: Type -> Type). b ~ Either a c => (StreamK m b -> StreamK m b -> StreamK m b) -> (a -> StreamK m b) -> StreamK m b -> StreamK m b
- concatIterateScanWith :: Monad m => (StreamK m a -> StreamK m a -> StreamK m a) -> (b -> a -> m (b, StreamK m a)) -> m b -> StreamK m a -> StreamK m a
- mergeIterateWith :: forall (m :: Type -> Type) a. (StreamK m a -> StreamK m a -> StreamK m a) -> (a -> StreamK m a) -> StreamK m a -> StreamK m a
- type CrossStreamK = Nested
- bindWith :: forall (m :: Type -> Type) b a. (StreamK m b -> StreamK m b -> StreamK m b) -> StreamK m a -> (a -> StreamK m b) -> StreamK m b
- evalStateT :: Monad m => m s -> StreamK (StateT s m) a -> StreamK m a
- liftInner :: forall (m :: Type -> Type) (t :: (Type -> Type) -> Type -> Type) a. (Monad m, MonadTrans t, Monad (t m)) => StreamK m a -> StreamK (t m) a
- localReaderT :: forall r (m :: Type -> Type) a. (r -> r) -> StreamK (ReaderT r m) a -> StreamK (ReaderT r m) a
- foldlT :: forall (m :: Type -> Type) s b a. (Monad m, Monad (s m), MonadTrans s) => (s m b -> a -> s m b) -> s m b -> StreamK m a -> s m b
- foldrT :: forall (m :: Type -> Type) s a b. (Monad m, Monad (s m), MonadTrans s) => (a -> s m b -> s m b) -> s m b -> StreamK m a -> s m b
- fromStream :: forall (m :: Type -> Type) a. Monad m => Stream m a -> StreamK m a
- repeatM :: Monad m => m a -> StreamK m a
- replicate :: forall a (m :: Type -> Type). Int -> a -> StreamK m a
- replicateM :: Monad m => Int -> m a -> StreamK m a
- fromIndices :: forall a (m :: Type -> Type). (Int -> a) -> StreamK m a
- fromIndicesM :: Monad m => (Int -> m a) -> StreamK m a
- iterate :: forall a (m :: Type -> Type). (a -> a) -> a -> StreamK m a
- iterateM :: Monad m => (a -> m a) -> m a -> StreamK m a
- foldr1 :: Monad m => (a -> a -> a) -> StreamK m a -> m (Maybe a)
- fold :: Monad m => Fold m a b -> StreamK m a -> m b
- foldBreak :: Monad m => Fold m a b -> StreamK m a -> m (b, StreamK m a)
- foldEither :: Monad m => Fold m a b -> StreamK m a -> m (Either (Fold m a b) (b, StreamK m a))
- foldConcat :: Monad m => Producer m a b -> Fold m b c -> StreamK m a -> m (c, StreamK m a)
- toParserK :: forall (m :: Type -> Type) a b. Monad m => Parser a m b -> ParserK a m b
- parseDBreak :: Monad m => Parser a m b -> StreamK m a -> m (Either ParseErrorPos b, StreamK m a)
- parseD :: Monad m => Parser a m b -> StreamK m a -> m (Either ParseErrorPos b)
- parseBreak :: Monad m => ParserK a m b -> StreamK m a -> m (Either ParseError b, StreamK m a)
- parseBreakPos :: Monad m => ParserK a m b -> StreamK m a -> m (Either ParseErrorPos b, StreamK m a)
- parse :: Monad m => ParserK a m b -> StreamK m a -> m (Either ParseError b)
- parsePos :: Monad m => ParserK a m b -> StreamK m a -> m (Either ParseErrorPos b)
- head :: Monad m => StreamK m a -> m (Maybe a)
- elem :: (Monad m, Eq a) => a -> StreamK m a -> m Bool
- notElem :: (Monad m, Eq a) => a -> StreamK m a -> m Bool
- all :: Monad m => (a -> Bool) -> StreamK m a -> m Bool
- any :: Monad m => (a -> Bool) -> StreamK m a -> m Bool
- last :: Monad m => StreamK m a -> m (Maybe a)
- minimum :: (Monad m, Ord a) => StreamK m a -> m (Maybe a)
- minimumBy :: Monad m => (a -> a -> Ordering) -> StreamK m a -> m (Maybe a)
- maximum :: (Monad m, Ord a) => StreamK m a -> m (Maybe a)
- maximumBy :: Monad m => (a -> a -> Ordering) -> StreamK m a -> m (Maybe a)
- findIndices :: forall a (m :: Type -> Type). (a -> Bool) -> StreamK m a -> StreamK m Int
- lookup :: (Monad m, Eq a) => a -> StreamK m (a, b) -> m (Maybe b)
- findM :: Monad m => (a -> m Bool) -> StreamK m a -> m (Maybe a)
- find :: Monad m => (a -> Bool) -> StreamK m a -> m (Maybe a)
- (!!) :: Monad m => StreamK m a -> Int -> m (Maybe a)
- toList :: Monad m => StreamK m a -> m [a]
- toStream :: forall (m :: Type -> Type) a. Applicative m => StreamK m a -> Stream m a
- mapM_ :: Monad m => (a -> m b) -> StreamK m a -> m ()
- scanl' :: forall b a (m :: Type -> Type). (b -> a -> b) -> b -> StreamK m a -> StreamK m b
- scanlx' :: forall x a b (m :: Type -> Type). (x -> a -> x) -> x -> (x -> b) -> StreamK m a -> StreamK m b
- filter :: forall a (m :: Type -> Type). (a -> Bool) -> StreamK m a -> StreamK m a
- take :: forall (m :: Type -> Type) a. Int -> StreamK m a -> StreamK m a
- takeWhile :: forall a (m :: Type -> Type). (a -> Bool) -> StreamK m a -> StreamK m a
- drop :: forall (m :: Type -> Type) a. Int -> StreamK m a -> StreamK m a
- dropWhile :: forall a (m :: Type -> Type). (a -> Bool) -> StreamK m a -> StreamK m a
- mapM :: Monad m => (a -> m b) -> StreamK m a -> StreamK m b
- sequence :: Monad m => StreamK m (m a) -> StreamK m a
- intersperseM :: Monad m => m a -> StreamK m a -> StreamK m a
- intersperse :: forall (m :: Type -> Type) a. Monad m => a -> StreamK m a -> StreamK m a
- insertBy :: forall a (m :: Type -> Type). (a -> a -> Ordering) -> a -> StreamK m a -> StreamK m a
- deleteBy :: forall a (m :: Type -> Type). (a -> a -> Bool) -> a -> StreamK m a -> StreamK m a
- sortBy :: forall (m :: Type -> Type) a. Monad m => (a -> a -> Ordering) -> StreamK m a -> StreamK m a
- sortOn :: forall (m :: Type -> Type) b a. (Monad m, Ord b) => (a -> b) -> StreamK m a -> StreamK m a
- mapMaybe :: forall a b (m :: Type -> Type). (a -> Maybe b) -> StreamK m a -> StreamK m b
- zipWith :: forall (m :: Type -> Type) a b c. Monad m => (a -> b -> c) -> StreamK m a -> StreamK m b -> StreamK m c
- zipWithM :: Monad m => (a -> b -> m c) -> StreamK m a -> StreamK m b -> StreamK m c
- mergeBy :: forall a (m :: Type -> Type). (a -> a -> Ordering) -> StreamK m a -> StreamK m a -> StreamK m a
- mergeByM :: Monad m => (a -> a -> m Ordering) -> StreamK m a -> StreamK m a -> StreamK m a
- the :: (Eq a, Monad m) => StreamK m a -> m (Maybe a)
- morphInner :: (Monad m, Monad n) => (forall x. m x -> n x) -> StreamK m a -> StreamK n a
- handle :: (MonadCatch m, Exception e) => (e -> m (StreamK m a)) -> StreamK m a -> StreamK m a
- bracketIO :: forall (m :: Type -> Type) b c a. (MonadIO m, MonadCatch m) => IO b -> (b -> IO c) -> (b -> StreamK m a) -> StreamK m a
- hoist :: (Monad m, Monad n) => (forall x. m x -> n x) -> StreamK m a -> StreamK n a
- parseBreakChunks :: (Monad m, Unbox a) => ParserK (Array a) m b -> StreamK m (Array a) -> m (Either ParseError b, StreamK m (Array a))
- parseChunks :: (Monad m, Unbox a) => ParserK (Array a) m b -> StreamK m (Array a) -> m (Either ParseError b)
- parseBreakChunksGeneric :: Monad m => ParserK (Array a) m b -> StreamK m (Array a) -> m (Either ParseError b, StreamK m (Array a))
- parseChunksGeneric :: Monad m => ParserK (Array a) m b -> StreamK m (Array a) -> m (Either ParseError b)
Setup
To execute the code examples provided in this module in ghci, please run the following commands first.
>>>
:m
>>>
import Control.Concurrent (threadDelay)
>>>
import Data.Function (fix, (&))
>>>
import Data.Semigroup (cycle1)
>>>
import Streamly.Data.StreamK (StreamK)
>>>
import qualified Streamly.Data.Fold as Fold
>>>
import qualified Streamly.Data.Parser as Parser
>>>
import qualified Streamly.Data.Stream as Stream
>>>
import qualified Streamly.Data.StreamK as StreamK
>>>
import qualified Streamly.FileSystem.DirIO as Dir
>>>
mk = StreamK.fromStream . Stream.fromList
>>>
un = Stream.toList . StreamK.toStream
>>>
effect n = print n >> return n
For APIs that have not been released yet.
>>>
import qualified Streamly.Internal.FileSystem.Path as Path
>>>
import qualified Streamly.Internal.Data.StreamK as StreamK
>>>
import qualified Streamly.Internal.FileSystem.DirIO as Dir
repeat :: forall a (m :: Type -> Type). a -> StreamK m a Source #
Generate an infinite stream by repeating a pure value.
>>>
repeat x = let xs = StreamK.cons x xs in xs
Pre-release
mfix :: Monad m => (m a -> StreamK m a) -> StreamK m a Source #
We can define cyclic structures using let
:
>>>
:set -fno-warn-unrecognised-warning-flags
>>>
:set -fno-warn-x-partial
>>>
let (a, b) = ([1, b], head a) in (a, b)
([1,1],1)
The function fix
defined as:
>>>
fix f = let x = f x in x
ensures that the argument of a function and its output refer to the same
lazy value x
i.e. the same location in memory. Thus x
can be defined
in terms of itself, creating structures with cyclic references.
>>>
f ~(a, b) = ([1, b], head a)
>>>
fix f
([1,1],1)
mfix
is essentially the same as fix
but for monadic
values.
Using mfix
for streams we can construct a stream in which each element of
the stream is defined in a cyclic fashion. The argument of the function
being fixed represents the current element of the stream which is being
returned by the stream monad. Thus, we can use the argument to construct
itself.
In the following example, the argument action
of the function f
represents the tuple (x,y)
returned by it in a given iteration. We define
the first element of the tuple in terms of the second.
>>>
import System.IO.Unsafe (unsafeInterleaveIO)
>>>
:{
main = Stream.fold (Fold.drainMapM print) $ StreamK.toStream $ StreamK.mfix f where f action = StreamK.unNested $ do let incr n act = fmap ((+n) . snd) $ unsafeInterleaveIO act x <- StreamK.Nested $ StreamK.fromStream $ Stream.sequence $ Stream.fromList [incr 1 action, incr 2 action] y <- StreamK.Nested $ StreamK.fromStream $ Stream.fromList [4,5] return (x, y) :}
Note: you cannot achieve this by just changing the order of the monad statements because that would change the order in which the stream elements are generated.
Note that the function f
must be lazy in its argument, that's why we use
unsafeInterleaveIO
on action
because IO monad is strict.
Pre-release
unfoldr :: forall b a (m :: Type -> Type). (b -> Maybe (a, b)) -> b -> StreamK m a Source #
>>>
:{
unfoldr step s = case step s of Nothing -> StreamK.nil Just (a, b) -> a `StreamK.cons` unfoldr step b :}
Build a stream by unfolding a pure step function step
starting from a
seed s
. The step function returns the next element in the stream and the
next seed value. When it is done it returns Nothing
and the stream ends.
For example,
>>>
:{
let f b = if b > 2 then Nothing else Just (b, b + 1) in StreamK.toList $ StreamK.unfoldr f 0 :} [0,1,2]
tail :: Applicative m => StreamK m a -> m (Maybe (StreamK m a)) Source #
Same as:
>>>
tail = fmap (fmap snd) . StreamK.uncons
init :: Applicative m => StreamK m a -> m (Maybe (StreamK m a)) Source #
Extract all but the last element of the stream, if any. This will end up evaluating the last element as well to find out that it is last.
Pre-release
concatMap :: forall a (m :: Type -> Type) b. (a -> StreamK m b) -> StreamK m a -> StreamK m b Source #
If total iterations are kept the same, each increase in the nesting level increases the cost by roughly 1.5 times.
foldrM :: (a -> m b -> m b) -> m b -> StreamK m a -> m b Source #
Lazy right fold with a monadic step function.
type Stream = StreamK Source #
Deprecated: Please use StreamK instead.
Continuation Passing Style (CPS) version of Streamly.Data.Stream.Stream.
Unlike Streamly.Data.Stream.Stream, StreamK
can be composed recursively
without affecting performance.
Semigroup instance appends two streams:
>>>
(<>) = Stream.append
cons :: forall a (m :: Type -> Type). a -> StreamK m a -> StreamK m a infixr 5 Source #
A right associative prepend operation to add a pure value at the head of an existing stream:
>>>
s = 1 `StreamK.cons` 2 `StreamK.cons` 3 `StreamK.cons` StreamK.nil
>>>
Stream.fold Fold.toList (StreamK.toStream s)
[1,2,3]
Unlike the Streamly.Data.Stream cons, the StreamK cons can be used recursively:
>>>
repeat x = let xs = StreamK.cons x xs in xs
>>>
fromFoldable = Prelude.foldr StreamK.cons StreamK.nil
cons is the same as the following but more efficient:
>>>
cons x xs = return x `StreamK.consM` xs
append :: forall (m :: Type -> Type) a. StreamK m a -> StreamK m a -> StreamK m a infixr 6 Source #
Unlike the fused Streamly.Data.Stream append, StreamK append can be used at scale, recursively, with linear performance:
>>>
cycle xs = let ys = xs `StreamK.append` ys in ys
concatMapWith
append
(same as concatMap) flattens a stream of streams in a
depth-first manner i.e. it yields each stream fully and then the next and so
on. Given a stream of three streams:
1. [1,2,3] 2. [4,5,6] 3. [7,8,9]
The resulting stream will be [1,2,3,4,5,6,7,8,9]
.
Best used in a right associative manner.
nil :: forall (m :: Type -> Type) a. StreamK m a Source #
A stream that terminates without producing any output or side effect.
>>>
Stream.fold Fold.toList (StreamK.toStream StreamK.nil)
[]
newtype StreamK (m :: Type -> Type) a Source #
Constructors
MkStream (forall r. State StreamK m a -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r) |
Instances
nilM :: Applicative m => m b -> StreamK m a Source #
A stream that terminates without producing any output, but produces a side effect.
>>>
Stream.fold Fold.toList (StreamK.toStream (StreamK.nilM (print "nil")))
"nil" []
Pre-release
consM :: Monad m => m a -> StreamK m a -> StreamK m a infixr 5 Source #
A right associative prepend operation to add an effectful value at the head of an existing stream:
>>>
s = putStrLn "hello" `StreamK.consM` putStrLn "world" `StreamK.consM` StreamK.nil
>>>
Stream.fold Fold.drain (StreamK.toStream s)
hello world
It can be used efficiently with foldr
:
>>>
fromFoldableM = Prelude.foldr StreamK.consM StreamK.nil
Same as the following but more efficient:
>>>
consM x xs = StreamK.fromEffect x `StreamK.append` xs
before :: Monad m => m b -> StreamK m a -> StreamK m a Source #
Run an action before evaluating the stream.
mkStream :: (forall r. State StreamK m a -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r) -> StreamK m a Source #
foldlM' :: Monad m => (b -> a -> m b) -> m b -> StreamK m a -> m b Source #
Like foldl'
but with a monadic step function.
fromEffect :: Monad m => m a -> StreamK m a Source #
unfoldrM :: Monad m => (b -> m (Maybe (a, b))) -> b -> StreamK m a Source #
Build a stream by unfolding a monadic step function starting from a
seed. The step function returns the next element in the stream and the next
seed value. When it is done it returns Nothing
and the stream ends. For
example,
>>>
:{
let f b = if b > 2 then return Nothing else return (Just (b, b + 1)) in StreamK.toList $ StreamK.unfoldrM f 0 :} [0,1,2]
interleave :: forall (m :: Type -> Type) a. StreamK m a -> StreamK m a -> StreamK m a infixr 6 Source #
Interleave two streams fairly, yielding one item from each in a round-robin fashion:
>>>
un $ StreamK.interleave (mk [1,3,5]) (mk [2,4,6])
[1,2,3,4,5,6]
>>>
un $ StreamK.interleave (mk [1,3]) (mk [2,4,6])
[1,2,3,4,6]
>>>
un $ StreamK.interleave (mk []) (mk [2,4,6])
[2,4,6]
interleave
is right associative when used as an infix operator.
>>>
un $ mk [1,2,3] `StreamK.interleave` mk [4,5,6] `StreamK.interleave` mk [7,8,9]
[1,4,2,7,3,5,8,6,9]
Because of right association, the first stream yields as many items as the next two streams combined.
Be careful when refactoring code involving a chain of three or more
interleave
operations as it is not associative i.e. right associated code
may not produce the same result as left associated. This is a direct
consequence of the scheduling imbalance in the previous example. If left
associated, the above example would produce:
>>>
un $ (mk [1,2,3] `StreamK.interleave` mk [4,5,6]) `StreamK.interleave` mk [7,8,9]
[1,7,4,8,2,9,5,3,6]
Note: Use concatMap based interleaving instead of the binary operator to interleave more than two streams to avoid associativity issues.
concatMapWith
interleave
flattens a stream of streams using interleave
in a right associative manner. Given a stream of three streams:
1. [1,2,3] 2. [4,5,6] 3. [7,8,9]
The resulting sequence is [1,4,2,7,3,5,8,6,9]
.
For this reason, the right associated flattening with interleave
can work
with infinite number of streams without opening too many streams at the same
time. Each stream is consumed twice as much as the next one; if we are
combining an infinite number of streams of size n
then at most log n
streams will be opened at any given time, because the first stream will
finish by the time the stream after log n
th stream is opened.
Compare with bfsConcatMap
and mergeMapWith
interleave
.
For interleaving many streams, the best way is to use bfsConcatMap
.
See also the fused version interleave
.
crossWith :: forall (m :: Type -> Type) a b c. Monad m => (a -> b -> c) -> StreamK m a -> StreamK m b -> StreamK m c Source #
Definition:
>>>
crossWith f m1 m2 = fmap f m1 `StreamK.crossApply` m2
Note that the second stream is evaluated multiple times.
cross :: forall (m :: Type -> Type) a b. Monad m => StreamK m a -> StreamK m b -> StreamK m (a, b) Source #
Given a StreamK m a
and StreamK m b
generate a stream with all possible
combinations of the tuple (a, b)
.
Definition:
>>>
cross = StreamK.crossWith (,)
The second stream is evaluated multiple times. If that is not desired it can
be cached in an Array
and then generated from the array before
calling this function. Caching may also improve performance if the stream is
expensive to evaluate.
See cross
for a much faster fused
alternative.
Time: O(m x n)
Pre-release
fairConcatMap :: forall a (m :: Type -> Type) b. (a -> StreamK m b) -> StreamK m a -> StreamK m b Source #
See fairConcatFor
for detailed documentation.
>>>
fairConcatMap = flip StreamK.fairConcatFor
concatFor :: forall (m :: Type -> Type) a b. StreamK m a -> (a -> StreamK m b) -> StreamK m b Source #
Map a stream generating function on each element of a stream and
concatenate the results. This is the same as the bind function of the monad
instance. It is just a flipped concatMap
but is more convenient to use for
nested use cases; it reads like an imperative for
loop.
>>>
concatFor = flip StreamK.concatMap
A concatenating for
loop:
>>>
:{
un $ StreamK.concatFor (mk [1,2,3]) $ \x -> StreamK.fromPure x :} [1,2,3]
Nested concatenating for
loops:
>>>
:{
un $ StreamK.concatFor (mk [1,2,3]) $ \x -> StreamK.concatFor (mk [4,5,6]) $ \y -> StreamK.fromPure (x, y) :} [(1,4),(1,5),(1,6),(2,4),(2,5),(2,6),(3,4),(3,5),(3,6)]
fairConcatFor :: forall (m :: Type -> Type) a b. StreamK m a -> (a -> StreamK m b) -> StreamK m b Source #
fairConcatFor
is like concatFor
but traverses the depth and breadth of
nesting equally. Therefore, the outer and the inner loops in a nested loop
get equal priority. It can be used to nest infinite streams without starving
outer streams due to inner ones.
Given a stream of three streams:
1. [1,2,3] 2. [4,5,6] 3. [7,8,9]
Here, outer loop is the stream of streams and the inner loops are the
individual streams. The traversal sweeps the diagonals in the above grid to
give equal chance to outer and inner loops. The resulting stream is
(1),(2,4),(3,5,7),(6,8),(9)
, diagonals are parenthesized for emphasis.
Looping
A single stream case is equivalent to concatFor
:
>>>
un $ StreamK.fairConcatFor (mk [1,2]) $ \x -> StreamK.fromPure x
[1,2]
Fair Nested Looping
Multiple streams nest like for
loops. The result is a cross product of the
streams. However, the ordering of the results of the cross product is such
that each stream gets consumed equally. In other words, inner iterations of
a nested loop get the same priority as the outer iterations. Inner
iterations do not finish completely before the outer iterations start.
>>>
:{
un $ do StreamK.fairConcatFor (mk [1,2,3]) $ \x -> StreamK.fairConcatFor (mk [4,5,6]) $ \y -> StreamK.fromPure (x, y) :} [(1,4),(1,5),(2,4),(1,6),(2,5),(3,4),(2,6),(3,5),(3,6)]
Nesting Infinite Streams
Example with infinite streams. Print all pairs in the cross product with sum less than a specified number.
>>>
:{
Stream.toList $ Stream.takeWhile (\(x,y) -> x + y < 6) $ StreamK.toStream $ StreamK.fairConcatFor (mk [1..]) $ \x -> StreamK.fairConcatFor (mk [1..]) $ \y -> StreamK.fromPure (x, y) :} [(1,1),(1,2),(2,1),(1,3),(2,2),(3,1),(1,4),(2,3),(3,2),(4,1)]
How does the nesting work?
If we look at the cross product of [1,2,3], [4,5,6], the streams being
combined using fairConcatFor
are the following sequential loop iterations:
(1,4) (1,5) (1,6) -- first iteration of the outer loop (2,4) (2,5) (2,6) -- second iteration of the outer loop (3,4) (3,5) (3,6) -- third iteration of the outer loop
The result is a triangular or diagonal traversal of these iterations:
[(1,4),(1,5),(2,4),(1,6),(2,5),(3,4),(2,6),(3,5),(3,6)]
Non-Termination Cases
If one of the two interleaved streams does not produce an output at all and continues forever then the other stream will never get scheduled. This is because a stream is unscheduled only after it produces an output. This can lead to non-terminating programs, an example is provided below.
>>>
:{
oddsIf x = mk (if x then [1,3..] else [2,4..]) filterEven x = if even x then StreamK.fromPure x else StreamK.nil :}
>>>
:{
evens = StreamK.fairConcatFor (mk [True,False]) $ \r -> StreamK.concatFor (oddsIf r) filterEven :}
The evens
function does not terminate because, when r is True, the nested
concatFor
is a non-productive infinite loop, therefore, the outer loop
never gets a chance to generate the False
value.
But the following refactoring of the above code works as expected:
>>>
:{
mixed = StreamK.fairConcatFor (mk [True,False]) $ \r -> StreamK.concatFor (oddsIf r) StreamK.fromPure :}
>>>
evens = StreamK.fairConcatFor mixed filterEven
>>>
Stream.toList $ Stream.take 3 $ StreamK.toStream evens
[2,4,6]
This works because in mixed
both the streams being interleaved are
productive.
Take care in how you write your program, keeping the scheduling implications in mind. To avoid such scheduling problems in serial interleaving, you can use concurrent interleaving instead, i.e. parFairConcatFor. With concurrent threads, the other branch will make progress even if one branch is an infinite loop producing nothing.
Logic Programming
Streamly provides all operations for logic programming. It provides
functionality equivalent to LogicT
type from the logict
package.
The MonadLogic
operations can be implemented using the available stream
operations. For example, uncons
is msplit
, interleave
corresponds to
the interleave
operation of MonadLogic, fairConcatFor
is the
fair bind (>>-
) operation.
Related Operations
concatForWith
interleave
is another way to interleave two serial
streams. In this case, the inner loop iterations get exponentially more
priority over the outer iterations of the nested loop. This is biased
towards the inner loops - this is exactly how the logict and list-t
implementations of fair bind work.
concatForM :: Monad m => StreamK m a -> (a -> m (StreamK m b)) -> StreamK m b Source #
Like concatFor
but maps an effectful function. It allows conveniently
mixing monadic effects with streams.
>>>
import Control.Monad.IO.Class (liftIO)
>>>
:{
un $ StreamK.concatForM (mk [1,2,3]) $ \x -> do liftIO $ putStrLn (show x) pure $ StreamK.fromPure x :} 1 2 3 [1,2,3]
Nested concatenating for
loops:
>>>
:{
un $ StreamK.concatForM (mk [1,2,3]) $ \x -> do liftIO $ putStrLn (show x) pure $ StreamK.concatFor (mk [4,5,6]) $ \y -> StreamK.fromPure (x, y) :} 1 2 3 [(1,4),(1,5),(1,6),(2,4),(2,5),(2,6),(3,4),(3,5),(3,6)]
fairConcatForM :: Monad m => StreamK m a -> (a -> m (StreamK m b)) -> StreamK m b Source #
Like fairConcatFor
but maps a monadic function.
fromFoldable :: forall f a (m :: Type -> Type). Foldable f => f a -> StreamK m a Source #
>>>
fromFoldable = Prelude.foldr StreamK.cons StreamK.nil
Construct a stream from a Foldable
containing pure values.
bfsConcatMap :: forall a (m :: Type -> Type) b. (a -> StreamK m b) -> StreamK m a -> StreamK m b Source #
See bfsConcatFor
for detailed documentation.
>>>
bfsConcatMap = flip StreamK.bfsConcatFor
concatMapWith :: forall (m :: Type -> Type) b a. (StreamK m b -> StreamK m b -> StreamK m b) -> (a -> StreamK m b) -> StreamK m a -> StreamK m b Source #
Perform a concatMap
using a specified concat strategy. The first
argument specifies a merge or concat function that is used to merge the
streams generated by the map function.
For example, interleaving n streams in a left biased manner:
>>>
lists = mk [[1,5],[2,6],[3,7],[4,8]]
>>>
un $ StreamK.concatMapWith StreamK.interleave mk lists
[1,2,5,3,6,4,7,8]
For a fair interleaving example see bfsConcatMap
and mergeMapWith
.
bfsConcatFor :: forall (m :: Type -> Type) a b. StreamK m a -> (a -> StreamK m b) -> StreamK m b Source #
While concatFor
flattens a stream of streams in a depth first manner,
bfsConcatFor
flattens it in a breadth-first manner. It yields one item
from the first stream, then one item from the next stream and so on. In
nested loops it has the effect of prioritizing the new outer loop iteration
over the inner loops, thus inverting the looping.
Given a stream of three streams:
1. [1,2,3] 2. [4,5,6] 3. [7,8,9]
The resulting stream is (1,4,7),(2,5,8),(3,6,9)
. The parentheses are added
to emphasize the iterations.
For example:
>>>
stream = mk [[1,2,3],[4,5,6],[7,8,9]]
>>>
:{
un $ StreamK.bfsConcatFor stream $ \x ->
    StreamK.fromStream $ Stream.fromList x
:}
[1,4,7,2,5,8,3,6,9]
Compare with concatForWith
interleave
which explores the depth
exponentially more compared to the breadth, such that each stream yields
twice as many items compared to the next stream.
See also the equivalent fused version unfoldEachInterleave
.
bfsConcatForM :: Monad m => StreamK m a -> (a -> m (StreamK m b)) -> StreamK m b Source #
Like bfsConcatFor
but maps a monadic function.
mergeMapWith :: forall (m :: Type -> Type) b a. (StreamK m b -> StreamK m b -> StreamK m b) -> (a -> StreamK m b) -> StreamK m a -> StreamK m b Source #
Combine streams in pairs using a binary combinator, the resulting streams are then combined again in pairs recursively until we get to a single combined stream. The composition would thus form a binary tree.
For example, 'mergeMapWith interleave' gives the following result:
>>>
lists = mk [[1,2,3],[4,5,6],[7,8,9],[10,11,12]]
>>>
un $ StreamK.mergeMapWith StreamK.interleave mk lists
[1,7,4,10,2,8,5,11,3,9,6,12]
The above example is equivalent to the following pairings:
>>>
pair1 = mk [1,2,3] `StreamK.interleave` mk [4,5,6]
>>>
pair2 = mk [7,8,9] `StreamK.interleave` mk [10,11,12]
>>>
un $ pair1 `StreamK.interleave` pair2
[1,7,4,10,2,8,5,11,3,9,6,12]
If the number of streams being combined is not a power of 2, the binary tree composed by mergeMapWith is not balanced, therefore, the output may not look fairly interleaved, it will be biased towards the unpaired streams:
>>>
lists = mk [[1,2,3],[4,5,6],[7,8,9]]
>>>
un $ StreamK.mergeMapWith StreamK.interleave mk lists
[1,7,4,8,2,9,5,3,6]
An efficient merge sort can be implemented by using mergeBy
as the
combining function:
>>>
combine = StreamK.mergeBy compare
>>>
un $ StreamK.mergeMapWith combine StreamK.fromPure (mk [5,1,7,9,2])
[1,2,5,7,9]
Caution: the stream of streams must be finite
Pre-release
newtype Nested (m :: Type -> Type) a Source #
Nested
is a list-transformer monad, it serves the same purpose as the
ListT
type from the list-t
package. It is similar to the standard
Haskell lists' monad instance. Nested
monad behaves like nested for
loops
implementing a computation based on a cross product over multiple streams.
>>>
mk = StreamK.Nested . StreamK.fromStream . Stream.fromList
>>>
un = Stream.toList . StreamK.toStream . StreamK.unNested
Looping
In the following code the variable x
assumes values of the elements of the
stream one at a time and runs the code that follows using that value. It is
equivalent to a for
loop:
>>>
:{
un $ do
    x <- mk [1,2,3] -- for each element in the stream
    return x
:}
[1,2,3]
Nested Looping
Multiple streams can be nested like nested for
loops. The result is a
cross product of the streams.
>>>
:{
un $ do
    x <- mk [1,2,3] -- outer loop, for each element in the stream
    y <- mk [4,5,6] -- inner loop, for each element in the stream
    return (x, y)
:}
[(1,4),(1,5),(1,6),(2,4),(2,5),(2,6),(3,4),(3,5),(3,6)]
Note that an infinite stream in an inner loop will block the outer streams from moving to the next iteration.
How it works?
The bind operation of the monad is flipped concatMapWith
append
. The
concatMap operation maps the lines involving y as a function of x over the
stream [1,2,3]. The streams generated so are combined using the append
operation. If we desugar the above monad code using bind explicitly, it
becomes clear how it works:
>>>
import Streamly.Internal.Data.StreamK (Nested(..))
>>>
(Nested m) >>= f = Nested $ StreamK.concatMapWith StreamK.append (unNested . f) m
>>>
un (mk [1,2,3] >>= (\x -> (mk [4,5,6] >>= \y -> return (x,y))))
[(1,4),(1,5),(1,6),(2,4),(2,5),(2,6),(3,4),(3,5),(3,6)]
You can achieve the looping and nested looping by directly using concatMap but the monad and the "do notation" gives you better ergonomics.
Interleaving of loop iterations
If we look at the cross product of [1,2,3], [4,5,6], the streams being
combined using append
are the for
loop iterations as follows:
(1,4) (1,5) (1,6) -- first iteration of the outer loop
(2,4) (2,5) (2,6) -- second iteration of the outer loop
(3,4) (3,5) (3,6) -- third iteration of the outer loop
The result is equivalent to sequentially appending all the iterations of the
nested for
loop:
[(1,4),(1,5),(1,6),(2,4),(2,5),(2,6),(3,4),(3,5),(3,6)]
Logic Programming
Nested
also serves the purpose of LogicT
type from the logict
package.
The MonadLogic
operations can be implemented using the available stream
operations. For example, uncons
is msplit
, interleave
corresponds to
the interleave
operation of MonadLogic, fairConcatFor
is the
fair bind (>>-
) operation. The FairNested
type provides a monad with fair
bind.
Related Functionality
A custom type can be created using bfsConcatMap
as the monad bind
operation then the nested loops would get inverted - the innermost loop
becomes the outermost and vice versa.
See FairNested
if you want all the streams to get equal chance to execute
even if they are infinite.
Instances
MonadTrans Nested Source # | |
Defined in Streamly.Internal.Data.StreamK.Type | |
Monad m => MonadFail (Nested m) Source # | |
Defined in Streamly.Internal.Data.StreamK.Type | |
MonadIO m => MonadIO (Nested m) Source # | |
Defined in Streamly.Internal.Data.StreamK.Type | |
(Foldable m, Monad m) => Foldable (Nested m) Source # | |
Defined in Streamly.Internal.Data.StreamK.Type Methods fold :: Monoid m0 => Nested m m0 -> m0 # foldMap :: Monoid m0 => (a -> m0) -> Nested m a -> m0 # foldMap' :: Monoid m0 => (a -> m0) -> Nested m a -> m0 # foldr :: (a -> b -> b) -> b -> Nested m a -> b # foldr' :: (a -> b -> b) -> b -> Nested m a -> b # foldl :: (b -> a -> b) -> b -> Nested m a -> b # foldl' :: (b -> a -> b) -> b -> Nested m a -> b # foldr1 :: (a -> a -> a) -> Nested m a -> a # foldl1 :: (a -> a -> a) -> Nested m a -> a # elem :: Eq a => a -> Nested m a -> Bool # maximum :: Ord a => Nested m a -> a # minimum :: Ord a => Nested m a -> a # | |
Traversable (Nested Identity) Source # | |
Defined in Streamly.Internal.Data.StreamK.Type Methods traverse :: Applicative f => (a -> f b) -> Nested Identity a -> f (Nested Identity b) # sequenceA :: Applicative f => Nested Identity (f a) -> f (Nested Identity a) # mapM :: Monad m => (a -> m b) -> Nested Identity a -> m (Nested Identity b) # sequence :: Monad m => Nested Identity (m a) -> m (Nested Identity a) # | |
(Monad m, Functor m) => Alternative (Nested m) Source # | |
Monad m => Applicative (Nested m) Source # | |
Monad m => Functor (Nested m) Source # | |
Monad m => Monad (Nested m) Source # | |
Monad m => MonadPlus (Nested m) Source # | |
MonadThrow m => MonadThrow (Nested m) Source # | |
Defined in Streamly.Internal.Data.StreamK.Type Methods throwM :: (HasCallStack, Exception e) => e -> Nested m a # | |
a ~ Char => IsString (Nested Identity a) Source # | |
Defined in Streamly.Internal.Data.StreamK.Type Methods fromString :: String -> Nested Identity a # | |
Monoid (Nested m a) Source # | |
Semigroup (Nested m a) Source # | |
IsList (Nested Identity a) Source # | |
Defined in Streamly.Internal.Data.StreamK.Type | |
Read a => Read (Nested Identity a) Source # | |
Show a => Show (Nested Identity a) Source # | |
type Item (Nested Identity a) Source # | |
interleaveEndBy' :: forall (m :: Type -> Type) a. StreamK m a -> StreamK m a -> StreamK m a Source #
Examples:
>>>
fromList = StreamK.fromStream . Stream.fromList
>>>
toList = Stream.toList . StreamK.toStream
>>>
f x y = toList $ StreamK.interleaveEndBy' (fromList x) (fromList y)
>>>
f "..." "abc"
"a.b.c."
>>>
f "..." "ab"
"a.b."
Currently broken, generates an additional element at the end:
> f ".." "abc"
"a.b."
interleaveMin :: forall (m :: Type -> Type) a. StreamK m a -> StreamK m a -> StreamK m a infixr 6 Source #
Deprecated: Please use flip interleaveEndBy' instead.
interleaveFst :: forall (m :: Type -> Type) a. StreamK m a -> StreamK m a -> StreamK m a infixr 6 Source #
Deprecated: Please use flip interleaveSepBy instead.
foldrS :: forall a (m :: Type -> Type) b. (a -> StreamK m b -> StreamK m b) -> StreamK m b -> StreamK m a -> StreamK m b Source #
Right fold to a streaming monad.
foldrS StreamK.cons StreamK.nil === id
foldrS
can be used to perform stateless stream to stream transformations
like map and filter in general. It can be coupled with a scan to perform
stateful transformations. However, note that the custom map and filter
routines can be much more efficient than this due to better stream fusion.
>>>
input = StreamK.fromStream $ Stream.fromList [1..5]
>>>
Stream.fold Fold.toList $ StreamK.toStream $ StreamK.foldrS StreamK.cons StreamK.nil input
[1,2,3,4,5]
Find if any element in the stream is True
:
>>>
step x xs = if odd x then StreamK.fromPure True else xs
>>>
input = StreamK.fromStream (Stream.fromList (2:4:5:undefined)) :: StreamK IO Int
>>>
Stream.fold Fold.toList $ StreamK.toStream $ StreamK.foldrS step (StreamK.fromPure False) input
[True]
Map (+2) on odd elements and filter out the even elements:
>>>
step x xs = if odd x then (x + 2) `StreamK.cons` xs else xs
>>>
input = StreamK.fromStream (Stream.fromList [1..5]) :: StreamK IO Int
>>>
Stream.fold Fold.toList $ StreamK.toStream $ StreamK.foldrS step StreamK.nil input
[3,5,7]
Pre-release
foldlS :: forall (m :: Type -> Type) b a. (StreamK m b -> a -> StreamK m b) -> StreamK m b -> StreamK m a -> StreamK m b Source #
Lazy left fold to a stream.
initNonEmpty :: forall (m :: Type -> Type) a. Stream m a -> Stream m a Source #
init for non-empty streams, fails for empty stream case.
See also init
for a non-partial version of this function.
tailNonEmpty :: forall (m :: Type -> Type) a. StreamK m a -> StreamK m a Source #
tail for non-empty streams, fails for empty stream case.
See also tail
for a non-partial version of this function.
Note: this is same as "drop 1" with error on empty stream.
foldlx' :: forall m a b x. Monad m => (x -> a -> x) -> x -> (x -> b) -> StreamK m a -> m b Source #
Strict left fold with an extraction function. Like the standard strict
left fold, but applies a user supplied extraction function (the third
argument) to the folded value at the end. This is designed to work with the
foldl
library. The suffix x
is a mnemonic for extraction.
Note that the accumulator is always evaluated including the initial value.
foldlMx' :: Monad m => (x -> a -> m x) -> m x -> (x -> m b) -> StreamK m a -> m b Source #
Like foldlx'
, but with a monadic step function.
crossApply :: forall (m :: Type -> Type) a b. StreamK m (a -> b) -> StreamK m a -> StreamK m b Source #
Apply a stream of functions to a stream of values and flatten the results.
Note that the second stream is evaluated multiple times.
Definition:
>>>
crossApply = StreamK.crossApplyWith StreamK.append
>>>
crossApply = Stream.crossWith id
unCross :: forall (m :: Type -> Type) a. CrossStreamK m a -> StreamK m a Source #
Unwrap the StreamK
type from CrossStreamK
newtype.
This is a type level operation with no runtime overhead.
newtype FairNested (m :: Type -> Type) a Source #
FairNested
is like the Nested
type but explores the depth and breadth of
the cross product grid equally, so that each of the streams being crossed is
consumed equally. It can be used to nest infinite streams without starving
one due to the other.
>>>
mk = StreamK.FairNested . StreamK.fromStream . Stream.fromList
>>>
un = Stream.toList . StreamK.toStream . StreamK.unFairNested
Looping
A single stream case is equivalent to Nested
, it is a simple for
loop
over the stream:
>>>
:{
un $ do
    x <- mk [1,2] -- for each element in the stream
    return x
:}
[1,2]
Fair Nested Looping
Multiple streams nest like for
loops. The result is a cross product of the
streams. However, the ordering of the results of the cross product is such
that each stream gets consumed equally. In other words, inner iterations of
a nested loop get the same priority as the outer iterations. Inner
iterations do not finish completely before the outer iterations start.
>>>
:{
un $ do
    x <- mk [1,2,3] -- outer, for each element in the stream
    y <- mk [4,5,6] -- inner, for each element in the stream
    -- Perform the following actions for each x, for each y
    return (x, y)
:}
[(1,4),(1,5),(2,4),(1,6),(2,5),(3,4),(2,6),(3,5),(3,6)]
Nesting Infinite Streams
Example with infinite streams. Print all pairs in the cross product with sum less than a specified number.
>>>
:{
Stream.toList
    $ Stream.takeWhile (\(x,y) -> x + y < 6)
    $ StreamK.toStream
    $ StreamK.unFairNested
    $ do
        x <- mk [1..] -- infinite stream
        y <- mk [1..] -- infinite stream
        return (x, y)
:}
[(1,1),(1,2),(2,1),(1,3),(2,2),(3,1),(1,4),(2,3),(3,2),(4,1)]
How it works?
FairNested
uses fairConcatFor
as the monad bind operation.
If we look at the cross product of [1,2,3], [4,5,6], the streams being
combined using fairConcatFor
are the sequential loop iterations:
(1,4) (1,5) (1,6) -- first iteration of the outer loop
(2,4) (2,5) (2,6) -- second iteration of the outer loop
(3,4) (3,5) (3,6) -- third iteration of the outer loop
The result is a triangular or diagonal traversal of these iterations:
[(1,4),(1,5),(2,4),(1,6),(2,5),(3,4),(2,6),(3,5),(3,6)]
Associativity Issues
WARNING! The FairNested monad breaks the associativity law intentionally for
usefulness, it is associative only up to permutation equivalence. In this
monad the association order of statements might make a difference to the
ordering of the results because of changing the way in which streams are
scheduled. The same issues arise when you use the interleave
operation
directly, association order matters - however, here it can be more subtle as
the programmer may not see it directly.
>>>
un (mk [1,2] >>= (\x -> mk [x, x + 1] >>= (\y -> mk [y, y + 2])))
[1,3,2,2,4,4,3,5]
>>>
un ((mk [1,2] >>= (\x -> mk [x, x + 1])) >>= (\y -> mk [y, y + 2]))
[1,3,2,4,2,4,3,5]
This type is designed to be used for use cases where ordering of results does not matter, we want to explore different streams to find specific results, but the order in which we find or present the results may not be important. Re-association of statements in this monad may change how different branches are scheduled, which may change the scheduling priority of some streams over others, this may end up starving some branches - in the worst case some branches may be fully starved by some infinite branches producing nothing - resulting in a non-terminating program.
Non-Termination Cases
If an infinite stream that does not produce a value at all is interleaved with another stream then the entire computation gets stuck forever because the interleave operation schedules the second stream only after the first stream yields a value. This can lead to non-terminating programs, an example is provided below.
>>>
:{
toS = StreamK.toStream . StreamK.unFairNested
odds x = mk (if x then [1,3..] else [2,4..])
filterEven x = if even x then pure x else StreamK.FairNested StreamK.nil
:}
When writing code with do notation, keep in mind that when we bind a variable to a monadic value, all the following code that depends on this variable is associated together and connected to it via a monad bind. Consider the following code:
>>>
:{
evens = toS $ do
    r <- mk [True,False]
    -- The next two statements depending on the variable r are associated
    -- together and bound to the previous line using a monad bind.
    x <- odds r
    filterEven x
:}
This code does not terminate because, when r is True, odds
and
filterEven
together constitute an infinite inner loop, continuously working
but not yielding any value at all, this stream is interleaved with the outer
loop, therefore, the outer loop does not get a chance to move to the next
iteration.
But the following code works as expected:
>>>
:{
evens = toS $ do
    x <- mk [True,False] >>= odds
    filterEven x
:}
>>>
Stream.toList $ Stream.take 3 $ evens
[2,4,6]
This works because both the lists being interleaved continue to produce values in the outer loop and the inner loop keeps filtering them.
Care should be taken how you write your program, keep in mind the scheduling
implications. To avoid such scheduling problems in the serial FairNested type
use the concurrent version i.e. FairParallel described in
MkType
module. Due to concurrent evaluation each
branch will make progress even if one is an infinite loop producing nothing.
Related Operations
We can create a custom type with concatMapWith
interleave
as the monad
bind operation then the inner loop iterations get exponentially more
priority over the outer iterations of the nested loop. This is not fully
fair, it is biased - this is exactly how the logic-t and list-t
implementation of fair bind works.
Constructors
FairNested | |
Fields
|
Instances
MonadTrans FairNested Source # | |||||
Defined in Streamly.Internal.Data.StreamK.Type Methods lift :: Monad m => m a -> FairNested m a # | |||||
MonadIO m => MonadIO (FairNested m) Source # | |||||
Defined in Streamly.Internal.Data.StreamK.Type Methods liftIO :: IO a -> FairNested m a # | |||||
(Foldable m, Monad m) => Foldable (FairNested m) Source # | |||||
Defined in Streamly.Internal.Data.StreamK.Type Methods fold :: Monoid m0 => FairNested m m0 -> m0 # foldMap :: Monoid m0 => (a -> m0) -> FairNested m a -> m0 # foldMap' :: Monoid m0 => (a -> m0) -> FairNested m a -> m0 # foldr :: (a -> b -> b) -> b -> FairNested m a -> b # foldr' :: (a -> b -> b) -> b -> FairNested m a -> b # foldl :: (b -> a -> b) -> b -> FairNested m a -> b # foldl' :: (b -> a -> b) -> b -> FairNested m a -> b # foldr1 :: (a -> a -> a) -> FairNested m a -> a # foldl1 :: (a -> a -> a) -> FairNested m a -> a # toList :: FairNested m a -> [a] # null :: FairNested m a -> Bool # length :: FairNested m a -> Int # elem :: Eq a => a -> FairNested m a -> Bool # maximum :: Ord a => FairNested m a -> a # minimum :: Ord a => FairNested m a -> a # sum :: Num a => FairNested m a -> a # product :: Num a => FairNested m a -> a # | |||||
Traversable (FairNested Identity) Source # | |||||
Defined in Streamly.Internal.Data.StreamK.Type Methods traverse :: Applicative f => (a -> f b) -> FairNested Identity a -> f (FairNested Identity b) # sequenceA :: Applicative f => FairNested Identity (f a) -> f (FairNested Identity a) # mapM :: Monad m => (a -> m b) -> FairNested Identity a -> m (FairNested Identity b) # sequence :: Monad m => FairNested Identity (m a) -> m (FairNested Identity a) # | |||||
Monad m => Applicative (FairNested m) Source # | |||||
Defined in Streamly.Internal.Data.StreamK.Type Methods pure :: a -> FairNested m a # (<*>) :: FairNested m (a -> b) -> FairNested m a -> FairNested m b # liftA2 :: (a -> b -> c) -> FairNested m a -> FairNested m b -> FairNested m c # (*>) :: FairNested m a -> FairNested m b -> FairNested m b # (<*) :: FairNested m a -> FairNested m b -> FairNested m a # | |||||
Monad m => Functor (FairNested m) Source # | |||||
Defined in Streamly.Internal.Data.StreamK.Type Methods fmap :: (a -> b) -> FairNested m a -> FairNested m b # (<$) :: a -> FairNested m b -> FairNested m a # | |||||
Monad m => Monad (FairNested m) Source # | |||||
Defined in Streamly.Internal.Data.StreamK.Type Methods (>>=) :: FairNested m a -> (a -> FairNested m b) -> FairNested m b # (>>) :: FairNested m a -> FairNested m b -> FairNested m b # return :: a -> FairNested m a # | |||||
MonadThrow m => MonadThrow (FairNested m) Source # | |||||
Defined in Streamly.Internal.Data.StreamK.Type Methods throwM :: (HasCallStack, Exception e) => e -> FairNested m a # | |||||
a ~ Char => IsString (FairNested Identity a) Source # | |||||
Defined in Streamly.Internal.Data.StreamK.Type Methods fromString :: String -> FairNested Identity a # | |||||
IsList (FairNested Identity a) Source # | |||||
Defined in Streamly.Internal.Data.StreamK.Type Associated Types
Methods fromList :: [Item (FairNested Identity a)] -> FairNested Identity a # fromListN :: Int -> [Item (FairNested Identity a)] -> FairNested Identity a # toList :: FairNested Identity a -> [Item (FairNested Identity a)] # | |||||
Read a => Read (FairNested Identity a) Source # | |||||
Defined in Streamly.Internal.Data.StreamK.Type Methods readsPrec :: Int -> ReadS (FairNested Identity a) # readList :: ReadS [FairNested Identity a] # readPrec :: ReadPrec (FairNested Identity a) # readListPrec :: ReadPrec [FairNested Identity a] # | |||||
Show a => Show (FairNested Identity a) Source # | |||||
Defined in Streamly.Internal.Data.StreamK.Type | |||||
type Item (FairNested Identity a) Source # | |||||
Defined in Streamly.Internal.Data.StreamK.Type |
foldStream :: State StreamK m a -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> StreamK m a -> m r Source #
Fold a stream by providing a State, stop continuation, a singleton continuation and a yield continuation. The stream will not use the SVar passed via State.
foldStreamShared :: State StreamK m a -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> StreamK m a -> m r Source #
Fold a stream by providing an SVar, a stop continuation, a singleton continuation and a yield continuation. The stream would share the current SVar passed via the State.
foldrSShared :: forall a (m :: Type -> Type) b. (a -> StreamK m b -> StreamK m b) -> StreamK m b -> StreamK m a -> StreamK m b Source #
Fold sharing the SVar state within the reconstructed stream
foldrSM :: Monad m => (m a -> StreamK m b -> StreamK m b) -> StreamK m b -> StreamK m a -> StreamK m b Source #
buildS :: forall a (m :: Type -> Type). ((a -> StreamK m a -> StreamK m a) -> StreamK m a -> StreamK m a) -> StreamK m a Source #
buildM :: Monad m => (forall r. (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r) -> StreamK m a Source #
buildSM :: Monad m => ((m a -> StreamK m a -> StreamK m a) -> StreamK m a -> StreamK m a) -> StreamK m a Source #
augmentS :: forall a (m :: Type -> Type). ((a -> StreamK m a -> StreamK m a) -> StreamK m a -> StreamK m a) -> StreamK m a -> StreamK m a Source #
augmentSM :: Monad m => ((m a -> StreamK m a -> StreamK m a) -> StreamK m a -> StreamK m a) -> StreamK m a -> StreamK m a Source #
unShare :: forall (m :: Type -> Type) a. StreamK m a -> StreamK m a Source #
Detach a stream from an SVar
fromStopK :: forall (m :: Type -> Type) a. StopK m -> StreamK m a Source #
Make an empty stream from a stop function.
fromYieldK :: forall (m :: Type -> Type) a. YieldK m a -> StreamK m a Source #
Make a singleton stream from a callback function. The callback function calls the one-shot yield continuation to yield an element.
consK :: forall (m :: Type -> Type) a. YieldK m a -> StreamK m a -> StreamK m a Source #
Add a yield function at the head of the stream.
(.:) :: forall a (m :: Type -> Type). a -> StreamK m a -> StreamK m a infixr 5 Source #
Operator equivalent of cons
.
> toList $ 1 .: 2 .: 3 .: nil
[1,2,3]
consMBy :: Monad m => (StreamK m a -> StreamK m a -> StreamK m a) -> m a -> StreamK m a -> StreamK m a Source #
unfoldrMWith :: Monad m => (m a -> StreamK m a -> StreamK m a) -> (b -> m (Maybe (a, b))) -> b -> StreamK m a Source #
repeatMWith :: (m a -> t m a -> t m a) -> m a -> t m a Source #
Like repeatM
but takes a stream cons
operation to combine the actions
in a stream specific manner. A serial cons would repeat the values serially
while an async cons would repeat concurrently.
Pre-release
iterateMWith :: Monad m => (m a -> StreamK m a -> StreamK m a) -> (a -> m a) -> m a -> StreamK m a Source #
headNonEmpty :: Monad m => StreamK m a -> m a Source #
head for non-empty streams, fails for empty stream case.
mapMWith :: (m b -> StreamK m b -> StreamK m b) -> (a -> m b) -> StreamK m a -> StreamK m b Source #
mapMAccum :: (s -> a -> m (s, b)) -> m s -> StreamK m a -> StreamK m b Source #
A stateful map aka scan but with a slight difference.
This is similar to a scan except that instead of emitting the state it emits a separate result. This is also similar to mapAccumL but does not return the final value of the state.
Separation of state from the output makes it easier to think in terms of a shared state, and also makes it easier to keep the state fully strict and the output lazy.
Unimplemented
conjoin :: forall (m :: Type -> Type) a. Monad m => StreamK m a -> StreamK m a -> StreamK m a Source #
crossApplyWith :: forall (m :: Type -> Type) b a. (StreamK m b -> StreamK m b -> StreamK m b) -> StreamK m (a -> b) -> StreamK m a -> StreamK m b Source #
concatMapMAccum :: (StreamK m b -> StreamK m b -> StreamK m b) -> (s -> a -> m (s, StreamK m b)) -> m s -> StreamK m a -> StreamK m b Source #
Like concatMapWith
but carries a state which can be used to share
information across multiple steps of concat.
concatSmapMWith combine f initial = concatMapWith combine id . smapM f initial
Unimplemented
concatForWith :: forall (m :: Type -> Type) b a. (StreamK m b -> StreamK m b -> StreamK m b) -> StreamK m a -> (a -> StreamK m b) -> StreamK m b Source #
concatForWithM :: Monad m => (StreamK m b -> StreamK m b -> StreamK m b) -> StreamK m a -> (a -> m (StreamK m b)) -> StreamK m b Source #
Like concatForWith
but maps an effectful function.
concatIterateWith :: forall (m :: Type -> Type) a. (StreamK m a -> StreamK m a -> StreamK m a) -> (a -> StreamK m a) -> StreamK m a -> StreamK m a Source #
Yield an input element in the output stream, map a stream generator on it
and repeat the process on the resulting stream. Resulting streams are
flattened using the concatMapWith
combinator. This can be used for a depth
first style (DFS) traversal of a tree like structure.
Example, list a directory tree using DFS:
>>>
f = StreamK.fromStream . either (Dir.readEitherPaths id) (const Stream.nil)
>>>
input = StreamK.fromEffect (Left <$> Path.fromString ".")
>>>
ls = StreamK.concatIterateWith StreamK.append f input
Note that iterateM
is a special case of concatIterateWith
:
>>>
iterateM f = StreamK.concatIterateWith StreamK.append (StreamK.fromEffect . f) . StreamK.fromEffect
Pre-release
concatIterateLeftsWith :: forall b a c (m :: Type -> Type). b ~ Either a c => (StreamK m b -> StreamK m b -> StreamK m b) -> (a -> StreamK m b) -> StreamK m b -> StreamK m b Source #
In an Either
stream iterate on Left
s. This is a special case of
concatIterateWith
:
>>>
concatIterateLeftsWith combine f = StreamK.concatIterateWith combine (either f (const StreamK.nil))
To traverse a directory tree:
>>>
input = StreamK.fromEffect (Left <$> Path.fromString ".")
>>>
ls = StreamK.concatIterateLeftsWith StreamK.append (StreamK.fromStream . Dir.readEither id) input
Pre-release
concatIterateScanWith :: Monad m => (StreamK m a -> StreamK m a -> StreamK m a) -> (b -> a -> m (b, StreamK m a)) -> m b -> StreamK m a -> StreamK m a Source #
Like iterateMap
but carries a state in the stream generation function.
This can be used to traverse graph like structures, we can remember the
visited nodes in the state to avoid cycles.
Note that a combination of iterateMap
and usingState
can also be used to
traverse graphs. However, this function provides a more localized state
instead of using a global state.
See also: mfix
Pre-release
mergeIterateWith :: forall (m :: Type -> Type) a. (StreamK m a -> StreamK m a -> StreamK m a) -> (a -> StreamK m a) -> StreamK m a -> StreamK m a Source #
Like concatIterateWith
but uses the pairwise flattening combinator
mergeMapWith
for flattening the resulting streams. This can be used for a
balanced traversal of a tree like structure.
Example, list a directory tree using balanced traversal:
>>>
f = StreamK.fromStream . either (Dir.readEitherPaths id) (const Stream.nil)
>>>
input = StreamK.fromEffect (Left <$> Path.fromString ".")
>>>
ls = StreamK.mergeIterateWith StreamK.interleave f input
Pre-release
type CrossStreamK = Nested Source #
Deprecated: Use Nested instead.
bindWith :: forall (m :: Type -> Type) b a. (StreamK m b -> StreamK m b -> StreamK m b) -> StreamK m a -> (a -> StreamK m b) -> StreamK m b Source #
Deprecated: Please use concatForWith instead.
Transformer
liftInner :: forall (m :: Type -> Type) (t :: (Type -> Type) -> Type -> Type) a. (Monad m, MonadTrans t, Monad (t m)) => StreamK m a -> StreamK (t m) a Source #
localReaderT :: forall r (m :: Type -> Type) a. (r -> r) -> StreamK (ReaderT r m) a -> StreamK (ReaderT r m) a Source #
Modify the environment of the underlying ReaderT monad.
foldlT :: forall (m :: Type -> Type) s b a. (Monad m, Monad (s m), MonadTrans s) => (s m b -> a -> s m b) -> s m b -> StreamK m a -> s m b Source #
Lazy left fold to an arbitrary transformer monad.
foldrT :: forall (m :: Type -> Type) s a b. (Monad m, Monad (s m), MonadTrans s) => (a -> s m b -> s m b) -> s m b -> StreamK m a -> s m b Source #
Right associative fold to an arbitrary transformer monad.
From containers
Specialized Generation
repeatM :: Monad m => m a -> StreamK m a Source #
>>>
repeatM = StreamK.sequence . StreamK.repeat
>>>
repeatM = fix . StreamK.consM
>>>
repeatM = cycle1 . StreamK.fromEffect
Generate a stream by repeatedly executing a monadic action forever.
>>>
:{
repeatAction = StreamK.repeatM (threadDelay 1000000 >> print 1) & StreamK.take 10 & StreamK.fold Fold.drain :}
iterate :: forall a (m :: Type -> Type). (a -> a) -> a -> StreamK m a Source #
>>>
iterate f x = x `StreamK.cons` iterate f (f x)
Generate an infinite stream with x
as the first element and each
successive element derived by applying the function f
on the previous
element.
>>>
StreamK.toList $ StreamK.take 5 $ StreamK.iterate (+1) 1
[1,2,3,4,5]
iterateM :: Monad m => (a -> m a) -> m a -> StreamK m a Source #
>>>
iterateM f m = m >>= \a -> return a `StreamK.consM` iterateM f (f a)
Generate an infinite stream with the first element generated by the action
m
and each successive element derived by applying the monadic function
f
on the previous element.
>>>
:{
StreamK.iterateM (\x -> print x >> return (x + 1)) (return 0) & StreamK.take 3 & StreamK.toList
:}
0
1
[0,1,2]
Elimination
General Folds
fold :: Monad m => Fold m a b -> StreamK m a -> m b Source #
Fold a stream using the supplied left Fold
and reducing the resulting
expression strictly at each step. The behavior is similar to foldl'
. A
Fold
can terminate early without consuming the full stream. See the
documentation of individual Fold
s for termination behavior.
Definitions:
>>>
fold f = fmap fst . StreamK.foldBreak f
>>>
fold f = StreamK.parseD (Parser.fromFold f)
Example:
>>>
StreamK.fold Fold.sum $ StreamK.fromStream $ Stream.enumerateFromTo 1 100
5050
foldEither :: Monad m => Fold m a b -> StreamK m a -> m (Either (Fold m a b) (b, StreamK m a)) Source #
Fold resulting in either breaking the stream or continuation of the fold. Instead of supplying the input stream in one go, we can run the fold multiple times, each time supplying the next segment of the input stream. If the fold has not yet finished, it returns a fold that can be run again; otherwise, it returns the fold result and the residual stream.
Internal
foldConcat :: Monad m => Producer m a b -> Fold m b c -> StreamK m a -> m (c, StreamK m a) Source #
Generate streams from individual elements of a stream and fold the concatenation of those streams using the supplied fold. Return the result of the fold and residual stream.
For example, this can be used to efficiently fold an Array Word8 stream using Word8 folds.
Internal
toParserK :: forall (m :: Type -> Type) a b. Monad m => Parser a m b -> ParserK a m b Source #
Convert a Parser
to ParserK
.
Pre-release
parseDBreak :: Monad m => Parser a m b -> StreamK m a -> m (Either ParseErrorPos b, StreamK m a) Source #
Run a Parser
over a stream and return the rest of the stream.
parseBreak :: Monad m => ParserK a m b -> StreamK m a -> m (Either ParseError b, StreamK m a) Source #
Similar to parseBreakChunks
but works on singular elements instead of arrays (chunks) of elements.
parseBreakPos :: Monad m => ParserK a m b -> StreamK m a -> m (Either ParseErrorPos b, StreamK m a) Source #
Like parseBreak
but includes stream position information in the error
messages.
parse :: Monad m => ParserK a m b -> StreamK m a -> m (Either ParseError b) Source #
Run a ParserK
over a StreamK
. Please use parseChunks
where possible,
for better performance.
parsePos :: Monad m => ParserK a m b -> StreamK m a -> m (Either ParseErrorPos b) Source #
Like parse
but includes stream position information in the error
messages.
Specialized Folds
last :: Monad m => StreamK m a -> m (Maybe a) Source #
Extract the last element of the stream, if any.
To Containers
Map and Fold
mapM_ :: Monad m => (a -> m b) -> StreamK m a -> m () Source #
Apply a monadic action to each element of the stream and discard the output of the action.
Transformation
By folding (scans)
scanlx' :: forall x a b (m :: Type -> Type). (x -> a -> x) -> x -> (x -> b) -> StreamK m a -> StreamK m b Source #
Filtering
Mapping
Inserting
insertBy :: forall a (m :: Type -> Type). (a -> a -> Ordering) -> a -> StreamK m a -> StreamK m a Source #
Deleting
deleteBy :: forall a (m :: Type -> Type). (a -> a -> Bool) -> a -> StreamK m a -> StreamK m a Source #
Reordering
sortBy :: forall (m :: Type -> Type) a. Monad m => (a -> a -> Ordering) -> StreamK m a -> StreamK m a Source #
Sort the input stream using a supplied comparison function.
Sorting can be achieved by simply:
>>>
sortBy cmp = StreamK.mergeMapWith (StreamK.mergeBy cmp) StreamK.fromPure
However, this combinator uses a parser to first split the input stream into descending and ascending sorted segments and then merges them to optimize sorting when pre-sorted sequences exist in the input stream.
O(n) space
sortOn :: forall (m :: Type -> Type) b a. (Monad m, Ord b) => (a -> b) -> StreamK m a -> StreamK m a Source #
Map and Filter
Zipping
zipWith :: forall (m :: Type -> Type) a b c. Monad m => (a -> b -> c) -> StreamK m a -> StreamK m b -> StreamK m c Source #
Zipping of n
streams can be performed by combining the streams pairwise
using mergeMapWith
with O(n * log n) time complexity. If used
with concatMapWith
it will have O(n^2) performance.
Merging
mergeBy :: forall a (m :: Type -> Type). (a -> a -> Ordering) -> StreamK m a -> StreamK m a -> StreamK m a Source #
Merging of n
streams can be performed by combining the streams pairwise
using mergeMapWith
to give O(n * log n) time complexity. If used
with concatMapWith
it will have O(n^2) performance.
Transformation comprehensions
Transforming Inner Monad
Exceptions
handle :: (MonadCatch m, Exception e) => (e -> m (StreamK m a)) -> StreamK m a -> StreamK m a Source #
Like Streamly.Data.Stream.handle
but with one
significant difference, this function observes exceptions from the consumer
of the stream as well.
You can also convert StreamK
to Stream
and use exception handling from
Stream
module:
>>>
handle f s = StreamK.fromStream $ Stream.handle (\e -> StreamK.toStream (f e)) (StreamK.toStream s)
Resource Management
bracketIO :: forall (m :: Type -> Type) b c a. (MonadIO m, MonadCatch m) => IO b -> (b -> IO c) -> (b -> StreamK m a) -> StreamK m a Source #
Like Streamly.Data.Stream.bracketIO
but with one
significant difference, this function observes exceptions from the consumer
of the stream as well. Therefore, it cleans up the resource promptly when
the consumer encounters an exception.
You can also convert StreamK
to Stream
and use resource handling from
Stream
module:
>>>
bracketIO bef aft bet = StreamK.fromStream $ Stream.bracketIO bef aft (StreamK.toStream . bet)
Deprecated
hoist :: (Monad m, Monad n) => (forall x. m x -> n x) -> StreamK m a -> StreamK n a Source #
Deprecated: Please use morphInner instead.
parseBreakChunks :: (Monad m, Unbox a) => ParserK (Array a) m b -> StreamK m (Array a) -> m (Either ParseError b, StreamK m (Array a)) Source #
parseChunks :: (Monad m, Unbox a) => ParserK (Array a) m b -> StreamK m (Array a) -> m (Either ParseError b) Source #
Deprecated: Use Streamly.Data.Array.parse instead
parseBreakChunksGeneric :: Monad m => ParserK (Array a) m b -> StreamK m (Array a) -> m (Either ParseError b, StreamK m (Array a)) Source #
Deprecated: Use Streamly.Data.Array.Generic.parseBreak
Similar to parseBreak
but works on generic arrays
parseChunksGeneric :: Monad m => ParserK (Array a) m b -> StreamK m (Array a) -> m (Either ParseError b) Source #
Deprecated: Use Streamly.Data.Array.Generic.parse