module DataFrame.IO.Parquet.Seeking (
SeekableHandle (getSeekableHandle),
SeekMode (..),
FileBufferedOrSeekable (..),
ForceNonSeekable,
advanceBytes,
mkFileBufferedOrSeekable,
mkSeekableHandle,
readLastBytes,
seekAndReadBytes,
seekAndStreamBytes,
withFileBufferedOrSeekable,
) where
import Control.Monad
import Control.Monad.IO.Class
import qualified Data.ByteString as BS
import Data.IORef
import Data.Int
import Data.Word
import Streamly.Data.Stream (Stream)
import qualified Streamly.Data.Stream as S
import qualified Streamly.External.ByteString as SBS
import qualified Streamly.FileSystem.Handle as SHandle
import System.IO
-- | A 'Handle' that was checked with 'hIsSeekable' when it was wrapped.
--
-- The module exports only the 'getSeekableHandle' selector, not the
-- constructor, so values are obtained through the smart constructors here.
newtype SeekableHandle = SeekableHandle {getSeekableHandle :: Handle}
-- | An input that can be positioned and read from, in one of two modes.
data FileBufferedOrSeekable
    = -- | The whole input held in memory; the 'IORef' stores the current
      -- read offset into the 'BS.ByteString'.
      FileBuffered !(IORef Int64) !BS.ByteString
    | -- | A seekable handle; positioning and reads go straight to it.
      FileSeekable !SeekableHandle
-- | Wrap a handle as a 'SeekableHandle' when 'hIsSeekable' reports that it
-- supports seeking; otherwise return 'Nothing'.
mkSeekableHandle :: Handle -> IO (Maybe SeekableHandle)
mkSeekableHandle h = wrapIf <$> hIsSeekable h
  where
    wrapIf True = Just (SeekableHandle h)
    wrapIf False = Nothing
-- | Override for 'mkFileBufferedOrSeekable'. @Just True@ forces the in-memory
-- buffered mode even when the handle is seekable. @Nothing@ and @Just False@
-- leave the choice to 'hIsSeekable'.
type ForceNonSeekable = Maybe Bool
-- | Wrap a handle, preferring the seekable representation.
--
-- The handle is used directly ('FileSeekable') only when it is seekable and
-- buffering was not forced via @Just True@. Otherwise its entire remaining
-- contents are read into memory with a fresh offset of 0 ('FileBuffered').
--
-- NOTE(review): strict 'BS.hGetContents' reads to EOF and, per the bytestring
-- docs, closes the handle afterwards. Confirm callers do not reuse it.
mkFileBufferedOrSeekable ::
    ForceNonSeekable -> Handle -> IO FileBufferedOrSeekable
mkFileBufferedOrSeekable forceNonSeek h = do
    canSeek <- hIsSeekable h
    let forcedBuffering = forceNonSeek == Just True
    if canSeek && not forcedBuffering
        then pure (FileSeekable (SeekableHandle h))
        else do
            offsetRef <- newIORef 0
            contents <- BS.hGetContents h
            pure (FileBuffered offsetRef contents)
-- | Open @path@ with 'withFile', wrap the handle via
-- 'mkFileBufferedOrSeekable', and run @action@ on the result. The handle is
-- released by 'withFile' when @action@ finishes or throws.
withFileBufferedOrSeekable ::
    ForceNonSeekable ->
    FilePath ->
    IOMode ->
    (FileBufferedOrSeekable -> IO a) ->
    IO a
withFileBufferedOrSeekable forceNonSeek path ioMode action =
    withFile path ioMode (mkFileBufferedOrSeekable forceNonSeek >=> action)
-- | Return the final @n@ bytes of the input.
--
-- Afterwards the read position is at the end of the input in both modes.
-- In the seekable mode the handle is moved to @n@ bytes before EOF and read
-- until EOF. In the buffered mode the offset is set to the buffer length
-- before anything else happens.
--
-- The buffered mode calls 'error' when @n@ exceeds the buffer length.
-- NOTE(review): the seekable mode presumably fails inside 'hSeek' in that
-- case; confirm.
readLastBytes :: Integer -> FileBufferedOrSeekable -> IO BS.ByteString
readLastBytes n source = case source of
    FileSeekable sh -> do
        let h = getSeekableHandle sh
        hSeek h SeekFromEnd (negate n)
        S.fold SBS.write (SHandle.read h)
    FileBuffered offsetRef buf -> do
        let total = BS.length buf
        writeIORef offsetRef (fromIntegral total)
        if n > fromIntegral total
            then error "lastBytes: n > length bs"
            else pure (BS.drop (total - fromIntegral n) buf)
