-- | Streamly streaming interface for bolty Neo4j queries.
--
-- Instead of buffering all result records into a 'V.Vector', this module
-- yields records one-by-one as a @'Stream' IO 'Record'@,
-- allowing constant-memory consumption of large result sets.
--
-- @
-- import qualified Database.Bolty           as Bolt
-- import qualified Database.Bolty.Streamly  as BoltS
-- import qualified Streamly.Data.Stream     as Stream
-- import qualified Streamly.Data.Fold       as Fold
--
-- main :: IO ()
-- main = do
--   conn <- Bolt.connect cfg
--   s <- BoltS.queryStream conn \"MATCH (n) RETURN n\"
--   count <- Stream.fold Fold.length s
--   Bolt.close conn
-- @
module Database.Bolty.Streamly
  ( -- * Streaming queries
    queryStream
  , queryStreamP
    -- * Streaming queries with decoding
  , queryStreamAs
  , queryStreamPAs
    -- * Low-level streaming
  , pullStream
    -- * Pool-based streaming
  , withPoolStream
  , withPoolStreamP
  , withPoolStreamAs
  , withPoolStreamPAs
    -- * Routing pool streaming
  , withRoutingStream
  , withRoutingStreamP
  , withRoutingStreamAs
  , withRoutingStreamPAs
    -- * Session streaming
  , sessionReadStream
  , sessionReadStreamP
  , sessionWriteStream
  , sessionWriteStreamP
  , sessionReadStreamAs
  , sessionReadStreamPAs
  , sessionWriteStreamAs
  , sessionWriteStreamPAs
    -- * Re-exports
  , Stream
  ) where

import           Control.Exception              (throwIO)
import           Data.Kind                      (Type)
import           Data.Text                      (Text)
import           GHC.Stack                      (HasCallStack)
import qualified Data.HashMap.Lazy              as H
import qualified Data.PackStream.Ps             as PS
import           Data.PackStream.Result         (Result(..))
import qualified Data.Vector                    as V
import           Streamly.Data.Stream           (Stream)
import qualified Streamly.Data.Stream           as Stream

import           Database.Bolty.Connection      (requestResponseRunIO)
import qualified Database.Bolty.Connection.Pipe as P
import           Database.Bolty.Connection.Type
import           Database.Bolty.Decode          (RowDecoder, decodeRow)
import           Database.Bolty.Message.Request (Request(..), defaultPull)
import           Database.Bolty.Message.Response (Response(..), Failure(..), successFields)
import           Database.Bolty.Pool            (BoltPool, withConnection)
import           Database.Bolty.Record          (Record)
import           Database.Bolty.Routing         (AccessMode(..), RoutingPool,
                                                  withRoutingConnection)
import           Database.Bolty.Session         (Session, readTransaction, writeTransaction)


-- | Run a Cypher query and return results as a stream of records.
--
-- Records are yielded one at a time as they arrive from the server,
-- without buffering the entire result set in memory.
--
-- Must be called in @Ready@ or @TXready@ state.
queryStream :: HasCallStack => Connection -> Text -> IO (Stream IO Record)
queryStream :: HasCallStack => Connection -> Text -> IO (Stream IO Record)
queryStream Connection
conn Text
cypher = HasCallStack =>
Connection -> Text -> HashMap Text Ps -> IO (Stream IO Record)
Connection -> Text -> HashMap Text Ps -> IO (Stream IO Record)
queryStreamP Connection
conn Text
cypher HashMap Text Ps
forall k v. HashMap k v
H.empty


-- | Run a parameterised Cypher query and return results as a stream.
queryStreamP :: HasCallStack => Connection -> Text -> H.HashMap Text PS.Ps -> IO (Stream IO Record)
queryStreamP :: HasCallStack =>
Connection -> Text -> HashMap Text Ps -> IO (Stream IO Record)
queryStreamP Connection
conn Text
cypher HashMap Text Ps
params = do
  SuccessRun
_ <- HasCallStack =>
Connection -> Text -> HashMap Text Ps -> IO SuccessRun
Connection -> Text -> HashMap Text Ps -> IO SuccessRun
requestResponseRunIO Connection
conn Text
cypher HashMap Text Ps
params
  HasCallStack => Connection -> IO (Stream IO Record)
