{- Copyright (c) Meta Platforms, Inc. and affiliates. All rights reserved. This source code is licensed under the BSD-style license found in the LICENSE file in the root directory of this source tree. -} module Glean.Write.SendAndRebaseQueue ( SendAndRebaseQueueSettings(..) , SendAndRebaseQueue , withSendAndRebaseQueue , writeSendAndRebaseQueue ) where import Control.Concurrent import Control.Exception import Control.Monad import qualified Data.ByteString as BS import Data.Word import Util.Log import Util.STM import Glean.Backend.Types (Backend) import Glean.FFI (release) import qualified Glean.Write.SendQueue as SendQueue import Glean.Write.SendQueue (SendQueue) import Glean.RTS.Constants (firstAnonId) import Glean.Write.Stats as Stats import Glean.RTS.Foreign.Define import Glean.RTS.Foreign.Ownership import Glean.RTS.Foreign.Stacked import qualified Glean.RTS.Foreign.Lookup as Lookup import qualified Glean.RTS.Foreign.LookupCache as LookupCache import Glean.RTS.Foreign.LookupCache (LookupCache, countFailuresAsMisses) import qualified Glean.RTS.Foreign.FactSet as FactSet import Glean.RTS.Foreign.FactSet (FactSet) import Glean.RTS.Foreign.Inventory (Inventory) import Glean.RTS.Foreign.Subst import Glean.RTS.Types (Fid(..)) import Glean.Util.Metric import qualified Glean.Types as Thrift {- SendAndRebaseQueue ------------------ This is a queue for sending facts to a Glean server. It has a couple of advantages over using a raw SendQueue: * The queue avoids sending duplicate facts to the server by performing some de-duplication before sending the data. * When the input is JSON, the facts are serialized as binary before being sent to the server. De-duplication -------------- De-duplication happens in two ways: 1. Each batch is added to the current fact buffer, and de-duplicatd against the facts already in the buffer. 2. 
The contents of the buffer are sent to the server asynchronously, and when the server responds with a substitution (mapping facts in the buffer to their real fact IDs) the queue places the renamed facts in a cache, which is used to de-duplicate new batches. Details ------- The queue looks like this: +-------+ +-------------+ | cache | | fact buffer | +-------+ +-------------+ when a new batch is added (via writeSendAndRebaseQueue), it is added to the fact buffer: +---------------+ | new facts | +---------------+ | / | rebase / | / +-------+ +-------------+-----------+ | cache | | fact buffer | new facts | +-------+ +-------------+-----------+ when this happens, any facts in the new batch that duplicate facts in the cache or the existing fact buffer are removed. The fact buffer is sent to the server as a single batch: +-------+ +-------------+ | cache | | fact buffer | +-------+ +------+------+ | v server which will later respond with a substitution. When the server responds, we will have added more facts to the buffer, so the situation is now: +-------+ +-------------+-----------+ | cache | | fact buffer | new facts | +-------+ +------+------+-----------+ | v server The server's substitution maps facts in the buffer to their real fact IDs. At this point we rename the fact buffer using the substitution, and move those facts into the cache, leaving the fact buffer containing just the new facts: +-------+-------------+ +-----------+ | cache | fact buffer | | new facts | +-------+-------------+ +-----------+ The new facts are then sent to the server: +-------+-------------+ +-----------+ | cache | fact buffer | | new facts | +-------+-------------+ +-----+-----+ | v server so over time we build up a cache of facts from the server: essentially a cache of part of the DB, and use it to avoid sending facts that are already in the DB, thereby reducing the amount of data we send to the server. 
All of this is subject to limits:

- sendAndRebaseQueueFactCacheSize: a limit on the size of the cache
  (we drop old entries according to an eviction policy)

- sendAndRebaseQueueFactBufferSize: A limit on the size of the fact
  buffer (writers will wait for the server to respond if the buffer
  is full)

Threads and SendQueue
---------------------

The fact buffer is a bottleneck if multiple threads are writing
simultaneously. To avoid that, we can have multiple fact buffers that
share a cache: this is done by setting sendAndRebaseQueueThreads to a
value higher than 1.

SendAndRebaseQueue is built on top of SendQueue, which manages sending
the individual batches to the server, retrying and resending as
necessary. SendQueue is automatically configured to use the same
number of sender threads as SendAndRebaseQueue, to parallelise the
sending.
-}

-- | When the send operation fails or is completed this is called once.
type Callback = Either SomeException () -> STM ()

-- | The state of a sender's in-flight batch: nothing sent yet, the
-- server replied with an error, or the server replied with a
-- substitution for the facts we sent.
data WaitSubst
  = WaitSubstNone
  | WaitSubstError SomeException
  | WaitSubstSuccess Thrift.Subst

