bolty-streamly-0.1.0.0: Streamly streaming interface for bolty Neo4j driver
Safe HaskellNone
LanguageGHC2021

Database.Bolty.Streamly

Description

Streamly streaming interface for bolty Neo4j queries.

Instead of buffering all result records into a Vector, this module yields records one-by-one as a Stream IO Record, allowing constant-memory consumption of large result sets.

import qualified Database.Bolty           as Bolt
import qualified Database.Bolty.Streamly  as BoltS
import qualified Streamly.Data.Stream     as Stream
import qualified Streamly.Data.Fold       as Fold

main :: IO ()
main = do
  conn <- Bolt.connect cfg
  s <- BoltS.queryStream conn "MATCH (n) RETURN n"
  count <- Stream.fold Fold.length s
  Bolt.close conn
Synopsis

Streaming queries

queryStream :: HasCallStack => Connection -> Text -> IO (Stream IO Record) Source #

Run a Cypher query and return results as a stream of records.

Records are yielded one at a time as they arrive from the server, without buffering the entire result set in memory.

The connection must be in the Ready or TXready state when this function is called.

queryStreamP :: HasCallStack => Connection -> Text -> HashMap Text Ps -> IO (Stream IO Record) Source #

Run a parameterised Cypher query and return results as a stream of records.

Streaming queries with decoding

queryStreamAs :: HasCallStack => RowDecoder a -> Connection -> Text -> IO (Stream IO a) Source #

Run a Cypher query and decode each record using a RowDecoder. Throws DecodeError on decode failure.

queryStreamPAs :: HasCallStack => RowDecoder a -> Connection -> Text -> HashMap Text Ps -> IO (Stream IO a) Source #

Run a parameterised Cypher query and decode each record using a RowDecoder. Throws DecodeError on decode failure.

Low-level streaming

pullStream :: HasCallStack => Connection -> IO (Stream IO Record) Source #

Stream the records of a query that has already been started with RUN.

Expects the connection to be in Streaming or TXstreaming state (i.e. after a RUN has been sent and acknowledged). Sends PULL messages and yields each Record as it arrives. When the server signals completion, the state transitions back to Ready / TXready.

Pool-based streaming

withPoolStream :: HasCallStack => BoltPool -> Text -> (Stream IO Record -> IO a) -> IO a Source #

Acquire a connection from a pool, run a streaming query, and pass the stream to the consumer function. The connection is held until the consumer returns. The stream must be fully consumed within the consumer.

withPoolStreamP :: HasCallStack => BoltPool -> Text -> HashMap Text Ps -> (Stream IO Record -> IO a) -> IO a Source #

Like withPoolStream but with query parameters.

withPoolStreamAs :: HasCallStack => RowDecoder a -> BoltPool -> Text -> (Stream IO a -> IO b) -> IO b Source #

Like withPoolStream but decodes each record using a RowDecoder.

withPoolStreamPAs :: HasCallStack => RowDecoder a -> BoltPool -> Text -> HashMap Text Ps -> (Stream IO a -> IO b) -> IO b Source #

Like withPoolStreamP but decodes each record using a RowDecoder.

Routing pool streaming

withRoutingStream :: HasCallStack => RoutingPool -> AccessMode -> Text -> (Stream IO Record -> IO a) -> IO a Source #

Acquire a routed connection, run a streaming query, and pass the stream to the consumer. The AccessMode argument (ReadAccess or WriteAccess) directs the query to the appropriate cluster member.

withRoutingStreamP :: HasCallStack => RoutingPool -> AccessMode -> Text -> HashMap Text Ps -> (Stream IO Record -> IO a) -> IO a Source #

Like withRoutingStream but with query parameters.

withRoutingStreamAs :: HasCallStack => RowDecoder a -> RoutingPool -> AccessMode -> Text -> (Stream IO a -> IO b) -> IO b Source #

Like withRoutingStream but decodes each record using a RowDecoder.

withRoutingStreamPAs :: HasCallStack => RowDecoder a -> RoutingPool -> AccessMode -> Text -> HashMap Text Ps -> (Stream IO a -> IO b) -> IO b Source #

Like withRoutingStreamP but decodes each record using a RowDecoder.

Session streaming

sessionReadStream :: HasCallStack => Session -> Text -> (Stream IO Record -> IO a) -> IO a Source #

