{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE TypeApplications #-}

module DataFrame.IO.Parquet.Page where

import Codec.Compression.Zstd.Streaming
import Data.Bits
import qualified Data.ByteString as BSO
import Data.Int
import Data.Maybe (fromMaybe, listToMaybe)
import Data.Word
import DataFrame.IO.Parquet.Binary
import DataFrame.IO.Parquet.Thrift
import DataFrame.IO.Parquet.Types
import GHC.Float
import qualified Snappy

-- | 'True' when the page's type header was built with the 'DataPageHeader'
-- or 'DataPageHeaderV2' constructor, 'False' for every other constructor.
isDataPage :: Page -> Bool
isDataPage page
    | DataPageHeader{} <- typeHeader = True
    | DataPageHeaderV2{} <- typeHeader = True
    | otherwise = False
  where
    typeHeader = pageTypeHeader (pageHeader page)

-- | 'True' when the page's type header was built with the
-- 'DictionaryPageHeader' constructor, 'False' otherwise.
isDictionaryPage :: Page -> Bool
isDictionaryPage page
    | DictionaryPageHeader{} <- typeHeader = True
    | otherwise = False
  where
    typeHeader = pageTypeHeader (pageHeader page)

-- | Read one page (header plus payload) from the front of a column chunk.
--
-- With no input left the result is @(Nothing, [])@. Otherwise the header is
-- parsed with 'readPageHeader', the next @compressedPageSize@ bytes are taken
-- as the payload and decompressed according to the codec, and the bytes that
-- follow the payload are returned so the caller can read the next page.
--
-- Calls 'error' when Snappy or Zstandard decoding fails and for any codec
-- other than @ZSTD@, @SNAPPY@ or @UNCOMPRESSED@.
readPage :: CompressionCodec -> [Word8] -> IO (Maybe Page, [Word8])
readPage _ [] = pure (Nothing, [])
readPage c columnBytes = do
    let (hdr, afterHeader) = readPageHeader emptyPageHeader columnBytes 0
        payloadSize = fromIntegral (compressedPageSize hdr) :: Int
        -- One traversal instead of a separate take and drop of the same size.
        (payload, remaining) = splitAt payloadSize afterHeader
        compressed = BSO.pack payload
    fullData <- case c of
        ZSTD -> decompress >>= drainZstd [] (Just compressed)
        SNAPPY -> either (error . show) pure (Snappy.decompress compressed)
        UNCOMPRESSED -> pure compressed
        other -> error ("Unsupported compression type: " ++ show other)
    pure (Just (Page hdr (BSO.unpack fullData)), remaining)
  where
    -- Drive the streaming decompressor to completion. The stream may hand
    -- back any number of 'Produce' chunks before it asks for more input or
    -- finishes, so every 'Result' constructor is handled instead of assuming
    -- a fixed Consume/Consume/Done sequence. Chunks are collected in reverse.
    drainZstd ::
        [BSO.ByteString] -> Maybe BSO.ByteString -> Result -> IO BSO.ByteString
    drainZstd chunks pending step = case step of
        Produce chunk next -> next >>= drainZstd (chunk : chunks) pending
        -- Feed the payload once; afterwards an empty chunk signals end of input.
        Consume feed ->
            feed (fromMaybe BSO.empty pending) >>= drainZstd chunks Nothing
        Done chunk -> pure (BSO.concat (reverse (chunk : chunks)))
        Error context message ->
            error ("ZSTD decompression failed: " ++ context ++ ": " ++ message)

-- | Fold the fields of an encoded page header into @hdr@.
--
-- Field headers are decoded with 'readField''; the third argument is the
-- identifier of the previously read field. Parsing ends when the input is
-- exhausted or when 'readField'' yields 'Nothing', in which case one byte is
-- dropped from the input before returning. The result is the populated
-- header and the bytes that follow it.
--
-- Calls 'error' for field 6 (index page header) and for any field identifier
-- outside 1..8.
readPageHeader :: PageHeader -> [Word8] -> Int16 -> (PageHeader, [Word8])
readPageHeader hdr [] _ = (hdr, [])
readPageHeader hdr xs lastFieldId =
    case readField' xs lastFieldId of
        Nothing -> (hdr, drop 1 xs)
        Just (payload, _, fieldId) ->
            let
                -- Continue with the next field, remembering this field's id.
                resume updated rest = readPageHeader updated rest fieldId
                -- Decode an int32 value and store it through @apply@.
                withInt32 apply =
                    let (value, rest) = readInt32FromBytes payload
                     in resume (apply value) rest
                -- Decode a nested page-type header starting from @blank@.
                withTypeHeader blank =
                    let (typeHdr, rest) = readPageTypeHeader blank payload 0
                     in resume hdr{pageTypeHeader = typeHdr} rest
             in
                case fieldId of
                    1 -> withInt32 (\v -> hdr{pageHeaderPageType = pageTypeFromInt v})
                    2 -> withInt32 (\v -> hdr{uncompressedPageSize = v})
                    3 -> withInt32 (\v -> hdr{compressedPageSize = v})
                    4 -> withInt32 (\v -> hdr{pageHeaderCrcChecksum = v})
                    5 -> withTypeHeader emptyDataPageHeader
                    6 -> error "Index page header not supported"
                    7 -> withTypeHeader emptyDictionaryPageHeader
                    8 -> withTypeHeader emptyDataPageHeaderV2
                    n -> error $ "Unknown page header field" ++ show n

-- | Fold the fields of an encoded page-type header into @hdr@.
--
-- The constructor of @hdr@ selects which field table is used (dictionary
-- page, data page, or data page V2). Field headers are decoded with
-- 'readField''; the third argument is the identifier of the previously read
-- field. Parsing ends when the input is exhausted or when 'readField''
-- yields 'Nothing', in which case one byte is dropped before returning.
--
-- Boolean fields (@dictionaryPageIsSorted@, @dataPageHeaderV2IsCompressed@)
-- are decoded from the type nibble of the field header itself and consume no
-- further bytes: the Thrift compact protocol stores a struct-field boolean in
-- the header, and 'readField'' does not skip any value byte. Reading the
-- following byte instead would swallow the next field header or stop byte.
-- NOTE(review): this relies on @compactBooleanTrue@ being the compact-protocol
-- type code for \"true\" — confirm against its definition.
--
-- Calls 'error' for @INDEX_PAGE_HEADER@ and @PAGE_TYPE_HEADER_UNKNOWN@ (when
-- input remains) and for field identifiers the selected table does not list.
readPageTypeHeader ::
    PageTypeHeader -> [Word8] -> Int16 -> (PageTypeHeader, [Word8])
readPageTypeHeader hdr [] _ = (hdr, [])
readPageTypeHeader hdr xs lastFieldId = case hdr of
    INDEX_PAGE_HEADER ->
        error "readPageTypeHeader: unsupported INDEX_PAGE_HEADER"
    PAGE_TYPE_HEADER_UNKNOWN ->
        error "readPageTypeHeader: unsupported PAGE_TYPE_HEADER_UNKNOWN"
    DictionaryPageHeader{} -> nextField dictionaryField
    DataPageHeader{} -> nextField dataPageField
    DataPageHeaderV2{} -> nextField dataPageV2Field
  where
    -- Decode the next field header and hand its payload to @decode@.
    nextField decode = case readField' xs lastFieldId of
        Nothing -> (hdr, drop 1 xs)
        Just (payload, _, fieldId) -> decode payload fieldId

    -- Continue with the following field, remembering the current field id.
    resume fieldId updated rest = readPageTypeHeader updated rest fieldId

    int32Field payload fieldId apply =
        let (value, rest) = readInt32FromBytes payload
         in resume fieldId (apply value) rest

    encodingField payload fieldId apply =
        int32Field payload fieldId (apply . parquetEncodingFromInt)

    statsField payload fieldId apply =
        let (stats, rest) =
                readStatisticsFromBytes emptyColumnStatistics payload 0
         in resume fieldId (apply stats) rest

    -- The boolean value lives in the header byte, so the payload is untouched.
    boolField payload fieldId apply =
        resume fieldId (apply headerBool) payload

    -- Low nibble of the field header byte that 'readField'' just decoded.
    headerBool = case xs of
        b : _ -> (b .&. 0x0f) == compactBooleanTrue
        [] -> False

    dictionaryField payload fieldId = case fieldId of
        1 -> int32Field payload fieldId (\v -> hdr{dictionaryPageHeaderNumValues = v})
        2 -> encodingField payload fieldId (\e -> hdr{dictionaryPageHeaderEncoding = e})
        3 -> boolField payload fieldId (\b -> hdr{dictionaryPageIsSorted = b})
        n -> error $ "readPageTypeHeader: unsupported identifier " ++ show n

    dataPageField payload fieldId = case fieldId of
        1 -> int32Field payload fieldId (\v -> hdr{dataPageHeaderNumValues = v})
        2 -> encodingField payload fieldId (\e -> hdr{dataPageHeaderEncoding = e})
        3 -> encodingField payload fieldId (\e -> hdr{definitionLevelEncoding = e})
        4 -> encodingField payload fieldId (\e -> hdr{repetitionLevelEncoding = e})
        5 -> statsField payload fieldId (\s -> hdr{dataPageHeaderStatistics = s})
        n -> error $ show n

    dataPageV2Field payload fieldId = case fieldId of
        1 -> int32Field payload fieldId (\v -> hdr{dataPageHeaderV2NumValues = v})
        2 -> int32Field payload fieldId (\v -> hdr{dataPageHeaderV2NumNulls = v})
        3 -> int32Field payload fieldId (\v -> hdr{dataPageHeaderV2NumRows = v})
        4 -> encodingField payload fieldId (\e -> hdr{dataPageHeaderV2Encoding = e})
        5 -> int32Field payload fieldId (\v -> hdr{definitionLevelByteLength = v})
        6 -> int32Field payload fieldId (\v -> hdr{repetitionLevelByteLength = v})
        7 -> boolField payload fieldId (\b -> hdr{dataPageHeaderV2IsCompressed = b})
        8 -> statsField payload fieldId (\s -> hdr{dataPageHeaderV2Statistics = s})
        n -> error $ show n

-- | Decode one field header from the front of the input.
--
-- Yields 'Nothing' for empty input or when the low nibble of the first byte
-- is zero. Otherwise yields the bytes after the header, the element type
-- obtained by passing the low nibble to 'toTType', and the field identifier:
-- when the high nibble is non-zero it is added to @lastFieldId@; when it is
-- zero the identifier is read from the following bytes as an 'Int16' with
-- 'readIntFromBytes'.
readField' :: [Word8] -> Int16 -> Maybe ([Word8], TType, Int16)
readField' [] _ = Nothing
readField' (x : xs) lastFieldId
    | typeNibble == 0 = Nothing
    | delta == 0 =
        let (explicitId, rest) = readIntFromBytes @Int16 xs
         in Just (rest, fieldType, explicitId)
    | otherwise = Just (xs, fieldType, lastFieldId + delta)
  where
    typeNibble = x .&. 0x0f
    -- High nibble of the header byte.
    delta = fromIntegral (x `shiftR` 4) :: Int16
    fieldType = toTType typeNibble

-- | Read pages with 'readPage' until the input is empty or 'readPage'
-- reports no page, returning the pages in the order they were read.
readAllPages :: CompressionCodec -> [Word8] -> IO [Page]
readAllPages codec = collect []
  where
    -- Pages are accumulated in reverse and flipped once at the end.
    collect pages [] = pure (reverse pages)
    collect pages input =
        readPage codec input >>= \(next, rest) ->
            maybe
                (pure (reverse pages))
                (\page -> collect (page : pages) rest)
                next

-- | Read @k@ values from the front of the input, each decoded from four
-- bytes with 'littleEndianInt32', and return them with the unread bytes.
--
-- A count of zero or less yields no values and leaves the input untouched;
-- a negative count previously never reached the base case.
readNInt32 :: Int -> [Word8] -> ([Int32], [Word8])
readNInt32 k bs
    | k <= 0 = ([], bs)
    | otherwise =
        let (chunk, bs') = splitAt 4 bs
            (xs, rest) = readNInt32 (k - 1) bs'
         in (littleEndianInt32 chunk : xs, rest)

-- | Read @k@ doubles from the front of the input. Each value is built from
-- eight bytes with 'littleEndianWord64' and reinterpreted with
-- 'castWord64ToDouble'. Returns the values and the unread bytes.
--
-- A count of zero or less yields no values and leaves the input untouched;
-- a negative count previously never reached the base case.
readNDouble :: Int -> [Word8] -> ([Double], [Word8])
readNDouble k bs
    | k <= 0 = ([], bs)
    | otherwise =
        let (chunk, bs') = splitAt 8 bs
            (xs, rest) = readNDouble (k - 1) bs'
         in (castWord64ToDouble (littleEndianWord64 chunk) : xs, rest)

-- | Read @k@ length-prefixed byte arrays from the front of the input. Each
-- array is preceded by four bytes decoded with 'littleEndianInt32' giving
-- its length. Returns the arrays and the unread bytes.
--
-- A count of zero or less yields no arrays and leaves the input untouched
-- (a negative count previously never reached the base case). The four
-- prefix bytes are always skipped, so a negative length yields an empty
-- array rather than re-reading part of the prefix.
readNByteArrays :: Int -> [Word8] -> ([[Word8]], [Word8])
readNByteArrays k bs
    | k <= 0 = ([], bs)
    | otherwise =
        let (prefix, afterPrefix) = splitAt 4 bs
            len = fromIntegral (littleEndianInt32 prefix) :: Int
            (body, bs') = splitAt len afterPrefix
            (xs, rest) = readNByteArrays (k - 1) bs'
         in (body : xs, rest)

-- | Read @count@ bit-packed booleans from the front of the input.
--
-- @(count + 7) `div` 8@ bytes are consumed; within each byte the bits are
-- taken from least significant to most significant. Returns the first
-- @count@ bits as booleans together with the unread bytes.
readNBool :: Int -> [Word8] -> ([Bool], [Word8])
readNBool 0 bs = ([], bs)
readNBool count bs = (take count unpacked, leftover)
  where
    byteCount = (count + 7) `div` 8
    (packed, leftover) = splitAt byteCount bs
    unpacked = [testBit byte i | byte <- packed, i <- [0 .. 7]]

-- | Read @k@ 64-bit integers from the front of the input. Each value is
-- built from eight bytes with 'littleEndianWord64' and converted to 'Int64'
-- with 'fromIntegral'. Returns the values and the unread bytes.
--
-- A count of zero or less yields no values and leaves the input untouched;
-- a negative count previously never reached the base case.
readNInt64 :: Int -> [Word8] -> ([Int64], [Word8])
readNInt64 k bs
    | k <= 0 = ([], bs)
    | otherwise =
        let (chunk, bs') = splitAt 8 bs
            (xs, rest) = readNInt64 (k - 1) bs'
         in (fromIntegral (littleEndianWord64 chunk) : xs, rest)

-- | Read @k@ floats from the front of the input. Each value is built from
-- four bytes with 'littleEndianWord32' and reinterpreted with
-- 'castWord32ToFloat'. Returns the values and the unread bytes.
--
-- A count of zero or less yields no values and leaves the input untouched;
-- a negative count previously never reached the base case.
readNFloat :: Int -> [Word8] -> ([Float], [Word8])
readNFloat k bs
    | k <= 0 = ([], bs)
    | otherwise =
        let (chunk, bs') = splitAt 4 bs
            (xs, rest) = readNFloat (k - 1) bs'
         in (castWord32ToFloat (littleEndianWord32 chunk) : xs, rest)

-- | Cut @k@ consecutive pieces of @len@ bytes off the front of the input and
-- return them with the unread bytes. When the input runs out early the
-- remaining pieces are shorter or empty.
--
-- A count of zero or less yields no pieces and leaves the input untouched;
-- a negative count previously never reached the base case.
splitFixed :: Int -> Int -> [Word8] -> ([[Word8]], [Word8])
splitFixed k len bs
    | k <= 0 = ([], bs)
    | otherwise =
        let (body, bs') = splitAt len bs
            (xs, rest) = splitFixed (k - 1) len bs'
         in (body : xs, rest)

-- | Fold the fields of an encoded column-statistics struct into @cs@.
--
-- Field headers are decoded with 'readField''; the third argument is the
-- identifier of the previously read field. Parsing ends when 'readField''
-- yields 'Nothing', in which case one byte is dropped from the input before
-- returning. The result is the populated statistics and the unread bytes.
--
-- The boolean fields 7 and 8 (@isColumnMaxValueExact@,
-- @isColumnMinValueExact@) are decoded from the type nibble of the field
-- header itself and consume no further bytes: the Thrift compact protocol
-- stores a struct-field boolean in the header, and 'readField'' does not
-- skip any value byte. Reading the following byte instead would swallow the
-- next field header or stop byte.
-- NOTE(review): this relies on @compactBooleanTrue@ being the compact-protocol
-- type code for \"true\" — confirm against its definition.
--
-- Calls 'error' for any field identifier outside 1..8.
readStatisticsFromBytes ::
    ColumnStatistics -> [Word8] -> Int16 -> (ColumnStatistics, [Word8])
readStatisticsFromBytes cs xs lastFieldId =
    case readField' xs lastFieldId of
        Nothing -> (cs, drop 1 xs)
        Just (payload, _, fieldId) ->
            let
                -- Continue with the next field, remembering this field's id.
                resume updated rest = readStatisticsFromBytes updated rest fieldId
                bytesField apply =
                    let (value, rest) = readByteStringFromBytes payload
                     in resume (apply value) rest
                int64Field apply =
                    let (value, rest) = readIntFromBytes @Int64 payload
                     in resume (apply value) rest
                -- The value lives in the header byte; the payload is untouched.
                boolField apply = resume (apply headerBool) payload
                -- Low nibble of the field header byte that was just decoded.
                headerBool = case xs of
                    b : _ -> (b .&. 0x0f) == compactBooleanTrue
                    [] -> False
             in
                case fieldId of
                    1 -> bytesField (\v -> cs{columnMax = v})
                    2 -> bytesField (\v -> cs{columnMin = v})
                    3 -> int64Field (\v -> cs{columnNullCount = v})
                    4 -> int64Field (\v -> cs{columnDistictCount = v})
                    5 -> bytesField (\v -> cs{columnMaxValue = v})
                    6 -> bytesField (\v -> cs{columnMinValue = v})
                    7 -> boolField (\v -> cs{isColumnMaxValueExact = v})
                    8 -> boolField (\v -> cs{isColumnMinValueExact = v})
                    n -> error $ show n