-- | Internal API of hpgsql. Judging by the export list, this module exposes
-- connection setup and teardown, query and pipeline execution, COPY support,
-- notifications, type-info cache management, and the lower-level helpers
-- those are built from.
--
-- NOTE(review): the grouping comments inside the export list are inferred
-- from the exported names only; confirm against the definitions.
module Hpgsql.Internal
(
-- Connecting and closing connections.
connect,
connectOpts,
defaultConnectOpts,
withConnection,
withConnectionOpts,
closeGracefully,
closeForcefully,
-- Running individual queries.
query,
queryWith,
queryMWith,
queryS,
querySWith,
querySMWith,
query1,
query1With,
queryMay,
queryMayWith,
execute,
execute_,
-- Building and running pipelines.
Pipeline,
runPipeline,
pipelineS,
pipelineSWith,
pipelineSMWith,
pipeline,
pipelineWith,
pipelineExec,
pipelineExec_,
pipeline1,
pipeline1With,
pipelineMay,
pipelineMayWith,
-- COPY support.
copyStart,
copyEnd,
putCopyData,
withCopy,
withCopy_,
copyFrom,
copyFromS,
-- Connection state, notifications and server metadata.
resetConnectionState,
transactionStatus,
getNotification,
getNotificationNonBlocking,
refreshTypeInfoCache,
resetTypeInfoCache,
getParameterStatus,
getBackendPid,
cancelActiveStatement,
-- Lower-level building blocks (presumably exported for other hpgsql
-- modules and tests; verify before relying on them externally).
connectionReadyForNewPipeline,
fullTransactionStatus,
receiveNextMsgWithMaskedContinuation,
receiveNextMsgWithMaskedContinuationButDontThrowOnParsingFailure,
sendCancellationRequest,
isLastInPipeline,
updateConnStateTxn,
withControlMsgsLock,
receiveOutstandingResponseMsgsAtomically,
consumeResults,
consumeResultsIgnoreRows,
consumeStreamingResults,
WhichRowDecoder (..),
mkPostgresError,
throwPostgresError,
throwIrrecoverableErrorWithStatement,
lookupQueryText,
sendPipeline,
waitUntilPipelineIsReadyForNewQuery,
acquireOwnershipOfOrphanedQueries,
nonAtomicSendMsg,
rethrowAsIrrecoverable,
atomicallySendControlMsgs,
atomicallySendControlMsgs_,
SomeMessage (..),
runPipelineInternal,
InternalConnectOrCancelRequest (..),
internalConnectOrCancel,
debugPrint,
_globalDebugLock,
timeDebugNonBlockingOperation,
whenNotClosed,
chunkBuildersBySize,
pipelineExecInternal,
pipelineM,
withCopyInternal,
copyEndInternal,
putCopyError,
)
where
import Control.Applicative (Alternative (..))
import Control.Concurrent (modifyMVar, modifyMVar_, readMVar)
import Control.Concurrent.MVar (MVar, newMVar)
import Control.Concurrent.STM (STM, TVar)
import qualified Control.Concurrent.STM as STM
import Control.Exception.Safe (MonadThrow, SomeException, bracket, bracketOnError, finally, handle, mask, mask_, onException, throw, toException, tryJust)
import Control.Monad (forM, forM_, join, unless, void, when)
import Data.ByteString (ByteString)
import Data.ByteString.Internal (w2c)
import qualified Data.ByteString.Lazy as LBS
import Data.Data (Proxy (..))
import Data.Either (isLeft, isRight)
import Data.Int (Int32, Int64)
import qualified Data.List as List
import Data.List.NonEmpty (NonEmpty (..))
import qualified Data.List.NonEmpty as NE
import qualified Data.Map.Strict as Map
import Data.Maybe (fromMaybe, isNothing, mapMaybe)
import qualified Data.Serialize as Cereal
import qualified Data.Set as Set
import Data.Text (Text)
import qualified Data.Text as Text
import Data.Text.Encoding (decodeUtf8)
import qualified Data.Text.IO as Text
import Data.Time (DiffTime, diffTimeToPicoseconds, secondsToDiffTime)
import GHC.Conc (ThreadStatus (..), threadStatus)
import Hpgsql.Base
import qualified Hpgsql.Builder as Builder
import Hpgsql.Encoding (FieldInfo (..), FromPgRow (..), RowDecoder (..), RowEncoder (..), ToPgRow (..))
import Hpgsql.Encoding.RowDecoderMonadic (ConversionState (..), RowDecoderMonadic (..))
import Hpgsql.InternalTypes (BindComplete (..), CommandComplete (..), ConnectOpts (..), ConnectionString (..), CopyInResponse (..), CopyQueryState (..), DataRow (..), Either3 (..), EncodingContext (..), ErrorDetail (..), ErrorResponse (..), HPgConnection (..), InternalConnectionState (..), IrrecoverableHpgsqlError (..), NoData (..), NotificationResponse (..), ParseComplete (..), Pipeline (..), PostgresError (..), Query (..), QueryId (..), QueryProtocol (..), QueryState (..), ReadyForQuery (..), ResetConnectionOpts (..), ResponseMsg (..), ResponseMsgsReceived (..), RowDescription (..), SingleQuery (..), TransactionStatus (..), WeakThreadId (..), mkMutex, queryToByteString, throwIrrecoverableError)
import Hpgsql.Locking (getMyWeakThreadId, withMutex)
import Hpgsql.Msgs (AuthenticationMethod (..), AuthenticationResponse (..), BackendKeyData (..), Bind (..), CancelRequest (..), CopyData (..), CopyDone (..), Describe (..), Execute (..), FromPgMessage (..), NoticeResponse (..), ParameterStatus (..), Parse (..), PasswordMessage (..), PgMsgParser (..), StartupMessage (..), Sync (..), Terminate (..), ToPgMessage (..), parsePgMessage)
import qualified Hpgsql.Msgs as Msgs
import Hpgsql.Networking (recvNonBlocking, sendNonBlocking, socketWaitRead, socketWaitWrite)
import Hpgsql.Query (breakQueryIntoStatements)
import qualified Hpgsql.SimpleParser as Parser
import Hpgsql.TypeInfo (ArrayTypeDetails (..), TypeDetails (..), TypeInfo (..), buildTypeInfoCache, builtinPgTypesMap)
import Network.Socket (AddrInfo (..))
import qualified Network.Socket as Socket
import qualified Network.Socket.ByteString as SocketBS
import qualified Network.Socket.ByteString.Lazy as SocketLBS
import Streaming (Of (..), Stream)
import qualified Streaming as S
import qualified Streaming.Internal as SInternal
import qualified Streaming.Prelude as S
import System.IO (stderr)
import System.IO.Error (isResourceVanishedError)
import System.IO.Unsafe (unsafePerformIO)
import System.Mem.Weak (deRefWeak)
import System.Timeout (timeout)
-- | Decide whether the connection can accept a new pipeline.
--
-- * An empty current pipeline yields @Right TransIdle@.
-- * Otherwise the first query (in pipeline order) whose response state is
--   'ReadyForQueryReceived' determines the result: its transaction status is
--   returned in 'Right'.
-- * If no query has reached that state yet, the whole (non-empty) pipeline is
--   returned in 'Left'.
connectionReadyForNewPipeline :: InternalConnectionState -> Either (NonEmpty QueryState) TransactionStatus
connectionReadyForNewPipeline connState =
  case currentPipeline connState of
    [] -> Right TransIdle
    firstQuery : laterQueries ->
      let wholePipeline = firstQuery : laterQueries
       in maybe
            (Left (firstQuery :| laterQueries))
            Right
            (headMaybe (mapMaybe settledStatus wholePipeline))
  where
    -- The transaction status reported by ReadyForQuery, if this query got one.
    settledStatus queryState =
      case responseMsgsState queryState of
        ReadyForQueryReceived _ (ReadyForQuery txnStatus) -> Just txnStatus
        _ -> Nothing
-- | Read the connection state and return a pair of transaction statuses:
-- the first component is the status recorded before the current pipeline,
-- the second is the current status.
--
-- The current status comes from 'connectionReadyForNewPipeline' when that
-- returns 'Right'. While the pipeline is still in flight it is 'TransInError'
-- if any query's response state is 'ErrorResponseReceived', and
-- 'TransActive' otherwise.
fullTransactionStatus :: TVar InternalConnectionState -> STM (TransactionStatus, TransactionStatus)
fullTransactionStatus sttv = do
  connState <- STM.readTVar sttv
  let statusNow =
        case connectionReadyForNewPipeline connState of
          Right settled -> settled
          Left inFlight
            | any (sawErrorResponse . responseMsgsState) inFlight -> TransInError
            | otherwise -> TransActive
  pure (connState.transactionStatusBeforeCurrentPipeline, statusNow)
  where
    sawErrorResponse = \case
      ErrorResponseReceived _ _ -> True
      _ -> False
-- | The connection's current transaction status, i.e. the second component
-- of 'fullTransactionStatus', read in a single STM transaction.
transactionStatus :: HPgConnection -> IO TransactionStatus
transactionStatus conn = do
  (_statusBeforePipeline, statusNow) <-
    STM.atomically $ fullTransactionStatus conn.internalConnectionState
  pure statusNow
-- | Connect using 'defaultConnectOpts'. See 'connectOpts'.
connect :: ConnectionString -> DiffTime -> IO HPgConnection
connect connStr connTimeout =
  connectOpts defaultConnectOpts connStr connTimeout
-- | Connect with explicit 'ConnectOpts'. This is 'internalConnectOrCancel'
-- specialised to the 'Connect' request.
connectOpts :: ConnectOpts -> ConnectionString -> DiffTime -> IO HPgConnection
connectOpts opts connStr connTimeout =
  internalConnectOrCancel Connect opts connStr connTimeout
-- | Default connection options: the type-info cache is filled on connect,
-- and both the killed-thread poll interval and the cancellation-request
-- resend interval are 500 (milliseconds, going by the field names).
defaultConnectOpts :: ConnectOpts
defaultConnectOpts =
  ConnectOpts
    { fillTypeInfoCache = True,
      killedThreadPollIntervalMs = 500,
      cancellationRequestResendIntervalMs = 500
    }
-- | What 'internalConnectOrCancel' should do once the socket is connected.
-- The type index is the result type of that operation.
data InternalConnectOrCancelRequest a where
  -- | Perform the startup handshake and return the connection.
  Connect ::
    InternalConnectOrCancelRequest HPgConnection
  -- | Send the given 'CancelRequest' over a fresh socket to the given
  -- address, then close the socket.
  CancelNotConnect ::
    CancelRequest ->
    AddrInfo ->
    InternalConnectOrCancelRequest ()
