{- | Low-level utilities for seeking and reading within files.

This module may also collect all Streamly-related low-level utilities.

Later, this module may be renamed or moved to an internal module.
-}
module DataFrame.IO.Parquet.Seeking (
    SeekableHandle (getSeekableHandle),
    SeekMode (..),
    FileBufferedOrSeekable (..),
    ForceNonSeekable,
    advanceBytes,
    mkFileBufferedOrSeekable,
    mkSeekableHandle,
    readLastBytes,
    seekAndReadBytes,
    seekAndStreamBytes,
    withFileBufferedOrSeekable,
) where

import Control.Monad
import Control.Monad.IO.Class
import qualified Data.ByteString as BS
import Data.IORef
import Data.Int
import Data.Word
import Streamly.Data.Stream (Stream)
import qualified Streamly.Data.Stream as S
import qualified Streamly.External.ByteString as SBS
import qualified Streamly.FileSystem.Handle as SHandle
import System.IO

{- | A 'Handle' that has been checked to be seekable before being wrapped.

Note: neither 'Handle' nor 'SeekableHandle' is thread safe. Do not share one
across threads, and be careful when running parallel or concurrent code.

Typically not seekable:

  - stdin / stdout
  - pipes / FIFOs

Regular files are seekable. Parquet fundamentally needs random access; a
non-seekable source cannot be read efficiently without buffering the entire
file.
-}
newtype SeekableHandle = SeekableHandle
    { getSeekableHandle :: Handle
    -- ^ The wrapped handle.
    }

{- | An input that is either fully buffered in memory or backed by a seekable
handle. Truly supporting non-seekable inputs requires buffering the entire
file in memory, which is what 'FileBuffered' represents.

Not thread safe: both constructors carry mutable state (an 'IORef' cursor, or
a 'Handle'). For concurrent / parallel parsing, read the data into a
'BS.ByteString' first instead of sharing the same value across threads.
-}
data FileBufferedOrSeekable
    = -- | Current read offset into the buffer, and the buffered contents.
      FileBuffered !(IORef Int64) !BS.ByteString
    | -- | A handle that was checked to be seekable.
      FileSeekable !SeekableHandle

{- | Smart constructor for 'SeekableHandle': yields 'Just' the wrapped handle
when 'hIsSeekable' reports it as seekable, and 'Nothing' otherwise.
-}
mkSeekableHandle :: Handle -> IO (Maybe SeekableHandle)
mkSeekableHandle handle = wrap <$> hIsSeekable handle
  where
    wrap True = Just (SeekableHandle handle)
    wrap False = Nothing

{- | For testing only: lets callers force the buffered representation.

'mkFileBufferedOrSeekable' buffers the handle's contents in memory when this
is @Just True@, even if the handle is seekable. @Nothing@ and @Just False@
leave the decision to 'hIsSeekable'.
-}
type ForceNonSeekable = Maybe Bool

{- | Smart constructor for 'FileBufferedOrSeekable' that keeps the
'FileSeekable' representation whenever possible.

If the handle is not seekable, or buffering is forced with @Just True@, the
handle's contents are read strictly with 'BS.hGetContents'. They are wrapped
as 'FileBuffered' with the cursor starting at offset 0.
-}
mkFileBufferedOrSeekable ::
    ForceNonSeekable -> Handle -> IO FileBufferedOrSeekable
mkFileBufferedOrSeekable forceNonSeek h = do
    mSeekable <- mkSeekableHandle h
    case mSeekable of
        Just sh | forceNonSeek /= Just True -> pure (FileSeekable sh)
        _ -> do
            cursor <- newIORef 0
            FileBuffered cursor <$> BS.hGetContents h

{- | Bracket-style helper. It opens @path@ with 'withFile', wraps the handle
using 'mkFileBufferedOrSeekable', and runs @action@ on the result.

Warning: 'withFile' closes the underlying handle once @action@ finishes
(normally or by an exception). Do not let the 'FileBufferedOrSeekable' escape
the action's scope.
-}
withFileBufferedOrSeekable ::
    ForceNonSeekable ->
    FilePath ->
    IOMode ->
    (FileBufferedOrSeekable -> IO a) ->
    IO a
withFileBufferedOrSeekable forceNonSeek path ioMode action =
    withFile path ioMode (mkFileBufferedOrSeekable forceNonSeek >=> action)

{- | Read the last @n@ bytes of the input. Useful for reading trailing metadata
without loading the entire file.

Afterwards the read position is at the end of the input in both
representations.

For 'FileBuffered', requesting more bytes than the buffer holds is an 'error'.
The length is checked before the cursor is touched, so a failed call leaves
the cursor where it was. For 'FileSeekable', an oversized @n@ is left to
'hSeek' to reject.
-}
readLastBytes :: Integer -> FileBufferedOrSeekable -> IO BS.ByteString
readLastBytes n (FileSeekable (SeekableHandle h)) = do
    hSeek h SeekFromEnd (negate n)
    -- Everything from the new position to EOF is exactly the last n bytes.
    S.fold SBS.write (SHandle.read h)
