| Safe Haskell | None |
|---|---|
| Language | GHC2021 |
Database.Bolty.Streamly
Description
Streamly streaming interface for bolty Neo4j queries.
Instead of buffering all result records into a Vector, this module
yields records one-by-one as a Stream IO Record,
allowing constant-memory consumption of large result sets.
```haskell
import qualified Database.Bolty as Bolt
import qualified Database.Bolty.Streamly as BoltS
import qualified Streamly.Data.Stream as Stream
import qualified Streamly.Data.Fold as Fold

main :: IO ()
main = do
  conn <- Bolt.connect cfg
  s <- BoltS.queryStream conn "MATCH (n) RETURN n"
  count <- Stream.fold Fold.length s
  Bolt.close conn
```
Synopsis
- queryStream :: HasCallStack => Connection -> Text -> IO (Stream IO Record)
- queryStreamP :: HasCallStack => Connection -> Text -> HashMap Text Ps -> IO (Stream IO Record)
- queryStreamAs :: HasCallStack => RowDecoder a -> Connection -> Text -> IO (Stream IO a)
- queryStreamPAs :: HasCallStack => RowDecoder a -> Connection -> Text -> HashMap Text Ps -> IO (Stream IO a)
- pullStream :: HasCallStack => Connection -> IO (Stream IO Record)
- withPoolStream :: HasCallStack => BoltPool -> Text -> (Stream IO Record -> IO a) -> IO a
- withPoolStreamP :: HasCallStack => BoltPool -> Text -> HashMap Text Ps -> (Stream IO Record -> IO a) -> IO a
- withPoolStreamAs :: HasCallStack => RowDecoder a -> BoltPool -> Text -> (Stream IO a -> IO b) -> IO b
- withPoolStreamPAs :: HasCallStack => RowDecoder a -> BoltPool -> Text -> HashMap Text Ps -> (Stream IO a -> IO b) -> IO b
- withRoutingStream :: HasCallStack => RoutingPool -> AccessMode -> Text -> (Stream IO Record -> IO a) -> IO a
- withRoutingStreamP :: HasCallStack => RoutingPool -> AccessMode -> Text -> HashMap Text Ps -> (Stream IO Record -> IO a) -> IO a
- withRoutingStreamAs :: HasCallStack => RowDecoder a -> RoutingPool -> AccessMode -> Text -> (Stream IO a -> IO b) -> IO b
- withRoutingStreamPAs :: HasCallStack => RowDecoder a -> RoutingPool -> AccessMode -> Text -> HashMap Text Ps -> (Stream IO a -> IO b) -> IO b
- sessionReadStream :: HasCallStack => Session -> Text -> (Stream IO Record -> IO a) -> IO a
- sessionReadStreamP :: HasCallStack => Session -> Text -> HashMap Text Ps -> (Stream IO Record -> IO a) -> IO a
- sessionWriteStream :: HasCallStack => Session -> Text -> (Stream IO Record -> IO a) -> IO a
- sessionWriteStreamP :: HasCallStack => Session -> Text -> HashMap Text Ps -> (Stream IO Record -> IO a) -> IO a
- sessionReadStreamAs :: HasCallStack => RowDecoder a -> Session -> Text -> (Stream IO a -> IO b) -> IO b
- sessionReadStreamPAs :: HasCallStack => RowDecoder a -> Session -> Text -> HashMap Text Ps -> (Stream IO a -> IO b) -> IO b
- sessionWriteStreamAs :: HasCallStack => RowDecoder a -> Session -> Text -> (Stream IO a -> IO b) -> IO b
- sessionWriteStreamPAs :: HasCallStack => RowDecoder a -> Session -> Text -> HashMap Text Ps -> (Stream IO a -> IO b) -> IO b
- data Stream (m :: Type -> Type) a
Streaming queries
queryStream :: HasCallStack => Connection -> Text -> IO (Stream IO Record) Source #
Run a Cypher query and return results as a stream of records.
Records are yielded one at a time as they arrive from the server, without buffering the entire result set in memory.
Must be called in Ready or TXready state.
queryStreamP :: HasCallStack => Connection -> Text -> HashMap Text Ps -> IO (Stream IO Record) Source #
Run a parameterised Cypher query and return results as a stream.
Streaming queries with decoding
queryStreamAs :: HasCallStack => RowDecoder a -> Connection -> Text -> IO (Stream IO a) Source #
Run a Cypher query and decode each record using a RowDecoder.
Throws DecodeError on decode failure.
queryStreamPAs :: HasCallStack => RowDecoder a -> Connection -> Text -> HashMap Text Ps -> IO (Stream IO a) Source #
Run a parameterised Cypher query and decode each record using a RowDecoder.
Throws DecodeError on decode failure.
Low-level streaming
pullStream :: HasCallStack => Connection -> IO (Stream IO Record) Source #
Stream records from an in-progress PULL.
Expects the connection to be in Streaming or TXstreaming state
(i.e. after a RUN has been sent and acknowledged). Sends PULL messages
and yields each Record as it arrives. When the server signals
completion, the state transitions back to Ready / TXready.
Pool-based streaming
withPoolStream :: HasCallStack => BoltPool -> Text -> (Stream IO Record -> IO a) -> IO a Source #
Acquire a connection from a pool, run a streaming query, and pass the stream to the consumer function. The connection is held until the consumer returns. The stream must be fully consumed within the consumer.
withPoolStreamP :: HasCallStack => BoltPool -> Text -> HashMap Text Ps -> (Stream IO Record -> IO a) -> IO a Source #
Like withPoolStream but with query parameters.
withPoolStreamAs :: HasCallStack => RowDecoder a -> BoltPool -> Text -> (Stream IO a -> IO b) -> IO b Source #
Like withPoolStream but decodes each record using a RowDecoder.
withPoolStreamPAs :: HasCallStack => RowDecoder a -> BoltPool -> Text -> HashMap Text Ps -> (Stream IO a -> IO b) -> IO b Source #
Like withPoolStreamP but decodes each record using a RowDecoder.
Routing pool streaming
withRoutingStream :: HasCallStack => RoutingPool -> AccessMode -> Text -> (Stream IO Record -> IO a) -> IO a Source #
Acquire a routed connection, run a streaming query, and pass the stream
to the consumer. Uses ReadAccess or WriteAccess to direct queries
to the appropriate cluster member.
withRoutingStreamP :: HasCallStack => RoutingPool -> AccessMode -> Text -> HashMap Text Ps -> (Stream IO Record -> IO a) -> IO a Source #
Like withRoutingStream but with query parameters.
withRoutingStreamAs :: HasCallStack => RowDecoder a -> RoutingPool -> AccessMode -> Text -> (Stream IO a -> IO b) -> IO b Source #
Like withRoutingStream but decodes each record using a RowDecoder.
withRoutingStreamPAs :: HasCallStack => RowDecoder a -> RoutingPool -> AccessMode -> Text -> HashMap Text Ps -> (Stream IO a -> IO b) -> IO b Source #
Like withRoutingStreamP but decodes each record using a RowDecoder.
Session streaming
sessionReadStream :: HasCallStack => Session -> Text -> (Stream IO Record -> IO a) -> IO a Source #
Run a streaming query inside a managed read transaction. Handles BEGIN, COMMIT, bookmark propagation, and retries on transient failures. Directs queries to read replicas when using a routing session.
sessionReadStreamP :: HasCallStack => Session -> Text -> HashMap Text Ps -> (Stream IO Record -> IO a) -> IO a Source #
Like sessionReadStream but with query parameters.
sessionWriteStream :: HasCallStack => Session -> Text -> (Stream IO Record -> IO a) -> IO a Source #
Run a streaming query inside a managed write transaction. Handles BEGIN, COMMIT, bookmark propagation, and retries on transient failures. Directs queries to the leader when using a routing session.
sessionWriteStreamP :: HasCallStack => Session -> Text -> HashMap Text Ps -> (Stream IO Record -> IO a) -> IO a Source #
Like sessionWriteStream but with query parameters.
sessionReadStreamAs :: HasCallStack => RowDecoder a -> Session -> Text -> (Stream IO a -> IO b) -> IO b Source #
Like sessionReadStream but decodes each record using a RowDecoder.
sessionReadStreamPAs :: HasCallStack => RowDecoder a -> Session -> Text -> HashMap Text Ps -> (Stream IO a -> IO b) -> IO b Source #
Like sessionReadStreamP but decodes each record using a RowDecoder.
sessionWriteStreamAs :: HasCallStack => RowDecoder a -> Session -> Text -> (Stream IO a -> IO b) -> IO b Source #
Like sessionWriteStream but decodes each record using a RowDecoder.
sessionWriteStreamPAs :: HasCallStack => RowDecoder a -> Session -> Text -> HashMap Text Ps -> (Stream IO a -> IO b) -> IO b Source #
Like sessionWriteStreamP but decodes each record using a RowDecoder.
Re-exports
data Stream (m :: Type -> Type) a #
A stream consists of a step function that generates the next step given a current state, and the current state.