{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}

module DataFrame.IO.Parquet (
    readParquet,
) where

import Control.Monad
import qualified Data.ByteString as BSO
import Data.Char
import Data.Foldable
import Data.IORef
import Data.List
import qualified Data.Map as M
import Data.Maybe
import qualified Data.Text as T
import qualified DataFrame.Internal.Column as DI
import DataFrame.Internal.DataFrame (DataFrame)
import qualified DataFrame.Internal.DataFrame as DI
import qualified DataFrame.Operations.Core as DI
import Foreign
import System.IO

import DataFrame.IO.Parquet.Binary
import DataFrame.IO.Parquet.Dictionary
import DataFrame.IO.Parquet.Levels
import DataFrame.IO.Parquet.Page
import DataFrame.IO.Parquet.Thrift
import DataFrame.IO.Parquet.Types

{- | Read a parquet file from path and load it into a dataframe.

==== __Example__
@
ghci> D.readParquet "./data/mtcars.parquet" df

@
-}
readParquet :: String -> IO DataFrame
readParquet :: [Char] -> IO DataFrame
readParquet [Char]
path = [Char] -> IOMode -> (Handle -> IO DataFrame) -> IO DataFrame
forall r. [Char] -> IOMode -> (Handle -> IO r) -> IO r
withBinaryFile [Char]
path IOMode
ReadMode ((Handle -> IO DataFrame) -> IO DataFrame)
-> (Handle -> IO DataFrame) -> IO DataFrame
forall a b. (a -> b) -> a -> b
$ \Handle
handle -> do
    (Integer
size, ByteString
magicString) <- Handle -> IO (Integer, ByteString)
readMetadataSizeFromFooter Handle
handle
    Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (ByteString
magicString ByteString -> ByteString -> Bool
forall a. Eq a => a -> a -> Bool
/= ByteString
"PAR1") (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ [Char] -> IO ()
forall a. HasCallStack => [Char] -> a
error [Char]
"Invalid Parquet file"

    FileMetadata
fileMetadata <- Handle -> Integer -> IO FileMetadata
readMetadata Handle
handle Integer
size

    let columnPaths :: [(Text, Int)]
columnPaths = [SchemaElement] -> [(Text, Int)]
getColumnPaths (Int -> [SchemaElement] -> [SchemaElement]
forall a. Int -> [a] -> [a]
drop Int
1 ([SchemaElement] -> [SchemaElement])
-> [SchemaElement] -> [SchemaElement]
forall a b. (a -> b) -> a -> b
$ FileMetadata -> [SchemaElement]
schema FileMetadata
fileMetadata)
    let columnNames :: [Text]
columnNames = ((Text, Int) -> Text) -> [(Text, Int)] -> [Text]
forall a b. (a -> b) -> [a] -> [b]
map (Text, Int) -> Text
forall a b. (a, b) -> a
fst [(Text, Int)]
columnPaths

    IORef (Map Text Column)
colMap <- Map Text Column -> IO (IORef (Map Text Column))
forall a. a -> IO (IORef a)
newIORef (Map Text Column
forall k a. Map k a
M.empty :: M.Map T.Text DI.Column)

    let schemaElements :: [SchemaElement]
schemaElements = FileMetadata -> [SchemaElement]
schema FileMetadata
fileMetadata
    let getTypeLength :: [String] -> Maybe Int32
        getTypeLength :: [[Char]] -> Maybe Int32
getTypeLength [[Char]]
path = [SchemaElement] -> [[Char]] -> Integer -> Maybe Int32
forall {t}.
Num t =>
[SchemaElement] -> [[Char]] -> t -> Maybe Int32
findTypeLength [SchemaElement]
schemaElements [[Char]]
path Integer
0
          where
            findTypeLength :: [SchemaElement] -> [[Char]] -> t -> Maybe Int32
findTypeLength [] [[Char]]
_ t
_ = Maybe Int32
forall a. Maybe a
Nothing
            findTypeLength (SchemaElement
s : [SchemaElement]
ss) [[Char]]
targetPath t
depth
                | (Text -> [Char]) -> [Text] -> [[Char]]
forall a b. (a -> b) -> [a] -> [b]
map Text -> [Char]
T.unpack (SchemaElement -> [SchemaElement] -> t -> [Text]
forall {p} {p} {p} {a}. p -> p -> p -> [a]
pathToElement SchemaElement
s [SchemaElement]
ss t
depth) [[Char]] -> [[Char]] -> Bool
forall a. Eq a => a -> a -> Bool
== [[Char]]
targetPath
                    Bool -> Bool -> Bool
&& SchemaElement -> TType
elementType SchemaElement
s TType -> TType -> Bool
forall a. Eq a => a -> a -> Bool
== TType
STRING
                    Bool -> Bool -> Bool
&& SchemaElement -> Int32
typeLength SchemaElement
s Int32 -> Int32 -> Bool
forall a. Ord a => a -> a -> Bool
> Int32
0 =
                    Int32 -> Maybe Int32
forall a. a -> Maybe a
Just (SchemaElement -> Int32
typeLength SchemaElement
s)
                | Bool
otherwise = [SchemaElement] -> [[Char]] -> t -> Maybe Int32
findTypeLength [SchemaElement]
ss [[Char]]
targetPath (if SchemaElement -> Int32
numChildren SchemaElement
s Int32 -> Int32 -> Bool
forall a. Ord a => a -> a -> Bool
> Int32
0 then t
depth t -> t -> t
forall a. Num a => a -> a -> a
+ t
1 else t
depth)

            pathToElement :: p -> p -> p -> [a]
pathToElement p
_ p
_ p
_ = []

    [RowGroup] -> (RowGroup -> IO ()) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ (FileMetadata -> [RowGroup]
rowGroups FileMetadata
fileMetadata) ((RowGroup -> IO ()) -> IO ()) -> (RowGroup -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \RowGroup
rowGroup -> do
        [(ColumnChunk, Integer)]
-> ((ColumnChunk, Integer) -> IO ()) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ ([ColumnChunk] -> [Integer] -> [(ColumnChunk, Integer)]
forall a b. [a] -> [b] -> [(a, b)]
zip (RowGroup -> [ColumnChunk]
rowGroupColumns RowGroup
rowGroup) [Integer
0 ..]) (((ColumnChunk, Integer) -> IO ()) -> IO ())
-> ((ColumnChunk, Integer) -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \(ColumnChunk
colChunk, Integer
colIdx) -> do
            let metadata :: ColumnMetaData
metadata = ColumnChunk -> ColumnMetaData
columnMetaData ColumnChunk
colChunk
            let colPath :: [[Char]]
colPath = ColumnMetaData -> [[Char]]
columnPathInSchema ColumnMetaData
metadata
            let colName :: Text
colName =
                    if [[Char]] -> Bool
forall a. [a] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null [[Char]]
colPath
                        then [Char] -> Text
T.pack ([Char] -> Text) -> [Char] -> Text
forall a b. (a -> b) -> a -> b
$ [Char]
"col_" [Char] -> [Char] -> [Char]
forall a. [a] -> [a] -> [a]
++ Integer -> [Char]
forall a. Show a => a -> [Char]
show Integer
colIdx
                        else [Char] -> Text
T.pack ([Char] -> Text) -> [Char] -> Text
forall a b. (a -> b) -> a -> b
$ [[Char]] -> [Char]
forall a. HasCallStack => [a] -> a
last [[Char]]
colPath

            let colDataPageOffset :: Int64
colDataPageOffset = ColumnMetaData -> Int64
columnDataPageOffset ColumnMetaData
metadata
            let colDictionaryPageOffset :: Int64
colDictionaryPageOffset = ColumnMetaData -> Int64
columnDictionaryPageOffset ColumnMetaData
metadata
            let colStart :: Int64
colStart =
                    if Int64
colDictionaryPageOffset Int64 -> Int64 -> Bool
forall a. Ord a => a -> a -> Bool
> Int64
0 Bool -> Bool -> Bool
&& Int64
colDataPageOffset Int64 -> Int64 -> Bool
forall a. Ord a => a -> a -> Bool
> Int64
colDictionaryPageOffset
                        then Int64
colDictionaryPageOffset
                        else Int64
colDataPageOffset
            let colLength :: Int64
colLength = ColumnMetaData -> Int64
columnTotalCompressedSize ColumnMetaData
metadata

            [Word8]
columnBytes <- Handle -> Int64 -> Int64 -> IO [Word8]
readBytes Handle
handle Int64
colStart Int64
colLength

            [Page]
pages <- CompressionCodec -> [Word8] -> IO [Page]
readAllPages (ColumnMetaData -> CompressionCodec
columnCodec ColumnMetaData
metadata) [Word8]
columnBytes

            let maybeTypeLength :: Maybe Int32
maybeTypeLength =
                    if ColumnMetaData -> ParquetType
columnType ColumnMetaData
metadata ParquetType -> ParquetType -> Bool
forall a. Eq a => a -> a -> Bool
== ParquetType
PFIXED_LEN_BYTE_ARRAY
                        then [[Char]] -> Maybe Int32
getTypeLength [[Char]]
colPath
                        else Maybe Int32
forall a. Maybe a
Nothing

            let primaryEncoding :: ParquetEncoding
primaryEncoding = ParquetEncoding -> Maybe ParquetEncoding -> ParquetEncoding
forall a. a -> Maybe a -> a
fromMaybe ParquetEncoding
EPLAIN (((ParquetEncoding, [ParquetEncoding]) -> ParquetEncoding)
-> Maybe (ParquetEncoding, [ParquetEncoding])
-> Maybe ParquetEncoding
forall a b. (a -> b) -> Maybe a -> Maybe b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (ParquetEncoding, [ParquetEncoding]) -> ParquetEncoding
forall a b. (a, b) -> a
fst ([ParquetEncoding] -> Maybe (ParquetEncoding, [ParquetEncoding])
forall a. [a] -> Maybe (a, [a])
uncons (ColumnMetaData -> [ParquetEncoding]
columnEncodings ColumnMetaData
metadata)))

            let schemaTail :: [SchemaElement]
schemaTail = Int -> [SchemaElement] -> [SchemaElement]
forall a. Int -> [a] -> [a]
drop Int
1 (FileMetadata -> [SchemaElement]
schema FileMetadata
fileMetadata)
            let colPath :: [[Char]]
colPath = ColumnMetaData -> [[Char]]
columnPathInSchema (ColumnChunk -> ColumnMetaData
columnMetaData ColumnChunk
colChunk)
            let (Int
maxDef, Int
maxRep) = [SchemaElement] -> [[Char]] -> (Int, Int)
levelsForPath [SchemaElement]
schemaTail [[Char]]
colPath
            Column
column <- (Int, Int)
-> [Page]
-> ParquetType
-> ParquetEncoding
-> Maybe Int32
-> IO Column
processColumnPages (Int
maxDef, Int
maxRep) [Page]
pages (ColumnMetaData -> ParquetType
columnType ColumnMetaData
metadata) ParquetEncoding
primaryEncoding Maybe Int32
maybeTypeLength

            IORef (Map Text Column)
-> (Map Text Column -> Map Text Column) -> IO ()
forall a. IORef a -> (a -> a) -> IO ()
modifyIORef IORef (Map Text Column)
colMap (Text -> Column -> Map Text Column -> Map Text Column
forall k a. Ord k => k -> a -> Map k a -> Map k a
M.insert Text
colName Column
column)

    Map Text Column
finalColMap <- IORef (Map Text Column) -> IO (Map Text Column)
forall a. IORef a -> IO a
readIORef IORef (Map Text Column)
colMap
    let orderedColumns :: [(Text, Column)]
orderedColumns =
            (Text -> (Text, Column)) -> [Text] -> [(Text, Column)]
forall a b. (a -> b) -> [a] -> [b]
map
                (\Text
name -> (Text
name, Map Text Column
finalColMap Map Text Column -> Text -> Column
forall k a. Ord k => Map k a -> k -> a
M.! Text
name))
                ((Text -> Bool) -> [Text] -> [Text]
forall a. (a -> Bool) -> [a] -> [a]
filter (Text -> Map Text Column -> Bool
forall k a. Ord k => k -> Map k a -> Bool
`M.member` Map Text Column
finalColMap) [Text]
columnNames)

    DataFrame -> IO DataFrame
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (DataFrame -> IO DataFrame) -> DataFrame -> IO DataFrame
forall a b. (a -> b) -> a -> b
$ [(Text, Column)] -> DataFrame
DI.fromNamedColumns [(Text, Column)]
orderedColumns

readMetadataSizeFromFooter :: Handle -> IO (Integer, BSO.ByteString)
readMetadataSizeFromFooter :: Handle -> IO (Integer, ByteString)
readMetadataSizeFromFooter Handle
handle = do
    Integer
footerOffSet <- Handle -> IO Integer
numBytesInFile Handle
handle
    Ptr Word8
buf <- Int -> IO (Ptr Word8)
forall a. Int -> IO (Ptr a)
mallocBytes Int
8 :: IO (Ptr Word8)
    Handle -> SeekMode -> Integer -> IO ()
hSeek Handle
handle SeekMode
AbsoluteSeek (Integer -> Integer
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Integer -> Integer) -> Integer -> Integer
forall a b. (a -> b) -> a -> b
$ Integer
footerOffSet Integer -> Integer -> Integer
forall a. Num a => a -> a -> a
- Integer
8)
    Int
_ <- Handle -> Ptr Word8 -> Int -> IO Int
forall a. Handle -> Ptr a -> Int -> IO Int
hGetBuf Handle
handle Ptr Word8
buf Int
8

    [Int32]
sizeBytes <- (Int -> IO Int32) -> [Int] -> IO [Int32]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
(a -> m b) -> t a -> m (t b)
forall (m :: * -> *) a b. Monad m => (a -> m b) -> [a] -> m [b]
mapM (\Int
i -> Word8 -> Int32
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Word8 -> Int32) -> IO Word8 -> IO Int32
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> (Ptr Word8 -> Int -> IO Word8
forall a. Storable a => Ptr a -> Int -> IO a
peekElemOff Ptr Word8
buf Int
i :: IO Word8) :: IO Int32) [Int
0 .. Int
3]
    let size :: Integer
size = Int32 -> Integer
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Int32 -> Integer) -> Int32 -> Integer
forall a b. (a -> b) -> a -> b
$ (Int32 -> Int32 -> Int32) -> Int32 -> [Int32] -> Int32
forall b a. (b -> a -> b) -> b -> [a] -> b
forall (t :: * -> *) b a.
Foldable t =>
(b -> a -> b) -> b -> t a -> b
foldl' Int32 -> Int32 -> Int32
forall a. Bits a => a -> a -> a
(.|.) Int32
0 ([Int32] -> Int32) -> [Int32] -> Int32
forall a b. (a -> b) -> a -> b
$ (Int32 -> Int -> Int32) -> [Int32] -> [Int] -> [Int32]
forall a b c. (a -> b -> c) -> [a] -> [b] -> [c]
zipWith Int32 -> Int -> Int32
forall a. Bits a => a -> Int -> a
shift [Int32]
sizeBytes [Int
0, Int
8, Int
16, Int
24]

    [Word8]
