| Safe Haskell | None | 
|---|---|
| Language | Haskell2010 | 
Data.Machine.Concurrent
Contents
Description
The primary use of concurrent machines is to establish a pipelined architecture that can boost overall throughput by running each stage of the pipeline at the same time. The processing, or production, rate of each stage may not be identical, so facilities are provided to loosen the temporal coupling between pipeline stages using buffers.
This architecture also lends itself to operations where multiple
 workers are available for procesisng inputs. If each worker is to
 process the same set of inputs, consider fanout and
 fanoutSteps. If each worker is to process a disjoint set of
 inputs, consider scatter.
- module Data.Machine
- (>~>) :: MonadBaseControl IO m => MachineT m k b -> ProcessT m b c -> MachineT m k c
- (<~<) :: MonadBaseControl IO m => ProcessT m b c -> MachineT m k b -> MachineT m k c
- bufferConnect :: MonadBaseControl IO m => Int -> MachineT m k b -> ProcessT m b c -> MachineT m k c
- rollingConnect :: MonadBaseControl IO m => Int -> MachineT m k b -> ProcessT m b c -> MachineT m k c
- fanout :: (MonadBaseControl IO m, Semigroup r) => [ProcessT m a r] -> ProcessT m a r
- fanoutSteps :: (MonadBaseControl IO m, Monoid r) => [ProcessT m a r] -> ProcessT m a r
- wye :: forall m a a' b b' c. MonadBaseControl IO m => ProcessT m a a' -> ProcessT m b b' -> WyeT m a' b' c -> WyeT m a b c
- tee :: forall m a a' b b' c. MonadBaseControl IO m => ProcessT m a a' -> ProcessT m b b' -> TeeT m a' b' c -> TeeT m a b c
- scatter :: MonadBaseControl IO m => [MachineT m k o] -> MachineT m k o
- splitSum :: forall m a b c d. MonadBaseControl IO m => ProcessT m a b -> ProcessT m c d -> ProcessT m (Either a c) (Either b d)
- mergeSum :: forall m a b r. MonadBaseControl IO m => ProcessT m a r -> ProcessT m b r -> ProcessT m (Either a b) r
- splitProd :: forall m a b r. MonadBaseControl IO m => ProcessT m a r -> ProcessT m b r -> ProcessT m (a, b) r
Documentation
module Data.Machine
Concurrent connection
(>~>) :: MonadBaseControl IO m => MachineT m k b -> ProcessT m b c -> MachineT m k c infixl 7 Source #
Flipped (<~<).
(<~<) :: MonadBaseControl IO m => ProcessT m b c -> MachineT m k b -> MachineT m k c Source #
Build a new Machine by adding a Process to the output of an
 old Machine. The upstream machine is run concurrently with
 downstream with the aim that upstream will have a yielded value
 ready as soon as downstream awaits. This effectively creates a
 buffer between upstream and downstream, or source and sink, that
 can contain up to one value.
(<~<) ::Processb c ->Processa b ->Processa c (<~<) ::Processc d ->Teea b c ->Teea b d (<~<) ::Processb c ->Machinek b ->Machinek c
Buffered machines
bufferConnect :: MonadBaseControl IO m => Int -> MachineT m k b -> ProcessT m b c -> MachineT m k c Source #
rollingConnect :: MonadBaseControl IO m => Int -> MachineT m k b -> ProcessT m b c -> MachineT m k c Source #
Concurrent processing of shared inputs
fanout :: (MonadBaseControl IO m, Semigroup r) => [ProcessT m a r] -> ProcessT m a r Source #
Share inputs with each of a list of processes in lockstep. Any values yielded by the processes for a given input are combined into a single yield from the composite process.
fanoutSteps :: (MonadBaseControl IO m, Monoid r) => [ProcessT m a r] -> ProcessT m a r Source #
Share inputs with each of a list of processes in lockstep. If
 none of the processes yields a value, the composite process will
 itself yield mempty. The idea is to provide a handle on steps
 only executed for their side effects. For instance, if you want to
 run a collection of ProcessTs that await but don't yield some
 number of times, you can use 'fanOutSteps . map (fmap (const ()))'
 followed by a taking process.
Concurrent multiple-input machines
wye :: forall m a a' b b' c. MonadBaseControl IO m => ProcessT m a a' -> ProcessT m b b' -> WyeT m a' b' c -> WyeT m a b c Source #
tee :: forall m a a' b b' c. MonadBaseControl IO m => ProcessT m a a' -> ProcessT m b b' -> TeeT m a' b' c -> TeeT m a b c Source #
Compose a pair of pipes onto the front of a Tee.
scatter :: MonadBaseControl IO m => [MachineT m k o] -> MachineT m k o Source #
Produces values from whichever source MachineT yields
 first. This operation may also be viewed as a gather operation in
 that all values produced by the given machines are interleaved when
 fed downstream. Note that inputs are not shared. The composite
 machine will await an input when any constituent machine awaits an
 input. That input will be supplied to the awaiting constituent and
 no other.
Some examples of more specific useful types scatter may be used
 at,
scatter :: [ProcessT m a b] -> ProcessT m a b scatter :: [SourceT m a] -> SourceT m a
The former may be used to stream data through a collection of
 worker Processes, the latter may be used to intersperse values
 from a collection of sources.
splitSum :: forall m a b c d. MonadBaseControl IO m => ProcessT m a b -> ProcessT m c d -> ProcessT m (Either a c) (Either b d) Source #
Similar to +++: split the input between two
 processes, retagging and merging their outputs.
The two processes are run concurrently whenever possible.
mergeSum :: forall m a b r. MonadBaseControl IO m => ProcessT m a r -> ProcessT m b r -> ProcessT m (Either a b) r Source #
Similar to |||: split the input between two
 processes and merge their outputs.
Connect two processes to the downstream tails of a Machine that
 produces Eithers. The two downstream consumers are run
 concurrently when possible. When one downstream consumer stops, the
 other is allowed to run until it stops or the upstream source
 yields a value the remaining consumer can not handle.
mergeSum sinkL sinkR produces a topology like this,
                                sinkL
                               /      \
                             a          \
                            /            \
   source -- Either a b -->                -- r -->
                            \            /
                             b          /
                              \       /
                                sinkR 
splitProd :: forall m a b r. MonadBaseControl IO m => ProcessT m a r -> ProcessT m b r -> ProcessT m (a, b) r Source #
Connect two processes to the downstream tails of a Machine that
 produces tuples. The two downstream consumers are run
 concurrently. When one downstream consumer stops, the entire
 pipeline is stopped.
splitProd sink1 sink2 produces a topology like this,
                           sink1
                          /      \
                        a          \
                       /            \
   source -- (a,b) -->               -- r -->
                       \            /
                        b         /
                          \     /
                           sink2