{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE TypeApplications #-}

module DataFrame.IO.Parquet.Page where

import qualified Codec.Compression.GZip as GZip
import qualified Codec.Compression.Zstd.Streaming as Zstd
import Data.Bits
import qualified Data.ByteString as BS
import qualified Data.ByteString.Lazy as LB
import Data.Int
import Data.Maybe (fromMaybe)
import qualified Data.Vector.Unboxed as VU
import DataFrame.IO.Parquet.Binary
import DataFrame.IO.Parquet.Thrift
import DataFrame.IO.Parquet.Types
import GHC.Float
import qualified Snappy

{- | Report whether a page holds column values, i.e. whether its type-specific
header is a 'DataPageHeader' or a 'DataPageHeaderV2'. Every other header
kind yields 'False'.
-}
isDataPage :: Page -> Bool
isDataPage = carriesValues . pageTypeHeader . pageHeader
  where
    carriesValues DataPageHeader{} = True
    carriesValues DataPageHeaderV2{} = True
    carriesValues _ = False

{- | Report whether a page is a dictionary page, i.e. whether its type-specific
header is a 'DictionaryPageHeader'.
-}
isDictionaryPage :: Page -> Bool
isDictionaryPage = isDictionary . pageTypeHeader . pageHeader
  where
    isDictionary DictionaryPageHeader{} = True
    isDictionary _ = False

{- | Read one page from the front of a column chunk.

The page header is parsed first; the next 'compressedPageSize' bytes are then
taken as the page payload and decompressed with the given codec. The result
pairs the decoded page with the bytes that follow it. Empty input yields
@(Nothing, BS.empty)@.

Decompression failures and unsupported codecs are reported with 'error'.

NOTE(review): the whole payload is handed to the codec regardless of the page
type. Confirm this is intended for v2 data pages, whose level bytes are
stored uncompressed according to the Parquet format specification.
-}
readPage :: CompressionCodec -> BS.ByteString -> IO (Maybe Page, BS.ByteString)
readPage c columnBytes
    | BS.null columnBytes = pure (Nothing, BS.empty)
    | otherwise = do
        let (hdr, afterHeader) = readPageHeader emptyPageHeader columnBytes 0
            payloadSize = fromIntegral (compressedPageSize hdr) :: Int
            -- Payload of this page, and everything belonging to later pages.
            (payload, leftover) = BS.splitAt payloadSize afterHeader
        pageBytes <- case c of
            ZSTD -> Zstd.decompress >>= \start -> drainZstd start payload []
            SNAPPY -> either (error . show) pure (Snappy.decompress payload)
            UNCOMPRESSED -> pure payload
            GZIP -> pure (LB.toStrict (GZip.decompress (BS.fromStrict payload)))
            other -> error ("Unsupported compression type: " ++ show other)
        pure (Just (Page hdr pageBytes), leftover)
  where
    -- Drive the streaming ZSTD decoder to completion. The whole payload is fed
    -- on the first 'Zstd.Consume'; every later request receives an empty chunk.
    -- Output chunks are collected newest-first and reversed at the end.
    drainZstd (Zstd.Consume feed) input chunks = do
        step <- feed input
        drainZstd step BS.empty chunks
    drainZstd (Zstd.Produce chunk continue) _ chunks = do
        step <- continue
        drainZstd step BS.empty (chunk : chunks)
    drainZstd (Zstd.Done final) _ chunks =
        pure (BS.concat (reverse (final : chunks)))
    drainZstd (Zstd.Error msg msg2) _ _ =
        error ("ZSTD error: " ++ msg ++ " " ++ msg2)

{- | Parse the fields of a page header, accumulating them into @hdr@.

@lastFieldId@ is the id of the previously read field; 'readField'' uses it to
resolve delta-encoded field ids. Parsing stops when the input is exhausted or
when 'readField'' reports no further field, in which case one byte is skipped.
Returns the populated header together with the unread bytes.

Field ids handled: 1 page type, 2 uncompressed size, 3 compressed size,
4 CRC checksum, 5 data page header, 7 dictionary page header,
8 data page header v2. Field 6 (index page header) and any other id abort
with 'error'.
-}
readPageHeader ::
    PageHeader -> BS.ByteString -> Int16 -> (PageHeader, BS.ByteString)
readPageHeader hdr xs lastFieldId
    | BS.null xs = (hdr, BS.empty)
    | otherwise = case readField' xs lastFieldId of
        Nothing -> (hdr, BS.drop 1 xs)
        Just (payload, _, fieldId) ->
            let
                -- Continue with the next field, remembering the id just read.
                resume hdr' bytes = readPageHeader hdr' bytes fieldId
                -- Read an Int32 payload and store it through @apply@.
                int32Field apply =
                    let (value, bytes) = readInt32FromBytes payload
                     in resume (apply value) bytes
                -- Read a nested type-specific header, starting from @blank@.
                typeHeaderField blank =
                    let (nested, bytes) = readPageTypeHeader blank payload 0
                     in resume (hdr{pageTypeHeader = nested}) bytes
             in
                case fieldId of
                    1 -> int32Field (\v -> hdr{pageHeaderPageType = pageTypeFromInt v})
                    2 -> int32Field (\v -> hdr{uncompressedPageSize = v})
                    3 -> int32Field (\v -> hdr{compressedPageSize = v})
                    4 -> int32Field (\v -> hdr{pageHeaderCrcChecksum = v})
                    5 -> typeHeaderField emptyDataPageHeader
                    6 -> error "Index page header not supported"
                    7 -> typeHeaderField emptyDictionaryPageHeader
                    8 -> typeHeaderField emptyDataPageHeaderV2
                    n -> error $ "Unknown page header field " ++ show n

{- | Parse the fields of a type-specific page header.

The constructor of the header passed in selects which field layout is
expected; parsed values are written into that header. @lastFieldId@ is the id
of the previously read field and is used by 'readField'' to resolve
delta-encoded ids. Parsing stops when the input is exhausted or when
'readField'' reports no further field, in which case one byte is skipped.
Returns the populated header and the unread bytes.

Boolean fields follow the Thrift compact protocol: the value is carried in
the low nibble of the field-header byte itself (the first byte of @xs@) and
has no payload. It is therefore taken from that byte, and no extra input is
consumed for it.

Index page headers, unknown page type headers and unexpected field ids abort
with 'error'.
-}
readPageTypeHeader ::
    PageTypeHeader -> BS.ByteString -> Int16 -> (PageTypeHeader, BS.ByteString)
readPageTypeHeader INDEX_PAGE_HEADER _ _ =
    error "readPageTypeHeader: unsupported INDEX_PAGE_HEADER"
readPageTypeHeader PAGE_TYPE_HEADER_UNKNOWN _ _ =
    error "readPageTypeHeader: unsupported PAGE_TYPE_HEADER_UNKNOWN"
readPageTypeHeader hdr@DictionaryPageHeader{} xs lastFieldId
    | BS.null xs = (hdr, BS.empty)
    | otherwise = case readField' xs lastFieldId of
        Nothing -> (hdr, BS.drop 1 xs)
        Just (payload, _, fieldId) ->
            let
                int32Field apply =
                    let (value, bytes) = readInt32FromBytes payload
                     in readPageTypeHeader (apply value) bytes fieldId
                -- Boolean value encoded in the type nibble of the field header.
                flag =
                    maybe
                        False
                        (\(tag, _) -> (tag .&. 0x0f) == compactBooleanTrue)
                        (BS.uncons xs)
             in
                case fieldId of
                    1 -> int32Field (\v -> hdr{dictionaryPageHeaderNumValues = v})
                    2 ->
                        int32Field
                            (\v -> hdr{dictionaryPageHeaderEncoding = parquetEncodingFromInt v})
                    -- A boolean field has no payload, so @payload@ is passed on untouched.
                    3 ->
                        readPageTypeHeader
                            (hdr{dictionaryPageIsSorted = flag})
                            payload
                            fieldId
                    n ->
                        error $
                            "readPageTypeHeader: unsupported identifier " ++ show n
readPageTypeHeader hdr@DataPageHeader{} xs lastFieldId
    | BS.null xs = (hdr, BS.empty)
    | otherwise = case readField' xs lastFieldId of
        Nothing -> (hdr, BS.drop 1 xs)
        Just (payload, _, fieldId) ->
            let
                int32Field apply =
                    let (value, bytes) = readInt32FromBytes payload
                     in readPageTypeHeader (apply value) bytes fieldId
             in
                case fieldId of
                    1 -> int32Field (\v -> hdr{dataPageHeaderNumValues = v})
                    2 ->
                        int32Field
                            (\v -> hdr{dataPageHeaderEncoding = parquetEncodingFromInt v})
                    3 ->
                        int32Field
                            (\v -> hdr{definitionLevelEncoding = parquetEncodingFromInt v})
                    4 ->
                        int32Field
                            (\v -> hdr{repetitionLevelEncoding = parquetEncodingFromInt v})
                    5 ->
                        let (stats, bytes) =
                                readStatisticsFromBytes emptyColumnStatistics payload 0
                         in readPageTypeHeader
                                (hdr{dataPageHeaderStatistics = stats})
                                bytes
                                fieldId
                    n ->
                        error $
                            "readPageTypeHeader: unsupported identifier " ++ show n
readPageTypeHeader hdr@DataPageHeaderV2{} xs lastFieldId
    | BS.null xs = (hdr, BS.empty)
    | otherwise = case readField' xs lastFieldId of
        Nothing -> (hdr, BS.drop 1 xs)
        Just (payload, _, fieldId) ->
            let
                int32Field apply =
                    let (value, bytes) = readInt32FromBytes payload
                     in readPageTypeHeader (apply value) bytes fieldId
                -- Boolean value encoded in the type nibble of the field header.
                flag =
                    maybe
                        False
                        (\(tag, _) -> (tag .&. 0x0f) == compactBooleanTrue)
                        (BS.uncons xs)
             in
                case fieldId of
                    1 -> int32Field (\v -> hdr{dataPageHeaderV2NumValues = v})
                    2 -> int32Field (\v -> hdr{dataPageHeaderV2NumNulls = v})
                    3 -> int32Field (\v -> hdr{dataPageHeaderV2NumRows = v})
                    4 ->
                        int32Field
                            (\v -> hdr{dataPageHeaderV2Encoding = parquetEncodingFromInt v})
                    5 -> int32Field (\v -> hdr{definitionLevelByteLength = v})
                    6 -> int32Field (\v -> hdr{repetitionLevelByteLength = v})
                    -- A boolean field has no payload: consuming a byte here would
                    -- swallow the next field header or the struct's stop marker.
                    7 ->
                        readPageTypeHeader
                            (hdr{dataPageHeaderV2IsCompressed = flag})
                            payload
                            fieldId
                    8 ->
                        let (stats, bytes) =
                                readStatisticsFromBytes emptyColumnStatistics payload 0
                         in readPageTypeHeader
                                (hdr{dataPageHeaderV2Statistics = stats})
                                bytes
                                fieldId
                    n ->
                        error $
                            "readPageTypeHeader: unsupported identifier " ++ show n

{- | Decode one field header.

The low nibble of the first byte is the element type and the high nibble is
a field-id delta. A zero type nibble, or empty input, yields 'Nothing'.
When the delta is zero the field id is read from the following bytes with
'readIntFromBytes'; otherwise it is @lastFieldId@ plus the delta.

Returns the bytes following the header, the element type and the field id.
-}
readField' :: BS.ByteString -> Int16 -> Maybe (BS.ByteString, TType, Int16)
readField' bs lastFieldId = case BS.uncons bs of
    Just (tag, body)
        | typeNibble /= 0 -> Just (rest, toTType typeNibble, fieldId)
      where
        typeNibble = tag .&. 0x0f
        delta = fromIntegral (tag `shiftR` 4) :: Int16
        (fieldId, rest)
            | delta == 0 = readIntFromBytes @Int16 body
            | otherwise = (lastFieldId + delta, body)
    _ -> Nothing

{- | Read pages from a column chunk one after another with 'readPage' until
the input is exhausted or 'readPage' yields no page. Pages are returned in
the order they appear in the input.
-}
readAllPages :: CompressionCodec -> BS.ByteString -> IO [Page]
readAllPages codec = collect []
  where
    -- @seen@ holds the pages read so far, most recent first.
    collect seen pending
        | BS.null pending = pure (reverse seen)
        | otherwise = do
            (next, remaining) <- readPage codec pending
            case next of
                Nothing -> pure (reverse seen)
                Just page -> collect (page : seen) remaining

{- | Build an unboxed vector of @n@ 'Int32' values without an intermediate
list. Element @i@ is decoded by 'littleEndianInt32' from the input starting
at byte offset @4 * i@.
-}
readNInt32Vec :: Int -> BS.ByteString -> VU.Vector Int32
readNInt32Vec n bs = VU.generate n valueAt
  where
    valueAt i = littleEndianInt32 (BS.drop (4 * i) bs)

{- | Build an unboxed vector of @n@ 'Int64' values. Element @i@ is decoded by
'littleEndianWord64' from the input starting at byte offset @8 * i@ and then
converted to 'Int64' with 'fromIntegral'.
-}
readNInt64Vec :: Int -> BS.ByteString -> VU.Vector Int64
readNInt64Vec n bs = VU.generate n valueAt
  where
    valueAt i = fromIntegral (littleEndianWord64 (BS.drop (8 * i) bs))

{- | Build an unboxed vector of @n@ 'Float' values. Element @i@ is decoded by
'littleEndianWord32' from the input starting at byte offset @4 * i@ and its
bit pattern is reinterpreted with 'castWord32ToFloat'.
-}
readNFloatVec :: Int -> BS.ByteString -> VU.Vector Float
readNFloatVec n bs = VU.generate n valueAt
  where
    valueAt i = castWord32ToFloat (littleEndianWord32 (BS.drop (4 * i) bs))

{- | Build an unboxed vector of @n@ 'Double' values. Element @i@ is decoded by
'littleEndianWord64' from the input starting at byte offset @8 * i@ and its
bit pattern is reinterpreted with 'castWord64ToDouble'.
-}
readNDoubleVec :: Int -> BS.ByteString -> VU.Vector Double
readNDoubleVec n bs = VU.generate n valueAt
  where
    valueAt i = castWord64ToDouble (littleEndianWord64 (BS.drop (8 * i) bs))

{- | Read @k@ consecutive 4-byte values from the front of @bs@ as a list of
'Int32', decoding each 4-byte slice with 'littleEndianInt32'.

Returns the decoded values together with the bytes left after the last value
read. A count of zero or less reads nothing and returns @bs@ unchanged;
previously a negative count never reached the base case.
-}
readNInt32 :: Int -> BS.ByteString -> ([Int32], BS.ByteString)
readNInt32 k bs
    | k <= 0 = ([], bs)
    | otherwise =
        let (chunk, bs') = BS.splitAt 4 bs
            (xs, rest) = readNInt32 (k - 1) bs'
         in (littleEndianInt32 chunk : xs, rest)

{- | Read @k@ consecutive 8-byte values from the front of @bs@ as a list of
'Double'. Each 8-byte slice is decoded with 'littleEndianWord64' and its bits
reinterpreted via 'castWord64ToDouble'.

Returns the decoded values together with the bytes left after the last value
read. A count of zero or less reads nothing and returns @bs@ unchanged;
previously a negative count never reached the base case.
-}
readNDouble :: Int -> BS.ByteString -> ([Double], BS.ByteString)
readNDouble k bs
    | k <= 0 = ([], bs)
    | otherwise =
        let (chunk, bs') = BS.splitAt 8 bs
            (xs, rest) = readNDouble (k - 1) bs'
         in (castWord64ToDouble (littleEndianWord64 chunk) : xs, rest)

{- | Read @k@ length-prefixed byte arrays from the front of @bs@.

Each entry is a 4-byte length (decoded with 'littleEndianInt32') followed by
that many payload bytes. Returns the payloads in order together with the
bytes left after the last entry.

A count of zero or less reads nothing and returns @bs@ unchanged. A negative
length prefix is treated as zero, so every entry always advances by at least
the 4 prefix bytes; previously a negative length could make the parser step
forward by fewer than 4 bytes (or not at all) and re-read the same input.
-}
readNByteArrays :: Int -> BS.ByteString -> ([BS.ByteString], BS.ByteString)
readNByteArrays k bs
    | k <= 0 = ([], bs)
    | otherwise =
        let (prefix, afterPrefix) = BS.splitAt 4 bs
            -- Clamp so that corrupt (negative) lengths cannot stall the cursor.
            len = max 0 (fromIntegral (littleEndianInt32 prefix)) :: Int
            (body, bs') = BS.splitAt len afterPrefix
            (xs, rest) = readNByteArrays (k - 1) bs'
         in (body : xs, rest)

{- | Unpack @count@ bit-packed booleans from the front of @bs@.

Bits are taken from each byte starting at bit 0 (the least-significant bit)
up to bit 7. @(count + 7) `div` 8@ bytes are consumed; the result is the
first @count@ decoded flags together with the unconsumed remainder of @bs@.
-}
readNBool :: Int -> BS.ByteString -> ([Bool], BS.ByteString)
readNBool count bs = (take count flags, leftover)
  where
    -- Number of whole bytes needed to hold `count` bits.
    byteCount = (count + 7) `div` 8
    (packed, leftover) = BS.splitAt byteCount bs
    -- Every bit of every consumed byte, lowest bit first; trimmed to `count` above.
    flags = [testBit byte pos | byte <- BS.unpack packed, pos <- [0 .. 7 :: Int]]

{- | Read @k@ consecutive 8-byte values from the front of @bs@ as a list of
'Int64'. Each 8-byte slice is decoded with 'littleEndianWord64' and converted
to 'Int64' with 'fromIntegral'.

Returns the decoded values together with the bytes left after the last value
read. A count of zero or less reads nothing and returns @bs@ unchanged;
previously a negative count never reached the base case.
-}
readNInt64 :: Int -> BS.ByteString -> ([Int64], BS.ByteString)
readNInt64 k bs
    | k <= 0 = ([], bs)
    | otherwise =
        let (chunk, bs') = BS.splitAt 8 bs
            (xs, rest) = readNInt64 (k - 1) bs'
         in (fromIntegral (littleEndianWord64 chunk) : xs, rest)

{- | Read @k@ consecutive 4-byte values from the front of @bs@ as a list of
'Float'. Each 4-byte slice is decoded with 'littleEndianWord32' and its bits
reinterpreted via 'castWord32ToFloat'.

Returns the decoded values together with the bytes left after the last value
read. A count of zero or less reads nothing and returns @bs@ unchanged;
previously a negative count never reached the base case.
-}
readNFloat :: Int -> BS.ByteString -> ([Float], BS.ByteString)
readNFloat k bs
    | k <= 0 = ([], bs)
    | otherwise =
        let (chunk, bs') = BS.splitAt 4 bs
            (xs, rest) = readNFloat (k - 1) bs'
         in (castWord32ToFloat (littleEndianWord32 chunk) : xs, rest)

{- | Cut @k@ consecutive pieces of @len@ bytes each off the front of @bs@.

Returns the pieces in order together with the bytes left after the last
piece. If @bs@ runs out early, the trailing pieces are shorter (possibly
empty), as with 'BS.take'.

A count of zero or less cuts nothing and returns @bs@ unchanged; previously
a negative count never reached the base case.
-}
splitFixed :: Int -> Int -> BS.ByteString -> ([BS.ByteString], BS.ByteString)
splitFixed k len bs
    | k <= 0 = ([], bs)
    | otherwise =
        let (body, bs') = BS.splitAt len bs
            (xs, rest) = splitFixed (k - 1) len bs'
         in (body : xs, rest)

{- | Parse the fields of a column-statistics struct from @xs@, folding each one
into @cs@.

Fields are read one at a time with 'readField''; @lastFieldId@ is the id of
the previously read field and is passed through to 'readField''. The record
field updated depends on the field id:

* 1 -> 'columnMax', 2 -> 'columnMin' (byte strings)
* 3 -> 'columnNullCount', 4 -> 'columnDistictCount' ('Int64')
* 5 -> 'columnMaxValue', 6 -> 'columnMinValue' (byte strings)
* 7 -> 'isColumnMaxValueExact', 8 -> 'isColumnMinValueExact' (one byte
  compared against 'compactBooleanTrue')

When 'readField'' yields 'Nothing' the struct is considered finished: the
accumulated statistics are returned along with @xs@ minus its first byte
(presumably the struct's stop marker -- confirm against 'readField'').

Calls 'error' if a boolean field has no byte to read or if the field id is
not one of 1-8.
-}
readStatisticsFromBytes ::
    ColumnStatistics -> BS.ByteString -> Int16 -> (ColumnStatistics, BS.ByteString)
readStatisticsFromBytes cs xs lastFieldId =
    case readField' xs lastFieldId of
        Nothing -> (cs, BS.drop 1 xs)
        Just (rest, _elemType, identifier) ->
            let
                -- Continue with the next field; this field's id becomes the "last" id.
                next cs' rest' = readStatisticsFromBytes cs' rest' identifier
                -- Field whose payload is a byte string.
                bytesField set =
                    let (value, rest') = readByteStringFromBytes rest
                     in next (set value) rest'
                -- Field whose payload is an integer, read as Int64.
                countField set =
                    let (value, rest') = readIntFromBytes @Int64 rest
                     in next (set value) rest'
                -- Field whose payload is a single boolean byte.
                flagField set = case BS.uncons rest of
                    Nothing ->
                        error "readStatisticsFromBytes: not enough bytes"
                    Just (byte, rest') ->
                        next (set (byte == compactBooleanTrue)) rest'
             in
                case identifier of
                    1 -> bytesField (\v -> cs{columnMax = v})
                    2 -> bytesField (\v -> cs{columnMin = v})
                    3 -> countField (\v -> cs{columnNullCount = v})
                    4 -> countField (\v -> cs{columnDistictCount = v})
                    5 -> bytesField (\v -> cs{columnMaxValue = v})
                    6 -> bytesField (\v -> cs{columnMinValue = v})
                    7 -> flagField (\v -> cs{isColumnMaxValueExact = v})
                    8 -> flagField (\v -> cs{isColumnMinValueExact = v})
                    -- Same failure as before, but the message now says where it came from.
                    n -> error $ "readStatisticsFromBytes: unknown field id " ++ show n