-- | Open a socket to the server and then either run the startup handshake
-- ('Connect') or send a single cancellation request ('CancelNotConnect').
--
-- The 'DiffTime' argument bounds only the socket connection phase
-- (address resolution, @openSocket@ and @connect@); the handshake that
-- follows is not covered by it.
--
-- For 'Connect' the steps are: send the startup message (always forcing
-- @client_encoding=UTF8@ ahead of the user-supplied options), authenticate
-- (no password, cleartext or MD5), receive 'BackendKeyData' and
-- 'ReadyForQuery', and optionally fill the type-info cache when
-- 'fillTypeInfoCache' is set in the options.
--
-- For 'CancelNotConnect' the cancel request is sent, the function waits for
-- the socket to become readable (ignoring a resource-vanished error), and
-- the socket is closed.
--
-- Throws an irrecoverable error when the connection times out, the socket
-- is not non-blocking, the address cannot be resolved, the authentication
-- method is unsupported or fails, or the server answers with an
-- 'ErrorResponse' during the handshake. The socket is closed if anything
-- throws after it has been connected.
internalConnectOrCancel :: InternalConnectOrCancelRequest a -> ConnectOpts -> ConnectionString -> DiffTime -> IO a
internalConnectOrCancel connectOrCancel connOpts originalConnStr@ConnectionString {..} conntimeout = do
  -- 'timeout' expects microseconds, 'diffTimeToPicoseconds' yields picoseconds.
  let timeoutMicros = fromInteger (diffTimeToPicoseconds conntimeout `div` 1_000_000)
  sockOrTimeout <- timeout timeoutMicros getConnectedSocket
  case sockOrTimeout of
    Nothing -> throwIrrecoverableError "Could not connect in the supplied timeout"
    Just (sock, addrInfo) -> flip onException (Socket.close sock) $ do
      Socket.withFdSocket sock Socket.getNonBlock >>= \case
        False -> throwIrrecoverableError "Socket is not marked as non-blocking, which is not supported by hpgsql. You might be running on an unsupported platform"
        True -> pure ()
      socketIsClosed <- newMVar False
      recvBuffer <- newMVar mempty
      sendBuffer <- newMVar mempty
      socketMutex <- mkMutex
      encodingContext <- newMVar (EncodingContext builtinPgTypesMap)
      connParams <- newMVar mempty
      notifQueue <- STM.newTQueueIO
      currentConnectionState <-
        STM.newTVarIO $
          InternalConnectionState
            { totalQueriesSent = 0,
              currentPipeline = [],
              notificationsReceived = notifQueue,
              mustIssueRollbackBeforeNextCommand = False,
              preparedStatementNames = mempty,
              transactionStatusBeforeCurrentPipeline = TransIdle
            }
      -- Backend PID and cancel key are still 0 here; they are only filled in
      -- after BackendKeyData arrives, so this value must never be returned.
      let hpgConnPartialDoNotReturn =
            HPgConnection
              sock
              socketIsClosed
              recvBuffer
              sendBuffer
              socketMutex
              originalConnStr
              addrInfo
              encodingContext
              connParams
              currentConnectionState
              0
              0
              connOpts
      case connectOrCancel of
        CancelNotConnect cancelRequest _ -> do
          nonAtomicSendMsg hpgConnPartialDoNotReturn cancelRequest
          -- Wait until the socket becomes readable; a resource-vanished error
          -- while waiting is deliberately ignored.
          void $
            tryJust (\err -> if isResourceVanishedError err then Just () else Nothing) $
              socketWaitRead (socket hpgConnPartialDoNotReturn)
          Socket.close sock
        Connect -> do
          debugPrint "Sending startup message"
          nonAtomicSendMsg hpgConnPartialDoNotReturn $
            StartupMessage
              { user = Text.unpack user,
                database = Text.unpack database,
                options = "-c client_encoding=UTF8 " <> Text.unpack options
              }
          AuthenticationResponse authMethod <-
            receiveNextMsgWithMaskedContinuationButDontThrowOnParsingFailure hpgConnPartialDoNotReturn (msgParser @Msgs.AuthenticationResponse) $ \case
              Right knownAuthMsg -> pure knownAuthMsg
              Left unknownAuthMsg ->
                throwIrrecoverableError $
                  "Received unknown authentication message from PostgreSQL. This is probably an authentication method unsupported by hpgsql. More details about the message: "
                    <> Text.pack (show unknownAuthMsg)
          -- Cleartext and MD5 authentication send the same message shape and
          -- expect the same AuthOk reply.
          let answerPasswordChallenge = do
                nonAtomicSendMsg hpgConnPartialDoNotReturn $
                  PasswordMessage authMethod (Text.unpack user) (Text.unpack password)
                receiveAuthOkOrThrow hpgConnPartialDoNotReturn
          case authMethod of
            AuthOk -> pure ()
            AuthCleartextPassword -> answerPasswordChallenge
            AuthMD5Password _ -> answerPasswordChallenge
            _ ->
              throwIrrecoverableError $
                "Hpgsql does not yet support authenticating with method "
                  <> Text.pack (show authMethod)
          errorOrBackendKeyData <- receiveNextMsgUnsafe hpgConnPartialDoNotReturn $ Right <$> msgParser @BackendKeyData <|> Left <$> msgParser @ErrorResponse
          case errorOrBackendKeyData of
            Left errResp ->
              throw $ handshakeFailure "Socket connected but postgresql threw an error during connection startup handshake" errResp
            Right backendKeyData -> do
              readyForQueryOrError <- receiveNextMsgUnsafe hpgConnPartialDoNotReturn $ Right <$> msgParser @ReadyForQuery <|> Left <$> msgParser @ErrorResponse
              case readyForQueryOrError of
                Left errResp -> throw $ handshakeFailure "A postgresql error happened while connecting" errResp
                Right ReadyForQuery {} -> pure ()
              debugPrint $ "Connected with backend PID " ++ show (backendPid backendKeyData)
              let finalConn = hpgConnPartialDoNotReturn {connPid = backendPid backendKeyData, cancelSecretKey = backendSecretKey backendKeyData}
              when (fillTypeInfoCache connOpts) $ join $ runPipeline finalConn $ refreshTypeInfoCache finalConn
              pure finalConn
  where
    -- Wrap a server ErrorResponse received during the handshake into an
    -- irrecoverable error carrying the given description.
    handshakeFailure :: Text -> ErrorResponse -> IrrecoverableHpgsqlError
    handshakeFailure details errResp =
      IrrecoverableHpgsqlError
        { hpgsqlDetails = details,
          innerException = Just $ toException $ mkPostgresError "" errResp,
          relatedStatement = Nothing
        }

    -- Receive and parse the next message, returning it as-is.
    receiveNextMsgUnsafe :: (Show a) => HPgConnection -> PgMsgParser a -> IO a
    receiveNextMsgUnsafe conn parser = receiveNextMsgWithMaskedContinuation conn parser pure

    -- After sending a password: succeed only on AuthOk, otherwise throw.
    receiveAuthOkOrThrow :: HPgConnection -> IO ()
    receiveAuthOkOrThrow conn = do
      authMsg <-
        receiveNextMsgUnsafe conn $
          Right <$> msgParser @AuthenticationResponse <|> Left <$> msgParser @ErrorResponse
      case authMsg of
        Left errResp -> throw $ handshakeFailure "A postgresql error happened while connecting" errResp
        Right (AuthenticationResponse AuthOk) -> pure ()
        Right (AuthenticationResponse _) -> throwIrrecoverableError "Failed to authenticate user."

    -- Pick the address to connect to. A cancellation reuses the address it
    -- was given; a hostname containing '/' is treated as the directory of a
    -- unix-domain socket; anything else goes through getAddrInfo and the
    -- first result is used.
    resolveTarget :: IO AddrInfo
    resolveTarget = case connectOrCancel of
      CancelNotConnect _ addrInfo -> pure addrInfo
      Connect
        | "/" `Text.isInfixOf` hostname ->
            pure
              AddrInfo
                { addrFlags = [],
                  addrFamily = Socket.AF_UNIX,
                  addrSocketType = Socket.Stream,
                  addrProtocol = Socket.defaultProtocol,
                  addrAddress = Socket.SockAddrUnix $ Text.unpack (Text.dropWhileEnd (== '/') hostname) ++ "/.s.PGSQL." ++ show port,
                  addrCanonName = Nothing
                }
        | otherwise -> do
            addrInfos <- Socket.getAddrInfo (Just Socket.defaultHints) (Just $ Text.unpack hostname) (Just $ show port)
            case addrInfos of
              [] -> throwIrrecoverableError "Could not resolve address"
              addrInfo : _ -> pure addrInfo

    getConnectedSocket :: IO (Socket, AddrInfo)
    getConnectedSocket = do
      addrInfo <- resolveTarget
      -- This action runs under 'timeout', which interrupts it with an
      -- asynchronous exception. 'bracketOnError' acquires the socket with
      -- exceptions masked, so there is no window between opening the socket
      -- and installing the handler that closes it.
      bracketOnError (Socket.openSocket addrInfo) Socket.close $ \sock -> do
        Socket.connect sock (Socket.addrAddress addrInfo)
        pure (sock, addrInfo)
-- | A pipeline that queries @pg_catalog.pg_type@ for every type with
-- @oid >= 16384@. Running the 'IO' action it yields replaces the
-- connection's encoding context with those types combined with
-- 'builtinPgTypesMap'.
refreshTypeInfoCache :: HPgConnection -> Pipeline (IO ())
refreshTypeInfoCache conn = storeFetchedTypes <$> fetchPipeline
  where
    fetchPipeline :: Pipeline (IO [(Oid, Text, Oid, Oid, Char, Char)])
    fetchPipeline =
      pipeline "select oid, typname, typarray, typelem, typcategory, typtype from pg_catalog.pg_type WHERE oid >= 16384"

    -- Run the fetch, build the cache, and overwrite the encoding context.
    storeFetchedTypes :: IO [(Oid, Text, Oid, Oid, Char, Char)] -> IO ()
    storeFetchedTypes fetchRows = do
      rows <- fetchRows
      let customTypes = buildTypeInfoCache (map rowToTypeInfo rows)
      modifyMVar_ conn.encodingContext $
        const (pure (EncodingContext (customTypes <> builtinPgTypesMap)))

    -- A typarray of 0 means the type has no associated array type.
    rowToTypeInfo (oid, typname, typarray, typelem, typcategory, typtype) =
      TypeInfo oid typname arrayOid (toTypeDetails typelem typcategory typtype)
      where
        arrayOid
          | typarray == 0 = Nothing
          | otherwise = Just typarray

    -- Category 'A' (array) takes precedence; otherwise typtype decides.
    toTypeDetails :: Oid -> Char -> Char -> TypeDetails
    toTypeDetails typelem typcategory typtype
      | typcategory == 'A' = ArrayType (ArrayTypeDetails typelem)
      | otherwise = case typtype of
          'c' -> CompositeType
          'd' -> DomainType
          'e' -> EnumType
          'p' -> PseudoType
          'r' -> RangeType
          'm' -> MultiRangeType
          _ -> BasicType
-- | Replace the connection's encoding context with one that contains only
-- 'builtinPgTypesMap', discarding whatever was stored before.
resetTypeInfoCache :: HPgConnection -> IO ()
resetTypeInfoCache conn =
  modifyMVar_ conn.encodingContext (const (pure builtinsOnly))
  where
    builtinsOnly = EncodingContext builtinPgTypesMap
-- | Look up a parameter by name in the connection's parameter-status map.
-- Returns 'Nothing' when the map has no entry under that name.
getParameterStatus :: HPgConnection -> Text -> IO (Maybe Text)
getParameterStatus conn paramName = do
  knownParams <- readMVar conn.parameterStatusMap
  pure (Map.lookup paramName knownParams)
-- | The backend process id stored in the connection's @connPid@ field.
getBackendPid :: HPgConnection -> Int32
getBackendPid conn = conn.connPid
-- | Bring a connection back to a clean state so it can be reused.
--
-- Steps, in order:
--
-- 1. Throw an irrecoverable error if 'connectionReadyForNewPipeline' reports
--    (via a @Left@) that the connection is not ready for a new pipeline.
-- 2. When @checkTransactionState@ is set, throw an irrecoverable error unless
--    'transactionStatus' is @TransIdle@.
-- 3. Run @UNLISTEN *@ (when @unlistenAll@ is set) and @RESET ALL@ followed by
--    @RESET ROLE@ (when @resetAll@ is set) in a single pipeline.
-- 4. When @unlistenAll@ is set, replace the internal notification queue with
--    a fresh empty one.
--
-- Passing 'Nothing' enables all three options.
resetConnectionState ::
  HPgConnection ->
  Maybe ResetConnectionOpts ->
  IO ()
resetConnectionState conn@HPgConnection {internalConnectionState} mCleanOpts = do
  STM.atomically $ do
    st <- STM.readTVar internalConnectionState
    when (isLeft $ connectionReadyForNewPipeline st) $ throwIrrecoverableError "There are still active queries in progress. Make sure to close this connection with `closeForcefully` or consume all existing queries' results"
  when (checkTransactionState cleanOpts) $ do
    txnStatus <- transactionStatus conn
    unless (txnStatus == TransIdle) $ throwIrrecoverableError $ "The connection's transaction was left in an invalid state: " <> Text.pack (show txnStatus) <> ". Make sure to close this connection with `closeForcefully`"
  -- NOTE(review): the reset statements are assumed to run regardless of
  -- @checkTransactionState@ (i.e. they are not nested under the check above)
  -- - confirm against the upstream source.
  let qs :: [Query]
      qs =
        (if unlistenAll cleanOpts then ["UNLISTEN *"] else [])
          ++ (if resetAll cleanOpts then ["RESET ALL", "RESET ROLE"] else [])
  -- 'runPipeline' yields one result-consuming action per statement; run them
  -- all, in order.
  runPipeline conn (traverse pipelineExec_ qs) >>= sequence_
  when (unlistenAll cleanOpts) clearInternalNotificationQueue
  where
    -- Effective options: the caller's, or "everything enabled" by default.
    cleanOpts :: ResetConnectionOpts
    cleanOpts =
      fromMaybe
        ResetConnectionOpts {resetAll = True, unlistenAll = True, checkTransactionState = True}
        mCleanOpts

    -- Swap in a brand new queue instead of draining the old one.
    clearInternalNotificationQueue :: IO ()
    clearInternalNotificationQueue = STM.atomically $ do
      st <- STM.readTVar internalConnectionState
      emptyQueue <- STM.newTQueue
      STM.writeTVar internalConnectionState $ st {notificationsReceived = emptyQueue}
-- | Close the connection after sending a @Terminate@ message.
--
-- Does nothing when the connection is already marked as closed. Otherwise the
-- @Terminate@ message is sent while holding the control-messages lock, and,
-- whether or not that succeeds, the socket is closed and @socketClosed@ is
-- set to 'True'.
closeGracefully :: HPgConnection -> IO ()
closeGracefully conn@(HPgConnection {socket}) =
  whenNotClosed conn $
    flip finally (Socket.close socket >> modifyMVar_ conn.socketClosed (const $ pure True)) $ do
      withControlMsgsLock
        conn
        (const $ pure ())
        (const $ pure ())
        $ \() -> do
          nonAtomicSendMsg conn Terminate
-- | Close the connection's socket without sending anything to the server.
--
-- Does nothing when the connection is already marked as closed. Otherwise the
-- socket is closed and then @socketClosed@ is set to 'True'.
closeForcefully :: HPgConnection -> IO ()
closeForcefully conn@(HPgConnection {socket}) = whenNotClosed conn $ do
  Socket.close socket
  modifyMVar_ conn.socketClosed $ const $ pure True
-- | Run the given action only if the connection's @socketClosed@ flag is
-- currently 'False'.
--
-- The flag is read once, before the action runs; the flag's 'MVar' is not
-- held while the action executes.
whenNotClosed :: HPgConnection -> IO () -> IO ()
whenNotClosed conn f = do
  isClosed <- readMVar conn.socketClosed
  unless isClosed f
-- | Open a connection with 'connect', run the callback with it, and close it.
--
-- On normal completion the connection is closed with 'closeGracefully'. If
-- the callback (or the graceful close itself) throws, 'bracketOnError' closes
-- the connection with 'closeForcefully' instead.
withConnection :: ConnectionString -> DiffTime -> (HPgConnection -> IO a) -> IO a
withConnection connstr conntimeout f =
  bracketOnError (connect connstr conntimeout) closeForcefully $ \conn -> do
    res <- f conn
    closeGracefully conn
    pure res
-- | Like 'withConnection', but opens the connection with 'connectOpts' using
-- the supplied 'ConnectOpts'.
--
-- On normal completion the connection is closed with 'closeGracefully'. If
-- the callback (or the graceful close itself) throws, 'bracketOnError' closes
-- the connection with 'closeForcefully' instead.
withConnectionOpts :: ConnectOpts -> ConnectionString -> DiffTime -> (HPgConnection -> IO a) -> IO a
withConnectionOpts connOpts connstr conntimeout f =
  bracketOnError (connectOpts connOpts connstr conntimeout) closeForcefully $ \conn -> do
    res <- f conn
    closeGracefully conn
    pure res
-- | Receive the next message, parse it with the given parser and hand the
-- parsed value to the continuation.
--
-- This is
-- 'receiveNextMsgWithMaskedContinuationButDontThrowOnParsingFailure' with the
-- parse-failure case turned into an 'IrrecoverableHpgsqlError'. The error
-- mentions the message's identifying character and carries, as its inner
-- exception, the 'PostgresError' reported alongside the failure (if any).
receiveNextMsgWithMaskedContinuation :: (Show a) => HPgConnection -> PgMsgParser a -> (a -> IO b) -> IO b
receiveNextMsgWithMaskedContinuation conn parser f =
  receiveNextMsgWithMaskedContinuationButDontThrowOnParsingFailure conn parser $ \case
    Right p -> f p
    Left (msgIdentChar, mPgError) ->
      throw
        IrrecoverableHpgsqlError
          { hpgsqlDetails = "Could not parse postgres message with ident char " <> Text.pack (show msgIdentChar) <> ". This is an internal error in Hpgsql. Please report it.",
            innerException = toException <$> mPgError,
            relatedStatement = Nothing
          }
