module Database.PostgreSQL.PQTypes.Internal.Connection
  ( -- * Connection
    Connection (..)
  , ConnectionStats (..)
  , initialConnectionStats
  , ConnectionSettings (..)
  , defaultConnectionSettings
  , ConnectionSourceM (..)
  , InternalConnectionSource (..)
  , ConnectionSource (..)
  , simpleSource
  , poolSource
  , connect
  , disconnect

    -- * Running queries
  , runQueryIO
  , QueryName (..)
  , runPreparedQueryIO
  ) where

import Control.Concurrent
import Control.Concurrent.Async
import Control.Concurrent.STM
import Control.Exception qualified as E
import Control.Monad
import Control.Monad.Base
import Control.Monad.Catch
import Data.ByteString.Char8 qualified as BS
import Data.Foldable qualified as F
import Data.Functor.Identity
import Data.IORef
import Data.Int
import Data.Kind
import Data.Maybe
import Data.Pool
import Data.Set qualified as S
import Data.String
import Data.Text qualified as T
import Data.Text.Encoding qualified as T
import Foreign.C.String
import Foreign.ForeignPtr
import Foreign.Ptr
import GHC.Clock (getMonotonicTime)
import GHC.Conc (closeFdWith)
import GHC.Stack

import Database.PostgreSQL.PQTypes.Internal.BackendPid
import Database.PostgreSQL.PQTypes.Internal.C.Interface
import Database.PostgreSQL.PQTypes.Internal.C.Types
import Database.PostgreSQL.PQTypes.Internal.Composite
import Database.PostgreSQL.PQTypes.Internal.Error
import Database.PostgreSQL.PQTypes.Internal.Error.Code
import Database.PostgreSQL.PQTypes.Internal.Exception
import Database.PostgreSQL.PQTypes.Internal.QueryResult
import Database.PostgreSQL.PQTypes.Internal.Utils
import Database.PostgreSQL.PQTypes.SQL.Class
import Database.PostgreSQL.PQTypes.SQL.Raw
import Database.PostgreSQL.PQTypes.ToSQL

-- | Settings used by 'connect' to establish and configure a database
-- connection.
data ConnectionSettings = ConnectionSettings
  { csConnInfo :: !T.Text
  -- ^ Connection info string.
  , csClientEncoding :: !(Maybe T.Text)
  -- ^ Client-side encoding. If set to 'Nothing', database encoding is used.
  , csRole :: !(Maybe (RawSQL ()))
  -- ^ A custom role to set with "SET ROLE".
  , csComposites :: ![T.Text]
  -- ^ A list of composite types to register. In order to be able to
  -- (de)serialize specific composite types, you need to register them.
  }
  deriving (Eq, Ord, Show)

-- | Default connection settings: empty connection info string, @UTF-8@ client
-- encoding, no custom role and no composite types to register.
--
-- Note that all strings sent to PostgreSQL by the library are encoded as
-- UTF-8, so don't alter client encoding unless you know what you're doing.
defaultConnectionSettings :: ConnectionSettings
defaultConnectionSettings =
  ConnectionSettings
    { csConnInfo = T.empty
    , csClientEncoding = Just "UTF-8"
    , csRole = Nothing
    , csComposites = []
    }

----------------------------------------

-- | Simple connection statistics.
data ConnectionStats = ConnectionStats
  { statsQueries :: !Int
  -- ^ Number of queries executed so far.
  , statsRows :: !Int
  -- ^ Number of rows fetched from the database.
  , statsValues :: !Int
  -- ^ Number of values fetched from the database.
  , statsParams :: !Int
  -- ^ Number of parameters sent to the database.
  , statsTime :: !Double
  -- ^ Time spent executing queries (in seconds).
  }
  deriving (Eq, Ord, Show)

-- | Initial connection statistics: every counter and the accumulated time
-- start at zero.
initialConnectionStats :: ConnectionStats
initialConnectionStats =
  ConnectionStats
    { statsQueries = 0
    , statsRows = 0
    , statsValues = 0
    , statsParams = 0
    , statsTime = 0
    }

