module Faktory.Pool
( FaktoryPool
, HasFaktoryPool (..)
, Settings
, PoolSettings
, newFaktoryPool
, perform
, buildJob
, withProducer
, takeProducer
, module Faktory.Job
) where
import Faktory.Prelude
import Control.Monad.IO.Class (MonadIO (..))
import Control.Monad.Reader (MonadReader, asks)
import Data.Aeson (ToJSON)
import Data.Pool.Compat (Pool)
import qualified Data.Pool.Compat as Pool
import Faktory.Job hiding (buildJob, perform)
import qualified Faktory.Job as Job
import Faktory.Producer
import Faktory.Settings (PoolSettings (..), Settings)
import GHC.Stack (HasCallStack)
import Lens.Micro (Lens', (^.))
import UnliftIO (MonadUnliftIO, withRunInIO)
newtype FaktoryPool = FaktoryPool (Pool Producer)
class HasFaktoryPool env where
faktoryPoolL :: Lens' env FaktoryPool
instance HasFaktoryPool FaktoryPool where
faktoryPoolL :: Lens' FaktoryPool FaktoryPool
faktoryPoolL = (FaktoryPool -> f FaktoryPool) -> FaktoryPool -> f FaktoryPool
forall a. a -> a
id
newFaktoryPool
:: MonadIO m
=> Settings
-> PoolSettings
-> m FaktoryPool
newFaktoryPool :: forall (m :: * -> *).
MonadIO m =>
Settings -> PoolSettings -> m FaktoryPool
newFaktoryPool Settings
settings PoolSettings {Natural
settingsSize :: Natural
settingsTimeout :: Natural
settingsSize :: PoolSettings -> Natural
settingsTimeout :: PoolSettings -> Natural
..} = do
IO FaktoryPool -> m FaktoryPool
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO FaktoryPool -> m FaktoryPool)
-> IO FaktoryPool -> m FaktoryPool
forall a b. (a -> b) -> a -> b
$
Pool Producer -> FaktoryPool
FaktoryPool
(Pool Producer -> FaktoryPool)
-> IO (Pool Producer) -> IO FaktoryPool
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IO Producer
-> (Producer -> IO ()) -> Double -> Int -> IO (Pool Producer)
forall a. IO a -> (a -> IO ()) -> Double -> Int -> IO (Pool a)
Pool.createPool
(Settings -> IO Producer
newProducer Settings
settings)
Producer -> IO ()
closeProducer
(Natural -> Double
forall a b. (Integral a, Num b) => a -> b
fromIntegral Natural
settingsTimeout)
(Natural -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral Natural
settingsSize)
perform
:: ( MonadUnliftIO m
, MonadReader env m
, HasFaktoryPool env
, ToJSON arg
, HasCallStack
)
=> JobOptions
-> arg
-> m JobId
perform :: forall (m :: * -> *) env arg.
(MonadUnliftIO m, MonadReader env m, HasFaktoryPool env,
ToJSON arg, HasCallStack) =>
JobOptions -> arg -> m JobId
perform JobOptions
options arg
arg = do
(Producer -> m JobId) -> m JobId
forall (m :: * -> *) env a.
(MonadUnliftIO m, MonadReader env m, HasFaktoryPool env) =>
(Producer -> m a) -> m a
withProducer ((Producer -> m JobId) -> m JobId)
-> (Producer -> m JobId) -> m JobId
forall a b. (a -> b) -> a -> b
$ \Producer
producer -> do
IO JobId -> m JobId
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO JobId -> m JobId) -> IO JobId -> m JobId
forall a b. (a -> b) -> a -> b
$ JobOptions -> Producer -> arg -> IO JobId
forall arg.
(HasCallStack, ToJSON arg) =>
JobOptions -> Producer -> arg -> IO JobId
Job.perform JobOptions
options Producer
producer arg
arg
buildJob
:: (MonadUnliftIO m, MonadReader env m, HasFaktoryPool env)
=> JobOptions
-> arg
-> m (Job arg)
buildJob :: forall (m :: * -> *) env arg.
(MonadUnliftIO m, MonadReader env m, HasFaktoryPool env) =>
JobOptions -> arg -> m (Job arg)
buildJob JobOptions
options arg
arg = do
(Producer -> m (Job arg)) -> m (Job arg)
forall (m :: * -> *) env a.
(MonadUnliftIO m, MonadReader env m, HasFaktoryPool env) =>
(Producer -> m a) -> m a
withProducer ((Producer -> m (Job arg)) -> m (Job arg))
-> (Producer -> m (Job arg)) -> m (Job arg)
forall a b. (a -> b) -> a -> b
$ \Producer
producer -> do
IO (Job arg) -> m (Job arg)
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Job arg) -> m (Job arg)) -> IO (Job arg) -> m (Job arg)
forall a b. (a -> b) -> a -> b
$ JobOptions -> Producer -> arg -> IO (Job arg)
forall arg. JobOptions -> Producer -> arg -> IO (Job arg)
Job.buildJob JobOptions
options Producer
producer arg
arg
withProducer
:: (MonadUnliftIO m, MonadReader env m, HasFaktoryPool env)
=> (Producer -> m a)
-> m a
withProducer :: forall (m :: * -> *) env a.
(MonadUnliftIO m, MonadReader env m, HasFaktoryPool env) =>
(Producer -> m a) -> m a
withProducer Producer -> m a
f = do
FaktoryPool Pool Producer
p <- (env -> FaktoryPool) -> m FaktoryPool
forall r (m :: * -> *) a. MonadReader r m => (r -> a) -> m a
asks (env -> Getting FaktoryPool env FaktoryPool -> FaktoryPool
forall s a. s -> Getting a s a -> a
^. Getting FaktoryPool env FaktoryPool
forall env. HasFaktoryPool env => Lens' env FaktoryPool
Lens' env FaktoryPool
faktoryPoolL)
((forall a. m a -> IO a) -> IO a) -> m a
forall b. ((forall a. m a -> IO a) -> IO b) -> m b
forall (m :: * -> *) b.
MonadUnliftIO m =>
((forall a. m a -> IO a) -> IO b) -> m b
withRunInIO (((forall a. m a -> IO a) -> IO a) -> m a)
-> ((forall a. m a -> IO a) -> IO a) -> m a
forall a b. (a -> b) -> a -> b
$ \forall a. m a -> IO a
runInIO -> do
Pool Producer -> (Producer -> IO a) -> IO a
forall a r. Pool a -> (a -> IO r) -> IO r
Pool.withResource Pool Producer
p ((Producer -> IO a) -> IO a) -> (Producer -> IO a) -> IO a
forall a b. (a -> b) -> a -> b
$ m a -> IO a
forall a. m a -> IO a
runInIO (m a -> IO a) -> (Producer -> m a) -> Producer -> IO a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Producer -> m a
f
takeProducer
:: (MonadIO m, MonadReader env m, HasFaktoryPool env) => m (Producer, m ())
takeProducer :: forall (m :: * -> *) env.
(MonadIO m, MonadReader env m, HasFaktoryPool env) =>
m (Producer, m ())
takeProducer = do
FaktoryPool Pool Producer
p <- (env -> FaktoryPool) -> m FaktoryPool
forall r (m :: * -> *) a. MonadReader r m => (r -> a) -> m a
asks (env -> Getting FaktoryPool env FaktoryPool -> FaktoryPool
forall s a. s -> Getting a s a -> a
^. Getting FaktoryPool env FaktoryPool
forall env. HasFaktoryPool env => Lens' env FaktoryPool
Lens' env FaktoryPool
faktoryPoolL)
(Producer
producer, LocalPool Producer
lp) <- IO (Producer, LocalPool Producer)
-> m (Producer, LocalPool Producer)
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Producer, LocalPool Producer)
-> m (Producer, LocalPool Producer))
-> IO (Producer, LocalPool Producer)
-> m (Producer, LocalPool Producer)
forall a b. (a -> b) -> a -> b
$ Pool Producer -> IO (Producer, LocalPool Producer)
forall a. Pool a -> IO (a, LocalPool a)
Pool.takeResource Pool Producer
p
(Producer, m ()) -> m (Producer, m ())
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Producer
producer, IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ LocalPool Producer -> Producer -> IO ()
forall a. LocalPool a -> a -> IO ()
Pool.putResource LocalPool Producer
lp Producer
producer)