-- | Receive the next message from the connection and pass the outcome of
-- parsing it to the continuation.
--
-- The continuation receives @Right msg@ when the supplied parser accepts the
-- message. When the parser rejects it, the message is tried as a
-- @NotificationResponse@, @NoticeResponse@ or @ParameterStatus@; those are
-- handled internally (queued, printed to @stderr@, or stored in
-- @parameterStatusMap@ respectively), removed from the buffer, and the next
-- message is then received. For any other message the continuation receives
-- @Left (identChar, mPgError)@, where @mPgError@ is set when the message
-- parses as an @ErrorResponse@; in that case the message is left in the
-- receive buffer.
--
-- The continuation runs with asynchronous exceptions masked ('mask_') and
-- while the @recvBuffer@ 'MVar' is held by 'modifyMVar'.
receiveNextMsgWithMaskedContinuationButDontThrowOnParsingFailure :: (Show a) => HPgConnection -> PgMsgParser a -> (Either (Char, Maybe PostgresError) a -> IO b) -> IO b
receiveNextMsgWithMaskedContinuationButDontThrowOnParsingFailure conn@HPgConnection {socket, recvBuffer} parser f = do
  -- Header: 1 byte identifying the message plus a 4-byte length.
  currentBuf <- receiveUntilBufferHasAtLeast 5
  let bufLen = LBS.length currentBuf
  let charAndLength = LBS.take 5 currentBuf
  let (w2c -> msgIdentChar, lenbs) = fromMaybe (error "impossible") $ LBS.uncons charAndLength
      -- The decoded length has the 4 length bytes subtracted to obtain the
      -- number of bytes that follow the header.
      lenLeftToFetch :: Int64 = fromIntegral $ either error id (Cereal.decodeLazy @Int32 lenbs) - 4
      fullMessageLen = 5 + lenLeftToFetch
  restOfMsg <- LBS.drop 5 . LBS.take fullMessageLen <$> if bufLen >= fullMessageLen then pure currentBuf else receiveUntilBufferHasAtLeast fullMessageLen
  receivedNoticeOrParameterSoTryAgain <- go msgIdentChar restOfMsg fullMessageLen
  case receivedNoticeOrParameterSoTryAgain of
    -- An internally handled message was consumed: receive the next one.
    Nothing -> receiveNextMsgWithMaskedContinuationButDontThrowOnParsingFailure conn parser f
    Just res -> pure res
  where
    -- Parse one complete message and decide what remains in the buffer.
    -- Yields 'Nothing' when the message was handled internally and 'Just'
    -- the continuation's result otherwise.
    go msgIdentChar restOfMsg fullMessageLen = mask_ $ modifyMVar recvBuffer $ \lbs -> do
      let !bufferWithoutMsg =
            if LBS.length lbs >= fullMessageLen
              then LBS.drop fullMessageLen lbs
              else error "Bug in Hpgsql. Internal buffer's bytes weren't filled enough"
      case parsePgMessage msgIdentChar restOfMsg parser of
        Just msg -> do
          debugPrint $ "Received " ++ show msg
          fmap (bufferWithoutMsg,) $ Just <$> f (Right msg)
        Nothing -> do
          case parsePgMessage
            msgIdentChar
            restOfMsg
            ( Left3 <$> msgParser @NotificationResponse
                <|> Middle3 <$> msgParser @NoticeResponse
                <|> Right3 <$> msgParser @ParameterStatus
            ) of
            Just (Left3 notifResponse) -> do
              debugPrint "Received notification. Will add it to internal queue."
              STM.atomically $ do
                sttv <- STM.readTVar $ internalConnectionState conn
                STM.writeTQueue (notificationsReceived sttv) notifResponse
              pure (bufferWithoutMsg, Nothing)
            Just (Middle3 (NoticeResponse details)) -> do
              let severity =
                    fromMaybe
                      "Notice of unknown severity"
                      (Map.lookup ErrorSeverity details)
                  humanmsg = fromMaybe "no message" (Map.lookup ErrorHumanReadableMsg details)
              Text.hPutStrLn stderr $ decodeUtf8 $ LBS.toStrict $ severity <> ": " <> humanmsg
              pure (bufferWithoutMsg, Nothing)
            Just (Right3 (ParameterStatus {..})) -> do
              when (parameterName == "client_encoding" && parameterValue /= "UTF8") $ throwIrrecoverableError $ "Postgres sent us a change of client_encoding to not UTF8, and Hpgsql only supports UTF8. The encoding postgres sent us is " <> parameterValue
              modifyMVar_ (parameterStatusMap conn) $ \(!paramMap) -> pure (Map.insert parameterName parameterValue paramMap)
              pure (bufferWithoutMsg, Nothing)
            Nothing ->
              -- Unknown to both the caller's parser and the internal ones:
              -- report it to the continuation and put the buffer back
              -- untouched (@lbs@), so the message is not consumed.
              let mPgError = mkPostgresError "" <$> parsePgMessage msgIdentChar restOfMsg (msgParser @ErrorResponse)
               in fmap (lbs,) $ Just <$> f (Left (msgIdentChar, mPgError))

    -- Return the receive buffer once it holds at least the requested number
    -- of bytes, reading from the socket as many times as needed. Bytes are
    -- only appended to the buffer here, never removed.
    receiveUntilBufferHasAtLeast :: Int64 -> IO LBS.ByteString
    receiveUntilBufferHasAtLeast minBytesNecessary = do
      currentBuffer <- readMVar recvBuffer
      let nBytesInBuffer = LBS.length currentBuffer
      if nBytesInBuffer >= minBytesNecessary
        then pure currentBuffer
        else do
          mask $ \restore -> rethrowAsIrrecoverable $ modifyMVar_ recvBuffer $ \lbs -> do
            -- Only the wait for readability runs with the outer masking
            -- state restored.
            restore $ socketWaitRead socket
            someBytes <- timeDebugNonBlockingOperation "recv" $ recvNonBlocking socket (max 16000 $ fromIntegral $ minBytesNecessary - nBytesInBuffer)
            pure (lbs <> LBS.fromStrict someBytes)
          receiveUntilBufferHasAtLeast minBytesNecessary
-- | Ask the server to cancel whatever the connection is currently running.
--
-- What is sent depends on the current pipeline:
--
-- * If it consists of exactly one @COPY@ query that is @StillCopying@, a
--   @CopyFail@ followed by @Sync@ is sent on this connection and the query is
--   marked as @CopyFailAndSyncSent@.
-- * If that single @COPY@ query is already in @CopyDoneAndSyncSent@ or
--   @CopyFailAndSyncSent@, nothing is done.
-- * Otherwise a @CancelRequest@ carrying @connPid@ and @cancelSecretKey@ is
--   issued through 'internalConnectOrCancel' with a 30 second timeout.
sendCancellationRequest :: HPgConnection -> IO ()
sendCancellationRequest conn = do
  copyState <-
    STM.atomically $
      STM.readTVar (internalConnectionState conn) >>= \st ->
        pure $ case currentPipeline st of
          [QueryState {queryProtocol = CopyQuery cpState}] -> Just cpState
          _ -> Nothing
  -- Transaction run together with sending CopyFail + Sync: the state is
  -- re-read and must still be a single StillCopying COPY query.
  let markCopyFailSent = do
        let sttv = internalConnectionState conn
        st <- STM.readTVar sttv
        case currentPipeline st of
          [qs@QueryState {queryProtocol = CopyQuery StillCopying}] ->
            STM.writeTVar (internalConnectionState conn) $ st {currentPipeline = [qs {queryProtocol = CopyQuery CopyFailAndSyncSent}]}
          _ -> throwIrrecoverableError "Impossible: when marking CopyFail state was invalid"
  case copyState of
    Just StillCopying ->
      atomicallySendControlMsgs_ conn ([SomeMessage $ Msgs.CopyFail "COPY statement cancelled by Hpgsql", SomeMessage Sync], markCopyFailSent)
    Just CopyDoneAndSyncSent ->
      pure ()
    Just CopyFailAndSyncSent ->
      pure ()
    Nothing ->
      internalConnectOrCancel
        (CancelNotConnect (CancelRequest (connPid conn) (cancelSecretKey conn)) (connectedTo conn))
        (connOpts conn)
        (originalConnStr conn)
        (secondsToDiffTime 30)
-- | Tell whether the query with the given id is the last entry of the
-- connection's current pipeline.
--
-- Returns 'False' when the pipeline is empty.
isLastInPipeline :: HPgConnection -> QueryId -> IO Bool
isLastInPipeline conn qryId = STM.atomically $ updateConnStateTxn conn $ \sttv -> do
  InternalConnectionState {currentPipeline = queries} <- STM.readTVar sttv
  pure $ case lastMaybe queries of
    Nothing -> False
    Just q -> queryIdentifier q == qryId
-- | Apply an 'STM' function to the connection's @internalConnectionState@
-- 'TVar'.
--
-- The result is still an 'STM' action, so callers decide which transaction it
-- becomes part of.
updateConnStateTxn :: HPgConnection -> (TVar InternalConnectionState -> STM a) -> STM a
updateConnStateTxn conn f = f (internalConnectionState conn)
-- | Run an action while holding the connection's @socketMutex@.
--
-- With the mutex held, in order:
--
-- 1. The pending send buffer is flushed.
-- 2. The acquire transaction runs atomically; its result is passed to the
--    action.
-- 3. The action runs, and the send buffer is flushed again afterwards.
-- 4. The release transaction runs atomically. Since it is the release step of
--    a 'bracket', it also runs when the action throws.
withControlMsgsLock ::
  HPgConnection ->
  (TVar InternalConnectionState -> STM a) ->
  (TVar InternalConnectionState -> STM b) ->
  (a -> IO c) ->
  IO c
withControlMsgsLock conn@HPgConnection {socket, socketMutex} acqStm relStm f = do
  withMutex socketMutex $ do
    flushSendBuffer
    bracket
      (STM.atomically $ updateConnStateTxn conn acqStm)
      (const $ void $ STM.atomically $ updateConnStateTxn conn relStm)
      ( \acq -> do
          res <- f acq
          flushSendBuffer
          pure res
      )
  where
    -- Send everything queued in @sendBuffer@. Each entry pairs the bytes left
    -- to send with a transaction to run once they have all been sent.
    flushSendBuffer :: IO ()
    flushSendBuffer =
      rethrowAsIrrecoverable $
        mask $
          \restore -> do
            let go = do
                  others <- modifyMVar conn.sendBuffer $ \case
                    [] -> pure ([], [])
                    ((!msgs, afterSentTxn) : xs) ->
                      if LBS.null msgs
                        then do
                          -- Head entry fully sent: run its follow-up
                          -- transaction and drop it.
                          STM.atomically afterSentTxn
                          pure (xs, xs)
                        else do
                          -- Only the wait for writability runs with the
                          -- outer masking state restored.
                          restore $ socketWaitWrite socket
                          n <- timeDebugNonBlockingOperation "sendNonBlocking" $ sendNonBlocking socket msgs
                          -- Keep whatever was not sent at the head.
                          let !remaining = LBS.drop n msgs
                              fin = (remaining, afterSentTxn) : xs
                          pure (fin, fin)
                  -- Loop until the buffer has no entries left.
                  unless (null others) go
            go