-- | Representation of a connection object.
--
-- /Note:/ PGconn is not managed with a ForeignPtr because finalizers are broken
-- and at program exit might run even though another thread is inside the
-- relevant withForeignPtr block, executing a safe FFI call (in this case
-- executing an SQL query).
--
-- See https://gitlab.haskell.org/ghc/ghc/-/issues/10975 for more info.
data Connection = Connection
  { connPtr :: !(Ptr PGconn)
  -- ^ Pointer to connection object.
  , connBackendPid :: !BackendPid
  -- ^ Process ID of the server process attached to the current session.
  , connPreparedQueries :: !(IORef (S.Set T.Text))
  -- ^ A set of named prepared statements of the connection.
  }

-- | A pair of actions for acquiring and releasing a 'Connection' in the monad
-- @m@. The @cdata@ value produced by 'takeConnection' accompanies the
-- connection and is passed back to 'putConnection' on release.
data InternalConnectionSource m cdata = InternalConnectionSource
  { takeConnection :: !(m (Connection, cdata))
  -- ^ Acquire a connection together with its associated data.
  , putConnection :: !(forall r. (Connection, cdata) -> ExitCase r -> m ())
  -- ^ Release a previously acquired connection. The 'ExitCase' describes how
  -- the computation that used the connection ended.
  }

-- | Database connection supplier.
--
-- Wraps an 'InternalConnectionSource' while existentially hiding its @cdata@
-- type parameter, i.e. the data that 'takeConnection' returns alongside the
-- connection and that 'putConnection' receives back.
data ConnectionSourceM m
  = forall cdata. ConnectionSourceM !(InternalConnectionSource m cdata)

-- | Wrapper for a polymorphic connection source.
--
-- The type-level list @cs@ enumerates the constraints a monad @m@ has to
-- satisfy (via 'MkConstraint') for the wrapped 'ConnectionSourceM' to be
-- usable in it.
newtype ConnectionSource (cs :: [(Type -> Type) -> Constraint]) = ConnectionSource
  { unConnectionSource :: forall m. MkConstraint m cs => ConnectionSourceM m
  }

-- | Default connection supplier. It establishes new database connection each
-- time 'withConnection' is called.
--
-- Taking a connection runs 'connect' with the supplied settings; putting it
-- back always runs 'disconnect', regardless of the exit case.
simpleSource
  :: ConnectionSettings
  -> ConnectionSource [MonadBase IO, MonadMask]
simpleSource cs =
  ConnectionSource $
    ConnectionSourceM
      InternalConnectionSource
        { takeConnection = (,()) <$> liftBase (connect cs)
        , putConnection = \(conn, ()) _ -> liftBase $ disconnect conn
        }

-- | Pooled source. It uses striped pool from @resource-pool@ package to cache
-- established connections and reuse them.
--
-- A connection is returned to the pool only when the computation using it
-- finished successfully ('ExitCaseSuccess'). In every other exit case the
-- connection is destroyed instead of being reused.
poolSource
  :: ConnectionSettings
  -> (IO Connection -> (Connection -> IO ()) -> PoolConfig Connection)
  -- ^ A function for creating the 'PoolConfig' with desired parameters.
  --
  -- /Note:/ supplied arguments are for creation and destruction of a database
  -- connection.
  -> IO (ConnectionSource [MonadBase IO, MonadMask])
poolSource cs mkPoolConfig = do
  pool <- newPool $ mkPoolConfig (connect cs) disconnect
  pure $ ConnectionSource (sourceM pool)
  where
    sourceM pool =
      ConnectionSourceM
        InternalConnectionSource
          { takeConnection = liftBase $ takeResource pool
          , putConnection = \(resource, local) -> \case
              ExitCaseSuccess _ -> liftBase $ putResource local resource
              _ -> liftBase $ destroyResource pool local resource
          }

----------------------------------------

