module Database.Bolty.Pool
( BoltPool(..)
, PoolConfig(..)
, defaultPoolConfig
, ValidationStrategy(..)
, RetryConfig(..)
, defaultRetryConfig
, PoolCounters(..)
, createPool
, destroyPool
, withConnection
, CheckedOutConnection(..)
, acquireConnection
, releaseConnection
, releaseConnectionOnError
, poolCounters
) where
import Control.Exception (SomeException, mask, try, fromException, throwIO)
import Data.IORef (IORef, newIORef, readIORef, atomicModifyIORef')
import Data.Pool (Pool)
import qualified Data.Pool as Pool
import Data.Word (Word64)
import GHC.Clock (getMonotonicTimeNSec)
import GHC.Stack (HasCallStack)
import Data.Kind (Type)
import Database.Bolty.Connection.Type (Connection, ValidatedConfig, Error(..))
import qualified Database.Bolty.Connection.Pipe as P
import Database.Bolty.Connection.Pipe (touchConnection, connectionLastActivity,
connectionServerIdleTimeout)
type RetryConfig :: Type
-- | Retry tuning that a 'BoltPool' carries around (see 'bpRetryConfig').
--
-- This module only stores the value; nothing here interprets the fields.
--
-- NOTE(review): the delay unit is not established in this module. The
-- defaults (200_000 and 5_000_000) look like microseconds — confirm against
-- the code that consumes this record.
data RetryConfig = RetryConfig
  { maxRetries :: !Int
    -- ^ Retry budget (presumably the maximum number of retries — confirm
    -- with the consumer).
  , initialDelay :: !Int
    -- ^ Presumably the delay before the first retry — confirm.
  , maxDelay :: !Int
    -- ^ Presumably the upper bound on the delay between retries — confirm.
  }
  deriving stock (Eq, Show)
-- | Default retry tuning: a budget of 5 retries, an initial delay of
-- 200_000 and a maximum delay of 5_000_000.
--
-- NOTE(review): the delay unit is not visible in this module (the magnitudes
-- suggest microseconds) — confirm with the code that consumes the record.
defaultRetryConfig :: RetryConfig
defaultRetryConfig =
  RetryConfig
    { maxRetries = 5
    , initialDelay = 200_000
    , maxDelay = 5_000_000
    }
type ValidationStrategy :: Type
-- | Policy deciding whether a connection taken from the pool is pinged
-- before it is handed out (evaluated by 'needsPing').
data ValidationStrategy
  = -- | Ping on every checkout.
    AlwaysPing
  | -- | Ping only when the connection has been idle for at least this many
    -- seconds (the threshold is compared against the monotonic clock in
    -- nanoseconds).
    PingIfIdle !Int
  | -- | Never ping; every pooled connection is assumed to be healthy.
    NeverPing
  deriving stock (Eq, Show)
type PoolConfig :: Type
-- | User-facing settings consumed by 'createPool'.
data PoolConfig = PoolConfig
  { maxConnections :: Int
    -- ^ Maximum number of connections; handed to the underlying
    -- resource pool as its resource limit.
  , idleTimeout :: Double
    -- ^ How long an unused connection may stay in the pool. 'createPool'
    -- may lower this to stay below the server's advertised idle timeout.
  , maxPingRetries :: Int
    -- ^ How many additional connections may be tried after one fails its
    -- health check (stored as 'bpMaxRetries').
  , retryConfig :: RetryConfig
    -- ^ Retry tuning, stored on the pool as 'bpRetryConfig'.
  , validationStrategy :: ValidationStrategy
    -- ^ When to ping a connection before handing it out.
  }
-- | Default pool settings: up to 10 connections, an idle timeout of 60,
-- one extra attempt after a failed ping, 'defaultRetryConfig', and pinging
-- connections that have been idle for at least 30 seconds.
defaultPoolConfig :: PoolConfig
defaultPoolConfig =
  PoolConfig
    { maxConnections = 10
    , idleTimeout = 60
    , maxPingRetries = 1
    , retryConfig = defaultRetryConfig
    , validationStrategy = PingIfIdle 30
    }
type BoltPool :: Type
-- | A connection pool together with its validation settings and usage
-- counters. Built by 'createPool'.
data BoltPool = BoltPool
  { bpPool :: !(Pool Connection)
    -- ^ The underlying resource pool of connections.
  , bpMaxRetries :: !Int
    -- ^ Extra connections that may be tried after a failed health check;
    -- acquisition starts with a budget of @bpMaxRetries + 1@ attempts.
  , bpRetryConfig :: !RetryConfig
    -- ^ Retry tuning; stored here but not read inside this module.
  , bpValidation :: !ValidationStrategy
    -- ^ Decides whether a connection is pinged on checkout.
  , bpActiveConns :: !(IORef Int)
    -- ^ Number of connections currently checked out.
  , bpTotalAcqs :: !(IORef Int)
    -- ^ Total number of connections taken from the pool, including ones
    -- that were then discarded by the health check.
  , bpTotalWaitNs :: !(IORef Word64)
    -- ^ Accumulated monotonic-clock nanoseconds spent taking connections
    -- from the pool.
  }
type PoolCounters :: Type
-- | Snapshot of a pool's usage counters, as returned by 'poolCounters'.
data PoolCounters = PoolCounters
  { pcActive :: !Int
    -- ^ Connections checked out at the time of the snapshot.
  , pcTotalAcqs :: !Int
    -- ^ Total number of connections taken from the pool so far.
  , pcTotalWaitNs :: !Word64
    -- ^ Accumulated nanoseconds spent taking connections from the pool.
  }
  deriving stock (Eq, Show)
-- | Build a 'BoltPool' from a validated connection config and pool settings.
--
-- A throw-away probe connection is opened first so the server's advertised
-- idle timeout can be read; the probe is closed before the pool is created.
-- The pool uses a single stripe, opens connections with 'P.connect' and
-- releases them with 'P.close'. All counters start at zero.
createPool :: HasCallStack => ValidatedConfig -> PoolConfig -> IO BoltPool
createPool cfg PoolConfig{maxConnections, idleTimeout, maxPingRetries, retryConfig, validationStrategy} = do
  probe <- P.connect cfg
  let ttl = cappedIdle (connectionServerIdleTimeout probe)
  P.close probe
  pool <-
    Pool.newPool
      . Pool.setNumStripes (Just 1)
      $ Pool.defaultPoolConfig (P.connect cfg) P.close ttl maxConnections
  activeCounter <- newIORef 0
  acquisitionCounter <- newIORef 0
  waitCounter <- newIORef 0
  pure
    BoltPool
      { bpPool = pool
      , bpMaxRetries = maxPingRetries
      , bpRetryConfig = retryConfig
      , bpValidation = validationStrategy
      , bpActiveConns = activeCounter
      , bpTotalAcqs = acquisitionCounter
      , bpTotalWaitNs = waitCounter
      }
  where
    -- When the server reports an idle timeout above 5 (treated as seconds),
    -- keep pooled connections no longer than 5 below it so the pool drops a
    -- connection before the server does; otherwise use the configured value.
    cappedIdle :: Maybe Int -> Double
    cappedIdle (Just serverSecs)
      | serverSecs > 5 = min idleTimeout (fromIntegral (serverSecs - 5))
    cappedIdle _ = idleTimeout
-- | Destroy every resource currently held by the pool's underlying
-- resource pool.
destroyPool :: BoltPool -> IO ()
destroyPool = Pool.destroyAllResources . bpPool
-- | Run an action with a connection taken from the pool.
--
-- The connection is health-checked according to the pool's
-- 'ValidationStrategy' before the action runs. A connection that fails the
-- check is destroyed and another one is tried, for at most
-- @bpMaxRetries + 1@ attempts; when the budget is exhausted (or was not
-- positive to begin with) the call fails with
-- @"withConnection: no healthy connection available"@.
--
-- If the action throws, the connection is destroyed rather than returned.
-- When the exception satisfies 'isConnectionError' the action is re-run
-- once on another connection; any other exception, and a second connection
-- error, is rethrown to the caller.
--
-- On success the connection's activity timestamp is refreshed and the
-- connection is put back into the pool. The active-connection counter is
-- decremented on every exit path, including when the health check itself
-- throws.
withConnection :: forall a. HasCallStack => BoltPool -> (Connection -> IO a) -> IO a
withConnection BoltPool{bpPool, bpMaxRetries, bpValidation, bpActiveConns, bpTotalAcqs, bpTotalWaitNs} action =
  go (bpMaxRetries + 1) True
  where
    -- Atomically add a delta to one of the pool counters.
    bump :: Num n => IORef n -> n -> IO ()
    bump ref delta = atomicModifyIORef' ref $ \x -> (x + delta, ())

    -- @go attemptsLeft canRetryDead@: @canRetryDead@ is cleared after the
    -- single retry that follows a connection error raised by the action.
    go :: Int -> Bool -> IO a
    go n canRetryDead
      -- @<= 0@ rather than @== 0@ so a negative retry budget cannot recurse
      -- forever.
      | n <= 0 = fail "withConnection: no healthy connection available"
      | otherwise = mask $ \restore -> do
          t0 <- getMonotonicTimeNSec
          (conn, localPool) <- Pool.takeResource bpPool
          t1 <- getMonotonicTimeNSec
          bump bpActiveConns 1
          bump bpTotalAcqs 1
          bump bpTotalWaitNs (t1 - t0)
          let discard = do
                Pool.destroyResource bpPool localPool conn
                bump bpActiveConns (-1)
          -- The health check runs under 'try' so that an exception raised
          -- while pinging cannot leak the checked-out connection or leave
          -- the active counter permanently incremented.
          validation <- try . restore $ do
            shouldPing <- needsPing bpValidation conn
            if shouldPing then P.ping conn else pure True
          case validation of
            Left (e :: SomeException) -> do
              discard
              throwIO e
            Right False -> do
              discard
              restore (go (n - 1) canRetryDead)
            Right True -> do
              outcome <- try (restore (action conn))
              case outcome of
                Right x -> do
                  touchConnection conn
                  Pool.putResource localPool conn
                  bump bpActiveConns (-1)
                  pure x
                Left (e :: SomeException)
                  | canRetryDead, isConnectionError e -> do
                      discard
                      -- Same attempt budget, but no further dead-connection
                      -- retries.
                      restore (go n False)
                  | otherwise -> do
                      discard
                      throwIO e
-- | 'True' exactly when the exception is an 'Error' built with the
-- 'NonboltyError' constructor; every other exception yields 'False'.
--
-- NOTE(review): 'NonboltyError' presumably wraps transport-level failures
-- that did not originate in the Bolt layer — confirm against the definition
-- of 'Error'.
isConnectionError :: SomeException -> Bool
isConnectionError e
  | Just (NonboltyError _) <- fromException e = True
  | otherwise = False
-- | Decide whether a connection must be pinged before it is handed out.
--
-- * 'AlwaysPing' always answers 'True'.
-- * 'NeverPing' always answers 'False'.
-- * @'PingIfIdle' secs@ answers 'True' when the time since the connection's
--   last recorded activity, measured on the monotonic clock, is at least
--   @secs@ seconds. A negative @secs@ is treated as 0 (always ping); without
--   the clamp the conversion to 'Word64' would wrap to a huge threshold and
--   silently disable validation.
needsPing :: ValidationStrategy -> Connection -> IO Bool
needsPing AlwaysPing _ = pure True
needsPing NeverPing _ = pure False
needsPing (PingIfIdle secs) conn = do
  now <- getMonotonicTimeNSec
  lastAct <- connectionLastActivity conn
  -- Unsigned subtraction: should lastAct ever be ahead of now, the
  -- difference wraps to a very large value, which errs on the side of
  -- pinging.
  let idleNs = now - lastAct
      thresholdNs = fromIntegral (max 0 secs) * 1_000_000_000
  pure (idleNs >= thresholdNs)
type CheckedOutConnection :: Type
-- | A connection checked out with 'acquireConnection', bundled with what is
-- needed to give it back via 'releaseConnection' or
-- 'releaseConnectionOnError'.
data CheckedOutConnection = CheckedOutConnection
  { cocConnection :: !Connection
    -- ^ The checked-out connection itself.
  , cocLocalPool :: !(Pool.LocalPool Connection)
    -- ^ The local pool the connection was taken from; needed to return or
    -- destroy it.
  , cocBoltPool :: !BoltPool
    -- ^ The owning pool, whose active counter is updated on release.
  }
-- | Check a healthy connection out of the pool without a scoped callback.
--
-- The connection is validated according to the pool's
-- 'ValidationStrategy'; one that fails the check is destroyed and another is
-- tried, for at most @bpMaxRetries + 1@ attempts. When the budget is
-- exhausted (or was not positive to begin with) the call fails with
-- @"acquireConnection: no healthy connection available"@.
--
-- Checkout and validation run under 'mask' with an exception handler, so an
-- exception raised while waiting for or pinging a connection cannot leak it
-- or leave the active counter incremented.
--
-- The result must be passed to exactly one of 'releaseConnection' or
-- 'releaseConnectionOnError'; otherwise the connection stays checked out
-- and the active counter stays incremented. Callers that need exception
-- safety across the hand-over should acquire inside their own masked region
-- (for example via @bracket@).
acquireConnection :: BoltPool -> IO CheckedOutConnection
acquireConnection bp@BoltPool{bpPool, bpMaxRetries, bpValidation, bpActiveConns, bpTotalAcqs, bpTotalWaitNs} =
  go (bpMaxRetries + 1)
  where
    -- Atomically add a delta to one of the pool counters.
    bump :: Num n => IORef n -> n -> IO ()
    bump ref delta = atomicModifyIORef' ref $ \x -> (x + delta, ())

    go :: Int -> IO CheckedOutConnection
    go n
      -- @<= 0@ rather than @== 0@ so a negative retry budget cannot recurse
      -- forever.
      | n <= 0 = fail "acquireConnection: no healthy connection available"
      | otherwise = mask $ \restore -> do
          t0 <- getMonotonicTimeNSec
          (conn, localPool) <- Pool.takeResource bpPool
          t1 <- getMonotonicTimeNSec
          bump bpActiveConns 1
          bump bpTotalAcqs 1
          bump bpTotalWaitNs (t1 - t0)
          let discard = do
                Pool.destroyResource bpPool localPool conn
                bump bpActiveConns (-1)
          -- Validate with exceptions re-enabled, but catch whatever the
          -- check throws so the connection is cleaned up before rethrowing.
          validation <- try . restore $ do
            shouldPing <- needsPing bpValidation conn
            if shouldPing then P.ping conn else pure True
          case validation of
            Right True ->
              pure
                CheckedOutConnection
                  { cocConnection = conn
                  , cocLocalPool = localPool
                  , cocBoltPool = bp
                  }
            Right False -> do
              discard
              restore (go (n - 1))
            Left (e :: SomeException) -> do
              discard
              throwIO e
-- | Return a healthy checked-out connection to the pool.
--
-- Refreshes the connection's activity timestamp, puts it back into the
-- local pool it came from, and decrements the owning pool's active counter.
-- The three steps run under 'mask' (as the equivalent sequence in
-- 'withConnection' does) so an asynchronous exception cannot arrive between
-- them and leave the connection checked out or the counter out of sync.
--
-- Call this at most once per 'CheckedOutConnection', and not together with
-- 'releaseConnectionOnError'.
releaseConnection :: CheckedOutConnection -> IO ()
releaseConnection CheckedOutConnection{cocConnection, cocLocalPool, cocBoltPool} =
  mask $ \_restore -> do
    touchConnection cocConnection
    Pool.putResource cocLocalPool cocConnection
    atomicModifyIORef' (bpActiveConns cocBoltPool) $ \x -> (x - 1, ())
-- | Dispose of a checked-out connection that should not be reused.
--
-- Destroys the connection through the underlying resource pool instead of
-- returning it, then decrements the owning pool's active counter. Both
-- steps run under 'mask' so an asynchronous exception cannot arrive between
-- them and leave the counter permanently incremented.
--
-- Call this at most once per 'CheckedOutConnection', and not together with
-- 'releaseConnection'.
releaseConnectionOnError :: CheckedOutConnection -> IO ()
releaseConnectionOnError CheckedOutConnection{cocConnection, cocLocalPool, cocBoltPool} =
  mask $ \_restore -> do
    Pool.destroyResource (bpPool cocBoltPool) cocLocalPool cocConnection
    atomicModifyIORef' (bpActiveConns cocBoltPool) $ \x -> (x - 1, ())
-- | Read the pool's usage counters into a 'PoolCounters' snapshot.
--
-- The three counters are read one after another (active, total
-- acquisitions, total wait), not as one atomic unit, so under concurrent
-- use the values may come from slightly different moments.
poolCounters :: BoltPool -> IO PoolCounters
poolCounters pool =
  PoolCounters
    <$> readIORef (bpActiveConns pool)
    <*> readIORef (bpTotalAcqs pool)
    <*> readIORef (bpTotalWaitNs pool)