-- | Receive at most one outstanding response message for the query
-- identified by @qryId@ and record it in the connection's internal state.
--
-- The 'ResponseMsgsReceived' currently stored for the query decides which
-- protocol messages are acceptable next. The result pairs the message that
-- was received (if any) with the query's resulting state. Nothing is read
-- from the connection, and @(Nothing, state)@ is returned, when the query
-- has already seen its @ReadyForQuery@, or has seen @CommandComplete@ and
-- 'isLastInPipeline' reports that it is not the last query of the pipeline.
--
-- The whole step runs inside 'withControlMsgsLock'; the query's state is
-- looked up (and validated) by 'getQueryStateIfFirstOrThrow' both before
-- receiving and again when the received message is recorded.
--
-- Irrecoverable errors are thrown when the pipeline is empty, when the
-- pipeline holding @qryId@ is no longer tracked, when any query of the
-- pipeline is owned by a thread other than @thisThreadId@, when an earlier
-- query of the pipeline is in error or has not reached @CommandComplete@,
-- and when a message arrives that is not valid for the recorded state.
receiveOutstandingResponseMsgsAtomically :: WeakThreadId -> HPgConnection -> QueryId -> IO (Maybe ResponseMsg, ResponseMsgsReceived)
receiveOutstandingResponseMsgsAtomically thisThreadId conn qryId =
  withControlMsgsLock
    conn
    (\_ -> getQueryStateIfFirstOrThrow)
    (\_ -> pure ())
    (\pendingQuery -> receiveWhatFollows (responseMsgsState pendingQuery))
  where
    -- Pick the receive action that matches what has been received so far.
    receiveWhatFollows :: ResponseMsgsReceived -> IO (Maybe ResponseMsg, ResponseMsgsReceived)
    receiveWhatFollows = \case
      NoMsgsReceived -> receiveParseOrBindCompleteAtomically
      ParseCompleteReceived _ -> receiveBindCompleteAtomically
      BindCompleteReceived _ -> receiveNoDataOrRowDescriptionOrCopyInResponseAtomically
      RowDescriptionOrNoDataOrCopyInResponseReceived _ -> receiveDataRowOrCommandCompleteAtomically
      ErrorResponseReceived _ _ -> receiveReadyForQueryAtomically
      finished@(CommandCompleteReceived _ _) -> do
        -- Only the last query of a pipeline goes on to wait for ReadyForQuery.
        lastOne <- isLastInPipeline conn qryId
        if lastOne
          then receiveReadyForQueryAtomically
          else pure (Nothing, finished)
      finished@(ReadyForQueryReceived _ _) -> pure (Nothing, finished)

    -- Shared shape of every receive step below: parse the next message with
    -- the given parser, convert it to a 'ResponseMsg' and record it in a
    -- single STM transaction run from the continuation.
    receiveAndRecord :: (Show a) => PgMsgParser a -> (a -> ResponseMsg) -> IO (Maybe ResponseMsg, ResponseMsgsReceived)
    receiveAndRecord parser toResponseMsg =
      receiveNextMsgWithMaskedContinuation conn parser $ \parsed ->
        STM.atomically (updateQueryStateIfFirstOrThrow (toResponseMsg parsed))

    -- Nothing received yet: ParseComplete, ErrorResponse or BindComplete.
    receiveParseOrBindCompleteAtomically :: IO (Maybe ResponseMsg, ResponseMsgsReceived)
    receiveParseOrBindCompleteAtomically =
      receiveAndRecord
        ( Right3 <$> msgParser @ParseComplete
            <|> Left3 <$> msgParser @ErrorResponse
            <|> Middle3 <$> msgParser @BindComplete
        )
        ( \case
            Left3 err -> RespErrorResponse err
            Middle3 bound -> RespBindComplete bound
            Right3 parsedOk -> RespParseComplete parsedOk
        )

    -- After ParseComplete: BindComplete or ErrorResponse.
    receiveBindCompleteAtomically :: IO (Maybe ResponseMsg, ResponseMsgsReceived)
    receiveBindCompleteAtomically =
      receiveAndRecord
        ( Right <$> msgParser @BindComplete
            <|> Left <$> msgParser @ErrorResponse
        )
        ( \case
            Left err -> RespErrorResponse err
            Right bound -> RespBindComplete bound
        )

    -- After BindComplete: RowDescription, NoData, CopyInResponse or ErrorResponse.
    receiveNoDataOrRowDescriptionOrCopyInResponseAtomically :: IO (Maybe ResponseMsg, ResponseMsgsReceived)
    receiveNoDataOrRowDescriptionOrCopyInResponseAtomically =
      receiveAndRecord
        ( Right
            <$> ( Right3 <$> msgParser @RowDescription
                    <|> Left3 <$> msgParser @NoData
                    <|> Middle3 <$> msgParser @CopyInResponse
                )
            <|> Left <$> msgParser @ErrorResponse
        )
        ( \case
            Left err -> RespErrorResponse err
            Right (Left3 noData) -> RespNoData noData
            Right (Middle3 copyIn) -> RespCopyInResponse copyIn
            Right (Right3 rowDesc) -> RespRowDescription rowDesc
        )

    -- After the description message: DataRow, CommandComplete or ErrorResponse.
    receiveDataRowOrCommandCompleteAtomically :: IO (Maybe ResponseMsg, ResponseMsgsReceived)
    receiveDataRowOrCommandCompleteAtomically =
      receiveAndRecord
        ( Right3 <$> msgParser @DataRow
            <|> Middle3 <$> msgParser @CommandComplete
            <|> Left3 <$> msgParser @ErrorResponse
        )
        ( \case
            Left3 err -> RespErrorResponse err
            Middle3 done -> RespCommandComplete done
            Right3 row -> RespDataRow row
        )

    -- Only ReadyForQuery is accepted here.
    receiveReadyForQueryAtomically :: IO (Maybe ResponseMsg, ResponseMsgsReceived)
    receiveReadyForQueryAtomically =
      receiveAndRecord (msgParser @ReadyForQuery) RespReadyForQuery

    -- Look up the state of @qryId@ in the current pipeline, throwing unless
    -- it is legitimate for this thread to consume that query right now.
    getQueryStateIfFirstOrThrow :: STM QueryState
    getQueryStateIfFirstOrThrow = updateConnStateTxn conn $ \stateVar -> do
      connState <- STM.readTVar stateVar
      case currentPipeline connState of
        [] ->
          throwIrrecoverableError $
            "QueryId "
              <> Text.pack (show qryId)
              <> " does not exist because the pipeline is empty. This is most likely a bug in Hpgsql, but just in case, are you trying to consume a pipeline that no longer exists?"
        pipelineQueries
          -- Every tracked query has a larger id than the requested one.
          | all (\q -> queryIdentifier q > qryId) pipelineQueries ->
              throwIrrecoverableError $
                "Bug in Hpgsql: trying to receive outstanding messages for a pipeline that has already been fully consumed. Information about this pipeline no longer available in internal state.: "
                  <> Text.pack (show (qryId, pipelineQueries))
          | any (\q -> queryOwner q /= thisThreadId) pipelineQueries ->
              throwIrrecoverableError "Hpgsql does not support consuming different SQL statements' results of the same pipeline from different threads. Behaviour is undefined if you try that."
          | any queryInError earlierQueries ->
              throwIrrecoverableError "Another query in the same pipeline threw an error"
          | not (all queryComplete earlierQueries) ->
              throwIrrecoverableError "Are you trying to consume a statement's results before consuming the results of previous statements of the same pipeline? Hpgsql does not support that. It is also possible a previous statement in the pipeline threw an irrecoverable error, and you still tried to consume another statement's results, which is also not supported."
          | otherwise -> pure thisQuery
          where
            -- Bound lazily: only forced once the first two guards have failed.
            (earlierQueries, thisQuery) = splitQueries pipelineQueries

    -- Split the pipeline into the queries with ids smaller than @qryId@ and
    -- the query with exactly that id. Calls 'error' when no such query exists.
    splitQueries :: [QueryState] -> ([QueryState], QueryState)
    splitQueries qs =
      case List.span (\q -> queryIdentifier q < qryId) qs of
        (precedingQueries, candidate : _)
          | queryIdentifier candidate == qryId -> (precedingQueries, candidate)
          | otherwise -> error $ "Could not find query right Id (part 2): " ++ show (qs, qryId)
        (_, []) -> error $ "Could not find query with right Id: " ++ show (qs, qryId)

    -- True only for a query whose recorded state is 'CommandCompleteReceived'.
    queryComplete :: QueryState -> Bool
    queryComplete q
      | CommandCompleteReceived _ _ <- responseMsgsState q = True
      | otherwise = False

    -- True when the query received an ErrorResponse, whether or not the
    -- subsequent ReadyForQuery has been received as well.
    queryInError :: QueryState -> Bool
    queryInError q
      | ErrorResponseReceived _ _ <- responseMsgsState q = True
      | ReadyForQueryReceived (Left ErrorResponse {}) _ <- responseMsgsState q = True
      | otherwise = False

    -- Record @respMsg@ for @qryId@: compute the state that follows from the
    -- current one, store it in the pipeline and return it with the message.
    updateQueryStateIfFirstOrThrow :: ResponseMsg -> STM (Maybe ResponseMsg, ResponseMsgsReceived)
    updateQueryStateIfFirstOrThrow respMsg = updateConnStateTxn conn $ \stateVar -> do
      connState <- STM.readTVar stateVar
      pendingQry <- getQueryStateIfFirstOrThrow
      let -- Most transitions leave the set of prepared statement names alone.
          keepingNames nextState = pure (nextState, connState.preparedStatementNames)
          describedBy description = keepingNames (RowDescriptionOrNoDataOrCopyInResponseReceived description)
      (newState, newPrepStmtNames) <- case (respMsg, pendingQry.responseMsgsState) of
        (RespParseComplete parsedOk, NoMsgsReceived) ->
          -- A successful parse records the statement name when the query has one.
          pure
            ( ParseCompleteReceived parsedOk,
              case pendingQry.queryPrepStmtName of
                Nothing -> connState.preparedStatementNames
                Just stmtName -> Set.insert stmtName connState.preparedStatementNames
            )
        (RespBindComplete bound, ParseCompleteReceived _) ->
          keepingNames (BindCompleteReceived bound)
        (RespBindComplete bound, NoMsgsReceived) -> do
          -- BindComplete without a preceding ParseComplete is only accepted
          -- for queries that carry a prepared statement name.
          case pendingQry.queryPrepStmtName of
            Nothing -> throwIrrecoverableErrorWithStatement pendingQry.queryText "Bug in Hpgsql. Received a BindComplete without a ParseComplete but SQL statement wasn't prepared"
            Just _ -> pure ()
          keepingNames (BindCompleteReceived bound)
        (RespNoData noData, BindCompleteReceived _) ->
          describedBy (Left3 noData)
        (RespRowDescription rowDesc, BindCompleteReceived _) ->
          describedBy (Middle3 rowDesc)
        (RespCopyInResponse copyIn, BindCompleteReceived _) ->
          describedBy (Right3 copyIn)
        (RespDataRow _, RowDescriptionOrNoDataOrCopyInResponseReceived description) ->
          -- Data rows do not advance the state.
          describedBy description
        (RespErrorResponse err, _) ->
          -- NOTE(review): the first field is always Nothing here, even when a
          -- description message was recorded before the error; confirm that
          -- consumers do not expect it to be carried over.
          keepingNames (ErrorResponseReceived Nothing err)
        (RespCommandComplete done, RowDescriptionOrNoDataOrCopyInResponseReceived description) ->
          keepingNames (CommandCompleteReceived description done)
        (RespReadyForQuery rq, ErrorResponseReceived _ err) ->
          keepingNames (ReadyForQueryReceived (Left err) rq)
        (RespReadyForQuery rq, CommandCompleteReceived _ done) ->
          keepingNames (ReadyForQueryReceived (Right done) rq)
        (_, stateBefore) ->
          throwIrrecoverableErrorWithStatement pendingQry.queryText $
            "Bug in Hpgsql. Response messages in invalid order. Had "
              <> Text.pack (show stateBefore)
              <> " and received "
              <> Text.pack (show respMsg)
      let -- Only the entry for this query id is touched.
          advance q
            | queryIdentifier q == qryId = q {responseMsgsState = newState}
            | otherwise = q
      STM.writeTVar stateVar $
        connState
          { currentPipeline = map advance (currentPipeline connState),
            preparedStatementNames = newPrepStmtNames
          }
      pure (Just respMsg, newState)
-- | Receive the responses of the query identified by @qryId@ up to the point
-- where rows may start, and return:
--
--   * the 'NoData' / 'RowDescription' / 'CopyInResponse' message that preceded
--     the rows, when one was received ('Nothing' otherwise), and
--   * a stream of the query's 'DataRow's whose return value is the
--     'ErrorResponse' or 'CommandComplete' that terminated the results.
--
-- Rows beyond the first are received as the returned stream is consumed: each
-- step of the stream performs one receive on the connection.
consumeResults ::
  HPgConnection ->
  QueryId ->
  IO (Maybe (Either3 NoData RowDescription CopyInResponse), Stream (Of DataRow) IO (Either ErrorResponse CommandComplete))
consumeResults conn qryId = do
  thisThreadId <- getMyWeakThreadId
  -- Keep receiving until the query's state tells us how its results begin:
  -- 'Left3' for an error, 'Right3' for a completed command, and 'Middle3' when
  -- rows may follow (carrying the first row if it was already received).
  let receiveUntilTimeToReceiveRows :: IO (Maybe (Either3 NoData RowDescription CopyInResponse), Either3 ErrorResponse (Maybe DataRow) CommandComplete)
      receiveUntilTimeToReceiveRows = do
        nextMsg <- receiveOutstandingResponseMsgsAtomically thisThreadId conn qryId
        case nextMsg of
          (Just (RespDataRow dr), RowDescriptionOrNoDataOrCopyInResponseReceived noDataOrRowDesc) ->
            pure (Just noDataOrRowDesc, Middle3 $ Just dr)
          (Just (RespDataRow _), _) ->
            throwIrrecoverableError "Impossible: Got DataRow but did not have RowDescOrNoData before?"
          (_, ErrorResponseReceived mNoDataOrRowDesc err) ->
            pure (mNoDataOrRowDesc, Left3 err)
          (_, CommandCompleteReceived noDataOrRowDesc cmd) ->
            pure (Just noDataOrRowDesc, Right3 cmd)
          (_, RowDescriptionOrNoDataOrCopyInResponseReceived noDataOrRowDesc) ->
            pure (Just noDataOrRowDesc, Middle3 Nothing)
          -- Nothing useful for the caller yet: keep receiving.
          (_, ParseCompleteReceived _) -> receiveUntilTimeToReceiveRows
          (_, BindCompleteReceived _) -> receiveUntilTimeToReceiveRows
          (_, ReadyForQueryReceived errOrCmd _) ->
            pure (Nothing, either Left3 Right3 errOrCmd)
          (_, NoMsgsReceived) -> receiveUntilTimeToReceiveRows
  firstMsg <- receiveUntilTimeToReceiveRows
  case firstMsg of
    (mERowDesc, Left3 err) -> do
      receiveReadyForQueryIfNecessary thisThreadId
      pure (mERowDesc, pure $ Left err)
    (mERowDesc, Right3 cmd) -> do
      receiveReadyForQueryIfNecessary thisThreadId
      pure (mERowDesc, pure $ Right cmd)
    (mERowDesc, Middle3 mDataRow) -> do
      let allOtherRows :: Stream (Of DataRow) IO (Either ErrorResponse CommandComplete)
          allOtherRows =
            S.unfold
              ( \() -> do
                  mRow <- receiveNextMsgWithMaskedContinuationButDontThrowOnParsingFailure conn (msgParser @DataRow) pure
                  case mRow of
                    Right row -> pure $ Right (row :> ())
                    -- The next message did not parse as a DataRow, so let the
                    -- shared response-state machinery process it and inspect
                    -- the resulting state to learn how the rows ended.
                    Left _ -> do
                      stateAfterNextMsg <- snd <$> receiveOutstandingResponseMsgsAtomically thisThreadId conn qryId
                      case stateAfterNextMsg of
                        ErrorResponseReceived _ err -> do
                          receiveReadyForQueryIfNecessary thisThreadId
                          pure $ Left $ Left err
                        CommandCompleteReceived _ cmd -> do
                          receiveReadyForQueryIfNecessary thisThreadId
                          pure $ Left $ Right cmd
                        ReadyForQueryReceived errOrCmd _ -> pure $ Left errOrCmd
                        _ -> throwIrrecoverableError "Internal error in Hpgsql. After a DataRow we should get either an ErrorResponse or a CommandComplete message"
              )
              ()
          -- Put back in front the row that was already received while looking
          -- for the start of the results, if there was one.
          finalStream :: Stream (Of DataRow) IO (Either ErrorResponse CommandComplete)
          finalStream = case mDataRow of
            Nothing -> allOtherRows
            Just dr ->
              SInternal.Step $
                dr :> allOtherRows
      pure (mERowDesc, finalStream)
  where
    -- Performs one more receive for this query and discards the result.
    -- NOTE(review): judging by the name, this is meant to consume a trailing
    -- ReadyForQuery when the protocol sends one; confirm against
    -- 'receiveOutstandingResponseMsgsAtomically'.
    receiveReadyForQueryIfNecessary :: WeakThreadId -> IO ()
    receiveReadyForQueryIfNecessary thisThreadId = void $ receiveOutstandingResponseMsgsAtomically thisThreadId conn qryId
-- | Build a 'PostgresError' from the text of the statement that failed and the
-- error details carried by the server's 'ErrorResponse'.
mkPostgresError :: ByteString -> ErrorResponse -> PostgresError
mkPostgresError stmtText (ErrorResponse errDetailMap) =
  PostgresError
    { pgErrorDetails = errDetailMap,
      failedStatement = stmtText
    }
-- | Throw the 'PostgresError' built by 'mkPostgresError' from the failed
-- statement's text and the server's 'ErrorResponse'.
throwPostgresError :: ByteString -> ErrorResponse -> IO a
throwPostgresError stmtText errResp = throw $ mkPostgresError stmtText errResp
-- | Throw an 'IrrecoverableHpgsqlError' carrying the given message, with the
-- given statement text recorded as the related statement and no inner
-- exception.
throwIrrecoverableErrorWithStatement :: (MonadThrow m) => ByteString -> Text -> m a
throwIrrecoverableErrorWithStatement stmtText errMsg =
  throw $
    IrrecoverableHpgsqlError
      { hpgsqlDetails = errMsg,
        innerException = Nothing,
        relatedStatement = Just stmtText
      }
-- | Look up the statement text of the query with the given 'QueryId' in the
-- connection's current pipeline. Returns an empty 'ByteString' when no query
-- with that id is in the pipeline.
lookupQueryText :: HPgConnection -> QueryId -> IO ByteString
lookupQueryText conn qryId = STM.atomically $ do
  st <- STM.readTVar (internalConnectionState conn)
  pure $ maybe "" queryText $ List.find ((== qryId) . queryIdentifier) (currentPipeline st)
