{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE TypeApplications #-}
module DataFrame.IO.Parquet.Page where
import qualified Codec.Compression.GZip as GZip
import qualified Codec.Compression.Zstd.Streaming as Zstd
import Data.Bits
import qualified Data.ByteString as BS
import qualified Data.ByteString.Lazy as LB
import Data.Int
import Data.Maybe (fromMaybe)
import qualified Data.Vector.Unboxed as VU
import DataFrame.IO.Parquet.Binary
import DataFrame.IO.Parquet.Thrift
import DataFrame.IO.Parquet.Types
import GHC.Float
import qualified Snappy
-- | 'True' when the page's type header is a V1 or V2 data page header,
-- 'False' for every other kind of page.
isDataPage :: Page -> Bool
isDataPage page
    | DataPageHeader{} <- kind = True
    | DataPageHeaderV2{} <- kind = True
    | otherwise = False
  where
    kind = pageTypeHeader (pageHeader page)
-- | 'True' when the page's type header is a dictionary page header,
-- 'False' for every other kind of page.
isDictionaryPage :: Page -> Bool
isDictionaryPage page
    | DictionaryPageHeader{} <- pageTypeHeader (pageHeader page) = True
    | otherwise = False
{- | Read one page from the front of a column chunk.

Returns @(Nothing, empty)@ for empty input. Otherwise the page header is
decoded with 'readPageHeader', the next @compressedPageSize@ bytes are taken
as the page payload and decompressed according to the codec, and the bytes
after the payload are returned alongside the page.

Raises (via 'error') on a ZSTD or Snappy decoding failure and on any codec
other than ZSTD, SNAPPY, UNCOMPRESSED or GZIP.

NOTE(review): the whole payload is decompressed regardless of page type.
Parquet V2 data pages keep their level bytes uncompressed, so confirm the
consumer of V2 pages accounts for this.
-}
readPage :: CompressionCodec -> BS.ByteString -> IO (Maybe Page, BS.ByteString)
readPage c columnBytes
    | BS.null columnBytes = pure (Nothing, BS.empty)
    | otherwise = do
        payload <- decompress c
        pure (Just (Page hdr payload), leftover)
  where
    (hdr, afterHeader) = readPageHeader emptyPageHeader columnBytes 0

    -- The payload is exactly compressedPageSize bytes; the rest belongs to
    -- whatever follows this page.
    (compressed, leftover) =
        BS.splitAt (fromIntegral (compressedPageSize hdr)) afterHeader

    decompress ZSTD = Zstd.decompress >>= feedZstd compressed []
    decompress SNAPPY = case Snappy.decompress compressed of
        Left e -> error (show e)
        Right res -> pure res
    decompress UNCOMPRESSED = pure compressed
    decompress GZIP =
        pure (LB.toStrict (GZip.decompress (BS.fromStrict compressed)))
    decompress other = error ("Unsupported compression type: " ++ show other)

    -- Drive the streaming decoder: hand over the input once, then feed
    -- empty input while collecting produced chunks (kept in reverse order).
    feedZstd input acc step = case step of
        Zstd.Consume f -> f input >>= feedZstd BS.empty acc
        Zstd.Produce chunk next -> next >>= feedZstd BS.empty (chunk : acc)
        Zstd.Done final -> pure (BS.concat (reverse (final : acc)))
        Zstd.Error msg msg2 -> error ("ZSTD error: " ++ msg ++ " " ++ msg2)
{- | Decode the fields of a page header, folding them into @hdr@.

Field headers are decoded with 'readField''; @lastFieldId@ is the id of the
previously decoded field (0 at the start of the struct). Decoding stops at
the end of input or at the first header that 'readField'' rejects, in which
case that one byte is dropped. The result is the filled-in header and the
bytes that follow it.

Raises (via 'error') on field 6 (index page header) and on any field id
outside 1-8.
-}
readPageHeader ::
    PageHeader -> BS.ByteString -> Int16 -> (PageHeader, BS.ByteString)
readPageHeader hdr xs lastFieldId
    | BS.null xs = (hdr, BS.empty)
    | otherwise = case readField' xs lastFieldId of
        Nothing -> (hdr, BS.drop 1 xs)
        Just (body, _, fieldId) -> case fieldId of
            1 -> int32Field (\v -> hdr{pageHeaderPageType = pageTypeFromInt v})
            2 -> int32Field (\v -> hdr{uncompressedPageSize = v})
            3 -> int32Field (\v -> hdr{compressedPageSize = v})
            4 -> int32Field (\v -> hdr{pageHeaderCrcChecksum = v})
            5 -> nestedHeader emptyDataPageHeader
            6 -> error "Index page header not supported"
            7 -> nestedHeader emptyDictionaryPageHeader
            8 -> nestedHeader emptyDataPageHeaderV2
            n -> error $ "Unknown page header field " ++ show n
          where
            -- Decode an Int32 value, store it, and carry on after it.
            int32Field set =
                let (v, body') = readInt32FromBytes body
                 in readPageHeader (set v) body' fieldId
            -- Decode a nested page-type header (its field ids restart at 0).
            nestedHeader blank =
                let (sub, body') = readPageTypeHeader blank body 0
                 in readPageHeader hdr{pageTypeHeader = sub} body' fieldId
{- | Decode the fields of a page-type-specific header.

The constructor of the first argument selects which struct is being decoded
(dictionary page, data page V1, data page V2); decoded values are folded
into it. @lastFieldId@ is the id of the previously decoded field (0 at the
start of the struct). Decoding stops when 'readField'' rejects the next
header (end of input or a stop byte), dropping that one byte. Empty input
therefore yields the header unchanged and an empty remainder.

Boolean fields (@dictionaryPageIsSorted@, @dataPageHeaderV2IsCompressed@):
in the Thrift compact protocol a boolean struct field has no value byte; the
value is the type nibble of the field header itself. It is therefore read
from the first byte of @xs@ and no further input is consumed.

Raises (via 'error') for @INDEX_PAGE_HEADER@, @PAGE_TYPE_HEADER_UNKNOWN@ and
for field ids the selected struct does not define.
-}
readPageTypeHeader ::
    PageTypeHeader -> BS.ByteString -> Int16 -> (PageTypeHeader, BS.ByteString)
readPageTypeHeader INDEX_PAGE_HEADER _ _ =
    error "readPageTypeHeader: unsupported INDEX_PAGE_HEADER"
readPageTypeHeader PAGE_TYPE_HEADER_UNKNOWN _ _ =
    error "readPageTypeHeader: unsupported PAGE_TYPE_HEADER_UNKNOWN"
readPageTypeHeader hdr@DictionaryPageHeader{} xs lastFieldId =
    case readField' xs lastFieldId of
        Nothing -> (hdr, BS.drop 1 xs)
        Just (rest, _, fieldId) ->
            let next hdr' rest' = readPageTypeHeader hdr' rest' fieldId
                int32Field set =
                    let (v, rest') = readInt32FromBytes rest
                     in next (set v) rest'
             in case fieldId of
                    1 -> int32Field (\v -> hdr{dictionaryPageHeaderNumValues = v})
                    2 ->
                        int32Field
                            (\v -> hdr{dictionaryPageHeaderEncoding = parquetEncodingFromInt v})
                    -- Boolean: value lives in the field header, nothing to consume.
                    3 -> next hdr{dictionaryPageIsSorted = flagIsSet} rest
                    n -> error $ "readPageTypeHeader: unsupported identifier " ++ show n
  where
    -- Low nibble of the field header byte encodes the boolean value.
    flagIsSet = case BS.uncons xs of
        Just (fieldHeader, _) -> fieldHeader .&. 0x0f == compactBooleanTrue
        Nothing -> False
readPageTypeHeader hdr@DataPageHeader{} xs lastFieldId =
    case readField' xs lastFieldId of
        Nothing -> (hdr, BS.drop 1 xs)
        Just (rest, _, fieldId) ->
            let next hdr' rest' = readPageTypeHeader hdr' rest' fieldId
                int32Field set =
                    let (v, rest') = readInt32FromBytes rest
                     in next (set v) rest'
                encodingField set = int32Field (set . parquetEncodingFromInt)
             in case fieldId of
                    1 -> int32Field (\v -> hdr{dataPageHeaderNumValues = v})
                    2 -> encodingField (\e -> hdr{dataPageHeaderEncoding = e})
                    3 -> encodingField (\e -> hdr{definitionLevelEncoding = e})
                    4 -> encodingField (\e -> hdr{repetitionLevelEncoding = e})
                    5 ->
                        let (stats, rest') =
                                readStatisticsFromBytes emptyColumnStatistics rest 0
                         in next hdr{dataPageHeaderStatistics = stats} rest'
                    n -> error $ "readPageTypeHeader: unsupported identifier " ++ show n
readPageTypeHeader hdr@DataPageHeaderV2{} xs lastFieldId =
    case readField' xs lastFieldId of
        Nothing -> (hdr, BS.drop 1 xs)
        Just (rest, _, fieldId) ->
            let next hdr' rest' = readPageTypeHeader hdr' rest' fieldId
                int32Field set =
                    let (v, rest') = readInt32FromBytes rest
                     in next (set v) rest'
             in case fieldId of
                    1 -> int32Field (\v -> hdr{dataPageHeaderV2NumValues = v})
                    2 -> int32Field (\v -> hdr{dataPageHeaderV2NumNulls = v})
                    3 -> int32Field (\v -> hdr{dataPageHeaderV2NumRows = v})
                    4 ->
                        int32Field
                            (\v -> hdr{dataPageHeaderV2Encoding = parquetEncodingFromInt v})
                    5 -> int32Field (\v -> hdr{definitionLevelByteLength = v})
                    6 -> int32Field (\v -> hdr{repetitionLevelByteLength = v})
                    -- Boolean: value lives in the field header, nothing to consume.
                    7 -> next hdr{dataPageHeaderV2IsCompressed = flagIsSet} rest
                    8 ->
                        let (stats, rest') =
                                readStatisticsFromBytes emptyColumnStatistics rest 0
                         in next hdr{dataPageHeaderV2Statistics = stats} rest'
                    n -> error $ "readPageTypeHeader: unsupported identifier " ++ show n
  where
    -- Low nibble of the field header byte encodes the boolean value.
    flagIsSet = case BS.uncons xs of
        Just (fieldHeader, _) -> fieldHeader .&. 0x0f == compactBooleanTrue
        Nothing -> False
{- | Decode one field header from the front of @bs@.

The low nibble of the first byte is the element type and the high nibble is
a delta from @lastFieldId@. A zero delta means the field id follows and is
read with 'readIntFromBytes'. Returns the bytes after the header, the
element type and the field id.

Returns 'Nothing' on empty input or when the type nibble is zero.
-}
readField' :: BS.ByteString -> Int16 -> Maybe (BS.ByteString, TType, Int16)
readField' bs lastFieldId = do
    (header, body) <- BS.uncons bs
    let typeBits = header .&. 0x0f
        delta = fromIntegral (header `shiftR` 4) :: Int16
        (fieldId, afterId)
            | delta == 0 = readIntFromBytes @Int16 body
            | otherwise = (lastFieldId + delta, body)
    if typeBits == 0
        then Nothing
        else Just (afterId, toTType typeBits, fieldId)
-- | Read pages with 'readPage' until the input is exhausted or 'readPage'
-- yields no page, returning them in the order they were read.
readAllPages :: CompressionCodec -> BS.ByteString -> IO [Page]
readAllPages codec bytes = reverse <$> collect [] bytes
  where
    -- Pages are accumulated newest-first and reversed once at the end.
    collect seen bs
        | BS.null bs = pure seen
        | otherwise = do
            (maybePage, remaining) <- readPage codec bs
            case maybePage of
                Nothing -> pure seen
                Just page -> collect (page : seen) remaining
-- | Build an @n@-element vector whose element @i@ is decoded by
-- 'littleEndianInt32' from the bytes starting at offset @4 * i@.
readNInt32Vec :: Int -> BS.ByteString -> VU.Vector Int32
readNInt32Vec n bs = VU.generate n valueAt
  where
    valueAt i = littleEndianInt32 (BS.drop (4 * i) bs)
-- | Build an @n@-element vector whose element @i@ is decoded by
-- 'littleEndianWord64' from the bytes starting at offset @8 * i@ and
-- converted to 'Int64'.
readNInt64Vec :: Int -> BS.ByteString -> VU.Vector Int64
readNInt64Vec n bs = VU.generate n valueAt
  where
    valueAt i = fromIntegral (littleEndianWord64 (BS.drop (8 * i) bs))
-- | Build an @n@-element vector whose element @i@ is decoded by
-- 'littleEndianWord32' from the bytes starting at offset @4 * i@ and
-- reinterpreted as a 'Float'.
readNFloatVec :: Int -> BS.ByteString -> VU.Vector Float
readNFloatVec n bs = VU.generate n valueAt
  where
    valueAt i = castWord32ToFloat (littleEndianWord32 (BS.drop (4 * i) bs))
-- | Build an @n@-element vector whose element @i@ is decoded by
-- 'littleEndianWord64' from the bytes starting at offset @8 * i@ and
-- reinterpreted as a 'Double'.
readNDoubleVec :: Int -> BS.ByteString -> VU.Vector Double
readNDoubleVec n bs = VU.generate n valueAt
  where
    valueAt i = castWord64ToDouble (littleEndianWord64 (BS.drop (8 * i) bs))
{- | Read @k@ consecutive 4-byte values, each decoded with
'littleEndianInt32', and return them with the bytes that follow.

A count of zero or less yields no values and leaves the input untouched.
(Previously a negative count never reached the base case and recursed
without end.)
-}
readNInt32 :: Int -> BS.ByteString -> ([Int32], BS.ByteString)
readNInt32 k bs
    | k <= 0 = ([], bs)
    | otherwise = (value : values, rest)
  where
    (chunk, remaining) = BS.splitAt 4 bs
    value = littleEndianInt32 chunk
    (values, rest) = readNInt32 (k - 1) remaining
{- | Read @k@ consecutive 8-byte values, each decoded with
'littleEndianWord64' and reinterpreted as a 'Double', and return them with
the bytes that follow.

A count of zero or less yields no values and leaves the input untouched.
(Previously a negative count never reached the base case and recursed
without end.)
-}
readNDouble :: Int -> BS.ByteString -> ([Double], BS.ByteString)
readNDouble k bs
    | k <= 0 = ([], bs)
    | otherwise = (value : values, rest)
  where
    (chunk, remaining) = BS.splitAt 8 bs
    value = castWord64ToDouble (littleEndianWord64 chunk)
    (values, rest) = readNDouble (k - 1) remaining
{- | Read @k@ length-prefixed byte arrays and return them with the bytes
that follow.

Each array is preceded by a 4-byte length decoded with 'littleEndianInt32'.
If fewer than that many bytes remain, the array is whatever is left.

A count of zero or less yields no arrays and leaves the input untouched.
(Previously a negative count recursed without end.) A negative length
prefix is treated as an empty array and the reader still advances past the
4-byte prefix.
-}
readNByteArrays :: Int -> BS.ByteString -> ([BS.ByteString], BS.ByteString)
readNByteArrays k bs
    | k <= 0 = ([], bs)
    | otherwise = (body : bodies, rest)
  where
    (prefix, afterPrefix) = BS.splitAt 4 bs
    len = fromIntegral (littleEndianInt32 prefix) :: Int
    -- splitAt with a non-positive length yields an empty body and keeps
    -- everything after the prefix for the next array.
    (body, remaining) = BS.splitAt len afterPrefix
    (bodies, rest) = readNByteArrays (k - 1) remaining
{- | Read @count@ bit-packed booleans and return them with the bytes that
follow.

The values occupy the first @(count + 7) `div` 8@ bytes. Within each byte
the bits are taken from least significant to most significant, and any
padding bits in the final byte are discarded.
-}
readNBool :: Int -> BS.ByteString -> ([Bool], BS.ByteString)
readNBool count bs = (take count flags, rest)
  where
    (packed, rest) = BS.splitAt ((count + 7) `div` 8) bs
    flags =
        [ (byte `shiftR` bit) .&. 1 == 1
        | byte <- BS.unpack packed
        , bit <- [0 .. 7 :: Int]
        ]
{- | Read @k@ consecutive 8-byte values, each decoded with
'littleEndianWord64' and converted to 'Int64', and return them with the
bytes that follow.

A count of zero or less yields no values and leaves the input untouched.
(Previously a negative count never reached the base case and recursed
without end.)
-}
readNInt64 :: Int -> BS.ByteString -> ([Int64], BS.ByteString)
readNInt64 k bs
    | k <= 0 = ([], bs)
    | otherwise = (value : values, rest)
  where
    (chunk, remaining) = BS.splitAt 8 bs
    value = fromIntegral (littleEndianWord64 chunk)
    (values, rest) = readNInt64 (k - 1) remaining
{- | Read @k@ consecutive 4-byte values, each decoded with
'littleEndianWord32' and reinterpreted as a 'Float', and return them with
the bytes that follow.

A count of zero or less yields no values and leaves the input untouched.
(Previously a negative count never reached the base case and recursed
without end.)
-}
readNFloat :: Int -> BS.ByteString -> ([Float], BS.ByteString)
readNFloat k bs
    | k <= 0 = ([], bs)
    | otherwise = (value : values, rest)
  where
    (chunk, remaining) = BS.splitAt 4 bs
    value = castWord32ToFloat (littleEndianWord32 chunk)
    (values, rest) = readNFloat (k - 1) remaining
{- | Split off @k@ consecutive pieces of @len@ bytes each and return them
with the bytes that follow.

When the input runs short the remaining pieces are shorter (possibly
empty); the number of pieces is always @k@.

A count of zero or less yields no pieces and leaves the input untouched.
(Previously a negative count never reached the base case and produced an
endless list.)
-}
splitFixed :: Int -> Int -> BS.ByteString -> ([BS.ByteString], BS.ByteString)
splitFixed k len bs
    | k <= 0 = ([], bs)
    | otherwise = (body : bodies, rest)
  where
    (body, remaining) = BS.splitAt len bs
    (bodies, rest) = splitFixed (k - 1) len remaining
{- | Decode the fields of a column statistics struct, folding them into @cs@.

Field headers are decoded with 'readField''; @lastFieldId@ is the id of the
previously decoded field (0 at the start of the struct). Decoding stops when
'readField'' rejects the next header (end of input or a stop byte), dropping
that one byte. The result is the filled-in statistics and the bytes that
follow.

Fields 1, 2, 5 and 6 are byte strings, 3 and 4 are 64-bit integers, and 7
and 8 are booleans. In the Thrift compact protocol a boolean struct field
has no value byte; the value is the type nibble of the field header itself.
It is therefore read from the first byte of @xs@ and no further input is
consumed. (Previously the byte after the header was consumed as the value,
swallowing the next field header or the stop byte.)

Raises (via 'error') on field ids outside 1-8.
-}
readStatisticsFromBytes ::
    ColumnStatistics -> BS.ByteString -> Int16 -> (ColumnStatistics, BS.ByteString)
readStatisticsFromBytes cs xs lastFieldId =
    case readField' xs lastFieldId of
        Nothing -> (cs, BS.drop 1 xs)
        Just (rest, _, fieldId) ->
            let next cs' rest' = readStatisticsFromBytes cs' rest' fieldId
                bytesField set =
                    let (v, rest') = readByteStringFromBytes rest
                     in next (set v) rest'
                int64Field set =
                    let (v, rest') = readIntFromBytes @Int64 rest
                     in next (set v) rest'
                -- Boolean: value lives in the field header, nothing to consume.
                boolField set = next (set flagIsSet) rest
             in case fieldId of
                    1 -> bytesField (\v -> cs{columnMax = v})
                    2 -> bytesField (\v -> cs{columnMin = v})
                    3 -> int64Field (\v -> cs{columnNullCount = v})
                    4 -> int64Field (\v -> cs{columnDistictCount = v})
                    5 -> bytesField (\v -> cs{columnMaxValue = v})
                    6 -> bytesField (\v -> cs{columnMinValue = v})
                    7 -> boolField (\b -> cs{isColumnMaxValueExact = b})
                    8 -> boolField (\b -> cs{isColumnMinValueExact = b})
                    n ->
                        error $
                            "readStatisticsFromBytes: unsupported identifier " ++ show n
  where
    -- Low nibble of the field header byte encodes the boolean value.
    flagIsSet = case BS.uncons xs of
        Just (fieldHeader, _) -> fieldHeader .&. 0x0f == compactBooleanTrue
        Nothing -> False