-- | Low-level function for connecting to the database. Useful if one wants to
-- implement custom connection source.
--
-- The connection is opened from 'csConnInfo', then configured in order: the
-- client encoding is set (if 'csClientEncoding' is given), libpqtypes types
-- are initialized, 'csComposites' are registered, the role is switched with
-- @SET ROLE@ (if 'csRole' is given) and finally the backend PID is fetched and
-- stored in the returned 'Connection'.
--
-- The whole function runs with asynchronous exceptions masked; the
-- configuration part is run unmasked and, should it throw, the connection is
-- closed with 'c_PQfinish' before the exception propagates.
--
-- /Warning:/ the 'Connection' needs to be explicitly destroyed with
-- 'disconnect', otherwise there will be a resource leak.
connect :: ConnectionSettings -> IO Connection
connect ConnectionSettings {..} = mask $ \unmask -> do
  connPtr <- BS.useAsCString (T.encodeUtf8 csConnInfo) (openConnection unmask)
  (`onException` c_PQfinish connPtr) . unmask $ do
    status <- c_PQstatus connPtr
    when (status /= c_CONNECTION_OK) $
      throwLibPQError connPtr fname
    F.forM_ csClientEncoding $ \enc -> do
      res <- BS.useAsCString (T.encodeUtf8 enc) (c_PQsetClientEncoding connPtr)
      when (res == -1) $
        throwLibPQError connPtr fname
    c_PQinitTypes connPtr
    registerComposites connPtr csComposites
    conn <- do
      preparedQueries <- newIORef S.empty
      pure
        Connection
          { connPtr = connPtr
          , connBackendPid = noBackendPid
          , connPreparedQueries = preparedQueries
          }
    F.forM_ csRole $ \role -> runQueryIO conn $ "SET ROLE " <> role

    let selectPid = "SELECT pg_backend_pid()" :: RawSQL ()
    (_, res, _) <- runQueryIO conn selectPid
    -- Exactly one row with the PID is expected; anything else is reported as
    -- an error with the query attached as context.
    case F.toList $ mkQueryResult @(Identity Int32) selectPid noBackendPid res of
      [Identity pid] -> pure $ conn {connBackendPid = BackendPid $ fromIntegral pid}
      pids -> do
        let err = HPQTypesError $ "unexpected backend pid: " ++ show pids
        rethrowWithContext selectPid noBackendPid $ toException err
  where
    fname = "connect"

    openConnection :: (forall r. IO r -> IO r) -> CString -> IO (Ptr PGconn)
    openConnection unmask conninfo = do
      -- We use synchronous version of connecting to the database using
      -- 'PQconnectdb' instead of 'PQconnectStart' and 'PQconnectPoll', because
      -- the second method doesn't properly support the connect_timeout
      -- parameter from the connection string nor multihost setups.
      --
      -- The disadvantage of this is that a call to 'PQconnectdb' cannot be
      -- interrupted if the Haskell thread running it receives an asynchronous
      -- exception, so to guarantee prompt return in such scenario 'PQconnectdb'
      -- is run in a separate child thread. If the parent receives an exception
      -- while the child still runs, the child is signaled to clean up after
      -- itself and left behind.
      connVar <- newEmptyTMVarIO
      runningVar <- newTVarIO True
      _ <- forkIO $ do
        conn <- c_PQconnectdb conninfo
        -- Hand the connection over if the parent is still waiting for it,
        -- otherwise close it right away.
        join . atomically $
          readTVar runningVar >>= \case
            True -> do
              putTMVar connVar conn
              pure $ pure ()
            False -> pure $ c_PQfinish conn
      conn <-
        atomically (takeTMVar connVar) `onException` do
          -- Tell the child we're no longer interested and close the
          -- connection ourselves if it was already delivered.
          join . atomically $ do
            writeTVar runningVar False
            maybe (pure ()) c_PQfinish <$> tryTakeTMVar connVar
      (`onException` c_PQfinish conn) . unmask $ do
        when (conn == nullPtr) $ do
          throwError "PQconnectdb returned a null pointer"
        status <- c_PQstatus conn
        when (status /= c_CONNECTION_OK) $ do
          merr <- c_PQerrorMessage conn >>= safePeekCString
          let reason = maybe "" (": " <>) merr
          throwError $ "openConnection failed" <> reason
        pure conn
      where
        throwError :: String -> IO a
        throwError = hpqTypesError . (fname ++) . (": " ++)