Connection -> IO (Stream IO Record)
pullStream Connection
conn


-- | Run a Cypher query and decode each record using a 'RowDecoder'.
-- Throws 'Database.Bolty.Decode.DecodeError' on decode failure.
queryStreamAs :: HasCallStack => RowDecoder a -> Connection -> Text -> IO (Stream IO a)
queryStreamAs :: forall a.
HasCallStack =>
RowDecoder a -> Connection -> Text -> IO (Stream IO a)
queryStreamAs RowDecoder a
decoder Connection
conn Text
cypher = RowDecoder a
-> Connection -> Text -> HashMap Text Ps -> IO (Stream IO a)
forall a.
HasCallStack =>
RowDecoder a
-> Connection -> Text -> HashMap Text Ps -> IO (Stream IO a)
queryStreamPAs RowDecoder a
decoder Connection
conn Text
cypher HashMap Text Ps
forall k v. HashMap k v
H.empty


-- | Run a parameterised Cypher query and decode each record using a 'RowDecoder'.
-- Throws 'Database.Bolty.Decode.DecodeError' on decode failure.
queryStreamPAs :: HasCallStack => RowDecoder a -> Connection -> Text -> H.HashMap Text PS.Ps -> IO (Stream IO a)
queryStreamPAs :: forall a.
HasCallStack =>
RowDecoder a
-> Connection -> Text -> HashMap Text Ps -> IO (Stream IO a)
queryStreamPAs RowDecoder a
decoder Connection
conn Text
cypher HashMap Text Ps
params = do
  SuccessRun
runResp <- HasCallStack =>
Connection -> Text -> HashMap Text Ps -> IO SuccessRun
Connection -> Text -> HashMap Text Ps -> IO SuccessRun
requestResponseRunIO Connection
conn Text
cypher HashMap Text Ps
params
  let columns :: Vector Text
columns = SuccessRun -> Vector Text
successFields SuccessRun
runResp
  Stream IO Record
s <- HasCallStack => Connection -> IO (Stream IO Record)
Connection -> IO (Stream IO Record)
pullStream Connection
conn
  Stream IO a -> IO (Stream IO a)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Stream IO a -> IO (Stream IO a))
-> Stream IO a -> IO (Stream IO a)
forall a b. (a -> b) -> a -> b
$ (Record -> IO a) -> Stream IO Record -> Stream IO a
forall (m :: * -> *) a b.
Monad m =>
(a -> m b) -> Stream m a -> Stream m b
Stream.mapM (RowDecoder a -> Vector Text -> Record -> IO a
forall a. RowDecoder a -> Vector Text -> Record -> IO a
decodeOrThrow RowDecoder a
decoder Vector Text
columns) Stream IO Record
s


-- | Decode a single record, throwing 'Database.Bolty.Decode.DecodeError' on failure.
decodeOrThrow :: RowDecoder a -> V.Vector Text -> Record -> IO a
decodeOrThrow :: forall a. RowDecoder a -> Vector Text -> Record -> IO a
decodeOrThrow RowDecoder a
decoder Vector Text
columns Record
record =
  case RowDecoder a -> Vector Text -> Record -> Either DecodeError a
forall a.
RowDecoder a -> Vector Text -> Record -> Either DecodeError a
decodeRow RowDecoder a
decoder Vector Text
columns Record
record of
    Right a
a  -> a -> IO a
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure a
a
    Left DecodeError
err -> DecodeError -> IO a
forall e a. Exception e => e -> IO a
throwIO DecodeError
err


-- | Pull state machine.  @NeedPull@ means we need to send a new PULL
-- message to the server.  @Done@ means the result set is exhausted.
type PullState :: Type
data PullState = NeedPull | Done


-- | Stream records from an in-progress PULL.
--
-- Expects the connection to be in @Streaming@ or @TXstreaming@ state
-- (i.e. after a RUN has been sent and acknowledged). Sends PULL messages
-- and yields each 'Record' as it arrives.  When the server signals
-- completion, the state transitions back to @Ready@ / @TXready@.
pullStream :: HasCallStack => Connection -> IO (Stream IO Record)
pullStream :: HasCallStack => Connection -> IO (Stream IO Record)
pullStream Connection
conn = do
  HasCallStack => Connection -> [ServerState] -> Text -> IO ()
