{-# LANGUAGE CPP #-}
{- HLINT ignore "Eta reduce" -}
-- |
-- Module      : Streamly.Internal.Data.StreamK
-- Copyright   : (c) 2017 Composewell Technologies
--
-- License     : BSD3
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC
--
module Streamly.Internal.Data.StreamK
    (
    -- * Setup
    -- | To execute the code examples provided in this module in ghci, please
    -- run the following commands first.
    --
    -- $setup

      module Streamly.Internal.Data.StreamK.Type
    -- * Transformer
    , module Streamly.Internal.Data.StreamK.Transformer

    -- * From containers
    , fromStream

    -- * Specialized Generation
    , repeatM
    , replicate
    , replicateM
    , fromIndices
    , fromIndicesM
    , iterate
    , iterateM

    -- * Elimination
    -- ** General Folds
    , foldr1
    , fold
    , foldBreak
    , foldEither
    , foldConcat
    , ParserK.toParserK -- XXX move the code to this module
    , parseDBreak
    , parseD
    , parseBreak
    , parseBreakPos
    , parse
    , parsePos

    -- ** Specialized Folds
    , head
    , elem
    , notElem
    , all
    , any
    , last
    , minimum
    , minimumBy
    , maximum
    , maximumBy
    , findIndices
    , lookup
    , findM
    , find
    , (!!)

    -- ** To Containers
    , toList
    , toStream

    -- ** Map and Fold
    , mapM_

    -- * Transformation
    -- ** By folding (scans)
    , scanl'
    , scanlx'

    -- ** Filtering
    , filter
    , take
    , takeWhile
    , drop
    , dropWhile

    -- ** Mapping
    , mapM
    , sequence

    -- ** Inserting
    , intersperseM
    , intersperse
    , insertBy

    -- ** Deleting
    , deleteBy

    -- ** Reordering
    , sortBy
    , sortOn

    -- ** Map and Filter
    , mapMaybe

    -- ** Zipping
    , zipWith
    , zipWithM

    -- ** Merging
    , mergeBy
    , mergeByM

    -- ** Transformation comprehensions
    , the

    -- ** Transforming Inner Monad
    , morphInner

    -- * Exceptions
    , handle

    -- * Resource Management
    , bracketIO

    -- * Deprecated
    , hoist
    , parseBreakChunks
    , parseChunks
    , parseBreakChunksGeneric
    , parseChunksGeneric
    )
where

#include "ArrayMacros.h"
#include "inline.hs"
#include "assert.hs"
#include "deprecation.h"

import Control.Exception (mask_, Exception)
import Control.Monad (void, join)
import Control.Monad.Catch (MonadCatch)
import Control.Monad.IO.Class (MonadIO(..))
import Data.Ord (comparing)
import GHC.Types (SPEC(..))
import Streamly.Internal.Data.Array.Type (Array(..))
import Streamly.Internal.Data.Fold.Type (Fold(..))
import Streamly.Internal.Data.IOFinalizer (newIOFinalizer, runIOFinalizer)
import Streamly.Internal.Data.ParserK.Type (ParserK)
import Streamly.Internal.Data.Producer.Type (Producer(..))
import Streamly.Internal.Data.SVar.Type (adaptState, defState)
import Streamly.Internal.Data.Unbox (Unbox)

import qualified Control.Monad.Catch as MC
import qualified Streamly.Internal.Data.Array as Array
import qualified Streamly.Internal.Data.Array.Generic as GenArr
import qualified Streamly.Internal.Data.Fold.Type as FL
import qualified Streamly.Internal.Data.Parser as Parser
import qualified Streamly.Internal.Data.ParserDrivers as Drivers
import qualified Streamly.Internal.Data.Parser.Type as PR
import qualified Streamly.Internal.Data.ParserK.Type as ParserK
import qualified Streamly.Internal.Data.Stream as Stream
import qualified Prelude

import Prelude
       hiding (Foldable(..), last, map, mapM, mapM_, repeat, sequence,
               take, filter, all, any, takeWhile, drop, dropWhile,
               notElem, head, tail, init, zipWith, lookup,
               (!!), replicate, reverse, concatMap, iterate, splitAt)
import Data.Foldable (length)
import Streamly.Internal.Data.StreamK.Type
import Streamly.Internal.Data.StreamK.Transformer
import Streamly.Internal.Data.Parser (ParseError(..), ParseErrorPos(..))

#include "DocTestDataStreamK.hs"

-- | Convert a fused 'Stream' to 'StreamK'.
--
-- For example:
--
-- >>> s1 = StreamK.fromStream $ Stream.fromList [1,2]
-- >>> s2 = StreamK.fromStream $ Stream.fromList [3,4]
-- >>> Stream.fold Fold.toList $ StreamK.toStream $ s1 `StreamK.append` s2
-- [1,2,3,4]
--
{-# INLINE fromStream #-}
fromStream :: Monad m => Stream.Stream m a -> StreamK m a
fromStream = Stream.toStreamK

-- | Convert a 'StreamK' to a fused 'Stream'.
--
{-# INLINE toStream #-}
toStream :: Applicative m => StreamK m a -> Stream.Stream m a
toStream = Stream.fromStreamK

-------------------------------------------------------------------------------
-- Generation
-------------------------------------------------------------------------------

{-
-- Generalization of concurrent streams/SVar via unfoldr.
--
-- Unfold a value into monadic actions and then run the resulting monadic
-- actions to generate a stream. Since the step of generating the monadic
-- action and running them are decoupled we can run the monadic actions
-- concurrently. For example, the seed could be a list of monadic actions or a
-- pure stream of monadic actions.
--
-- We can have different flavors of this depending on the stream type t. The
-- concurrent version could be async or ahead etc. Depending on how we queue
-- back the feedback portion b, it could be DFS or BFS style.
--
unfoldrA :: (b -> Maybe (m a, b)) -> b -> StreamK m a
unfoldrA = undefined
-}

-------------------------------------------------------------------------------
-- Special generation
-------------------------------------------------------------------------------

-- |
-- >>> repeatM = StreamK.sequence . StreamK.repeat
-- >>> repeatM = fix . StreamK.consM
-- >>> repeatM = cycle1 . StreamK.fromEffect
--
-- Generate a stream by repeatedly executing a monadic action forever.
--
-- >>> :{
-- repeatAction =
--        StreamK.repeatM (threadDelay 1000000 >> print 1)
--      & StreamK.take 10
--      & StreamK.fold Fold.drain
-- :}
--
repeatM :: Monad m => m a -> StreamK m a
repeatM = repeatMWith consM

-- | Build a stream from a count and a monadic action by delegating to
-- 'replicateMWith' with 'consM' as the stream constructor.
--
-- NOTE(review): presumably the action is run the given number of times, one
-- result per stream element - confirm against 'replicateMWith'.
{-# INLINE replicateM #-}
replicateM :: Monad m => Int -> m a -> StreamK m a
replicateM = replicateMWith consM
-- | Generate a pure stream consisting of the value @a@ repeated @n@ times. A
-- count of zero or less yields the empty stream.
{-# INLINE replicate #-}
replicate :: Int -> a -> StreamK m a
replicate n a = go n
    where
    -- Count down, emitting one copy of the value per step.
    go cnt
        | cnt <= 0 = nil
        | otherwise = a `cons` go (cnt - 1)

-- | Build a stream from a monadic index-to-element function by delegating to
-- 'fromIndicesMWith' with 'consM' as the stream constructor.
--
-- NOTE(review): presumably the monadic counterpart of 'fromIndices', with
-- indices starting at 0 - confirm against 'fromIndicesMWith'.
{-# INLINE fromIndicesM #-}
fromIndicesM :: Monad m => (Int -> m a) -> StreamK m a
fromIndicesM = fromIndicesMWith consM
-- | Generate an unending stream whose successive elements are @gen 0@,
-- @gen 1@, @gen 2@, and so on.
{-# INLINE fromIndices #-}
fromIndices :: (Int -> a) -> StreamK m a
fromIndices gen = go 0
  where
    -- Emit the element for index @n@, then continue from the next index.
    go n = gen n `cons` go (n + 1)

-- |
-- >>> iterate f x = x `StreamK.cons` iterate f (f x)
--
-- Generate an infinite stream with @x@ as the first element and each
-- successive element derived by applying the function @f@ on the previous
-- element.
--
-- >>> StreamK.toList $ StreamK.take 5 $ StreamK.iterate (+1) 1
-- [1,2,3,4,5]
--
{-# INLINE iterate #-}
iterate :: (a -> a) -> a -> StreamK m a
iterate step = go
    where
        -- The seed is forced before it is emitted; the next seed is obtained
        -- by applying @step@ to the current one.
        go !s = cons s (go (step s))

-- |
-- >>> iterateM f m = m >>= \a -> return a `StreamK.consM` iterateM f (f a)
--
-- Generate an infinite stream with the first element generated by the action
-- @m@ and each successive element derived by applying the monadic function
-- @f@ on the previous element.
--
-- >>> :{
-- StreamK.iterateM (\x -> print x >> return (x + 1)) (return 0)
--     & StreamK.take 3
--     & StreamK.toList
-- :}
-- 0
-- 1
-- [0,1,2]
--
{-# INLINE iterateM #-}
iterateM :: Monad m => (a -> m a) -> m a -> StreamK m a
iterateM = iterateMWith consM

-------------------------------------------------------------------------------
-- Elimination by Folding
-------------------------------------------------------------------------------

-- | Right fold of a stream without a starting value: the last element of the
-- stream seeds the fold. For a stream @x1, x2, ..., xn@ the result is
-- @Just (x1 \`step\` (x2 \`step\` (... \`step\` xn)))@; an empty stream gives
-- 'Nothing'.
{-# INLINE foldr1 #-}
foldr1 :: Monad m => (a -> a -> a) -> StreamK m a -> m (Maybe a)
foldr1 step m = do
    r <- uncons m
    case r of
        Nothing -> return Nothing
        Just (h, t) -> fmap Just (go h t)
    where
    -- @p@ is the element immediately preceding the remaining stream @m1@.
    go p m1 =
        let -- Nothing follows @p@, so @p@ itself is the result.
            stp = return p
            -- @a@ is the last element: it goes to the right of its
            -- predecessor, the same argument order 'yieldk' uses. (Passing
            -- them flipped made the result depend on whether the stream ended
            -- in a singleton or in an explicit stop.)
            single a = return $ step p a
            yieldk a r = fmap (step p) (go a r)
         in foldStream defState yieldk single stp m1

-- | Fold a stream using the supplied left 'Fold' and reducing the resulting
-- expression strictly at each step. The behavior is similar to 'foldl''. A
-- 'Fold' can terminate early without consuming the full stream. See the
-- documentation of individual 'Fold's for termination behavior.
--
-- Definitions:
--
-- >>> fold f = fmap fst . StreamK.foldBreak f
-- >>> fold f = StreamK.parseD (Parser.fromFold f)
--
-- Example:
--
-- >>> StreamK.fold Fold.sum $ StreamK.fromStream $ Stream.enumerateFromTo 1 100
-- 5050
--
{-# INLINABLE fold #-}
fold :: Monad m => FL.Fold m a b -> StreamK m a -> m b
fold (FL.Fold step begin _ final) m = do
    res <- begin
    case res of
        FL.Partial fs -> go fs m
        -- The fold finished without needing any input.
        FL.Done fb -> return fb

    where

    -- The accumulator is forced at every step to keep the fold strict.
    go !acc m1 =
        let -- Stream exhausted: finalize whatever has been accumulated.
            stop = final acc
            -- Last element: feed it, then finalize unless the fold is done.
            single a =
                step acc a
                    >>= \case
                        FL.Partial s -> final s
                        FL.Done b1 -> return b1
            -- More input follows: feed the element and keep going unless the
            -- fold terminates early.
            yieldk a r =
                step acc a
                    >>= \case
                        FL.Partial s -> go s r
                        FL.Done b1 -> return b1
         in foldStream defState yieldk single stop m1

-- | Fold resulting in either breaking the stream or continuation of the fold.
-- Instead of supplying the input stream in one go we can run the fold multiple
-- times, each time supplying the next segment of the input stream. If the fold
-- has not yet finished it returns a fold that can be run again otherwise it
-- returns the fold result and the residual stream.
--
-- /Internal/
{-# INLINE foldEither #-}
foldEither :: Monad m =>
    Fold m a b -> StreamK m a -> m (Either (Fold m a b) (b, StreamK m a))
foldEither (FL.Fold step begin done final) m = do
    res <- begin
    case res of
        FL.Partial fs -> go fs m
        -- The fold finished without consuming anything: the whole stream is
        -- the residue.
        FL.Done fb -> return $ Right (fb, m)

    where

    -- Package the accumulator @s@ into a fold that resumes from it: the
    -- rebuilt fold's initial action simply returns @s@ as a partial state.
    suspend s = return $ Left $ Fold step (return $ FL.Partial s) done final

    go !acc m1 =
        let -- Input ran out before the fold finished.
            stop = suspend acc
            -- Last element: either the fold is still unfinished, or it is
            -- done and nothing remains of the stream.
            single a =
                step acc a
                    >>= \case
                        FL.Partial s -> suspend s
                        FL.Done b1 -> return $ Right (b1, nil)
            -- More input follows: continue, or stop early and hand back the
            -- unconsumed rest of the stream.
            yieldk a r =
                step acc a
                    >>= \case
                        FL.Partial s -> go s r
                        FL.Done b1 -> return $ Right (b1, r)
         in foldStream defState yieldk single stop m1

-- | Like 'fold' but also returns the remaining stream. The resulting stream
-- would be 'StreamK.nil' if the stream finished before the fold.
--
{-# INLINE foldBreak #-}
foldBreak :: Monad m => Fold m a b -> StreamK m a -> m (b, StreamK m a)
foldBreak fld strm = do
    r <- foldEither fld strm
    case r of
        Right res -> return res
        -- The stream ended while the fold was still in progress: recover the
        -- suspended state from the returned fold and finalize it.
        Left (Fold _ initial _ final) -> do
            res <- initial
            case res of
                -- 'foldEither' only ever builds the returned fold with a
                -- partial initial state.
                FL.Done _ -> error "foldBreak: unreachable state"
                FL.Partial s -> do
                    b <- final s
                    return (b, nil)

-- XXX Array folds can be implemented using this.
-- foldContainers? Specialized to foldArrays.

-- | Generate streams from individual elements of a stream and fold the
-- concatenation of those streams using the supplied fold. Return the result of
-- the fold and residual stream.
--
-- For example, this can be used to efficiently fold an Array Word8 stream
-- using Word8 folds.
--
-- /Internal/
{-# INLINE foldConcat #-}
foldConcat :: Monad m =>
    Producer m a b -> Fold m b c -> StreamK m a -> m (c, StreamK m a)
foldConcat
    (Producer pstep pinject pextract)
    (Fold fstep begin _ final)
    stream = do

    res <- begin
    case res of
        FL.Partial fs -> go fs stream
        -- The fold finished without consuming anything.
        FL.Done fb -> return (fb, stream)

    where

    -- Outer loop over the input stream; @acc@ is the current fold state.
    go !acc m1 =
        let -- Input exhausted: finalize the fold, nothing is left over.
            stop = do
                r <- final acc
                return (r, nil)
            -- Last outer element: run the producer on it. If the fold is
            -- still going afterwards, finalize it; if the fold finished
            -- midway, the unconsumed part of the element is the residue.
            single a = do
                st <- pinject a
                res <- go1 SPEC acc st
                case res of
                    Left fs -> do
                        r <- final fs
                        return (r, nil)
                    Right (b, s) -> do
                        x <- pextract s
                        return (b, fromPure x)
            -- More outer elements follow: same as 'single', except that an
            -- unfinished fold continues with the rest of the stream and the
            -- residue is put back in front of it.
            yieldk a r = do
                st <- pinject a
                res <- go1 SPEC acc st
                case res of
                    Left fs -> go fs r
                    Right (b, s) -> do
                        x <- pextract s
                        return (b, x `cons` r)
         in foldStream defState yieldk single stop m1

    -- Inner loop: drive the producer from state @st@ into the fold state
    -- @fs@. Returns @Left@ with the new fold state when the producer runs
    -- dry, or @Right@ with the fold result and the remaining producer state
    -- when the fold finishes first.
    {-# INLINE go1 #-}
    go1 !_ !fs st = do
        r <- pstep st
        case r of
            Stream.Yield x s -> do
                res <- fstep fs x
                case res of
                    FL.Done b -> return $ Right (b, s)
                    FL.Partial fs1 -> go1 SPEC fs1 s
            Stream.Skip s -> go1 SPEC fs s
            Stream.Stop -> return $ Left fs

------------------------------------------------------------------------------
-- Specialized folds
------------------------------------------------------------------------------

-- | Return the first element of the stream, or 'Nothing' if the stream is
-- empty. The rest of the stream is not evaluated.
{-# INLINE head #-}
head :: Monad m => StreamK m a -> m (Maybe a)
-- head = foldrM (\x _ -> return $ Just x) (return Nothing)
head m =
    let stop = return Nothing
        single a = return (Just a)
        -- The tail is ignored: only the first element is needed.
        yieldk a _ = return (Just a)
    in foldStream defState yieldk single stop m

-- | Determine whether the value @e@ occurs in the stream. The search stops at
-- the first matching element; an empty stream gives 'False'.
{-# INLINE elem #-}
elem :: (Monad m, Eq a) => a -> StreamK m a -> m Bool
elem e = go
    where
    go m1 =
        let stop = return False
            single a = return (a == e)
            -- Short-circuit on the first match, otherwise search the rest.
            yieldk a r = if a == e then return True else go r
        in foldStream defState yieldk single stop m1

-- | Determine whether the value @e@ is absent from the stream. The search
-- stops at the first matching element; an empty stream gives 'True'.
{-# INLINE notElem #-}
notElem :: (Monad m, Eq a) => a -> StreamK m a -> m Bool
notElem e = go
    where
    go m1 =
        let stop = return True
            single a = return (a /= e)
            -- Short-circuit on the first match, otherwise search the rest.
            yieldk a r = if a == e then return False else go r
        in foldStream defState yieldk single stop m1

-- | Determine whether every element of the stream satisfies the predicate.
-- Evaluation stops at the first element that fails it; an empty stream gives
-- 'True'.
{-# INLINABLE all #-}
all :: Monad m => (a -> Bool) -> StreamK m a -> m Bool
all p = go
    where
    go m1 =
        let -- Last element: the answer is just the predicate's verdict.
            single a = return (p a)
            yieldk a r
                | p a = go r
                | otherwise = return False
         in foldStream defState yieldk single (return True) m1

-- | Determine whether at least one element of the stream satisfies the
-- predicate.
--
-- Evaluates to 'True' at the first element that satisfies the predicate,
-- without consuming the rest of the stream. An empty stream results in
-- 'False'.
{-# INLINABLE any #-}
any :: Monad m => (a -> Bool) -> StreamK m a -> m Bool
any p = go
    where
    go m1 =
        let single a   | p a       = return True
                       | otherwise = return False
            -- Stop at the first hit, otherwise keep scanning.
            yieldk a r | p a       = return True
                       | otherwise = go r
         in foldStream defState yieldk single (return False) m1

-- | Extract the last element of the stream, if any.
--
-- Implemented as a left fold that starts from 'Nothing' and replaces the
-- accumulator with 'Just' the current element at every step.
{-# INLINE last #-}
last :: Monad m => StreamK m a -> m (Maybe a)
last = foldlx' (\_ y -> Just y) Nothing id

-- | Determine the minimum element of the stream, or 'Nothing' if the stream
-- is empty.
--
-- When several elements compare equal to the minimum, the one that occurs
-- first in the stream is returned (the current candidate is kept whenever
-- @candidate <= element@).
{-# INLINE minimum #-}
minimum :: (Monad m, Ord a) => StreamK m a -> m (Maybe a)
minimum = go Nothing
    where
    -- No candidate yet: the first element seen becomes the candidate.
    go Nothing m1 =
        let stop      = return Nothing
            single a  = return (Just a)
            yieldk a r = go (Just a) r
        in foldStream defState yieldk single stop m1

    -- @res@ is the smallest element seen so far.
    go (Just res) m1 =
        let stop      = return (Just res)
            single a  =
                if res <= a
                then return (Just res)
                else return (Just a)
            yieldk a r =
                if res <= a
                then go (Just res) r
                else go (Just a) r
        in foldStream defState yieldk single stop m1

-- | Determine the minimum element of the stream according to the supplied
-- comparison function, or 'Nothing' if the stream is empty.
--
-- The current candidate is replaced only when the comparison of the
-- candidate with the new element is 'GT'; therefore among elements that
-- compare 'EQ' the one occurring first in the stream is returned.
{-# INLINE minimumBy #-}
minimumBy
    :: (Monad m)
    => (a -> a -> Ordering) -> StreamK m a -> m (Maybe a)
minimumBy cmp = go Nothing
    where
    -- No candidate yet: the first element seen becomes the candidate.
    go Nothing m1 =
        let stop      = return Nothing
            single a  = return (Just a)
            yieldk a r = go (Just a) r
        in foldStream defState yieldk single stop m1

    -- @res@ is the smallest element seen so far.
    go (Just res) m1 =
        let stop      = return (Just res)
            single a  = case cmp res a of
                GT -> return (Just a)
                _  -> return (Just res)
            yieldk a r = case cmp res a of
                GT -> go (Just a) r
                _  -> go (Just res) r
        in foldStream defState yieldk single stop m1

-- | Determine the maximum element of the stream, or 'Nothing' if the stream
-- is empty.
--
-- When several elements compare equal to the maximum, the one that occurs
-- last in the stream is returned (the candidate is replaced whenever
-- @candidate <= element@).
{-# INLINE maximum #-}
maximum :: (Monad m, Ord a) => StreamK m a -> m (Maybe a)
maximum = go Nothing
    where
    -- No candidate yet: the first element seen becomes the candidate.
    go Nothing m1 =
        let stop      = return Nothing
            single a  = return (Just a)
            yieldk a r = go (Just a) r
        in foldStream defState yieldk single stop m1

    -- @res@ is the largest element seen so far.
    go (Just res) m1 =
        let stop      = return (Just res)
            single a  =
                if res <= a
                then return (Just a)
                else return (Just res)
            yieldk a r =
                if res <= a
                then go (Just a) r
                else go (Just res) r
        in foldStream defState yieldk single stop m1

-- | Determine the maximum element of the stream according to the supplied
-- comparison function, or 'Nothing' if the stream is empty.
--
-- The current candidate is kept only when the comparison of the candidate
-- with the new element is 'GT'; therefore among elements that compare 'EQ'
-- the one occurring last in the stream is returned.
{-# INLINE maximumBy #-}
maximumBy :: Monad m => (a -> a -> Ordering) -> StreamK m a -> m (Maybe a)
maximumBy cmp = go Nothing
    where
    -- No candidate yet: the first element seen becomes the candidate.
    go Nothing m1 =
        let stop      = return Nothing
            single a  = return (Just a)
            yieldk a r = go (Just a) r
        in foldStream defState yieldk single stop m1

    -- @res@ is the largest element seen so far.
    go (Just res) m1 =
        let stop      = return (Just res)
            single a  = case cmp res a of
                GT -> return (Just res)
                _  -> return (Just a)
            yieldk a r = case cmp res a of
                GT -> go (Just res) r
                _  -> go (Just a) r
        in foldStream defState yieldk single stop m1

-- | Look up the element at the given zero-based index.
--
-- Results in 'Nothing' if the index is negative or if the stream ends
-- before the index is reached. Consumption stops at the requested element.
{-# INLINE (!!) #-}
(!!) :: Monad m => StreamK m a -> Int -> m (Maybe a)
m !! i = go i m
    where
    -- @n@ is the number of elements still to be skipped.
    go n m1 =
      let single a | n == 0 = return $ Just a
                   | otherwise = return Nothing
          yieldk a x | n < 0 = return Nothing
                     | n == 0 = return $ Just a
                     | otherwise = go (n - 1) x
      in foldStream defState yieldk single (return Nothing) m1

-- | Look up a key in a stream of key-value pairs.
--
-- Results in the value paired with the first key equal to the given one,
-- without consuming the rest of the stream, or 'Nothing' if no key matches.
{-# INLINE lookup #-}
lookup :: (Monad m, Eq a) => a -> StreamK m (a, b) -> m (Maybe b)
lookup e = go
    where
    go m1 =
        let single (a, b) | a == e = return $ Just b
                          | otherwise = return Nothing
            yieldk (a, b) x | a == e = return $ Just b
                            | otherwise = go x
        in foldStream defState yieldk single (return Nothing) m1

-- | Find the first element of the stream for which the monadic predicate
-- evaluates to 'True'.
--
-- The predicate is run on elements in stream order and the search stops at
-- the first success. Results in 'Nothing' if no element qualifies.
{-# INLINE findM #-}
findM :: Monad m => (a -> m Bool) -> StreamK m a -> m (Maybe a)
findM p = go
    where
    go m1 =
        let single a = do
                b <- p a
                if b then return $ Just a else return Nothing
            yieldk a x = do
                b <- p a
                if b then return $ Just a else go x
        in foldStream defState yieldk single (return Nothing) m1

-- | Find the first element of the stream that satisfies the predicate.
--
-- Pure-predicate variant of 'findM': the predicate is lifted into the monad
-- with 'return' and passed to 'findM'.
{-# INLINE find #-}
find :: Monad m => (a -> Bool) -> StreamK m a -> m (Maybe a)
find p = findM (return . p)

-- | Produce a stream of the zero-based positions of all the elements of the
-- input stream that satisfy the predicate.
{-# INLINE findIndices #-}
findIndices :: (a -> Bool) -> StreamK m a -> StreamK m Int
findIndices p = go 0
    where
    -- @offset@ is the position of the next element of @m1@ in the original
    -- stream.
    go offset m1 = mkStream $ \st yld sng stp ->
        let single a | p a = sng offset
                     | otherwise = stp
            yieldk a x | p a = yld offset $ go (offset + 1) x
                       -- Element rejected: continue with the rest of the
                       -- stream using the same continuations.
                       | otherwise = foldStream (adaptState st) yld sng stp $
                            go (offset + 1) x
        in foldStream (adaptState st) yieldk single stp m1

------------------------------------------------------------------------------
-- Map and Fold
------------------------------------------------------------------------------

-- | Apply a monadic action to each element of the stream and discard the
-- output of the action.
--
-- The actions are sequenced in stream order; the result is @()@ once the
-- stream ends.
{-# INLINE mapM_ #-}
mapM_ :: Monad m => (a -> m b) -> StreamK m a -> m ()
mapM_ f = go
    where
    go m1 =
        let stop = return ()
            single a = void (f a)
            -- Run the action for this element, then process the rest.
            yieldk a r = f a >> go r
         in foldStream defState yieldk single stop m1

-- | Apply a monadic function to each element of the stream, producing a
-- stream of the results.
--
-- Defined as 'mapMWith' specialised to 'consM' as the cons operation.
{-# INLINE mapM #-}
mapM :: Monad m => (a -> m b) -> StreamK m a -> StreamK m b
mapM = mapMWith consM

------------------------------------------------------------------------------
-- Converting folds
------------------------------------------------------------------------------

-- | Collect the elements of the stream into a list, in stream order.
--
-- Implemented as a right fold of the stream with @(:)@ and @[]@.
{-# INLINABLE toList #-}
toList :: Monad m => StreamK m a -> m [a]
toList = foldr (:) []

-- Based on suggestions by David Feuer and Pranay Sashank
-- | Change the underlying monad of a stream from @m@ to @n@ using the supplied
-- natural transformation.
--
-- 'hoist' is the old name of 'morphInner'; it is defined by the @RENAME@
-- macro below.
{-# INLINE morphInner #-}
morphInner, hoist :: (Monad m, Monad n)
    => (forall x. m x -> n x) -> StreamK m a -> StreamK n a
morphInner f str =
    mkStream $ \st yld sng stp ->
            -- Each continuation runs in @m@ but merely returns the @n@ action
            -- to perform next. The source stream is therefore folded in @m@
            -- to obtain an @m (n r)@, which is moved into @n@ with @f@ and
            -- flattened with 'join'.
            let single = return . sng
                -- The tail is converted lazily, one step at a time.
                yieldk a s = return $ yld a (hoist f s)
                stop = return stp
                state = adaptState st
             in join . f $ foldStreamShared state yieldk single stop str
RENAME(hoist,morphInner)

-------------------------------------------------------------------------------
-- Transformation by folding (Scans)
-------------------------------------------------------------------------------

-- | Left scan with a separate extraction function.
--
-- @scanlx' step begin done@ first emits @done begin@; then, for every input
-- element @a@, the accumulator is advanced with @step acc a@ and @done@
-- applied to the new accumulator is emitted.
{-# INLINE scanlx' #-}
scanlx' :: (x -> a -> x) -> x -> (x -> b) -> StreamK m a -> StreamK m b
scanlx' step begin done m =
    cons (done begin) $ go m begin
    where
    -- The accumulator argument carries a bang pattern, so it is forced each
    -- time 'go' is entered.
    go m1 !acc = mkStream $ \st yld sng stp ->
        let single a = sng (done $ step acc a)
            yieldk a r =
                -- Share the new accumulator between the emitted value and
                -- the recursive call.
                let s = step acc a
                in yld (done s) (go r s)
        in foldStream (adaptState st) yieldk single stp m1

-- | Left scan that emits the accumulator itself: the initial value first, then
-- the accumulator after each input element.
--
-- This is 'scanlx'' with 'id' as the extraction function.
{-# INLINE scanl' #-}
scanl' :: (b -> a -> b) -> b -> StreamK m a -> StreamK m b
scanl' step begin = scanlx' step begin id

-------------------------------------------------------------------------------
-- Filtering
-------------------------------------------------------------------------------

-- | Keep only the elements of the stream that satisfy the predicate.
{-# INLINE filter #-}
filter :: (a -> Bool) -> StreamK m a -> StreamK m a
filter p = go
    where
    go m1 = mkStream $ \st yld sng stp ->
        let single a   | p a       = sng a
                       | otherwise = stp
            -- A rejected element is skipped by folding the tail directly
            -- with the same continuations, so nothing is yielded for it.
            yieldk a r | p a       = yld a (go r)
                       | otherwise = foldStream st yieldk single stp r
         in foldStream st yieldk single stp m1

-- | Take at most the first @n@ elements of the stream.
--
-- When @n <= 0@ the result stops immediately without folding the input.
{-# INLINE take #-}
take :: Int -> StreamK m a -> StreamK m a
take = go
    where
    go n1 m1 = mkStream $ \st yld sng stp ->
        -- Each yielded element decrements the remaining count for the tail.
        let yieldk a r = yld a (go (n1 - 1) r)
        in if n1 <= 0
           then stp
           else foldStream st yieldk sng stp m1

-- | Emit elements of the stream for as long as they satisfy the predicate; the
-- stream ends at the first element that does not.
{-# INLINE takeWhile #-}
takeWhile :: (a -> Bool) -> StreamK m a -> StreamK m a
takeWhile p = go
    where
    go m1 = mkStream $ \st yld sng stp ->
        let single a   | p a       = sng a
                       | otherwise = stp
            -- On the first failing element stop, without touching the tail.
            yieldk a r | p a       = yld a (go r)
                       | otherwise = stp
         in foldStream st yieldk single stp m1

-- | Discard the first @n@ elements of the stream and emit the rest.
--
-- When @n <= 0@ the input stream is passed through as is. The result of the
-- internal loop is wrapped in 'unShare'.
{-# INLINE drop #-}
drop :: Int -> StreamK m a -> StreamK m a
drop n m = unShare (go n m)
    where
    go n1 m1 = mkStream $ \st yld sng stp ->
        -- While still dropping: a final element is discarded (the stream
        -- just stops), and an element with a tail is discarded by continuing
        -- with the tail and a decremented count.
        let single _ = stp
            yieldk _ r = foldStreamShared st yld sng stp $ go (n1 - 1) r
        -- Somehow "<=" check performs better than a ">"
        in if n1 <= 0
           then foldStreamShared st yld sng stp m1
           else foldStreamShared st yieldk single stp m1

-- | Discard the leading elements of the stream that satisfy the predicate and
-- emit everything from the first element that does not.
{-# INLINE dropWhile #-}
dropWhile :: (a -> Bool) -> StreamK m a -> StreamK m a
dropWhile p = go
    where
    go m1 = mkStream $ \st yld sng stp ->
        let single a   | p a       = stp
                       | otherwise = sng a
            -- Once an element fails the predicate it is yielded together
            -- with the untouched tail, so later elements are never tested.
            yieldk a r | p a = foldStream st yieldk single stp r
                       | otherwise = yld a r
         in foldStream st yieldk single stp m1

-------------------------------------------------------------------------------
-- Mapping
-------------------------------------------------------------------------------

-- Be careful when modifying this, this uses a consM (|:) deliberately to allow
-- other stream types to overload it.
-- | Run each monadic action contained in the stream and emit its result.
{-# INLINE sequence #-}
sequence :: Monad m => StreamK m (m a) -> StreamK m a
sequence = go
    where
    go m1 = mkStream $ \st yld sng stp ->
        let single ma = ma >>= sng
            -- The action is consed onto the converted tail with 'consM' and
            -- the resulting stream is folded with the caller's continuations.
            yieldk ma r = foldStreamShared st yld sng stp $ ma `consM` go r
         in foldStream (adaptState st) yieldk single stp m1

-------------------------------------------------------------------------------
-- Inserting
-------------------------------------------------------------------------------

-- | Insert the result of a monadic action between consecutive elements of the
-- stream.
--
-- Nothing is inserted before the first element or after the last one; an
-- empty or single-element stream is passed through unchanged.
{-# INLINE intersperseM #-}
intersperseM :: Monad m => m a -> StreamK m a -> StreamK m a
intersperseM a = prependingStart
    where
    -- Handles the first element: it is emitted without a separator in front,
    -- then 'go' takes over for the rest of the stream.
    prependingStart m1 = mkStream $ \st yld sng stp ->
        let yieldk i x =
                foldStreamShared st yld sng stp $ return i `consM` go x
         in foldStream st yieldk sng stp m1
    -- Handles every later element: the separator action is emitted before
    -- each element.
    go m2 = mkStream $ \st yld sng stp ->
        let single i = foldStreamShared st yld sng stp $ a `consM` fromPure i
            yieldk i x =
                foldStreamShared
                    st yld sng stp $ a `consM` return i `consM` go x
         in foldStream st yieldk single stp m2

-- | Insert a pure value between consecutive elements of the stream.
--
-- This is 'intersperseM' with the value lifted by 'return'.
{-# INLINE intersperse #-}
intersperse :: Monad m => a -> StreamK m a -> StreamK m a
intersperse a = intersperseM (return a)

-- | Insert an element into the stream using the given comparison function.
--
-- The element @x@ is placed immediately before the first stream element @a@
-- for which @cmp x a@ is not 'GT'. If there is no such element, @x@ is
-- emitted at the end; inserting into an empty stream yields just @x@.
{-# INLINE insertBy #-}
insertBy :: (a -> a -> Ordering) -> a -> StreamK m a -> StreamK m a
insertBy cmp x = go
  where
    -- The result always has at least one element (@x@), so the caller's
    -- singleton and stop continuations are never used; everything is
    -- delivered through @yld@.
    go m1 = mkStream $ \st yld _ _ ->
        let single a = case cmp x a of
                GT -> yld a (fromPure x)
                _  -> yld x (fromPure a)
            stop = yld x nil
            yieldk a r = case cmp x a of
                GT -> yld a (go r)
                -- Insertion point found: the rest of the stream follows
                -- unchanged.
                _  -> yld x (a `cons` r)
         in foldStream st yieldk single stop m1

------------------------------------------------------------------------------
-- Deleting
------------------------------------------------------------------------------

-- | Delete the first element @a@ of the stream for which @eq x a@ is 'True'.
-- All other elements, including later matches, are kept.
{-# INLINE deleteBy #-}
deleteBy :: (a -> a -> Bool) -> a -> StreamK m a -> StreamK m a
deleteBy eq x = go
  where
    go m1 = mkStream $ \st yld sng stp ->
        let single a = if eq x a then stp else sng a
            -- On a match the tail is folded directly with the caller's
            -- continuations, so no further comparisons are made.
            yieldk a r = if eq x a
              then foldStream st yld sng stp r
              else yld a (go r)
         in foldStream st yieldk single stp m1

-------------------------------------------------------------------------------
-- Map and Filter
-------------------------------------------------------------------------------

-- | Apply a function to each element of the stream; emit the contents of the
-- 'Just' results and drop the elements that map to 'Nothing'.
{-# INLINE mapMaybe #-}
mapMaybe :: (a -> Maybe b) -> StreamK m a -> StreamK m b
mapMaybe f = go
  where
    go m1 = mkStream $ \st yld sng stp ->
        let single a = maybe stp sng (f a)
            yieldk a r = case f a of
                Just b  -> yld b $ go r
                -- Skip this element by folding the tail with the same
                -- continuations.
                Nothing -> foldStream (adaptState st) yieldk single stp r
        in foldStream (adaptState st) yieldk single stp m1

-------------------------------------------------------------------------------
-- Exception Handling
-------------------------------------------------------------------------------

-- | Like Streamly.Data.Stream.'Streamly.Data.Stream.handle' but with one
-- significant difference, this function observes exceptions from the consumer
-- of the stream as well.
--
-- When an exception of type @e@ is caught, the handler is run to obtain a
-- replacement stream, which is then used for the remainder of the output. The
-- replacement stream itself is not wrapped by the handler.
--
-- You can also convert 'StreamK' to 'Stream' and use exception handling from
-- 'Stream' module:
--
-- >>> handle f s = StreamK.fromStream $ Stream.handle (\e -> StreamK.toStream (f e)) (StreamK.toStream s)
--
{-# INLINABLE handle #-}
handle :: (MonadCatch m, Exception e)
    => (e -> m (StreamK m a)) -> StreamK m a -> StreamK m a
handle f stream = go stream

    where

    go m1 = mkStream $ \st yld sng stp ->
        -- Keep the tail guarded so that later steps are handled as well.
        let yieldk a r = yld a $ go r
        in do
            -- The caller's continuations run inside the 'MC.try', which is
            -- why exceptions thrown by the consumer are observed too.
            res <- MC.try (foldStream (adaptState st) yieldk sng stp m1)
            case res of
                Right r -> return r
                Left e -> do
                    r <- f e
                    foldStream (adaptState st) yld sng stp r

-------------------------------------------------------------------------------
-- Resource Management
-------------------------------------------------------------------------------

-- If we are folding the stream and we do not drain the entire stream (e.g. if
-- the fold terminates before the stream) then the finalizer will run on GC.
--
-- XXX To implement a prompt cleanup, we will have to yield a cleanup function
-- via the yield continuation. A chain of cleanup functions can be built and
-- the entire chain can be invoked when the stream ends voluntarily or if
-- someone decides to abandon the stream.

-- | Like Streamly.Data.Stream.'Streamly.Data.Stream.bracketIO' but with one
-- significant difference: this function observes exceptions from the consumer
-- of the stream as well. Therefore, it cleans up the resource promptly when
-- the consumer encounters an exception.
--
-- @bracketIO bef aft bet@ runs @bef@ to acquire a resource @r@, registers
-- @aft r@ as a finalizer, and then streams @bet r@. Acquisition and finalizer
-- registration are performed together under 'mask_'. The finalizer is run
-- when the stream stops, just before its last element is delivered, or when
-- an exception is caught (the exception is then rethrown).
--
-- You can also convert 'StreamK' to 'Stream' and use resource handling from
-- 'Stream' module:
--
-- >>> bracketIO bef aft bet = StreamK.fromStream $ Stream.bracketIO bef aft (StreamK.toStream . bet)
--
{-# INLINABLE bracketIO #-}
bracketIO :: (MonadIO m, MonadCatch m)
    => IO b -> (b -> IO c) -> (b -> StreamK m a) -> StreamK m a
bracketIO bef aft bet =
    concatEffect $ do
        (r, ref) <- liftIO $ mask_ $ do
            r <- bef
            ref <- newIOFinalizer (aft r)
            return (r, ref)
        return $ go ref (bet r)

    where

    go ref m1 = mkStream $ \st yld sng stp ->
        let
            -- We can discard exceptions on continuations to make it equivalent
            -- to StreamD, but it seems like a desirable behavior.
            stop = liftIO (runIOFinalizer ref) >> stp
            single a = liftIO (runIOFinalizer ref) >> sng a
            -- More elements may follow: keep the remainder guarded by the
            -- same finalizer.
            yieldk a r = yld a $ go ref r
        in do
            -- Do not call the finalizer twice if it has already been
            -- called via stop continuation and stop continuation itself
            -- generated an exception. runIOFinalizer takes care of that.
            res <- MC.try (foldStream (adaptState st) yieldk single stop m1)
            case res of
                Right r -> return r
                Left (e :: MC.SomeException) ->
                    liftIO (runIOFinalizer ref) >> MC.throwM e

------------------------------------------------------------------------------
-- Serial Zipping
------------------------------------------------------------------------------

-- | Zip two streams using a pure zipping function. This is 'zipWithM' with
-- the result of the function lifted into the monad.
--
-- Zipping of @n@ streams can be performed by combining the streams pair
-- wise using 'mergeMapWith' with O(n * log n) time complexity. If used
-- with 'concatMapWith' it will have O(n^2) performance.
{-# INLINE zipWith #-}
zipWith :: Monad m => (a -> b -> c) -> StreamK m a -> StreamK m b -> StreamK m c
zipWith f = zipWithM (\a b -> return (f a b))

-- | Zip two streams using a monadic zipping function. At each step one
-- element is pulled from the first stream and then one from the second; the
-- output stops as soon as either input stops.
{-# INLINE zipWithM #-}
zipWithM :: Monad m =>
    (a -> b -> m c) -> StreamK m a -> StreamK m b -> StreamK m c
zipWithM f = go

    where

    go mx my = mkStream $ \st yld sng stp ->
        let -- Given an element @a@ of the first stream and the remainder
            -- @ra@, pull one element from the second stream and combine.
            merge a ra =
                let single2 b   = f a b >>= sng
                    yield2 b rb = f a b >>= \x -> yld x (go ra rb)
                 in foldStream (adaptState st) yield2 single2 stp my
            -- A final element of the first stream is merged with an empty
            -- remainder, so the zipped stream stops on the next step.
            single1 a = merge a nil
            yield1 = merge
         in foldStream (adaptState st) yield1 single1 stp mx

------------------------------------------------------------------------------
-- Merging
------------------------------------------------------------------------------

-- | Merge two streams using a monadic comparison function. The comparison is
-- always invoked as @cmp x y@ with @x@ from the first stream and @y@ from the
-- second. When it returns 'GT' the element @y@ is emitted first, otherwise
-- @x@ is emitted first. When one stream ends, the rest of the other stream is
-- emitted as is.
{-# INLINE mergeByM #-}
mergeByM :: Monad m =>
    (a -> a -> m Ordering) -> StreamK m a -> StreamK m a -> StreamK m a
mergeByM cmp = go

    where

    -- No element is pending: pull one from the first stream.
    go mx my = mkStream $ \st yld sng stp ->
        let stop = foldStream st yld sng stp my
            single x = foldStream st yld sng stp (goX0 x my)
            yield x rx = foldStream st yld sng stp (goX x rx my)
         in foldStream st yield single stop mx

    -- @x@ is pending and the first stream has ended: pull from the second.
    goX0 x my = mkStream $ \st yld sng _ ->
        let stop = sng x
            single y = do
                r <- cmp x y
                case r of
                    GT -> yld y (fromPure x)
                    _  -> yld x (fromPure y)
            yield y ry = do
                r <- cmp x y
                case r of
                    GT -> yld y (goX0 x ry)
                    _  -> yld x (y `cons` ry)
         in foldStream st yield single stop my

    -- @x@ is pending and the first stream has remainder @mx@: pull from the
    -- second.
    goX x mx my = mkStream $ \st yld _ _ ->
        let stop = yld x mx
            single y = do
                r <- cmp x y
                case r of
                    GT -> yld y (x `cons` mx)
                    _  -> yld x (goY0 mx y)
            yield y ry = do
                r <- cmp x y
                case r of
                    GT -> yld y (goX x mx ry)
                    _  -> yld x (goY mx y ry)
         in foldStream st yield single stop my

    -- @y@ is pending and the second stream has ended: pull from the first.
    goY0 mx y = mkStream $ \st yld sng _ ->
        let stop = sng y
            single x = do
                r <- cmp x y
                case r of
                    GT -> yld y (fromPure x)
                    _  -> yld x (fromPure y)
            yield x rx = do
                r <- cmp x y
                case r of
                    GT -> yld y (x `cons` rx)
                    _  -> yld x (goY0 rx y)
         in foldStream st yield single stop mx

    -- @y@ is pending and the second stream has remainder @my@: pull from the
    -- first.
    goY mx y my = mkStream $ \st yld _ _ ->
        let stop = yld y my
            single x = do
                r <- cmp x y
                case r of
                    GT -> yld y (goX0 x my)
                    _  -> yld x (y `cons` my)
            yield x rx = do
                r <- cmp x y
                case r of
                    GT -> yld y (goX x rx my)
                    _  -> yld x (goY rx y my)
         in foldStream st yield single stop mx

-- | Merge two streams using a pure comparison function. The comparison is
-- always invoked as @cmp x y@ with @x@ from the first stream and @y@ from the
-- second. When it returns 'GT' the element @y@ is emitted first, otherwise
-- @x@ is emitted first. When one stream ends, the rest of the other stream is
-- emitted as is.
--
-- Merging of @n@ streams can be performed by combining the streams pair
-- wise using 'mergeMapWith' to give O(n * log n) time complexity. If used
-- with 'concatMapWith' it will have O(n^2) performance.
--
{-# INLINE mergeBy #-}
mergeBy :: (a -> a -> Ordering) -> StreamK m a -> StreamK m a -> StreamK m a
-- XXX GHC: The one-line definition in terms of mergeByM (commented out below)
-- has slightly worse performance than replacing "r <- cmp x y" with
-- "let r = cmp x y" in the monadic version. The definition used here is
-- exactly the same as mergeByM except for this change.
-- mergeBy cmp = mergeByM (\a b -> return $ cmp a b)
mergeBy cmp = go

    where

    -- No element is pending: pull one from the first stream.
    go mx my = mkStream $ \st yld sng stp ->
        let stop = foldStream st yld sng stp my
            single x = foldStream st yld sng stp (goX0 x my)
            yield x rx = foldStream st yld sng stp (goX x rx my)
         in foldStream st yield single stop mx

    -- @x@ is pending and the first stream has ended: pull from the second.
    goX0 x my = mkStream $ \st yld sng _ ->
        let stop = sng x
            single y =
                case cmp x y of
                    GT -> yld y (fromPure x)
                    _  -> yld x (fromPure y)
            yield y ry =
                case cmp x y of
                    GT -> yld y (goX0 x ry)
                    _  -> yld x (y `cons` ry)
         in foldStream st yield single stop my

    -- @x@ is pending and the first stream has remainder @mx@: pull from the
    -- second.
    goX x mx my = mkStream $ \st yld _ _ ->
        let stop = yld x mx
            single y =
                case cmp x y of
                    GT -> yld y (x `cons` mx)
                    _  -> yld x (goY0 mx y)
            yield y ry =
                case cmp x y of
                    GT -> yld y (goX x mx ry)
                    _  -> yld x (goY mx y ry)
         in foldStream st yield single stop my

    -- @y@ is pending and the second stream has ended: pull from the first.
    goY0 mx y = mkStream $ \st yld sng _ ->
        let stop = sng y
            single x =
                case cmp x y of
                    GT -> yld y (fromPure x)
                    _  -> yld x (fromPure y)
            yield x rx =
                case cmp x y of
                    GT -> yld y (x `cons` rx)
                    _  -> yld x (goY0 rx y)
         in foldStream st yield single stop mx

    -- @y@ is pending and the second stream has remainder @my@: pull from the
    -- first.
    goY mx y my = mkStream $ \st yld _ _ ->
        let stop = yld y my
            single x =
                case cmp x y of
                    GT -> yld y (goX0 x my)
                    _  -> yld x (y `cons` my)
            yield x rx =
                case cmp x y of
                    GT -> yld y (goX x rx my)
                    _  -> yld x (goY rx y my)
         in foldStream st yield single stop mx

------------------------------------------------------------------------------
-- Transformation comprehensions
------------------------------------------------------------------------------

-- | Return the common value of a stream whose elements are all equal.
--
-- * Returns 'Nothing' if the stream is empty.
-- * Returns @Just h@, where @h@ is the first element, if every subsequent
--   element compares equal to @h@.
-- * Returns 'Nothing' as soon as an element different from @h@ is seen; the
--   rest of the stream is not evaluated in that case.
--
{-# INLINE the #-}
the :: (Eq a, Monad m) => StreamK m a -> m (Maybe a)
the m = do
    r <- uncons m
    case r of
        Nothing -> return Nothing
        Just (h, t) -> go h t
    where
    -- @go h m1@ checks that every element of @m1@ equals the head @h@.
    go h m1 =
        let -- Last element of the stream: decides the final answer.
            single a   | h == a    = return $ Just h
                       | otherwise = return Nothing
            -- More elements follow: keep going only while they match.
            yieldk a r | h == a    = go h r
                       | otherwise = return Nothing
            -- Reaching the end without a mismatch means all were equal.
         in foldStream defState yieldk single (return $ Just h) m1

------------------------------------------------------------------------------
-- Alternative & MonadPlus
------------------------------------------------------------------------------

-- | @_alt m1 m2@ behaves like @m1@ unless @m1@ stops without producing
-- anything, in which case it behaves like @m2@.
--
-- Only the /stop/ continuation used for @m1@ is replaced (by a fold of
-- @m2@); the yield and singleton continuations are passed through unchanged.
-- Consequently, once @m1@ has yielded its first element the remainder of
-- @m1@ is handed to the consumer as is and @m2@ is never consulted.
_alt :: StreamK m a -> StreamK m a -> StreamK m a
_alt m1 m2 = mkStream $ \st yld sng stp ->
    let stop  = foldStream st yld sng stp m2
    in foldStream st yld sng stop m1

------------------------------------------------------------------------------
-- MonadError
------------------------------------------------------------------------------

{-
-- XXX handle and test cross thread state transfer
withCatchError
    :: MonadError e m
    => StreamK m a -> (e -> StreamK m a) -> StreamK m a
withCatchError m h =
    mkStream $ \_ stp sng yld ->
        let run x = unStream x Nothing stp sng yieldk
            handle r = r `catchError` \e -> run $ h e
            yieldk a r = yld a (withCatchError r h)
        in handle $ run m
-}

-------------------------------------------------------------------------------
-- Parsing
-------------------------------------------------------------------------------

-- | Run a 'Parser' over a stream and return rest of the Stream.
--
-- The first component of the result is either the parsed value or a
-- 'ParseErrorPos' carrying the position counter maintained by the driver
-- together with the parser's error message. The second component is the
-- input that remains after the parse:
--
-- * If the parser finishes or fails in its initial action, the input stream
--   is returned untouched (the error position is 0 in the failure case).
-- * If the parser fails on an element, the buffered elements (including the
--   offending one) are put back in front of the remaining stream.
-- * If the parser finishes and asks to backtrack, the requested number of
--   most recent elements are put back in front of the remaining stream.
{-# INLINE_NORMAL parseDBreak #-}
parseDBreak
    :: Monad m
    => PR.Parser a m b
    -> StreamK m a
    -> m (Either ParseErrorPos b, StreamK m a)
parseDBreak (PR.Parser pstep initial extract) stream = do
    res <- initial
    case res of
        PR.IPartial s -> goStream stream [] s 0
        PR.IDone b -> return (Right b, stream)
        PR.IError err -> return (Left (ParseErrorPos 0 err), stream)

    where

    {-# INLINE splitAt #-}
    splitAt = Stream.splitAt "Data.StreamK.parseDBreak"

    -- "buf" contains last few items in the stream that we may have to
    -- backtrack to.
    --
    -- The buffer is kept most-recent-first: new elements are consed on the
    -- front and a prefix is reversed before being replayed.
    --
    -- XXX currently we are using a dumb list based approach for backtracking
    -- buffer. This can be replaced by a sliding/ring buffer using Data.Array.
    -- That will allow us more efficient random back and forth movement.
    --
    -- goStream pulls the next element from the stream "st" and feeds it to
    -- the parser. "pst" is the parser state and "i" the position counter
    -- reported in errors.
    goStream st buf !pst i =
        let -- The stream ended: ask the parser for its final verdict.
            stop = do
                r <- extract pst
                case r of
                    PR.FError err -> do
                        let src = Prelude.reverse buf
                        return (Left (ParseErrorPos i err), fromList src)
                    PR.FDone m b -> do
                        -- (- m) most recent elements are returned as leftover.
                        let n = (- m)
                        assertM(n <= length buf)
                        let src0 = Prelude.take n buf
                            src  = Prelude.reverse src0
                        return (Right b, fromList src)
                    -- No backtracking requested: continue with an empty
                    -- stream, which leads back to "stop" with the new state.
                    PR.FContinue 0 s -> goStream nil buf s i
                    PR.FContinue m s -> do
                        -- Replay the (- m) most recent elements from the
                        -- buffer, keeping the older ones buffered.
                        let n = (- m)
                        assertM(n <= length buf)
                        let (src0, buf1) = splitAt n buf
                            src = Prelude.reverse src0
                        goBuf nil buf1 src s (i + m)
            -- A singleton stream is an element followed by the empty stream.
            single x = yieldk x nil
            -- Feed element "x" to the parser; "r" is the rest of the stream.
            -- In the general cases below, n = 1 - m is the number of most
            -- recent elements (from x:buf) that have to be replayed.
            yieldk x r = do
                res <- pstep pst x
                case res of
                    -- Fast path: nothing to replay; the buffer is dropped.
                    PR.SPartial 1 s -> goStream r [] s (i + 1)
                    PR.SPartial m s -> do
                        let n = 1 - m
                        assertM(n <= length (x:buf))
                        let src0 = Prelude.take n (x:buf)
                            src  = Prelude.reverse src0
                        goBuf r [] src s (i + m)
                    -- Fast path: nothing to replay; "x" joins the buffer.
                    PR.SContinue 1 s -> goStream r (x:buf) s (i + 1)
                    PR.SContinue m s -> do
                        let n = 1 - m
                        assertM(n <= length (x:buf))
                        let (src0, buf1) = splitAt n (x:buf)
                            src = Prelude.reverse src0
                        goBuf r buf1 src s (i + m)
                    -- Fast path: done with no leftover from the buffer.
                    PR.SDone 1 b -> return (Right b, r)
                    PR.SDone m b -> do
                        let n = 1 - m
                        assertM(n <= length (x:buf))
                        let src0 = Prelude.take n (x:buf)
                            src  = Prelude.reverse src0
                        return (Right b, append (fromList src) r)
                    PR.SError err -> do
                        let src = Prelude.reverse (x:buf)
                        return
                            ( Left (ParseErrorPos (i + 1) err)
                            , append (fromList src) r
                            )
         in foldStream defState yieldk single stop st

    -- goBuf feeds the parser from the replay list (third argument, in
    -- stream order) before going back to the stream "st". Any part of the
    -- replay list that is not consumed is carried along via "++ xs".
    goBuf st buf [] !pst i = goStream st buf pst i
    goBuf st buf (x:xs) !pst i = do
        pRes <- pstep pst x
        case pRes of
            PR.SPartial 1 s -> goBuf st [] xs s (i + 1)
            PR.SPartial m s -> do
                let n = 1 - m
                assert (n <= length (x:buf)) (return ())
                let src0 = Prelude.take n (x:buf)
                    src  = Prelude.reverse src0 ++ xs
                goBuf st [] src s (i + m)
            PR.SContinue 1 s -> goBuf st (x:buf) xs s (i + 1)
            PR.SContinue m s -> do
                let n = 1 - m
                assert (n <= length (x:buf)) (return ())
                let (src0, buf1) = splitAt n (x:buf)
                    src  = Prelude.reverse src0 ++ xs
                goBuf st buf1 src s (i + m)
            PR.SDone m b -> do
                let n = 1 - m
                assert (n <= length (x:buf)) (return ())
                let src0 = Prelude.take n (x:buf)
                    src  = Prelude.reverse src0 ++ xs
                return (Right b, append (fromList src) st)
            PR.SError err -> do
                let src = Prelude.reverse buf ++ x:xs
                return
                    ( Left (ParseErrorPos (i + 1) err)
                    , append (fromList src) st
                    )

-- Using ParserD or ParserK on StreamK may not make much difference. We should
-- perhaps use only chunked parsing on StreamK. We can always convert a stream
-- to chunks before parsing. Or just have a ParserK element parser for StreamK
-- and convert ParserD to ParserK for element parsing using StreamK.

-- | Run a 'Parser.Parser' over a stream and return only the parse result.
-- This is 'parseDBreak' with the remaining stream discarded.
{-# INLINE parseD #-}
parseD :: Monad m =>
    Parser.Parser a m b -> StreamK m a -> m (Either ParseErrorPos b)
parseD f = fmap fst . parseDBreak f

-------------------------------------------------------------------------------
-- ParserK Chunked
-------------------------------------------------------------------------------

-- XXX parseDBreakChunks may be faster than converting parserD to toParserK and
-- using parseBreakChunks. We can also use parseBreak as an alternative to the
-- monad instance of ParserD.

-- | Run a 'ParserK' over a chunked 'StreamK' and return the parse result and
-- the remaining Stream.
--
-- This is a deprecated alias of @Array.parseBreak@.
{-# DEPRECATED parseBreakChunks "Use Streamly.Data.Array.parseBreak instead" #-}
{-# INLINE_NORMAL parseBreakChunks #-}
parseBreakChunks
    :: (Monad m, Unbox a)
    => ParserK (Array a) m b
    -> StreamK m (Array a)
    -> m (Either ParseError b, StreamK m (Array a))
parseBreakChunks = Array.parseBreak

-- | Run a 'ParserK' over a chunked 'StreamK' and return the parse result.
--
-- This is a deprecated alias of @Array.parse@.
{-# DEPRECATED parseChunks "Use Streamly.Data.Array.parse instead" #-}
{-# INLINE parseChunks #-}
parseChunks :: (Monad m, Unbox a) =>
    ParserK (Array a) m b -> StreamK m (Array a) -> m (Either ParseError b)
parseChunks = Array.parse

-------------------------------------------------------------------------------
-- ParserK Singular
-------------------------------------------------------------------------------

-- | Run a 'ParserK.ParserK' over a 'StreamK' of individual elements (as
-- opposed to a stream of array chunks) and return the parse result along with
-- the remaining stream.
--
-- Delegates to @Drivers.parseBreakStreamK@.
--
{-# INLINE parseBreak #-}
parseBreak
    :: forall m a b. Monad m
    => ParserK.ParserK a m b
    -> StreamK m a
    -> m (Either ParseError b, StreamK m a)
parseBreak = Drivers.parseBreakStreamK

-- | Like 'parseBreak' but includes stream position information in the error
-- messages.
--
-- Delegates to @Drivers.parseBreakStreamKPos@.
--
{-# INLINE parseBreakPos #-}
parseBreakPos
    :: forall m a b. Monad m
    => ParserK.ParserK a m b
    -> StreamK m a
    -> m (Either ParseErrorPos b, StreamK m a)
parseBreakPos = Drivers.parseBreakStreamKPos

-- | Run a 'ParserK' over a 'StreamK' and return only the parse result; this
-- is 'parseBreak' with the remaining stream discarded.
--
-- Please parse a chunked stream using @Streamly.Data.Array.parse@ where
-- possible, for better performance.
{-# INLINE parse #-}
parse :: Monad m =>
    ParserK.ParserK a m b -> StreamK m a -> m (Either ParseError b)
parse f = fmap fst . parseBreak f

-- | Like 'parse' but includes stream position information in the error
-- messages. This is 'parseBreakPos' with the remaining stream discarded.
--
{-# INLINE parsePos #-}
parsePos :: Monad m =>
    ParserK.ParserK a m b -> StreamK m a -> m (Either ParseErrorPos b)
parsePos f = fmap fst . parseBreakPos f

-------------------------------------------------------------------------------
-- ParserK Chunked Generic
-------------------------------------------------------------------------------

-- | Similar to 'parseBreak' but works on a stream of generic arrays.
--
-- This is a deprecated alias of @GenArr.parseBreak@.
--
{-# DEPRECATED parseBreakChunksGeneric "Use Streamly.Data.Array.Generic.parseBreak" #-}
{-# INLINE_NORMAL parseBreakChunksGeneric #-}
parseBreakChunksGeneric
    :: forall m a b. Monad m
    => ParserK.ParserK (GenArr.Array a) m b
    -> StreamK m (GenArr.Array a)
    -> m (Either ParseError b, StreamK m (GenArr.Array a))
parseBreakChunksGeneric = GenArr.parseBreak

-- | Similar to 'parse' but works on a stream of generic arrays.
--
-- This is a deprecated alias of @GenArr.parse@.
{-# DEPRECATED parseChunksGeneric "Use Streamly.Data.Array.Generic.parse" #-}
{-# INLINE parseChunksGeneric #-}
parseChunksGeneric ::
       (Monad m)
    => ParserK.ParserK (GenArr.Array a) m b
    -> StreamK m (GenArr.Array a)
    -> m (Either ParseError b)
parseChunksGeneric = GenArr.parse

-------------------------------------------------------------------------------
-- Sorting
-------------------------------------------------------------------------------

-- | Sort the input stream using a supplied comparison function.
--
-- Sorting can be achieved by simply:
--
-- >>> sortBy cmp = StreamK.mergeMapWith (StreamK.mergeBy cmp) StreamK.fromPure
--
-- However, this combinator uses a parser to first split the input stream into
-- down and up sorted segments and then merges them to optimize sorting when
-- pre-sorted sequences exist in the input stream.
--
-- /O(n) space/
--
{-# INLINE sortBy #-}
sortBy :: Monad m => (a -> a -> Ordering) -> StreamK m a -> StreamK m a
-- sortBy f = Stream.concatPairsWith (Stream.mergeBy f) Stream.fromPure
sortBy cmp =
    let -- Parser that collects one run of consecutive elements. The rolling
        -- predicate holds when @cmp x y@ is LT or EQ. One fold collects the
        -- run in reverse and the other in arrival order.
        -- NOTE(review): presumably this makes both kinds of run come out
        -- ascending; confirm against Parser.groupByRollingEither.
        p =
            Parser.groupByRollingEither
                (\x -> (< GT) . cmp x)
                FL.toStreamKRev
                FL.toStreamK
        -- The pipeline below reads bottom-up: convert to Stream, split into
        -- runs, drop the Either wrappers, convert back to StreamK and merge
        -- the runs using the comparison function.
     in   mergeMapWith (mergeBy cmp) id
        . Stream.toStreamK
        . Stream.catRights -- it's a non-failing backtracking parser
        . Stream.parseMany (fmap (either id id) p)
        . Stream.fromStreamK

-- | Sort the input stream by a key computed from each element.
--
-- The implementation works in three stages, composed right to left:
--
-- 1. Each element @x@ is paired with its key as @(f x, x)@, so the key
--    function is applied once per element rather than once per comparison.
-- 2. The pairs are sorted with 'sortBy', comparing only the key component.
-- 3. The keys are dropped, leaving the original elements in sorted order.
--
-- The key is forced with 'seq' (to weak head normal form) whenever its pair
-- is evaluated, so the pairs do not hold on to unevaluated @f x@ thunks.
--
{-# INLINE sortOn #-}
sortOn :: (Monad m, Ord b) => (a -> b) -> StreamK m a -> StreamK m a
sortOn f =
      fmap snd
    . sortBy (comparing fst)
    . fmap (\x -> let y = f x in y `seq` (y, x))