-- | One fact buffer plus the bookkeeping needed to hand it to the
-- 'SendQueue' and process the server's response. Senders are pooled
-- in 'srqSenders'; a writer takes one, appends its batch, and puts it
-- back (see 'writeSendAndRebaseQueue').
data Sender = Sender
  { sId :: Integer
    -- ^ Identifier, used only in log messages
  , sSubstVar :: TMVar WaitSubst
    -- ^ Filled with the server's response ('WaitSubstSuccess' or
    -- 'WaitSubstError') for the last batch sent; starts as
    -- 'WaitSubstNone' before anything has been sent
  , sSent :: TVar Point
    -- ^ Records the size and time the last batch was sent, for stats
  , sFacts :: MVar (FactSet, [FactOwnership])
    -- ^ [Ownership] is the ownership assignments for facts in the FactSet
  , sCallbacks :: TQueue Callback
    -- ^ Callbacks for the batches merged into the current fact buffer;
    -- drained and registered with the SendQueue when the buffer is
    -- flushed (see senderFlush)
  }

data SendAndRebaseQueue = SendAndRebaseQueue
  { srqSendQueue :: !SendQueue
    -- ^ Underlying send queue
  , srqSenders :: TQueue Sender
    -- ^ Senders accumulate facts while waiting for a substitution
  , srqInventory :: !Inventory
    -- ^ Schema inventory (used for rebasing)
  , srqFacts :: !LookupCache
    -- ^ Cache the ids of facts stored on the server
  , srqCacheStats :: !LookupCache.Stats
    -- ^ Stats for the Lookup cache
  , srqStats :: Maybe Stats
    -- ^ How to report stats.
  , srqFactBufferSize :: !Int
    -- ^ Max size of fact buffer
  }

-- | Settings for a 'SendAndRebaseQueue'
data SendAndRebaseQueueSettings = SendAndRebaseQueueSettings
  { -- | Settings for the underlying send queue
    sendAndRebaseQueueSendQueueSettings :: !SendQueue.SendQueueSettings

    -- | Max memory that the fact cache should use
  , sendAndRebaseQueueFactCacheSize :: !Int

    -- | Max memory that the fact buffer should use (per sender). The
    -- purpose of this limit is to prevent the buffer from growing
    -- without bound if the server's write queue is full. When the
    -- buffer exceeds this limit, writers will start waiting for the
    -- server. Note that the limit is per sender.
  , sendAndRebaseQueueFactBufferSize :: !Int

    -- | Number of senders
  , sendAndRebaseQueueSenders :: !Int

    -- | Allow facts in the batch to make reference to facts in the remote
    -- server that may not be in the local cache.
  , sendAndRebaseQueueAllowRemoteReferences :: Bool

    -- | How to log stats. Obtain this with 'Glean.Write.Stats.new'.
  , sendAndRebaseQueueStats :: Maybe Stats
  }

-- | Add a new batch to the fact buffer, de-duplicating against the
-- buffer and the cache. Returns the facts that were genuinely new
-- (with IDs allocated after the end of the buffer) together with
-- their ownership assignments.
rebase
  :: Inventory        -- ^ schema inventory, for typechecking the batch
  -> Thrift.Batch     -- ^ the incoming batch
  -> LookupCache      -- ^ cache of facts already stored on the server
  -> FactSet.FactSet  -- ^ the current fact buffer (the base)
  -> IO (FactSet.FactSet, FactOwnership)
rebase inventory batch cache base = do
  LookupCache.withCache Lookup.EmptyLookup cache LookupCache.FIFO $ \cache -> do
    -- when there are multiple senders, the cache may have new facts since
    -- we previously rebased, and it may contain fact IDs that overlap with
    -- the current base. So we have to use a snapshot of the cache, restricted
    -- to fact IDs less than the current base startingId.
    first_id <- Lookup.startingId base
    Lookup.withSnapshot cache first_id $ \snapshot -> do
      -- new facts are numbered starting after the end of the buffer
      factSet <- Lookup.firstFreeId base >>= FactSet.new
      -- resolve fact references against: cache snapshot, then buffer,
      -- then the new facts themselves
      let define = snapshot `stacked` base `stacked` factSet
      subst <- defineBatch define inventory batch DefineFlags
        { trustRefs = True
        , ignoreRedef = True -- see Note [redefinition]
        }
      -- rename the batch's ownership data with the same substitution
      owned <- substOwnership subst $ FactOwnership $ Thrift.batch_owned batch
      return (factSet, owned)

{- Note [redefinition]

Redefinition is when we have two facts A->B, C->D where A == C but
B /= D. Normally this would be an error, and defineBatch rejects it
with an error. But it can arise naturally here because when
typechecking/rebasing A->B we may have used a different cache from
when we typechecked/rebased C->D. So even though B and D may be
semantically identical, they are literally different.

It doesn't seem possible to ensure that we always use a consistent
cache, so it's inevitable that benign fact redefinitions may occur.
Therefore we ignore redefinitions in `defineBatch`.

Note: this might mean that we are ignoring actual errors and silently
picking one of the two facts if they really did differ. That's bad,
but I don't see an alternative.
-}