Run a streaming query inside a managed read transaction. Handles BEGIN, COMMIT, bookmark propagation, and retries on transient failures. Directs queries to read replicas when using a routing session.

sessionReadStreamP :: HasCallStack => Session -> Text -> HashMap Text Ps -> (Stream IO Record -> IO a) -> IO a Source #

Like sessionReadStream but with query parameters.

sessionWriteStream :: HasCallStack => Session -> Text -> (Stream IO Record -> IO a) -> IO a Source #

Run a streaming query inside a managed write transaction. Handles BEGIN, COMMIT, bookmark propagation, and retries on transient failures. Directs queries to the leader when using a routing session.

sessionWriteStreamP :: HasCallStack => Session -> Text -> HashMap Text Ps -> (Stream IO Record -> IO a) -> IO a Source #

Like sessionWriteStream but with query parameters.

sessionReadStreamAs :: HasCallStack => RowDecoder a -> Session -> Text -> (Stream IO a -> IO b) -> IO b Source #

Like sessionReadStream but decodes each record using a RowDecoder.

sessionReadStreamPAs :: HasCallStack => RowDecoder a -> Session -> Text -> HashMap Text Ps -> (Stream IO a -> IO b) -> IO b Source #

Like sessionReadStreamP but decodes each record using a RowDecoder.

sessionWriteStreamAs :: HasCallStack => RowDecoder a -> Session -> Text -> (Stream IO a -> IO b) -> IO b Source #

Like sessionWriteStream but decodes each record using a RowDecoder.

sessionWriteStreamPAs :: HasCallStack => RowDecoder a -> Session -> Text -> HashMap Text Ps -> (Stream IO a -> IO b) -> IO b Source #

Like sessionWriteStreamP but decodes each record using a RowDecoder.

Re-exports

data Stream (m :: Type -> Type) a #

A stream consists of a step function that generates the next step given a current state, and the current state.

Instances

Instances details
(Foldable m, Monad m) => Foldable (Stream m) 
Instance details

Defined in Streamly.Internal.Data.Stream.Type

Methods

fold :: Monoid m0 => Stream m m0 -> m0 #

foldMap :: Monoid m0 => (a -> m0) -> Stream m a -> m0 #

foldMap' :: Monoid m0 => (a -> m0) -> Stream m a -> m0 #

foldr :: (a -> b -> b) -> b -> Stream m a -> b #

foldr' :: (a -> b -> b) -> b -> Stream m a -> b #

foldl :: (b -> a -> b) -> b -> Stream m a -> b #

foldl' :: (b -> a -> b) -> b -> Stream m a -> b #

foldr1 :: (a -> a -> a) -> Stream m a -> a #

foldl1 :: (a -> a -> a) -> Stream m a -> a #

toList :: Stream m a -> [a] #

null :: Stream m a -> Bool #

length :: Stream m a -> Int #

elem :: Eq a => a -> Stream m a -> Bool #

maximum :: Ord a => Stream m a -> a #

minimum :: Ord a => Stream m a -> a #

sum :: Num a => Stream m a -> a #

product :: Num a => Stream m a -> a #

Monad m => Functor (Stream m) 
Instance details

Defined in Streamly.Internal.Data.Stream.Type

Methods

fmap :: (a -> b) -> Stream m a -> Stream m b #

(<$) :: a -> Stream m b -> Stream m a #

a ~ Char => IsString (Stream Identity a) 
Instance details

Defined in Streamly.Internal.Data.Stream.Type

IsList (Stream Identity a) 
Instance details

Defined in Streamly.Internal.Data.Stream.Type

Associated Types

type Item (Stream Identity a) 
Instance details

Defined in Streamly.Internal.Data.Stream.Type

type Item (Stream Identity a) = a
Read a => Read (Stream Identity a) 
Instance details

Defined in Streamly.Internal.Data.Stream.Type

Show a => Show (Stream Identity a) 
Instance details

Defined in Streamly.Internal.Data.Stream.Type

Eq a => Eq (Stream Identity a) 
Instance details

Defined in Streamly.Internal.Data.Stream.Type

Ord a => Ord (Stream Identity a) 
Instance details

Defined in Streamly.Internal.Data.Stream.Type

type Item (Stream Identity a) 
Instance details

Defined in Streamly.Internal.Data.Stream.Type

type Item (Stream Identity a) = a