readLastBytes n (FileBuffered posRef bs) = do
    let total = BS.length bs
    -- Validate before mutating the cursor.
    when (n > fromIntegral total) $ error "lastBytes: n > length bs"
    writeIORef posRef (fromIntegral total)
    -- n <= total here, so the conversion to Int cannot overflow.
    pure $ BS.drop (total - fromIntegral n) bs

{- | Read up to @len@ bytes from the current position without seeking first.
This is 'seekAndReadBytes' with no seek.

Note: this does not guarantee @len@ bytes; fewer are returned if the input
ends early.
-}
advanceBytes :: Int -> FileBufferedOrSeekable -> IO BS.ByteString
advanceBytes len source = seekAndReadBytes Nothing len source

{- | Optionally seek, then read up to @len@ bytes from the current position.

Note: this does not guarantee @len@ bytes; fewer are returned if the input
ends early. A non-positive @len@ yields an empty 'BS.ByteString'.

In both representations the position advances by exactly the number of bytes
returned. Consecutive calls without a seek (e.g. 'advanceBytes') therefore
read adjacent ranges.
-}
seekAndReadBytes ::
    Maybe (SeekMode, Integer) -> Int -> FileBufferedOrSeekable -> IO BS.ByteString
seekAndReadBytes mSeek len f = do
    mapM_ (uncurry (fSeek f)) mSeek
    readUpTo f
  where
    -- BS.hGet rejects negative sizes; treat them like S.take would (empty).
    wanted = max 0 len

    -- Read directly rather than via 'fRead': 'SHandle.read' fetches whole
    -- chunks and would leave the handle positioned past the returned bytes.
    readUpTo (FileSeekable (SeekableHandle h)) = BS.hGet h wanted
    readUpTo (FileBuffered posRef bs) = do
        pos <- readIORef posRef
        let chunk = BS.take wanted (BS.drop (fromIntegral pos) bs)
        writeIORef posRef (pos + fromIntegral (BS.length chunk))
        pure chunk

{- | Optionally seek, then return a stream of at most @len@ bytes starting at
the resulting position. The seek happens when the returned action runs. The
bytes are only read as the stream is consumed.

Warning: the stream reads from the mutable position of @f@ (the handle, or
the buffered cursor). If several streams are pulled from the same
'FileBufferedOrSeekable' at the same time, their reads interfere. Make sure
only one stream runs at a time per source, and do not run a stream again once
it is no longer in use.

NOTE(review): for 'FileSeekable' the bytes come from 'SHandle.read', which
requests data from the handle in chunks. The handle may therefore be left past
the last byte this stream yields. Seek explicitly before the next read.
-}
seekAndStreamBytes ::
    (MonadIO m) =>
    Maybe (SeekMode, Integer) -> Int -> FileBufferedOrSeekable -> m (Stream m Word8)
seekAndStreamBytes mSeek len f = do
    liftIO $ forM_ mSeek $ \(seekMode, seekTo) -> fSeek f seekMode seekTo
    pure (S.take len (fRead f))

{- | Move the read position of a 'FileBufferedOrSeekable'.

For 'FileSeekable' this is 'hSeek' on the underlying handle. For
'FileBuffered' the three 'SeekMode's are emulated on the in-memory cursor:

  - The target is computed in 'Integer' before being stored.
  - A target before the start of the buffer is rejected with an 'IOError'.
    A negative cursor would otherwise make 'fRead' replay the buffer from
    offset 0 ('BS.drop' of a negative count drops nothing).
  - Seeking past the end is allowed; reads then yield no bytes.
-}
fSeek :: FileBufferedOrSeekable -> SeekMode -> Integer -> IO ()
fSeek (FileSeekable (SeekableHandle h)) seekMode seekTo = hSeek h seekMode seekTo
fSeek (FileBuffered posRef bs) seekMode seekTo = do
    base <- case seekMode of
        AbsoluteSeek -> pure 0
        RelativeSeek -> toInteger <$> readIORef posRef
        SeekFromEnd -> pure (toInteger (BS.length bs))
    let target = base + seekTo
    when (target < 0) $
        ioError (userError ("fSeek: negative position " ++ show target))
    writeIORef posRef (fromIntegral target)

{- | Stream the bytes from the current position to the end of the input.

For 'FileSeekable' this is 'SHandle.read' on the handle. For 'FileBuffered'
the cursor is read when the stream starts running. It is then incremented by
one for every byte the stream emits.
-}
fRead :: (MonadIO m) => FileBufferedOrSeekable -> Stream m Word8
fRead (FileSeekable (SeekableHandle h)) = SHandle.read h
fRead (FileBuffered posRef bs) = S.concatEffect $ do
    start <- liftIO (readIORef posRef)
    -- Advance the shared cursor as each byte is emitted.
    let tick byte = byte <$ liftIO (modifyIORef' posRef (+ 1))
    pure . S.mapM tick . S.unfold SBS.reader $ BS.drop (fromIntegral start) bs