-- | Send the current fact buffer to the server
senderFlush :: SendAndRebaseQueue -> Sender -> IO ()
senderFlush srq sender = modifyMVar_ (sFacts sender) $ \(facts, owned) -> do
  factOnlyBatch <- FactSet.serialize facts
  -- attach the accumulated ownership assignments to the batch
  let batch = factOnlyBatch
        { Thrift.batch_owned = ownershipUnits (unionOwnership owned) }
  !size <- FactSet.factMemory facts
  start <- beginTick (fromIntegral size)
  atomically $ do
    callbacks <- flushTQueue (sCallbacks sender)
    SendQueue.writeSendQueue (srqSendQueue srq) batch $ \result -> do
      -- when the server responds, publish the substitution (or the
      -- error) so that senderRebaseAndFlush can pick it up
      case result of
        Right subst -> putTMVar (sSubstVar sender) (WaitSubstSuccess subst)
        Left e -> putTMVar (sSubstVar sender) (WaitSubstError e)
      forM_ callbacks ($ void result)
    writeTVar (sSent sender) start
  -- keep the facts (they remain the base we rebase against) but clear
  -- the ownership list, which now travels with the in-flight batch
  return (facts, [])

-- | If the server has returned a substitution, rebase the current
-- fact buffer against it. Send the current fact buffer to the server,
-- unless we're already waiting for a substitution.
--
-- If the wait flag is True, block until the server's response for the
-- previous batch arrives; otherwise only act if a response is already
-- available.
senderRebaseAndFlush :: Bool -> SendAndRebaseQueue -> Sender -> IO ()
senderRebaseAndFlush wait srq sender = do
  maybeSubst <- atomically $
    if wait
      then Just <$> takeTMVar (sSubstVar sender)
      else tryTakeTMVar (sSubstVar sender)
  case maybeSubst of
    Nothing -> do -- waiting on subst
      log "Waiting on substitution from server"
      return ()
    Just (WaitSubstError e) -> do
      log "Send failure"
      -- put the error back so later calls (and other writers) also fail
      atomically $ putTMVar (sSubstVar sender) (WaitSubstError e)
      throwIO e
    Just WaitSubstNone -> do -- first send
      log "Sending first batch"
      senderFlush srq sender
    Just (WaitSubstSuccess thriftSubst) -> do -- got subst
      log "Sending next batch"
      -- if rebasing fails, record the error in sSubstVar so the
      -- failure propagates to subsequent operations on this sender
      handle
        (\(e :: SomeException) -> do
          logError (show e)
          atomically $ putTMVar (sSubstVar sender) (WaitSubstError e)
          throwIO e) $
        modifyMVar_ (sFacts sender) $ \(base, owned) ->
          -- eagerly release the subst when we're done
          bracket (deserialize thriftSubst) release $ \subst -> do
            (newBase, newSubst) <-
              FactSet.rebase (srqInventory srq) subst (srqFacts srq) base
            newOwned <- mapM (substOwnership newSubst) owned
            return (newBase, newOwned)
      -- "Commit throughput" will be write throughput to the server
      start <- readTVarIO (sSent sender)
      statBump srq Stats.commitThroughput =<< endTick start
      senderFlush srq sender
  where
    log msg = vlog 1 $ "Sender " <> show (sId sender) <> ": " <> msg

-- | Read-and-reset the LookupCache counters and report them via
-- 'srqStats' (a no-op when stats reporting is disabled).
updateLookupCacheStats :: SendAndRebaseQueue -> IO ()
updateLookupCacheStats SendAndRebaseQueue{..} =
  forM_ srqStats $ \stats -> do
    statValues <- LookupCache.readStatsAndResetCounters srqCacheStats
    Stats.bump stats Stats.lookupCacheStats (countFailuresAsMisses statValues)

-- | Run an action under a throughput tick, if stats are enabled;
-- otherwise just run the action.
statTick :: SendAndRebaseQueue -> Bump Tick -> Word64 -> IO a -> IO a
statTick SendAndRebaseQueue{..} bump val act = case srqStats of
  Nothing -> act
  Just stats -> Stats.tick stats bump val act

-- | Bump a stats counter, if stats are enabled.
statBump :: SendAndRebaseQueue -> Bump Tick -> Tick -> IO ()
statBump SendAndRebaseQueue{..} bump val = case srqStats of
  Nothing -> return ()
  Just stats -> Stats.bump stats bump val