-- | Low-level function for disconnecting from the database. Useful if one wants
-- to implement custom connection source.
--
-- Closes the connection with 'c_PQfinish'. When the connection has a valid
-- socket, the call is routed through 'closeFdWith'.
disconnect :: Connection -> IO ()
disconnect Connection {..} = do
  -- This covers the case when a connection is closed while other Haskell
  -- threads are using GHC's IO manager to wait on the descriptor. This is
  -- commonly the case with asynchronous notifications, for example. Since libpq
  -- is responsible for opening and closing the file descriptor, GHC's IO
  -- manager needs to be informed that the file descriptor has been closed. The
  -- IO manager will then raise an exception in those threads.
  c_PQsocket connPtr >>= \case
    -1 -> c_PQfinish connPtr -- can happen if the connection is bad/lost
    fd -> closeFdWith (\_ -> c_PQfinish connPtr) fd

----------------------------------------
-- Query running

-- | Low-level function for running an SQL query.
--
-- The query is serialized with 'withSQL' using a parameter allocator bound to
-- this connection and executed with 'c_PQparamExec', requesting results in
-- binary format. The parameter count and the raw result are handed to
-- 'runQueryImpl', which produces the returned triple.
runQueryIO
  :: (HasCallStack, IsSQL sql)
  => Connection
  -> sql
  -> IO (Int, ForeignPtr PGresult, ConnectionStats -> ConnectionStats)
runQueryIO conn@Connection {..} sql = do
  runQueryImpl conn sql $ do
    let allocParam = ParamAllocator $ withPGparam connPtr
    withSQL sql allocParam $ \param query ->
      (,)
        <$> (fromIntegral <$> c_PQparamCount param)
        <*> c_PQparamExec connPtr nullPtr param query c_RESULT_BINARY

-- | Name of a prepared query.
--
-- Has an 'IsString' instance, so with @OverloadedStrings@ a string literal
-- can be used directly where a 'QueryName' is expected.
newtype QueryName = QueryName T.Text
  deriving (Eq, Ord, Show, IsString)

-- | Low-level function for running a prepared SQL query.
runPreparedQueryIO
  :: (HasCallStack, IsSQL sql)
  => Connection
  -> QueryName
  -> sql
  -> IO (Int, ForeignPtr PGresult, ConnectionStats -> ConnectionStats)
-- Runs a named prepared statement through 'runQueryImpl'. The statement is
-- prepared on the connection the first time its name is seen (names are
-- tracked in 'connPreparedQueries') and then executed requesting results in
-- binary format.
runPreparedQueryIO conn@Connection {..} (QueryName queryName) sql = do
  runQueryImpl conn sql $ do
    -- An empty name would denote the unnamed prepared statement, which this
    -- function deliberately rejects.
    when (T.null queryName) $ do
      E.throwIO
        DBException
          { dbeQueryContext = sql
          , dbeBackendPid = connBackendPid
          , dbeError = HPQTypesError "runPreparedQueryIO: unnamed prepared query is not supported"
          , dbeCallStack = callStack
          }
    let allocParam = ParamAllocator $ withPGparam connPtr
    withSQL sql allocParam $ \param query -> do
      preparedQueries <- readIORef connPreparedQueries
      BS.useAsCString (T.encodeUtf8 queryName) $ \cname -> do
        when (queryName `S.notMember` preparedQueries) . E.mask_ $ do
          -- Mask asynchronous exceptions, because if preparation of the query
          -- succeeds, we need to reflect that fact in connPreparedQueries
          -- since you can't prepare a query with the same name more than once.
          res <- c_PQparamPrepare connPtr nullPtr param cname query
          -- 'verifyResult' throws if preparation failed, in which case the
          -- name is not recorded as prepared.
          void . withForeignPtr res $ verifyResult sql connBackendPid connPtr
          modifyIORef' connPreparedQueries $ S.insert queryName
        -- Pair the number of parameters with the result of the execution.
        (,)
          <$> (fromIntegral <$> c_PQparamCount param)
          <*> c_PQparamExecPrepared connPtr nullPtr param cname c_RESULT_BINARY