magicStringBytes <- (Int -> IO Word8) -> [Int] -> IO [Word8]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
(a -> m b) -> t a -> m (t b)
forall (m :: * -> *) a b. Monad m => (a -> m b) -> [a] -> m [b]
mapM (\Int
i -> Ptr Word8 -> Int -> IO Word8
forall a. Storable a => Ptr a -> Int -> IO a
peekElemOff Ptr Word8
buf Int
i :: IO Word8) [Int
4 .. Int
7]
    let magicString :: ByteString
magicString = [Word8] -> ByteString
BSO.pack [Word8]
magicStringBytes
    Ptr Word8 -> IO ()
forall a. Ptr a -> IO ()
free Ptr Word8
buf
    (Integer, ByteString) -> IO (Integer, ByteString)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Integer
size, ByteString
magicString)

getColumnPaths :: [SchemaElement] -> [(T.Text, Int)]
getColumnPaths :: [SchemaElement] -> [(Text, Int)]
getColumnPaths [SchemaElement]
schema = [SchemaElement] -> Int -> [Text] -> [(Text, Int)]
extractLeafPaths [SchemaElement]
schema Int
0 []
  where
    extractLeafPaths :: [SchemaElement] -> Int -> [T.Text] -> [(T.Text, Int)]
    extractLeafPaths :: [SchemaElement] -> Int -> [Text] -> [(Text, Int)]