Connection -> [ServerState] -> Text -> IO ()
P.requireStateIO Connection
conn [ServerState
Streaming, ServerState
TXstreaming] Text
"PULL"
  HasCallStack => Connection -> Request -> IO ()
Connection -> Request -> IO ()
P.flushIO Connection
conn (Request -> IO ()) -> Request -> IO ()
forall a b. (a -> b) -> a -> b
$ Pull -> Request
RPull Pull
defaultPull
  Stream IO Record -> IO (Stream IO Record)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Stream IO Record -> IO (Stream IO Record))
-> Stream IO Record -> IO (Stream IO Record)
forall a b. (a -> b) -> a -> b
$ (PullState -> IO (Maybe (Record, PullState)))
-> PullState -> Stream IO Record
forall (m :: * -> *) s a.
Monad m =>
(s -> m (Maybe (a, s))) -> s -> Stream m a
Stream.unfoldrM (HasCallStack =>
Connection -> PullState -> IO (Maybe (Record, PullState))
Connection -> PullState -> IO (Maybe (Record, PullState))
step Connection
conn) PullState
NeedPull
  where
    step :: HasCallStack => Connection -> PullState -> IO (Maybe (Record, PullState))
    step :: HasCallStack =>
Connection -> PullState -> IO (Maybe (Record, PullState))
step Connection
_ PullState
Done = Maybe (Record, PullState) -> IO (Maybe (Record, PullState))
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe (Record, PullState)
forall a. Maybe a
Nothing
    step Connection
c PullState
NeedPull = do
      Response
response <- HasCallStack => Connection -> IO Response
Connection -> IO Response
P.fetchIO Connection
c
      case Response
response of
        RRecord Record
record ->
          Maybe (Record, PullState) -> IO (Maybe (Record, PullState))
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Maybe (Record, PullState) -> IO (Maybe (Record, PullState)))
-> Maybe (Record, PullState) -> IO (Maybe (Record, PullState))
forall a b. (a -> b) -> a -> b
$ (Record, PullState) -> Maybe (Record, PullState)
forall a. a -> Maybe a
Just (Record
record, PullState
NeedPull)

        RSuccess HashMap Text Ps
meta -> do
          let hasMore :: Bool
hasMore = case Text -> HashMap Text Ps -> Maybe Ps
forall k v. Hashable k => k -> HashMap k v -> Maybe v
H.lookup Text
"has_more" HashMap Text Ps
meta of
                          Just Ps
hm -> case Ps -> Result Bool
forall a. PackStream a => Ps -> Result a
PS.fromPs Ps
hm of
                            Success Bool
True -> Bool
True
                            Result Bool
_            -> Bool
False
                          Maybe Ps
Nothing -> Bool
False
          if Bool
hasMore then do
            -- Server has more batches; send another PULL and continue
            HasCallStack => Connection -> Request -> IO ()
Connection -> Request -> IO ()
P.flushIO Connection
c (Request -> IO ()) -> Request -> IO ()
forall a b. (a -> b) -> a -> b
$ Pull -> Request
RPull Pull
defaultPull
            HasCallStack =>
Connection -> PullState -> IO (Maybe (Record, PullState))
Connection -> PullState -> IO (Maybe (Record, PullState))
step Connection
c PullState
NeedPull
          else do
            -- All records consumed; transition state
            ServerState
st <- Connection -> IO ServerState
forall (m :: * -> *). MonadIO m => Connection -> m ServerState
P.getState Connection
c
            Connection -> ServerState -> IO ()
forall (m :: * -> *).
MonadIO m =>
Connection -> ServerState -> m ()
P.setState Connection
c (ServerState -> IO ()) -> ServerState -> IO ()
forall a b. (a -> b) -> a -> b
$ case ServerState
st of
              ServerState
TXstreaming -> ServerState
TXready
              ServerState
_           -> ServerState
Ready
            Maybe (Record, PullState) -> IO (Maybe (Record, PullState))
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe (Record, PullState)
forall a. Maybe a
Nothing

        Response