-- | Read up to @len@ bytes from the current position without seeking first.
-- This is 'seekAndReadBytes' with no seek.
advanceBytes :: Int -> FileBufferedOrSeekable -> IO BS.ByteString
advanceBytes len source = seekAndReadBytes Nothing len source
-- | Optionally reposition @f@, then read up to @len@ bytes from there.
--
-- The result is shorter than @len@ only when the end of the input is reached
-- first, and it is empty when @len <= 0@. Afterwards the position has advanced
-- by exactly the number of bytes returned, in both the seekable and the
-- buffered mode, so consecutive calls (e.g. via 'advanceBytes') read adjacent
-- ranges.
seekAndReadBytes ::
    Maybe (SeekMode, Integer) -> Int -> FileBufferedOrSeekable -> IO BS.ByteString
seekAndReadBytes mSeek len f = do
    mapM_ (uncurry (fSeek f)) mSeek
    readUpTo f
  where
    -- Non-positive lengths mean "read nothing"; BS.hGet would throw on a
    -- negative size, so clamp here.
    n = max 0 len

    -- Read the handle with BS.hGet rather than a Streamly handle stream.
    -- The stream pulls whole chunks from the handle, so after taking @len@
    -- bytes the handle position could sit well past the bytes returned.
    -- BS.hGet advances it by exactly the bytes read.
    readUpTo (FileSeekable (SeekableHandle h)) = BS.hGet h n
    -- Same result as streaming byte-by-byte from the offset and bumping it per
    -- byte, but done as a single slice and a single IORef update.
    readUpTo (FileBuffered offsetRef buf) = do
        pos <- readIORef offsetRef
        let chunk = BS.take n (BS.drop (fromIntegral pos) buf)
        modifyIORef' offsetRef (+ fromIntegral (BS.length chunk))
        pure chunk
-- | Optionally reposition @f@, then return a stream of at most @len@ bytes
-- starting at the resulting position.
--
-- The seek runs immediately, before the stream is returned. The bytes are
-- produced by 'fRead' as the stream is consumed.
--
-- NOTE(review): for 'FileSeekable' the bytes come from a Streamly handle
-- reader, which presumably reads the handle in chunks. The handle position
-- after consumption may therefore lie beyond the last byte taken; confirm
-- before relying on it.
seekAndStreamBytes ::
    (MonadIO m) =>
    Maybe (SeekMode, Integer) -> Int -> FileBufferedOrSeekable -> m (Stream m Word8)
seekAndStreamBytes mSeek len f = do
    liftIO (mapM_ (uncurry (fSeek f)) mSeek)
    pure (S.take len (fRead f))
-- | Move the read position of @f@.
--
-- For 'FileSeekable' this is 'hSeek' on the underlying handle. For
-- 'FileBuffered' the new offset is computed the same way 'hSeek' interprets
-- its mode:
--
-- * absolute,
-- * relative to the current offset, or
-- * relative to the end of the buffer.
--
-- Offsets past the end are accepted; later reads simply return nothing.
--
-- A real file handle cannot be positioned before offset 0, because lseek
-- rejects a negative resulting offset. So a negative target, or one that does
-- not fit in an 'Int64', raises an 'IOError' here. Previously such a target
-- was stored silently, and later reads returned data from the start of the
-- buffer.
fSeek :: FileBufferedOrSeekable -> SeekMode -> Integer -> IO ()
fSeek (FileSeekable (SeekableHandle h)) seekMode seekTo = hSeek h seekMode seekTo
fSeek (FileBuffered offsetRef buf) seekMode seekTo = do
    current <- readIORef offsetRef
    -- Work in Integer so the range check sees the true target before any
    -- narrowing to Int64 can wrap it.
    let target = case seekMode of
            AbsoluteSeek -> seekTo
            RelativeSeek -> toInteger current + seekTo
            SeekFromEnd -> toInteger (BS.length buf) + seekTo
    when (target < 0 || target > toInteger (maxBound :: Int64)) $
        ioError . userError $
            "fSeek: position out of range: " ++ show target
    writeIORef offsetRef (fromInteger target)
-- | Stream bytes from the current position to the end of the input.
--
-- * Seekable mode: reads the handle with Streamly's handle reader.
-- * Buffered mode: the stored offset is read once, when the stream starts.
--   Bytes are then emitted from that offset, and the offset is incremented
--   for every byte emitted, so it tracks how much of the stream has been
--   consumed.
fRead :: (MonadIO m) => FileBufferedOrSeekable -> Stream m Word8
fRead source = case source of
    FileSeekable sh -> SHandle.read (getSeekableHandle sh)
    FileBuffered offsetRef buf ->
        let tick w = w <$ liftIO (modifyIORef' offsetRef (+ 1))
            startingAt pos =
                S.mapM tick (S.unfold SBS.reader (BS.drop (fromIntegral pos) buf))
         in S.concatEffect (startingAt <$> liftIO (readIORef offsetRef))