extractLeafPaths [] Int
_ [Text]
_ = []
    extractLeafPaths (SchemaElement
s : [SchemaElement]
ss) Int
idx [Text]
path
        | SchemaElement -> Int32
numChildren SchemaElement
s Int32 -> Int32 -> Bool
forall a. Eq a => a -> a -> Bool
== Int32
0 =
            let fullPath :: Text
fullPath = Text -> [Text] -> Text
T.intercalate Text
"." ([Text]
path [Text] -> [Text] -> [Text]
forall a. [a] -> [a] -> [a]
++ [SchemaElement -> Text
elementName SchemaElement
s])
             in (Text
fullPath, Int
idx) (Text, Int) -> [(Text, Int)] -> [(Text, Int)]
forall a. a -> [a] -> [a]
: [SchemaElement] -> Int -> [Text] -> [(Text, Int)]
extractLeafPaths [SchemaElement]
ss (Int
idx Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1) [Text]
path
        | Bool
otherwise =
            let newPath :: [Text]
newPath = if Text -> Bool
T.null (SchemaElement -> Text
elementName SchemaElement
s) then [Text]
path else [Text]
path [Text] -> [Text] -> [Text]
forall a. [a] -> [a] -> [a]
++ [SchemaElement -> Text
elementName SchemaElement
s]
                childrenCount :: Int