RIgnored -> do
          Connection -> IO ()
forall (m :: * -> *).
(MonadIO m, HasCallStack) =>
Connection -> m ()
P.reset Connection
c
          Error -> IO (Maybe (Record, PullState))
forall e a. Exception e => e -> IO a
throwIO Error
ResponseErrorIgnored

        RFailure Failure{Text
code :: Text
code :: Failure -> Text
code, Text
message :: Text
message :: Failure -> Text
message} -> do
          Connection -> ServerState -> IO ()
forall (m :: * -> *).
MonadIO m =>
Connection -> ServerState -> m ()
P.setState Connection
c ServerState
Failed
          Connection -> IO ()
forall (m :: * -> *).
(MonadIO m, HasCallStack) =>
Connection -> m ()
P.reset Connection
c
          Error -> IO (Maybe (Record, PullState))
forall e a. Exception e => e -> IO a
throwIO (Error -> IO (Maybe (Record, PullState)))
-> Error -> IO (Maybe (Record, PullState))
forall a b. (a -> b) -> a -> b
$ Text -> Text -> Error
ResponseErrorFailure Text
code Text
message


-- ---------------------------------------------------------------------------
-- Pool-based streaming
-- ---------------------------------------------------------------------------

-- | Acquire a connection from a pool, run a streaming query, and pass the
-- stream to the consumer function. The connection is held until the consumer
-- returns. The stream must be fully consumed within the consumer.
withPoolStream :: HasCallStack
              => BoltPool
              -> Text
              -> (Stream IO Record -> IO a)
              -> IO a
withPoolStream :: forall a.
HasCallStack =>
BoltPool -> Text -> (Stream IO Record -> IO a) -> IO a
withPoolStream BoltPool
pool Text
cypher Stream IO Record -> IO a
consume =
  BoltPool
-> Text -> HashMap Text Ps -> (Stream IO Record -> IO a) -> IO a
forall a.
HasCallStack =>
BoltPool
-> Text -> HashMap Text Ps -> (Stream IO Record -> IO a) -> IO a
withPoolStreamP BoltPool
pool Text
cypher HashMap Text Ps
forall k v. HashMap k v
H.empty Stream IO Record -> IO a
consume


-- | Like 'withPoolStream' but with query parameters.
withPoolStreamP :: HasCallStack
               => BoltPool
               -> Text
               -> H.HashMap Text PS.Ps
               -> (Stream IO Record -> IO a)
               -> IO a
withPoolStreamP :: forall a.
HasCallStack =>
BoltPool
-> Text -> HashMap Text Ps -> (Stream IO Record -> IO a) -> IO a
withPoolStreamP BoltPool
pool Text
cypher HashMap Text Ps
params Stream IO Record -> IO a
consume =
  BoltPool -> (Connection -> IO a) -> IO a
forall a. HasCallStack => BoltPool -> (Connection -> IO a) -> IO a
withConnection BoltPool
pool ((Connection -> IO a) -> IO a) -> (Connection -> IO a) -> IO a
forall a b. (a -> b) -> a -> b
$ \Connection
conn -> do
    Stream IO Record
s <- HasCallStack =>
Connection -> Text -> HashMap Text Ps -> IO (Stream IO Record)
Connection -> Text -> HashMap Text Ps -> IO (Stream IO Record)
queryStreamP Connection
conn Text
cypher HashMap Text Ps
params
    Stream IO Record -> IO a
consume Stream IO Record
s


-- | Like 'withPoolStream' but decodes each record using a 'RowDecoder'.
withPoolStreamAs :: HasCallStack
                => RowDecoder a
                -> BoltPool
                -> Text
                -> (Stream IO a -> IO b)
                -> IO b
withPoolStreamAs :: forall a b.
HasCallStack =>
RowDecoder a -> BoltPool -> Text -> (Stream IO a -> IO b) -> IO b
withPoolStreamAs RowDecoder a
decoder BoltPool
pool Text
cypher Stream IO a -> IO b
consume =
  RowDecoder a
