Copyright | (c) 2019 Composewell Technologies |
---|---|
License | BSD3 |
Maintainer | streamly@composewell.com |
Stability | experimental |
Portability | GHC |
Safe Haskell | None |
Language | Haskell2010 |
Streamly.Internal.Data.Pipe
Description
There are three fundamental types that make up a stream pipeline:
- Stream: sources
- Scan: transformations
- Fold: sinks
Streams are sources or producers of values, multiple sources can be merged into a single source but a source cannot be split into multiple stream sources. Folds are sinks or consumers, a stream can be split and distributed to multiple folds but the results cannot be merged back into a stream source again. Scans are simple one-to-one transformations with filtering. One element cannot be transformed to multiple elements.
The Pipe type is a super type of all the above, it is the most complex type. All of these can be represented by a pipe. A pipe can act as a source or a sink or a transformation, dynamically. A stream source can be split and distributed to multiple pipes each pipe can apply its own transform on the stream and the results can be merged back into a single pipe. Pipes can be attached to a source to produce a source or they can be attached to a fold to produce a fold, or multiple pipes can be merged or zipped into a single pipe.
import qualified Streamly.Internal.Data.Pipe as Pipe
Synopsis
- mapM :: Monad m => (a -> m b) -> Pipe m a b
- filter :: forall (m :: Type -> Type) a. Monad m => (a -> Bool) -> Pipe m a a
- map :: forall (m :: Type -> Type) a b. Monad m => (a -> b) -> Pipe m a b
- filterM :: Monad m => (a -> m Bool) -> Pipe m a a
- compose :: forall (m :: Type -> Type) b c a. Monad m => Pipe m b c -> Pipe m a b -> Pipe m a c
- data Step cs ps b
- fromStream :: forall (m :: Type -> Type) a. Monad m => Stream m a -> Pipe m () a
- fromFold :: forall (m :: Type -> Type) a b. Monad m => Fold m a b -> Pipe m a b
- data Pipe (m :: Type -> Type) a b = Pipe (cs -> a -> m (Step cs ps b)) (ps -> m (Step cs ps b)) cs
- identity :: forall (m :: Type -> Type) a. Monad m => Pipe m a a
- fromScanr :: forall (m :: Type -> Type) a b. Monad m => Scanr m a b -> Pipe m a b
- scanFold :: forall (m :: Type -> Type) a b. Monad m => Fold m a b -> Pipe m a b
- teeMerge :: forall (m :: Type -> Type) a b. Monad m => Pipe m a b -> Pipe m a b -> Pipe m a b
Documentation
mapM :: Monad m => (a -> m b) -> Pipe m a b Source #
A pipe representing mapping of a monadic action.
>>>
Stream.toList $ Stream.pipe (Pipe.mapM print) $ Stream.fromList [1..5::Int]
1 2 3 4 5 [(),(),(),(),()]
filter :: forall (m :: Type -> Type) a. Monad m => (a -> Bool) -> Pipe m a a Source #
A filtering pipe using a pure predicate.
>>>
Stream.toList $ Stream.pipe (Pipe.filter odd) $ Stream.fromList [1..5::Int]
[1,3,5]
map :: forall (m :: Type -> Type) a b. Monad m => (a -> b) -> Pipe m a b Source #
A pipe representing mapping of a pure function.
>>>
Stream.toList $ Stream.pipe (Pipe.map (+1)) $ Stream.fromList [1..5::Int]
[2,3,4,5,6]
filterM :: Monad m => (a -> m Bool) -> Pipe m a a Source #
A filtering pipe using a monadic predicate.
compose :: forall (m :: Type -> Type) b c a. Monad m => Pipe m b c -> Pipe m a b -> Pipe m a c Source #
Series composition. Compose two pipes such that the output of the second pipe is attached to the input of the first pipe.
>>>
Stream.toList $ Stream.pipe (Pipe.map (+1) >>> Pipe.map (+1)) $ Stream.fromList [1..5::Int]
[3,4,5,6,7]
fromStream :: forall (m :: Type -> Type) a. Monad m => Stream m a -> Pipe m () a Source #
Produces the stream on consuming ().
fromFold :: forall (m :: Type -> Type) a b. Monad m => Fold m a b -> Pipe m a b Source #
Create a singleton pipe from a fold.
Pipes do not support finalization yet. This does not finalize the fold when the stream stops before the fold terminates. So cannot be used on folds that require such finalization.
> Stream.toList $ Stream.pipe (Pipe.fromFold Fold.sum) $ Stream.fromList [1..5::Int]
- 15
data Pipe (m :: Type -> Type) a b Source #
Represents a stateful transformation over an input stream of values of
type a
to outputs of type b
in Monad
m
.
The constructor is Pipe consume produce initial
.
Instances
Monad m => Category (Pipe m :: Type -> Type -> Type) Source # | "." composes the pipes in series. |
Functor m => Functor (Pipe m a) Source # |
|
Monad m => Semigroup (Pipe m a b) Source # |
|
identity :: forall (m :: Type -> Type) a. Monad m => Pipe m a a Source #
An identity pipe producing the same output as input.
>>>
identity = Pipe.map Prelude.id
>>>
Stream.toList $ Stream.pipe (Pipe.identity) $ Stream.fromList [1..5::Int]
[1,2,3,4,5]
scanFold :: forall (m :: Type -> Type) a b. Monad m => Fold m a b -> Pipe m a b Source #
Pipes do not support finalization yet. This does not finalize the fold when the stream stops before the fold terminates. So cannot be used on folds that require finalization.
>>>
Stream.toList $ Stream.pipe (Pipe.scanFold Fold.sum) $ Stream.fromList [1..5::Int]
[1,3,6,10,15]
teeMerge :: forall (m :: Type -> Type) a b. Monad m => Pipe m a b -> Pipe m a b -> Pipe m a b Source #
Parallel composition. Distribute the input across two pipes and merge their outputs.
>>>
Stream.toList $ Stream.pipe (Pipe.teeMerge Pipe.identity (Pipe.map (\x -> x * x))) $ Stream.fromList [1..5::Int]
[1,1,2,4,3,9,4,16,5,25]