module Network.Nakadi.EventTypes.CursorsLag
  ( cursorsLag'
  , cursorsLagR'
  , cursorsLag
  , cursorsLagR
  ) where
import           Network.Nakadi.Internal.Prelude
import           Control.Arrow
import           Control.Lens
import qualified Data.Map.Strict                 as Map
import           Network.Nakadi.Internal.Http
import qualified Network.Nakadi.Internal.Lenses  as L
import           Network.Nakadi.Internal.Util
path :: EventTypeName -> ByteString
path eventTypeName =
  "/event-types/"
  <> encodeUtf8 (unEventTypeName eventTypeName)
  <> "/cursors-lag"
cursorsLag' ::
  MonadNakadi m
  => Config        
  -> EventTypeName 
  -> [Cursor]      
  -> m [Partition] 
                   
cursorsLag' config eventTypeName cursors =
  httpJsonBody config ok200 []
  (setRequestMethod "POST"
   . setRequestPath (path eventTypeName)
   . setRequestBodyJSON cursors)
cursorsLagR' ::
  MonadNakadiEnv r m
  => EventTypeName 
  -> [Cursor]      
  -> m [Partition] 
                   
cursorsLagR' eventTypeName cursors = do
  config <- asks (view L.nakadiConfig)
  cursorsLag' config eventTypeName cursors
cursorsLag ::
  MonadNakadi m
  => Config                         
  -> EventTypeName                  
  -> Map PartitionName CursorOffset 
                                    
  -> m (Map PartitionName Int64)    
cursorsLag config eventTypeName cursorsMap = do
  partitionStats <- cursorsLag' config eventTypeName cursors
  return $ partitionStats & map ((view L.partition &&& view L.unconsumedEvents) >>> sequenceSnd)
                          & catMaybes
                          & Map.fromList
  where cursors = map (uncurry Cursor) (Map.toList cursorsMap)
cursorsLagR ::
  MonadNakadiEnv r m
  => EventTypeName                  
  -> Map PartitionName CursorOffset 
                                    
  -> m (Map PartitionName Int64)    
cursorsLagR eventTypeName cursorsMap = do
  config <- asks (view L.nakadiConfig)
  cursorsLag config eventTypeName cursorsMap