| Copyright | (c) Justin Le 2019 | 
|---|---|
| License | BSD3 | 
| Maintainer | justin@jle.im | 
| Stability | experimental | 
| Portability | non-portable | 
| Safe Haskell | Safe-Inferred | 
| Language | Haskell2010 | 
Data.Conduino
Description
Base API for Pipe.  See documentation for Pipe, .|, and runPipe
 for information on usage.
A "prelude" of useful pipes can be found in Data.Conduino.Combinators.
Why a stream processing library?
A stream processing library is a way to stream processors in a composable way: instead of defining your entire stream processing function as a single recursive loop with some global state, instead think about each "stage" of the process, and isolate each state to its own segment. Each component can contain its own isolated state:
>>>runPipePure $ sourceList [1..10].| scan (+) 0 .| sinkList [1,3,6,10,15,21,28,36,45,55]
All of these components have internal "state":
- sourceListkeeps track of "which" item in the list to yield next
- scankeeps track of the current running sum
- sinkListkeeps track of all items that have been seen so far, as a list
They all work together without knowing any other component's internal state, so you can write your total streaming function without concerning yourself, at each stage, with the entire part.
In addition, there are useful functions to "combine" stream processors:
- zipSinkcombines sinks in an "and" sort of way: combine two sinks in parallel and finish when all finish.
- altSinkcombines sinks in an "or" sort of way: combine two sinks in parallel and finish when any of them finish
- zipSourcecombines sources in parallel and collate their outputs.
Stream processing libraries are also useful for streaming composition of monadic effects (like IO or State), as well.
Synopsis
- data Pipe i o u m a
- (.|) :: Monad m => Pipe a b u m v -> Pipe b c v m r -> Pipe a c u m r
- runPipe :: Monad m => Pipe () Void u m a -> m a
- runPipePure :: Pipe () Void Void Identity a -> a
- awaitEither :: Pipe i o u m (Either u i)
- await :: Pipe i o u m (Maybe i)
- awaitWith :: (i -> Pipe i o u m u) -> Pipe i o u m u
- awaitSurely :: Pipe i o Void m i
- awaitForever :: (i -> Pipe i o u m a) -> Pipe i o u m u
- yield :: o -> Pipe i o u m ()
- yieldLazy :: o -> Pipe i o u m ()
- (&|) :: Monad m => Pipe a b u m v -> Pipe b c v m r -> Pipe a c u m (v, r)
- (|.) :: Monad m => Pipe a b u m v -> Pipe b c v m r -> Pipe a c u m v
- fuseBoth :: Monad m => Pipe a b u m v -> Pipe b c v m r -> Pipe a c u m (v, r)
- fuseUpstream :: Monad m => Pipe a b u m v -> Pipe b c v m r -> Pipe a c u m v
- fuseBothMaybe :: Monad m => Pipe a b u m v -> Pipe b c v m r -> Pipe a c u m (Maybe v, r)
- squeezePipe :: Monad m => Pipe i o u m a -> m ([o], Either (i -> Pipe i o u m a) a)
- squeezePipeEither :: Monad m => Pipe i o u m a -> m ([o], Either (Either u i -> Pipe i o u m a) a)
- feedPipe :: Monad m => [i] -> Pipe i o u m a -> m ([o], Either (i -> Pipe i o u m a) ([i], a))
- feedPipeEither :: Monad m => [i] -> Pipe i o u m a -> m ([o], Either (Either u i -> Pipe i o u m a) ([i], a))
- mapInput :: (i -> j) -> Pipe j o u m a -> Pipe i o u m a
- mapOutput :: (p -> o) -> Pipe i p u m a -> Pipe i o u m a
- mapUpRes :: (u -> v) -> Pipe i o v m a -> Pipe i o u m a
- trimapPipe :: (i -> j) -> (p -> o) -> (u -> v) -> Pipe j p v m a -> Pipe i o u m a
- passthrough :: Monad m => Pipe i o u m a -> Pipe i (Maybe i, o) u m a
- hoistPipe :: (Monad m, Monad n) => (forall x. m x -> n x) -> Pipe i o u m a -> Pipe i o u n a
- feedbackPipe :: Monad m => Pipe x x u m a -> Pipe x x u m a
- feedbackPipeEither :: Monad m => Pipe (Either i o) o u m a -> Pipe i o u m a
- newtype ZipSource m a = ZipSource {- getZipSource :: Pipe () a Void m ()
 
- unconsZipSource :: Monad m => ZipSource m a -> m (Maybe (Maybe a, ZipSource m a))
- zipSource :: Monad m => Pipe () (a -> b) u m () -> Pipe () a v m () -> Pipe () b w m ()
- newtype ZipSink i u m a = ZipSink {- getZipSink :: Pipe i Void u m a
 
- zipSink :: Monad m => Pipe i Void u m (a -> b) -> Pipe i Void u m a -> Pipe i Void u m b
- altSink :: Monad m => Pipe i Void u m a -> Pipe i Void u m a -> Pipe i Void u m a
- toListT :: Applicative m => Pipe () o u m () -> ListT m (Maybe o)
- fromListT :: Monad m => ListT m (Maybe o) -> Pipe i o u m ()
- pattern PipeList :: Monad m => ListT m (Maybe a) -> Pipe () a u m ()
- withSource :: Pipe () o u m () -> (Maybe (o, m r) -> m r) -> m r
- genSource :: (forall r. (Maybe (o, m r) -> m r) -> m r) -> Pipe i o u m ()
Documentation
Similar to a conduit from the conduit package.
For a Pipe i o u m a
- i: Type of input stream (the things you can- await)
- o: Type of output stream (the things you- yield)
- u: Type of the result of the upstream pipe (Outputted when upstream pipe terminates)
- m: Underlying monad (the things you can- lift)
- a: Result type when pipe terminates (outputted when finished, with- pureor- return)
Some specializations:
- If iis(), the pipe is a source --- it doesn't need anything to produce items. It will pump out items on its own, for pipes downstream to receive and process.
- If oisVoid, the pipe is a sink --- it will neveryieldanything downstream. It will consume items from things upstream, and produce a result (a) if and when it terminates.
- If uisVoid, then the pipe's upstream is limitless, and never terminates. This means that you can useawaitSurelyinstead ofawait, to get await a value that is guaranteed to come. You'll get aniinstead of aMaybei
- If aisVoid, then the pipe never terminates --- it will keep on consuming and/or producing values forever. If this is a sink, it means that the sink will never terminate, and sorunPipewill also never terminate. If it is a source, it means that if you chain something downstream with.|, that downstream pipe can useawaitSurelyto guarantee something being passed down.
Applicative and Monadic sequencing of pipes chains by exhaustion.
do pipeX pipeY pipeZ
is a pipe itself, that behaves like pipeX until it terminates, then
 pipeY until it terminates, then pipeZ until it terminates.  The
 Monad instance allows you to choose "which pipe to behave like next"
 based on the terminating result of a previous pipe.
do x <- pipeX pipeBasedOn x
Usually you would use it by chaining together pipes with
 .| and then running the result with
 runPipe.
runPipe$ someSource.|somePipe .| someOtherPipe .| someSink
See .| and runPipe for more information
 on usage.
For a "prelude" of commonly used Pipes, see
 Data.Conduino.Combinators.
Instances
(.|) :: Monad m => Pipe a b u m v -> Pipe b c v m r -> Pipe a c u m r infixr 2 Source #
The main operator for chaining pipes together.  pipe1 .| pipe2 will
 connect the output of pipe1 to the input of pipe2.
Running a pipe will draw from pipe2, and if pipe2 ever asks for
 input (with await or something similar), it will block until pipe1
 outputs something (or signals termination).
The structure of a full pipeline usually looks like:
runPipe$ someSource.|somePipe .| someOtherPipe .| someSink
Where you route a source into a series of pipes, which eventually ends
 up at a sink.  runPipe will then produce the result of that sink.
runPipe :: Monad m => Pipe () Void u m a -> m a Source #
Run a pipe that is both a source and a sink (an "effect") into the effect that it represents.
Usually you wouild construct this using something like:
runPipe$ someSource.|somePipe .| someOtherPipe .| someSink
runPipe will produce the result of that final sink.
Some common errors you might receive:
- iis not- (): If you give a pipe where the first parameter ("input") is not- (), it means that your pipe is not a producer. Pre-compose it (using- .|) with a producer of the type you need.
For example, if you have a myPipe :: , this is
    a pipe that is awaiting Pipe Int o u m aInts from upstream.  Pre-compose with
    a producer of Ints, like sourceList
    [1,2,3] .| myPipe
- ois not- Void: If you give a pipe where the second parameter ("output") is not- Void, it means that your pipe is not a consumer. Post-compose it (using- .|) with a consumer of the type you need.
For example, if you have myPipe :: , this is
    a pipe that is yielding Pipe i Int u m aInts downstream that are going unhandled.
    Post-compose it a consumer of Ints, like myPipe , in order to be able to run it..|
    foldl (+) 0
If you just want to ignore all downstream yields, post-compose with
    sinkNull.
Primitives
awaitEither :: Pipe i o u m (Either u i) Source #
Await on upstream output.  Will block until it receives an i
 (expected input type) or a u if the upstream pipe terminates.
await :: Pipe i o u m (Maybe i) Source #
Await input from upstream.  Will block until upstream yields.
Will return Nothing if the upstream pipe finishes and terminates.
If the upstream pipe never terminates, then you can use awaitSurely to
 guarantee a result.
awaitSurely :: Pipe i o Void m i Source #
Await input from upstream where the upstream pipe is guaranteed to never terminate.
A common type error will occur if u (upstream pipe result type) is not
 Void -- it might be () or some non-Void type.  This means that the
 upstream pipe terminates, so awaiting cannot be assured.
In that case, either change your upstream pipe to be one that never
 terminates (which is most likely not possible), or use await instead
 of awaitSurely.
awaitForever :: (i -> Pipe i o u m a) -> Pipe i o u m u Source #
A useful utility function over repeated awaits.  Will repeatedly
 await and then continue with the given pipe whenever the upstream pipe
 yields.
Can be used to implement many pipe combinators:
mapf =awaitForever$ x ->yield(f x)
yield :: o -> Pipe i o u m () Source #
Send output downstream.
Since v0.2.3.0, is strict.  See yieldLazy for the original behavior.
yieldLazy :: o -> Pipe i o u m () Source #
Send output downstream without forcing its argument.
Since: 0.2.3.0
Special chaining
(&|) :: Monad m => Pipe a b u m v -> Pipe b c v m r -> Pipe a c u m (v, r) infixr 2 Source #
Like .|, but get the result of both pipes on termination, instead
 of just the second.  This means that p &| q will only terminate with a result when
 both p and q terminate.  (Typically, p .| q would terminate as soon as
 q terminates.)
Since: 0.2.1.0
(|.) :: Monad m => Pipe a b u m v -> Pipe b c v m r -> Pipe a c u m v infixr 2 Source #
Like .|, but keep the result of the first pipe, instead of the
 second.  This means that p |. q will only terminate with a result when
 both p and q terminate.  (Typically, p .| q would terminate as soon as
 q terminates.)
Since: 0.2.1.0
fuseBoth :: Monad m => Pipe a b u m v -> Pipe b c v m r -> Pipe a c u m (v, r) Source #
Useful prefix version of &|.
Since: 0.2.1.0
fuseUpstream :: Monad m => Pipe a b u m v -> Pipe b c v m r -> Pipe a c u m v Source #
Useful prefix version of |..
Since: 0.2.1.0
Incremental running
squeezePipeEither :: Monad m => Pipe i o u m a -> m ([o], Either (Either u i -> Pipe i o u m a) a) Source #
Squeeze a pipe by extracting all output that can be extracted before
 any input is requested.  Returns a Left if the pipe eventually does
 request input (as a continuation on the new input, or a terminating u
 value), or a Right if the pipe terminates with a value before ever
 asking for input.
Since: 0.2.1.0
Arguments
| :: Monad m | |
| => [i] | input to feed in | 
| -> Pipe i o u m a | |
| -> m ([o], Either (i -> Pipe i o u m a) ([i], a)) | 
Repeatedly run squeezePipe by giving it items from an input list.
 Returns the outputs observed, and Left if the input list was exhausted
 with more input expected, or Right if the pipe terminated, with the
 leftover inputs and output result.
Since: 0.2.1.0
Arguments
| :: Monad m | |
| => [i] | input to feed in | 
| -> Pipe i o u m a | |
| -> m ([o], Either (Either u i -> Pipe i o u m a) ([i], a)) | 
Repeatedly run squeezePipeEither by giving it items from an input
 list.  Returns the outputs observed, and Left if the input list was
 exhausted with more input expected (or a u terminating upstream
 value), or Right if the pipe terminated, with the leftover inputs and
 output result.
Since: 0.2.1.0
Pipe transformers
mapInput :: (i -> j) -> Pipe j o u m a -> Pipe i o u m a Source #
(Contravariantly) map over the expected input type.
mapOutput :: (p -> o) -> Pipe i p u m a -> Pipe i o u m a Source #
Map over the downstream output type.
If you want to map over the result type, use fmap.
mapUpRes :: (u -> v) -> Pipe i o v m a -> Pipe i o u m a Source #
(Contravariantly) map over the upstream result type.
trimapPipe :: (i -> j) -> (p -> o) -> (u -> v) -> Pipe j p v m a -> Pipe i o u m a Source #
Map over the input type, output type, and upstream result type.
If you want to map over the result type, use fmap.
hoistPipe :: (Monad m, Monad n) => (forall x. m x -> n x) -> Pipe i o u m a -> Pipe i o u n a Source #
Transform the underlying monad of a pipe.
Note that if you are trying to work with monad transformers, this is probably not what you want. See Data.Conduino.Lift for tools for working with underlying monad transformers.
feedbackPipe :: Monad m => Pipe x x u m a -> Pipe x x u m a Source #
Loop a pipe into itself.
- Will feed all output back to the input
- Will only ask for input upstream if output is stalled.
- Yields all outputted values downstream, effectively duplicating them.
Since: 0.2.1.0
feedbackPipeEither :: Monad m => Pipe (Either i o) o u m a -> Pipe i o u m a Source #
A version of feedbackPipe that distinguishes upstream input from
 downstream output fed back.  Gets Left from upstream, and Right from
 its own output.
- Will feed all output back to the input
- Will only ask for input upstream if output is stalled.
- Yields all outputted values downstream, effectively duplicating them.
Since: 0.2.2.0
Wrappers
newtype ZipSource m a Source #
A newtype wrapper over a source (Pipe () o VoidApplicative and Alternative instance, matching "ListT
 done right".
<*> will pair up each output that the sources produce: if you await
 a value from downstream, it will wait until both paired sources yield
 before passing them on together.
<|> will completely exhaust the first source before moving on to the
 next source.
ZipSource is effectively equivalent to "ListT done right", the true
 List Monad transformer.  <|> is concatentation.  You can use this type
 with lift to lift a yielding action and <|> to sequence yields to
 implement the pattern described in
 http://www.haskellforall.com/2014/11/how-to-build-library-agnostic-streaming.html,
 where you can write streaming producers in a polymorphic way, and have
 it run with pipes, conduit, etc.
The main difference is that its Applicative instance ("zipping") is
 different from the traditional Applicative instance for ListT
 ("all combinations").  Effectively this becomes like a "zipping"
 Applicative instance for ListT.
If you want a Monad (or MonadIO) instance,
 use ListT instead, and convert using toListT/fromListT or the
 PipeList pattern/constructor.
Constructors
| ZipSource | |
| Fields 
 | |
Instances
| MonadTrans ZipSource Source # | |
| Defined in Data.Conduino | |
| Monad m => Alternative (ZipSource m) Source # | |
| Monad m => Applicative (ZipSource m) Source # | |
| Defined in Data.Conduino | |
| Functor (ZipSource m) Source # | |
unconsZipSource :: Monad m => ZipSource m a -> m (Maybe (Maybe a, ZipSource m a)) Source #
ZipSource is effectively ListT returning a Maybe.  As such, you
 can use unconsZipSource to "peel off" the first yielded item, if it
 exists, and return the "rest of the list".
zipSource :: Monad m => Pipe () (a -> b) u m () -> Pipe () a v m () -> Pipe () b w m () Source #
Takes two sources and runs them in parallel, collating their outputs.
Since: 0.2.1.0
newtype ZipSink i u m a Source #
A newtype wrapper over a sink (Pipe i VoidApplicative and Alternative instance.
<*> will distribute input over both sinks, and output a final result
 once both sinks finish.
<|> will distribute input over both sinks, and output a final result
 as soon as one or the other finishes.
Constructors
| ZipSink | |
| Fields 
 | |
Instances
| MonadTrans (ZipSink i u) Source # | |
| Defined in Data.Conduino | |
| Monad m => Alternative (ZipSink i u m) Source # | 
 
 | 
| Monad m => Applicative (ZipSink i u m) Source # | 
 
 | 
| Defined in Data.Conduino Methods pure :: a -> ZipSink i u m a # (<*>) :: ZipSink i u m (a -> b) -> ZipSink i u m a -> ZipSink i u m b # liftA2 :: (a -> b -> c) -> ZipSink i u m a -> ZipSink i u m b -> ZipSink i u m c # (*>) :: ZipSink i u m a -> ZipSink i u m b -> ZipSink i u m b # (<*) :: ZipSink i u m a -> ZipSink i u m b -> ZipSink i u m a # | |
| Functor (ZipSink i u m) Source # | |
zipSink :: Monad m => Pipe i Void u m (a -> b) -> Pipe i Void u m a -> Pipe i Void u m b Source #
Distribute input to both sinks, and finishes with the final result once both finish.
Forms an identity with pure.
altSink :: Monad m => Pipe i Void u m a -> Pipe i Void u m a -> Pipe i Void u m a Source #
Distribute input to both sinks, and finishes with the result of the one that finishes first.