-- | Shared implementation of 'runQueryIO' and 'runPreparedQueryIO'.
--
-- Runs the supplied action, verifies its result with 'verifyResult' and
-- measures how long the action took. Unless asynchronous exceptions are
-- already masked uninterruptibly, the action runs in a separate worker thread
-- so that the calling thread can still receive asynchronous exceptions and
-- request cancellation of the query.
--
-- The returned triple consists of the count reported by 'verifyResult', the
-- query result and a function that adds the statistics of this query to a
-- 'ConnectionStats' value.
runQueryImpl
  :: (HasCallStack, IsSQL sql)
  => Connection
  -> sql
  -- ^ Query being run; passed on to 'verifyResult'.
  -> IO (Int, ForeignPtr PGresult)
  -- ^ Action that executes the query and returns the number of parameters
  -- along with the result.
  -> IO (Int, ForeignPtr PGresult, ConnectionStats -> ConnectionStats)
runQueryImpl Connection {..} sql execSql = do
  E.getMaskingState >>= \case
    E.MaskedUninterruptible -> do
      -- If asynchronous exceptions are already hard-masked, skip spawning a
      -- separate worker thread and the interruption logic as it won't do
      -- anything at this point.
      doRunQuery
    _ -> E.uninterruptibleMask $ \restore -> do
      -- While the query runs, the current thread will not be able to receive
      -- asynchronous exceptions. This prevents clients of the library from
      -- interrupting execution of the query. To remedy that we spawn a separate
      -- thread for the query execution and while we wait for its completion, we
      -- are able to receive asynchronous exceptions (assuming that threaded GHC
      -- runtime system is used) and react appropriately.
      queryRunner <- asyncWithUnmask $ \unmask -> do
        -- Unconditionally unmask asynchronous exceptions here so that 'cancel'
        -- operation potentially invoked below works as expected.
        unmask doRunQuery
      -- If we receive an exception while waiting for the execution to complete,
      -- we need to send a request to PostgreSQL for query cancellation and wait
      -- for the query runner thread to terminate. It is paramount we make the
      -- exception handler uninterruptible as we can't exit from the main block
      -- until the query runner thread has terminated.
      E.onException (restore $ wait queryRunner) $ do
        c_PQcancel connPtr >>= \case
          -- If query cancellation request was successfully processed, there is
          -- nothing else to do apart from waiting for the runner to terminate.
          Nothing -> cancel queryRunner
          -- Otherwise we check what happened with the runner. If it already
          -- finished we're fine, just ignore the result. If it didn't, something
          -- weird is going on. Maybe the cancellation request went through when
          -- the thread wasn't making a request to the server? In any case, try to
          -- cancel again and wait for the thread to terminate.
          Just _ ->
            poll queryRunner >>= \case
              Just _ -> pure ()
              Nothing -> do
                void $ c_PQcancel connPtr
                cancel queryRunner
  where
    -- Execute the query, time it, verify the result and build the statistics
    -- updater.
    doRunQuery = do
      t1 <- getMonotonicTime
      (paramCount, res) <- execSql
      t2 <- getMonotonicTime
      affected <- withForeignPtr res $ verifyResult sql connBackendPid connPtr
      updateStats <- case affected of
        -- 'Left': the result carries no rows, so row and value counters are
        -- left untouched.
        Left _ ->
          pure $ \stats ->
            stats
              { statsQueries = statsQueries stats + 1
              , statsParams = statsParams stats + paramCount
              , statsTime = statsTime stats + (t2 - t1)
              }
        -- 'Right': the result carries rows; the number of values is the
        -- number of rows times the number of columns.
        Right rows -> do
          columns <- fromIntegral <$> withForeignPtr res c_PQnfields
          pure $ \stats ->
            ConnectionStats
              { statsQueries = statsQueries stats + 1
              , statsRows = statsRows stats + rows
              , statsValues = statsValues stats + (rows * columns)
              , statsParams = statsParams stats + paramCount
              , statsTime = statsTime stats + (t2 - t1)
              }
      pure (either id id affected, res, updateStats)

