{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE TypeApplications #-}

module DataFrame.IO.Parquet.Page where

import Codec.Compression.Zstd.Streaming
import Control.Monad
import Data.Bits
import qualified Data.ByteString as BSO
import Data.Char
import Data.Foldable
import Data.Int
import Data.List
import Data.Maybe
import qualified Data.Text as T
import Data.Word
import DataFrame.IO.Parquet.Binary
import DataFrame.IO.Parquet.Dictionary
import DataFrame.IO.Parquet.Levels
import DataFrame.IO.Parquet.Thrift
import DataFrame.IO.Parquet.Types
import qualified DataFrame.Internal.Column as DI
import GHC.Float
import qualified Snappy as Snappy
import Text.Printf

-- | Report whether a page holds column values, i.e. whether the type-specific
-- part of its header is a 'DataPageHeader' or a 'DataPageHeaderV2'.
isDataPage :: Page -> Bool
isDataPage = carriesValues . pageTypeHeader . pageHeader
  where
    -- Only the constructor matters here; the fields are never inspected.
    carriesValues DataPageHeader{} = True
    carriesValues DataPageHeaderV2{} = True
    carriesValues _ = False

-- | Report whether a page is a dictionary page, i.e. whether the type-specific
-- part of its header is a 'DictionaryPageHeader'.
isDictionaryPage :: Page -> Bool
isDictionaryPage = isDictionary . pageTypeHeader . pageHeader
  where
    -- Only the constructor matters here; the fields are never inspected.
    isDictionary DictionaryPageHeader{} = True
    isDictionary _ = False

-- | Read one page (header plus payload) from the front of a column chunk.
--
-- The header is decoded with 'readPageHeader'; the next
-- @compressedPageSize@ bytes are taken as the page payload and decompressed
-- according to the given codec.
--
-- Returns the decoded page together with the bytes that follow it, or
-- @(Nothing, [])@ when there is no input left.
--
-- Failure modes:
--
-- * a Snappy decoding error or an unsupported codec raises via 'error';
-- * a ZSTD stream error raises an 'IOError' via 'fail'.
readPage :: CompressionCodec -> [Word8] -> IO (Maybe Page, [Word8])
readPage _ [] = pure (Nothing, [])
readPage c columnBytes = do
    let (hdr, afterHeader) = readPageHeader emptyPageHeader columnBytes 0
        payloadSize = fromIntegral (compressedPageSize hdr) :: Int
        -- 'splitAt' yields exactly the (take, drop) pair of the same length.
        (compressed, remaining) = splitAt payloadSize afterHeader
        payload = BSO.pack compressed

    fullData <- case c of
        ZSTD -> decompressZstd payload
        SNAPPY -> case Snappy.decompress payload of
            Left e -> error (show e)
            Right res -> pure res
        UNCOMPRESSED -> pure payload
        other -> error ("Unsupported compression type: " ++ show other)
    pure (Just (Page hdr (BSO.unpack fullData)), remaining)
  where
    -- Run the streaming ZSTD decoder to completion over a single input buffer.
    decompressZstd :: BSO.ByteString -> IO BSO.ByteString
    decompressZstd input = decompress >>= drive (Just input) []

    -- Step the decoder state machine.  The decoder may emit any number of
    -- 'Produce' chunks (one each time its output buffer fills up) before it
    -- asks for more input or finishes, so every constructor is handled rather
    -- than assuming a fixed Consume/Consume/Done sequence.
    drive ::
        Maybe BSO.ByteString -> [BSO.ByteString] -> Result -> IO BSO.ByteString
    drive pending chunks step = case step of
        Produce chunk continue -> continue >>= drive pending (chunk : chunks)
        Consume feed -> case pending of
            Just bytes | not (BSO.null bytes) -> feed bytes >>= drive Nothing chunks
            -- Supplying an empty input signals the end of the stream.
            _ -> feed BSO.empty >>= drive Nothing chunks
        Done trailing -> pure (BSO.concat (reverse (trailing : chunks)))
        Error stage reason ->
            fail ("ZSTD decompression failed (" ++ stage ++ "): " ++ reason)

-- | Decode the fields of a page header, one field per recursive step.
--
-- Arguments: the header accumulated so far, the remaining bytes, and the id
-- of the previously decoded field (pass 0 to start a fresh struct).
--
-- Returns the completed header and the bytes following it.  Decoding ends
-- when the input runs out or when 'readField'' reports no further field, in
-- which case one byte (the terminator) is skipped.
--
-- Raises via 'error' for the index page header (field 6) and for any field id
-- outside 1-8.
readPageHeader :: PageHeader -> [Word8] -> Int16 -> (PageHeader, [Word8])
readPageHeader hdr [] _ = (hdr, [])
readPageHeader hdr bytes prevFieldId =
    case readField' bytes prevFieldId of
        Nothing -> (hdr, drop 1 bytes)
        Just (payload, _, fieldId) ->
            let
                -- Carry on with the next field of this same struct.
                resume updated rest = readPageHeader updated rest fieldId
                -- Decode an int32 payload and store it through the given setter.
                int32Field store =
                    let (value, rest) = readInt32FromBytes payload
                     in resume (store value) rest
                -- Decode a nested page-type header, starting from the given
                -- blank value (nested structs restart field ids at 0).
                nestedHeader blank =
                    let (nested, rest) = readPageTypeHeader blank payload 0
                     in resume (hdr{pageTypeHeader = nested}) rest
             in
                case fieldId of
                    1 -> int32Field (\v -> hdr{pageHeaderPageType = pageTypeFromInt v})
                    2 -> int32Field (\v -> hdr{uncompressedPageSize = v})
                    3 -> int32Field (\v -> hdr{compressedPageSize = v})
                    4 -> int32Field (\v -> hdr{pageHeaderCrcChecksum = v})
                    5 -> nestedHeader emptyDataPageHeader
                    6 -> error "Index page header not supported"
                    7 -> nestedHeader emptyDictionaryPageHeader
                    8 -> nestedHeader emptyDataPageHeaderV2
                    n -> error $ "Unknown page header field" ++ show n

-- | Decode the fields of a type-specific page header (dictionary, data page
-- v1 or data page v2), one field per recursive step.
--
-- Arguments: the header accumulated so far (its constructor selects which
-- field table is used), the remaining bytes, and the id of the previously
-- decoded field (pass 0 to start a fresh struct).
--
-- Returns the completed header and the bytes following it.  Decoding ends
-- when the input runs out or when 'readField'' reports no further field, in
-- which case one byte (the terminator) is skipped.
--
-- Boolean fields are decoded from the low nibble of the field-header byte and
-- consume no payload byte, as the Thrift compact protocol specifies for
-- struct fields.
--
-- Raises via 'error' (message: the field id) on an unexpected field id.
readPageTypeHeader :: PageTypeHeader -> [Word8] -> Int16 -> (PageTypeHeader, [Word8])
readPageTypeHeader hdr [] _ = (hdr, [])
readPageTypeHeader hdr@(DictionaryPageHeader{}) xs lastFieldId =
    case readField' xs lastFieldId of
        Nothing -> (hdr, drop 1 xs)
        Just (payload, _, identifier) ->
            let
                resume updated rest = readPageTypeHeader updated rest identifier
                int32Field store =
                    let (value, rest) = readInt32FromBytes payload
                     in resume (store value) rest
             in
                case identifier of
                    1 -> int32Field (\v -> hdr{dictionaryPageHeaderNumValues = v})
                    2 -> int32Field (\v -> hdr{dictionaryPageHeaderEncoding = parquetEncodingFromInt v})
                    -- is_sorted: the value sits in the field header, so the
                    -- payload is passed on untouched.
                    3 -> resume (hdr{dictionaryPageIsSorted = headerSaysTrue}) payload
                    n -> error $ show n
  where
    -- xs is non-empty whenever a field was found; False is only a fallback.
    headerSaysTrue = maybe False ((== compactBooleanTrue) . (.&. 0x0f)) (safeHead xs)
readPageTypeHeader hdr@(DataPageHeader{}) xs lastFieldId =
    case readField' xs lastFieldId of
        Nothing -> (hdr, drop 1 xs)
        Just (payload, _, identifier) ->
            let
                resume updated rest = readPageTypeHeader updated rest identifier
                int32Field store =
                    let (value, rest) = readInt32FromBytes payload
                     in resume (store value) rest
             in
                case identifier of
                    1 -> int32Field (\v -> hdr{dataPageHeaderNumValues = v})
                    2 -> int32Field (\v -> hdr{dataPageHeaderEncoding = parquetEncodingFromInt v})
                    3 -> int32Field (\v -> hdr{definitionLevelEncoding = parquetEncodingFromInt v})
                    4 -> int32Field (\v -> hdr{repetitionLevelEncoding = parquetEncodingFromInt v})
                    5 ->
                        let (stats, rest) = readStatisticsFromBytes emptyColumnStatistics payload 0
                         in resume (hdr{dataPageHeaderStatistics = stats}) rest
                    n -> error $ show n
readPageTypeHeader hdr@(DataPageHeaderV2{}) xs lastFieldId =
    case readField' xs lastFieldId of
        Nothing -> (hdr, drop 1 xs)
        Just (payload, _, identifier) ->
            let
                resume updated rest = readPageTypeHeader updated rest identifier
                int32Field store =
                    let (value, rest) = readInt32FromBytes payload
                     in resume (store value) rest
             in
                case identifier of
                    1 -> int32Field (\v -> hdr{dataPageHeaderV2NumValues = v})
                    2 -> int32Field (\v -> hdr{dataPageHeaderV2NumNulls = v})
                    3 -> int32Field (\v -> hdr{dataPageHeaderV2NumRows = v})
                    4 -> int32Field (\v -> hdr{dataPageHeaderV2Encoding = parquetEncodingFromInt v})
                    5 -> int32Field (\v -> hdr{definitionLevelByteLength = v})
                    6 -> int32Field (\v -> hdr{repetitionLevelByteLength = v})
                    -- is_compressed: store the flag that was actually decoded
                    -- instead of writing the existing field value back.
                    7 -> resume (hdr{dataPageHeaderV2IsCompressed = headerSaysTrue}) payload
                    8 ->
                        let (stats, rest) = readStatisticsFromBytes emptyColumnStatistics payload 0
                         in resume (hdr{dataPageHeaderV2Statistics = stats}) rest
                    n -> error $ show n
  where
    -- xs is non-empty whenever a field was found; True is only a fallback.
    headerSaysTrue = maybe True ((== compactBooleanTrue) . (.&. 0x0f)) (safeHead xs)

-- | Total variant of 'head': the first element of the list, or 'Nothing' when
-- the list is empty.
safeHead :: [a] -> Maybe a
safeHead = listToMaybe

-- | Decode one field header from the front of the input.
--
-- The low nibble of the first byte is the element type and the high nibble is
-- a delta against the previous field id.  A zero delta means the id is
-- instead read from the following bytes with 'readIntFromBytes'.
--
-- Returns 'Nothing' on empty input or when the type nibble is 0; otherwise
-- the bytes following the header, the element type, and the field id.
readField' :: [Word8] -> Int16 -> Maybe ([Word8], TType, Int16)
readField' [] _ = Nothing
readField' (header : body) lastFieldId
    | typeNibble == 0 = Nothing
    | delta == 0 =
        let (explicitId, rest) = readIntFromBytes @Int16 body
         in Just (rest, toTType typeNibble, explicitId)
    | otherwise = Just (body, toTType typeNibble, lastFieldId + delta)
  where
    typeNibble = header .&. 0x0f
    -- Shifting a Word8 right by four leaves exactly the high nibble.
    delta = fromIntegral (header `shiftR` 4) :: Int16

-- | Read pages back to back with 'readPage' until the input is exhausted or
-- 'readPage' reports no page, returning them in the order they were read.
readAllPages :: CompressionCodec -> [Word8] -> IO [Page]
readAllPages codec bytes = collect [] bytes
  where
    -- Pages are accumulated newest-first and reversed once at the end.
    collect seen [] = pure (reverse seen)
    collect seen pending = do
        (next, rest) <- readPage codec pending
        maybe (pure (reverse seen)) (\page -> collect (page : seen) rest) next

-- | Read @k@ consecutive 4-byte values, each decoded with
-- 'littleEndianInt32', and return them with the unconsumed bytes.
--
-- A non-positive @k@ reads nothing and returns the input unchanged.  (A
-- negative count used to skip the @0@ base case and recurse without end.)
--
-- NOTE(review): no check is made that 4 bytes remain for every value; what
-- happens on truncated input depends on 'littleEndianInt32'.
readNInt32 :: Int -> [Word8] -> ([Int32], [Word8])
readNInt32 k bs
    | k <= 0 = ([], bs)
    | otherwise =
        let (raw, bs') = splitAt 4 bs
            (xs, rest) = readNInt32 (k - 1) bs'
         in (littleEndianInt32 raw : xs, rest)

-- | Read @k@ consecutive 8-byte values, each decoded with
-- 'littleEndianWord64' and reinterpreted as a 'Double', and return them with
-- the unconsumed bytes.
--
-- A non-positive @k@ reads nothing and returns the input unchanged.  (A
-- negative count used to skip the @0@ base case and recurse without end.)
--
-- NOTE(review): no check is made that 8 bytes remain for every value; what
-- happens on truncated input depends on 'littleEndianWord64'.
readNDouble :: Int -> [Word8] -> ([Double], [Word8])
readNDouble k bs
    | k <= 0 = ([], bs)
    | otherwise =
        let (raw, bs') = splitAt 8 bs
            (xs, rest) = readNDouble (k - 1) bs'
         in (castWord64ToDouble (littleEndianWord64 raw) : xs, rest)

-- | Read @k@ length-prefixed byte arrays: each entry is a 4-byte length
-- (decoded with 'littleEndianInt32') followed by that many bytes.  Returns
-- the array bodies with the unconsumed bytes.
--
-- A non-positive @k@ reads nothing and returns the input unchanged.  (A
-- negative count used to skip the @0@ base case and recurse without end.)
--
-- NOTE(review): the length prefix is not validated against the remaining
-- input; a body is simply cut short when fewer bytes are available.
readNByteArrays :: Int -> [Word8] -> ([[Word8]], [Word8])
readNByteArrays k bs
    | k <= 0 = ([], bs)
    | otherwise =
        let (prefix, afterPrefix) = splitAt 4 bs
            len = fromIntegral (littleEndianInt32 prefix) :: Int
            body = take len afterPrefix
            bs' = drop (4 + len) bs
            (xs, rest) = readNByteArrays (k - 1) bs'
         in (body : xs, rest)

-- | Read @count@ bit-packed booleans, least-significant bit of each byte
-- first.  The values occupy @ceiling (count / 8)@ bytes; any padding bits in
-- the final byte are discarded.  Returns the booleans with the unconsumed
-- bytes.
readNBool :: Int -> [Word8] -> ([Bool], [Word8])
readNBool 0 bs = ([], bs)
readNBool count bs = (take count unpacked, rest)
  where
    byteCount = (count + 7) `div` 8
    (packed, rest) = splitAt byteCount bs
    -- Expand every byte into its eight bits, lowest bit first.
    unpacked = [testBit byte i | byte <- packed, i <- [0 .. 7]]

-- | Read @k@ consecutive 8-byte values, each decoded with
-- 'littleEndianWord64' and converted to 'Int64', and return them with the
-- unconsumed bytes.
--
-- A non-positive @k@ reads nothing and returns the input unchanged.  (A
-- negative count used to skip the @0@ base case and recurse without end.)
--
-- NOTE(review): no check is made that 8 bytes remain for every value; what
-- happens on truncated input depends on 'littleEndianWord64'.
readNInt64 :: Int -> [Word8] -> ([Int64], [Word8])
readNInt64 k bs
    | k <= 0 = ([], bs)
    | otherwise =
        let (raw, bs') = splitAt 8 bs
            (xs, rest) = readNInt64 (k - 1) bs'
         in (fromIntegral (littleEndianWord64 raw) : xs, rest)

-- | Read @k@ consecutive 4-byte values, each decoded with
-- 'littleEndianWord32' and reinterpreted as a 'Float', and return them with
-- the unconsumed bytes.
--
-- A non-positive @k@ reads nothing and returns the input unchanged.  (A
-- negative count used to skip the @0@ base case and recurse without end.)
--
-- NOTE(review): no check is made that 4 bytes remain for every value; what
-- happens on truncated input depends on 'littleEndianWord32'.
readNFloat :: Int -> [Word8] -> ([Float], [Word8])
readNFloat k bs
    | k <= 0 = ([], bs)
    | otherwise =
        let (raw, bs') = splitAt 4 bs
            (xs, rest) = readNFloat (k - 1) bs'
         in (castWord32ToFloat (littleEndianWord32 raw) : xs, rest)

-- | Split off @k@ consecutive chunks of @len@ bytes each and return them with
-- the unconsumed bytes.  Chunks come out short (possibly empty) when the
-- input runs out.
--
-- A non-positive @k@ splits nothing and returns the input unchanged.  (A
-- negative count used to skip the @0@ base case and recurse without end.)
splitFixed :: Int -> Int -> [Word8] -> ([[Word8]], [Word8])
splitFixed k len bs
    | k <= 0 = ([], bs)
    | otherwise =
        let (body, bs') = splitAt len bs
            (xs, rest) = splitFixed (k - 1) len bs'
         in (body : xs, rest)

-- | Decode the fields of a column-statistics struct, one field per recursive
-- step.
--
-- Arguments: the statistics accumulated so far, the remaining bytes, and the
-- id of the previously decoded field (pass 0 to start a fresh struct).
--
-- Returns the completed statistics and the bytes following the struct.
-- Decoding ends when 'readField'' reports no further field, in which case
-- one byte (the terminator) is skipped.
--
-- Field ids 1, 2, 5 and 6 are byte strings, 3 and 4 are 64-bit integers, and
-- 7 and 8 are booleans.  Booleans are decoded from the low nibble of the
-- field-header byte and consume no payload byte, as the Thrift compact
-- protocol specifies for struct fields; reading a payload byte there would
-- swallow the next field header or the struct terminator.
--
-- Raises via 'error' (message: the field id) on an unexpected field id.
readStatisticsFromBytes :: ColumnStatistics -> [Word8] -> Int16 -> (ColumnStatistics, [Word8])
readStatisticsFromBytes cs xs lastFieldId =
    case readField' xs lastFieldId of
        Nothing -> (cs, drop 1 xs)
        Just (payload, _, identifier) ->
            let
                resume updated rest = readStatisticsFromBytes updated rest identifier
                bytesField store =
                    let (value, rest) = readByteStringFromBytes payload
                     in resume (store value) rest
                int64Field store =
                    let (value, rest) = readIntFromBytes @Int64 payload
                     in resume (store value) rest
                -- The value sits in the field header, so the payload is
                -- passed on untouched.
                boolField store = resume (store headerSaysTrue) payload
             in
                case identifier of
                    1 -> bytesField (\v -> cs{columnMax = v})
                    2 -> bytesField (\v -> cs{columnMin = v})
                    3 -> int64Field (\v -> cs{columnNullCount = v})
                    4 -> int64Field (\v -> cs{columnDistictCount = v})
                    5 -> bytesField (\v -> cs{columnMaxValue = v})
                    6 -> bytesField (\v -> cs{columnMinValue = v})
                    7 -> boolField (\v -> cs{isColumnMaxValueExact = v})
                    8 -> boolField (\v -> cs{isColumnMinValueExact = v})
                    n -> error $ show n
  where
    -- xs is non-empty whenever a field was found; False is only a fallback.
    headerSaysTrue = case xs of
        (b : _) -> (b .&. 0x0f) == compactBooleanTrue
        [] -> False