{-# LANGUAGE CPP #-}
module Data.Acid.Log
( FileLog(..)
, LogKey(..)
, EntryId
, openFileLog
, closeFileLog
, pushEntry
, pushAction
, ensureLeastEntryId
, readEntriesFrom
, rollbackTo
, rollbackWhile
, newestEntry
, askCurrentEntryId
, cutFileLog
, archiveFileLog
, findLogFiles
) where
import Data.Acid.Archive (Archiver(..), Entries(..), entriesToList)
import Data.Acid.Core
import System.Directory
import System.FilePath
import System.IO
import FileIO
import Foreign.Ptr
import Control.Monad
import Control.Concurrent
import Control.Concurrent.STM
import qualified Data.ByteString.Lazy as Lazy
import qualified Data.ByteString as Strict
import qualified Data.ByteString.Unsafe as Strict
import Data.List ( (\\), stripPrefix, sort )
import Data.Maybe
#if !MIN_VERSION_base(4,11,0)
import Data.Monoid ( (<>) )
#endif
import Text.Printf ( printf )
import Paths_acid_state ( version )
import Data.Version ( showVersion )
import Control.Exception ( handle, IOException )
type EntryId = Int
data FileLog object
= FileLog { forall object. FileLog object -> LogKey object
logIdentifier :: LogKey object
, forall object. FileLog object -> MVar FHandle
logCurrent :: MVar FHandle
, forall object. FileLog object -> TVar EntryId
logNextEntryId :: TVar EntryId
, forall object. FileLog object -> TVar ([ByteString], [IO ()])
logQueue :: TVar ([Lazy.ByteString], [IO ()])
, forall object. FileLog object -> [ThreadId]
logThreads :: [ThreadId]
}
data LogKey object
= LogKey
{ forall object. LogKey object -> String
logDirectory :: FilePath
, forall object. LogKey object -> String
logPrefix :: String
, forall object. LogKey object -> Serialiser object
logSerialiser :: Serialiser object
, forall object. LogKey object -> Archiver
logArchiver :: Archiver
}
formatLogFile :: String -> EntryId -> String
formatLogFile :: String -> EntryId -> String
formatLogFile = String -> String -> EntryId -> String
forall r. PrintfType r => String -> r
printf String
"%s-%010d.log"
findLogFiles :: LogKey object -> IO [(EntryId, FilePath)]
findLogFiles :: forall object. LogKey object -> IO [(EntryId, String)]
findLogFiles LogKey object
identifier = do
Bool -> String -> IO ()
createDirectoryIfMissing Bool
True (LogKey object -> String
forall object. LogKey object -> String
logDirectory LogKey object
identifier)
files <- String -> IO [String]
getDirectoryContents (LogKey object -> String
forall object. LogKey object -> String
logDirectory LogKey object
identifier)
return [ (tid, logDirectory identifier </> file)
| file <- files
, logFile <- maybeToList (stripPrefix (logPrefix identifier ++ "-") file)
, (tid, ".log") <- reads logFile ]
saveVersionFile :: LogKey object -> IO ()
saveVersionFile :: forall object. LogKey object -> IO ()
saveVersionFile LogKey object
key = do
exist <- String -> IO Bool
doesFileExist String
versionFile
unless exist $ writeFile versionFile (showVersion version)
where
versionFile :: String
versionFile = LogKey object -> String
forall object. LogKey object -> String
logDirectory LogKey object
key String -> String -> String
</> LogKey object -> String
forall object. LogKey object -> String
logPrefix LogKey object
key String -> String -> String
<.> String
"version"
openFileLog :: LogKey object -> IO (FileLog object)
openFileLog :: forall object. LogKey object -> IO (FileLog object)
openFileLog LogKey object
identifier = do
logFiles <- LogKey object -> IO [(EntryId, String)]
forall object. LogKey object -> IO [(EntryId, String)]
findLogFiles LogKey object
identifier
saveVersionFile identifier
currentState <- newEmptyMVar
queue <- newTVarIO ([], [])
nextEntryRef <- newTVarIO 0
tid1 <- myThreadId
tid2 <- forkIO $ fileWriter (logArchiver identifier) currentState queue tid1
let fLog = FileLog { logIdentifier :: LogKey object
logIdentifier = LogKey object
identifier
, logCurrent :: MVar FHandle
logCurrent = MVar FHandle
currentState
, logNextEntryId :: TVar EntryId
logNextEntryId = TVar EntryId
nextEntryRef
, logQueue :: TVar ([ByteString], [IO ()])
logQueue = TVar ([ByteString], [IO ()])
queue
, logThreads :: [ThreadId]
logThreads = [ThreadId
tid2] }
if null logFiles
then do let currentEntryId = EntryId
0
handle <- open (logDirectory identifier </> formatLogFile (logPrefix identifier) currentEntryId)
putMVar currentState handle
else do let (lastFileEntryId, lastFilePath) = maximum logFiles
entries <- readEntities (logArchiver identifier) lastFilePath
let currentEntryId = EntryId
lastFileEntryId EntryId -> EntryId -> EntryId
forall a. Num a => a -> a -> a
+ [ByteString] -> EntryId
forall a. [a] -> EntryId
forall (t :: * -> *) a. Foldable t => t a -> EntryId
length [ByteString]
entries
atomically $ writeTVar nextEntryRef currentEntryId
handle <- open (logDirectory identifier </> formatLogFile (logPrefix identifier) currentEntryId)
putMVar currentState handle
return fLog
fileWriter :: Archiver -> MVar FHandle -> TVar ([Lazy.ByteString], [IO ()]) -> ThreadId -> IO ()
fileWriter :: Archiver
-> MVar FHandle
-> TVar ([ByteString], [IO ()])
-> ThreadId
-> IO ()
fileWriter Archiver
archiver MVar FHandle
currentState TVar ([ByteString], [IO ()])
queue ThreadId
parentTid = IO () -> IO ()
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
(entries, actions) <- STM ([ByteString], [IO ()]) -> IO ([ByteString], [IO ()])
forall a. STM a -> IO a
atomically (STM ([ByteString], [IO ()]) -> IO ([ByteString], [IO ()]))
-> STM ([ByteString], [IO ()]) -> IO ([ByteString], [IO ()])
forall a b. (a -> b) -> a -> b
$ do
(entries, actions) <- TVar ([ByteString], [IO ()]) -> STM ([ByteString], [IO ()])
forall a. TVar a -> STM a
readTVar TVar ([ByteString], [IO ()])
queue
when (null entries && null actions) retry
writeTVar queue ([], [])
return (reverse entries, reverse actions)
handle (\IOException
e -> ThreadId -> IOException -> IO ()
forall e. Exception e => ThreadId -> e -> IO ()
throwTo ThreadId
parentTid (IOException
e :: IOException)) $
withMVar currentState $ \FHandle
fd -> do
let arch :: ByteString
arch = Archiver -> [ByteString] -> ByteString
archiveWrite Archiver
archiver [ByteString]
entries
FHandle -> [ByteString] -> IO ()
writeToDisk FHandle
fd (ByteString -> [ByteString]
repack ByteString
arch)
sequence_ actions
yield
repack :: Lazy.ByteString -> [Strict.ByteString]
repack :: ByteString -> [ByteString]
repack = ByteString -> [ByteString]
worker
where
worker :: ByteString -> [ByteString]
worker ByteString
bs
| ByteString -> Bool
Lazy.null ByteString
bs = []
| Bool
otherwise = [ByteString] -> ByteString
Strict.concat (ByteString -> [ByteString]
Lazy.toChunks (Int64 -> ByteString -> ByteString
Lazy.take Int64
blockSize ByteString
bs)) ByteString -> [ByteString] -> [ByteString]
forall a. a -> [a] -> [a]
: ByteString -> [ByteString]
worker (Int64 -> ByteString -> ByteString
Lazy.drop Int64
blockSize ByteString
bs)
blockSize :: Int64
blockSize = Int64
4Int64 -> Int64 -> Int64
forall a. Num a => a -> a -> a
*Int64
1024
writeToDisk :: FHandle -> [Strict.ByteString] -> IO ()
writeToDisk :: FHandle -> [ByteString] -> IO ()
writeToDisk FHandle
_ [] = () -> IO ()
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
writeToDisk FHandle
handle [ByteString]
xs = do
(ByteString -> IO ()) -> [ByteString] -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ ByteString -> IO ()
worker [ByteString]
xs
FHandle -> IO ()
flush FHandle
handle
where
worker :: ByteString -> IO ()
worker ByteString
bs = do
let len :: EntryId
len = ByteString -> EntryId
Strict.length ByteString
bs
count <- ByteString -> (CString -> IO Word32) -> IO Word32
forall a. ByteString -> (CString -> IO a) -> IO a
Strict.unsafeUseAsCString ByteString
bs ((CString -> IO Word32) -> IO Word32)
-> (CString -> IO Word32) -> IO Word32
forall a b. (a -> b) -> a -> b
$ \CString
ptr -> FHandle -> Ptr Word8 -> Word32 -> IO Word32
write FHandle
handle (CString -> Ptr Word8
forall a b. Ptr a -> Ptr b
castPtr CString
ptr) (EntryId -> Word32
forall a b. (Integral a, Num b) => a -> b
fromIntegral EntryId
len)
when (fromIntegral count < len) $
worker (Strict.drop (fromIntegral count) bs)
closeFileLog :: FileLog object -> IO ()
closeFileLog :: forall object. FileLog object -> IO ()
closeFileLog FileLog object
fLog =
MVar FHandle -> (FHandle -> IO FHandle) -> IO ()
forall a. MVar a -> (a -> IO a) -> IO ()
modifyMVar_ (FileLog object -> MVar FHandle
forall object. FileLog object -> MVar FHandle
logCurrent FileLog object
fLog) ((FHandle -> IO FHandle) -> IO ())
-> (FHandle -> IO FHandle) -> IO ()
forall a b. (a -> b) -> a -> b
$ \FHandle
handle -> do
FHandle -> IO ()
close FHandle
handle
_ <- IO () -> IO ThreadId
forkIO (IO () -> IO ThreadId) -> IO () -> IO ThreadId
forall a b. (a -> b) -> a -> b
$ [ThreadId] -> (ThreadId -> IO ()) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ (FileLog object -> [ThreadId]
forall object. FileLog object -> [ThreadId]
logThreads FileLog object
fLog) ThreadId -> IO ()
killThread
return $ error "Data.Acid.Log: FileLog has been closed"
readEntities :: Archiver -> FilePath -> IO [Lazy.ByteString]
readEntities :: Archiver -> String -> IO [ByteString]
readEntities Archiver
archiver String
path = do
archive <- String -> IO ByteString
Lazy.readFile String
path
return $ entriesToList (archiveRead archiver archive)
ensureLeastEntryId :: FileLog object -> EntryId -> IO ()
ensureLeastEntryId :: forall object. FileLog object -> EntryId -> IO ()
ensureLeastEntryId FileLog object
fLog EntryId
youngestEntry = do
STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
entryId <- TVar EntryId -> STM EntryId
forall a. TVar a -> STM a
readTVar (FileLog object -> TVar EntryId
forall object. FileLog object -> TVar EntryId
logNextEntryId FileLog object
fLog)
writeTVar (logNextEntryId fLog) (max entryId youngestEntry)
FileLog object -> IO EntryId
forall object. FileLog object -> IO EntryId
cutFileLog FileLog object
fLog
() -> IO ()
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
readEntriesFrom :: FileLog object -> EntryId -> IO [object]
readEntriesFrom :: forall object. FileLog object -> EntryId -> IO [object]
readEntriesFrom FileLog object
fLog EntryId
youngestEntry = do
entryCap <- FileLog object -> IO EntryId
forall object. FileLog object -> IO EntryId
cutFileLog FileLog object
fLog
logFiles <- findLogFiles (logIdentifier fLog)
let sorted = [(EntryId, String)] -> [(EntryId, String)]
forall a. Ord a => [a] -> [a]
sort [(EntryId, String)]
logFiles
relevant = Maybe EntryId
-> Maybe EntryId -> [(EntryId, String)] -> [(EntryId, String)]
filterLogFiles (EntryId -> Maybe EntryId
forall a. a -> Maybe a
Just EntryId
youngestEntry) (EntryId -> Maybe EntryId
forall a. a -> Maybe a
Just EntryId
entryCap) [(EntryId, String)]
sorted
firstEntryId = case [(EntryId, String)]
relevant of
[] -> EntryId
0
( (EntryId, String)
logFile : [(EntryId, String)]
_logFiles) -> (EntryId, String) -> EntryId
forall {a} {b}. (a, b) -> a
rangeStart (EntryId, String)
logFile
archive <- liftM Lazy.fromChunks $ mapM (Strict.readFile . snd) relevant
let entries = Entries -> [ByteString]
entriesToList (Entries -> [ByteString]) -> Entries -> [ByteString]
forall a b. (a -> b) -> a -> b
$ Archiver -> ByteString -> Entries
archiveRead (LogKey object -> Archiver
forall object. LogKey object -> Archiver
logArchiver LogKey object
identifier) ByteString
archive
return $ map (decode' identifier)
$ take (entryCap - youngestEntry)
$ drop (youngestEntry - firstEntryId) entries
where
rangeStart :: (a, b) -> a
rangeStart (a
firstEntryId, b
_path) = a
firstEntryId
identifier :: LogKey object
identifier = FileLog object -> LogKey object
forall object. FileLog object -> LogKey object
logIdentifier FileLog object
fLog
rollbackTo :: LogKey object -> EntryId -> IO ()
rollbackTo :: forall object. LogKey object -> EntryId -> IO ()
rollbackTo LogKey object
identifier EntryId
youngestEntry = do
logFiles <- LogKey object -> IO [(EntryId, String)]
forall object. LogKey object -> IO [(EntryId, String)]
findLogFiles LogKey object
identifier
let sorted = [(EntryId, String)] -> [(EntryId, String)]
forall a. Ord a => [a] -> [a]
sort [(EntryId, String)]
logFiles
loop [] = () -> IO ()
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
loop ((EntryId
rangeStart, String
path) : [(EntryId, String)]
xs)
| EntryId
rangeStart EntryId -> EntryId -> Bool
forall a. Ord a => a -> a -> Bool
>= EntryId
youngestEntry = String -> IO ()
removeFile String
path IO () -> IO () -> IO ()
forall a b. IO a -> IO b -> IO b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> [(EntryId, String)] -> IO ()
loop [(EntryId, String)]
xs
| Bool
otherwise = do
archive <- String -> IO ByteString
Strict.readFile String
path
pathHandle <- openFile path WriteMode
let entries = Entries -> [ByteString]
entriesToList (Entries -> [ByteString]) -> Entries -> [ByteString]
forall a b. (a -> b) -> a -> b
$ Archiver -> ByteString -> Entries
archiveRead (LogKey object -> Archiver
forall object. LogKey object -> Archiver
logArchiver LogKey object
identifier) ([ByteString] -> ByteString
Lazy.fromChunks [ByteString
archive])
entriesToKeep = EntryId -> [ByteString] -> [ByteString]
forall a. EntryId -> [a] -> [a]
take (EntryId
youngestEntry EntryId -> EntryId -> EntryId
forall a. Num a => a -> a -> a
- EntryId
rangeStart EntryId -> EntryId -> EntryId
forall a. Num a => a -> a -> a
+ EntryId
1) [ByteString]
entries
lengthToKeep = ByteString -> Int64
Lazy.length (Archiver -> [ByteString] -> ByteString
archiveWrite (LogKey object -> Archiver
forall object. LogKey object -> Archiver
logArchiver LogKey object
identifier) [ByteString]
entriesToKeep)
hSetFileSize pathHandle (fromIntegral lengthToKeep)
hClose pathHandle
loop (reverse sorted)
rollbackWhile :: LogKey object
-> (object -> Bool)
-> IO ()
rollbackWhile :: forall object. LogKey object -> (object -> Bool) -> IO ()
rollbackWhile LogKey object
identifier object -> Bool
filterFn = do
logFiles <- LogKey object -> IO [(EntryId, String)]
forall object. LogKey object -> IO [(EntryId, String)]
findLogFiles LogKey object
identifier
let sorted = [(EntryId, String)] -> [(EntryId, String)]
forall a. Ord a => [a] -> [a]
sort [(EntryId, String)]
logFiles
loop [] = () -> IO ()
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
loop ((a
_rangeStart, String
path) : [(a, String)]
xs) = do
archive <- String -> IO ByteString
Strict.readFile String
path
let entries = Entries -> [ByteString]
entriesToList (Entries -> [ByteString]) -> Entries -> [ByteString]
forall a b. (a -> b) -> a -> b
$ Archiver -> ByteString -> Entries
archiveRead (LogKey object -> Archiver
forall object. LogKey object -> Archiver
logArchiver LogKey object
identifier) ([ByteString] -> ByteString
Lazy.fromChunks [ByteString
archive])
entriesToSkip = (ByteString -> Bool) -> [ByteString] -> [ByteString]
forall a. (a -> Bool) -> [a] -> [a]
takeWhile (object -> Bool
filterFn (object -> Bool) -> (ByteString -> object) -> ByteString -> Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. LogKey object -> ByteString -> object
forall object. LogKey object -> ByteString -> object
decode' LogKey object
identifier) ([ByteString] -> [ByteString]) -> [ByteString] -> [ByteString]
forall a b. (a -> b) -> a -> b
$ [ByteString] -> [ByteString]
forall a. [a] -> [a]
reverse [ByteString]
entries
skip_size = ByteString -> Int64
Lazy.length (Archiver -> [ByteString] -> ByteString
archiveWrite (LogKey object -> Archiver
forall object. LogKey object -> Archiver
logArchiver LogKey object
identifier) [ByteString]
entriesToSkip)
orig_size = EntryId -> Int64
forall a b. (Integral a, Num b) => a -> b
fromIntegral (EntryId -> Int64) -> EntryId -> Int64
forall a b. (a -> b) -> a -> b
$ ByteString -> EntryId
Strict.length ByteString
archive
new_size = Int64
orig_size Int64 -> Int64 -> Int64
forall a. Num a => a -> a -> a
- Int64
skip_size
if new_size == 0
then do removeFile path; loop xs
else do pathHandle <- openFile path WriteMode
hSetFileSize pathHandle (fromIntegral new_size)
hClose pathHandle
loop (reverse sorted)
filterLogFiles
:: Maybe EntryId
-> Maybe EntryId
-> [(EntryId, FilePath)] -> [(EntryId, FilePath)]
filterLogFiles :: Maybe EntryId
-> Maybe EntryId -> [(EntryId, String)] -> [(EntryId, String)]
filterLogFiles Maybe EntryId
minEntryIdMb Maybe EntryId
maxEntryIdMb [(EntryId, String)]
logFiles = [(EntryId, String)] -> [(EntryId, String)]
forall {b}. [(EntryId, b)] -> [(EntryId, b)]
worker [(EntryId, String)]
logFiles
where
worker :: [(EntryId, b)] -> [(EntryId, b)]
worker [] = []
worker [ (EntryId, b)
logFile ]
| EntryId -> Bool
ltMaxEntryId ((EntryId, b) -> EntryId
forall {a} {b}. (a, b) -> a
rangeStart (EntryId, b)
logFile)
= [ (EntryId, b)
logFile ]
| Bool
otherwise
= []
worker ( (EntryId, b)
left : (EntryId, b)
right : [(EntryId, b)]
xs)
| EntryId -> Bool
ltMinEntryId ((EntryId, b) -> EntryId
forall {a} {b}. (a, b) -> a
rangeStart (EntryId, b)
right)
= [(EntryId, b)] -> [(EntryId, b)]
worker ((EntryId, b)
right (EntryId, b) -> [(EntryId, b)] -> [(EntryId, b)]
forall a. a -> [a] -> [a]
: [(EntryId, b)]
xs)
| EntryId -> Bool
ltMaxEntryId ((EntryId, b) -> EntryId
forall {a} {b}. (a, b) -> a
rangeStart (EntryId, b)
left)
= (EntryId, b)
left (EntryId, b) -> [(EntryId, b)] -> [(EntryId, b)]
forall a. a -> [a] -> [a]
: [(EntryId, b)] -> [(EntryId, b)]
worker ((EntryId, b)
right (EntryId, b) -> [(EntryId, b)] -> [(EntryId, b)]
forall a. a -> [a] -> [a]
: [(EntryId, b)]
xs)
| Bool
otherwise
= []
ltMinEntryId :: EntryId -> Bool
ltMinEntryId = case Maybe EntryId
minEntryIdMb of Maybe EntryId
Nothing -> Bool -> EntryId -> Bool
forall a b. a -> b -> a
const Bool
False
Just EntryId
minEntryId -> (EntryId -> EntryId -> Bool
forall a. Ord a => a -> a -> Bool
<= EntryId
minEntryId)
ltMaxEntryId :: EntryId -> Bool
ltMaxEntryId = case Maybe EntryId
maxEntryIdMb of Maybe EntryId
Nothing -> Bool -> EntryId -> Bool
forall a b. a -> b -> a
const Bool
True
Just EntryId
maxEntryId -> (EntryId -> EntryId -> Bool
forall a. Ord a => a -> a -> Bool
< EntryId
maxEntryId)
rangeStart :: (a, b) -> a
rangeStart (a
firstEntryId, b
_path) = a
firstEntryId
archiveFileLog :: FileLog object -> EntryId -> IO ()
archiveFileLog :: forall object. FileLog object -> EntryId -> IO ()
archiveFileLog FileLog object
fLog EntryId
entryId = do
logFiles <- LogKey object -> IO [(EntryId, String)]
forall object. LogKey object -> IO [(EntryId, String)]
findLogFiles (FileLog object -> LogKey object
forall object. FileLog object -> LogKey object
logIdentifier FileLog object
fLog)
let sorted = [(EntryId, String)] -> [(EntryId, String)]
forall a. Ord a => [a] -> [a]
sort [(EntryId, String)]
logFiles
relevant = Maybe EntryId
-> Maybe EntryId -> [(EntryId, String)] -> [(EntryId, String)]
filterLogFiles Maybe EntryId
forall a. Maybe a
Nothing (EntryId -> Maybe EntryId
forall a. a -> Maybe a
Just EntryId
entryId) [(EntryId, String)]
sorted
[(EntryId, String)] -> [(EntryId, String)] -> [(EntryId, String)]
forall a. Eq a => [a] -> [a] -> [a]
\\ Maybe EntryId
-> Maybe EntryId -> [(EntryId, String)] -> [(EntryId, String)]
filterLogFiles (EntryId -> Maybe EntryId
forall a. a -> Maybe a
Just EntryId
entryId) (EntryId -> Maybe EntryId
forall a. a -> Maybe a
Just (EntryId
entryIdEntryId -> EntryId -> EntryId
forall a. Num a => a -> a -> a
+EntryId
1)) [(EntryId, String)]
sorted
createDirectoryIfMissing True archiveDir
forM_ relevant $ \(EntryId
_startEntry, String
logFilePath) ->
String -> String -> IO ()
renameFile String
logFilePath (String
archiveDir String -> String -> String
</> String -> String
takeFileName String
logFilePath)
where
archiveDir :: String
archiveDir = LogKey object -> String
forall object. LogKey object -> String
logDirectory (FileLog object -> LogKey object
forall object. FileLog object -> LogKey object
logIdentifier FileLog object
fLog) String -> String -> String
</> String
"Archive"
getNextDurableEntryId :: FileLog object -> IO EntryId
getNextDurableEntryId :: forall object. FileLog object -> IO EntryId
getNextDurableEntryId FileLog object
fLog = STM EntryId -> IO EntryId
forall a. STM a -> IO a
atomically (STM EntryId -> IO EntryId) -> STM EntryId -> IO EntryId
forall a b. (a -> b) -> a -> b
$ do
(entries, _) <- TVar ([ByteString], [IO ()]) -> STM ([ByteString], [IO ()])
forall a. TVar a -> STM a
readTVar (FileLog object -> TVar ([ByteString], [IO ()])
forall object. FileLog object -> TVar ([ByteString], [IO ()])
logQueue FileLog object
fLog)
next <- readTVar (logNextEntryId fLog)
return (next - length entries)
cutFileLog :: FileLog object -> IO EntryId
cutFileLog :: forall object. FileLog object -> IO EntryId
cutFileLog FileLog object
fLog = do
mvar <- IO (MVar EntryId)
forall a. IO (MVar a)
newEmptyMVar
let action = do currentEntryId <- FileLog object -> IO EntryId
forall object. FileLog object -> IO EntryId
getNextDurableEntryId FileLog object
fLog
modifyMVar_ (logCurrent fLog) $ \FHandle
old ->
do FHandle -> IO ()
close FHandle
old
String -> IO FHandle
open (LogKey object -> String
forall object. LogKey object -> String
logDirectory LogKey object
key String -> String -> String
</> String -> EntryId -> String
formatLogFile (LogKey object -> String
forall object. LogKey object -> String
logPrefix LogKey object
key) EntryId
currentEntryId)
putMVar mvar currentEntryId
pushAction fLog action
takeMVar mvar
where
key :: LogKey object
key = FileLog object -> LogKey object
forall object. FileLog object -> LogKey object
logIdentifier FileLog object
fLog
newestEntry :: LogKey object -> IO (Maybe object)
newestEntry :: forall object. LogKey object -> IO (Maybe object)
newestEntry LogKey object
identifier = do
logFiles <- LogKey object -> IO [(EntryId, String)]
forall object. LogKey object -> IO [(EntryId, String)]
findLogFiles LogKey object
identifier
let sorted = [(EntryId, String)] -> [(EntryId, String)]
forall a. [a] -> [a]
reverse ([(EntryId, String)] -> [(EntryId, String)])
-> [(EntryId, String)] -> [(EntryId, String)]
forall a b. (a -> b) -> a -> b
$ [(EntryId, String)] -> [(EntryId, String)]
forall a. Ord a => [a] -> [a]
sort [(EntryId, String)]
logFiles
(_eventIds, files) = unzip sorted
worker files
where
worker :: [String] -> IO (Maybe object)
worker [] = Maybe object -> IO (Maybe object)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe object
forall a. Maybe a
Nothing
worker (String
logFile:[String]
logFiles) = do
archive <- (ByteString -> ByteString) -> IO ByteString -> IO ByteString
forall a b. (a -> b) -> IO a -> IO b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap ByteString -> ByteString
Lazy.fromStrict (IO ByteString -> IO ByteString) -> IO ByteString -> IO ByteString
forall a b. (a -> b) -> a -> b
$ String -> IO ByteString
Strict.readFile String
logFile
case archiveRead (logArchiver identifier) archive of
Entries
Done -> [String] -> IO (Maybe object)
worker [String]
logFiles
Next ByteString
entry Entries
next -> Maybe object -> IO (Maybe object)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe object -> IO (Maybe object))
-> Maybe object -> IO (Maybe object)
forall a b. (a -> b) -> a -> b
$ object -> Maybe object
forall a. a -> Maybe a
Just (LogKey object -> ByteString -> object
forall object. LogKey object -> ByteString -> object
decode' LogKey object
identifier (ByteString -> Entries -> ByteString
lastEntry ByteString
entry Entries
next))
Fail String
msg -> String -> IO (Maybe object)
forall a. HasCallStack => String -> a
error (String -> IO (Maybe object)) -> String -> IO (Maybe object)
forall a b. (a -> b) -> a -> b
$ String
"Data.Acid.Log: " String -> String -> String
forall a. Semigroup a => a -> a -> a
<> String
msg
lastEntry :: ByteString -> Entries -> ByteString
lastEntry ByteString
entry Entries
Done = ByteString
entry
lastEntry ByteString
entry (Fail String
msg) = String -> ByteString
forall a. HasCallStack => String -> a
error (String -> ByteString) -> String -> ByteString
forall a b. (a -> b) -> a -> b
$ String
"Data.Acid.Log: " String -> String -> String
forall a. Semigroup a => a -> a -> a
<> String
msg
lastEntry ByteString
_ (Next ByteString
entry Entries
next) = ByteString -> Entries -> ByteString
lastEntry ByteString
entry Entries
next
pushEntry :: FileLog object -> object -> IO () -> IO ()
pushEntry :: forall object. FileLog object -> object -> IO () -> IO ()
pushEntry FileLog object
fLog object
object IO ()
finally = STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
tid <- TVar EntryId -> STM EntryId
forall a. TVar a -> STM a
readTVar (FileLog object -> TVar EntryId
forall object. FileLog object -> TVar EntryId
logNextEntryId FileLog object
fLog)
writeTVar (logNextEntryId fLog) $! tid+1
(entries, actions) <- readTVar (logQueue fLog)
writeTVar (logQueue fLog) ( encoded : entries, finally : actions )
where
encoded :: ByteString
encoded = [ByteString] -> ByteString
Lazy.fromChunks [ ByteString -> ByteString
Strict.copy (ByteString -> ByteString) -> ByteString -> ByteString
forall a b. (a -> b) -> a -> b
$ ByteString -> ByteString
Lazy.toStrict (ByteString -> ByteString) -> ByteString -> ByteString
forall a b. (a -> b) -> a -> b
$
Serialiser object -> object -> ByteString
forall a. Serialiser a -> a -> ByteString
serialiserEncode (LogKey object -> Serialiser object
forall object. LogKey object -> Serialiser object
logSerialiser (FileLog object -> LogKey object
forall object. FileLog object -> LogKey object
logIdentifier FileLog object
fLog)) object
object ]
pushAction :: FileLog object -> IO () -> IO ()
pushAction :: forall object. FileLog object -> IO () -> IO ()
pushAction FileLog object
fLog IO ()
finally = STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
(entries, actions) <- TVar ([ByteString], [IO ()]) -> STM ([ByteString], [IO ()])
forall a. TVar a -> STM a
readTVar (FileLog object -> TVar ([ByteString], [IO ()])
forall object. FileLog object -> TVar ([ByteString], [IO ()])
logQueue FileLog object
fLog)
writeTVar (logQueue fLog) (entries, finally : actions)
askCurrentEntryId :: FileLog object -> IO EntryId
askCurrentEntryId :: forall object. FileLog object -> IO EntryId
askCurrentEntryId FileLog object
fLog = STM EntryId -> IO EntryId
forall a. STM a -> IO a
atomically (STM EntryId -> IO EntryId) -> STM EntryId -> IO EntryId
forall a b. (a -> b) -> a -> b
$
TVar EntryId -> STM EntryId
forall a. TVar a -> STM a
readTVar (FileLog object -> TVar EntryId
forall object. FileLog object -> TVar EntryId
logNextEntryId FileLog object
fLog)
decode' :: LogKey object -> Lazy.ByteString -> object
decode' :: forall object. LogKey object -> ByteString -> object
decode' LogKey object
s ByteString
inp =
case Serialiser object -> ByteString -> Either String object
forall a. Serialiser a -> ByteString -> Either String a
serialiserDecode (LogKey object -> Serialiser object
forall object. LogKey object -> Serialiser object
logSerialiser LogKey object
s) ByteString
inp of
Left String
msg -> String -> object
forall a. HasCallStack => String -> a
error (String -> object) -> String -> object
forall a b. (a -> b) -> a -> b
$ String
"Data.Acid.Log: " String -> String -> String
forall a. Semigroup a => a -> a -> a
<> String
msg
Right object
val -> object
val