-- | Inspect the status of a query result.
--
-- * For @PGRES_COMMAND_OK@ returns 'Left' with the number parsed from the
--   string returned by @PQcmdTuples@ (@0@ if the string is empty). A string
--   that is not a valid number results in a 'DBException'.
--
-- * For @PGRES_TUPLES_OK@ returns 'Right' with the number of tuples in the
--   result.
--
-- * For @PGRES_FATAL_ERROR@ and @PGRES_BAD_RESPONSE@ throws the error
--   extracted from the result (or from the connection if the result is a null
--   pointer) via 'rethrowWithContext'.
--
-- * For any other status returns @'Left' 0@.
verifyResult
  :: (HasCallStack, IsSQL sql)
  => sql
  -> BackendPid
  -> Ptr PGconn
  -> Ptr PGresult
  -> IO (Either Int Int)
verifyResult sql pid conn res = do
  -- works even if res is NULL
  rst <- c_PQresultStatus res
  case rst of
    _ | rst == c_PGRES_COMMAND_OK -> do
      sn <- c_PQcmdTuples res >>= BS.packCString
      case BS.readInt sn of
        Nothing
          | BS.null sn -> pure . Left $ 0
          | otherwise -> throwParseError sn
        Just (n, rest)
          -- Trailing characters after the number mean the string as a whole
          -- is not a valid number.
          | rest /= BS.empty -> throwParseError sn
          | otherwise -> pure . Left $ n
    _ | rst == c_PGRES_TUPLES_OK -> Right . fromIntegral <$> c_PQntuples res
    _ | rst == c_PGRES_FATAL_ERROR -> throwSQLError
    _ | rst == c_PGRES_BAD_RESPONSE -> throwSQLError
    _ | otherwise -> pure . Left $ 0
  where
    -- Build the exception describing the failure and rethrow it with the
    -- query and backend pid attached.
    throwSQLError =
      rethrowWithContext sql pid
        =<< if res == nullPtr
          then
            -- No result to read error fields from, so fall back to the
            -- error message stored in the connection.
            E.toException . QueryError
              <$> (safePeekCString' =<< c_PQerrorMessage conn)
          else
            E.toException
              <$> ( DetailedQueryError
                      <$> field c_PG_DIAG_SEVERITY
                      <*> (stringToErrorCode <$> field c_PG_DIAG_SQLSTATE)
                      <*> field c_PG_DIAG_MESSAGE_PRIMARY
                      <*> mfield c_PG_DIAG_MESSAGE_DETAIL
                      <*> mfield c_PG_DIAG_MESSAGE_HINT
                      <*> ((mread =<<) <$> mfield c_PG_DIAG_STATEMENT_POSITION)
                      <*> ((mread =<<) <$> mfield c_PG_DIAG_INTERNAL_POSITION)
                      <*> mfield c_PG_DIAG_INTERNAL_QUERY
                      <*> mfield c_PG_DIAG_CONTEXT
                      <*> mfield c_PG_DIAG_SOURCE_FILE
                      <*> ((mread =<<) <$> mfield c_PG_DIAG_SOURCE_LINE)
                      <*> mfield c_PG_DIAG_SOURCE_FUNCTION
                  )
      where
        -- Error field of the result, defaulting to the empty string.
        field f = fromMaybe "" <$> mfield f
        -- Error field of the result, if there is one.
        mfield f = safePeekCString =<< c_PQresultErrorField res f

    throwParseError sn =
      E.throwIO
        DBException
          { dbeQueryContext = sql
          , dbeBackendPid = pid
          , dbeError = HPQTypesError ("verifyResult: string returned by PQcmdTuples is not a valid number: " ++ show sn)
          , dbeCallStack = callStack
          }