-> BoltPool
-> Text
-> HashMap Text Ps
-> (Stream IO a -> IO b)
-> IO b
forall a b.
HasCallStack =>
RowDecoder a
-> BoltPool
-> Text
-> HashMap Text Ps
-> (Stream IO a -> IO b)
-> IO b
withPoolStreamPAs RowDecoder a
decoder BoltPool
pool Text
cypher HashMap Text Ps
forall k v. HashMap k v
H.empty Stream IO a -> IO b
consume


-- | Like 'withPoolStreamP' but decodes each record using a 'RowDecoder'.
withPoolStreamPAs :: HasCallStack
                 => RowDecoder a
                 -> BoltPool
                 -> Text
                 -> H.HashMap Text PS.Ps
                 -> (Stream IO a -> IO b)
                 -> IO b
withPoolStreamPAs :: forall a b.
HasCallStack =>
RowDecoder a
-> BoltPool
-> Text
-> HashMap Text Ps
-> (Stream IO a -> IO b)
-> IO b
withPoolStreamPAs RowDecoder a
decoder BoltPool
pool Text
cypher HashMap Text Ps
params Stream IO a -> IO b
consume =
  BoltPool -> (Connection -> IO b) -> IO b
forall a. HasCallStack => BoltPool -> (Connection -> IO a) -> IO a
withConnection BoltPool
pool ((Connection -> IO b) -> IO b) -> (Connection -> IO b) -> IO b
forall a b. (a -> b) -> a -> b
$ \Connection
conn -> do
    Stream IO a
s <- RowDecoder a
-> Connection -> Text -> HashMap Text Ps -> IO (Stream IO a)
forall a.
HasCallStack =>
RowDecoder a
-> Connection -> Text -> HashMap Text Ps -> IO (Stream IO a)
queryStreamPAs RowDecoder a
decoder Connection
conn Text
cypher HashMap Text Ps
params
    Stream IO a -> IO b
consume Stream IO a
s


-- ---------------------------------------------------------------------------
-- Routing pool streaming
-- ---------------------------------------------------------------------------

-- | Acquire a routed connection, run a streaming query, and pass the stream
-- to the consumer. Uses 'ReadAccess' or 'WriteAccess' to direct queries
-- to the appropriate cluster member.
withRoutingStream :: HasCallStack
                 => RoutingPool
                 -> AccessMode
                 -> Text
                 -> (Stream IO Record -> IO a)
                 -> IO a
withRoutingStream :: forall a.
HasCallStack =>
RoutingPool
-> AccessMode -> Text -> (Stream IO Record -> IO a) -> IO a
withRoutingStream RoutingPool
rp AccessMode
mode Text
cypher Stream IO Record -> IO a
consume =
  RoutingPool
-> AccessMode
-> Text
-> HashMap Text Ps
-> (Stream IO Record -> IO a)
-> IO a
forall a.
HasCallStack =>
RoutingPool
-> AccessMode
-> Text
-> HashMap Text Ps
-> (Stream IO Record -> IO a)
-> IO a
withRoutingStreamP RoutingPool
rp AccessMode
mode Text
cypher HashMap Text Ps
forall k v. HashMap k v
H.empty Stream IO Record -> IO a
consume


-- | Like 'withRoutingStream' but with query parameters.
withRoutingStreamP :: HasCallStack
                  => RoutingPool
                  -> AccessMode
                  -> Text
                  -> H.HashMap Text PS.Ps
                  -> (Stream IO Record -> IO a)
                  -> IO a
withRoutingStreamP :: forall a.
HasCallStack =>
RoutingPool
-> AccessMode
-> Text
-> HashMap Text Ps
-> (Stream IO Record -> IO a)
-> IO a
withRoutingStreamP RoutingPool
rp AccessMode
mode Text
cypher HashMap Text Ps
params Stream IO Record -> IO a
consume =
  RoutingPool -> AccessMode -> (Connection -> IO a) -> IO a
forall a.
HasCallStack =>
RoutingPool -> AccessMode -> (Connection -> IO a) -> IO a
withRoutingConnection RoutingPool
rp AccessMode
mode ((Connection -> IO a) -> IO a) -> (Connection -> IO a) -> IO a
forall a b. (a -> b) -> a -> b
$ \Connection
conn -> do
    Stream IO Record
