module Hpgsql.Internal
  ( -- * Connection
    connect,
    connectOpts,
    defaultConnectOpts,
    withConnection,
    withConnectionOpts,
    closeGracefully,
    closeForcefully,

    -- * Query
    query,
    queryWith,
    queryMWith,
    queryS,
    querySWith,
    querySMWith,
    query1,
    query1With,
    queryMay,
    queryMayWith,
    execute,
    execute_,

    -- * Pipeline
    Pipeline,
    runPipeline,
    pipelineS,
    pipelineSWith,
    pipelineSMWith,
    pipeline,
    pipelineWith,
    pipelineExec,
    pipelineExec_,
    pipeline1,
    pipeline1With,
    pipelineMay,
    pipelineMayWith,

    -- * Copy
    copyStart,
    copyEnd,
    putCopyData,
    withCopy,
    withCopy_,
    copyFrom,
    copyFromS,

    -- * Pool
    resetConnectionState,

    -- * Transaction
    transactionStatus,

    -- * Notifications
    getNotification,
    getNotificationNonBlocking,

    -- * Type info
    refreshTypeInfoCache,
    resetTypeInfoCache,
    getParameterStatus,
    getBackendPid,

    -- * Misc
    cancelActiveStatement,

    -- * Infrastructure (not re-exported publicly)
    connectionReadyForNewPipeline,
    fullTransactionStatus,
    receiveNextMsgWithMaskedContinuation,
    receiveNextMsgWithMaskedContinuationButDontThrowOnParsingFailure,
    sendCancellationRequest,
    isLastInPipeline,
    updateConnStateTxn,
    withControlMsgsLock,
    receiveOutstandingResponseMsgsAtomically,
    consumeResults,
    consumeResultsIgnoreRows,
    consumeStreamingResults,
    WhichRowDecoder (..),
    mkPostgresError,
    throwPostgresError,
    throwIrrecoverableErrorWithStatement,
    lookupQueryText,
    sendPipeline,
    waitUntilPipelineIsReadyForNewQuery,
    acquireOwnershipOfOrphanedQueries,
    nonAtomicSendMsg,
    rethrowAsIrrecoverable,
    atomicallySendControlMsgs,
    atomicallySendControlMsgs_,
    SomeMessage (..),
    runPipelineInternal,
    InternalConnectOrCancelRequest (..),
    internalConnectOrCancel,
    debugPrint,
    _globalDebugLock,
    timeDebugNonBlockingOperation,
    whenNotClosed,
    chunkBuildersBySize,
    pipelineExecInternal,
    pipelineM,
    withCopyInternal,
    copyEndInternal,
    putCopyError,
  )
where

import Control.Applicative (Alternative (..))
import Control.Concurrent (modifyMVar, modifyMVar_, readMVar)
import Control.Concurrent.MVar (MVar, newMVar)
import Control.Concurrent.STM (STM, TVar)
import qualified Control.Concurrent.STM as STM
import Control.Exception.Safe (MonadThrow, SomeException, bracket, bracketOnError, finally, handle, mask, mask_, onException, throw, toException, tryJust)
import Control.Monad (forM, forM_, join, unless, void, when)
import Data.ByteString (ByteString)
import Data.ByteString.Internal (w2c)
import qualified Data.ByteString.Lazy as LBS
import Data.Data (Proxy (..))
import Data.Either (isLeft, isRight)
import Data.Int (Int32, Int64)
import qualified Data.List as List
import Data.List.NonEmpty (NonEmpty (..))
import qualified Data.List.NonEmpty as NE
import qualified Data.Map.Strict as Map
import Data.Maybe (fromMaybe, isNothing, mapMaybe)
import qualified Data.Serialize as Cereal
import qualified Data.Set as Set
import Data.Text (Text)
import qualified Data.Text as Text
import Data.Text.Encoding (decodeUtf8)
import qualified Data.Text.IO as Text
import Data.Time (DiffTime, diffTimeToPicoseconds, secondsToDiffTime)
import GHC.Conc (ThreadStatus (..), threadStatus)
import Hpgsql.Base
import qualified Hpgsql.Builder as Builder
import Hpgsql.Encoding (FieldInfo (..), FromPgRow (..), RowDecoder (..), RowEncoder (..), ToPgRow (..))
import Hpgsql.Encoding.RowDecoderMonadic (ConversionState (..), RowDecoderMonadic (..))
import Hpgsql.InternalTypes (BindComplete (..), CommandComplete (..), ConnectOpts (..), ConnectionString (..), CopyInResponse (..), CopyQueryState (..), DataRow (..), Either3 (..), EncodingContext (..), ErrorDetail (..), ErrorResponse (..), HPgConnection (..), InternalConnectionState (..), IrrecoverableHpgsqlError (..), NoData (..), NotificationResponse (..), ParseComplete (..), Pipeline (..), PostgresError (..), Query (..), QueryId (..), QueryProtocol (..), QueryState (..), ReadyForQuery (..), ResetConnectionOpts (..), ResponseMsg (..), ResponseMsgsReceived (..), RowDescription (..), SingleQuery (..), TransactionStatus (..), WeakThreadId (..), mkMutex, queryToByteString, throwIrrecoverableError)
import Hpgsql.Locking (getMyWeakThreadId, withMutex)
import Hpgsql.Msgs (AuthenticationMethod (..), AuthenticationResponse (..), BackendKeyData (..), Bind (..), CancelRequest (..), CopyData (..), CopyDone (..), Describe (..), Execute (..), FromPgMessage (..), NoticeResponse (..), ParameterStatus (..), Parse (..), PasswordMessage (..), PgMsgParser (..), StartupMessage (..), Sync (..), Terminate (..), ToPgMessage (..), parsePgMessage)
import qualified Hpgsql.Msgs as Msgs
import Hpgsql.Networking (recvNonBlocking, sendNonBlocking, socketWaitRead, socketWaitWrite)
import Hpgsql.Query (breakQueryIntoStatements)
import qualified Hpgsql.SimpleParser as Parser
import Hpgsql.TypeInfo (ArrayTypeDetails (..), TypeDetails (..), TypeInfo (..), buildTypeInfoCache, builtinPgTypesMap)
import Network.Socket (AddrInfo (..))
import qualified Network.Socket as Socket
import qualified Network.Socket.ByteString as SocketBS
import qualified Network.Socket.ByteString.Lazy as SocketLBS
import Streaming (Of (..), Stream)
import qualified Streaming as S
import qualified Streaming.Internal as SInternal
import qualified Streaming.Prelude as S
import System.IO (stderr)
import System.IO.Error (isResourceVanishedError)
import System.IO.Unsafe (unsafePerformIO)
import System.Mem.Weak (deRefWeak)
import System.Timeout (timeout)

-- | Decides whether a new pipeline may be sent on this connection.
--
-- Returns a 'Left' with the current pipeline if the connection is not ready for a new pipeline,
-- and a 'Right' with the current transaction status otherwise.
connectionReadyForNewPipeline :: InternalConnectionState -> Either (NonEmpty QueryState) TransactionStatus
connectionReadyForNewPipeline (currentPipeline -> pip) =
  -- You can tell there are two ways to represent no active pipeline, aka being ready
  -- to send a new query: an empty pipeline and a pipeline with a ReadyForQuery received.
  -- This might seem silly, but it really helps resuming interrupted execution from a point
  -- when ReadyForQuery was already received, because we can still associate a ReadyForQuery
  -- with a QueryId.
  -- The empty list should only exist immediately after connecting and before the very first
  -- query is sent. After that it's never empty again as new pipelines replace old ones.
  case pip of
    [] -> Right TransIdle
    (q1 : qs) ->
      -- The first query (in pipeline order) that already has its ReadyForQuery decides the status.
      case headMaybe $ mapMaybe readyForQueryStatus pip of
        Nothing -> Left (q1 :| qs)
        Just st -> Right st
  where
    -- The transaction status carried by a query's ReadyForQuery message, if one was received.
    readyForQueryStatus :: QueryState -> Maybe TransactionStatus
    readyForQueryStatus qstate = case responseMsgsState qstate of
      ReadyForQueryReceived _ (ReadyForQuery s) -> Just s
      _ -> Nothing

-- | Returns the transaction's status _before_ the current pipeline was sent
-- and the current transaction status.
-- The former can be used to determine if the pipeline was initiated inside
-- an explicit or implicit transaction.
--
-- While the current pipeline has not yet received its ReadyForQuery, the
-- current status is reported as 'TransInError' if any query in the pipeline
-- has received an error response, and as 'TransActive' otherwise.
fullTransactionStatus :: TVar InternalConnectionState -> STM (TransactionStatus, TransactionStatus)
fullTransactionStatus sttv = do
  st <- STM.readTVar sttv
  (st.transactionStatusBeforeCurrentPipeline,) <$> case connectionReadyForNewPipeline st of
    Right s -> pure s
    Left pip ->
      pure $
        if any receivedErrorResponse pip
          then TransInError
          else TransActive
  where
    -- True when the query's response state records an error response.
    receivedErrorResponse :: QueryState -> Bool
    receivedErrorResponse qstate = case responseMsgsState qstate of
      ErrorResponseReceived _ _ -> True
      _ -> False

-- | The current transaction status.
-- Note that if `withTransaction` is interrupted by an asynchronous exception,
-- this may still report `TransActive` until you run your next command.
-- See `withTransaction` for details.
transactionStatus :: HPgConnection -> IO TransactionStatus
transactionStatus conn =
  -- Only the second component (the current status) of the pair is of interest here.
  snd <$> STM.atomically (fullTransactionStatus conn.internalConnectionState)

-- | Connects to PostgreSQL using 'defaultConnectOpts'.
--
-- The 'DiffTime' argument is the connection timeout, forwarded unchanged to
-- 'connectOpts'.
connect :: ConnectionString -> DiffTime -> IO HPgConnection
connect = connectOpts defaultConnectOpts

-- | Connects to PostgreSQL with the supplied 'ConnectOpts'.
--
-- This is 'internalConnectOrCancel' specialised to the 'Connect' request, so
-- it returns the established connection.
connectOpts :: ConnectOpts -> ConnectionString -> DiffTime -> IO HPgConnection
connectOpts = internalConnectOrCancel Connect

-- | The options used by 'connect': both polling/resend intervals are set to
-- 500 (milliseconds, going by the field names) and the type info cache is
-- filled right after connecting.
defaultConnectOpts :: ConnectOpts
defaultConnectOpts =
  ConnectOpts
    { killedThreadPollIntervalMs = 500,
      cancellationRequestResendIntervalMs = 500,
      -- When True, `refreshTypeInfoCache` is run on the new connection before it is returned.
      fillTypeInfoCache = True
    }

-- | What 'internalConnectOrCancel' should do once the socket is connected.
-- The type index is the result type of the operation.
data InternalConnectOrCancelRequest a where
  -- | Perform the startup handshake and return the resulting connection.
  Connect :: InternalConnectOrCancelRequest HPgConnection
  -- | Connect to the given address only to send the cancellation request;
  -- no connection is returned.
  CancelNotConnect :: CancelRequest -> AddrInfo -> InternalConnectOrCancelRequest ()

-- | Opens a socket to PostgreSQL and then either performs the full startup
-- handshake ('Connect') or sends a cancellation request ('CancelNotConnect').
--
-- * The 'DiffTime' bounds only the time taken to obtain a connected socket
--   (address resolution, socket creation and @connect@); the handshake that
--   follows is not covered by it.
-- * For 'Connect', the hostname is treated as a Unix-domain socket directory
--   when it contains a @/@, and is resolved with @getAddrInfo@ otherwise.
--   For 'CancelNotConnect', the supplied 'AddrInfo' is used as is.
-- * For 'Connect', @client_encoding=UTF8@ is always prepended to the
--   user-supplied startup options, and the type info cache is refreshed before
--   returning when 'fillTypeInfoCache' is set.
--
-- Throws an irrecoverable error when the timeout expires, the address cannot be
-- resolved, the socket is not non-blocking, authentication fails or uses an
-- unsupported method, or the server replies with an error during the handshake.
-- The socket is closed if an exception escapes after it was connected.
internalConnectOrCancel :: InternalConnectOrCancelRequest a -> ConnectOpts -> ConnectionString -> DiffTime -> IO a
internalConnectOrCancel connectOrCancel connOpts originalConnStr@ConnectionString {..} conntimeout = do
  -- `timeout` takes microseconds as an `Int`. Clamp so that a very large DiffTime cannot
  -- wrap around into a negative number, which `timeout` would interpret as "no timeout".
  let timeoutMicros :: Int
      timeoutMicros = fromInteger $ min (toInteger (maxBound :: Int)) (diffTimeToPicoseconds conntimeout `div` 1_000_000)
  sockOrTimeout <- timeout timeoutMicros getConnectedSocket
  case sockOrTimeout of
    Nothing -> throwIrrecoverableError "Could not connect in the supplied timeout"
    -- TODO: It's still possible for an asynchronous exception to interrupt this before the `onException` handler is installed
    Just (sock, addrInfo) -> flip onException (Socket.close sock) $ do
      Socket.withFdSocket sock Socket.getNonBlock >>= \case
        False -> throwIrrecoverableError "Socket is not marked as non-blocking, which is not supported by hpgsql. You might be running on an unsupported platform"
        True -> pure ()
      socketIsClosed <- newMVar False
      recvBuffer <- newMVar mempty
      sendBuffer <- newMVar mempty
      socketMutex <- mkMutex
      encodingContext <- newMVar (EncodingContext builtinPgTypesMap)
      connParams <- newMVar mempty
      notifQueue <- STM.newTQueueIO
      currentConnectionState <-
        STM.newTVarIO $
          InternalConnectionState
            { totalQueriesSent = 0,
              currentPipeline = [],
              notificationsReceived = notifQueue,
              mustIssueRollbackBeforeNextCommand = False,
              preparedStatementNames = mempty,
              transactionStatusBeforeCurrentPipeline = TransIdle
            }
      -- The backend PID and cancellation key are not known yet, so both are 0 here.
      -- This value is only used to talk to the server during startup and must not escape.
      let hpgConnPartialDoNotReturn = HPgConnection sock socketIsClosed recvBuffer sendBuffer socketMutex originalConnStr addrInfo encodingContext connParams currentConnectionState 0 0 connOpts
      case connectOrCancel of
        CancelNotConnect cancelRequest _ -> do
          nonAtomicSendMsg hpgConnPartialDoNotReturn cancelRequest
          -- We _must_ wait until the socket is closed _by the other end_ (PostgreSQL-the-server),
          -- because otherwise this cancellation request might be processed while the client sends
          -- another query. See https://www.postgresql.org/message-id/flat/27126.1126649920%40sss.pgh.pa.us#75364d0966758fccad56cd6c71547771
          void $ tryJust (\err -> if isResourceVanishedError err then Just () else Nothing) $ socketWaitRead (socket hpgConnPartialDoNotReturn)
          Socket.close sock
        Connect -> do
          debugPrint "Sending startup message"
          -- We set the client_encoding to UTF8 at connection time
          nonAtomicSendMsg hpgConnPartialDoNotReturn $ StartupMessage {user = Text.unpack user, database = Text.unpack database, options = "-c client_encoding=UTF8 " <> Text.unpack options}
          AuthenticationResponse authMethod <- receiveNextMsgWithMaskedContinuationButDontThrowOnParsingFailure hpgConnPartialDoNotReturn (msgParser @Msgs.AuthenticationResponse) $ \case
            Right knownAuthMsg -> pure knownAuthMsg
            Left unknownAuthMsg -> throwIrrecoverableError $ "Received unknown authentication message from PostgreSQL. This is probably an authentication method unsupported by hpgsql. More details about the message: " <> Text.pack (show unknownAuthMsg)
          -- Cleartext and MD5 authentication share the same exchange: send a PasswordMessage
          -- built from the requested method, then expect AuthOk.
          let authenticateWithPassword = do
                nonAtomicSendMsg hpgConnPartialDoNotReturn $ PasswordMessage authMethod (Text.unpack user) (Text.unpack password)
                receiveAuthOkOrThrow hpgConnPartialDoNotReturn
          case authMethod of
            AuthOk -> pure ()
            AuthCleartextPassword -> authenticateWithPassword
            AuthMD5Password _ -> authenticateWithPassword
            _ -> throwIrrecoverableError $ "Hpgsql does not yet support authenticating with method " <> Text.pack (show authMethod)
          errorOrBackendKeyData <- receiveNextMsgUnsafe hpgConnPartialDoNotReturn $ Right <$> msgParser @BackendKeyData <|> Left <$> msgParser @ErrorResponse
          case errorOrBackendKeyData of
            Left errResp -> throwHandshakeError "Socket connected but postgresql threw an error during connection startup handshake" errResp
            Right backendKeyData -> do
              readyForQueryOrError <- receiveNextMsgUnsafe hpgConnPartialDoNotReturn $ Right <$> msgParser @ReadyForQuery <|> Left <$> msgParser @ErrorResponse
              case readyForQueryOrError of
                Left errResp -> throwHandshakeError "A postgresql error happened while connecting" errResp
                Right ReadyForQuery {} -> pure ()
              debugPrint $ "Connected with backend PID " ++ show (backendPid backendKeyData)
              let finalConn = hpgConnPartialDoNotReturn {connPid = backendPid backendKeyData, cancelSecretKey = backendSecretKey backendKeyData}
              when (fillTypeInfoCache connOpts) $ join $ runPipeline finalConn $ refreshTypeInfoCache finalConn
              pure finalConn
  where
    -- Receives the next message with the given parser, using `pure` as the masked continuation.
    receiveNextMsgUnsafe :: (Show a) => HPgConnection -> PgMsgParser a -> IO a
    receiveNextMsgUnsafe conn parser = receiveNextMsgWithMaskedContinuation conn parser pure

    -- Wraps a server ErrorResponse received during startup into an IrrecoverableHpgsqlError
    -- with the given details and throws it. No statement is associated with the error.
    throwHandshakeError :: Text -> ErrorResponse -> IO b
    throwHandshakeError details errResp =
      throw $ IrrecoverableHpgsqlError {hpgsqlDetails = details, innerException = Just $ toException $ mkPostgresError "" errResp, relatedStatement = Nothing}

    -- Expects the next message to be AuthOk; throws on an ErrorResponse or on any other
    -- authentication message.
    receiveAuthOkOrThrow :: HPgConnection -> IO ()
    receiveAuthOkOrThrow conn = do
      authMsg <- receiveNextMsgUnsafe conn (Right <$> msgParser @AuthenticationResponse <|> Left <$> msgParser @ErrorResponse)
      case authMsg of
        Left errResp -> throwHandshakeError "A postgresql error happened while connecting" errResp
        Right (AuthenticationResponse authMethod) ->
          case authMethod of
            AuthOk -> pure ()
            _ -> throwIrrecoverableError "Failed to authenticate user."

    -- Determines the address to connect to and returns a socket connected to it.
    getConnectedSocket :: IO (Socket.Socket, AddrInfo)
    getConnectedSocket = do
      addrInfo <- case connectOrCancel of
        CancelNotConnect _ addrInfo -> pure addrInfo
        Connect ->
          if "/" `Text.isInfixOf` hostname
            then
              -- A hostname containing a slash is a directory holding the Unix-domain socket file.
              pure
                AddrInfo
                  { addrFlags = [],
                    addrFamily = Socket.AF_UNIX,
                    addrSocketType = Socket.Stream,
                    addrProtocol = Socket.defaultProtocol,
                    addrAddress = Socket.SockAddrUnix $ Text.unpack (Text.dropWhileEnd (== '/') hostname) ++ "/.s.PGSQL." ++ show port,
                    addrCanonName = Nothing
                  }
            else do
              -- Ask only for stream sockets so that the first result is never a
              -- datagram or raw entry; this matches the Unix-socket branch above.
              addrInfos <- Socket.getAddrInfo (Just Socket.defaultHints {addrSocketType = Socket.Stream}) (Just $ Text.unpack hostname) (Just $ show port)
              case addrInfos of
                [] -> throwIrrecoverableError "Could not resolve address"
                addrInfo : _ -> pure addrInfo
      -- `bracketOnError` acquires the socket with asynchronous exceptions masked, so there is
      -- no window between opening the socket and installing the handler that closes it.
      bracketOnError (Socket.openSocket addrInfo) Socket.close $ \sock -> do
        Socket.connect sock (Socket.addrAddress addrInfo)
        pure (sock, addrInfo)

-- | Fetches custom types from postgres and refreshes this connection's
-- internal typeInfo cache with them.
-- Note that builtin postgres types are always available in the typeInfo cache;
-- this just refreshes any other custom types, including removing those that
-- no longer exist and adding new ones.
-- Hpgsql runs this automatically for new connections unless you disable it.
-- This is a Pipeline so you can batch it with other commands for reduced latency.
-- See `ConnectOpts` and `resetTypeInfoCache` for more.
refreshTypeInfoCache :: HPgConnection -> Pipeline (IO ())
refreshTypeInfoCache conn = fillTypeInfoCache <$> fetchPipeline
  where
    -- https://www.postgresql.org/docs/current/system-catalog-initial-data.html#SYSTEM-CATALOG-OID-ASSIGNMENT
    -- says "OIDs assigned during normal database operation are constrained to be 16384 or higher. This ensures that the range 10000—16383 is free for OIDs assigned automatically by genbki.pl or during initdb. These automatically-assigned OIDs are not considered stable, and may change from one installation to another."
    fetchPipeline = pipeline "select oid, typname, typarray, typelem, typcategory, typtype from pg_catalog.pg_type WHERE oid >= 16384"

    -- Runs the action that yields the fetched rows and then overwrites the
    -- connection's encoding context with the fetched custom types combined
    -- with the builtin ones. The previous cache contents are discarded.
    fillTypeInfoCache queryResultsIO = do
      queryResults <- queryResultsIO
      let customTypes = buildTypeInfoCache $ map toTypeInfo queryResults
      modifyMVar_ conn.encodingContext $ \_ -> pure $ EncodingContext $ customTypes <> builtinPgTypesMap

    -- Converts one fetched pg_type row into a `TypeInfo`. A `typarray` of 0
    -- is mapped to `Nothing`.
    toTypeInfo (oid, typname, typarray, typelem, typcategory, typtype) =
      TypeInfo
        oid
        typname
        (if typarray == 0 then Nothing else Just typarray)
        (toTypeDetails typelem typcategory typtype)

    -- Classifies a type from its `typcategory` and `typtype` columns. The
    -- array category is checked first; anything unrecognised is a `BasicType`.
    toTypeDetails typelem typcategory typtype = case (typcategory, typtype) of
      ('A', _) -> ArrayType (ArrayTypeDetails typelem)
      (_, 'c') -> CompositeType
      (_, 'd') -> DomainType
      (_, 'e') -> EnumType
      (_, 'p') -> PseudoType
      (_, 'r') -> RangeType
      (_, 'm') -> MultiRangeType
      _ -> BasicType

-- | Useful to reset the connection's internal typeInfo cache
-- to the builtin postgres types.
-- Whatever the cache currently holds is discarded and replaced.
resetTypeInfoCache :: HPgConnection -> IO ()
resetTypeInfoCache conn =
  modifyMVar_ conn.encodingContext $ \_ ->
    pure (EncodingContext builtinPgTypesMap)

-- | Looks up a parameter by name in this connection's parameter status map,
-- returning `Nothing` when the map has no entry for it.
getParameterStatus :: HPgConnection -> Text -> IO (Maybe Text)
getParameterStatus HPgConnection {parameterStatusMap} paramName = do
  currentParams <- readMVar parameterStatusMap
  pure (Map.lookup paramName currentParams)

-- | Returns the value stored in this connection's `connPid` field.
-- NOTE(review): presumably the process id of the postgres backend serving
-- this connection; confirm against where `connPid` is populated.
getBackendPid :: HPgConnection -> Int32
getBackendPid conn = conn.connPid

-- | Resets many forms of connection state in an attempt to make it
-- as similar as possible to a newly opened connection. It can be useful to
-- run this before e.g. putting a connection back into a connection pool.
-- Check `ResetConnectionOpts` for more details.
--
-- Throws an irrecoverable error if the connection is not ready for a new
-- pipeline or, when `checkTransactionState` is set, if the transaction
-- status is not `TransIdle`.
resetConnectionState ::
  HPgConnection ->
  -- | Pass `Nothing` to use defaults.
  Maybe ResetConnectionOpts ->
  IO ()
resetConnectionState conn@HPgConnection {internalConnectionState} mCleanOpts = do
  -- Refuse to reset a connection that still has queries in flight.
  STM.atomically $ do
    st <- STM.readTVar internalConnectionState
    when (isLeft $ connectionReadyForNewPipeline st) $ throwIrrecoverableError "There are still active queries in progress. Make sure to close this connection with `closeForcefully` or consume all existing queries' results"
  when (checkTransactionState cleanOpts) $ do
    txnStatus <- transactionStatus conn
    unless (txnStatus == TransIdle) $ throwIrrecoverableError $ "The connection's transaction was left in an invalid state: " <> Text.pack (show txnStatus) <> ". Make sure to close this connection with `closeForcefully`"
  -- What if there are notifications in the socket buffer? It seems reasonable to assume that when
  -- running "UNLISTEN *" those would be received, so this might be fine as long as we
  -- clear the internal queue _after_ "UNLISTEN *".
  let unlistenQueries = if unlistenAll cleanOpts then ["UNLISTEN *"] else []
      resetQueries = if resetAll cleanOpts then ["RESET ALL", "RESET ROLE"] else []
  -- All statements go out in a single pipeline; their results are consumed in order.
  resultActions <- runPipeline conn (traverse pipelineExec_ (unlistenQueries ++ resetQueries))
  sequence_ resultActions
  when (unlistenAll cleanOpts) clearInternalNotificationQueue
  where
    -- Every reset step is enabled when the caller passes `Nothing`.
    cleanOpts =
      fromMaybe
        ResetConnectionOpts {resetAll = True, unlistenAll = True, checkTransactionState = True}
        mCleanOpts

    -- Swaps the connection's notification queue for a fresh, empty one.
    clearInternalNotificationQueue = STM.atomically $ do
      st <- STM.readTVar internalConnectionState
      emptyQueue <- STM.newTQueue
      STM.writeTVar internalConnectionState $ st {notificationsReceived = emptyQueue}

-- | Closes the connection with postgres. Do not use this in exception handlers; use `closeForcefully`
-- instead.
--
-- Does nothing if the connection is already marked as closed. Otherwise it
-- sends a `Terminate` message while holding the control messages lock and
-- then, whether or not that succeeded, closes the socket and marks the
-- connection as closed.
closeGracefully :: HPgConnection -> IO ()
closeGracefully conn@(HPgConnection {socket}) =
  whenNotClosed conn $ sendTerminate `finally` closeSocketAndMarkClosed
  where
    -- No connection state needs changing before or after acquiring the lock,
    -- hence the two no-op STM actions.
    sendTerminate =
      withControlMsgsLock
        conn
        (const $ pure ())
        (const $ pure ())
        $ \() -> nonAtomicSendMsg conn Terminate

    closeSocketAndMarkClosed =
      Socket.close socket >> modifyMVar_ conn.socketClosed (const $ pure True)

-- | Closes the connection with postgres as quickly as possible without
-- the proper postgres protocol handshake procedures. This is equivalent to
-- closing the connection's socket in the kernel without making postgres
-- aware of it.
-- Use this if you need to close the connection in exception handlers or
-- if you received an irrecoverable Hpgsql exception.
--
-- Does nothing if the connection is already marked as closed.
closeForcefully :: HPgConnection -> IO ()
closeForcefully conn@(HPgConnection {socket}) =
  whenNotClosed conn $
    -- `finally` guarantees the connection is marked as closed even if closing
    -- the socket throws or is interrupted, so the flag cannot be left at
    -- `False` after the socket is gone.
    Socket.close socket `finally` modifyMVar_ conn.socketClosed (const $ pure True)

-- | Runs the supplied action only if the connection's `socketClosed` flag is
-- currently `False`. The flag is read once, before the action starts; it is
-- not held locked while the action runs.
whenNotClosed :: HPgConnection -> IO () -> IO ()
whenNotClosed conn f =
  readMVar conn.socketClosed >>= \isClosed -> unless isClosed f

-- | Opens a connection with `connect`, runs the supplied function with it and
-- closes it with `closeGracefully` once the function returns.
-- If the function (or the graceful close itself) throws, the connection is
-- closed with `closeForcefully` instead and the exception propagates.
withConnection :: ConnectionString -> DiffTime -> (HPgConnection -> IO a) -> IO a
withConnection connstr conntimeout f =
  bracketOnError (connect connstr conntimeout) closeForcefully $ \conn ->
    f conn <* closeGracefully conn

-- | Like `withConnection`, but opens the connection with `connectOpts` using
-- the supplied `ConnectOpts`.
-- The connection is closed with `closeGracefully` once the supplied function
-- returns; if the function (or the graceful close itself) throws, it is closed
-- with `closeForcefully` instead and the exception propagates.
withConnectionOpts :: ConnectOpts -> ConnectionString -> DiffTime -> (HPgConnection -> IO a) -> IO a
withConnectionOpts connOpts connstr conntimeout f =
  bracketOnError (connectOpts connOpts connstr conntimeout) closeForcefully $ \conn ->
    f conn <* closeGracefully conn

-- | Receives the next message from postgres, parses it with the supplied
-- parser and runs the continuation on the parsed message.
-- This delegates to `receiveNextMsgWithMaskedContinuationButDontThrowOnParsingFailure`,
-- so the continuation runs with asynchronous exceptions masked and must be cheap.
-- If the message cannot be parsed, an `IrrecoverableHpgsqlError` is thrown; when
-- the unparseable message was a postgres error, it is attached as the inner exception.
receiveNextMsgWithMaskedContinuation :: (Show a) => HPgConnection -> PgMsgParser a -> (a -> IO b) -> IO b
receiveNextMsgWithMaskedContinuation conn parser f =
  receiveNextMsgWithMaskedContinuationButDontThrowOnParsingFailure conn parser $ \case
    Right p -> f p
    Left (msgIdentChar, mPgError) ->
      throw
        IrrecoverableHpgsqlError
          { hpgsqlDetails = "Could not parse postgres message with ident char " <> Text.pack (show msgIdentChar) <> ". This is an internal error in Hpgsql. Please report it.",
            innerException = toException <$> mPgError,
            relatedStatement = Nothing
          }

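-- | Just like `receiveNextMsgWithMaskedContinuation`, but instead of throwing an
-- exception on parser failure it passes a `Left` to the continuation, carrying the
-- message's ident char and the postgres error the message contained, if any.
-- On parsing failure, this makes sure the message buffer remains unaltered.
--
-- Masks asynchronous exceptions in between the moment the message is extracted from
-- the internal buffer and the supplied function runs to completion.
-- This is important for control messages (i.e. not `DataRow`) because if you extract a
-- message from the buffer, you must be able to update the connection's internal state,
-- lest it be left in a very broken place.
-- CAREFUL: avoid doing networking or too much work in your supplied function. It must
-- be really cheap!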
receiveNextMsgWithMaskedContinuationButDontThrowOnParsingFailure :: (Show a) => HPgConnection -> PgMsgParser a -> (Either (Char, Maybe PostgresError) a -> IO b) -> IO b
receiveNextMsgWithMaskedContinuationButDontThrowOnParsingFailure :: forall a b.
Show a =>
HPgConnection
-> PgMsgParser a
-> (Either (Char, Maybe PostgresError) a -> IO b)
-> IO b
receiveNextMsgWithMaskedContinuationButDontThrowOnParsingFailure conn :: HPgConnection
conn@HPgConnection {Socket
socket :: HPgConnection -> Socket
socket :: Socket
socket, MVar ByteString
recvBuffer :: MVar ByteString
recvBuffer :: HPgConnection -> MVar ByteString
recvBuffer} PgMsgParser a
parser Either (Char, Maybe PostgresError) a -> IO b
f = do
  -- We need to preserve the invariant that the internal buffer's first byte is
  -- always the first byte of a valid Message while keeping this function
  -- interruptible.
  -- This means we can't extract a message partially from the internal buffer,
  -- due to the presence of asynchronous exceptions.
  -- So we append to the buffer up until it has been fully fetched,
  -- and then extract it from the buffer in one piece.
  currentBuf <- Int64 -> IO ByteString
receiveUntilBufferHasAtLeast Int64
5
  let bufLen = ByteString -> Int64
LBS.length ByteString
currentBuf
  let charAndLength = Int64 -> ByteString -> ByteString
LBS.take Int64
5 ByteString
currentBuf
  let (w2c -> msgIdentChar, lenbs) = fromMaybe (error "impossible") $ LBS.uncons charAndLength
      lenLeftToFetch :: Int64 = fromIntegral $ either error id (Cereal.decodeLazy @Int32 lenbs) - 4
      fullMessageLen = Int64
5 Int64 -> Int64 -> Int64
forall a. Num a => a -> a -> a
+ Int64
lenLeftToFetch
  restOfMsg <- LBS.drop 5 . LBS.take fullMessageLen <$> if bufLen >= fullMessageLen then pure currentBuf else receiveUntilBufferHasAtLeast fullMessageLen
  receivedNoticeOrParameterSoTryAgain <- go msgIdentChar restOfMsg fullMessageLen
  case receivedNoticeOrParameterSoTryAgain of
    Maybe b
Nothing -> HPgConnection
-> PgMsgParser a
-> (Either (Char, Maybe PostgresError) a -> IO b)
-> IO b
forall a b.
Show a =>
HPgConnection
-> PgMsgParser a
-> (Either (Char, Maybe PostgresError) a -> IO b)
-> IO b
receiveNextMsgWithMaskedContinuationButDontThrowOnParsingFailure HPgConnection
conn PgMsgParser a
parser Either (Char, Maybe PostgresError) a -> IO b
f
    Just b
res -> b -> IO b
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure b
res
  where
    -- We mask_ because if the supplied IO action runs with a message extracted from
    -- the recvBuffer, then we _must_ remove that message from recvBuffer.
    -- TODO: These IO actions are always STM transactions (some times just a `pure` call),
    -- so maybe we should reflect that in the type to avoid the impression that
    -- anything can happen.
    go :: Char -> ByteString -> Int64 -> IO (Maybe b)
go Char
msgIdentChar ByteString
restOfMsg Int64
fullMessageLen = IO (Maybe b) -> IO (Maybe b)
forall (m :: * -> *) a. (HasCallStack, MonadMask m) => m a -> m a
mask_ (IO (Maybe b) -> IO (Maybe b)) -> IO (Maybe b) -> IO (Maybe b)
forall a b. (a -> b) -> a -> b
$ MVar ByteString
-> (ByteString -> IO (ByteString, Maybe b)) -> IO (Maybe b)
forall a b. MVar a -> (a -> IO (a, b)) -> IO b
modifyMVar MVar ByteString
recvBuffer ((ByteString -> IO (ByteString, Maybe b)) -> IO (Maybe b))
-> (ByteString -> IO (ByteString, Maybe b)) -> IO (Maybe b)
forall a b. (a -> b) -> a -> b
$ \ByteString
lbs -> do
      let !bufferWithoutMsg :: ByteString
bufferWithoutMsg =
            if ByteString -> Int64
LBS.length ByteString
lbs Int64 -> Int64 -> Bool
forall a. Ord a => a -> a -> Bool
>= Int64
fullMessageLen
              then Int64 -> ByteString -> ByteString
LBS.drop Int64
fullMessageLen ByteString
lbs
              else
                [Char] -> ByteString
forall a. HasCallStack => [Char] -> a
error [Char]
"Bug in Hpgsql. Internal buffer's bytes weren't filled enough"

      case Char -> ByteString -> PgMsgParser a -> Maybe a
forall a. Char -> ByteString -> PgMsgParser a -> Maybe a
parsePgMessage Char
msgIdentChar ByteString
restOfMsg PgMsgParser a
parser of
        Just a
msg -> do
          [Char] -> IO ()
debugPrint ([Char] -> IO ()) -> [Char] -> IO ()
forall a b. (a -> b) -> a -> b
$ [Char]
"Received " [Char] -> [Char] -> [Char]
forall a. [a] -> [a] -> [a]
++ a -> [Char]
forall a. Show a => a -> [Char]
show a
msg
          (Maybe b -> (ByteString, Maybe b))
-> IO (Maybe b) -> IO (ByteString, Maybe b)
forall a b. (a -> b) -> IO a -> IO b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (ByteString
bufferWithoutMsg,) (IO (Maybe b) -> IO (ByteString, Maybe b))
-> IO (Maybe b) -> IO (ByteString, Maybe b)
forall a b. (a -> b) -> a -> b
$ b -> Maybe b
forall a. a -> Maybe a
Just (b -> Maybe b) -> IO b -> IO (Maybe b)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Either (Char, Maybe PostgresError) a -> IO b
f (a -> Either (Char, Maybe PostgresError) a
forall a b. b -> Either a b
Right a
msg)
        Maybe a
Nothing -> do
          -- This could be a Notification, NOTICE or a ParameterStatus message, since these
          -- can be received _at any time_ according to the docs.
          case Char
-> ByteString
-> PgMsgParser
     (Either3 NotificationResponse NoticeResponse ParameterStatus)
-> Maybe
     (Either3 NotificationResponse NoticeResponse ParameterStatus)
forall a. Char -> ByteString -> PgMsgParser a -> Maybe a
parsePgMessage Char
msgIdentChar ByteString
restOfMsg (NotificationResponse
-> Either3 NotificationResponse NoticeResponse ParameterStatus
forall a b c. a -> Either3 a b c
Left3 (NotificationResponse
 -> Either3 NotificationResponse NoticeResponse ParameterStatus)
-> PgMsgParser NotificationResponse
-> PgMsgParser
     (Either3 NotificationResponse NoticeResponse ParameterStatus)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall a. FromPgMessage a => PgMsgParser a
msgParser @NotificationResponse PgMsgParser
  (Either3 NotificationResponse NoticeResponse ParameterStatus)
-> PgMsgParser
     (Either3 NotificationResponse NoticeResponse ParameterStatus)
-> PgMsgParser
     (Either3 NotificationResponse NoticeResponse ParameterStatus)
forall a. PgMsgParser a -> PgMsgParser a -> PgMsgParser a
forall (f :: * -> *) a. Alternative f => f a -> f a -> f a
<|> NoticeResponse
-> Either3 NotificationResponse NoticeResponse ParameterStatus
forall a b c. b -> Either3 a b c
Middle3 (NoticeResponse
 -> Either3 NotificationResponse NoticeResponse ParameterStatus)
-> PgMsgParser NoticeResponse
-> PgMsgParser
     (Either3 NotificationResponse NoticeResponse ParameterStatus)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall a. FromPgMessage a => PgMsgParser a
msgParser @NoticeResponse PgMsgParser
  (Either3 NotificationResponse NoticeResponse ParameterStatus)
-> PgMsgParser
     (Either3 NotificationResponse NoticeResponse ParameterStatus)
-> PgMsgParser
     (Either3 NotificationResponse NoticeResponse ParameterStatus)
forall a. PgMsgParser a -> PgMsgParser a -> PgMsgParser a
forall (f :: * -> *) a. Alternative f => f a -> f a -> f a
<|> ParameterStatus
-> Either3 NotificationResponse NoticeResponse ParameterStatus
forall a b c. c -> Either3 a b c
Right3 (ParameterStatus
 -> Either3 NotificationResponse NoticeResponse ParameterStatus)
-> PgMsgParser ParameterStatus
-> PgMsgParser
     (Either3 NotificationResponse NoticeResponse ParameterStatus)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall a. FromPgMessage a => PgMsgParser a
msgParser @ParameterStatus) of
            Just (Left3 NotificationResponse
notifResponse) -> do
              [Char] -> IO ()
debugPrint [Char]
"Received notification. Will add it to internal queue."
              STM () -> IO ()
forall a. STM a -> IO a
STM.atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
                sttv <- TVar InternalConnectionState -> STM InternalConnectionState
forall a. TVar a -> STM a
STM.readTVar (TVar InternalConnectionState -> STM InternalConnectionState)
-> TVar InternalConnectionState -> STM InternalConnectionState
forall a b. (a -> b) -> a -> b
$ HPgConnection -> TVar InternalConnectionState
internalConnectionState HPgConnection
conn
                STM.writeTQueue (notificationsReceived sttv) notifResponse
              (ByteString, Maybe b) -> IO (ByteString, Maybe b)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (ByteString
bufferWithoutMsg, Maybe b
forall a. Maybe a
Nothing)
            Just (Middle3 (NoticeResponse Map ErrorDetail ByteString
details)) -> do
              let severity :: ByteString
severity =
                    ByteString -> Maybe ByteString -> ByteString
forall a. a -> Maybe a -> a
fromMaybe
                      ByteString
"Notice of unknown severity"
                      (ErrorDetail -> Map ErrorDetail ByteString -> Maybe ByteString
forall k a. Ord k => k -> Map k a -> Maybe a
Map.lookup ErrorDetail
ErrorSeverity Map ErrorDetail ByteString
details)
                  humanmsg :: ByteString
humanmsg = ByteString -> Maybe ByteString -> ByteString
forall a. a -> Maybe a -> a
fromMaybe ByteString
"no message" (ErrorDetail -> Map ErrorDetail ByteString -> Maybe ByteString
forall k a. Ord k => k -> Map k a -> Maybe a
Map.lookup ErrorDetail
ErrorHumanReadableMsg Map ErrorDetail ByteString
details)
              Handle -> Text -> IO ()
Text.hPutStrLn Handle
stderr (Text -> IO ()) -> Text -> IO ()
forall a b. (a -> b) -> a -> b
$ ByteString -> Text
decodeUtf8 (ByteString -> Text) -> ByteString -> Text
forall a b. (a -> b) -> a -> b
$ ByteString -> ByteString
LBS.toStrict (ByteString -> ByteString) -> ByteString -> ByteString
forall a b. (a -> b) -> a -> b
$ ByteString
severity ByteString -> ByteString -> ByteString
forall a. Semigroup a => a -> a -> a
<> ByteString
": " ByteString -> ByteString -> ByteString
forall a. Semigroup a => a -> a -> a
<> ByteString
humanmsg
              (ByteString, Maybe b) -> IO (ByteString, Maybe b)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (ByteString
bufferWithoutMsg, Maybe b
forall a. Maybe a
Nothing)
            Just (Right3 (ParameterStatus {Text
parameterName :: Text
parameterValue :: Text
parameterValue :: ParameterStatus -> Text
parameterName :: ParameterStatus -> Text
..})) -> do
              Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Text
parameterName Text -> Text -> Bool
forall a. Eq a => a -> a -> Bool
== Text
"client_encoding" Bool -> Bool -> Bool
&& Text
parameterValue Text -> Text -> Bool
forall a. Eq a => a -> a -> Bool
/= Text
"UTF8") (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ Text -> IO ()
forall (m :: * -> *) a. MonadThrow m => Text -> m a
throwIrrecoverableError (Text -> IO ()) -> Text -> IO ()
forall a b. (a -> b) -> a -> b
$ Text
"Postgres sent us a change of client_encoding to not UTF8, and Hpgsql only supports UTF8. The encoding postgres sent us is " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
parameterValue
              MVar (Map Text Text)
-> (Map Text Text -> IO (Map Text Text)) -> IO ()
forall a. MVar a -> (a -> IO a) -> IO ()
modifyMVar_ (HPgConnection -> MVar (Map Text Text)
parameterStatusMap HPgConnection
conn) ((Map Text Text -> IO (Map Text Text)) -> IO ())
-> (Map Text Text -> IO (Map Text Text)) -> IO ()
forall a b. (a -> b) -> a -> b
$ \(!Map Text Text
paramMap) -> Map Text Text -> IO (Map Text Text)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Text -> Text -> Map Text Text -> Map Text Text
forall k a. Ord k => k -> a -> Map k a -> Map k a
Map.insert Text
parameterName Text
parameterValue Map Text Text
paramMap)
              (ByteString, Maybe b) -> IO (ByteString, Maybe b)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (ByteString
bufferWithoutMsg, Maybe b
forall a. Maybe a
Nothing)
            Maybe (Either3 NotificationResponse NoticeResponse ParameterStatus)
Nothing ->
              -- Just in case this is a postgres error, it might include useful information,
              -- so we spit that out
              let mPgError :: Maybe PostgresError
mPgError = ByteString -> ErrorResponse -> PostgresError
mkPostgresError ByteString
"" (ErrorResponse -> PostgresError)
-> Maybe ErrorResponse -> Maybe PostgresError
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Char
-> ByteString -> PgMsgParser ErrorResponse -> Maybe ErrorResponse
forall a. Char -> ByteString -> PgMsgParser a -> Maybe a
parsePgMessage Char
msgIdentChar ByteString
restOfMsg (forall a. FromPgMessage a => PgMsgParser a
msgParser @ErrorResponse)
               in (Maybe b -> (ByteString, Maybe b))
-> IO (Maybe b) -> IO (ByteString, Maybe b)
forall a b. (a -> b) -> IO a -> IO b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (ByteString
lbs,) (IO (Maybe b) -> IO (ByteString, Maybe b))
-> IO (Maybe b) -> IO (ByteString, Maybe b)
forall a b. (a -> b) -> a -> b
$ b -> Maybe b
forall a. a -> Maybe a
Just (b -> Maybe b) -> IO b -> IO (Maybe b)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Either (Char, Maybe PostgresError) a -> IO b
f ((Char, Maybe PostgresError) -> Either (Char, Maybe PostgresError) a
forall a b. a -> Either a b
Left (Char
msgIdentChar, Maybe PostgresError
mPgError))

    -- \| Makes sure the connection's receive buffer ('recvBuffer') holds at
    -- least @minBytesNecessary@ bytes, reading from the socket and appending
    -- to the buffer as many times as it takes.
    -- Returns the buffer contents observed once it is large enough.
    receiveUntilBufferHasAtLeast :: Int64 -> IO LBS.ByteString
    receiveUntilBufferHasAtLeast minBytesNecessary = do
      buffered <- readMVar recvBuffer
      let bufferedLen = LBS.length buffered
          stillMissing = minBytesNecessary - bufferedLen
      if bufferedLen < minBytesNecessary
        then do
          -- Reading from the socket and appending to our buffer happen inside
          -- a single `modifyMVar_` with async exceptions masked, so bytes taken
          -- from the socket are either appended to the buffer or an exception
          -- is raised (and routed through `rethrowAsIrrecoverable`).
          -- Only the wait for the socket to become readable is unmasked.
          mask $ \unmasked -> rethrowAsIrrecoverable $ modifyMVar_ recvBuffer $ \soFar -> do
            unmasked $ socketWaitRead socket
            -- Ask for whatever is still missing, but never for fewer than 16000 bytes.
            chunk <- timeDebugNonBlockingOperation "recv" $ recvNonBlocking socket (max 16000 (fromIntegral stillMissing))
            pure (soFar <> LBS.fromStrict chunk)
          -- Re-check from scratch: the read may have returned fewer bytes than needed.
          receiveUntilBufferHasAtLeast minBytesNecessary
        else pure buffered

-- | Requests cancellation of whatever the connection is currently running.
--
-- The strategy depends on the connection's current pipeline:
--
-- * If the pipeline consists of exactly one COPY query that is still copying,
--   a @CopyFail@ followed by a @Sync@ is sent on this connection, and the
--   query is marked as 'CopyFailAndSyncSent'.
-- * If that single COPY query has already had its @CopyDone@ or @CopyFail@
--   (plus @Sync@) sent, nothing is done.
-- * In every other case a cancel request built from the connection's backend
--   pid and secret key is issued through 'internalConnectOrCancel' (the
--   30 seconds passed there are presumably a timeout; confirm against
--   'internalConnectOrCancel').
sendCancellationRequest :: HPgConnection -> IO ()
sendCancellationRequest conn = do
  mCopyState <- STM.atomically $ soleCopyState . currentPipeline <$> STM.readTVar stateVar
  case mCopyState of
    Nothing ->
      internalConnectOrCancel
        (CancelNotConnect (CancelRequest (connPid conn) (cancelSecretKey conn)) (connectedTo conn))
        (connOpts conn)
        (originalConnStr conn)
        (secondsToDiffTime 30)
    Just StillCopying ->
      atomicallySendControlMsgs_ conn (copyFailMsgs, recordCopyFailSent)
    -- Already finished, nothing to cancel
    Just CopyDoneAndSyncSent -> pure ()
    -- Already cancelled, no need to send another
    Just CopyFailAndSyncSent -> pure ()
  where
    stateVar :: TVar InternalConnectionState
    stateVar = internalConnectionState conn

    -- The COPY state of the pipeline, but only when the pipeline is exactly
    -- one COPY query.
    soleCopyState = \case
      [QueryState {queryProtocol = CopyQuery cs}] -> Just cs
      _ -> Nothing

    copyFailMsgs :: [SomeMessage]
    copyFailMsgs =
      [ SomeMessage $ Msgs.CopyFail "COPY statement cancelled by Hpgsql",
        SomeMessage Sync
      ]

    -- Transaction handed to `atomicallySendControlMsgs_` together with the
    -- messages: flips the single COPY query from `StillCopying` to
    -- `CopyFailAndSyncSent`, and throws if the pipeline no longer looks
    -- like that.
    recordCopyFailSent :: STM ()
    recordCopyFailSent = do
      st <- STM.readTVar stateVar
      case currentPipeline st of
        [qs@QueryState {queryProtocol = CopyQuery StillCopying}] ->
          STM.writeTVar stateVar st {currentPipeline = [qs {queryProtocol = CopyQuery CopyFailAndSyncSent}]}
        _ -> throwIrrecoverableError "Impossible: when marking CopyFail state was invalid"

-- | Tells whether the pipeline entry picked by 'lastMaybe' from the
-- connection's current pipeline has the given 'QueryId'.
-- Yields 'False' when 'lastMaybe' finds no entry (e.g. an empty pipeline).
isLastInPipeline :: HPgConnection -> QueryId -> IO Bool
isLastInPipeline conn qryId =
  STM.atomically $ updateConnStateTxn conn $ \stateVar ->
    maybe False ((== qryId) . queryIdentifier) . lastMaybe . currentPipeline
      <$> STM.readTVar stateVar

-- | Runs the given STM action against the 'TVar' that holds the connection's
-- internal state, returning whatever the action returns.
-- This adds no locking or retry logic of its own; it only supplies the 'TVar'.
updateConnStateTxn :: HPgConnection -> (TVar InternalConnectionState -> STM a) -> STM a
updateConnStateTxn conn = ($ internalConnectionState conn)

-- | Runs an action bracketed by two STM transactions while holding the
-- connection's socket mutex. This is meant for "control" messages, i.e.
-- messages that affect the connection's internal state.
--
-- In order, with the mutex held:
--
-- 1. The pending send buffer is flushed, so the acquire transaction only sees
--    internal state from after previously queued messages were sent.
-- 2. The acquire transaction (@acqStm@) runs and its result is given to @f@.
-- 3. After @f@ returns, the send buffer is flushed again so that anything @f@
--    queued is sent (and its state changes applied) before release.
-- 4. The release transaction (@relStm@) runs; because it is the release half
--    of a 'bracket', it also runs when @f@ or the second flush throws. Its
--    result is discarded.
--
-- Exceptions raised while flushing go through 'rethrowAsIrrecoverable'.
withControlMsgsLock ::
  HPgConnection ->
  (TVar InternalConnectionState -> STM a) ->
  (TVar InternalConnectionState -> STM b) ->
  (a -> IO c) ->
  IO c
withControlMsgsLock conn@HPgConnection {socket, socketMutex} acqStm relStm f =
  withMutex socketMutex $ do
    flushSendBuffer
    bracket acquire release $ \acquired -> do
      result <- f acquired
      flushSendBuffer
      pure result
  where
    acquire = STM.atomically $ updateConnStateTxn conn acqStm
    release _ = void . STM.atomically $ updateConnStateTxn conn relStm

    -- Sends everything queued in the connection's send buffer.
    -- Each buffer entry pairs the bytes still to be sent with an STM
    -- transaction that is run once those bytes have been sent in full.
    flushSendBuffer :: IO ()
    flushSendBuffer =
      -- An exception here could be a socket error or something
      -- that forces us to discard the connection
      rethrowAsIrrecoverable $
        mask $ \unmasked -> do
          let drain = do
                -- One step per iteration, performed while holding the buffer's
                -- MVar: either retire a fully sent entry or push more bytes of
                -- the first entry. The step returns the updated queue so we
                -- know whether to keep going.
                pending <- modifyMVar conn.sendBuffer $ \case
                  [] -> pure ([], [])
                  (!payload, afterSentTxn) : queued
                    | LBS.null payload -> do
                        -- Nothing left of this entry: apply its state change
                        -- and drop it from the queue.
                        STM.atomically afterSentTxn
                        pure (queued, queued)
                    | otherwise -> do
                        -- Only the wait for writability is interruptible.
                        unmasked $ socketWaitWrite socket
                        sent <- timeDebugNonBlockingOperation "sendNonBlocking" $ sendNonBlocking socket payload
                        -- A send may be partial; keep whatever was not sent
                        -- at the front of the queue.
                        let !unsent = LBS.drop sent payload
                            updated = (unsent, afterSentTxn) : queued
                        pure (updated, updated)
                unless (null pending) drain
          drain

-- | Receives the next response message for the given QueryId atomically, updates
-- internal connection state to reflect it, and returns the updated state alongside the
-- received message.
-- This also receives ReadyForQuery if the query is already in ErrorResponse or if it's the
-- last in the pipeline and has received CommandComplete.
-- If a pipeline has already received ReadyForQuery, this will return that ReadyForQuery
-- without receiving any new messages. This is helpful if a thread is interrupted.
-- You don't want to use this for receiving DataRows in large scale, because the costs of
-- STM transactions apply here.
receiveOutstandingResponseMsgsAtomically :: WeakThreadId -> HPgConnection -> QueryId -> IO (Maybe ResponseMsg, ResponseMsgsReceived)
receiveOutstandingResponseMsgsAtomically :: WeakThreadId
-> HPgConnection
-> QueryId
-> IO (Maybe ResponseMsg, ResponseMsgsReceived)
receiveOutstandingResponseMsgsAtomically WeakThreadId
thisThreadId HPgConnection
conn QueryId
qryId = do
  -- debugPrint $ "Internal state: [Waiting] until ok to receive response control messages for QueryId " ++ show qryId
  HPgConnection
-> (TVar InternalConnectionState -> STM QueryState)
-> (TVar InternalConnectionState -> STM ())
-> (QueryState -> IO (Maybe ResponseMsg, ResponseMsgsReceived))
-> IO (Maybe ResponseMsg, ResponseMsgsReceived)
forall a b c.
HPgConnection
-> (TVar InternalConnectionState -> STM a)
-> (TVar InternalConnectionState -> STM b)
-> (a -> IO c)
-> IO c
withControlMsgsLock
    HPgConnection
conn
    -- Check last received message and take lock before receiving next message
    (STM QueryState -> TVar InternalConnectionState -> STM QueryState
forall a b. a -> b -> a
const STM QueryState
getQueryStateIfFirstOrThrow)
    (STM () -> TVar InternalConnectionState -> STM ()
forall a b. a -> b -> a
const (STM () -> TVar InternalConnectionState -> STM ())
-> STM () -> TVar InternalConnectionState -> STM ()
forall a b. (a -> b) -> a -> b
$ () -> STM ()
forall a. a -> STM a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ())
    ((QueryState -> IO (Maybe ResponseMsg, ResponseMsgsReceived))
 -> IO (Maybe ResponseMsg, ResponseMsgsReceived))
-> (QueryState -> IO (Maybe ResponseMsg, ResponseMsgsReceived))
-> IO (Maybe ResponseMsg, ResponseMsgsReceived)
forall a b. (a -> b) -> a -> b
$ \QueryState
qryState -> do
      case QueryState -> ResponseMsgsReceived
responseMsgsState QueryState
qryState of
        ResponseMsgsReceived
NoMsgsReceived -> IO (Maybe ResponseMsg, ResponseMsgsReceived)
receiveParseOrBindCompleteAtomically
        ParseCompleteReceived ParseComplete
_ -> IO (Maybe ResponseMsg, ResponseMsgsReceived)
receiveBindCompleteAtomically
        BindCompleteReceived BindComplete
_ -> IO (Maybe ResponseMsg, ResponseMsgsReceived)
receiveNoDataOrRowDescriptionOrCopyInResponseAtomically
        RowDescriptionOrNoDataOrCopyInResponseReceived Either3 NoData RowDescription CopyInResponse
_ -> IO (Maybe ResponseMsg, ResponseMsgsReceived)
receiveDataRowOrCommandCompleteAtomically
        ErrorResponseReceived Maybe (Either3 NoData RowDescription CopyInResponse)
_ ErrorResponse
_ -> IO (Maybe ResponseMsg, ResponseMsgsReceived)
receiveReadyForQueryAtomically
        state :: ResponseMsgsReceived
state@(CommandCompleteReceived Either3 NoData RowDescription CopyInResponse
_ CommandComplete
_) -> IO Bool
-> IO (Maybe ResponseMsg, ResponseMsgsReceived)
-> IO (Maybe ResponseMsg, ResponseMsgsReceived)
-> IO (Maybe ResponseMsg, ResponseMsgsReceived)
forall a. IO Bool -> IO a -> IO a -> IO a
ifM (HPgConnection -> QueryId -> IO Bool
isLastInPipeline HPgConnection
conn QueryId
qryId) IO (Maybe ResponseMsg, ResponseMsgsReceived)
receiveReadyForQueryAtomically ((Maybe ResponseMsg, ResponseMsgsReceived)
-> IO (Maybe ResponseMsg, ResponseMsgsReceived)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Maybe ResponseMsg
forall a. Maybe a
Nothing, ResponseMsgsReceived
state)) -- Nothing more to receive after CommandComplete unless it's the last query in the pipeline
        state :: ResponseMsgsReceived
state@(ReadyForQueryReceived Either ErrorResponse CommandComplete
_ ReadyForQuery
_) -> (Maybe ResponseMsg, ResponseMsgsReceived)
-> IO (Maybe ResponseMsg, ResponseMsgsReceived)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Maybe ResponseMsg
forall a. Maybe a
Nothing, ResponseMsgsReceived
state) -- Definitely nothing to receive if we're here
  where
    -- \| Receives the first response message of a query: @ParseComplete@,
    -- @BindComplete@ or @ErrorResponse@, and records it in the query's state.
    -- We don't send a `Parse` msg for already-prepared statements, and so for those
    -- we don't even get a `ParseComplete`; that is why `BindComplete` is accepted here.
    -- We do have an assertion in `updateQueryStateIfFirstOrThrow` to ensure we aren't
    -- dismissive of messages arriving in unexpected order.
    receiveParseOrBindCompleteAtomically :: IO (Maybe ResponseMsg, ResponseMsgsReceived)
    receiveParseOrBindCompleteAtomically =
      receiveNextMsgWithMaskedContinuation conn parseOrBindOrError $
        STM.atomically . updateQueryStateIfFirstOrThrow . \case
          Right3 parseComplete -> RespParseComplete parseComplete
          Middle3 bindComplete -> RespBindComplete bindComplete
          Left3 err -> RespErrorResponse err
      where
        -- Alternatives are tried in this order.
        parseOrBindOrError =
          Right3 <$> msgParser @ParseComplete
            <|> Left3 <$> msgParser @ErrorResponse
            <|> Middle3 <$> msgParser @BindComplete
    -- Receives either @BindComplete@ or @ErrorResponse@ as the next message
    -- and records it in the query's state through
    -- `updateQueryStateIfFirstOrThrow`, returning that function's result.
    receiveBindCompleteAtomically :: IO (Maybe ResponseMsg, ResponseMsgsReceived)
    receiveBindCompleteAtomically =
      receiveNextMsgWithMaskedContinuation conn bindOrError $
        STM.atomically . updateQueryStateIfFirstOrThrow . either RespErrorResponse RespBindComplete
      where
        -- Alternatives are tried in this order.
        bindOrError =
          Right <$> msgParser @BindComplete
            <|> Left <$> msgParser @ErrorResponse
    -- Receives the message that describes the query's result shape:
    -- @RowDescription@, @NoData@ or @CopyInResponse@ (or an @ErrorResponse@
    -- instead), and records it in the query's state through
    -- `updateQueryStateIfFirstOrThrow`, returning that function's result.
    receiveNoDataOrRowDescriptionOrCopyInResponseAtomically :: IO (Maybe ResponseMsg, ResponseMsgsReceived)
    receiveNoDataOrRowDescriptionOrCopyInResponseAtomically =
      receiveNextMsgWithMaskedContinuation conn (Right <$> describeParser <|> Left <$> msgParser @ErrorResponse) $
        STM.atomically . updateQueryStateIfFirstOrThrow . either RespErrorResponse fromDescribe
      where
        -- Alternatives are tried in this order, before the error alternative.
        describeParser =
          Right3 <$> msgParser @RowDescription
            <|> Left3 <$> msgParser @NoData
            <|> Middle3 <$> msgParser @CopyInResponse

        fromDescribe = \case
          Left3 noData -> RespNoData noData
          Middle3 copyIn -> RespCopyInResponse copyIn
          Right3 rowDesc -> RespRowDescription rowDesc
    -- Receives the next result message of a query: a @DataRow@, the closing
    -- @CommandComplete@, or an @ErrorResponse@, and records it in the query's
    -- state through `updateQueryStateIfFirstOrThrow`, returning that
    -- function's result.
    receiveDataRowOrCommandCompleteAtomically :: IO (Maybe ResponseMsg, ResponseMsgsReceived)
    receiveDataRowOrCommandCompleteAtomically =
      receiveNextMsgWithMaskedContinuation conn rowOrCompleteOrError $
        STM.atomically . updateQueryStateIfFirstOrThrow . \case
          Right3 dataRow -> RespDataRow dataRow
          Middle3 commandComplete -> RespCommandComplete commandComplete
          Left3 err -> RespErrorResponse err
      where
        -- Alternatives are tried in this order.
        rowOrCompleteOrError =
          Right3 <$> msgParser @DataRow
            <|> Middle3 <$> msgParser @CommandComplete
            <|> Left3 <$> msgParser @ErrorResponse
    -- Receives the next backend message, which must be a ReadyForQuery, and records
    -- it in the state of the query being consumed through
    -- `updateQueryStateIfFirstOrThrow`, inside a single STM transaction.
    receiveReadyForQueryAtomically :: IO (Maybe ResponseMsg, ResponseMsgsReceived)
    receiveReadyForQueryAtomically =
      receiveNextMsgWithMaskedContinuation conn (msgParser @ReadyForQuery) recordReadyForQuery
      where
        -- Continuation handed to `receiveNextMsgWithMaskedContinuation`.
        recordReadyForQuery :: ReadyForQuery -> IO (Maybe ResponseMsg, ResponseMsgsReceived)
        recordReadyForQuery =
          STM.atomically . updateQueryStateIfFirstOrThrow . RespReadyForQuery
    -- Looks up the state of the query identified by `qryId` in the current pipeline
    -- and returns it only when it is legitimately the next one to be consumed.
    -- Throws an irrecoverable error when, checked in this order:
    --   * the pipeline is empty;
    --   * every query in the pipeline has an identifier greater than `qryId`;
    --   * any query in the pipeline is owned by a thread other than `thisThreadId`;
    --   * a query preceding `qryId` is in error;
    --   * a query preceding `qryId` has not received its CommandComplete yet.
    getQueryStateIfFirstOrThrow :: STM QueryState
    getQueryStateIfFirstOrThrow = updateConnStateTxn conn $ \sttv -> do
      pipelineQueries <- currentPipeline <$> STM.readTVar sttv
      case () of
        _
          | null pipelineQueries ->
              throwIrrecoverableError $
                "QueryId "
                  <> Text.pack (show qryId)
                  <> " does not exist because the pipeline is empty. This is most likely a bug in Hpgsql, but just in case, are you trying to consume a pipeline that no longer exists?"
          | all isNewerThanTarget pipelineQueries ->
              throwIrrecoverableError $
                "Bug in Hpgsql: trying to receive outstanding messages for a pipeline that has already been fully consumed. Information about this pipeline no longer available in internal state.: "
                  <> Text.pack (show (qryId, pipelineQueries))
          | any ownedByAnotherThread pipelineQueries ->
              throwIrrecoverableError
                "Hpgsql does not support consuming different SQL statements' results of the same pipeline from different threads. Behaviour is undefined if you try that."
          -- `splitQueries` is only evaluated once all the checks above have passed.
          | otherwise -> checkEarlierQueries (splitQueries pipelineQueries)
      where
        isNewerThanTarget :: QueryState -> Bool
        isNewerThanTarget q = queryIdentifier q > qryId

        ownedByAnotherThread :: QueryState -> Bool
        ownedByAnotherThread q = queryOwner q /= thisThreadId

        -- All queries preceding the target must have completed without error.
        checkEarlierQueries :: ([QueryState], QueryState) -> STM QueryState
        checkEarlierQueries (earlierQueries, thisQuery)
          | any queryInError earlierQueries =
              throwIrrecoverableError "Another query in the same pipeline threw an error"
          | not (all queryComplete earlierQueries) =
              throwIrrecoverableError
                "Are you trying to consume a statement's results before consuming the results of previous statements of the same pipeline? Hpgsql does not support that. It is also possible a previous statement in the pipeline threw an irrecoverable error, and you still tried to consume another statement's results, which is also not supported."
          | otherwise = pure thisQuery

    -- Splits the pipeline into the leading queries whose identifier is smaller than
    -- `qryId` and the query whose identifier equals `qryId`.
    -- Calls `error` when the first query that is not smaller than `qryId` is
    -- missing or has a different identifier.
    splitQueries :: [QueryState] -> ([QueryState], QueryState)
    splitQueries qs =
      case fromTargetOnwards of
        [] -> error $ "Could not find query with right Id: " ++ show (qs, qryId)
        candidate : _
          | queryIdentifier candidate == qryId -> (earlier, candidate)
          | otherwise -> error $ "Could not find query right Id (part 2): " ++ show (qs, qryId)
      where
        (earlier, fromTargetOnwards) = List.span (\q -> queryIdentifier q < qryId) qs

    -- True only when the query's recorded state is `CommandCompleteReceived`.
    queryComplete :: QueryState -> Bool
    queryComplete qs = case qs.responseMsgsState of
      CommandCompleteReceived {} -> True
      _ -> False

    -- True when the query's recorded state carries an ErrorResponse: either
    -- `ErrorResponseReceived`, or `ReadyForQueryReceived` holding a `Left` error.
    queryInError :: QueryState -> Bool
    queryInError qs = case qs.responseMsgsState of
      ErrorResponseReceived {} -> True
      ReadyForQueryReceived (Left ErrorResponse {}) _ -> True
      _ -> False

    -- Advances the recorded state of the query identified by `qryId` according to
    -- the response message just received, and writes the updated pipeline (and the
    -- set of prepared statement names) back to the connection state.
    -- Returns the message wrapped in `Just` together with the new state.
    -- Throws an irrecoverable error when the message is not a valid successor of
    -- the query's current state, or when `getQueryStateIfFirstOrThrow` rejects the
    -- query.
    updateQueryStateIfFirstOrThrow :: ResponseMsg -> STM (Maybe ResponseMsg, ResponseMsgsReceived)
    updateQueryStateIfFirstOrThrow respMsg = updateConnStateTxn conn $ \sttv -> do
      -- IMPORTANT: No `STM.retry` here as this is called unmasked and must terminate promptly!
      st <- STM.readTVar sttv
      firstPendingQry <- getQueryStateIfFirstOrThrow
      let knownNames = st.preparedStatementNames
          -- Most transitions leave the set of prepared statement names untouched.
          keepingNames nextState = pure (nextState, knownNames)
      (newState, newPrepStmtNames) <- case (respMsg, firstPendingQry.responseMsgsState) of
        (RespParseComplete parsed, NoMsgsReceived) ->
          -- Add the parsed statement name to internal state if there is one
          pure
            ( ParseCompleteReceived parsed,
              maybe knownNames (\stmtName -> Set.insert stmtName knownNames) firstPendingQry.queryPrepStmtName
            )
        (RespBindComplete bound, ParseCompleteReceived _) ->
          keepingNames (BindCompleteReceived bound)
        (RespBindComplete bound, NoMsgsReceived) ->
          -- This transition should only happen for already-prepared statements.
          case firstPendingQry.queryPrepStmtName of
            Nothing ->
              throwIrrecoverableErrorWithStatement
                firstPendingQry.queryText
                "Bug in Hpgsql. Received a BindComplete without a ParseComplete but SQL statement wasn't prepared"
            Just _ -> keepingNames (BindCompleteReceived bound)
        (RespNoData noData, BindCompleteReceived _) ->
          keepingNames (RowDescriptionOrNoDataOrCopyInResponseReceived (Left3 noData))
        (RespRowDescription rowDesc, BindCompleteReceived _) ->
          keepingNames (RowDescriptionOrNoDataOrCopyInResponseReceived (Middle3 rowDesc))
        (RespCopyInResponse copyIn, BindCompleteReceived _) ->
          keepingNames (RowDescriptionOrNoDataOrCopyInResponseReceived (Right3 copyIn))
        (RespDataRow _, RowDescriptionOrNoDataOrCopyInResponseReceived description) ->
          -- When draining a query that was already fetching rows, this can happen
          keepingNames (RowDescriptionOrNoDataOrCopyInResponseReceived description)
        (RespErrorResponse failure, _) ->
          keepingNames (ErrorResponseReceived Nothing failure)
        (RespCommandComplete completion, RowDescriptionOrNoDataOrCopyInResponseReceived description) ->
          keepingNames (CommandCompleteReceived description completion)
        (RespReadyForQuery rq, ErrorResponseReceived _ failure) ->
          keepingNames (ReadyForQueryReceived (Left failure) rq)
        (RespReadyForQuery rq, CommandCompleteReceived _ completion) ->
          keepingNames (ReadyForQueryReceived (Right completion) rq)
        (_, before) ->
          throwIrrecoverableErrorWithStatement firstPendingQry.queryText $
            "Bug in Hpgsql. Response messages in invalid order. Had "
              <> Text.pack (show before)
              <> " and received "
              <> Text.pack (show respMsg)
      let -- Only the query being consumed gets the new state; all others are kept as-is.
          advanceIfTarget qs
            | queryIdentifier qs == qryId = qs {responseMsgsState = newState}
            | otherwise = qs
      -- We could set the pipeline to an empty list when receiving a ReadyForQuery,
      -- and that would be one fewer state to handle, but it disassociates a QueryId
      -- from ReadyForQuery and makes it impossible to resume interruption of `consumeResults`
      -- for a query that has finished executing, leading to bugs if query result
      -- consumption is interrupted in just the right place.
      STM.writeTVar sttv $
        st
          { currentPipeline = map advanceIfTarget (currentPipeline st),
            preparedStatementNames = newPrepStmtNames
          }
      pure (Just respMsg, newState)

-- | After sending one or more queries to the backend, run this function for each query to fetch that query's results.
-- You must call the returned IO function and consume the returned Stream completely until you get to the
-- `Either ErrorResponse CommandComplete` object.
-- For non-row returning statements like INSERT, DELETE, and UPDATE, the returned Stream will be empty but the execution status
-- will still be available in the Stream's result.
-- This function can also "consume results" of a `COPY FROM STDIN` statement, which essentially means it can receive
-- that statement's control messages starting from any possible state.
-- By following these rules you will always keep the internal connection's state healthy, even in the presence of concurrency
-- and asynchronous exceptions.
consumeResults ::
  HPgConnection ->
  QueryId ->
  IO (Maybe (Either3 NoData RowDescription CopyInResponse), Stream (Of DataRow) IO (Either ErrorResponse CommandComplete))
consumeResults :: HPgConnection
-> QueryId
-> IO
     (Maybe (Either3 NoData RowDescription CopyInResponse),
      Stream (Of DataRow) IO (Either ErrorResponse CommandComplete))
consumeResults HPgConnection
conn QueryId
qryId = do
  -- debugPrint "++++ Inside consumeResults"
  -- We assume it's possible to receive a DataRow here even in the first call because `consumeResults`
  -- can be used to drain results or orphaned queries/pipelines that had been partially consumed.
  -- We could have two different functions - one for draining and another for consuming results -, but
  -- that's more code paths to test, with draining very rarely exercised.
  thisThreadId <- IO WeakThreadId
getMyWeakThreadId
  let receiveUntilTimeToReceiveRows :: IO (Maybe (Either3 NoData RowDescription CopyInResponse), Either3 ErrorResponse (Maybe DataRow) CommandComplete)
      receiveUntilTimeToReceiveRows = do
        nextMsg <- WeakThreadId
-> HPgConnection
-> QueryId
-> IO (Maybe ResponseMsg, ResponseMsgsReceived)
receiveOutstandingResponseMsgsAtomically WeakThreadId
thisThreadId HPgConnection
conn QueryId
qryId
        case nextMsg of
          (Just (RespDataRow DataRow
dr), RowDescriptionOrNoDataOrCopyInResponseReceived Either3 NoData RowDescription CopyInResponse
noDataOrRowDesc) -> (Maybe (Either3 NoData RowDescription CopyInResponse),
 Either3 ErrorResponse (Maybe DataRow) CommandComplete)
-> IO
     (Maybe (Either3 NoData RowDescription CopyInResponse),
      Either3 ErrorResponse (Maybe DataRow) CommandComplete)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Either3 NoData RowDescription CopyInResponse
-> Maybe (Either3 NoData RowDescription CopyInResponse)
forall a. a -> Maybe a
Just Either3 NoData RowDescription CopyInResponse
noDataOrRowDesc, Maybe DataRow
-> Either3 ErrorResponse (Maybe DataRow) CommandComplete
forall a b c. b -> Either3 a b c
Middle3 (Maybe DataRow
 -> Either3 ErrorResponse (Maybe DataRow) CommandComplete)
-> Maybe DataRow
-> Either3 ErrorResponse (Maybe DataRow) CommandComplete
forall a b. (a -> b) -> a -> b
$ DataRow -> Maybe DataRow
forall a. a -> Maybe a
Just DataRow
dr)
          (Just (RespDataRow DataRow
_), ResponseMsgsReceived
_) -> Text
-> IO
     (Maybe (Either3 NoData RowDescription CopyInResponse),
      Either3 ErrorResponse (Maybe DataRow) CommandComplete)
forall (m :: * -> *) a. MonadThrow m => Text -> m a
throwIrrecoverableError Text
"Impossible: Got DataRow but did not have RowDescOrNoData before?"
          (Maybe ResponseMsg
_, ErrorResponseReceived Maybe (Either3 NoData RowDescription CopyInResponse)
mNoDataOrRowDesc ErrorResponse
err) -> (Maybe (Either3 NoData RowDescription CopyInResponse),
 Either3 ErrorResponse (Maybe DataRow) CommandComplete)
-> IO
     (Maybe (Either3 NoData RowDescription CopyInResponse),
      Either3 ErrorResponse (Maybe DataRow) CommandComplete)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Maybe (Either3 NoData RowDescription CopyInResponse)
mNoDataOrRowDesc, ErrorResponse
-> Either3 ErrorResponse (Maybe DataRow) CommandComplete
forall a b c. a -> Either3 a b c
Left3 ErrorResponse
err)
          (Maybe ResponseMsg
_, CommandCompleteReceived Either3 NoData RowDescription CopyInResponse
noDataOrRowDesc CommandComplete
cmd) -> (Maybe (Either3 NoData RowDescription CopyInResponse),
 Either3 ErrorResponse (Maybe DataRow) CommandComplete)
-> IO
     (Maybe (Either3 NoData RowDescription CopyInResponse),
      Either3 ErrorResponse (Maybe DataRow) CommandComplete)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Either3 NoData RowDescription CopyInResponse
-> Maybe (Either3 NoData RowDescription CopyInResponse)
forall a. a -> Maybe a
Just Either3 NoData RowDescription CopyInResponse
noDataOrRowDesc, CommandComplete
-> Either3 ErrorResponse (Maybe DataRow) CommandComplete
forall a b c. c -> Either3 a b c
Right3 CommandComplete
cmd)
          (Maybe ResponseMsg
_, RowDescriptionOrNoDataOrCopyInResponseReceived Either3 NoData RowDescription CopyInResponse
noDataOrRowDesc) -> (Maybe (Either3 NoData RowDescription CopyInResponse),
 Either3 ErrorResponse (Maybe DataRow) CommandComplete)
-> IO
     (Maybe (Either3 NoData RowDescription CopyInResponse),
      Either3 ErrorResponse (Maybe DataRow) CommandComplete)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Either3 NoData RowDescription CopyInResponse
-> Maybe (Either3 NoData RowDescription CopyInResponse)
forall a. a -> Maybe a
Just Either3 NoData RowDescription CopyInResponse
noDataOrRowDesc, Maybe DataRow
-> Either3 ErrorResponse (Maybe DataRow) CommandComplete
forall a b c. b -> Either3 a b c
Middle3 Maybe DataRow
forall a. Maybe a
Nothing)
          (Maybe ResponseMsg
_, ParseCompleteReceived ParseComplete
_) -> IO
  (Maybe (Either3 NoData RowDescription CopyInResponse),
   Either3 ErrorResponse (Maybe DataRow) CommandComplete)
receiveUntilTimeToReceiveRows
          (Maybe ResponseMsg
_, BindCompleteReceived BindComplete
_) -> IO
  (Maybe (Either3 NoData RowDescription CopyInResponse),
   Either3 ErrorResponse (Maybe DataRow) CommandComplete)
receiveUntilTimeToReceiveRows
          (Maybe ResponseMsg
_, ReadyForQueryReceived Either ErrorResponse CommandComplete
errOrCmd ReadyForQuery
_) -> (Maybe (Either3 NoData RowDescription CopyInResponse),
 Either3 ErrorResponse (Maybe DataRow) CommandComplete)
-> IO
     (Maybe (Either3 NoData RowDescription CopyInResponse),
      Either3 ErrorResponse (Maybe DataRow) CommandComplete)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Maybe (Either3 NoData RowDescription CopyInResponse)
forall a. Maybe a
Nothing, (ErrorResponse
 -> Either3 ErrorResponse (Maybe DataRow) CommandComplete)
-> (CommandComplete
    -> Either3 ErrorResponse (Maybe DataRow) CommandComplete)
-> Either ErrorResponse CommandComplete
-> Either3 ErrorResponse (Maybe DataRow) CommandComplete
forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either ErrorResponse
-> Either3 ErrorResponse (Maybe DataRow) CommandComplete
forall a b c. a -> Either3 a b c
Left3 CommandComplete
-> Either3 ErrorResponse (Maybe DataRow) CommandComplete
forall a b c. c -> Either3 a b c
Right3 Either ErrorResponse CommandComplete
errOrCmd)
          (Maybe ResponseMsg
_, ResponseMsgsReceived
NoMsgsReceived) -> IO
  (Maybe (Either3 NoData RowDescription CopyInResponse),
   Either3 ErrorResponse (Maybe DataRow) CommandComplete)
receiveUntilTimeToReceiveRows -- TODO: This should be unreachable
  firstMsg <- receiveUntilTimeToReceiveRows
  case firstMsg of
    (Maybe (Either3 NoData RowDescription CopyInResponse)
mERowDesc, Left3 ErrorResponse
err) -> do
      WeakThreadId -> IO ()
receiveReadyForQueryIfNecessary WeakThreadId
thisThreadId
      (Maybe (Either3 NoData RowDescription CopyInResponse),
 Stream (Of DataRow) IO (Either ErrorResponse CommandComplete))
-> IO
     (Maybe (Either3 NoData RowDescription CopyInResponse),
      Stream (Of DataRow) IO (Either ErrorResponse CommandComplete))
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Maybe (Either3 NoData RowDescription CopyInResponse)
mERowDesc, Either ErrorResponse CommandComplete
-> Stream (Of DataRow) IO (Either ErrorResponse CommandComplete)
forall a. a -> Stream (Of DataRow) IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Either ErrorResponse CommandComplete
 -> Stream (Of DataRow) IO (Either ErrorResponse CommandComplete))
-> Either ErrorResponse CommandComplete
-> Stream (Of DataRow) IO (Either ErrorResponse CommandComplete)
forall a b. (a -> b) -> a -> b
$ ErrorResponse -> Either ErrorResponse CommandComplete
forall a b. a -> Either a b
Left ErrorResponse
err)
    (Maybe (Either3 NoData RowDescription CopyInResponse)
mERowDesc, Right3 CommandComplete
cmd) -> do
      WeakThreadId -> IO ()
receiveReadyForQueryIfNecessary WeakThreadId
thisThreadId
      (Maybe (Either3 NoData RowDescription CopyInResponse),
 Stream (Of DataRow) IO (Either ErrorResponse CommandComplete))
-> IO
     (Maybe (Either3 NoData RowDescription CopyInResponse),
      Stream (Of DataRow) IO (Either ErrorResponse CommandComplete))
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Maybe (Either3 NoData RowDescription CopyInResponse)
mERowDesc, Either ErrorResponse CommandComplete
-> Stream (Of DataRow) IO (Either ErrorResponse CommandComplete)
forall a. a -> Stream (Of DataRow) IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Either ErrorResponse CommandComplete
 -> Stream (Of DataRow) IO (Either ErrorResponse CommandComplete))
-> Either ErrorResponse CommandComplete
-> Stream (Of DataRow) IO (Either ErrorResponse CommandComplete)
forall a b. (a -> b) -> a -> b
$ CommandComplete -> Either ErrorResponse CommandComplete
forall a b. b -> Either a b
Right CommandComplete
cmd)
    (Maybe (Either3 NoData RowDescription CopyInResponse)
mERowDesc, Middle3 Maybe DataRow
mDataRow) -> do
      let allOtherRows :: Stream (Of DataRow) IO (Either ErrorResponse CommandComplete)
allOtherRows =
            (()
 -> IO
      (Either (Either ErrorResponse CommandComplete) (Of DataRow ())))
-> ()
-> Stream (Of DataRow) IO (Either ErrorResponse CommandComplete)
forall (m :: * -> *) (f :: * -> *) s r.
(Monad m, Functor f) =>
(s -> m (Either r (f s))) -> s -> Stream f m r
S.unfold
              ( \() -> do
                  -- This is a bit ugly, but we try very carefully not to bear the cost of STM
                  -- transactions at all when receiving DataRows, since they can come in very large
                  -- amounts, but we need to make sure the _other_ important messages, like ErrorResponse
                  -- and CommandComplete, do update internal state.
                  mRow <- HPgConnection
-> PgMsgParser DataRow
-> (Either (Char, Maybe PostgresError) DataRow
    -> IO (Either (Char, Maybe PostgresError) DataRow))
-> IO (Either (Char, Maybe PostgresError) DataRow)
forall a b.
Show a =>
HPgConnection
-> PgMsgParser a
-> (Either (Char, Maybe PostgresError) a -> IO b)
-> IO b
receiveNextMsgWithMaskedContinuationButDontThrowOnParsingFailure HPgConnection
conn (forall a. FromPgMessage a => PgMsgParser a
msgParser @DataRow) Either (Char, Maybe PostgresError) DataRow
-> IO (Either (Char, Maybe PostgresError) DataRow)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure
                  case mRow of
                    Right DataRow
row -> Either (Either ErrorResponse CommandComplete) (Of DataRow ())
-> IO
     (Either (Either ErrorResponse CommandComplete) (Of DataRow ()))
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Either (Either ErrorResponse CommandComplete) (Of DataRow ())
 -> IO
      (Either (Either ErrorResponse CommandComplete) (Of DataRow ())))
-> Either (Either ErrorResponse CommandComplete) (Of DataRow ())
-> IO
     (Either (Either ErrorResponse CommandComplete) (Of DataRow ()))
forall a b. (a -> b) -> a -> b
$ Of DataRow ()
-> Either (Either ErrorResponse CommandComplete) (Of DataRow ())
forall a b. b -> Either a b
Right (DataRow
row DataRow -> () -> Of DataRow ()
forall a b. a -> b -> Of a b
:> ())
                    Left (Char, Maybe PostgresError)
_ -> do
                      stateAfterNextMsg <- (Maybe ResponseMsg, ResponseMsgsReceived) -> ResponseMsgsReceived
forall a b. (a, b) -> b
snd ((Maybe ResponseMsg, ResponseMsgsReceived) -> ResponseMsgsReceived)
-> IO (Maybe ResponseMsg, ResponseMsgsReceived)
-> IO ResponseMsgsReceived
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> WeakThreadId
-> HPgConnection
-> QueryId
-> IO (Maybe ResponseMsg, ResponseMsgsReceived)
receiveOutstandingResponseMsgsAtomically WeakThreadId
thisThreadId HPgConnection
conn QueryId
qryId
                      case stateAfterNextMsg of
                        ErrorResponseReceived Maybe (Either3 NoData RowDescription CopyInResponse)
_ ErrorResponse
err -> do
                          WeakThreadId -> IO ()
receiveReadyForQueryIfNecessary WeakThreadId
thisThreadId
                          Either (Either ErrorResponse CommandComplete) (Of DataRow ())
-> IO
     (Either (Either ErrorResponse CommandComplete) (Of DataRow ()))
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Either (Either ErrorResponse CommandComplete) (Of DataRow ())
 -> IO
      (Either (Either ErrorResponse CommandComplete) (Of DataRow ())))
-> Either (Either ErrorResponse CommandComplete) (Of DataRow ())
-> IO
     (Either (Either ErrorResponse CommandComplete) (Of DataRow ()))
forall a b. (a -> b) -> a -> b
$ Either ErrorResponse CommandComplete
-> Either (Either ErrorResponse CommandComplete) (Of DataRow ())
forall a b. a -> Either a b
Left (Either ErrorResponse CommandComplete
 -> Either (Either ErrorResponse CommandComplete) (Of DataRow ()))
-> Either ErrorResponse CommandComplete
-> Either (Either ErrorResponse CommandComplete) (Of DataRow ())
forall a b. (a -> b) -> a -> b
$ ErrorResponse -> Either ErrorResponse CommandComplete
forall a b. a -> Either a b
Left ErrorResponse
err
                        CommandCompleteReceived Either3 NoData RowDescription CopyInResponse
_ CommandComplete
cmd -> do
                          WeakThreadId -> IO ()
receiveReadyForQueryIfNecessary WeakThreadId
thisThreadId
                          Either (Either ErrorResponse CommandComplete) (Of DataRow ())
-> IO
     (Either (Either ErrorResponse CommandComplete) (Of DataRow ()))
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Either (Either ErrorResponse CommandComplete) (Of DataRow ())
 -> IO
      (Either (Either ErrorResponse CommandComplete) (Of DataRow ())))
-> Either (Either ErrorResponse CommandComplete) (Of DataRow ())
-> IO
     (Either (Either ErrorResponse CommandComplete) (Of DataRow ()))
forall a b. (a -> b) -> a -> b
$ Either ErrorResponse CommandComplete
-> Either (Either ErrorResponse CommandComplete) (Of DataRow ())
forall a b. a -> Either a b
Left (Either ErrorResponse CommandComplete
 -> Either (Either ErrorResponse CommandComplete) (Of DataRow ()))
-> Either ErrorResponse CommandComplete
-> Either (Either ErrorResponse CommandComplete) (Of DataRow ())
forall a b. (a -> b) -> a -> b
$ CommandComplete -> Either ErrorResponse CommandComplete
forall a b. b -> Either a b
Right CommandComplete
cmd
                        ReadyForQueryReceived Either ErrorResponse CommandComplete
errOrCmd ReadyForQuery
_ -> Either (Either ErrorResponse CommandComplete) (Of DataRow ())
-> IO
     (Either (Either ErrorResponse CommandComplete) (Of DataRow ()))
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Either (Either ErrorResponse CommandComplete) (Of DataRow ())
 -> IO
      (Either (Either ErrorResponse CommandComplete) (Of DataRow ())))
-> Either (Either ErrorResponse CommandComplete) (Of DataRow ())
-> IO
     (Either (Either ErrorResponse CommandComplete) (Of DataRow ()))
forall a b. (a -> b) -> a -> b
$ Either ErrorResponse CommandComplete
-> Either (Either ErrorResponse CommandComplete) (Of DataRow ())
forall a b. a -> Either a b
Left Either ErrorResponse CommandComplete
errOrCmd
                        ResponseMsgsReceived
_ -> Text
-> IO
     (Either (Either ErrorResponse CommandComplete) (Of DataRow ()))
forall (m :: * -> *) a. MonadThrow m => Text -> m a
throwIrrecoverableError Text
"Internal error in Hpgsql. After a DataRow we should get either an ErrorResponse or a CommandComplete message"
              )
              ()
          finalStream :: Stream (Of DataRow) IO (Either ErrorResponse CommandComplete)
finalStream = case Maybe DataRow
mDataRow of
            Maybe DataRow
Nothing -> Stream (Of DataRow) IO (Either ErrorResponse CommandComplete)
allOtherRows
            Just DataRow
dr ->
              Of
  DataRow
  (Stream (Of DataRow) IO (Either ErrorResponse CommandComplete))
-> Stream (Of DataRow) IO (Either ErrorResponse CommandComplete)
forall (f :: * -> *) (m :: * -> *) r.
f (Stream f m r) -> Stream f m r
SInternal.Step (Of
   DataRow
   (Stream (Of DataRow) IO (Either ErrorResponse CommandComplete))
 -> Stream (Of DataRow) IO (Either ErrorResponse CommandComplete))
-> Of
     DataRow
     (Stream (Of DataRow) IO (Either ErrorResponse CommandComplete))
-> Stream (Of DataRow) IO (Either ErrorResponse CommandComplete)
forall a b. (a -> b) -> a -> b
$
                DataRow
dr DataRow
-> Stream (Of DataRow) IO (Either ErrorResponse CommandComplete)
-> Of
     DataRow
     (Stream (Of DataRow) IO (Either ErrorResponse CommandComplete))
forall a b. a -> b -> Of a b
:> Stream (Of DataRow) IO (Either ErrorResponse CommandComplete)
allOtherRows
      (Maybe (Either3 NoData RowDescription CopyInResponse),
 Stream (Of DataRow) IO (Either ErrorResponse CommandComplete))
-> IO
     (Maybe (Either3 NoData RowDescription CopyInResponse),
      Stream (Of DataRow) IO (Either ErrorResponse CommandComplete))
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Maybe (Either3 NoData RowDescription CopyInResponse)
mERowDesc, Stream (Of DataRow) IO (Either ErrorResponse CommandComplete)
finalStream)
  where
    receiveReadyForQueryIfNecessary :: WeakThreadId -> IO ()
    receiveReadyForQueryIfNecessary :: WeakThreadId -> IO ()
receiveReadyForQueryIfNecessary WeakThreadId
thisThreadId = IO (Maybe ResponseMsg, ResponseMsgsReceived) -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO (Maybe ResponseMsg, ResponseMsgsReceived) -> IO ())
-> IO (Maybe ResponseMsg, ResponseMsgsReceived) -> IO ()
forall a b. (a -> b) -> a -> b
$ WeakThreadId
-> HPgConnection
-> QueryId
-> IO (Maybe ResponseMsg, ResponseMsgsReceived)
receiveOutstandingResponseMsgsAtomically WeakThreadId
thisThreadId HPgConnection
conn QueryId
qryId

-- | Builds a 'PostgresError' out of the text of the statement that failed and
-- the 'ErrorResponse' received for it. The error detail map carried by the
-- 'ErrorResponse' is stored as-is in 'pgErrorDetails'.
mkPostgresError :: ByteString -> ErrorResponse -> PostgresError
mkPostgresError stmtText (ErrorResponse errDetailMap) =
  PostgresError
    { pgErrorDetails = errDetailMap,
      failedStatement = stmtText
    }

-- | Throws the 'PostgresError' that 'mkPostgresError' builds from the given
-- statement text and 'ErrorResponse'. Never returns normally.
throwPostgresError :: ByteString -> ErrorResponse -> IO a
throwPostgresError stmtText errResp = throw $ mkPostgresError stmtText errResp

-- | Throws an 'IrrecoverableHpgsqlError' whose details are the given message,
-- with no inner exception and with the given statement text recorded as the
-- related statement.
throwIrrecoverableErrorWithStatement :: (MonadThrow m) => ByteString -> Text -> m a
throwIrrecoverableErrorWithStatement stmtText errMsg =
  throw $
    IrrecoverableHpgsqlError
      { hpgsqlDetails = errMsg,
        innerException = Nothing,
        relatedStatement = Just stmtText
      }

-- | Looks up the text of the query identified by the given 'QueryId' in the
-- connection's current pipeline.
--
-- Returns an empty 'ByteString' when no query in the current pipeline has
-- that id.
lookupQueryText :: HPgConnection -> QueryId -> IO ByteString
lookupQueryText conn qryId = STM.atomically $ do
  st <- STM.readTVar (internalConnectionState conn)
  pure $ maybe "" queryText $ List.find ((== qryId) . queryIdentifier) (currentPipeline st)

-- | Executes a SQL statement (that may or may not be row-returning) and
-- returns the count of affected rows of the given query.
--
-- This runs a single-query pipeline built with 'pipelineExec' and then runs
-- the action that pipeline yields.
execute :: HPgConnection -> Query -> IO Int64
execute conn qry = join $ runPipeline conn (pipelineExec qry)

-- | Executes a SQL statement that may or may not be row-returning.
--
-- Same as 'execute', but discards the returned count.
execute_ :: HPgConnection -> Query -> IO ()
execute_ conn qry = void $ execute conn qry

-- | Consumes every result of the given query, discarding any rows, and
-- returns the count carried by the final 'CommandComplete'.
--
-- Throws a 'PostgresError' (via 'throwPostgresError') if the query ended with
-- an 'ErrorResponse' instead.
consumeResultsIgnoreRows :: HPgConnection -> QueryId -> IO Int64
consumeResultsIgnoreRows conn qryId = do
  -- The statement text is fetched before draining the results.
  -- NOTE(review): presumably the query may no longer be in the pipeline once
  -- its results have been consumed -- confirm against 'consumeResults'.
  qText <- lookupQueryText conn qryId
  (_mRowDesc, resultsStream) <- consumeResults conn qryId
  -- Run the stream only for its effects; rows are dropped.
  results <- S.effects resultsStream
  case results of
    Left err -> throwPostgresError qText err
    Right (CommandComplete n) -> pure n

-- | Runs a query and streams results directly from the connection's socket, i.e. without using cursors.
-- Rows are decoded with the 'FromPgRow' instance's 'rowDecoder'.
--
-- Note on thread safety: the same thread that runs this must be the thread
-- that consumes the returned Stream, and the returned Stream must be
-- consumed completely (up to the last row or a postgres error) before you are able
-- to run other queries.
queryS :: (FromPgRow a) => HPgConnection -> Query -> IO (Stream (Of a) IO ())
queryS = querySWith rowDecoder

-- | Runs a query and streams results directly from the connection's socket, i.e. without using cursors.
-- Rows are decoded with the supplied 'RowDecoder'.
--
-- Note on thread safety: the same thread that runs this must be the thread
-- that consumes the returned Stream, and the returned Stream must be
-- consumed completely (up to the last row or a postgres error) before you are able
-- to run other queries.
querySWith :: RowDecoder a -> HPgConnection -> Query -> IO (Stream (Of a) IO ())
querySWith rparser conn qry = join $ runPipeline conn $ pipelineSWith rparser qry

-- | Runs a query and streams results directly from the connection's socket, i.e. without using cursors.
-- Rows are decoded with the supplied 'RowDecoderMonadic'.
--
-- Prefer to use 'queryS' and 'querySWith', because 'RowDecoder' can typecheck PostgreSQL results
-- even when no rows are returned by queries, and 'RowDecoderMonadic' cannot.
--
-- Note on thread safety: the same thread that runs this must be the thread
-- that consumes the returned Stream, and the returned Stream must be
-- consumed completely (up to the last row or a postgres error) before you are able
-- to run other queries.
querySMWith :: RowDecoderMonadic a -> HPgConnection -> Query -> IO (Stream (Of a) IO ())
querySMWith rparser conn qry = join $ runPipeline conn $ pipelineSMWith rparser qry

-- | Sends any number of queries to the backend atomically, or throws an irrecoverable exception
-- if it can't do that. Then runs the continuation with the ids assigned to the queries that
-- were sent.
--
-- Arguments, in order: the connection; the queries being sent (statement text, protocol and
-- optional prepared statement name); a function producing the messages to send from the
-- connection state and encoding context; an STM transaction that runs as part of the same
-- transaction that records the new pipeline; and the continuation.
--
-- DO NOT CALL THIS FUNCTION WHILE HOLDING THE CONTROL-MSGS-LOCK, because it needs to wait/block
-- until the pipeline is ready, which won't happen if we're holding the control-msgs-lock.
sendPipeline :: HPgConnection -> NonEmpty (ByteString, QueryProtocol, Maybe String) -> (InternalConnectionState -> EncodingContext -> [SomeMessage]) -> STM () -> (NonEmpty QueryId -> IO a) -> IO a
sendPipeline conn queriesBeingSent allMsgs onMsgsSentTxn continuation = do
  thisWeakThreadId <- getMyWeakThreadId
  qryIds <- waitUntilPipelineIsReadyForNewQuery conn (getUniqueQueryStatesForNewPipeline queriesBeingSent) $ \(nextId, lastId) -> do
    -- If this thread is interrupted now, it is ok: only `totalQueriesSent` was bumped, but `currentPipeline`
    -- is still empty (it will be modified once we send all control messages to postgres).
    -- This is Note [Only modify totalQueriesSent]
    let newPipelineList =
          zipWith
            ( \queryIdentifier (queryText, queryProtocol, queryPrepStmtName) ->
                QueryState
                  { queryIdentifier,
                    queryText,
                    queryOwner = thisWeakThreadId,
                    queryProtocol,
                    queryPrepStmtName,
                    responseMsgsState = NoMsgsReceived
                  }
            )
            [nextId .. lastId]
            (NE.toList queriesBeingSent)
    case NE.nonEmpty newPipelineList of
      Nothing -> throwIrrecoverableError "Bug in Hpgsql: empty newPipeline to be sent"
      Just newPipeline -> do
        -- The encodingContext could live within internalConnectionState or be a TVar. It gets read
        -- from and updated very infrequently, so no big reason for it to be its own MVar.
        -- Although, in the end, it probably doesn't matter much
        sttv <- STM.atomically $ STM.readTVar conn.internalConnectionState
        encCtx <- readMVar conn.encodingContext
        atomicallySendControlMsgs_
          conn
          ( allMsgs sttv encCtx,
            do
              -- Record the new pipeline together with the transaction status observed
              -- right before it, then run the caller's transaction.
              st <- STM.readTVar (internalConnectionState conn)
              (_, txnStatusRightBeforeSendingPipeline) <- fullTransactionStatus (internalConnectionState conn)
              STM.writeTVar (internalConnectionState conn) $ st {currentPipeline = NE.toList newPipeline, transactionStatusBeforeCurrentPipeline = txnStatusRightBeforeSendingPipeline}
              onMsgsSentTxn -- Caller-supplied
          )
        debugPrint $ "+++ Sent QueryIds " ++ show [nextId .. lastId]
        pure $ fmap queryIdentifier newPipeline
  continuation qryIds
  where
    -- Reserves one 'QueryId' per query being sent by bumping `totalQueriesSent`,
    -- and returns the first and last reserved ids (inclusive).
    getUniqueQueryStatesForNewPipeline :: NonEmpty (ByteString, QueryProtocol, Maybe String) -> STM (QueryId, QueryId)
    getUniqueQueryStatesForNewPipeline (fmap (\(_, proto, _) -> proto) -> qryprotos) = do
      let sttv = internalConnectionState conn
      st <- STM.readTVar sttv
      when (isLeft $ connectionReadyForNewPipeline st) $ throwIrrecoverableError "Bug in Hpgsql: the connection should be ready for a new pipeline due to loopUntilNoPipeline"
      -- Reserve N ids
      let nextId = QueryId $ totalQueriesSent st
          lastId = nextId + fromIntegral (length qryprotos) - 1
      -- We only modify `totalQueriesSent` in our internal state in this STM transaction.
      -- Check why in Note [Only modify totalQueriesSent]
      STM.writeTVar sttv (st {totalQueriesSent = totalQueriesSent st + fromIntegral (length qryprotos)})
      pure (nextId, lastId)

-- | Checks there is no active pipeline and runs the supplied function with the control-msg lock when there
-- isn't one. If there is an active pipeline, waits until it's done executing (and cancels-drains it if it
-- has been orphaned) until it can run the supplied function.
-- The supplied STM transaction runs while the control-msg lock is held, and its result is what gets
-- passed to the supplied function.
-- DO NOT CALL THIS FUNCTION WHILE HOLDING THE CONTROL-MSGS-LOCK, because it needs to wait/block
-- until the pipeline is ready, which won't happen if we're holding the control-msgs-lock.
waitUntilPipelineIsReadyForNewQuery :: forall a b. HPgConnection -> STM a -> (a -> IO b) -> IO b
waitUntilPipelineIsReadyForNewQuery conn lockAcquireStm f = do
  thisWeakThreadId <- getMyWeakThreadId
  queriesToDrain <- acquireOwnershipOfOrphanedQueries conn
  withControlMsgsLock
    conn
    ( \sttv -> do
        st <- STM.readTVar sttv
        (txnStatusBeforePipeline, _) <- fullTransactionStatus sttv
        pure
          ( txnStatusBeforePipeline,
            st.mustIssueRollbackBeforeNextCommand,
            -- Only a pipeline consisting of a single COPY query counts as a COPY command here.
            case st.currentPipeline of
              [QueryState {queryProtocol = CopyQuery _, queryText}] -> Just queryText
              _ -> Nothing
          )
    )
    (const $ pure ())
    $ \(txnStatusBeforePipeline, mustIssueRollback, isCopyCommand) -> do
      whenJust isCopyCommand $ \copyCmd ->
        when (not (null queriesToDrain) && txnStatusBeforePipeline == TransInTrans) $
          throwIrrecoverableErrorWithStatement copyCmd "Hpgsql cannot resume execution from an interrupted COPY statement inside a transaction, because cancelling COPY would leave the transaction in an error state and completing it could partially complete it. There is no semantics preserving action possible."
      -- We don't want to send cancellation requests if we're in an explicit transaction
      -- because that would put the transaction in TransInError, which is not semantics-preserving
      -- (so in that case we let statements complete and just drain them).
      -- Unless of course the transaction was interrupted by an asynchronous exception, in
      -- which case it's fine to cancel because we're about to ROLLBACK anyway.
      let onlyDrainNotCancel = txnStatusBeforePipeline == TransInTrans && not mustIssueRollback
      cancelActiveStatement conn onlyDrainNotCancel
      -- After draining, we ROLLBACK if we must. A failed command still leaves
      -- the need for "ROLLBACK", after all.
      when (mustIssueRollback && not (null queriesToDrain)) $ do
        debugPrint "Executing ROLLBACK now that the pipeline is clear."
        -- The command below would go into an infinite loop if not for the
        -- `not (null queriesToDrain)` check a few lines up, but that's a big hack:
        -- We couple updating internal connection state to sending the "ROLLBACK"
        -- to the backend, but if "ROLLBACK" is interrupted here, it's still not
        -- super clear what might happen. Some thoughts:
        -- - If interruption happens before ROLLBACK is sent, the state will still have
        --   mustIssueRollbackBeforeNextCommand=True. But without queries to drain,
        --   this code will never run => Problem! TODO how do we fix this?
        -- - If ROLLBACK gets sent, internal state will be updated with
        --   mustIssueRollbackBeforeNextCommand=False. The next query will try to
        --   drain it.
        --    - The next query won't try to cancel it because of txnStatusBeforePipeline,
        --      being TransInTrans, so it will only drain the ROLLBACK, which is great.
        join $ runPipelineInternal conn (pipelineExec_ "ROLLBACK") $ do
          st <- STM.readTVar conn.internalConnectionState
          STM.writeTVar conn.internalConnectionState st {mustIssueRollbackBeforeNextCommand = False}

  -- Different threads might be racing to send their pipelines,
  -- so we choose the winner with a mutex
  retOrRepeat <- withControlMsgsLock
    conn
    ( \sttv -> do
        st <- STM.readTVar sttv
        case connectionReadyForNewPipeline st of
          Left p -> pure $ Left p
          Right _ -> Right <$> lockAcquireStm
    )
    (const $ pure ())
    $ \case
      Left (QueryState {queryOwner} :| _) -> pure $ Left queryOwner
      Right acq -> do
        debugPrint "+++ No active pipeline found"
        Right <$> f acq
  case retOrRepeat of
    Left existingPipelineOwnerThread -> do
      -- If there is a pipeline, we must wait while _not_ holding
      -- the control-msgs lock so the other pipeline can reach
      -- completion. Also, we resume immediately if the pipeline
      -- state changes, as it's important to resume quickly to avoid introducing
      -- N * intervalMs delays for a concurrent workload with N
      -- threads blocked waiting on each other.
      let intervalMs = killedThreadPollIntervalMs $ connOpts conn
      debugPrint $ "There is a pipeline owned by a different thread (" ++ show existingPipelineOwnerThread ++ ") so we (" ++ show thisWeakThreadId ++ ") will try again in " ++ show intervalMs ++ "ms. Pipeline contains: "
      -- `timeout` takes microseconds, hence the factor of 1000 over the millisecond interval.
      void $ timeout (1000 * intervalMs) $ STM.atomically $ do
        st <- STM.readTVar (internalConnectionState conn)
        when (isLeft $ connectionReadyForNewPipeline st) STM.retry
      -- Start over from the top: orphaned queries are re-checked on every attempt.
      waitUntilPipelineIsReadyForNewQuery conn lockAcquireStm f
    Right ret -> pure ret

-- | Cancels any running statements in the current connection, including COPY, or returns if there
-- is no active query to cancel.
-- Make sure you do not try to consume results of queries you have already sent if you run this, or
-- behaviour is undefined. That means if you had a Stream result and you run this function, you should
-- not further inspect the Stream, and if you had sent a pipeline with multiple queries and you run this
-- function, you should not try to consume the results of any query in that pipeline.
-- Also, PostgreSQL's protocol specifies cancellation requests require opening a new connection to the
-- server, which means parallelism can introduce non-determinism, as such:
--
--     forkIO $ query conn "SELECT ..."
--     sendCancellationRequest conn
--
-- In that example the cancellation request _can_ arrive before the query even arrives, so it won't be
-- cancelled, and this _can_ happen even if all the messages of the "SELECT ..." query are sent first.
-- The cancellation request can also arrive after the active query finishes.
--
-- Modulo the race condition mentioned above, the database connection should be in a healthy
-- and usable state after this function returns, although keep in mind that cancelling a
-- query inside a transaction is equivalent to that query throwing an error, so this
-- can put transactions in an error state.
cancelActiveStatement ::
  HPgConnection ->
  -- | If True, this won't send cancellation requests to postgres and will just drain orphaned/interrupted queries until they complete, if any.
  Bool ->
  IO ()
cancelActiveStatement conn@HPgConnection {connOpts} onlyDrainNotCancel = do
  -- Take ownership of orphaned queries, if any. With nothing to drain this
  -- function returns without touching the connection any further.
  queriesToDrain <- acquireOwnershipOfOrphanedQueries conn
  -- Hold the control-msg lock while draining to avoid a race condition where
  -- soon after draining the last query a different thread runs a new query
  -- before we are finished here.
  unless (null queriesToDrain) $ do
    debugPrint $ "Going to take control-msg lock to drain " ++ show queriesToDrain
    withControlMsgsLock conn (const $ pure ()) (const $ pure ()) $ \() -> do
      debugPrint "Cancelling active pipeline to drain."
      sendCancelUnlessDrainOnly
      alternateDrainingWithCancelReqs queriesToDrain
  where
    -- Sends a cancellation request, unless the caller asked to only drain.
    sendCancelUnlessDrainOnly :: IO ()
    sendCancelUnlessDrainOnly = unless onlyDrainNotCancel $ sendCancellationRequest conn

    -- Consumes and discards the results of the given queries in order,
    -- stopping at the first query that ends with an error response.
    drainUntilError :: [QueryId] -> IO ()
    drainUntilError [] = pure ()
    drainUntilError (q : rest) = do
      (_, res) <- consumeResults conn q
      S.effects res >>= \case
        Right _cmdComplete -> drainUntilError rest
        -- If we get an error, we cannot continue to consume the results
        -- of other queries as the whole pipeline is trashed
        Left _err -> pure ()

    -- It is possible not just in theory for the cancellation request to
    -- arrive/be processed by postgres _before_ the current pipeline has
    -- reached postgres, since cancellation requests go through a different
    -- connection, so there's no guarantee of ordered delivery.
    -- Even if the cancellation request arrives later at the server,
    -- the kernel can still deliver them in different order, and postgres
    -- itself can process them in different order, at least due to the
    -- kernel's scheduler not giving guarantees.
    -- We've seen it happen in our tests (i.e. "Exercise interruption safety"),
    -- so this is not merely hypothetical.
    -- What we do here is fire a cancellation request every
    -- 'cancellationRequestResendIntervalMs' to cover for that.
    alternateDrainingWithCancelReqs :: [QueryId] -> IO ()
    alternateDrainingWithCancelReqs qs = do
      -- If the cancellationRequestResendIntervalMs is too short, draining
      -- will never complete...
      -- We could use `withAsync` to run these in parallel, but it feels overkill.
      -- So we recommend in the docs that people don't set this too low.
      -- `timeout` takes microseconds, hence the factor of 1000.
      drained <- timeout (1000 * cancellationRequestResendIntervalMs connOpts) $ drainUntilError qs
      case drained of
        Just () -> pure ()
        Nothing -> do
          -- Report the list this round was draining, not the list the whole
          -- call started with.
          debugPrint $ "Sending another cancellation request as orphaned pipeline still not completely drained: " ++ show qs
          sendCancelUnlessDrainOnly
          leftToDrain <- acquireOwnershipOfOrphanedQueries conn
          alternateDrainingWithCancelReqs leftToDrain

-- | Returns queries that have been taken possession of by this thread for cancellation and draining
-- or an empty list if there's no need for that.
--
-- Ownership is taken when at least one query in the current pipeline is owned by the calling
-- thread, or by a thread that no longer exists. In that case every query in the pipeline is
-- re-assigned to the calling thread and all their identifiers are returned.
acquireOwnershipOfOrphanedQueries :: HPgConnection -> IO [QueryId]
acquireOwnershipOfOrphanedQueries conn = do
  thisThreadId <- getMyWeakThreadId
  debugPrint $ "+++ I am " ++ show thisThreadId ++ " and will look for orphaned queries to drain"
  withControlMsgsLock conn STM.readTVar (const $ pure ()) $ \st ->
    if isRight (connectionReadyForNewPipeline st)
      then pure []
      else do
        -- TODO: We should either move the WeakThreadId owner into the full pipeline,
        -- or change this to a `takeWhile` because the internal model allows different
        -- queries to have different owners, even if in practice that shouldn't happen.
        let activeQueries = currentPipeline st
        ownershipChecks <- forM activeQueries $ \QueryState {queryOwner} ->
          -- See Note [`timeout` uses the same ThreadId] for why having the same ThreadId _still_ means
          -- we need to cancel and drain those queries
          if queryOwner == thisThreadId
            then pure True
            else threadDoesNotExist queryOwner
        if or ownershipChecks
          then do
            -- NOTE(review): this writes back the `st` snapshot read when the lock was acquired;
            -- presumably nothing else updates the connection state in between — confirm.
            STM.atomically $
              STM.writeTVar (internalConnectionState conn) $
                st {currentPipeline = map (\qs -> qs {queryOwner = thisThreadId}) $ currentPipeline st}
            debugPrint $ "We (" ++ show thisThreadId ++ ") took ownership of the pipeline orphaned by " ++ show (map queryOwner activeQueries)
            pure $ map queryIdentifier activeQueries
          else pure []
  where
    -- True when the weak reference to the thread is already dead, or the
    -- thread's status is 'ThreadDied' or 'ThreadFinished'.
    threadDoesNotExist :: WeakThreadId -> IO Bool
    threadDoesNotExist (WeakThreadId wtid _) =
      deRefWeak wtid >>= \case
        Nothing -> pure True
        Just tid -> (`elem` [ThreadDied, ThreadFinished]) <$> threadStatus tid

-- | Fetches any number of rows by streaming them directly from the socket.
--
-- Same as 'pipelineSWith' using the 'FromPgRow' instance's 'rowDecoder'.
pipelineS :: (FromPgRow a) => Query -> Pipeline (IO (Stream (Of a) IO ()))
pipelineS = pipelineSWith rowDecoder

-- | Fetches any number of rows by streaming them directly from the socket, with a custom row decoder.
--
-- The query is split with 'breakQueryIntoStatements'. The results of every statement except the
-- last one (as split by 'lastAndInitNE') are consumed with 'consumeResultsIgnoreRows'; only the
-- last statement is decoded with the supplied decoder and returned as a stream.
pipelineSWith :: RowDecoder a -> Query -> Pipeline (IO (Stream (Of a) IO ()))
pipelineSWith rowparser@(RowDecoder _ _ expectedColFmts) (lastAndInitNE . breakQueryIntoStatements -> (firstQueriesToSend, lastQueryToSend)) =
  Pipeline
    -- Only the last statement carries the decoder's expected result column formats.
    (map (,Nothing) firstQueriesToSend ++ [(lastQueryToSend, Just expectedColFmts)])
    ( \conn qryIds -> do
        let (firstQueries, mLastQry) = lastAndInit qryIds
        forM_ firstQueries $ consumeResultsIgnoreRows conn
        pure $
          consumeStreamingResults
            (ApplicativeRowDecoder rowparser)
            conn
            -- At least one statement is always sent, so a missing last id is an internal bug.
            (fromMaybe (error "pipelineS internal bug: no mLastQry") mLastQry)
    )

-- | Prefer to use 'pipelineS' and 'pipelineSWith', because 'RowDecoder' can typecheck PostgreSQL results
-- even when no rows are returned by queries, and 'RowDecoderMonadic' cannot.
--
-- Streams the rows of the last statement of the query through the supplied monadic decoder; the
-- results of every preceding statement are consumed with 'consumeResultsIgnoreRows'.
pipelineSMWith :: RowDecoderMonadic a -> Query -> Pipeline (IO (Stream (Of a) IO ()))
pipelineSMWith rowparser (lastAndInitNE . breakQueryIntoStatements -> (firstQueriesToSend, lastQueryToSend)) =
  Pipeline
    -- No expected result column formats are attached to any statement here.
    (map (,Nothing) firstQueriesToSend ++ [(lastQueryToSend, Nothing)])
    ( \conn qryIds -> do
        let (firstQueries, mLastQry) = lastAndInit qryIds
        forM_ firstQueries $ consumeResultsIgnoreRows conn
        pure $
          consumeStreamingResults
            (MonadicRowDecoder rowparser)
            conn
            -- At least one statement is always sent, so a missing last id is an internal bug.
            (fromMaybe (error "pipelineSMWith internal bug: no mLastQry") mLastQry)
    )

-- | Fetches any number of rows from a query.
--
-- Same as 'pipelineWith' using the 'FromPgRow' instance's 'rowDecoder'.
pipeline :: (FromPgRow a) => Query -> Pipeline (IO [a])
pipeline = pipelineWith rowDecoder

-- | Fetches any number of rows from a query with a custom row decoder.
--
-- Built on 'pipelineSWith': the resulting stream is collected into a list with 'S.toList_'.
pipelineWith :: RowDecoder a -> Query -> Pipeline (IO [a])
pipelineWith rowparser q = (S.toList_ =<<) <$> pipelineSWith rowparser q

-- | Prefer to use 'pipeline' and 'pipelineWith', because 'RowDecoder' can typecheck PostgreSQL results
-- even when no rows are returned by queries, and 'RowDecoderMonadic' cannot.
--
-- Built on 'pipelineSMWith': the resulting stream is collected into a list with 'S.toList_'.
pipelineM :: RowDecoderMonadic a -> Query -> Pipeline (IO [a])
pipelineM rowparser q = (S.toList_ =<<) <$> pipelineSMWith rowparser q

-- | Fetch exactly one row (not zero, not more than one) or throw
-- an exception otherwise.
--
-- Same as 'pipeline1With' using the 'FromPgRow' instance's 'rowDecoder'.
pipeline1 :: (FromPgRow a) => Query -> Pipeline (IO a)
pipeline1 = pipeline1With rowDecoder

-- | Fetch exactly one row (not zero, not more than one) or throw
-- an exception otherwise.
--
-- All rows are first collected with 'pipelineWith'; the row count is checked afterwards, and
-- a mismatch is reported through 'throwIrrecoverableErrorWithStatement' together with the
-- query text.
pipeline1With :: RowDecoder a -> Query -> Pipeline (IO a)
pipeline1With rowparser q = (toSingleRow =<<) <$> pipelineWith rowparser q
  where
    -- Query text attached to the error when the row count is wrong.
    queryBs = queryToByteString q

    toSingleRow [] = throwIrrecoverableErrorWithStatement queryBs "Expected exactly one row in query/pipeline1 call, but got none."
    toSingleRow [singleRow] = pure singleRow
    toSingleRow _ = throwIrrecoverableErrorWithStatement queryBs "Expected exactly one row in query/pipeline1 call, but got more than one."

-- | Fetch one or zero rows (not more than one) or throw an exception otherwise.
--
-- Same as 'pipelineMayWith' using the 'FromPgRow' instance's 'rowDecoder'.
pipelineMay :: (FromPgRow a) => Query -> Pipeline (IO (Maybe a))
pipelineMay = pipelineMayWith rowDecoder

-- | Fetch one or zero rows (not more than one) or throw an exception otherwise.
--
-- All rows are first collected with 'pipelineWith'. No rows yields 'Nothing', one row yields
-- 'Just', and more than one row is reported through 'throwIrrecoverableErrorWithStatement'
-- together with the query text.
pipelineMayWith :: RowDecoder a -> Query -> Pipeline (IO (Maybe a))
pipelineMayWith rowparser q = (toMaybeRow =<<) <$> pipelineWith rowparser q
  where
    -- Query text attached to the error when more than one row comes back.
    queryBs = queryToByteString q

    toMaybeRow [] = pure Nothing
    toMaybeRow [singleRow] = pure $ Just singleRow
    toMaybeRow _ = throwIrrecoverableErrorWithStatement queryBs "Expected zero or one row in query/pipelineMay call, but got more than one."

-- | Returns the count of affected rows of the given query.
--
-- The query is split with 'breakQueryIntoStatements' and handed to 'pipelineExecInternal'.
pipelineExec :: Query -> Pipeline (IO Int64)
pipelineExec = pipelineExecInternal . breakQueryIntoStatements

-- | Like 'pipelineExec', but discards the resulting row count.
pipelineExec_ :: Query -> Pipeline (IO ())
pipelineExec_ = fmap void . pipelineExec

-- | Builds a 'Pipeline' that sends every given statement and consumes all their results with
-- 'consumeResultsIgnoreRows'. The returned 'Int64' is the one produced for the last statement;
-- the values produced for earlier statements are discarded.
pipelineExecInternal :: NonEmpty SingleQuery -> Pipeline (IO Int64)
pipelineExecInternal qs =
  Pipeline
    -- No expected result column formats are attached, since rows are ignored.
    (map (,Nothing) (NE.toList qs))
    ( \conn qryIds -> do
        let (firstQueries, mLastQry) = lastAndInit qryIds
        forM_ firstQueries $ consumeResultsIgnoreRows conn
        consumeResultsIgnoreRows
          conn
          -- At least one statement is always sent, so a missing last id is an internal bug.
          (fromMaybe (error "pipelineExec internal bug: no mLastQry") mLastQry)
    )

-- | Runs a pipeline of statements, that is, sends multiple SQL statements in a single round-trip
-- to the server.
--
-- Note on thread safety: the thread that runs this must be the thread that consumes the
-- results of every query in the supplied pipeline _in order_, until all
-- query results are consumed or an error occurs.
-- Anything else is not officially supported by Hpgsql and may result in deadlocks or undefined behaviour.
--
-- This is 'runPipelineInternal' with a no-op STM transaction.
runPipeline :: HPgConnection -> Pipeline a -> IO a
runPipeline conn pip = runPipelineInternal conn pip (pure ())

-- | Sends a Pipeline coupled with an STM transaction that runs when
-- the messages cross the userspace/kernel frontier (aka "are sent").
runPipelineInternal :: HPgConnection -> Pipeline a -> STM () -> IO a
runPipelineInternal :: forall a. HPgConnection -> Pipeline a -> STM () -> IO a
runPipelineInternal HPgConnection
conn (Pipeline ([(SingleQuery, Maybe Int)]
-> Maybe (NonEmpty (SingleQuery, Maybe Int))
forall a. [a] -> Maybe (NonEmpty a)
NE.nonEmpty -> Maybe (NonEmpty (SingleQuery, Maybe Int))
mQueries) HPgConnection -> [QueryId] -> a
run) STM ()
onMsgsSentTxn =
  case Maybe (NonEmpty (SingleQuery, Maybe Int))
mQueries of
    Maybe (NonEmpty (SingleQuery, Maybe Int))
Nothing -> a -> IO a
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (a -> IO a) -> a -> IO a
forall a b. (a -> b) -> a -> b
$ HPgConnection -> [QueryId] -> a
run HPgConnection
conn []
    Just NonEmpty (SingleQuery, Maybe Int)
queries -> do
      HPgConnection
-> NonEmpty (ByteString, QueryProtocol, Maybe [Char])
-> (InternalConnectionState -> EncodingContext -> [SomeMessage])
-> STM ()
-> (NonEmpty QueryId -> IO a)
-> IO a
forall a.
HPgConnection
-> NonEmpty (ByteString, QueryProtocol, Maybe [Char])
-> (InternalConnectionState -> EncodingContext -> [SomeMessage])
-> STM ()
-> (NonEmpty QueryId -> IO a)
-> IO a
sendPipeline
        HPgConnection
conn
        (((SingleQuery, Maybe Int)
 -> (ByteString, QueryProtocol, Maybe [Char]))
-> NonEmpty (SingleQuery, Maybe Int)
-> NonEmpty (ByteString, QueryProtocol, Maybe [Char])
forall a b. (a -> b) -> NonEmpty a -> NonEmpty b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (\(SingleQuery {ByteString
queryString :: ByteString
queryString :: SingleQuery -> ByteString
queryString, Maybe [Char]
preparedStmtHash :: Maybe [Char]
preparedStmtHash :: SingleQuery -> Maybe [Char]
preparedStmtHash}, Maybe Int
_) -> (ByteString
queryString, QueryProtocol
ExtendedQuery, Maybe [Char]
preparedStmtHash)) NonEmpty (SingleQuery, Maybe Int)
queries)
        ( \InternalConnectionState
st EncodingContext
encCtx ->
            let toMessages :: Bool -> (SingleQuery, Maybe Int) -> [SomeMessage]
toMessages Bool
alreadyParsed (SingleQuery ByteString
qryString [EncodingContext -> (Maybe Oid, BinaryField)]
qryParams Maybe [Char]
preparedStmtHash, Maybe Int
mExpectedResultColFmts) =
                  let paramOidsAndValues :: [(Maybe Oid, BinaryField)]
paramOidsAndValues = ((EncodingContext -> (Maybe Oid, BinaryField))
 -> (Maybe Oid, BinaryField))
-> [EncodingContext -> (Maybe Oid, BinaryField)]
-> [(Maybe Oid, BinaryField)]
forall a b. (a -> b) -> [a] -> [b]
map ((EncodingContext -> (Maybe Oid, BinaryField))
-> EncodingContext -> (Maybe Oid, BinaryField)
forall a b. (a -> b) -> a -> b
$ EncodingContext
encCtx) [EncodingContext -> (Maybe Oid, BinaryField)]
qryParams
                   in ( if Bool -> Bool
not Bool
alreadyParsed
                          then
                            [Parse -> SomeMessage
forall msg. ToPgMessage msg => msg -> SomeMessage
SomeMessage (Parse -> SomeMessage) -> Parse -> SomeMessage
forall a b. (a -> b) -> a -> b
$ ByteString -> [Maybe Oid] -> Maybe [Char] -> Parse
Parse ByteString
qryString (((Maybe Oid, BinaryField) -> Maybe Oid)
-> [(Maybe Oid, BinaryField)] -> [Maybe Oid]
forall a b. (a -> b) -> [a] -> [b]
map (Maybe Oid, BinaryField) -> Maybe Oid
forall a b. (a, b) -> a
fst [(Maybe Oid, BinaryField)]
paramOidsAndValues) Maybe [Char]
preparedStmtHash]
                          else []
                      )
                        [SomeMessage] -> [SomeMessage] -> [SomeMessage]
forall a. [a] -> [a] -> [a]
++ [ Bind -> SomeMessage
forall msg. ToPgMessage msg => msg -> SomeMessage
SomeMessage (Bind -> SomeMessage) -> Bind -> SomeMessage
forall a b. (a -> b) -> a -> b
$
                               Bind
                                 { paramsValuesInOrder :: [BinaryField]
paramsValuesInOrder = ((Maybe Oid, BinaryField) -> BinaryField)
-> [(Maybe Oid, BinaryField)] -> [BinaryField]
forall a b. (a -> b) -> [a] -> [b]
map (Maybe Oid, BinaryField) -> BinaryField
forall a b. (a, b) -> b
snd [(Maybe Oid, BinaryField)]
paramOidsAndValues,
                                   resultColumnFmts :: Int
resultColumnFmts = Int -> Maybe Int -> Int
forall a. a -> Maybe a -> a
fromMaybe Int
1 Maybe Int
mExpectedResultColFmts,
                                   Maybe [Char]
preparedStmtHash :: Maybe [Char]
preparedStmtHash :: Maybe [Char]
preparedStmtHash
                                 },
                             Describe -> SomeMessage
forall msg. ToPgMessage msg => msg -> SomeMessage
SomeMessage Describe
Describe,
                             Execute -> SomeMessage
forall msg. ToPgMessage msg => msg -> SomeMessage
SomeMessage Execute
Execute
                           ]
                queryMsgs :: [SomeMessage]
queryMsgs =
                  [[SomeMessage]] -> [SomeMessage]
forall a. Monoid a => [a] -> a
mconcat ([[SomeMessage]] -> [SomeMessage])
-> [[SomeMessage]] -> [SomeMessage]
forall a b. (a -> b) -> a -> b
$
                    (Set [Char], [[SomeMessage]]) -> [[SomeMessage]]
forall a b. (a, b) -> b
snd ((Set [Char], [[SomeMessage]]) -> [[SomeMessage]])
-> (Set [Char], [[SomeMessage]]) -> [[SomeMessage]]
forall a b. (a -> b) -> a -> b
$
                      (Set [Char]
 -> (SingleQuery, Maybe Int) -> (Set [Char], [SomeMessage]))
-> Set [Char]
-> [(SingleQuery, Maybe Int)]
-> (Set [Char], [[SomeMessage]])
forall (t :: * -> *) s a b.
Traversable t =>
(s -> a -> (s, b)) -> s -> t a -> (s, t b)
List.mapAccumL
                        ( \Set [Char]
pnames (SingleQuery, Maybe Int)
qry ->
                            case ((SingleQuery, Maybe Int) -> SingleQuery
forall a b. (a, b) -> a
fst (SingleQuery, Maybe Int)
qry).preparedStmtHash of
                              Maybe [Char]
Nothing -> (Set [Char]
pnames, Bool -> (SingleQuery, Maybe Int) -> [SomeMessage]
toMessages Bool
False (SingleQuery, Maybe Int)
qry)
                              Just [Char]
psh -> ([Char] -> Set [Char] -> Set [Char]
forall a. Ord a => a -> Set a -> Set a
Set.insert [Char]
psh Set [Char]
pnames, Bool -> (SingleQuery, Maybe Int) -> [SomeMessage]
toMessages ([Char]
psh [Char] -> Set [Char] -> Bool
forall a. Ord a => a -> Set a -> Bool
`Set.member` Set [Char]
pnames) (SingleQuery, Maybe Int)
qry)
                        )
                        InternalConnectionState
st.preparedStatementNames
                        (NonEmpty (SingleQuery, Maybe Int) -> [(SingleQuery, Maybe Int)]
forall a. NonEmpty a -> [a]
NE.toList NonEmpty (SingleQuery, Maybe Int)
queries)
             in [SomeMessage]
queryMsgs [SomeMessage] -> [SomeMessage] -> [SomeMessage]
forall a. [a] -> [a] -> [a]
++ [Sync -> SomeMessage
forall msg. ToPgMessage msg => msg -> SomeMessage
SomeMessage Sync
Sync] -- concatMap toMessages queries ++ [SomeMessage Sync])
        )
        STM ()
onMsgsSentTxn
        ((NonEmpty QueryId -> IO a) -> IO a)
-> (NonEmpty QueryId -> IO a) -> IO a
forall a b. (a -> b) -> a -> b
$ \NonEmpty QueryId
qryIds -> a -> IO a
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (a -> IO a) -> a -> IO a
forall a b. (a -> b) -> a -> b
$ HPgConnection -> [QueryId] -> a
run HPgConnection
conn (NonEmpty QueryId -> [QueryId]
forall a. NonEmpty a -> [a]
NE.toList NonEmpty QueryId
qryIds)

-- | Selects which flavour of row decoder is used to decode the rows of a
-- query result.
data WhichRowDecoder a
  = -- | An applicative 'RowDecoder'. 'consumeStreamingResults' checks the
    -- number of result columns and their types before decoding any row.
    ApplicativeRowDecoder !(RowDecoder a)
  | -- | A 'RowDecoderMonadic'. 'consumeStreamingResults' performs no
    -- up-front column count or type check for this flavour.
    MonadicRowDecoder !(RowDecoderMonadic a)

-- | Decodes the results of the query identified by @qryId@ into a stream of
-- rows.
--
-- All work (including looking up the query text and reading the first
-- response) is deferred until the returned stream is run, via 'S.effect'.
--
-- Depending on what 'consumeResults' reports before the rows:
--
-- * Nothing: the row stream is drained and an exception is thrown; a
--   PostgreSQL error if the server sent one, an irrecoverable error otherwise.
-- * @NoData@ or @CopyInResponse@: an irrecoverable error is thrown, because
--   the statement does not return rows.
-- * @RowDescription@: a row parser is built from the column descriptions and
--   every @DataRow@ is parsed with it. For an 'ApplicativeRowDecoder' the
--   column count and column types are checked first.
--
-- After the last row, an @ErrorResponse@ from the server is rethrown with
-- 'throwPostgresError'.
consumeStreamingResults :: WhichRowDecoder a -> HPgConnection -> QueryId -> Stream (Of a) IO ()
consumeStreamingResults rp conn qryId = S.effect $ do
  -- The query text is only used to enrich error messages.
  qText <- lookupQueryText conn qryId
  (mERowDesc, rowsStream) <- consumeResults conn qryId
  case mERowDesc of
    Nothing -> do
      -- This is likely an error that happened when binding parameters (e.g. more/fewer params necessary than were sent)
      -- or a query that has no parameters and fails very early (e.g. "SELECT 1/0")
      rowCount :> res <- S.length rowsStream
      when (rowCount > 0) $ throwIrrecoverableErrorWithStatement qText "Bug in Hpgsql. We didn't get either NoData or RowDescription, so we assumed there was an error binding the query, but we got more than 0 rows in results"
      case res of
        Left err -> throwPostgresError qText err
        Right _cmd -> throwIrrecoverableErrorWithStatement qText "Bug in Hpgsql. We didn't get either NoData or RowDescription, so we assumed there was an error binding the query, but we then received a CommandComplete."
    Just (Left3 _noData) -> throwIrrecoverableErrorWithStatement qText "You have sent a count-returning query but expected it to be a rows-returning query. This is not supported."
    Just (Right3 _copyInResponse) -> throwIrrecoverableErrorWithStatement qText "You have sent a COPY FROM STDIN query but expected it to be a rows-returning query. This is not supported."
    Just (Middle3 (RowDescription coltypes)) -> do
      encodingContext <- readMVar conn.encodingContext
      let numResultColumns = length coltypes
          mkColInfo (colName, oid) = FieldInfo oid (Just colName) encodingContext
          colInfos = map mkColInfo coltypes
      -- Build (and force) the row parser once, before any row is consumed.
      !rowparser <- case rp of
        ApplicativeRowDecoder (RowDecoder rparser rtypecheck expectedNumCols) -> do
          let typecheckedColInfos = rtypecheck colInfos
          unless (numResultColumns == expectedNumCols) $
            throwIrrecoverableErrorWithStatement qText $
              "Query result contains " <> Text.pack (show numResultColumns) <> " columns but row parser expected " <> Text.pack (show expectedNumCols)
          unless (all snd typecheckedColInfos) $
            throwIrrecoverableErrorWithStatement qText "Query result column types do not match expected column types"
          -- 'Parser.endOfInput' rejects rows with trailing, unparsed bytes.
          pure $ rparser colInfos <* Parser.endOfInput
        MonadicRowDecoder (RowDecoderMonadic rparser) ->
          pure $ fmap fst $ rparser ConversionState {colsLeftToParse = colInfos} <* Parser.endOfInput
      pure $ do
        errOrCmdComplete <-
          S.mapM
            ( \(DataRow rowColumnData) ->
                case Parser.parseOnly rowparser rowColumnData of
                  Parser.ParseOk row -> pure row
                  Parser.ParseFail err -> throwIrrecoverableErrorWithStatement qText $ "Failed parsing a row: " <> Text.pack (show err)
            )
            rowsStream
        -- The row stream's return value tells us how the statement ended.
        S.effect $ case errOrCmdComplete of
          Left err -> throwPostgresError qText err
          Right _cmdComplete -> pure mempty

-- | Fetches any number of rows from a query, decoding each row with the
-- 'FromPgRow' instance of the result type.
--
-- > query conn "SELECT * FROM table"
--
-- This is 'queryWith' applied to the type's default 'rowDecoder'.
query :: forall a. (FromPgRow a) => HPgConnection -> Query -> IO [a]
query = queryWith (rowDecoder @a)

-- | Fetches any number of rows from a query with a custom row decoder.
--
-- > queryWith rowDecoder conn "SELECT * FROM table"
--
-- The query is run as a pipeline ('pipelineWith') through 'runPipeline'; the
-- action that the pipeline returns is then run to collect the rows.
queryWith :: RowDecoder a -> HPgConnection -> Query -> IO [a]
queryWith rparser conn qry = join $ runPipeline conn $ pipelineWith rparser qry

-- | Fetches any number of rows from a query with a monadic row decoder.
--
-- Prefer to use 'query' and 'queryWith', because 'RowDecoder' can typecheck PostgreSQL results
-- even when no rows are returned by queries, and 'RowDecoderMonadic' cannot.
queryMWith :: RowDecoderMonadic a -> HPgConnection -> Query -> IO [a]
queryMWith rparser conn qry = join $ runPipeline conn $ pipelineM rparser qry

-- | Fetch exactly one row (not zero, not more than one) or throw
-- an exception otherwise.
--
-- This is 'query1With' applied to the result type's default 'rowDecoder'.
query1 :: forall a. (FromPgRow a) => HPgConnection -> Query -> IO a
query1 = query1With rowDecoder

-- | Fetch exactly one row (not zero, not more than one) with a custom row
-- decoder, or throw an exception otherwise.
--
-- The query is run as a pipeline ('pipeline1With') through 'runPipeline'; the
-- action that the pipeline returns is then run to obtain the row.
query1With :: RowDecoder a -> HPgConnection -> Query -> IO a
query1With rparser conn q = join $ runPipeline conn $ pipeline1With rparser q

-- | Fetch one or zero rows (not more than one) or throw an exception otherwise.
--
-- This is 'queryMayWith' applied to the result type's default 'rowDecoder'.
queryMay :: forall a. (FromPgRow a) => HPgConnection -> Query -> IO (Maybe a)
queryMay = queryMayWith rowDecoder

-- | Fetch one or zero rows (not more than one) with a custom row decoder, or
-- throw an exception otherwise.
--
-- The query is run as a pipeline ('pipelineMayWith') through 'runPipeline';
-- the action that the pipeline returns is then run to obtain the result.
queryMayWith :: RowDecoder a -> HPgConnection -> Query -> IO (Maybe a)
queryMayWith rparser conn q = join $ runPipeline conn $ pipelineMayWith rparser q

-- | Runs a COPY FROM STDIN statement, giving you the ability to pass a
-- row-inserting function.
--
-- > withCopy_ conn "COPY employee FROM STDIN WITH (FORMAT CSV)" $ do
-- >    putCopyData conn "5,Dracula\n"
-- >    putCopyData conn "6,The Grinch\n"
--
-- Returns only the count produced by 'withCopy'; the result of the
-- row-inserting action is discarded. Use 'withCopy' to keep it.
--
-- You can also use 'copyFromS' for binary COPY.
--
-- Note on interruption safety: if this is interrupted by an asynchronous
-- exception while running inside a transaction, hpgsql will throw an exception
-- on the next statement. This happens because hpgsql would change semantics if
-- it aborted the COPY statement - because it would abort the entire transaction -,
-- and it couldn't "complete" the COPY either due to the risk of not all rows
-- having been inserted.
withCopy_ :: HPgConnection -> Query -> IO a -> IO Int64
withCopy_ conn copyQ copyFn = fst <$> withCopy conn copyQ copyFn

-- | Runs a COPY FROM STDIN statement, giving you the ability to pass a
-- row-inserting function.
--
-- > withCopy conn "COPY employee FROM STDIN WITH (FORMAT CSV)" $ do
-- >    putCopyData conn "5,Dracula\n"
-- >    putCopyData conn "6,The Grinch\n"
--
-- Returns the count reported when the COPY is ended, paired with the result
-- of the row-inserting action.
--
-- You can also use 'copyFromS' for binary COPY.
--
-- Note on interruption safety: if this is interrupted by an asynchronous
-- exception while running inside a transaction, hpgsql will throw an exception
-- on the next statement. This happens because hpgsql would change semantics if
-- it aborted the COPY statement - because it would abort the entire transaction -,
-- and it couldn't "complete" the COPY either due to the risk of not all rows
-- having been inserted.
withCopy :: HPgConnection -> Query -> IO a -> IO (Int64, a)
withCopy conn copyQ copyFn = withCopyInternal conn copyQ $ \qryId -> do
  -- Let the caller push its rows first, then end the COPY and fetch the count.
  ret <- copyFn
  count <- copyEndInternal conn qryId
  pure (count, ret)

-- | Starts a COPY statement and hands its 'QueryId' to the continuation.
--
-- The 'Query' must consist of exactly one statement; otherwise an
-- irrecoverable error is thrown before anything is sent. The statement is
-- sent as an unnamed Parse/Bind/Execute sequence followed by a Flush, three
-- responses are read from the server, and then @copyFn@ is run. Ending the
-- COPY is the continuation's responsibility.
withCopyInternal :: HPgConnection -> Query -> (QueryId -> IO (Int64, a)) -> IO (Int64, a)
withCopyInternal conn (lastAndInitNE . breakQueryIntoStatements -> (firstQueries, SingleQuery {..})) copyFn = do
  -- We error here if the Query is more than just "COPY .." because if we
  -- ran the other statements in a separate pipeline, they would
  -- run in a different implicit transaction, which could be unexpected.
  whenNonEmpty firstQueries $ const $ throwIrrecoverableError "Query for COPY must not have other SQL statements"
  thisThreadId <- getMyWeakThreadId
  sendPipeline
    conn
    ((queryString, CopyQuery StillCopying, Nothing) :| [])
    ( \_st encCtx ->
        -- Each query parameter is a function of the encoding context.
        let paramOidsAndValues = map ($ encCtx) queryParams
         in [ SomeMessage $ Parse queryString (map fst paramOidsAndValues) Nothing,
              SomeMessage $ Bind {paramsValuesInOrder = map snd paramOidsAndValues, resultColumnFmts = 0, preparedStmtHash = Nothing},
              -- We don't send Msgs.Describe because we expect CopyInResponse in place of NoData
              SomeMessage Execute,
              SomeMessage Msgs.Flush -- This might not be necessary for COPY, but possibly useful if the user calls this with not-a-COPY statement so we get errors earlier?
            ]
    )
    (pure ())
    $ \(qryId :| _) -> do
      -- Three responses are consumed before handing over to the caller.
      -- NOTE(review): presumably ParseComplete, BindComplete and
      -- CopyInResponse, one per call - confirm against
      -- 'receiveOutstandingResponseMsgsAtomically'.
      void $ receiveOutstandingResponseMsgsAtomically thisThreadId conn qryId
      void $ receiveOutstandingResponseMsgsAtomically thisThreadId conn qryId
      void $ receiveOutstandingResponseMsgsAtomically thisThreadId conn qryId
      copyFn qryId

-- | Copies rows into a table with the binary COPY protocol.
-- This must be a "COPY table FROM STDIN WITH (FORMAT BINARY)"-like statement.
-- Returns the count of inserted rows paired with the Stream's result.
--
-- > let rows :: Stream (Of Employee) IO ()
-- >     rows = someStreamOfEmployees
-- > copyFromS conn "COPY employee FROM STDIN WITH (FORMAT BINARY)" rows
--
-- Note on interruption safety: if this is interrupted by an asynchronous
-- exception while running inside a transaction, hpgsql will throw an exception
-- on the next statement. This happens because hpgsql would change semantics if
-- it aborted the COPY statement - because it would abort the entire transaction -,
-- and it couldn't "complete" the COPY either due to the risk of not all rows
-- having been inserted.
copyFromS :: forall r b. (ToPgRow r) => HPgConnection -> Query -> Stream (Of r) IO b -> IO (Int64, b)
copyFromS conn copyQ allRows =
  withCopyInternal conn copyQ $ \qryId -> do
    -- Take the controlMsgsLock to flush all buffers
    withControlMsgsLock conn (const $ pure ()) (const $ pure ()) (const $ pure ())
    encCtx <- readMVar conn.encodingContext
    let !toBinaryRow = rowEncoder.toBinaryCopyBytes encCtx
        -- Every row starts with its 16-bit field count.
        numColsBs = Builder.int16BE $ fromIntegral $ length $ rowEncoder.toTypeOids (Proxy @r)
    -- Binary COPY file header: the signature followed by two zero 32-bit
    -- fields (flags and header-extension length in PostgreSQL's format).
    rethrowAsIrrecoverable $ nonAtomicSendMsg conn $ CopyData $ Builder.byteString "PGCOPY\n\xff\r\n\0" <> Builder.int32BE 0 <> Builder.int32BE 0
    eHasRows <- S.inspect allRows
    ret <- case eHasRows of
      -- Empty stream: nothing to send between header and trailer.
      Left ret -> pure ret
      Right (firstRow :> otherRows) -> do
        let byteStream :: Stream (Of Builder.Builder) IO b
            byteStream = S.map (\row -> numColsBs <> toBinaryRow row) (S.cons firstRow otherRows)
            -- Batch encoded rows so each CopyData message carries at least
            -- 16384 bytes (except possibly the last one).
            chunkedByteStream :: Stream (Of Builder.Builder) IO b
            chunkedByteStream = chunkBuildersBySize 16384 byteStream
        -- Using strict bytestrings reduces allocations a tiny little bit.. not sure why, but maybe
        -- fusion rules?
        S.mapM_
          ( \bs ->
              rethrowAsIrrecoverable $ SocketBS.sendAll conn.socket $ Builder.toStrictByteString $ toPgMessage $ CopyData bs
          )
          chunkedByteStream
    -- Binary COPY file trailer: a 16-bit field count of -1.
    rethrowAsIrrecoverable $ nonAtomicSendMsg conn $ CopyData $ Builder.int16BE (-1)
    count <- copyEndInternal conn qryId
    pure (count, ret)

-- | Copies a list of rows into a table with the binary COPY protocol.
-- This must be a "COPY table FROM STDIN WITH (FORMAT BINARY)"-like statement.
-- Returns the count of inserted rows.
--
-- > let rows :: [Employee]
-- >     rows = someListOfEmployees
-- > copyFrom conn "COPY employee FROM STDIN WITH (FORMAT BINARY)" rows
--
-- This is 'copyFromS' applied to the list turned into a stream.
--
-- Note on interruption safety: if this is interrupted by an asynchronous
-- exception while running inside a transaction, hpgsql will throw an exception
-- on the next statement. This happens because hpgsql would change semantics if
-- it aborted the COPY statement - because it would abort the entire transaction -,
-- and it couldn't "complete" the COPY either due to the risk of not all rows
-- having been inserted.
copyFrom :: (ToPgRow r) => HPgConnection -> Query -> [r] -> IO Int64
copyFrom conn copyQ rows = fst <$> copyFromS conn copyQ (S.each rows)

-- | Accumulates builders until their combined length reaches or exceeds the
-- given size in bytes, then yields the accumulated builder and starts over.
--
-- When the input stream ends, whatever is left in the accumulator is yielded
-- as a final chunk, but only if it is non-empty. The input stream's return
-- value is passed through unchanged.
chunkBuildersBySize :: (Monad m) => Int32 -> Stream (Of Builder.Builder) m r -> Stream (Of Builder.Builder) m r
chunkBuildersBySize maxSize = go mempty
  where
    -- @acc@ is the chunk built so far; both arguments are forced on every
    -- iteration so no chain of thunks builds up across the loop.
    go !acc !stream = do
      e <- S.lift $ S.next stream
      case e of
        Left r -> do
          -- End of input: flush the leftover, skipping an empty chunk.
          when (Builder.builderLength acc > 0) $
            S.yield acc
          pure r
        Right (b, rest) ->
          let acc' = acc <> b
           in if Builder.builderLength acc' >= maxSize
                then S.yield acc' >> go mempty rest
                else go acc' rest

-- | Runs the given COPY query through 'withCopyInternal' with a continuation
-- that does nothing, discarding the row count it reports.
--
-- NOTE(review): presumably this leaves the COPY open so that rows can be
-- pushed with 'putCopyData' and finished with 'copyEnd' - confirm against
-- 'withCopyInternal'.
copyStart :: HPgConnection -> Query -> IO ()
copyStart conn copyQry =
  -- The continuation ignores the query id and reports zero rows; only the
  -- unit half of the resulting pair is returned.
  snd <$> withCopyInternal conn copyQry (\_qryId -> pure (0, ()))

-- | Wraps the given bytes in a 'CopyData' message and sends it with
-- 'nonAtomicSendMsg'. The bytes are passed through as-is.
putCopyData :: HPgConnection -> ByteString -> IO ()
putCopyData conn t = nonAtomicSendMsg conn $ CopyData (Builder.byteString t)

-- | Aborts the COPY statement currently in progress by sending @CopyFail@
-- (carrying @causeForFailure@) followed by @Sync@, and marks the query as
-- 'CopyFailAndSyncSent' in the connection state.
--
-- The whole operation runs under 'withControlMsgsLock', with acquire and
-- release actions that do nothing.
--
-- Throws an irrecoverable error when the pipeline does not consist of exactly
-- one COPY query, or when that COPY is not in the 'StillCopying' state.
putCopyError :: HPgConnection -> String -> IO ()
putCopyError conn causeForFailure = withControlMsgsLock conn (const $ pure ()) (const $ pure ()) $ const $ do
  -- Snapshot the state of the COPY, if the pipeline is exactly one COPY query.
  mCopyState <- STM.atomically $ do
    st <- STM.readTVar (internalConnectionState conn)
    pure $ case currentPipeline st of
      [QueryState {queryProtocol = CopyQuery copyState}] -> Just copyState
      _ -> Nothing
  case mCopyState of
    Just StillCopying -> do
      -- State transition handed to 'atomicallySendControlMsgs_' together with
      -- the messages. It re-checks the state instead of trusting the snapshot
      -- taken above.
      let markCopyFailSent = do
            let sttv = internalConnectionState conn
            st <- STM.readTVar sttv
            case currentPipeline st of
              [qs@QueryState {queryProtocol = CopyQuery StillCopying}] ->
                STM.writeTVar sttv $ st {currentPipeline = [qs {queryProtocol = CopyQuery CopyFailAndSyncSent}]}
              _ -> throwIrrecoverableError "Impossible: when marking CopyFail state was invalid"
      atomicallySendControlMsgs_ conn ([SomeMessage $ Msgs.CopyFail causeForFailure, SomeMessage Sync], markCopyFailSent)
    Just copyState ->
      throwIrrecoverableError $ "Current COPY command is not in a cancellable state. Its current state is " <> Text.pack (show copyState)
    Nothing ->
      throwIrrecoverableError "There is no active COPY command to cancel"

-- | Finishes the COPY statement currently in progress and returns the row
-- count reported by 'copyEndInternal'.
--
-- Before anything is sent, the connection state is checked in a single STM
-- transaction. An irrecoverable error is thrown unless the pipeline consists
-- of exactly one query, that query is a COPY still in the 'StillCopying'
-- state, and it is owned by the calling thread.
copyEnd :: HPgConnection -> IO Int64
copyEnd conn = do
  thisThreadId <- getMyWeakThreadId
  qryId <- STM.atomically $ do
    st <- STM.readTVar conn.internalConnectionState
    case st.currentPipeline of
      [] -> failWith "No active COPY statement when running copyEnd"
      [qs] -> case qs.queryProtocol of
        ExtendedQuery -> failWith "No active COPY statement when running copyEnd, but rather there was a regular query"
        CopyQuery StillCopying
          -- Only the thread that owns the COPY may end it.
          | qs.queryOwner == thisThreadId -> pure qs.queryIdentifier
          | otherwise -> failWith "Active COPY statement was issued by a different thread, and Hpgsql does not support multiple threads running/ending the same COPY statement"
        CopyQuery CopyDoneAndSyncSent -> failWith "Active COPY statement was already finished"
        CopyQuery CopyFailAndSyncSent -> failWith "Active COPY statement had previously failed"
      _ -> failWith "Active pipeline with other statements running when a copyEnd was attempted"

  copyEndInternal conn qryId
  where
    -- Every validation failure is reported against the same statement label.
    failWith details = throwIrrecoverableErrorWithStatement "Ending a COPY statement" details

-- | Sends @CopyDone@ + @Sync@ for the given COPY query, flips its state to
-- 'CopyDoneAndSyncSent', then consumes the query's results.
--
-- Returns the count carried by the final @CommandComplete@. An error response
-- from the server is rethrown with 'throwPostgresError', tagged with the
-- query's text.
copyEndInternal :: HPgConnection -> QueryId -> IO Int64
copyEndInternal conn qryId = do
  atomicallySendControlMsgs_
    conn
    ( [SomeMessage CopyDone, SomeMessage Sync],
      do
        -- When CopyDone+Sync are sent, we must update the connection state
        let sttv = internalConnectionState conn
        st <- STM.readTVar sttv
        case currentPipeline st of
          [q@QueryState {queryProtocol = CopyQuery StillCopying}] ->
            STM.writeTVar sttv $ st {currentPipeline = [q {queryProtocol = CopyQuery CopyDoneAndSyncSent}]}
          _ -> throwIrrecoverableError "putCopyEnd called but there was no active COPY statement"
    )
  qText <- lookupQueryText conn qryId
  (_, stream) <- consumeResults conn qryId
  -- Run the stream to completion, discarding the elements; only its final
  -- result is of interest here.
  res <- S.effects stream
  case res of
    Left err -> throwPostgresError qText err
    Right (CommandComplete n) -> pure n

-- | A simpler version of `atomicallySendControlMsgs`: the acquire and release
-- actions do nothing, and the messages and the post-send state update are
-- fixed up front instead of being computed from an acquired value.
atomicallySendControlMsgs_ :: HPgConnection -> ([SomeMessage], STM ()) -> IO ()
atomicallySendControlMsgs_ conn (msgs, stateUpdate) =
  atomicallySendControlMsgs
    conn
    (const $ pure ())
    (const $ pure ())
    (const ((), msgs, stateUpdate))

-- | Existential wrapper around any value that has a 'ToPgMessage' instance,
-- so that messages of different types can be put in the same list.
data SomeMessage = forall msg. (ToPgMessage msg) => SomeMessage msg

-- | Serialises the wrapped message using its own 'ToPgMessage' instance.
instance ToPgMessage SomeMessage where
  toPgMessage (SomeMessage msg) = toPgMessage msg

-- | Sends messages by running the supplied function in a way that is
-- semantically equivalent to being atomic, and guarantees the success
-- STM transaction will be applied when the messages were sent, even
-- if this thread is killed.
-- This is an essential message-sending primitive that guarantees that
-- if you send a series of messages, that the internal connection
-- state will be updated once they're sent, and other callers calling this
-- will wait for previous messages to be sent, so they see the same state
-- they would under normal/error-free operation.
--
-- Concretely, under 'withControlMsgsLock' (with the given @acquire@ and
-- @release@ actions) the function @f@ is applied to the acquired value,
-- yielding a return value, the messages to send, and the STM transaction to
-- run once they are sent. The messages are serialised into a single
-- ByteString and appended, paired with that transaction, to the
-- connection's @sendBuffer@.
atomicallySendControlMsgs :: HPgConnection -> (TVar InternalConnectionState -> STM a) -> (TVar InternalConnectionState -> STM ()) -> (a -> (b, [SomeMessage], STM ())) -> IO b
atomicallySendControlMsgs conn acquire release f =
  withControlMsgsLock conn acquire release $ \acqValue ->
    let (!ret, !msgs, !afterSentTxn) = f acqValue
     in modifyMVar (sendBuffer conn) $ \msgsInBuffer ->
          -- Use a DList for appends? Probably not worth it since there are so few control messages.
          pure (msgsInBuffer ++ [(Builder.toLazyByteString (mconcat $ map toPgMessage msgs), afterSentTxn)], ret)

-- | This function is thread-and-interruption-safe, so you
-- can run it with the same connection in parallel to any other functions.
--
-- Takes the next notification, if any, from the connection's
-- @notificationsReceived@ queue in a single STM transaction; returns
-- 'Nothing' when the queue is empty.
getNotificationNonBlocking :: HPgConnection -> IO (Maybe NotificationResponse)
getNotificationNonBlocking conn =
  STM.atomically $
    updateConnStateTxn conn $ \sttv -> do
      st <- STM.readTVar sttv
      STM.tryReadTQueue (notificationsReceived st)

-- | Blocks until a new Notification arrives. This function is both thread-safe and
-- interruption-safe, so you can run it with the same connection in parallel to any
-- other functions.
--
-- The internal notification queue is checked first; only when it is empty
-- does this wait for the pipeline to be ready and then read the next
-- notification message from the connection.
getNotification :: HPgConnection -> IO NotificationResponse
getNotification conn =
  -- This implementation blocks concurrency in some cases, but should be optimised
  -- for the most common case of no concurrency, and should be correct.
  getNonBlockingOr $
    waitUntilPipelineIsReadyForNewQuery conn (pure ()) $ \() ->
      -- Another query might have filled our internal notification queue while we were
      -- draining, so check that first.
      getNonBlockingOr $
        receiveNextMsgWithMaskedContinuation conn (msgParser @NotificationResponse) pure
  where
    -- Returns a queued notification when there is one, otherwise runs the
    -- fallback action.
    getNonBlockingOr f = do
      mNotifFromQueue <- getNotificationNonBlocking conn
      case mNotifFromQueue of
        Just notif -> pure notif
        Nothing -> f

-- | Non-atomic because if it's interrupted, it can leave the socket in a state
-- where only part of the message's bytes were pushed across the userspace<->kernel
-- boundary, leaving the connection in a ruined state.
-- TODO: Maybe we should mark the connection as broken on exception?
--
-- Serialises the message, writes all of its bytes to the connection's socket,
-- and then emits a debug line containing the shown message.
nonAtomicSendMsg :: (ToPgMessage msg, Show msg) => HPgConnection -> msg -> IO ()
nonAtomicSendMsg HPgConnection {socket} msg = do
  SocketLBS.sendAll socket $ Builder.toLazyByteString $ toPgMessage msg
  debugPrint $ "Sent " ++ show msg

-- | Wraps an IO action to rethrow any exception as an IrrecoverableHpgsqlError.
--
-- The original exception is kept in @innerException@; no statement is
-- attached (@relatedStatement@ is 'Nothing').
--
-- NOTE(review): exactly which exceptions are intercepted (synchronous only,
-- or asynchronous ones too) depends on the `handle` that is in scope - confirm
-- against the module's imports.
rethrowAsIrrecoverable :: IO a -> IO a
rethrowAsIrrecoverable = handle (throw . asIrrec)
  where
    asIrrec (ex :: SomeException) =
      IrrecoverableHpgsqlError
        { hpgsqlDetails = "An inner exception was thrown",
          innerException = Just ex,
          relatedStatement = Nothing
        }

-- | Process-wide 'MVar' holding a 'Bool', initialised to 'True'.
--
-- It is created with 'unsafePerformIO', so the NOINLINE pragma is required:
-- without it GHC could inline the definition at use sites and end up
-- creating more than one MVar.
{-# NOINLINE _globalDebugLock #-}
_globalDebugLock :: MVar Bool
_globalDebugLock = unsafePerformIO $ newMVar True

-- | Debug logging hook. Currently a no-op: the message is ignored and nothing
-- is printed.
debugPrint :: String -> IO ()
debugPrint _ = pure ()

-- debugPrint str = modifyMVar_ _globalDebugLock $ \p -> when p (putStrLn str) >> pure p

-- | Debug timing hook. Currently a pass-through: the operation name is
-- ignored and the action is returned unchanged.
{-# INLINE timeDebugNonBlockingOperation #-}
timeDebugNonBlockingOperation :: String -> IO a -> IO a
timeDebugNonBlockingOperation _ f = f

-- timeDebugNonBlockingOperation opName f = do
--   t1 <- getMonotonicTime
--   ret <- f
--   t2 <- getMonotonicTime
--   when (t2 - t1 > 0.01) $ putStrLn $ opName ++ " took more than 10ms: " ++ show (t2 - t1)
--   pure ret

-- Note [Polling Weak ThreadId instead of finalizers]
-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-- When a connection is shared across multiple threads, and one such thread
-- is interrupted by an asynchronous exception after sending a query to
-- postgres but before consuming its results, the other thread will still
-- be blocked in sending its query until it can know the first thread has died.
--
-- One way to implement this is by adding finalizers on `ThreadId` that would
-- then STM.writeTVar and set `queryOwner = Nothing`, so good old `STM.retry`
-- would make blocked threads retry promptly when that happens.
--
-- However, it is possible to `addFinalizer`, but not to `removeFinalizer`.
-- And that means every new sent query adds a new finalizer, and these accumulate
-- (leak).
-- There are strategies to only register a finalizer once, but that requires
-- keeping state that grows with the number of threads a connection is picked up by.
--
-- I did implement the strategy with finalizers at some point, but thought
-- it wasn't worth the risk, and then switched to polling `deRefWeak` instead.
--
-- One other interesting thing is that I've seen GHC's runtime take a while to
-- collect ThreadIds, so forcing a garbage collection in between waits helps
-- it realise more promptly.
-- This isn't done for users of the library; we do it only in our tests, because
-- we hope real world applications allocate memory a lot more than our tests, so
-- they trigger collections more often. Or one can hope.