-- | Run a single statement as a one-query pipeline and return the 'Int64'
-- produced by 'pipelineExec'.
-- NOTE(review): presumably the affected-row count reported by the server;
-- confirm against 'pipelineExec'.
execute :: HPgConnection -> Query -> IO Int64
execute conn qry = join $ runPipeline conn (pipelineExec qry)
-- | Like 'execute', but discards the returned count.
execute_ :: HPgConnection -> Query -> IO ()
execute_ conn qry = void $ execute conn qry
-- | Consume all results of the query identified by @qryId@, discarding any
-- rows, and return the 'Int64' carried by its 'CommandComplete'.
--
-- Throws a 'PostgresError' (tagged with the query's statement text) when the
-- results end with an 'ErrorResponse'.
consumeResultsIgnoreRows :: HPgConnection -> QueryId -> IO Int64
consumeResultsIgnoreRows conn qryId = do
  -- Fetch the statement text before consuming the results, so that it is
  -- available for the error message if the query failed.
  qText <- lookupQueryText conn qryId
  (_mRowDesc, resultsStream) <- consumeResults conn qryId
  -- Run the stream to its end, dropping every row.
  results <- S.effects resultsStream
  case results of
    Left err -> throwPostgresError qText err
    Right (CommandComplete n) -> pure n
-- | Like 'querySWith', decoding rows with the 'rowDecoder' of the result
-- type's 'FromPgRow' instance.
queryS :: (FromPgRow a) => HPgConnection -> Query -> IO (Stream (Of a) IO ())
queryS = querySWith rowDecoder
-- | Run a single query as a one-query pipeline (via 'pipelineSWith') and
-- return a stream of its rows decoded with the given 'RowDecoder'.
querySWith :: RowDecoder a -> HPgConnection -> Query -> IO (Stream (Of a) IO ())
querySWith rparser conn qry = join $ runPipeline conn $ pipelineSWith rparser qry
-- | Run a single query as a one-query pipeline (via 'pipelineSMWith') and
-- return a stream of its rows decoded with the given 'RowDecoderMonadic'.
querySMWith :: RowDecoderMonadic a -> HPgConnection -> Query -> IO (Stream (Of a) IO ())
querySMWith rparser conn qry = join $ runPipeline conn $ pipelineSMWith rparser qry
-- | Send a new pipeline of queries over the connection and hand the ids
-- assigned to those queries to @continuation@.
--
-- Arguments, in order:
--
--   * the connection;
--   * for each query being sent: its statement text, its protocol and an
--     optional prepared-statement name;
--   * a function producing the messages to send, given a snapshot of the
--     connection state and the encoding context;
--   * an extra STM action run after the connection state has been updated with
--     the new pipeline, as part of the STM action handed to
--     'atomicallySendControlMsgs_';
--   * the continuation that receives the new queries' ids.
sendPipeline :: HPgConnection -> NonEmpty (ByteString, QueryProtocol, Maybe String) -> (InternalConnectionState -> EncodingContext -> [SomeMessage]) -> STM () -> (NonEmpty QueryId -> IO a) -> IO a
sendPipeline conn queriesBeingSent allMsgs onMsgsSentTxn continuation = do
  thisWeakThreadId <- getMyWeakThreadId
  qryIds <- waitUntilPipelineIsReadyForNewQuery conn (getUniqueQueryStatesForNewPipeline queriesBeingSent) $ \(nextId, lastId) -> do
    -- One fresh QueryState per query, owned by the calling thread and with no
    -- response messages received yet.
    let newPipelineList :: [QueryState]
        newPipelineList =
          zipWith
            ( \queryIdentifier (queryText, queryProtocol, queryPrepStmtName) ->
                QueryState
                  { queryIdentifier,
                    queryText,
                    queryOwner = thisWeakThreadId,
                    queryProtocol,
                    queryPrepStmtName,
                    responseMsgsState = NoMsgsReceived
                  }
            )
            [nextId .. lastId]
            (NE.toList queriesBeingSent)
    case NE.nonEmpty newPipelineList of
      Nothing -> throwIrrecoverableError "Bug in Hpgsql: empty newPipeline to be sent"
      Just newPipeline -> do
        -- Snapshot of the connection state, used only to build the messages.
        stSnapshot <- STM.atomically $ STM.readTVar conn.internalConnectionState
        encCtx <- readMVar conn.encodingContext
        atomicallySendControlMsgs_
          conn
          ( allMsgs stSnapshot encCtx,
            do
              -- Record the new pipeline, and the transaction status as it was
              -- right before this pipeline, in the connection state.
              st <- STM.readTVar (internalConnectionState conn)
              (_, txnStatusRightBeforeSendingPipeline) <- fullTransactionStatus (internalConnectionState conn)
              STM.writeTVar (internalConnectionState conn) $ st {currentPipeline = NE.toList newPipeline, transactionStatusBeforeCurrentPipeline = txnStatusRightBeforeSendingPipeline}
              onMsgsSentTxn
          )
        debugPrint $ "+++ Sent QueryIds " ++ show [nextId .. lastId]
        pure $ fmap queryIdentifier newPipeline
  continuation qryIds
  where
    -- Reserve a contiguous range of query ids, one per query being sent, by
    -- advancing the connection's 'totalQueriesSent' counter. Returns the first
    -- and last id of the range. Throws if the connection is not ready for a
    -- new pipeline.
    getUniqueQueryStatesForNewPipeline :: NonEmpty (ByteString, QueryProtocol, Maybe String) -> STM (QueryId, QueryId)
    getUniqueQueryStatesForNewPipeline (fmap (\(_, proto, _) -> proto) -> qryprotos) = do
      let sttv :: TVar InternalConnectionState
          sttv = internalConnectionState conn
      st <- STM.readTVar sttv
      when (isLeft $ connectionReadyForNewPipeline st) $ throwIrrecoverableError "Bug in Hpgsql: the connection should be ready for a new pipeline due to loopUntilNoPipeline"
      let nextId = QueryId $ totalQueriesSent st
          lastId = nextId + fromIntegral (length qryprotos) - 1
      STM.writeTVar sttv (st {totalQueriesSent = totalQueriesSent st + fromIntegral (length qryprotos)})
      pure (nextId, lastId)
waitUntilPipelineIsReadyForNewQuery :: forall a b. HPgConnection -> STM a -> (a -> IO b) -> IO b
waitUntilPipelineIsReadyForNewQuery :: forall a b. HPgConnection -> STM a -> (a -> IO b) -> IO b
waitUntilPipelineIsReadyForNewQuery HPgConnection
conn STM a
lockAcquireStm a -> IO b
f = do
thisWeakThreadId <- IO WeakThreadId
getMyWeakThreadId
queriesToDrain <- acquireOwnershipOfOrphanedQueries conn
withControlMsgsLock
conn
( \TVar InternalConnectionState
sttv -> do
st <- TVar InternalConnectionState -> STM InternalConnectionState
forall a. TVar a -> STM a
STM.readTVar TVar InternalConnectionState
sttv
(txnStatusBeforePipeline, _) <- fullTransactionStatus sttv
pure
( txnStatusBeforePipeline,
st.mustIssueRollbackBeforeNextCommand,
case st.currentPipeline of
[QueryState {queryProtocol :: QueryState -> QueryProtocol
queryProtocol = CopyQuery CopyQueryState
_, ByteString
queryText :: QueryState -> ByteString
queryText :: ByteString
queryText}] -> ByteString -> Maybe ByteString
forall a. a -> Maybe a
Just ByteString
queryText
[QueryState]
_ -> Maybe ByteString
forall a. Maybe a
Nothing
)
)
(const $ pure ())
$ \(TransactionStatus
txnStatusBeforePipeline, Bool
mustIssueRollback, Maybe ByteString
isCopyCommand) -> do
Maybe ByteString -> (ByteString -> IO ()) -> IO ()
forall (m :: * -> *) a.
Applicative m =>
Maybe a -> (a -> m ()) -> m ()
whenJust Maybe ByteString
isCopyCommand ((ByteString -> IO ()) -> IO ()) -> (ByteString -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \ByteString
copyCmd ->
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Bool -> Bool
not ([QueryId] -> Bool
forall a. [a] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null [QueryId]
queriesToDrain) Bool -> Bool -> Bool
&& TransactionStatus
txnStatusBeforePipeline TransactionStatus -> TransactionStatus -> Bool
forall a. Eq a => a -> a -> Bool
== TransactionStatus
TransInTrans) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ ByteString -> Text -> IO ()
forall (m :: * -> *) a. MonadThrow m => ByteString -> Text -> m a
throwIrrecoverableErrorWithStatement ByteString
copyCmd Text
"Hpgsql cannot resume execution from an interrupted COPY statement inside a transaction, because cancelling COPY would leave the transaction in an error state and completing it could partially complete it. There is no semantics preserving action possible."
let onlyDrainNotCancel :: Bool
onlyDrainNotCancel = TransactionStatus
txnStatusBeforePipeline TransactionStatus -> TransactionStatus -> Bool
forall a. Eq a => a -> a -> Bool
== TransactionStatus
TransInTrans Bool -> Bool -> Bool
&& Bool -> Bool
not Bool
mustIssueRollback
HPgConnection -> Bool -> IO ()
cancelActiveStatement HPgConnection
conn Bool
onlyDrainNotCancel
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Bool
mustIssueRollback Bool -> Bool -> Bool
&& Bool -> Bool
not ([QueryId] -> Bool
forall a. [a] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null [QueryId]
queriesToDrain)) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
[Char] -> IO ()
debugPrint [Char]
"Executing ROLLBACK now that the pipeline is clear."
IO (IO ()) -> IO ()
forall (m :: * -> *) a. Monad m => m (m a) -> m a
join (IO (IO ()) -> IO ()) -> IO (IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ HPgConnection -> Pipeline (IO ()) -> STM () -> IO (IO ())
forall a. HPgConnection -> Pipeline a -> STM () -> IO a
runPipelineInternal HPgConnection
conn (Query -> Pipeline (IO ())
pipelineExec_ Query
"ROLLBACK") (STM () -> IO (IO ())) -> STM () -> IO (IO ())
forall a b. (a -> b) -> a -> b
$ do
st <- TVar InternalConnectionState -> STM InternalConnectionState
forall a. TVar a -> STM a
STM.readTVar HPgConnection
conn.internalConnectionState
STM.writeTVar conn.internalConnectionState st {mustIssueRollbackBeforeNextCommand = False}
retOrRepeat <- withControlMsgsLock
conn
( \TVar InternalConnectionState
sttv -> do
st <- TVar InternalConnectionState -> STM InternalConnectionState
forall a. TVar a -> STM a
STM.readTVar TVar InternalConnectionState
sttv
case connectionReadyForNewPipeline st of
Left NonEmpty QueryState
p -> Either (NonEmpty QueryState) a
-> STM (Either (NonEmpty QueryState) a)
forall a. a -> STM a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Either (NonEmpty QueryState) a
-> STM (Either (NonEmpty QueryState) a))
-> Either (NonEmpty QueryState) a
-> STM (Either (NonEmpty QueryState) a)
forall a b. (a -> b) -> a -> b
$ NonEmpty QueryState -> Either (NonEmpty QueryState) a
forall a b. a -> Either a b
Left NonEmpty QueryState
p
Right TransactionStatus
_ -> a -> Either (NonEmpty QueryState) a
forall a b. b -> Either a b
Right (a -> Either (NonEmpty QueryState) a)
-> STM a -> STM (Either (NonEmpty QueryState) a)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> STM a
lockAcquireStm
)
(const $ pure ())
$ \case
Left (QueryState {WeakThreadId
queryOwner :: QueryState -> WeakThreadId
queryOwner :: WeakThreadId
queryOwner} :| [QueryState]
_) -> Either WeakThreadId b -> IO (Either WeakThreadId b)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Either WeakThreadId b -> IO (Either WeakThreadId b))
-> Either WeakThreadId b -> IO (Either WeakThreadId b)
forall a b. (a -> b) -> a -> b
$ WeakThreadId -> Either WeakThreadId b
forall a b. a -> Either a b
Left WeakThreadId
queryOwner
Right a
acq -> do
[Char] -> IO ()
debugPrint [Char]
"+++ No active pipeline found"
b -> Either WeakThreadId b
forall a b. b -> Either a b
Right (b -> Either WeakThreadId b) -> IO b -> IO (Either WeakThreadId b)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> a -> IO b
f a
acq
case retOrRepeat of
Left WeakThreadId
existingPipelineOwnerThread -> do
let intervalMs :: Int
intervalMs = ConnectOpts -> Int
killedThreadPollIntervalMs (ConnectOpts -> Int) -> ConnectOpts -> Int
forall a b. (a -> b) -> a -> b
$ HPgConnection -> ConnectOpts
connOpts HPgConnection
conn
[Char] -> IO ()
debugPrint ([Char] -> IO ()) -> [Char] -> IO ()
forall a b. (a -> b) -> a -> b
$ [Char]
"There is a pipeline owned by a different thread (" [Char] -> [Char] -> [Char]
forall a. [a] -> [a] -> [a]
++ WeakThreadId -> [Char]
forall a. Show a => a -> [Char]
show WeakThreadId
existingPipelineOwnerThread [Char] -> [Char] -> [Char]
forall a. [a] -> [a] -> [a]
++ [Char]
") so we (" [Char] -> [Char] -> [Char]
forall a. [a] -> [a] -> [a]
++ WeakThreadId -> [Char]
forall a. Show a => a -> [Char]
show WeakThreadId
thisWeakThreadId [Char] -> [Char] -> [Char]
forall a. [a] -> [a] -> [a]
++ [Char]
") will try again in " [Char] -> [Char] -> [Char]
forall a. [a] -> [a] -> [a]
++ Int -> [Char]
forall a. Show a => a -> [Char]
show Int
intervalMs [Char] -> [Char] -> [Char]
forall a. [a] -> [a] -> [a]
++ [Char]
"ms. Pipeline contains: "
IO (Maybe ()) -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO (Maybe ()) -> IO ()) -> IO (Maybe ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ Int -> IO () -> IO (Maybe ())
forall a. Int -> IO a -> IO (Maybe a)
timeout (Int
1000 Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
intervalMs) (IO () -> IO (Maybe ())) -> IO () -> IO (Maybe ())
forall a b. (a -> b) -> a -> b
$ STM () -> IO ()
forall a. STM a -> IO a
STM.atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
st <- TVar InternalConnectionState -> STM InternalConnectionState
forall a. TVar a -> STM a
STM.readTVar (HPgConnection -> TVar InternalConnectionState
internalConnectionState HPgConnection
conn)
when (isLeft $ connectionReadyForNewPipeline st) STM.retry
HPgConnection -> STM a -> (a -> IO b) -> IO b
forall a b. HPgConnection -> STM a -> (a -> IO b) -> IO b
waitUntilPipelineIsReadyForNewQuery HPgConnection
conn STM a
lockAcquireStm a -> IO b
f
Right b
ret -> b -> IO b
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure b
ret
-- | Take over any orphaned queries on the connection and drain their results.
--
-- Does nothing when 'acquireOwnershipOfOrphanedQueries' returns no queries.
-- Otherwise, while holding the control-messages lock, a cancellation request
-- is sent (skipped when @onlyDrainNotCancel@ is 'True') and the results of the
-- queries are consumed in order, stopping at the first one that ends with an
-- error response or once all of them have completed.
--
-- Every draining attempt is wrapped in 'timeout', bounded by the
-- @cancellationRequestResendIntervalMs@ connection option. When an attempt
-- times out, the cancellation request is re-sent (again unless
-- @onlyDrainNotCancel@), ownership is re-acquired, and draining resumes with
-- the queries returned by that re-acquisition.
cancelActiveStatement :: HPgConnection -> Bool -> IO ()
cancelActiveStatement conn@HPgConnection {connOpts} onlyDrainNotCancel = do
  queriesToDrain <- acquireOwnershipOfOrphanedQueries conn
  unless (null queriesToDrain) $ do
    debugPrint $ "Going to take control-msg lock to drain " ++ show queriesToDrain
    withControlMsgsLock conn (const $ pure ()) (const $ pure ()) $ \() -> do
      debugPrint "Cancelling active pipeline to drain."
      requestCancellation
      drainWithResends queriesToDrain queriesToDrain
  where
    -- The option is expressed in milliseconds; it is scaled by 1000 before
    -- being handed to 'timeout' (presumably microseconds -- confirm).
    attemptTimeout :: Int
    attemptTimeout = 1000 * cancellationRequestResendIntervalMs connOpts

    -- Only cancel when the caller did not ask for a drain-only operation.
    requestCancellation :: IO ()
    requestCancellation = unless onlyDrainNotCancel $ sendCancellationRequest conn

    -- Consume the results of each query in turn; an error response ends the
    -- draining without touching the remaining queries.
    consumeUntilError :: [QueryId] -> IO ()
    consumeUntilError [] = pure ()
    consumeUntilError (qid : rest) = do
      (_, rows) <- consumeResults conn qid
      S.effects rows >>= \case
        Left _errResponse -> pure ()
        Right _cmdComplete -> consumeUntilError rest

    -- The first argument is the list acquired at the start; it is only used
    -- for the debug message. The second is what this attempt tries to drain.
    drainWithResends :: [QueryId] -> [QueryId] -> IO ()
    drainWithResends initiallyAcquired pending =
      timeout attemptTimeout (consumeUntilError pending) >>= \case
        Just () -> pure ()
        Nothing -> do
          debugPrint $ "Sending another cancellation request as orphaned pipeline still not completely drained: " ++ show initiallyAcquired
          requestCancellation
          acquireOwnershipOfOrphanedQueries conn >>= drainWithResends initiallyAcquired
-- | Make the calling thread the owner of the connection's current pipeline
-- when that pipeline is orphaned, returning the identifiers of its queries.
--
-- The connection state is read while acquiring the control-messages lock.
-- If 'connectionReadyForNewPipeline' reports the connection as ready
-- ('Right'), there is nothing to take over and @[]@ is returned.
--
-- Otherwise every query in the current pipeline is inspected. Ownership is
-- taken if at least one query is already owned by the calling thread, or is
-- owned by a thread that no longer exists (see @threadDoesNotExist@). In that
-- case /all/ queries in the pipeline are reassigned to the calling thread and
-- their identifiers are returned; otherwise @[]@ is returned.
--
-- The reassignment is applied with 'STM.modifyTVar'' to the state held in the
-- 'TVar' at that moment, rather than writing back the snapshot read during
-- lock acquisition, so fields updated in between are not overwritten with
-- stale values.
acquireOwnershipOfOrphanedQueries :: HPgConnection -> IO [QueryId]
acquireOwnershipOfOrphanedQueries conn = do
  thisThreadId <- getMyWeakThreadId
  debugPrint $ "+++ I am " ++ show thisThreadId ++ " and will look for orphaned queries to drain"
  withControlMsgsLock conn STM.readTVar (const $ pure ()) $ \st ->
    if isRight (connectionReadyForNewPipeline st)
      then pure []
      else do
        let activeQueries = currentPipeline st
        -- All owners are checked (no short-circuit), then combined with 'or'.
        mustTakeOwnership <-
          or
            <$> forM
              activeQueries
              ( \QueryState {queryOwner} ->
                  if queryOwner == thisThreadId
                    then pure True
                    else threadDoesNotExist queryOwner
              )
        if mustTakeOwnership
          then do
            -- Single read-modify-write transaction on the latest state.
            STM.atomically $
              STM.modifyTVar' (internalConnectionState conn) $ \latest ->
                latest {currentPipeline = map (\qs -> qs {queryOwner = thisThreadId}) (currentPipeline latest)}
            let owner = map queryOwner activeQueries
            debugPrint $ "We (" ++ show thisThreadId ++ ") took ownership of the pipeline orphaned by " ++ show owner
            pure $ map queryIdentifier activeQueries
          else pure []
  where
    -- A thread counts as gone when its weak reference can no longer be
    -- dereferenced, or when its status is 'ThreadDied' or 'ThreadFinished'.
    threadDoesNotExist :: WeakThreadId -> IO Bool
    threadDoesNotExist (WeakThreadId wtid _) =
      deRefWeak wtid >>= \case
        Nothing -> pure True
        Just tid -> (`elem` [ThreadDied, ThreadFinished]) <$> threadStatus tid
-- | Streaming pipeline step that decodes rows with the 'FromPgRow' instance's
-- 'rowDecoder'. See 'pipelineSWith'.
pipelineS :: (FromPgRow a) => Query -> Pipeline (IO (Stream (Of a) IO ()))
pipelineS qry = pipelineSWith rowDecoder qry
-- | Build a pipeline step whose result is a stream of rows decoded with the
-- given 'RowDecoder'.
--
-- The query is split with 'breakQueryIntoStatements'. Every statement but the
-- last is queued with 'Nothing'; the last is queued together with the third
-- field of the decoder (@Just expectedColFmts@). When the step is run, the
-- results of all leading statements are consumed with
-- 'consumeResultsIgnoreRows', and the rows of the last statement are exposed
-- through 'consumeStreamingResults'.
pipelineSWith :: RowDecoder a -> Query -> Pipeline (IO (Stream (Of a) IO ()))
pipelineSWith rowparser@(RowDecoder _ _ expectedColFmts) qry =
  case lastAndInitNE (breakQueryIntoStatements qry) of
    (leadingStmts, finalStmt) ->
      Pipeline
        ([(stmt, Nothing) | stmt <- leadingStmts] ++ [(finalStmt, Just expectedColFmts)])
        streamLastResult
  where
    streamLastResult conn qryIds =
      case lastAndInit qryIds of
        (leadingIds, mFinalId) -> do
          forM_ leadingIds (consumeResultsIgnoreRows conn)
          pure $
            consumeStreamingResults
              (ApplicativeRowDecoder rowparser)
              conn
              (fromMaybe (error "pipelineS internal bug: no mLastQry") mFinalId)
-- | Build a pipeline step whose result is a stream of rows decoded with the
-- given 'RowDecoderMonadic'.
--
-- The query is split with 'breakQueryIntoStatements' and every statement,
-- including the last, is queued with 'Nothing'. When the step is run, the
-- results of all leading statements are consumed with
-- 'consumeResultsIgnoreRows', and the rows of the last statement are exposed
-- through 'consumeStreamingResults'.
pipelineSMWith :: RowDecoderMonadic a -> Query -> Pipeline (IO (Stream (Of a) IO ()))
pipelineSMWith rowparser qry =
  case lastAndInitNE (breakQueryIntoStatements qry) of
    (leadingStmts, finalStmt) ->
      Pipeline
        ([(stmt, Nothing) | stmt <- leadingStmts] ++ [(finalStmt, Nothing)])
        streamLastResult
  where
    streamLastResult conn qryIds =
      case lastAndInit qryIds of
        (leadingIds, mFinalId) -> do
          forM_ leadingIds (consumeResultsIgnoreRows conn)
          pure $
            consumeStreamingResults
              (MonadicRowDecoder rowparser)
              conn
              (fromMaybe (error "pipelineSMWith internal bug: no mLastQry") mFinalId)
-- | Pipeline step returning all rows as a list, decoded with the 'FromPgRow'
-- instance's 'rowDecoder'. See 'pipelineWith'.
pipeline :: (FromPgRow a) => Query -> Pipeline (IO [a])
pipeline qry = pipelineWith rowDecoder qry
-- | Like 'pipelineSWith', but the resulting stream is collected into a list
-- with 'S.toList_'.
pipelineWith :: RowDecoder a -> Query -> Pipeline (IO [a])
pipelineWith rowparser q = fmap (>>= S.toList_) (pipelineSWith rowparser q)
-- | Like 'pipelineSMWith', but the resulting stream is collected into a list
-- with 'S.toList_'.
pipelineM :: RowDecoderMonadic a -> Query -> Pipeline (IO [a])
pipelineM rowparser q = fmap (>>= S.toList_) (pipelineSMWith rowparser q)
-- | Pipeline step expecting exactly one row, decoded with the 'FromPgRow'
-- instance's 'rowDecoder'. See 'pipeline1With'.
pipeline1 :: (FromPgRow a) => Query -> Pipeline (IO a)
pipeline1 qry = pipeline1With rowDecoder qry
-- | Like 'pipelineWith', but requires the result to contain exactly one row.
--
-- An empty result or a result with more than one row is reported through
-- 'throwIrrecoverableErrorWithStatement', together with the query text.
pipeline1With :: RowDecoder a -> Query -> Pipeline (IO a)
pipeline1With rowparser q = fmap (>>= exactlyOne) (pipelineWith rowparser q)
  where
    stmtText = queryToByteString q

    exactlyOne rows = case rows of
      [onlyRow] -> pure onlyRow
      [] -> throwIrrecoverableErrorWithStatement stmtText "Expected exactly one row in query/pipeline1 call, but got none."
      _ -> throwIrrecoverableErrorWithStatement stmtText "Expected exactly one row in query/pipeline1 call, but got more than one."
-- | Pipeline step expecting zero or one row, decoded with the 'FromPgRow'
-- instance's 'rowDecoder'. See 'pipelineMayWith'.
pipelineMay :: (FromPgRow a) => Query -> Pipeline (IO (Maybe a))
pipelineMay qry = pipelineMayWith rowDecoder qry
-- | Like 'pipelineWith', but requires the result to contain at most one row.
--
-- Yields 'Nothing' for an empty result and 'Just' the row for a single-row
-- result. More than one row is reported through
-- 'throwIrrecoverableErrorWithStatement', together with the query text.
pipelineMayWith :: RowDecoder a -> Query -> Pipeline (IO (Maybe a))
pipelineMayWith rowparser q = fmap (>>= atMostOne) (pipelineWith rowparser q)
  where
    stmtText = queryToByteString q

    atMostOne rows = case rows of
      [] -> pure Nothing
      [onlyRow] -> pure (Just onlyRow)
      _ -> throwIrrecoverableErrorWithStatement stmtText "Expected zero or one row in query/pipelineMay call, but got more than one."
-- | Pipeline step for a query whose rows are not needed: splits the query
-- with 'breakQueryIntoStatements' and hands it to 'pipelineExecInternal'.
pipelineExec :: Query -> Pipeline (IO Int64)
pipelineExec qry = pipelineExecInternal (breakQueryIntoStatements qry)
-- | Same as 'pipelineExec', but the 'Int64' result of the final action is
-- discarded.
pipelineExec_ :: Query -> Pipeline (IO ())
pipelineExec_ qry = void <$> pipelineExecInternal (breakQueryIntoStatements qry)
-- | Build a pipeline step from already-split statements, ignoring any rows.
--
-- Every statement is queued with 'Nothing'. When the step is run, the results
-- of all statements but the last are consumed with 'consumeResultsIgnoreRows'
-- and discarded; the 'Int64' returned by 'consumeResultsIgnoreRows' for the
-- last statement is the step's result.
pipelineExecInternal :: NonEmpty SingleQuery -> Pipeline (IO Int64)
pipelineExecInternal qs =
  Pipeline [(stmt, Nothing) | stmt <- NE.toList qs] consumeAll
  where
    consumeAll conn qryIds =
      case lastAndInit qryIds of
        (leadingIds, mFinalId) -> do
          forM_ leadingIds (consumeResultsIgnoreRows conn)
          consumeResultsIgnoreRows
            conn
            (fromMaybe (error "pipelineExec internal bug: no mLastQry") mFinalId)
-- | Run a 'Pipeline' on the connection. Delegates to 'runPipelineInternal'
-- with an 'STM' action that does nothing.
runPipeline :: HPgConnection -> Pipeline a -> IO a
runPipeline conn pip = runPipelineInternal conn pip nothingOnSend
  where
    nothingOnSend :: STM ()
    nothingOnSend = pure ()
-- | Send the statements of a 'Pipeline' and apply its result function to the
-- resulting query identifiers.
--
-- A pipeline without statements sends nothing: its result function is applied
-- to the connection and an empty list of identifiers.
--
-- Otherwise the statements are handed to 'sendPipeline' (each tagged as
-- 'ExtendedQuery') together with a message builder and the given 'STM'
-- action @onMsgsSentTxn@. For every statement the builder emits, in order:
--
-- * a @Parse@ message, unless the statement has a prepared-statement hash
--   that is already in the connection state's @preparedStatementNames@ or
--   was seen on an earlier statement of this same pipeline;
-- * a @Bind@ message carrying the encoded parameter values, the result
--   column format (defaulting to @1@ when the statement carries 'Nothing'),
--   and the prepared-statement hash;
-- * a @Describe@ and an @Execute@ message.
--
-- A single @Sync@ message terminates the whole batch.
runPipelineInternal :: HPgConnection -> Pipeline a -> STM () -> IO a
runPipelineInternal conn (Pipeline (NE.nonEmpty -> mQueries) run) onMsgsSentTxn =
  case mQueries of
    Nothing -> pure (run conn [])
    Just queries ->
      sendPipeline
        conn
        ( fmap
            (\(SingleQuery {queryString = stmtText, preparedStmtHash = stmtName}, _) -> (stmtText, ExtendedQuery, stmtName))
            queries
        )
        ( \st encCtx ->
            let -- Messages for one statement; @skipParse@ suppresses the Parse message.
                messagesFor skipParse (SingleQuery stmtText paramEncoders stmtName, mResultFmts) =
                  let encodedParams = [encode encCtx | encode <- paramEncoders]
                      parseMsgs = [SomeMessage (Parse stmtText (map fst encodedParams) stmtName) | not skipParse]
                      bindMsg =
                        SomeMessage
                          ( Bind
                              { paramsValuesInOrder = map snd encodedParams,
                                resultColumnFmts = fromMaybe 1 mResultFmts,
                                preparedStmtHash = stmtName
                              }
                          )
                   in parseMsgs ++ [bindMsg, SomeMessage Describe, SomeMessage Execute]
                -- Thread the set of known statement names through the
                -- pipeline so a hash is parsed at most once.
                step seenNames qry@(SingleQuery {preparedStmtHash = mStmtName}, _) =
                  case mStmtName of
                    Nothing -> (seenNames, messagesFor False qry)
                    Just stmtName ->
                      ( Set.insert stmtName seenNames,
                        messagesFor (stmtName `Set.member` seenNames) qry
                      )
                (_, perQueryMsgs) =
                  List.mapAccumL step st.preparedStatementNames (NE.toList queries)
             in mconcat perQueryMsgs ++ [SomeMessage Sync]
        )
        onMsgsSentTxn
        (\qryIds -> pure (run conn (NE.toList qryIds)))
-- | The kind of row decoder handed to 'consumeStreamingResults'.
data WhichRowDecoder a
  = -- | Decode rows with a 'RowDecoder'.
    ApplicativeRowDecoder !(RowDecoder a)
  | -- | Decode rows with a 'RowDecoderMonadic'.
    MonadicRowDecoder !(RowDecoderMonadic a)
consumeStreamingResults :: WhichRowDecoder a -> HPgConnection -> QueryId -> Stream (Of a) IO ()
consumeStreamingResults :: forall a.
WhichRowDecoder a
-> HPgConnection -> QueryId -> Stream (Of a) IO ()
consumeStreamingResults WhichRowDecoder a
rp HPgConnection
conn QueryId
qryId = IO (Stream (Of a) IO ()) -> Stream (Of a) IO ()
forall (m :: * -> *) (f :: * -> *) r.
(Monad m, Functor f) =>
m (Stream f m r) -> Stream f m r
S.effect (IO (Stream (Of a) IO ()) -> Stream (Of a) IO ())
-> IO (Stream (Of a) IO ()) -> Stream (Of a) IO ()
forall a b. (a -> b) -> a -> b
$ do
qText <- HPgConnection -> QueryId -> IO ByteString
lookupQueryText HPgConnection
conn QueryId
qryId
(mERowDesc, rowsStream) <- consumeResults conn qryId
case mERowDesc of
Maybe (Either3 NoData RowDescription CopyInResponse)
Nothing -> do
rowCount :> res <- Stream (Of DataRow) IO (Either ErrorResponse CommandComplete)
-> IO (Of Int (Either ErrorResponse CommandComplete))
forall (m :: * -> *) a r.
Monad m =>
Stream (Of a) m r -> m (Of Int r)
S.length Stream (Of DataRow) IO (Either ErrorResponse CommandComplete)
rowsStream
when (rowCount > 0) $ throwIrrecoverableErrorWithStatement qText "Bug in Hpgsql. We didn't get either NoData or RowDescription, so we assumed there was an error binding the query, but we got more than 0 rows in results"
case res of
Left ErrorResponse
err -> ByteString -> ErrorResponse -> IO (Stream (Of a) IO ())
forall a. ByteString -> ErrorResponse -> IO a
throwPostgresError ByteString
qText ErrorResponse
err
Right CommandComplete
_cmd -> ByteString -> Text -> IO (Stream (Of a) IO ())
forall (m :: * -> *) a. MonadThrow m => ByteString -> Text -> m a
throwIrrecoverableErrorWithStatement ByteString
qText Text
"Bug in Hpgsql. We didn't get either NoData or RowDescription, so we assumed there was an error binding the query, but we then received a CommandComplete."
Just (Left3 NoData
_noData) -> ByteString -> Text -> IO (Stream (Of a) IO ())
forall (m :: * -> *) a. MonadThrow m => ByteString -> Text -> m a
throwIrrecoverableErrorWithStatement ByteString
qText Text
"You have sent a count-returning query but expected it to be a rows-returning query. This is not supported."
Just (Right3 CopyInResponse
_copyInResponse) -> ByteString -> Text -> IO (Stream (Of a) IO ())
forall (m :: * -> *) a. MonadThrow m => ByteString -> Text -> m a
throwIrrecoverableErrorWithStatement ByteString
qText Text
"You have sent a COPY FROM STDIN query but expected it to be a rows-returning query. This is not supported."
Just (Middle3 (RowDescription [(Text, Oid)]
coltypes)) -> do
encodingContext <- MVar EncodingContext -> IO EncodingContext
forall a. MVar a -> IO a
readMVar HPgConnection
conn.encodingContext
let numResultColumns = [(Text, Oid)] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [(Text, Oid)]
coltypes
mkColInfo (Text
colName, Oid
oid) = Oid -> Maybe Text -> EncodingContext -> FieldInfo
FieldInfo Oid
oid (Text -> Maybe Text
forall a. a -> Maybe a
Just Text
colName) EncodingContext
encodingContext
colInfos = ((Text, Oid) -> FieldInfo) -> [(Text, Oid)] -> [FieldInfo]
forall a b. (a -> b) -> [a] -> [b]
map (Text, Oid) -> FieldInfo
mkColInfo [(Text, Oid)]
coltypes
!rowparser <- case rp of
ApplicativeRowDecoder (RowDecoder [FieldInfo] -> Parser a
rparser [FieldInfo] -> [(FieldInfo, Bool)]
rtypecheck Int
expectedNumCols) -> do
let typecheckedColInfos :: [(FieldInfo, Bool)]
typecheckedColInfos = [FieldInfo] -> [(FieldInfo, Bool)]
rtypecheck [FieldInfo]
colInfos
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless (Int
numResultColumns Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
expectedNumCols) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ ByteString -> Text -> IO ()
forall (m :: * -> *) a. MonadThrow m => ByteString -> Text -> m a
throwIrrecoverableErrorWithStatement ByteString
qText (Text -> IO ()) -> Text -> IO ()
forall a b. (a -> b) -> a -> b
$ Text
"Query result contains " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> [Char] -> Text
Text.pack (Int -> [Char]
forall a. Show a => a -> [Char]
show Int
numResultColumns) Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
" columns but row parser expected " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> [Char] -> Text
Text.pack (Int -> [Char]
forall a. Show a => a -> [Char]
show Int
expectedNumCols)
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless (((FieldInfo, Bool) -> Bool) -> [(FieldInfo, Bool)] -> Bool
forall (t :: * -> *) a. Foldable t => (a -> Bool) -> t a -> Bool
all (FieldInfo, Bool) -> Bool
forall a b. (a, b) -> b
snd [(FieldInfo, Bool)]
typecheckedColInfos) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ ByteString -> Text -> IO ()
forall (m :: * -> *) a. MonadThrow m => ByteString -> Text -> m a
throwIrrecoverableErrorWithStatement ByteString
qText Text
"Query result column types do not match expected column types"
Parser a -> IO (Parser a)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Parser a -> IO (Parser a)) -> Parser a -> IO (Parser a)
forall a b. (a -> b) -> a -> b
$ [FieldInfo] -> Parser a
rparser [FieldInfo]
colInfos Parser a -> Parser () -> Parser a
forall a b. Parser a -> Parser b -> Parser a
forall (f :: * -> *) a b. Applicative f => f a -> f b -> f a
<* Parser ()
Parser.endOfInput
MonadicRowDecoder (RowDecoderMonadic ConversionState -> Parser (a, Int)
rparser) -> Parser a -> IO (Parser a)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Parser a -> IO (Parser a)) -> Parser a -> IO (Parser a)
forall a b. (a -> b) -> a -> b
$ ((a, Int) -> a) -> Parser (a, Int) -> Parser a
forall a b. (a -> b) -> Parser a -> Parser b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (a, Int) -> a
forall a b. (a, b) -> a
fst (Parser (a, Int) -> Parser a) -> Parser (a, Int) -> Parser a
forall a b. (a -> b) -> a -> b
$ ConversionState -> Parser (a, Int)
rparser ConversionState {colsLeftToParse :: [FieldInfo]
colsLeftToParse = [FieldInfo]
colInfos} Parser (a, Int) -> Parser () -> Parser (a, Int)
forall a b. Parser a -> Parser b -> Parser a
forall (f :: * -> *) a b. Applicative f => f a -> f b -> f a
<* Parser ()
Parser.endOfInput
pure $ do
errOrCmdComplete <-
S.mapM
( \(DataRow ByteString
rowColumnData) ->
case Parser a -> ByteString -> ParseResult a
forall a. Parser a -> ByteString -> ParseResult a
Parser.parseOnly Parser a
rowparser ByteString
rowColumnData of
Parser.ParseOk a
row -> a -> IO a
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure a
row
Parser.ParseFail [Char]
err -> ByteString -> Text -> IO a
forall (m :: * -> *) a. MonadThrow m => ByteString -> Text -> m a
throwIrrecoverableErrorWithStatement ByteString
qText (Text -> IO a) -> Text -> IO a
forall a b. (a -> b) -> a -> b
$ Text
"Failed parsing a row: " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> [Char] -> Text
Text.pack ([Char] -> [Char]
forall a. Show a => a -> [Char]
show [Char]
err)
)
rowsStream
S.effect $ case errOrCmdComplete of
Left ErrorResponse
err -> ByteString -> ErrorResponse -> IO (Stream (Of a) IO ())
forall a. ByteString -> ErrorResponse -> IO a
throwPostgresError ByteString
qText ErrorResponse
err
Right CommandComplete
_cmdComplete -> Stream (Of a) IO () -> IO (Stream (Of a) IO ())
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Stream (Of a) IO ()
forall a. Monoid a => a
mempty
-- | Run a query and decode its rows with the 'FromPgRow' row decoder of @a@.
--
-- This is 'queryWith' specialised to @'rowDecoder' \@a@.
query :: forall a. (FromPgRow a) => HPgConnection -> Query -> IO [a]
query conn qry = queryWith (rowDecoder @a) conn qry
-- | Run a query and decode its rows with the supplied applicative 'RowDecoder'.
--
-- The query is wrapped by 'pipelineWith' and executed through 'runPipeline';
-- the pipeline hands back an @IO [a]@ action, which is then run to produce
-- the list of decoded rows.
queryWith :: RowDecoder a -> HPgConnection -> Query -> IO [a]
queryWith rowDec conn qry = do
  fetchRows <- runPipeline conn (pipelineWith rowDec qry)
  fetchRows