childrenCount = Int32 -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral (SchemaElement -> Int32
numChildren SchemaElement
s)
                ([SchemaElement]
children, [SchemaElement]
remaining) = Int -> [SchemaElement] -> ([SchemaElement], [SchemaElement])
forall a. Int -> [a] -> ([a], [a])
splitAt Int
childrenCount [SchemaElement]
ss
                childResults :: [(Text, Int)]
childResults = [SchemaElement] -> Int -> [Text] -> [(Text, Int)]
extractLeafPaths [SchemaElement]
children Int
idx [Text]
newPath
             in [(Text, Int)]
childResults [(Text, Int)] -> [(Text, Int)] -> [(Text, Int)]
forall a. [a] -> [a] -> [a]
++ [SchemaElement] -> Int -> [Text] -> [(Text, Int)]
extractLeafPaths [SchemaElement]
remaining (Int
idx Int -> Int -> Int
forall a. Num a => a -> a -> a
+ [(Text, Int)] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [(Text, Int)]
childResults) [Text]
path

processColumnPages :: (Int, Int) -> [Page] -> ParquetType -> ParquetEncoding -> Maybe Int32 -> IO DI.Column
processColumnPages :: (Int, Int)
-> [Page]
-> ParquetType
-> ParquetEncoding
-> Maybe Int32
-> IO Column
processColumnPages (Int
maxDef, Int
maxRep) [Page]
pages ParquetType
pType ParquetEncoding
_ Maybe Int32
maybeTypeLength = do
    let dictPages :: [Page]