-- | Rebase a batch into the sender's fact buffer, then flush the
-- buffer to the server if the previous substitution has arrived. When
-- the buffer has exceeded 'srqFactBufferSize', block until the server
-- responds.
senderSendOrAppend
  :: SendAndRebaseQueue
  -> Sender
  -> Thrift.Batch
  -> Callback   -- ^ invoked once when this batch's send completes or fails
  -> Point      -- ^ when the caller started waiting for a free sender
  -> IO ()
senderSendOrAppend srq sender batch callback latency = do
  -- "Mutator latency" is the latency between calling writeSendAndRebaseQueue
  -- and having a free Sender to write the batch.
  statBump srq Stats.mutatorLatency =<< endTick latency
  let !size = BS.length $ Thrift.batch_facts batch
  newSize <-
    -- "Mutator throughput" is how fast we are appending new facts
    -- to the FactSet.
    statTick srq Stats.mutatorThroughput (fromIntegral size) $ do
      modifyMVar (sFacts sender) $ \(base, ownership) -> do
        -- de-duplicate the batch against the cache and the buffer,
        -- then merge the surviving facts into the buffer
        (facts, owned) <- rebase (srqInventory srq) batch (srqFacts srq) base
        FactSet.append base facts
        atomically $ writeTQueue (sCallbacks sender) callback
        newSize <- FactSet.factMemory base
        return ((base, owned : ownership), newSize)
  updateLookupCacheStats srq
  -- buffer over the limit => wait for the server before returning
  let !wait = newSize >= srqFactBufferSize srq
  senderRebaseAndFlush wait srq sender

-- | Run an action with a 'SendAndRebaseQueue'. On exit, each sender's
-- remaining facts are flushed to the server (waiting for the server's
-- responses) before the underlying 'SendQueue' is shut down.
withSendAndRebaseQueue
  :: Backend e
  => e
  -> Thrift.Repo
  -> Inventory
  -> SendAndRebaseQueueSettings
  -> (SendAndRebaseQueue -> IO a)
  -> IO a
withSendAndRebaseQueue backend repo inventory settings action = do
  vlog 1 $ "Allow remote refs: " <> show sendAndRebaseQueueAllowRemoteReferences
  SendQueue.withSendQueue backend repo sendAndRebaseQueueSendQueueSettings $
    \sendQueue -> do
      cacheStats <- LookupCache.newStats
      let cacheSize = fromIntegral sendAndRebaseQueueFactCacheSize
      srq <- SendAndRebaseQueue
        <$> pure sendQueue
        <*> newTQueueIO
        <*> pure inventory
        <*> LookupCache.new cacheSize 1 cacheStats
        <*> pure cacheStats
        <*> pure sendAndRebaseQueueStats
        <*> pure sendAndRebaseQueueFactBufferSize
      bracket_ (createSenderPool srq) (deleteSenderPool srq) $ action srq
  where
    SendAndRebaseQueueSettings{..} = settings

    createSenderPool srq = forM_ senderIds $ \i -> do
      factset <- FactSet.new baseId
      worker <- Sender i
        <$> newTMVarIO WaitSubstNone
        -- placeholder: sSent is written by senderFlush before the
        -- code path that reads it can run
        <*> newTVarIO (error "missing sSent")
        <*> newMVar (factset, [])
        <*> newTQueueIO
      atomically $ writeTQueue (srqSenders srq) worker

    deleteSenderPool srq = forM_ senderIds $ \_i -> do
      -- don't deadlock here in case we died leaving the queue empty
      sender <- atomically $ tryReadTQueue (srqSenders srq)
      -- final flush: wait for the pending substitution and send the
      -- remaining buffered facts
      forM_ sender $ senderRebaseAndFlush True srq

    senderIds = take sendAndRebaseQueueSenders [1..]

    baseId = if sendAndRebaseQueueAllowRemoteReferences
      then firstLocalId
      else Fid Thrift.fIRST_FREE_ID

    -- Higher than Ids in the remote db to avoid remote fact references
    -- from being mistaken for local fact references.
    -- Lower than Ids in JSON batches to avoid references within the
    -- batch from being mistaken for references to the local db.
    firstLocalId = Fid (firstAnonId `div` 2)

-- | Queue a batch for sending. Blocks until a sender is free; the
-- sender is always returned to the pool afterwards, even on error.
-- The callback is invoked exactly once, when the batch's send
-- completes or fails.
writeSendAndRebaseQueue
  :: SendAndRebaseQueue
  -> Thrift.Batch
  -> Callback
  -> IO ()
writeSendAndRebaseQueue srq batch callback = do
  point <- beginTick 1
  bracket
    (atomically $ readTQueue $ srqSenders srq)
    (\sender -> atomically $ writeTQueue (srqSenders srq) sender)
    (\sender -> senderSendOrAppend srq sender batch callback point)