-- | Like 'queryWith', but takes a monadic row decoder ('RowDecoderMonadic').
--
-- The query is wrapped by 'pipelineM' and executed through 'runPipeline';
-- the resulting @IO [a]@ action is then run to produce the decoded rows.
queryMWith :: RowDecoderMonadic a -> HPgConnection -> Query -> IO [a]
queryMWith rowDec conn qry = do
  fetchRows <- runPipeline conn (pipelineM rowDec qry)
  fetchRows
-- | 'query1With' using the 'FromPgRow' row decoder of @a@.
query1 :: forall a. (FromPgRow a) => HPgConnection -> Query -> IO a
query1 conn q = query1With rowDecoder conn q
-- | Run a query through 'pipeline1With' and return its single decoded value.
--
-- 'runPipeline' yields an @IO a@ action for the query; that action is run
-- immediately and its result returned.
query1With :: RowDecoder a -> HPgConnection -> Query -> IO a
query1With rowDec conn q = do
  fetchRow <- runPipeline conn (pipeline1With rowDec q)
  fetchRow
-- | 'queryMayWith' using the 'FromPgRow' row decoder of @a@.
queryMay :: forall a. (FromPgRow a) => HPgConnection -> Query -> IO (Maybe a)
queryMay conn q = queryMayWith rowDecoder conn q
-- | Run a query through 'pipelineMayWith' and return its optional decoded value.
--
-- 'runPipeline' yields an @IO (Maybe a)@ action for the query; that action is
-- run immediately and its result returned.
queryMayWith :: RowDecoder a -> HPgConnection -> Query -> IO (Maybe a)
queryMayWith rowDec conn q = do
  fetchRow <- runPipeline conn (pipelineMayWith rowDec q)
  fetchRow