dictPages = (Page -> Bool) -> [Page] -> [Page]
forall a. (a -> Bool) -> [a] -> [a]
filter Page -> Bool
isDictionaryPage [Page]
pages
    let dataPages :: [Page]
dataPages = (Page -> Bool) -> [Page] -> [Page]
forall a. (a -> Bool) -> [a] -> [a]
filter Page -> Bool
isDataPage [Page]
pages

    let dictValsM :: Maybe DictVals
dictValsM =
            case [Page]
dictPages of
                [] -> Maybe DictVals
forall a. Maybe a
Nothing
                (Page
dictPage : [Page]
_) ->
                    case PageHeader -> PageTypeHeader
pageTypeHeader (Page -> PageHeader
pageHeader Page
dictPage) of
                        DictionaryPageHeader{Bool
Int32
ParquetEncoding
dictionaryPageHeaderNumValues :: Int32
dictionaryPageHeaderEncoding :: ParquetEncoding
dictionaryPageIsSorted :: Bool
dictionaryPageIsSorted :: PageTypeHeader -> Bool
dictionaryPageHeaderEncoding :: PageTypeHeader -> ParquetEncoding
dictionaryPageHeaderNumValues :: PageTypeHeader -> Int32
..} ->
                            let countForBools :: Maybe Int32
countForBools =
                                    if ParquetType
pType ParquetType -> ParquetType -> Bool
forall a. Eq a => a -> a -> Bool
== ParquetType
PBOOLEAN
                                        then [Char] -> (Any -> Maybe Any) -> Int32 -> Maybe Int32
forall a. HasCallStack => [Char] -> a
error [Char]
"is bool" Any -> Maybe Any
forall a. a -> Maybe a
Just Int32
dictionaryPageHeaderNumValues
                                        else Maybe Int32
maybeTypeLength
                             in DictVals -> Maybe DictVals
forall a. a -> Maybe a
Just (ParquetType -> [Word8] -> Maybe Int32 -> DictVals
readDictVals ParquetType
pType (Page -> [Word8]
pageBytes Page
dictPage) Maybe Int32
countForBools)
                        PageTypeHeader
_ -> Maybe DictVals
forall a. Maybe a
Nothing

    [Column]
cols <- [Page] -> (Page -> IO Column) -> IO [Column]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
t a -> (a -> m b) -> m (t b)
forM [Page]
dataPages ((Page -> IO Column) -> IO [Column])
-> (Page -> IO Column) -> IO [Column]
forall a b. (a -> b) -> a -> b
$ \Page
page -> do
        case PageHeader -> PageTypeHeader
pageTypeHeader (Page -> PageHeader
pageHeader Page
page) of
            DataPageHeader{Int32
ColumnStatistics
ParquetEncoding
dataPageHeaderNumValues :: Int32
dataPageHeaderEncoding :: ParquetEncoding
definitionLevelEncoding :: ParquetEncoding
repetitionLevelEncoding :: ParquetEncoding
dataPageHeaderStatistics :: ColumnStatistics
dataPageHeaderStatistics :: PageTypeHeader -> ColumnStatistics
repetitionLevelEncoding :: PageTypeHeader -> ParquetEncoding
definitionLevelEncoding :: PageTypeHeader -> ParquetEncoding
dataPageHeaderEncoding :: PageTypeHeader -> ParquetEncoding
dataPageHeaderNumValues :: PageTypeHeader -> Int32
..} -> do
                let n :: Int
n = Int32 -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int32
dataPageHeaderNumValues
                let bs0 :: [Word8]
bs0 = Page -> [Word8]
pageBytes Page
page
                let ([Int]
defLvls, [Int]
_repLvls, [Word8]
afterLvls) = Int -> Int -> Int -> [Word8] -> ([Int], [Int], [Word8])
readLevelsV1 Int
n Int
maxDef Int
maxRep [Word8]
bs0
                let nPresent :: Int
nPresent = [Int] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length ((Int -> Bool) -> [Int] -> [Int]
forall a. (a -> Bool) -> [a] -> [a]
filter (Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
maxDef) [Int]
defLvls)

                case ParquetEncoding
dataPageHeaderEncoding of
                    ParquetEncoding
EPLAIN ->
                        case ParquetType
pType of
                            ParquetType
PBOOLEAN ->
                                let ([Bool]
vals, [Word8]
_) = Int -> [Word8] -> ([Bool], [Word8])
readNBool Int
nPresent [Word8]
afterLvls
                                 in Column -> IO Column
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Int -> [Int] -> [Bool] -> Column
toMaybeBool Int
maxDef [Int]
defLvls [Bool]
vals)
                            ParquetType