s <- HasCallStack =>
Connection -> Text -> HashMap Text Ps -> IO (Stream IO Record)
Connection -> Text -> HashMap Text Ps -> IO (Stream IO Record)
queryStreamP Connection
conn Text
cypher HashMap Text Ps
params
    Stream IO Record -> IO a
consume Stream IO Record
s


-- | Like 'withRoutingStream' but decodes each record using a 'RowDecoder'.
withRoutingStreamAs :: HasCallStack
                   => RowDecoder a
                   -> RoutingPool
                   -> AccessMode
                   -> Text
                   -> (Stream IO a -> IO b)
                   -> IO b
withRoutingStreamAs :: forall a b.
HasCallStack =>
RowDecoder a
-> RoutingPool
-> AccessMode
-> Text
-> (Stream IO a -> IO b)
-> IO b
withRoutingStreamAs RowDecoder a
decoder RoutingPool
rp AccessMode
mode Text
cypher Stream IO a -> IO b
consume =
  RowDecoder a
-> RoutingPool
-> AccessMode
-> Text
-> HashMap Text Ps
-> (Stream IO a -> IO b)
-> IO b
forall a b.
HasCallStack =>
RowDecoder a
-> RoutingPool
-> AccessMode
-> Text
-> HashMap Text Ps
-> (Stream IO a -> IO b)
-> IO b
withRoutingStreamPAs RowDecoder a
decoder RoutingPool
rp AccessMode
mode Text
cypher HashMap Text Ps
forall k v. HashMap k v
H.empty Stream IO a -> IO b
consume


-- | Like 'withRoutingStreamP' but decodes each record using a 'RowDecoder'.
withRoutingStreamPAs :: HasCallStack
                    => RowDecoder a
                    -> RoutingPool
                    -> AccessMode
                    -> Text
                    -> H.HashMap Text PS.Ps
                    -> (Stream IO a -> IO b)
                    -> IO b
withRoutingStreamPAs :: forall a b.
HasCallStack =>
RowDecoder a
-> RoutingPool
-> AccessMode
-> Text
-> HashMap Text Ps
-> (Stream IO a -> IO b)
-> IO b
withRoutingStreamPAs RowDecoder a
decoder RoutingPool
rp AccessMode
mode Text
cypher HashMap Text Ps
params Stream IO a -> IO b
consume =
  RoutingPool -> AccessMode -> (Connection -> IO b) -> IO b
forall a.
HasCallStack =>
RoutingPool -> AccessMode -> (Connection -> IO a) -> IO a
withRoutingConnection RoutingPool
rp AccessMode
mode ((Connection -> IO b) -> IO b) -> (Connection -> IO b) -> IO b
forall a b. (a -> b) -> a -> b
$ \Connection
conn -> do
    Stream IO a
s <- RowDecoder a
-> Connection -> Text -> HashMap Text Ps -> IO (Stream IO a)
forall a.
HasCallStack =>
RowDecoder a
-> Connection -> Text -> HashMap Text Ps -> IO (Stream IO a)
queryStreamPAs RowDecoder a
decoder Connection
conn Text
cypher HashMap Text Ps
params
    Stream IO a -> IO b
consume Stream IO a
s


-- ---------------------------------------------------------------------------
-- Session streaming
-- ---------------------------------------------------------------------------

-- | Run a streaming query inside a managed read transaction.
-- Handles BEGIN, COMMIT, bookmark propagation, and retries on transient
-- failures. Directs queries to read replicas when using a routing session.
sessionReadStream :: HasCallStack
                 => Session
                 -> Text
                 -> (Stream IO Record -> IO a)
                 -> IO a
sessionReadStream :: forall a.
HasCallStack =>
Session -> Text -> (Stream IO Record -> IO a) -> IO a
sessionReadStream Session
session Text
cypher Stream IO Record -> IO a
consume =
  Session
-> Text -> HashMap Text Ps -> (Stream IO Record -> IO a) -> IO a
forall a.
HasCallStack =>
Session
-> Text -> HashMap Text Ps -> (Stream IO Record -> IO a) -> IO a
sessionReadStreamP Session
session Text
cypher HashMap Text Ps
forall k v. HashMap k v
H.empty Stream IO Record -> IO a
consume