-- | Like 'withCopy', but discards the result of the user action and returns
-- only the 'Int64' count.
withCopy_ :: HPgConnection -> Query -> IO a -> IO Int64
withCopy_ conn copyQ copyFn = do
  countAndResult <- withCopy conn copyQ copyFn
  pure (fst countAndResult)
-- | Start a COPY with 'withCopyInternal', run the user action, then finish the
-- COPY with 'copyEndInternal'.
--
-- Returns the count reported by 'copyEndInternal' paired with the result of
-- the user action.
withCopy :: HPgConnection -> Query -> IO a -> IO (Int64, a)
withCopy conn copyQ copyFn = withCopyInternal conn copyQ runThenEnd
  where
    -- The user action runs first; the COPY is only ended once it has returned.
    runThenEnd qryId = do
      ret <- copyFn
      count <- copyEndInternal conn qryId
      pure (count, ret)
-- | Send a COPY statement over the extended protocol and run a continuation
-- with the 'QueryId' assigned to it.
--
-- The query is split with 'breakQueryIntoStatements'; anything other than a
-- single statement is rejected with an irrecoverable error. The statement is
-- registered via 'sendPipeline' in the @CopyQuery StillCopying@ state and
-- sent as Parse, Bind, Execute and Flush messages.
withCopyInternal :: HPgConnection -> Query -> (QueryId -> IO (Int64, a)) -> IO (Int64, a)
withCopyInternal conn (lastAndInitNE . breakQueryIntoStatements -> (firstQueries, SingleQuery {..})) copyFn = do
  whenNonEmpty firstQueries $ \_ ->
    throwIrrecoverableError "Query for COPY must not have other SQL statements"
  thisThreadId <- getMyWeakThreadId
  let receiveOnce qryId =
        void (receiveOutstandingResponseMsgsAtomically thisThreadId conn qryId)
  sendPipeline
    conn
    ((queryString, CopyQuery StillCopying, Nothing) :| [])
    ( \_st encCtx ->
        -- Each query parameter is a function of the encoding context that
        -- produces its (optional) type OID and its binary value.
        let (paramOids, paramValues) = unzip [mkParam encCtx | mkParam <- queryParams]
         in [ SomeMessage (Parse queryString paramOids Nothing),
              SomeMessage
                ( Bind
                    { paramsValuesInOrder = paramValues,
                      resultColumnFmts = 0,
                      preparedStmtHash = Nothing
                    }
                ),
              SomeMessage Execute,
              SomeMessage Msgs.Flush
            ]
    )
    (pure ())
    $ \(qryId :| _) -> do
      -- Three reads, results discarded. NOTE(review): presumably one per
      -- expected server response to Parse, Bind and Execute -- confirm.
      receiveOnce qryId
      receiveOnce qryId
      receiveOnce qryId
      copyFn qryId