PINT32 ->
                                let ([Int32]
vals, [Word8]
_) = Int -> [Word8] -> ([Int32], [Word8])
readNInt32 Int
nPresent [Word8]
afterLvls
                                 in Column -> IO Column
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Int -> [Int] -> [Int32] -> Column
toMaybeInt32 Int
maxDef [Int]
defLvls [Int32]
vals)
                            ParquetType
PINT64 ->
                                let ([Int64]
vals, [Word8]
_) = Int -> [Word8] -> ([Int64], [Word8])
readNInt64 Int
nPresent [Word8]
afterLvls
                                 in Column -> IO Column
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Int -> [Int] -> [Int64] -> Column
toMaybeInt64 Int
maxDef [Int]
defLvls [Int64]
vals)
                            ParquetType
PINT96 ->
                                let ([UTCTime]
vals, [Word8]
_) = Int -> [Word8] -> ([UTCTime], [Word8])
readNInt96Times Int
nPresent [Word8]
afterLvls
                                 in Column -> IO Column
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Int -> [Int] -> [UTCTime] -> Column
toMaybeUTCTime Int
maxDef [Int]
defLvls [UTCTime]
vals)
                            ParquetType
PFLOAT ->
                                let ([Float]
vals, [Word8]
_) = Int -> [Word8] -> ([Float], [Word8])
readNFloat Int
nPresent [Word8]
afterLvls
                                 in Column -> IO Column
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Int -> [Int] -> [Float] -> Column
toMaybeFloat Int
maxDef [Int]
defLvls [Float]
vals)
                            ParquetType
PDOUBLE ->
                                let ([Double]
vals, [Word8]
_) = Int -> [Word8] -> ([Double], [Word8])
readNDouble Int
nPresent [Word8]
afterLvls
                                 in Column -> IO Column
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Int -> [Int] -> [Double] -> Column
toMaybeDouble Int
maxDef [Int]
defLvls [Double]
vals)
                            ParquetType
PBYTE_ARRAY ->
                                let ([[Word8]]
raws, [Word8]
_) = Int -> [Word8] -> ([[Word8]], [Word8])
readNByteArrays Int
nPresent [Word8]
afterLvls
                                    texts :: [Text]
texts = ([Word8] -> Text) -> [[Word8]] -> [Text]
forall a b. (a -> b) -> [a] -> [b]
map ([Char] -> Text
T.pack ([Char] -> Text) -> ([Word8] -> [Char]) -> [Word8] -> Text
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Word8 -> Char) -> [Word8] -> [Char]
forall a b. (a -> b) -> [a] -> [b]
map (Int -> Char
chr (Int -> Char) -> (Word8 -> Int) -> Word8 -> Char
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Word8 -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral)) [[Word8]]
raws
                                 in Column -> IO Column
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Int -> [Int] -> [Text] -> Column
toMaybeText Int
maxDef [Int]
defLvls [Text]
texts)
                            ParquetType
PFIXED_LEN_BYTE_ARRAY ->
                                case Maybe Int32
maybeTypeLength of
                                    Just Int32
len ->
                                        let ([[Word8]]
raws, [Word8]
_) = Int -> Int -> [Word8] -> ([[Word8]], [Word8])
splitFixed Int
nPresent (Int32 -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int32
len) [Word8]
afterLvls
                                            texts :: [Text]
texts = ([Word8] -> Text) -> [[Word8]] -> [Text]
forall a b. (a -> b) -> [a] -> [b]
map ([Char] -> Text
T.pack ([Char] -> Text) -> ([Word8] -> [Char]) -> [Word8] -> Text
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Word8 -> Char) -> [Word8] -> [Char]
forall a b. (a -> b) -> [a] -> [b]
map (Int -> Char
chr (Int -> Char) -> (Word8 -> Int) -> Word8 -> Char
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Word8 -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral)) [[Word8]]
raws
                                         in Column -> IO Column
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Int -> [Int] -> [Text] -> Column
toMaybeText Int
maxDef [Int]
defLvls [Text]
texts)
                                    Maybe Int32
Nothing -> [Char] -> IO Column
forall a. HasCallStack => [Char] -> a
error [Char]
"FIXED_LEN_BYTE_ARRAY requires type length"
                            ParquetType
PARQUET_TYPE_UNKNOWN -> [Char] -> IO Column
forall a. HasCallStack => [Char] -> a
error [Char]
"Cannot read unknown Parquet type"
                    ParquetEncoding
ERLE_DICTIONARY -> Maybe DictVals -> Int -> [Int] -> Int -> [Word8] -> IO Column
decodeDictV1 Maybe DictVals
dictValsM Int
maxDef [Int]
defLvls Int
nPresent [Word8]
afterLvls
                    ParquetEncoding
EPLAIN_DICTIONARY -> Maybe DictVals -> Int -> [Int] -> Int -> [Word8] -> IO Column
decodeDictV1 Maybe DictVals
dictValsM Int
maxDef [Int]
defLvls Int
nPresent [Word8]
afterLvls
                    ParquetEncoding
