{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE TypeApplications #-}

module DataFrame.IO.Parquet where

import Control.Monad
import Data.Bits
import qualified Data.ByteString as BSO
import Data.Char
import Data.Either
import Data.IORef
import Data.Int
import qualified Data.List as L
import qualified Data.Map as M
import qualified Data.Text as T
import Data.Word
import qualified DataFrame.Internal.Column as DI
import DataFrame.Internal.DataFrame (DataFrame)
import qualified DataFrame.Operations.Core as DI

import DataFrame.IO.Parquet.Dictionary
import DataFrame.IO.Parquet.Levels
import DataFrame.IO.Parquet.Page
import DataFrame.IO.Parquet.Thrift
import DataFrame.IO.Parquet.Types

{- | Read a parquet file from path and load it into a dataframe.

Column chunks are decoded row group by row group. Chunks that belong to the
same column in different row groups are concatenated in file order, so files
with more than one row group are loaded completely.

==== __Example__
@
ghci> D.readParquet ".\/data\/mtcars.parquet"
@
-}
readParquet :: FilePath -> IO DataFrame
readParquet path = do
    fileMetadata <- readMetadataFromPath path
    let schemaElements = schema fileMetadata
        -- The first schema element is skipped before computing leaf paths
        -- and definition/repetition levels.
        schemaTail = drop 1 schemaElements
        columnNames = map fst (getColumnPaths schemaTail)

    -- NOTE: 'readMetadataFromPath' has already read the file once; its
    -- interface only returns the metadata, so the bytes are read again here.
    contents <- BSO.readFile path

    -- NOTE(review): 'pathToElement' always yields the empty path, so this
    -- lookup can only succeed for an empty target path. As written,
    -- FIXED_LEN_BYTE_ARRAY columns will normally get 'Nothing' here.
    -- Behaviour is preserved; confirm the intended lookup rule before
    -- changing it.
    let getTypeLength :: [String] -> Maybe Int32
        getTypeLength targetPath = findTypeLength schemaElements (0 :: Integer)
          where
            findTypeLength [] _ = Nothing
            findTypeLength (s : ss) depth
                | map T.unpack (pathToElement s ss depth) == targetPath
                    && elementType s == STRING
                    && typeLength s > 0 =
                    Just (typeLength s)
                | otherwise =
                    findTypeLength ss (if numChildren s > 0 then depth + 1 else depth)

            pathToElement _ _ _ = []

    -- One map per row group. Inside a row group a later chunk with the same
    -- name replaces an earlier one (M.fromList keeps the last binding).
    rowGroupMaps <- forM (rowGroups fileMetadata) $ \rowGroup ->
        fmap M.fromList $
            forM (zip (rowGroupColumns rowGroup) [0 :: Integer ..]) $ \(colChunk, colIdx) -> do
                let metadata = columnMetaData colChunk
                    colPath = columnPathInSchema metadata
                    colName = case colPath of
                        [] -> T.pack ("col_" ++ show colIdx)
                        _ -> T.pack (last colPath)

                    dataOffset = columnDataPageOffset metadata
                    dictOffset = columnDictionaryPageOffset metadata
                    -- Start at the dictionary page when one is recorded
                    -- before the first data page.
                    colStart
                        | dictOffset > 0 && dataOffset > dictOffset = dictOffset
                        | otherwise = dataOffset
                    colLength = columnTotalCompressedSize metadata
                    columnBytes =
                        map
                            (BSO.index contents . fromIntegral)
                            [colStart .. colStart + colLength - 1]

                pages <- readAllPages (columnCodec metadata) columnBytes

                let maybeTypeLength
                        | columnType metadata == PFIXED_LEN_BYTE_ARRAY = getTypeLength colPath
                        | otherwise = Nothing
                    primaryEncoding = maybe EPLAIN fst (L.uncons (columnEncodings metadata))
                    (maxDef, maxRep) = levelsForPath schemaTail colPath

                column <-
                    processColumnPages
                        (maxDef, maxRep)
                        pages
                        (columnType metadata)
                        primaryEncoding
                        maybeTypeLength
                pure (colName, column)

    -- Merge row groups in file order: rows of earlier groups come first.
    let finalColMap = L.foldl' (M.unionWith appendRowGroup) M.empty rowGroupMaps
        -- Keep schema order and skip schema leaves with no decoded column.
        orderedColumns =
            [ (name, col)
            | name <- columnNames
            , Just col <- [M.lookup name finalColMap]
            ]

    pure $ DI.fromNamedColumns orderedColumns
  where
    -- Append a later row group's chunk to what was accumulated so far.
    appendRowGroup :: DI.Column -> DI.Column -> DI.Column
    appendRowGroup earlier later =
        fromRight
            (error "readParquet: failed to concatenate row groups")
            (DI.concatColumns earlier later)

{- | Load the file at the given path and parse its metadata.

The footer is inspected first: unless the trailing magic string is @PAR1@
the call fails with the error @Invalid Parquet file@. Otherwise the bytes
and the metadata size taken from the footer are passed to 'readMetadata'.
-}
readMetadataFromPath :: FilePath -> IO FileMetadata
readMetadataFromPath path = do
    fileBytes <- BSO.readFile path
    case fileBytes `seq` readMetadataSizeFromFooter fileBytes of
        (metadataSize, magic)
            | magic /= "PAR1" -> error "Invalid Parquet file"
            | otherwise -> readMetadata fileBytes metadataSize

{- | Decode the trailing 8 bytes of a file's contents.

Returns the metadata size (the first four footer bytes combined as a
little-endian 32-bit signed integer) together with the last four bytes,
which the caller compares against the magic string.

If the input is shorter than 8 bytes there is no footer to decode; in that
case @(0, empty)@ is returned instead of failing with an index error, so the
caller's magic-string check rejects the file.
-}
readMetadataSizeFromFooter :: BSO.ByteString -> (Int, BSO.ByteString)
readMetadataSizeFromFooter contents
    | BSO.length contents < footerLength = (0, BSO.empty)
    | otherwise = (size, magicString)
  where
    footerLength :: Int
    footerLength = 8

    -- Last eight bytes: four size bytes followed by four magic bytes.
    footer :: BSO.ByteString
    footer = BSO.drop (BSO.length contents - footerLength) contents

    (rawSize, magicString) = BSO.splitAt 4 footer

    sizeBytes :: [Int32]
    sizeBytes = map fromIntegral (BSO.unpack rawSize)

    -- Least significant byte first.
    size :: Int
    size = fromIntegral (L.foldl' (.|.) 0 (zipWith shift sizeBytes [0, 8, 16, 24]))

{- | Compute the dotted path and running index of every leaf in a flattened
schema.

An element whose 'numChildren' is 0 is a leaf; its name is appended to the
names of its enclosing groups and joined with @.@. Any other element is a
group: it contributes its name to the path (unless the name is empty) and
owns the following 'numChildren' /subtrees/. Subtrees are consumed
recursively, so a group nested inside another group does not cause later
siblings to be attributed to the wrong parent.

Leaves are numbered from 0 in the order they are encountered.
-}
getColumnPaths :: [SchemaElement] -> [(T.Text, Int)]
getColumnPaths = walkSiblings 0
  where
    -- Walk every top-level subtree until the element list is exhausted.
    walkSiblings :: Int -> [SchemaElement] -> [(T.Text, Int)]
    walkSiblings _ [] = []
    walkSiblings idx (s : rest) =
        let (leaves, nextIdx, remaining) = walkNode idx [] s rest
         in leaves ++ walkSiblings nextIdx remaining

    -- Consume one subtree rooted at the given element. Returns the leaves
    -- found, the next free leaf index, and the unconsumed elements.
    walkNode ::
        Int ->
        [T.Text] ->
        SchemaElement ->
        [SchemaElement] ->
        ([(T.Text, Int)], Int, [SchemaElement])
    walkNode idx prefix s rest
        | numChildren s == 0 =
            ([(T.intercalate "." (prefix ++ [elementName s]), idx)], idx + 1, rest)
        | otherwise =
            let prefix'
                    | T.null (elementName s) = prefix
                    | otherwise = prefix ++ [elementName s]
             in walkChildren (fromIntegral (numChildren s)) idx prefix' rest

    -- Consume up to the given number of sibling subtrees; stops early if the
    -- element list runs out. A non-positive count consumes nothing.
    walkChildren ::
        Int ->
        Int ->
        [T.Text] ->
        [SchemaElement] ->
        ([(T.Text, Int)], Int, [SchemaElement])
    walkChildren n idx prefix rest
        | n <= 0 = ([], idx, rest)
        | otherwise =
            case rest of
                [] -> ([], idx, [])
                (child : others) ->
                    let (leaves, idx', rest') = walkNode idx prefix child others
                        (more, idx'', rest'') = walkChildren (n - 1) idx' prefix rest'
                     in (leaves ++ more, idx'', rest'')

{- | Decode all pages of one column chunk into a single column.

Arguments, in order: the @(maxDef, maxRep)@ levels of the column, the pages
of the chunk, the physical type, the chunk's primary encoding (ignored; each
data page's own encoding is used) and the type length needed for
FIXED_LEN_BYTE_ARRAY values.

The first dictionary page, if any, is decoded once and shared by every
dictionary-encoded data page. Each data page (v1 or v2) is decoded to a
column and the per-page columns are concatenated in order. A chunk without
data pages yields an empty @Maybe Int@ column.

Calls 'error' for unsupported encodings, an unknown physical type, a
FIXED_LEN_BYTE_ARRAY column without a type length, or a failed
concatenation.
-}
processColumnPages ::
    (Int, Int) ->
    [Page] ->
    ParquetType ->
    ParquetEncoding ->
    Maybe Int32 ->
    IO DI.Column
processColumnPages (maxDef, maxRep) pages pType _ maybeTypeLength = do
    let dictPages = filter isDictionaryPage pages
    let dataPages = filter isDataPage pages

    let dictValsM =
            case dictPages of
                [] -> Nothing
                (dictPage : _) ->
                    case pageTypeHeader (pageHeader dictPage) of
                        DictionaryPageHeader{..} ->
                            -- Booleans are passed the number of dictionary
                            -- values; every other type is passed the type
                            -- length.
                            let countForBools =
                                    if pType == PBOOLEAN
                                        then Just dictionaryPageHeaderNumValues
                                        else maybeTypeLength
                             in Just (readDictVals pType (pageBytes dictPage) countForBools)
                        _ -> Nothing

    -- Dispatch on a data page's encoding; the label only affects the error
    -- message for unsupported encodings.
    let decodePage versionLabel encoding defLvls nPresent payload =
            case encoding of
                EPLAIN -> decodePlain defLvls nPresent payload
                ERLE_DICTIONARY -> decodeDictV1 dictValsM maxDef defLvls nPresent payload
                EPLAIN_DICTIONARY -> decodeDictV1 dictValsM maxDef defLvls nPresent payload
                other ->
                    error ("Unsupported " ++ versionLabel ++ " encoding: " ++ show other)

    cols <- forM dataPages $ \page ->
        case pageTypeHeader (pageHeader page) of
            DataPageHeader{..} -> do
                let n = fromIntegral dataPageHeaderNumValues
                    (defLvls, _repLvls, afterLvls) =
                        readLevelsV1 n maxDef maxRep (pageBytes page)
                    -- Only entries at the maximum definition level carry a value.
                    nPresent = length (filter (== maxDef) defLvls)
                decodePage "v1" dataPageHeaderEncoding defLvls nPresent afterLvls
            DataPageHeaderV2{..} -> do
                let n = fromIntegral dataPageHeaderV2NumValues
                    (defLvls, _repLvls, afterLvls) =
                        readLevelsV2
                            n
                            maxDef
                            maxRep
                            definitionLevelByteLength
                            repetitionLevelByteLength
                            (pageBytes page)
                    -- Prefer the header's null count when it is positive;
                    -- otherwise count from the definition levels.
                    nPresent =
                        if dataPageHeaderV2NumNulls > 0
                            then fromIntegral (dataPageHeaderV2NumValues - dataPageHeaderV2NumNulls)
                            else length (filter (== maxDef) defLvls)
                decodePage "v2" dataPageHeaderV2Encoding defLvls nPresent afterLvls
            -- Cannot happen as these are filtered out by isDataPage above
            DictionaryPageHeader{} ->
                error "processColumnPages: impossible DictionaryPageHeader"
            INDEX_PAGE_HEADER ->
                error "processColumnPages: impossible INDEX_PAGE_HEADER"
            PAGE_TYPE_HEADER_UNKNOWN ->
                error "processColumnPages: impossible PAGE_TYPE_HEADER_UNKNOWN"

    case cols of
        [] -> pure $ DI.fromList ([] :: [Maybe Int])
        (c : cs) ->
            pure $
                L.foldl'
                    (\l r -> fromRight (error "concat failed") (DI.concatColumns l r))
                    c
                    cs
  where
    -- Decode PLAIN-encoded values of the column's physical type and pair
    -- them with the definition levels to build the column.
    decodePlain :: [Int] -> Int -> [Word8] -> IO DI.Column
    decodePlain defLvls nPresent payload =
        case pType of
            PBOOLEAN ->
                pure (toMaybeBool maxDef defLvls (fst (readNBool nPresent payload)))
            PINT32 ->
                pure (toMaybeInt32 maxDef defLvls (fst (readNInt32 nPresent payload)))
            PINT64 ->
                pure (toMaybeInt64 maxDef defLvls (fst (readNInt64 nPresent payload)))
            PINT96 ->
                pure (toMaybeUTCTime maxDef defLvls (fst (readNInt96Times nPresent payload)))
            PFLOAT ->
                pure (toMaybeFloat maxDef defLvls (fst (readNFloat nPresent payload)))
            PDOUBLE ->
                pure (toMaybeDouble maxDef defLvls (fst (readNDouble nPresent payload)))
            PBYTE_ARRAY ->
                let raws = fst (readNByteArrays nPresent payload)
                 in pure (toMaybeText maxDef defLvls (map bytesToText raws))
            PFIXED_LEN_BYTE_ARRAY ->
                case maybeTypeLength of
                    Just len ->
                        let raws = fst (splitFixed nPresent (fromIntegral len) payload)
                         in pure (toMaybeText maxDef defLvls (map bytesToText raws))
                    Nothing -> error "FIXED_LEN_BYTE_ARRAY requires type length"
            PARQUET_TYPE_UNKNOWN -> error "Cannot read unknown Parquet type"

    -- Each byte becomes the character with the same code point.
    -- NOTE(review): multi-byte UTF-8 sequences are therefore not decoded;
    -- confirm whether UTF-8 decoding is wanted here.
    bytesToText :: [Word8] -> T.Text
    bytesToText = T.pack . map (chr . fromIntegral)