-- | Stream rows into a COPY statement using the binary COPY format.
--
-- After starting the COPY via 'withCopyInternal' this sends, in order: the
-- binary header, the rows (each prefixed with its 16-bit column count and
-- grouped into chunks by 'chunkBuildersBySize'), and the @-1@ trailer. It
-- then finishes with 'copyEndInternal'.
--
-- Returns the count from 'copyEndInternal' paired with the stream's result.
copyFromS :: forall r b. (ToPgRow r) => HPgConnection -> Query -> Stream (Of r) IO b -> IO (Int64, b)
copyFromS conn copyQ allRows =
  withCopyInternal conn copyQ $ \qryId -> do
    -- Takes the control-messages lock with no-op actions.
    -- NOTE(review): presumably a barrier against in-flight control messages
    -- before raw socket writes begin -- confirm.
    withControlMsgsLock conn (\_ -> pure ()) (\_ -> pure ()) (\_ -> pure ())
    encCtx <- readMVar conn.encodingContext
    let !encodeRow = rowEncoder.toBinaryCopyBytes encCtx
        columnCount = Builder.int16BE . fromIntegral . length $ rowEncoder.toTypeOids (Proxy @r)
        sendCopyData = rethrowAsIrrecoverable . nonAtomicSendMsg conn . CopyData
        -- Row chunks are written straight to the socket as CopyData messages.
        sendChunk chunk =
          rethrowAsIrrecoverable
            . SocketBS.sendAll conn.socket
            . Builder.toStrictByteString
            . toPgMessage
            $ CopyData chunk
    -- Header: signature, then two zero 32-bit fields.
    sendCopyData (Builder.byteString "PGCOPY\n\xff\r\n\0" <> Builder.int32BE 0 <> Builder.int32BE 0)
    firstStep <- S.inspect allRows
    ret <- case firstStep of
      -- The stream ended without yielding any row.
      Left done -> pure done
      Right (firstRow :> otherRows) ->
        S.mapM_ sendChunk
          . chunkBuildersBySize 16384
          . S.map (\row -> columnCount <> encodeRow row)
          $ S.cons firstRow otherRows
    -- Trailer: a 16-bit -1.
    sendCopyData (Builder.int16BE (-1))
    count <- copyEndInternal conn qryId
    pure (count, ret)
-- | List-based variant of 'copyFromS': the rows are turned into a stream with
-- @S.each@ and only the 'Int64' count is returned.
copyFrom :: (ToPgRow r) => HPgConnection -> Query -> [r] -> IO Int64
copyFrom conn copyQ rows = do
  countAndUnit <- copyFromS conn copyQ (S.each rows)
  pure (fst countAndUnit)
-- | Regroup a stream of builders into larger builders.
--
-- Incoming builders are appended to an accumulator. As soon as the
-- accumulator's length reaches @maxSize@ it is emitted and a fresh, empty
-- accumulator is started. When the input ends, a non-empty accumulator is
-- emitted before the stream's result is returned.
chunkBuildersBySize :: (Monad m) => Int32 -> Stream (Of Builder.Builder) m r -> Stream (Of Builder.Builder) m r
chunkBuildersBySize maxSize = go mempty
  where
    go !pending !stream =
      S.lift (S.next stream) >>= either (flush pending) (absorb pending)

    -- End of input: emit whatever is still buffered, if anything.
    flush pending r = do
      when (Builder.builderLength pending > 0) (S.yield pending)
      pure r

    -- One more builder arrived: append it, then emit or keep accumulating.
    absorb pending (b, rest)
      | Builder.builderLength grown >= maxSize = S.yield grown >> go mempty rest
      | otherwise = go grown rest
      where
        grown = pending <> b
-- | Start a COPY statement via 'withCopyInternal' and return without ending it.
--
-- The continuation passed to 'withCopyInternal' does nothing, so no
-- 'copyEndInternal' is issued here.
-- NOTE(review): presumably the caller then uses 'putCopyData' and 'copyEnd'
-- (or 'putCopyError') -- confirm.
copyStart :: HPgConnection -> Query -> IO ()
copyStart conn copyQry = do
  countAndUnit <- withCopyInternal conn copyQry leaveCopyOpen
  pure (snd countAndUnit)
  where
    leaveCopyOpen _qryId = pure (0, ())
-- | Send the given bytes to the server as a single @CopyData@ message, using
-- 'nonAtomicSendMsg'.
putCopyData :: HPgConnection -> ByteString -> IO ()
putCopyData conn t = nonAtomicSendMsg conn (CopyData payload)
  where
    payload = Builder.byteString t
-- | Abort the active COPY statement by sending @CopyFail@ followed by @Sync@.
--
-- Runs under 'withControlMsgsLock'. The pipeline must consist of exactly one
-- COPY query in the @StillCopying@ state; that query is then marked
-- @CopyFailAndSyncSent@. Otherwise an irrecoverable error is thrown, either
-- because there is no such COPY query or because it is in another state.
putCopyError :: HPgConnection -> String -> IO ()
putCopyError conn causeForFailure =
  withControlMsgsLock conn (const $ pure ()) (const $ pure ()) $ \_ -> do
    activeCopyState <- STM.atomically $ do
      st <- STM.readTVar (internalConnectionState conn)
      pure $ case currentPipeline st of
        [QueryState {queryProtocol = CopyQuery copyState}] -> Just copyState
        _ -> Nothing
    case activeCopyState of
      Nothing ->
        throwIrrecoverableError "There is no active COPY command to cancel"
      Just StillCopying ->
        atomicallySendControlMsgs_
          conn
          ( [SomeMessage (Msgs.CopyFail causeForFailure), SomeMessage Sync],
            markCopyFailSent
          )
      Just otherState ->
        throwIrrecoverableError $
          "Current COPY command is not in a cancellable state. Its current state is " <> Text.pack (show otherState)
  where
    -- State transition handed to 'atomicallySendControlMsgs_' together with
    -- the messages; it re-checks the state because the check above ran in a
    -- separate transaction.
    markCopyFailSent = do
      let sttv = internalConnectionState conn
      st <- STM.readTVar sttv
      case currentPipeline st of
        [qs@QueryState {queryProtocol = CopyQuery StillCopying}] ->
          STM.writeTVar sttv $ st {currentPipeline = [qs {queryProtocol = CopyQuery CopyFailAndSyncSent}]}
        _ ->
          throwIrrecoverableError "Impossible: when marking CopyFail state was invalid"
-- | Finish the active COPY statement and return the count from
-- 'copyEndInternal'.
--
-- The pipeline must consist of exactly one COPY query that is still in the
-- @StillCopying@ state and is owned by the calling thread. Any other
-- situation throws an irrecoverable error describing it.
copyEnd :: HPgConnection -> IO Int64
copyEnd conn = do
  thisThreadId <- getMyWeakThreadId
  qryId <- STM.atomically $ do
    st <- STM.readTVar conn.internalConnectionState
    case st.currentPipeline of
      [] ->
        refuse "No active COPY statement when running copyEnd"
      [qs] -> case qs.queryProtocol of
        ExtendedQuery ->
          refuse "No active COPY statement when running copyEnd, but rather there was a regular query"
        CopyQuery StillCopying
          | qs.queryOwner == thisThreadId -> pure qs.queryIdentifier
          | otherwise ->
              refuse "Active COPY statement was issued by a different thread, and Hpgsql does not support multiple threads running/ending the same COPY statement"
        CopyQuery CopyDoneAndSyncSent ->
          refuse "Active COPY statement was already finished"
        CopyQuery CopyFailAndSyncSent ->
          refuse "Active COPY statement had previously failed"
      _ ->
        refuse "Active pipeline with other statements running when a copyEnd was attempted"
  copyEndInternal conn qryId
  where
    -- Every failure is reported against the same pseudo-statement text.
    refuse msg = throwIrrecoverableErrorWithStatement "Ending a COPY statement" msg
-- | Send @CopyDone@ and @Sync@ for the given COPY query, then consume its
-- results.
--
-- The single @StillCopying@ COPY query in the pipeline is marked
-- @CopyDoneAndSyncSent@; any other pipeline state throws an irrecoverable
-- error. An 'ErrorResponse' from the server is rethrown with
-- 'throwPostgresError'; otherwise the count carried by @CommandComplete@ is
-- returned.
copyEndInternal :: HPgConnection -> QueryId -> IO Int64
copyEndInternal conn qryId = do
  atomicallySendControlMsgs_ conn ([SomeMessage CopyDone, SomeMessage Sync], markCopyDoneSent)
  qText <- lookupQueryText conn qryId
  (_, stream) <- consumeResults conn qryId
  -- Any rows in the stream are discarded; only its final result matters.
  outcome <- S.effects stream
  either (throwPostgresError qText) (\(CommandComplete n) -> pure n) outcome
  where
    -- State transition handed to 'atomicallySendControlMsgs_' together with
    -- the messages.
    markCopyDoneSent = do
      let sttv = internalConnectionState conn
      st <- STM.readTVar sttv
      case currentPipeline st of
        [q@QueryState {queryProtocol = CopyQuery StillCopying}] ->
          STM.writeTVar sttv $ st {currentPipeline = [q {queryProtocol = CopyQuery CopyDoneAndSyncSent}]}
        _ ->
          throwIrrecoverableError "putCopyEnd called but there was no active COPY statement"
-- | Simplified variant of 'atomicallySendControlMsgs' for callers that need
-- neither an acquire/release step nor a return value: both STM hooks are
-- no-ops and the supplied messages and state update are passed through as-is.
atomicallySendControlMsgs_ :: HPgConnection -> ([SomeMessage], STM ()) -> IO ()
atomicallySendControlMsgs_ conn (msgs, stateUpdate) =
  atomicallySendControlMsgs conn noOp noOp (\_ -> ((), msgs, stateUpdate))
  where
    -- Used for both the acquire and the release hook.
    noOp _ = pure ()
-- | Existential wrapper around any value with a 'ToPgMessage' instance, so
-- that messages of different types can be collected in a single list.
data SomeMessage = forall msg. (ToPgMessage msg) => SomeMessage msg

-- | Serialising a 'SomeMessage' simply delegates to the wrapped message.
instance ToPgMessage SomeMessage where
  toPgMessage (SomeMessage wrapped) = toPgMessage wrapped
-- | Run under 'withControlMsgsLock' (with the given acquire and release STM
-- hooks) and append one entry to the connection's send buffer.
--
-- The value produced by the acquire hook is fed to the pure function @f@,
-- which yields a result, the control messages to serialise, and an STM action
-- to associate with them. The messages are concatenated into a single lazy
-- 'ByteString' and appended, together with that STM action, to the end of
-- 'sendBuffer'. The result component of @f@ is returned.
--
-- NOTE(review): this function only enqueues; the actual socket write and the
-- execution of the STM action happen elsewhere -- confirm with the code that
-- drains 'sendBuffer'.
atomicallySendControlMsgs :: HPgConnection -> (TVar InternalConnectionState -> STM a) -> (TVar InternalConnectionState -> STM ()) -> (a -> (b, [SomeMessage], STM ())) -> IO b
atomicallySendControlMsgs conn acquire release f =
  withControlMsgsLock conn acquire release enqueue
  where
    enqueue acqValue =
      let (!ret, !msgs, !afterSentTxn) = f acqValue
          -- All messages become one contiguous payload in the buffer.
          payload = Builder.toLazyByteString (foldMap toPgMessage msgs)
       in modifyMVar (sendBuffer conn) $ \pending ->
            pure (pending ++ [(payload, afterSentTxn)], ret)
-- | Poll for an already-received notification without blocking.
--
-- Inside a single STM transaction run through 'updateConnStateTxn', reads the
-- connection state and tries to pop one element from its
-- 'notificationsReceived' queue. Returns 'Nothing' when the queue is empty.
getNotificationNonBlocking :: HPgConnection -> IO (Maybe NotificationResponse)
getNotificationNonBlocking conn =
  STM.atomically (updateConnStateTxn conn pollQueue)
  where
    pollQueue sttv =
      STM.readTVar sttv >>= STM.tryReadTQueue . notificationsReceived
-- | Return the next notification, waiting for one if none is queued.
--
-- The queue is checked first via 'getNotificationNonBlocking'. If it is
-- empty, this waits with 'waitUntilPipelineIsReadyForNewQuery', checks the
-- queue once more, and only then receives a 'NotificationResponse' message
-- with 'receiveNextMsgWithMaskedContinuation'.
getNotification :: HPgConnection -> IO NotificationResponse
getNotification conn =
  queuedOr (waitUntilPipelineIsReadyForNewQuery conn (pure ()) awaitFromServer)
  where
    -- Second queue check: a notification may have been enqueued while we
    -- were waiting for the pipeline.
    awaitFromServer () =
      queuedOr
        (receiveNextMsgWithMaskedContinuation conn (msgParser @NotificationResponse) pure)

    -- Prefer a queued notification; otherwise run the fallback action.
    queuedOr :: IO NotificationResponse -> IO NotificationResponse
    queuedOr fallback =
      getNotificationNonBlocking conn >>= maybe fallback pure
-- | Serialise a single message and write it straight to the connection's
-- socket with 'SocketLBS.sendAll', then emit a debug line containing the
-- message's 'Show' output. No lock or send buffer is involved here.
nonAtomicSendMsg :: (ToPgMessage msg, Show msg) => HPgConnection -> msg -> IO ()
nonAtomicSendMsg HPgConnection {socket} msg = do
  SocketLBS.sendAll socket wireBytes
  debugPrint ("Sent " ++ show msg)
  where
    wireBytes = Builder.toLazyByteString (toPgMessage msg)
-- | Run an action and rethrow any exception caught by 'handle' as an
-- 'IrrecoverableHpgsqlError'. The original exception is preserved in
-- 'innerException'; no related statement is attached.
rethrowAsIrrecoverable :: IO a -> IO a
rethrowAsIrrecoverable = handle $ \(ex :: SomeException) ->
  throw
    IrrecoverableHpgsqlError
      { hpgsqlDetails = "An inner exception was thrown",
        innerException = Just ex,
        relatedStatement = Nothing
      }
-- | Process-wide 'MVar' created once at first use, initially holding 'True'.
--
-- The @NOINLINE@ pragma is required alongside 'unsafePerformIO': without it
-- the definition could be inlined at use sites and each one would allocate
-- its own, separate 'MVar'.
{-# NOINLINE _globalDebugLock #-}
_globalDebugLock :: MVar Bool
_globalDebugLock = unsafePerformIO (newMVar True)
-- | Debug logging hook. In this build it discards its argument and does
-- nothing.
debugPrint :: String -> IO ()
debugPrint _message = pure ()
-- | Debug timing hook. In this build the label is ignored and the given
-- action is returned unchanged.
{-# INLINE timeDebugNonBlockingOperation #-}
timeDebugNonBlockingOperation :: String -> IO a -> IO a
timeDebugNonBlockingOperation _label action = action