other -> [Char] -> IO Column
forall a. HasCallStack => [Char] -> a
error ([Char]
"Unsupported v1 encoding: " [Char] -> [Char] -> [Char]
forall a. [a] -> [a] -> [a]
++ ParquetEncoding -> [Char]
forall a. Show a => a -> [Char]
show ParquetEncoding
other)
            DataPageHeaderV2{Bool
Int32
ColumnStatistics
ParquetEncoding
dataPageHeaderV2NumValues :: Int32
dataPageHeaderV2NumNulls :: Int32
dataPageHeaderV2NumRows :: Int32
dataPageHeaderV2Encoding :: ParquetEncoding
definitionLevelByteLength :: Int32
repetitionLevelByteLength :: Int32
dataPageHeaderV2IsCompressed :: Bool
dataPageHeaderV2Statistics :: ColumnStatistics
dataPageHeaderV2Statistics :: PageTypeHeader -> ColumnStatistics
dataPageHeaderV2IsCompressed :: PageTypeHeader -> Bool
repetitionLevelByteLength :: PageTypeHeader -> Int32
definitionLevelByteLength :: PageTypeHeader -> Int32
dataPageHeaderV2Encoding :: PageTypeHeader -> ParquetEncoding
dataPageHeaderV2NumRows :: PageTypeHeader -> Int32
dataPageHeaderV2NumNulls :: PageTypeHeader -> Int32
dataPageHeaderV2NumValues :: PageTypeHeader -> Int32
..} -> do
                let n :: Int
n = Int32 -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int32
dataPageHeaderV2NumValues
                let bs0 :: [Word8]
bs0 = Page -> [Word8]
pageBytes Page
page
                let ([Int]
defLvls, [Int]
_repLvls, [Word8]
afterLvls) =
                        Int
-> Int
-> Int
-> Int32
-> Int32
-> [Word8]
-> ([Int], [Int], [Word8])
readLevelsV2 Int
n Int
maxDef Int
maxRep Int32
definitionLevelByteLength Int32
repetitionLevelByteLength [Word8]
bs0
                let nPresent :: Int
nPresent =
                        if Int32
dataPageHeaderV2NumNulls Int32 -> Int32 -> Bool
forall a. Ord a => a -> a -> Bool
> Int32
0
                            then Int32 -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Int32
dataPageHeaderV2NumValues Int32 -> Int32 -> Int32
forall a. Num a => a -> a -> a
- Int32
dataPageHeaderV2NumNulls)
                            else [Int] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length ((Int -> Bool) -> [Int] -> [Int]
forall a. (a -> Bool) -> [a] -> [a]
filter (Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
maxDef) [Int]
defLvls)

                case ParquetEncoding
dataPageHeaderV2Encoding of
                    ParquetEncoding
EPLAIN ->
                        case ParquetType
pType of
                            ParquetType
PBOOLEAN ->
                                let ([Bool]
vals, [Word8]
_) = Int -> [Word8] -> ([Bool], [Word8])
readNBool Int
nPresent [Word8]
afterLvls
                                 in Column -> IO Column
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Int -> [Int] -> [Bool] -> Column
toMaybeBool Int
maxDef [Int]
defLvls [Bool]
vals)
                            ParquetType
PINT32 ->
                                let ([Int32]
vals, [Word8]
_) = Int -> [Word8] -> ([Int32], [Word8])
readNInt32 Int
nPresent [Word8]
afterLvls
                                 in Column -> IO Column
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Int -> [Int] -> [Int32] -> Column
toMaybeInt32 Int
maxDef [Int]
defLvls [Int32]
vals)
                            ParquetType
PINT64 ->
                                let ([Int64]
vals, [Word8]
_) = Int -> [Word8] -> ([Int64], [Word8])
readNInt64 Int
nPresent [Word8]
afterLvls
                                 in Column -> IO Column
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Int -> [Int] -> [Int64] -> Column
toMaybeInt64 Int
maxDef [Int]
defLvls [Int64]
vals)
                            ParquetType
PINT96 ->
                                let ([UTCTime]
vals, [Word8]
_) = Int -> [Word8] -> ([UTCTime], [Word8])
readNInt96Times Int
nPresent [Word8]
afterLvls
                                 in Column -> IO Column
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Int -> [Int] -> [UTCTime] -> Column
toMaybeUTCTime Int
maxDef [Int]
defLvls [UTCTime]
vals)
                            ParquetType
PFLOAT ->
                                let ([Float]
vals, [Word8]
_) = Int -> [Word8] -> ([Float], [Word8])
readNFloat Int
nPresent [Word8]
afterLvls
                                 in Column -> IO Column
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Int -> [Int] -> [Float] -> Column
toMaybeFloat Int
maxDef [Int]
defLvls [Float]
vals)
                            ParquetType