-- | Like 'sessionReadStream' but with query parameters.
sessionReadStreamP :: HasCallStack
                  => Session
                  -> Text
                  -> H.HashMap Text PS.Ps
                  -> (Stream IO Record -> IO a)
                  -> IO a
sessionReadStreamP :: forall a.
HasCallStack =>
Session
-> Text -> HashMap Text Ps -> (Stream IO Record -> IO a) -> IO a
sessionReadStreamP Session
session Text
cypher HashMap Text Ps
params Stream IO Record -> IO a
consume =
  Session -> (Connection -> IO a) -> IO a
forall a. HasCallStack => Session -> (Connection -> IO a) -> IO a
readTransaction Session
session ((Connection -> IO a) -> IO a) -> (Connection -> IO a) -> IO a
forall a b. (a -> b) -> a -> b
$ \Connection
conn -> do
    Stream IO Record
s <- HasCallStack =>
Connection -> Text -> HashMap Text Ps -> IO (Stream IO Record)
Connection -> Text -> HashMap Text Ps -> IO (Stream IO Record)
queryStreamP Connection
conn Text
cypher HashMap Text Ps
params
    Stream IO Record -> IO a
consume Stream IO Record
s


-- | Run a streaming query inside a managed write transaction.
-- Handles BEGIN, COMMIT, bookmark propagation, and retries on transient
-- failures. Directs queries to the leader when using a routing session.
sessionWriteStream :: HasCallStack
                  => Session
                  -> Text
                  -> (Stream IO Record -> IO a)
                  -> IO a
sessionWriteStream :: forall a.
HasCallStack =>
Session -> Text -> (Stream IO Record -> IO a) -> IO a
sessionWriteStream Session
session Text
cypher Stream IO Record -> IO a
consume =
  Session
-> Text -> HashMap Text Ps -> (Stream IO Record -> IO a) -> IO a
forall a.
HasCallStack =>
Session
-> Text -> HashMap Text Ps -> (Stream IO Record -> IO a) -> IO a
sessionWriteStreamP Session
session Text
cypher HashMap Text Ps
forall k v. HashMap k v
H.empty Stream IO Record -> IO a
consume


-- | Like 'sessionWriteStream' but with query parameters.
sessionWriteStreamP :: HasCallStack
                   => Session
                   -> Text
                   -> H.HashMap Text PS.Ps
                   -> (Stream IO Record -> IO a)
                   -> IO a
sessionWriteStreamP :: forall a.
HasCallStack =>
Session
-> Text -> HashMap Text Ps -> (Stream IO Record -> IO a) -> IO a
sessionWriteStreamP Session
session Text
cypher HashMap Text Ps
params Stream IO Record -> IO a
consume =
  Session -> (Connection -> IO a) -> IO a
forall a. HasCallStack => Session -> (Connection -> IO a) -> IO a
writeTransaction Session
session ((Connection -> IO a) -> IO a) -> (Connection -> IO a) -> IO a
forall a b. (a -> b) -> a -> b
$ \Connection
conn -> do
    Stream IO Record
s <- HasCallStack =>
Connection -> Text -> HashMap Text Ps -> IO (Stream IO Record)
Connection -> Text -> HashMap Text Ps -> IO (Stream IO Record)
queryStreamP Connection
conn Text
cypher HashMap Text Ps
params
    Stream IO Record -> IO a
consume Stream IO Record
s


-- | Like 'sessionReadStream' but decodes each record using a 'RowDecoder'.
sessionReadStreamAs :: HasCallStack
                   => RowDecoder a
                   -> Session
                   -> Text
                   -> (Stream IO a -> IO b)
                   -> IO b
sessionReadStreamAs :: forall a b.
HasCallStack =>
RowDecoder a -> Session -> Text -> (Stream IO a -> IO b) -> IO b
sessionReadStreamAs RowDecoder a
decoder Session
session Text
cypher Stream IO a -> IO b
consume =
  RowDecoder a
