{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE TypeApplications #-}
module DataFrame.IO.Parquet where
import Control.Monad
import Data.Bits
import qualified Data.ByteString as BSO
import Data.Char
import Data.Either
import Data.IORef
import Data.Int
import qualified Data.List as L
import qualified Data.Map as M
import qualified Data.Text as T
import Data.Word
import qualified DataFrame.Internal.Column as DI
import DataFrame.Internal.DataFrame (DataFrame)
import qualified DataFrame.Operations.Core as DI
import DataFrame.IO.Parquet.Dictionary
import DataFrame.IO.Parquet.Levels
import DataFrame.IO.Parquet.Page
import DataFrame.IO.Parquet.Thrift
import DataFrame.IO.Parquet.Types
readParquet :: FilePath -> IO DataFrame
readParquet :: [Char] -> IO DataFrame
readParquet [Char]
path = do
FileMetadata
fileMetadata <- [Char] -> IO FileMetadata
readMetadataFromPath [Char]
path
let columnPaths :: [(Text, Int)]
columnPaths = [SchemaElement] -> [(Text, Int)]
getColumnPaths (Int -> [SchemaElement] -> [SchemaElement]
forall a. Int -> [a] -> [a]
drop Int
1 ([SchemaElement] -> [SchemaElement])
-> [SchemaElement] -> [SchemaElement]
forall a b. (a -> b) -> a -> b
$ FileMetadata -> [SchemaElement]
schema FileMetadata
fileMetadata)
let columnNames :: [Text]
columnNames = ((Text, Int) -> Text) -> [(Text, Int)] -> [Text]
forall a b. (a -> b) -> [a] -> [b]
map (Text, Int) -> Text
forall a b. (a, b) -> a
fst [(Text, Int)]
columnPaths
IORef (Map Text Column)
colMap <- Map Text Column -> IO (IORef (Map Text Column))
forall a. a -> IO (IORef a)
newIORef (Map Text Column
forall k a. Map k a
M.empty :: M.Map T.Text DI.Column)
ByteString
contents <- [Char] -> IO ByteString
BSO.readFile [Char]
path
let schemaElements :: [SchemaElement]
schemaElements = FileMetadata -> [SchemaElement]
schema FileMetadata
fileMetadata
let getTypeLength :: [String] -> Maybe Int32
getTypeLength :: [[Char]] -> Maybe Int32
getTypeLength [[Char]]
path = [SchemaElement] -> [[Char]] -> Integer -> Maybe Int32
forall {t}.
Num t =>
[SchemaElement] -> [[Char]] -> t -> Maybe Int32
findTypeLength [SchemaElement]
schemaElements [[Char]]
path Integer
0
where
findTypeLength :: [SchemaElement] -> [[Char]] -> t -> Maybe Int32
findTypeLength [] [[Char]]
_ t
_ = Maybe Int32
forall a. Maybe a
Nothing
findTypeLength (SchemaElement
s : [SchemaElement]
ss) [[Char]]
targetPath t
depth
| (Text -> [Char]) -> [Text] -> [[Char]]
forall a b. (a -> b) -> [a] -> [b]
map Text -> [Char]
T.unpack (SchemaElement -> [SchemaElement] -> t -> [Text]
forall {p} {p} {p} {a}. p -> p -> p -> [a]
pathToElement SchemaElement
s [SchemaElement]
ss t
depth) [[Char]] -> [[Char]] -> Bool
forall a. Eq a => a -> a -> Bool
== [[Char]]
targetPath
Bool -> Bool -> Bool
&& SchemaElement -> TType
elementType SchemaElement
s TType -> TType -> Bool
forall a. Eq a => a -> a -> Bool
== TType
STRING
Bool -> Bool -> Bool
&& SchemaElement -> Int32
typeLength SchemaElement
s Int32 -> Int32 -> Bool
forall a. Ord a => a -> a -> Bool
> Int32
0 =
Int32 -> Maybe Int32
forall a. a -> Maybe a
Just (SchemaElement -> Int32
typeLength SchemaElement
s)
| Bool
otherwise =
[SchemaElement] -> [[Char]] -> t -> Maybe Int32
findTypeLength [SchemaElement]
ss [[Char]]
targetPath (if SchemaElement -> Int32
numChildren SchemaElement
s Int32 -> Int32 -> Bool
forall a. Ord a => a -> a -> Bool
> Int32
0 then t
depth t -> t -> t
forall a. Num a => a -> a -> a
+ t
1 else t
depth)
pathToElement :: p -> p -> p -> [a]
pathToElement p
_ p
_ p
_ = []
[RowGroup] -> (RowGroup -> IO ()) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ (FileMetadata -> [RowGroup]
rowGroups FileMetadata
fileMetadata) ((RowGroup -> IO ()) -> IO ()) -> (RowGroup -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \RowGroup
rowGroup -> do
[(ColumnChunk, Integer)]
-> ((ColumnChunk, Integer) -> IO ()) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ ([ColumnChunk] -> [Integer] -> [(ColumnChunk, Integer)]
forall a b. [a] -> [b] -> [(a, b)]
zip (RowGroup -> [ColumnChunk]
rowGroupColumns RowGroup
rowGroup) [Integer
0 ..]) (((ColumnChunk, Integer) -> IO ()) -> IO ())
-> ((ColumnChunk, Integer) -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \(ColumnChunk
colChunk, Integer
colIdx) -> do
let metadata :: ColumnMetaData
metadata = ColumnChunk -> ColumnMetaData
columnMetaData ColumnChunk
colChunk
let colPath :: [[Char]]
colPath = ColumnMetaData -> [[Char]]
columnPathInSchema ColumnMetaData
metadata
let colName :: Text
colName =
if [[Char]] -> Bool
forall a. [a] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null [[Char]]
colPath
then [Char] -> Text
T.pack ([Char] -> Text) -> [Char] -> Text
forall a b. (a -> b) -> a -> b
$ [Char]
"col_" [Char] -> [Char] -> [Char]
forall a. [a] -> [a] -> [a]
++ Integer -> [Char]
forall a. Show a => a -> [Char]
show Integer
colIdx
else [Char] -> Text
T.pack ([Char] -> Text) -> [Char] -> Text
forall a b. (a -> b) -> a -> b
$ [[Char]] -> [Char]
forall a. HasCallStack => [a] -> a
last [[Char]]
colPath
let colDataPageOffset :: Int64
colDataPageOffset = ColumnMetaData -> Int64
columnDataPageOffset ColumnMetaData
metadata
let colDictionaryPageOffset :: Int64
colDictionaryPageOffset = ColumnMetaData -> Int64
columnDictionaryPageOffset ColumnMetaData
metadata
let colStart :: Int64
colStart =
if Int64
colDictionaryPageOffset Int64 -> Int64 -> Bool
forall a. Ord a => a -> a -> Bool
> Int64
0 Bool -> Bool -> Bool
&& Int64
colDataPageOffset Int64 -> Int64 -> Bool
forall a. Ord a => a -> a -> Bool
> Int64
colDictionaryPageOffset
then Int64
colDictionaryPageOffset
else Int64
colDataPageOffset
let colLength :: Int64
colLength = ColumnMetaData -> Int64
columnTotalCompressedSize ColumnMetaData
metadata
let columnBytes :: [Word8]
columnBytes =
(Int64 -> Word8) -> [Int64] -> [Word8]
forall a b. (a -> b) -> [a] -> [b]
map (HasCallStack => ByteString -> Int -> Word8
ByteString -> Int -> Word8
BSO.index ByteString
contents (Int -> Word8) -> (Int64 -> Int) -> Int64 -> Word8
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Int64 -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral) [Int64
colStart .. (Int64
colStart Int64 -> Int64 -> Int64
forall a. Num a => a -> a -> a
+ Int64
colLength Int64 -> Int64 -> Int64
forall a. Num a => a -> a -> a
- Int64
1)]
[Page]
pages <- CompressionCodec -> [Word8] -> IO [Page]
readAllPages (ColumnMetaData -> CompressionCodec
columnCodec ColumnMetaData
metadata) [Word8]
columnBytes
let maybeTypeLength :: Maybe Int32
maybeTypeLength =
if ColumnMetaData -> ParquetType
columnType ColumnMetaData
metadata ParquetType -> ParquetType -> Bool
forall a. Eq a => a -> a -> Bool
== ParquetType
PFIXED_LEN_BYTE_ARRAY
then [[Char]] -> Maybe Int32
getTypeLength [[Char]]
colPath
else Maybe Int32
forall a. Maybe a
Nothing
let primaryEncoding :: ParquetEncoding
primaryEncoding = ParquetEncoding
-> ((ParquetEncoding, [ParquetEncoding]) -> ParquetEncoding)
-> Maybe (ParquetEncoding, [ParquetEncoding])
-> ParquetEncoding
forall b a. b -> (a -> b) -> Maybe a -> b
maybe ParquetEncoding
EPLAIN (ParquetEncoding, [ParquetEncoding]) -> ParquetEncoding
forall a b. (a, b) -> a
fst ([ParquetEncoding] -> Maybe (ParquetEncoding, [ParquetEncoding])
forall a. [a] -> Maybe (a, [a])
L.uncons (ColumnMetaData -> [ParquetEncoding]
columnEncodings ColumnMetaData
metadata))
let schemaTail :: [SchemaElement]
schemaTail = Int -> [SchemaElement] -> [SchemaElement]
forall a. Int -> [a] -> [a]
drop Int
1 (FileMetadata -> [SchemaElement]
schema FileMetadata
fileMetadata)
let colPath :: [[Char]]
colPath = ColumnMetaData -> [[Char]]
columnPathInSchema (ColumnChunk -> ColumnMetaData
columnMetaData ColumnChunk
colChunk)
let (Int
maxDef, Int
maxRep) = [SchemaElement] -> [[Char]] -> (Int, Int)
levelsForPath [SchemaElement]
schemaTail [[Char]]
colPath
Column
column <-
(Int, Int)
-> [Page]
-> ParquetType
-> ParquetEncoding
-> Maybe Int32
-> IO Column
processColumnPages
(Int
maxDef, Int
maxRep)
[Page]
pages
(ColumnMetaData -> ParquetType
columnType ColumnMetaData
metadata)
ParquetEncoding
primaryEncoding
Maybe Int32
maybeTypeLength
IORef (Map Text Column)
-> (Map Text Column -> Map Text Column) -> IO ()
forall a. IORef a -> (a -> a) -> IO ()
modifyIORef IORef (Map Text Column)
colMap (Text -> Column -> Map Text Column -> Map Text Column
forall k a. Ord k => k -> a -> Map k a -> Map k a
M.insert Text
colName Column
column)
Map Text Column
finalColMap <- IORef (Map Text Column) -> IO (Map Text Column)
forall a. IORef a -> IO a
readIORef IORef (Map Text Column)
colMap
let orderedColumns :: [(Text, Column)]
orderedColumns =
(Text -> (Text, Column)) -> [Text] -> [(Text, Column)]
forall a b. (a -> b) -> [a] -> [b]
map
(\Text
name -> (Text
name, Map Text Column
finalColMap Map Text Column -> Text -> Column
forall k a. Ord k => Map k a -> k -> a
M.! Text
name))
((Text -> Bool) -> [Text] -> [Text]
forall a. (a -> Bool) -> [a] -> [a]
filter (Text -> Map Text Column -> Bool
forall k a. Ord k => k -> Map k a -> Bool
`M.member` Map Text Column
finalColMap) [Text]
columnNames)
DataFrame -> IO DataFrame
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (DataFrame -> IO DataFrame) -> DataFrame -> IO DataFrame
forall a b. (a -> b) -> a -> b
$ [(Text, Column)] -> DataFrame
DI.fromNamedColumns [(Text, Column)]
orderedColumns
readMetadataFromPath :: FilePath -> IO FileMetadata
readMetadataFromPath :: [Char] -> IO FileMetadata
readMetadataFromPath [Char]
path = do
ByteString
contents <- [Char] -> IO ByteString
BSO.readFile [Char]
path
let (Int
size, ByteString
magicString) = ByteString
contents ByteString -> (Int, ByteString) -> (Int, ByteString)
forall a b. a -> b -> b
`seq` ByteString -> (Int, ByteString)
readMetadataSizeFromFooter ByteString
contents
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (ByteString
magicString ByteString -> ByteString -> Bool
forall a. Eq a => a -> a -> Bool
/= ByteString
"PAR1") (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ [Char] -> IO ()
forall a. HasCallStack => [Char] -> a
error [Char]
"Invalid Parquet file"
ByteString -> Int -> IO FileMetadata
readMetadata ByteString
contents Int
size
readMetadataSizeFromFooter :: BSO.ByteString -> (Int, BSO.ByteString)
ByteString
contents =
let
footerOffSet :: Int
footerOffSet = ByteString -> Int
BSO.length ByteString
contents Int -> Int -> Int
forall a. Num a => a -> a -> a
- Int
8
sizeBytes :: [Int32]
sizeBytes =
(Int -> Int32) -> [Int] -> [Int32]
forall a b. (a -> b) -> [a] -> [b]
map
(forall a b. (Integral a, Num b) => a -> b
fromIntegral @Word8 @Int32 (Word8 -> Int32) -> (Int -> Word8) -> Int -> Int32
forall b c a. (b -> c) -> (a -> b) -> a -> c
. HasCallStack => ByteString -> Int -> Word8
ByteString -> Int -> Word8
BSO.index ByteString
contents)
[Int
footerOffSet .. Int
footerOffSet Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
3]
size :: Int
size = Int32 -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Int32 -> Int) -> Int32 -> Int
forall a b. (a -> b) -> a -> b
$ (Int32 -> Int32 -> Int32) -> Int32 -> [Int32] -> Int32
forall b a. (b -> a -> b) -> b -> [a] -> b
forall (t :: * -> *) b a.
Foldable t =>
(b -> a -> b) -> b -> t a -> b
L.foldl' Int32 -> Int32 -> Int32
forall a. Bits a => a -> a -> a
(.|.) Int32
0 ([Int32] -> Int32) -> [Int32] -> Int32
forall a b. (a -> b) -> a -> b
$ (Int32 -> Int -> Int32) -> [Int32] -> [Int] -> [Int32]
forall a b c. (a -> b -> c) -> [a] -> [b] -> [c]
zipWith Int32 -> Int -> Int32
forall a. Bits a => a -> Int -> a
shift [Int32]
sizeBytes [Int
0, Int
8, Int
16, Int
24]
magicStringBytes :: [Word8]
magicStringBytes = (Int -> Word8) -> [Int] -> [Word8]
forall a b. (a -> b) -> [a] -> [b]
map (HasCallStack => ByteString -> Int -> Word8
ByteString -> Int -> Word8
BSO.index ByteString
contents) [Int
footerOffSet Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
4 .. Int
footerOffSet Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
7]
magicString :: ByteString
magicString = [Word8] -> ByteString
BSO.pack [Word8]
magicStringBytes
in
(Int
size, ByteString
magicString)
getColumnPaths :: [SchemaElement] -> [(T.Text, Int)]
getColumnPaths :: [SchemaElement] -> [(Text, Int)]
getColumnPaths [SchemaElement]
schema = [SchemaElement] -> Int -> [Text] -> [(Text, Int)]
extractLeafPaths [SchemaElement]
schema Int
0 []
where
extractLeafPaths :: [SchemaElement] -> Int -> [T.Text] -> [(T.Text, Int)]
extractLeafPaths :: [SchemaElement] -> Int -> [Text] -> [(Text, Int)]
extractLeafPaths [] Int
_ [Text]
_ = []
extractLeafPaths (SchemaElement
s : [SchemaElement]
ss) Int
idx [Text]
path
| SchemaElement -> Int32
numChildren SchemaElement
s Int32 -> Int32 -> Bool
forall a. Eq a => a -> a -> Bool
== Int32
0 =
let fullPath :: Text
fullPath = Text -> [Text] -> Text
T.intercalate Text
"." ([Text]
path [Text] -> [Text] -> [Text]
forall a. [a] -> [a] -> [a]
++ [SchemaElement -> Text
elementName SchemaElement
s])
in (Text
fullPath, Int
idx) (Text, Int) -> [(Text, Int)] -> [(Text, Int)]
forall a. a -> [a] -> [a]
: [SchemaElement] -> Int -> [Text] -> [(Text, Int)]
extractLeafPaths [SchemaElement]
ss (Int
idx Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1) [Text]
path
| Bool
otherwise =
let newPath :: [Text]
newPath = if Text -> Bool
T.null (SchemaElement -> Text
elementName SchemaElement
s) then [Text]
path else [Text]
path [Text] -> [Text] -> [Text]
forall a. [a] -> [a] -> [a]
++ [SchemaElement -> Text
elementName SchemaElement
s]
childrenCount :: Int
childrenCount = Int32 -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral (SchemaElement -> Int32
numChildren SchemaElement
s)
([SchemaElement]
children, [SchemaElement]
remaining) = Int -> [SchemaElement] -> ([SchemaElement], [SchemaElement])
forall a. Int -> [a] -> ([a], [a])
splitAt Int
childrenCount [SchemaElement]
ss
childResults :: [(Text, Int)]
childResults = [SchemaElement] -> Int -> [Text] -> [(Text, Int)]
extractLeafPaths [SchemaElement]
children Int
idx [Text]
newPath
in [(Text, Int)]
childResults [(Text, Int)] -> [(Text, Int)] -> [(Text, Int)]
forall a. [a] -> [a] -> [a]
++ [SchemaElement] -> Int -> [Text] -> [(Text, Int)]
extractLeafPaths [SchemaElement]
remaining (Int
idx Int -> Int -> Int
forall a. Num a => a -> a -> a
+ [(Text, Int)] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [(Text, Int)]
childResults) [Text]
path
processColumnPages ::
(Int, Int) ->
[Page] ->
ParquetType ->
ParquetEncoding ->
Maybe Int32 ->
IO DI.Column
processColumnPages :: (Int, Int)
-> [Page]
-> ParquetType
-> ParquetEncoding
-> Maybe Int32
-> IO Column
processColumnPages (Int
maxDef, Int
maxRep) [Page]
pages ParquetType
pType ParquetEncoding
_ Maybe Int32
maybeTypeLength = do
let dictPages :: [Page]
dictPages = (Page -> Bool) -> [Page] -> [Page]
forall a. (a -> Bool) -> [a] -> [a]
filter Page -> Bool
isDictionaryPage [Page]
pages
let dataPages :: [Page]
dataPages = (Page -> Bool) -> [Page] -> [Page]
forall a. (a -> Bool) -> [a] -> [a]
filter Page -> Bool
isDataPage [Page]
pages
let dictValsM :: Maybe DictVals
dictValsM =
case [Page]
dictPages of
[] -> Maybe DictVals
forall a. Maybe a
Nothing
(Page
dictPage : [Page]
_) ->
case PageHeader -> PageTypeHeader
pageTypeHeader (Page -> PageHeader
pageHeader Page
dictPage) of
DictionaryPageHeader{Bool
Int32
ParquetEncoding
dictionaryPageHeaderNumValues :: Int32
dictionaryPageHeaderEncoding :: ParquetEncoding
dictionaryPageIsSorted :: Bool
dictionaryPageIsSorted :: PageTypeHeader -> Bool
dictionaryPageHeaderEncoding :: PageTypeHeader -> ParquetEncoding
dictionaryPageHeaderNumValues :: PageTypeHeader -> Int32
..} ->
let countForBools :: Maybe Int32
countForBools =
if ParquetType
pType ParquetType -> ParquetType -> Bool
forall a. Eq a => a -> a -> Bool
== ParquetType
PBOOLEAN
then [Char] -> (Any -> Maybe Any) -> Int32 -> Maybe Int32
forall a. HasCallStack => [Char] -> a
error [Char]
"is bool" Any -> Maybe Any
forall a. a -> Maybe a
Just Int32
dictionaryPageHeaderNumValues
else Maybe Int32
maybeTypeLength
in DictVals -> Maybe DictVals
forall a. a -> Maybe a
Just (ParquetType -> [Word8] -> Maybe Int32 -> DictVals
readDictVals ParquetType
pType (Page -> [Word8]
pageBytes Page
dictPage) Maybe Int32
countForBools)
PageTypeHeader
_ -> Maybe DictVals
forall a. Maybe a
Nothing
[Column]
cols <- [Page] -> (Page -> IO Column) -> IO [Column]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
t a -> (a -> m b) -> m (t b)
forM [Page]
dataPages ((Page -> IO Column) -> IO [Column])
-> (Page -> IO Column) -> IO [Column]
forall a b. (a -> b) -> a -> b
$ \Page
page -> do
case PageHeader -> PageTypeHeader
pageTypeHeader (Page -> PageHeader
pageHeader Page
page) of
DataPageHeader{Int32
ColumnStatistics
ParquetEncoding
dataPageHeaderNumValues :: Int32
dataPageHeaderEncoding :: ParquetEncoding
definitionLevelEncoding :: ParquetEncoding
repetitionLevelEncoding :: ParquetEncoding
dataPageHeaderStatistics :: ColumnStatistics
dataPageHeaderStatistics :: PageTypeHeader -> ColumnStatistics
repetitionLevelEncoding :: PageTypeHeader -> ParquetEncoding
definitionLevelEncoding :: PageTypeHeader -> ParquetEncoding
dataPageHeaderEncoding :: PageTypeHeader -> ParquetEncoding
dataPageHeaderNumValues :: PageTypeHeader -> Int32
..} -> do
let n :: Int
n = Int32 -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int32
dataPageHeaderNumValues
let bs0 :: [Word8]
bs0 = Page -> [Word8]
pageBytes Page
page
let ([Int]
defLvls, [Int]
_repLvls, [Word8]
afterLvls) = Int -> Int -> Int -> [Word8] -> ([Int], [Int], [Word8])
readLevelsV1 Int
n Int
maxDef Int
maxRep [Word8]
bs0
let nPresent :: Int
nPresent = [Int] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length ((Int -> Bool) -> [Int] -> [Int]
forall a. (a -> Bool) -> [a] -> [a]
filter (Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
maxDef) [Int]
defLvls)
case ParquetEncoding
dataPageHeaderEncoding of
ParquetEncoding
EPLAIN ->
case ParquetType
pType of
ParquetType
PBOOLEAN ->
let ([Bool]
vals, [Word8]
_) = Int -> [Word8] -> ([Bool], [Word8])
readNBool Int
nPresent [Word8]
afterLvls
in Column -> IO Column
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Int -> [Int] -> [Bool] -> Column
toMaybeBool Int
maxDef [Int]
defLvls [Bool]
vals)
ParquetType
PINT32 ->
let ([Int32]
vals, [Word8]
_) = Int -> [Word8] -> ([Int32], [Word8])
readNInt32 Int
nPresent [Word8]
afterLvls
in Column -> IO Column
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Int -> [Int] -> [Int32] -> Column
toMaybeInt32 Int
maxDef [Int]
defLvls [Int32]
vals)
ParquetType
PINT64 ->
let ([Int64]
vals, [Word8]
_) = Int -> [Word8] -> ([Int64], [Word8])
readNInt64 Int
nPresent [Word8]
afterLvls
in Column -> IO Column
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Int -> [Int] -> [Int64] -> Column
toMaybeInt64 Int
maxDef [Int]
defLvls [Int64]
vals)
ParquetType
PINT96 ->
let ([UTCTime]
vals, [Word8]
_) = Int -> [Word8] -> ([UTCTime], [Word8])
readNInt96Times Int
nPresent [Word8]
afterLvls
in Column -> IO Column
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Int -> [Int] -> [UTCTime] -> Column
toMaybeUTCTime Int
maxDef [Int]
defLvls [UTCTime]
vals)
ParquetType
PFLOAT ->
let ([Float]
vals, [Word8]
_) = Int -> [Word8] -> ([Float], [Word8])
readNFloat Int
nPresent [Word8]
afterLvls
in Column -> IO Column
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Int -> [Int] -> [Float] -> Column
toMaybeFloat Int
maxDef [Int]
defLvls [Float]
vals)
ParquetType
PDOUBLE ->
let ([Double]
vals, [Word8]
_) = Int -> [Word8] -> ([Double], [Word8])
readNDouble Int
nPresent [Word8]
afterLvls
in Column -> IO Column
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Int -> [Int] -> [Double] -> Column
toMaybeDouble Int
maxDef [Int]
defLvls [Double]
vals)
ParquetType
PBYTE_ARRAY ->
let ([[Word8]]
raws, [Word8]
_) = Int -> [Word8] -> ([[Word8]], [Word8])
readNByteArrays Int
nPresent [Word8]
afterLvls
texts :: [Text]
texts = ([Word8] -> Text) -> [[Word8]] -> [Text]
forall a b. (a -> b) -> [a] -> [b]
map ([Char] -> Text
T.pack ([Char] -> Text) -> ([Word8] -> [Char]) -> [Word8] -> Text
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Word8 -> Char) -> [Word8] -> [Char]
forall a b. (a -> b) -> [a] -> [b]
map (Int -> Char
chr (Int -> Char) -> (Word8 -> Int) -> Word8 -> Char
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Word8 -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral)) [[Word8]]
raws
in Column -> IO Column
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Int -> [Int] -> [Text] -> Column
toMaybeText Int
maxDef [Int]
defLvls [Text]
texts)
ParquetType
PFIXED_LEN_BYTE_ARRAY ->
case Maybe Int32
maybeTypeLength of
Just Int32
len ->
let ([[Word8]]
raws, [Word8]
_) = Int -> Int -> [Word8] -> ([[Word8]], [Word8])
splitFixed Int
nPresent (Int32 -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int32
len) [Word8]
afterLvls
texts :: [Text]
texts = ([Word8] -> Text) -> [[Word8]] -> [Text]
forall a b. (a -> b) -> [a] -> [b]
map ([Char] -> Text
T.pack ([Char] -> Text) -> ([Word8] -> [Char]) -> [Word8] -> Text
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Word8 -> Char) -> [Word8] -> [Char]
forall a b. (a -> b) -> [a] -> [b]
map (Int -> Char
chr (Int -> Char) -> (Word8 -> Int) -> Word8 -> Char
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Word8 -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral)) [[Word8]]
raws
in Column -> IO Column
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Int -> [Int] -> [Text] -> Column
toMaybeText Int
maxDef [Int]
defLvls [Text]
texts)
Maybe Int32
Nothing -> [Char] -> IO Column
forall a. HasCallStack => [Char] -> a
error [Char]
"FIXED_LEN_BYTE_ARRAY requires type length"
ParquetType
PARQUET_TYPE_UNKNOWN -> [Char] -> IO Column
forall a. HasCallStack => [Char] -> a
error [Char]
"Cannot read unknown Parquet type"
ParquetEncoding
ERLE_DICTIONARY -> Maybe DictVals -> Int -> [Int] -> Int -> [Word8] -> IO Column
decodeDictV1 Maybe DictVals
dictValsM Int
maxDef [Int]
defLvls Int
nPresent [Word8]
afterLvls
ParquetEncoding
EPLAIN_DICTIONARY -> Maybe DictVals -> Int -> [Int] -> Int -> [Word8] -> IO Column
decodeDictV1 Maybe DictVals
dictValsM Int
maxDef [Int]
defLvls Int
nPresent [Word8]
afterLvls
ParquetEncoding
other -> [Char] -> IO Column
forall a. HasCallStack => [Char] -> a
error ([Char]
"Unsupported v1 encoding: " [Char] -> [Char] -> [Char]
forall a. [a] -> [a] -> [a]
++ ParquetEncoding -> [Char]
forall a. Show a => a -> [Char]
show ParquetEncoding
other)
DataPageHeaderV2{Bool
Int32
ColumnStatistics
ParquetEncoding
dataPageHeaderV2NumValues :: Int32
dataPageHeaderV2NumNulls :: Int32
dataPageHeaderV2NumRows :: Int32
dataPageHeaderV2Encoding :: ParquetEncoding
definitionLevelByteLength :: Int32
repetitionLevelByteLength :: Int32
dataPageHeaderV2IsCompressed :: Bool
dataPageHeaderV2Statistics :: ColumnStatistics
dataPageHeaderV2Statistics :: PageTypeHeader -> ColumnStatistics
dataPageHeaderV2IsCompressed :: PageTypeHeader -> Bool
repetitionLevelByteLength :: PageTypeHeader -> Int32
definitionLevelByteLength :: PageTypeHeader -> Int32
dataPageHeaderV2Encoding :: PageTypeHeader -> ParquetEncoding
dataPageHeaderV2NumRows :: PageTypeHeader -> Int32
dataPageHeaderV2NumNulls :: PageTypeHeader -> Int32
dataPageHeaderV2NumValues :: PageTypeHeader -> Int32
..} -> do
let n :: Int
n = Int32 -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int32
dataPageHeaderV2NumValues
let bs0 :: [Word8]
bs0 = Page -> [Word8]
pageBytes Page
page
let ([Int]
defLvls, [Int]
_repLvls, [Word8]
afterLvls) =
Int
-> Int
-> Int
-> Int32
-> Int32
-> [Word8]
-> ([Int], [Int], [Word8])
readLevelsV2
Int
n
Int
maxDef
Int
maxRep
Int32
definitionLevelByteLength
Int32
repetitionLevelByteLength
[Word8]
bs0
let nPresent :: Int
nPresent =
if Int32
dataPageHeaderV2NumNulls Int32 -> Int32 -> Bool
forall a. Ord a => a -> a -> Bool
> Int32
0
then Int32 -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Int32
dataPageHeaderV2NumValues Int32 -> Int32 -> Int32
forall a. Num a => a -> a -> a
- Int32
dataPageHeaderV2NumNulls)
else [Int] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length ((Int -> Bool) -> [Int] -> [Int]
forall a. (a -> Bool) -> [a] -> [a]
filter (Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
maxDef) [Int]
defLvls)
case ParquetEncoding
dataPageHeaderV2Encoding of
ParquetEncoding
EPLAIN ->
case ParquetType
pType of
ParquetType
PBOOLEAN ->
let ([Bool]
vals, [Word8]
_) = Int -> [Word8] -> ([Bool], [Word8])
readNBool Int
nPresent [Word8]
afterLvls
in Column -> IO Column
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Int -> [Int] -> [Bool] -> Column
toMaybeBool Int
maxDef [Int]
defLvls [Bool]
vals)
ParquetType
PINT32 ->
let ([Int32]
vals, [Word8]
_) = Int -> [Word8] -> ([Int32], [Word8])
readNInt32 Int
nPresent [Word8]
afterLvls
in Column -> IO Column
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Int -> [Int] -> [Int32] -> Column
toMaybeInt32 Int
maxDef [Int]
defLvls [Int32]
vals)
ParquetType
PINT64 ->
let ([Int64]
vals, [Word8]
_) = Int -> [Word8] -> ([Int64], [Word8])
readNInt64 Int
nPresent [Word8]
afterLvls
in Column -> IO Column
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Int -> [Int] -> [Int64] -> Column
toMaybeInt64 Int
maxDef [Int]
defLvls [Int64]
vals)
ParquetType
PINT96 ->
let ([UTCTime]
vals, [Word8]
_) = Int -> [Word8] -> ([UTCTime], [Word8])
readNInt96Times Int
nPresent [Word8]
afterLvls
in Column -> IO Column
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Int -> [Int] -> [UTCTime] -> Column
toMaybeUTCTime Int
maxDef [Int]
defLvls [UTCTime]
vals)
ParquetType
PFLOAT ->
let ([Float]
vals, [Word8]
_) = Int -> [Word8] -> ([Float], [Word8])
readNFloat Int
nPresent [Word8]
afterLvls
in Column -> IO Column
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Int -> [Int] -> [Float] -> Column
toMaybeFloat Int
maxDef [Int]
defLvls [Float]
vals)
ParquetType
PDOUBLE ->
let ([Double]
vals, [Word8]
_) = Int -> [Word8] -> ([Double], [Word8])
readNDouble Int
nPresent [Word8]
afterLvls
in Column -> IO Column
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Int -> [Int] -> [Double] -> Column
toMaybeDouble Int
maxDef [Int]
defLvls [Double]
vals)
ParquetType
PBYTE_ARRAY ->
let ([[Word8]]
raws, [Word8]
_) = Int -> [Word8] -> ([[Word8]], [Word8])
readNByteArrays Int
nPresent [Word8]
afterLvls
texts :: [Text]
texts = ([Word8] -> Text) -> [[Word8]] -> [Text]
forall a b. (a -> b) -> [a] -> [b]
map ([Char] -> Text
T.pack ([Char] -> Text) -> ([Word8] -> [Char]) -> [Word8] -> Text
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Word8 -> Char) -> [Word8] -> [Char]
forall a b. (a -> b) -> [a] -> [b]
map (Int -> Char
chr (Int -> Char) -> (Word8 -> Int) -> Word8 -> Char
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Word8 -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral)) [[Word8]]
raws
in Column -> IO Column
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Int -> [Int] -> [Text] -> Column
toMaybeText Int
maxDef [Int]
defLvls [Text]
texts)
ParquetType
PFIXED_LEN_BYTE_ARRAY ->
case Maybe Int32
maybeTypeLength of
Just Int32
len ->
let ([[Word8]]
raws, [Word8]
_) = Int -> Int -> [Word8] -> ([[Word8]], [Word8])
splitFixed Int
nPresent (Int32 -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int32
len) [Word8]
afterLvls
texts :: [Text]
texts = ([Word8] -> Text) -> [[Word8]] -> [Text]
forall a b. (a -> b) -> [a] -> [b]
map ([Char] -> Text
T.pack ([Char] -> Text) -> ([Word8] -> [Char]) -> [Word8] -> Text
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Word8 -> Char) -> [Word8] -> [Char]
forall a b. (a -> b) -> [a] -> [b]
map (Int -> Char
chr (Int -> Char) -> (Word8 -> Int) -> Word8 -> Char
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Word8 -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral)) [[Word8]]
raws
in Column -> IO Column
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Int -> [Int] -> [Text] -> Column
toMaybeText Int
maxDef [Int]
defLvls [Text]
texts)
Maybe Int32
Nothing -> [Char] -> IO Column
forall a. HasCallStack => [Char] -> a
error [Char]
"FIXED_LEN_BYTE_ARRAY requires type length"
ParquetType
PARQUET_TYPE_UNKNOWN -> [Char] -> IO Column
forall a. HasCallStack => [Char] -> a
error [Char]
"Cannot read unknown Parquet type"
ParquetEncoding
ERLE_DICTIONARY -> Maybe DictVals -> Int -> [Int] -> Int -> [Word8] -> IO Column
decodeDictV1 Maybe DictVals
dictValsM Int
maxDef [Int]
defLvls Int
nPresent [Word8]
afterLvls
ParquetEncoding
EPLAIN_DICTIONARY -> Maybe DictVals -> Int -> [Int] -> Int -> [Word8] -> IO Column
decodeDictV1 Maybe DictVals
dictValsM Int
maxDef [Int]
defLvls Int
nPresent [Word8]
afterLvls
ParquetEncoding
other -> [Char] -> IO Column
forall a. HasCallStack => [Char] -> a
error ([Char]
"Unsupported v2 encoding: " [Char] -> [Char] -> [Char]
forall a. [a] -> [a] -> [a]
++ ParquetEncoding -> [Char]
forall a. Show a => a -> [Char]
show ParquetEncoding
other)
DictionaryPageHeader{} -> [Char] -> IO Column
forall a. HasCallStack => [Char] -> a
error [Char]
"processColumnPages: impossible DictionaryPageHeader"
PageTypeHeader
INDEX_PAGE_HEADER -> [Char] -> IO Column
forall a. HasCallStack => [Char] -> a
error [Char]
"processColumnPages: impossible INDEX_PAGE_HEADER"
PageTypeHeader
PAGE_TYPE_HEADER_UNKNOWN -> [Char] -> IO Column
forall a. HasCallStack => [Char] -> a
error [Char]
"processColumnPages: impossible PAGE_TYPE_HEADER_UNKNOWN"
case [Column]
cols of
[] -> Column -> IO Column
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Column -> IO Column) -> Column -> IO Column
forall a b. (a -> b) -> a -> b
$ [Maybe Int] -> Column
forall a.
(Columnable a, ColumnifyRep (KindOf a) a) =>
[a] -> Column
DI.fromList ([] :: [Maybe Int])
(Column
c : [Column]
cs) ->
Column -> IO Column
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Column -> IO Column) -> Column -> IO Column
forall a b. (a -> b) -> a -> b
$
(Column -> Column -> Column) -> Column -> [Column] -> Column
forall b a. (b -> a -> b) -> b -> [a] -> b
forall (t :: * -> *) b a.
Foldable t =>
(b -> a -> b) -> b -> t a -> b
L.foldl' (\Column
l Column
r -> Column -> Either DataFrameException Column -> Column
forall b a. b -> Either a b -> b
fromRight ([Char] -> Column
forall a. HasCallStack => [Char] -> a
error [Char]
"concat failed") (Column -> Column -> Either DataFrameException Column
DI.concatColumns Column
l Column
r)) Column
c [Column]
cs