PDOUBLE ->
                                let ([Double]
vals, [Word8]
_) = Int -> [Word8] -> ([Double], [Word8])
readNDouble Int
nPresent [Word8]
afterLvls
                                 in Column -> IO Column
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Int -> [Int] -> [Double] -> Column
toMaybeDouble Int
maxDef [Int]
defLvls [Double]
vals)
                            ParquetType
PBYTE_ARRAY ->
                                let ([[Word8]]
raws, [Word8]
_) = Int -> [Word8] -> ([[Word8]], [Word8])
readNByteArrays Int
nPresent [Word8]
afterLvls
                                    texts :: [Text]
texts = ([Word8] -> Text) -> [[Word8]] -> [Text]
forall a b. (a -> b) -> [a] -> [b]
map ([Char] -> Text
T.pack ([Char] -> Text) -> ([Word8] -> [Char]) -> [Word8] -> Text
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Word8 -> Char) -> [Word8] -> [Char]
forall a b. (a -> b) -> [a] -> [b]
map (Int -> Char
chr (Int -> Char) -> (Word8 -> Int) -> Word8 -> Char
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Word8 -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral)) [[Word8]]
raws
                                 in Column -> IO Column
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Int -> [Int] -> [Text] -> Column
toMaybeText Int
maxDef [Int]
defLvls [Text]
texts)
                            ParquetType
PFIXED_LEN_BYTE_ARRAY ->
                                case Maybe Int32
maybeTypeLength of
                                    Just Int32
len ->
                                        let ([[Word8]]
raws, [Word8]
_) = Int -> Int -> [Word8] -> ([[Word8]], [Word8])
splitFixed Int
nPresent (Int32 -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int32
len) [Word8]
afterLvls
                                            texts :: [Text]
texts = ([Word8] -> Text) -> [[Word8]] -> [Text]
forall a b. (a -> b) -> [a] -> [b]
map ([Char] -> Text
T.pack ([Char] -> Text) -> ([Word8] -> [Char]) -> [Word8] -> Text
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Word8 -> Char) -> [Word8] -> [Char]
forall a b. (a -> b) -> [a] -> [b]
map (Int -> Char
chr (Int -> Char) -> (Word8 -> Int) -> Word8 -> Char
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Word8 -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral)) [[Word8]]
raws
                                         in Column -> IO Column
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Int -> [Int] -> [Text] -> Column
toMaybeText Int
maxDef [Int]
defLvls [Text]
texts)
                                    Maybe Int32
Nothing -> [Char] -> IO Column
forall a. HasCallStack => [Char] -> a
error [Char]
"FIXED_LEN_BYTE_ARRAY requires type length"
                            ParquetType
PARQUET_TYPE_UNKNOWN -> [Char] -> IO Column
forall a. HasCallStack => [Char] -> a
error [Char]
"Cannot read unknown Parquet type"
                    ParquetEncoding
ERLE_DICTIONARY -> Maybe DictVals -> Int -> [Int] -> Int -> [Word8] -> IO Column
decodeDictV1 Maybe DictVals
dictValsM Int
maxDef [Int]
defLvls Int
nPresent [Word8]
afterLvls
                    ParquetEncoding
EPLAIN_DICTIONARY -> Maybe DictVals -> Int -> [Int] -> Int -> [Word8] -> IO Column
decodeDictV1 Maybe DictVals
dictValsM Int
maxDef [Int]
defLvls Int
nPresent [Word8]
afterLvls
                    ParquetEncoding
other -> [Char] -> IO Column
forall a. HasCallStack => [Char] -> a
error ([Char]
"Unsupported v2 encoding: " [Char] -> [Char] -> [Char]
forall a. [a] -> [a] -> [a]
++ ParquetEncoding -> [Char]
forall a. Show a => a -> [Char]
show ParquetEncoding
other)

    case [Column]
cols of
        [] -> Column -> IO Column
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Column -> IO Column) -> Column -> IO Column
forall a b. (a -> b) -> a -> b
$ [Maybe Int] -> Column
forall a.
(Columnable a, ColumnifyRep (KindOf a) a) =>
[a] -> Column
DI.fromList ([] :: [Maybe Int])
        (Column
c : [Column]
cs) -> Column -> IO Column
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Column -> IO Column) -> Column -> IO Column
forall a b. (a -> b) -> a -> b
$ (Column -> Column -> Column) -> Column -> [Column] -> Column
forall b a. (b -> a -> b) -> b -> [a] -> b
forall (t :: * -> *) b a.
Foldable t =>
(b -> a -> b) -> b -> t a -> b
foldl' (\Column
l Column
r -> Column -> Maybe Column -> Column
forall a. a -> Maybe a -> a
fromMaybe ([Char] -> Column
forall a. HasCallStack => [Char] -> a
error [Char]
"concat failed") (Column -> Column -> Maybe Column
DI.concatColumns Column
l Column
r)) Column
c [Column]
cs