-> Session
-> Text
-> HashMap Text Ps
-> (Stream IO a -> IO b)
-> IO b
forall a b.
HasCallStack =>
RowDecoder a
-> Session
-> Text
-> HashMap Text Ps
-> (Stream IO a -> IO b)
-> IO b
sessionReadStreamPAs RowDecoder a
decoder Session
session Text
cypher HashMap Text Ps
forall k v. HashMap k v
H.empty Stream IO a -> IO b
consume


-- | Like 'sessionReadStreamP' but decodes each record using a 'RowDecoder'.
sessionReadStreamPAs :: HasCallStack
                    => RowDecoder a
                    -> Session
                    -> Text
                    -> H.HashMap Text PS.Ps
                    -> (Stream IO a -> IO b)
                    -> IO b
sessionReadStreamPAs :: forall a b.
HasCallStack =>
RowDecoder a
-> Session
-> Text
-> HashMap Text Ps
-> (Stream IO a -> IO b)
-> IO b
sessionReadStreamPAs RowDecoder a
decoder Session
session Text
cypher HashMap Text Ps
params Stream IO a -> IO b
consume =
  Session -> (Connection -> IO b) -> IO b
forall a. HasCallStack => Session -> (Connection -> IO a) -> IO a
readTransaction Session
session ((Connection -> IO b) -> IO b) -> (Connection -> IO b) -> IO b
forall a b. (a -> b) -> a -> b
$ \Connection
conn -> do
    Stream IO a
s <- RowDecoder a
-> Connection -> Text -> HashMap Text Ps -> IO (Stream IO a)
forall a.
HasCallStack =>
RowDecoder a
-> Connection -> Text -> HashMap Text Ps -> IO (Stream IO a)
queryStreamPAs RowDecoder a
decoder Connection
conn Text
cypher HashMap Text Ps
params
    Stream IO a -> IO b
consume Stream IO a
s


-- | Like 'sessionWriteStream' but decodes each record using a 'RowDecoder'.
sessionWriteStreamAs :: HasCallStack
                    => RowDecoder a
                    -> Session
                    -> Text
                    -> (Stream IO a -> IO b)
                    -> IO b
sessionWriteStreamAs :: forall a b.
HasCallStack =>
RowDecoder a -> Session -> Text -> (Stream IO a -> IO b) -> IO b
sessionWriteStreamAs RowDecoder a
decoder Session
session Text
cypher Stream IO a -> IO b
consume =
  RowDecoder a
-> Session
-> Text
-> HashMap Text Ps
-> (Stream IO a -> IO b)
-> IO b
forall a b.
HasCallStack =>
RowDecoder a
-> Session
-> Text
-> HashMap Text Ps
-> (Stream IO a -> IO b)
-> IO b
sessionWriteStreamPAs RowDecoder a
decoder Session
session Text
cypher HashMap Text Ps
forall k v. HashMap k v
H.empty Stream IO a -> IO b
consume


-- | Like 'sessionWriteStreamP' but decodes each record using a 'RowDecoder'.
sessionWriteStreamPAs :: HasCallStack
                     => RowDecoder a
                     -> Session
                     -> Text
                     -> H.HashMap Text PS.Ps
                     -> (Stream IO a -> IO b)
                     -> IO b
sessionWriteStreamPAs :: forall a b.
HasCallStack =>
RowDecoder a
-> Session
-> Text
-> HashMap Text Ps
-> (Stream IO a -> IO b)
-> IO b
sessionWriteStreamPAs RowDecoder a
decoder Session
session Text
cypher HashMap Text Ps
params Stream IO a -> IO b
consume =
  Session -> (Connection -> IO b) -> IO b
forall a. HasCallStack => Session -> (Connection -> IO a) -> IO a
writeTransaction Session
session ((Connection -> IO b) -> IO b) -> (Connection -> IO b) -> IO b
forall a b. (a -> b) -> a -> b
$ \Connection
conn -> do
    Stream IO a
s <- RowDecoder a
-> Connection -> Text -> HashMap Text Ps -> IO (Stream IO a)
forall a.
HasCallStack =>
RowDecoder a
-> Connection -> Text -> HashMap Text Ps -> IO (Stream IO a)
queryStreamPAs RowDecoder a
decoder Connection
conn Text
cypher HashMap Text Ps
params
    Stream IO a -> IO b
consume Stream IO a
s