{-# LANGUAGE NumericUnderscores #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications #-}
module DataFrame.IO.Parquet where
import Control.Exception (throw, throwIO)
import Control.Monad
import Data.Bits
import qualified Data.ByteString as BSO
import Data.Either
import Data.IORef
import Data.Int
import qualified Data.List as L
import qualified Data.Map as M
import qualified Data.Set as S
import qualified Data.Text as T
import Data.Text.Encoding
import Data.Time
import Data.Time.Clock.POSIX (posixSecondsToUTCTime)
import qualified Data.Vector.Unboxed as VU
import Data.Word
import System.Directory (doesDirectoryExist)
import System.FilePath ((</>))
import System.FilePath.Glob (glob)
import DataFrame.Errors (DataFrameException (ColumnNotFoundException))
import DataFrame.IO.Parquet.Dictionary
import DataFrame.IO.Parquet.Levels
import DataFrame.IO.Parquet.Page
import DataFrame.IO.Parquet.Thrift
import DataFrame.IO.Parquet.Types
import qualified DataFrame.Internal.Column as DI
import DataFrame.Internal.DataFrame (DataFrame)
import DataFrame.Internal.Expression (Expr, getColumns)
import qualified DataFrame.Operations.Core as DI
import DataFrame.Operations.Merge ()
import qualified DataFrame.Operations.Subset as DS
{- | Options that narrow what a Parquet read returns.

All three fields are optional; 'Nothing' disables the corresponding step.
-}
data ParquetReadOptions = ParquetReadOptions
    { selectedColumns :: Maybe [T.Text]
    -- ^ Column names to keep. While reading, names are matched against the
    -- leaf name of each column; the resulting frame is then narrowed with
    -- @DS.select@. 'Nothing' keeps every column.
    , predicate :: Maybe (Expr Bool)
    -- ^ Row filter, applied with @DS.filterWhere@ once the columns are read.
    , rowRange :: Maybe (Int, Int)
    -- ^ Row window handed to @DS.range@ after filtering and selection.
    -- NOTE(review): the meaning of the pair is whatever @DS.range@ defines
    -- (presumably start/end indices) — confirm.
    }
    deriving (Eq, Show)
-- | Options that read everything: no column selection, no predicate and no
-- row range.
defaultParquetReadOptions :: ParquetReadOptions
defaultParquetReadOptions =
    ParquetReadOptions
        { selectedColumns = Nothing
        , predicate = Nothing
        , rowRange = Nothing
        }
-- | Read a single Parquet file using 'defaultParquetReadOptions'.
readParquet :: FilePath -> IO DataFrame
readParquet = readParquetWithOpts defaultParquetReadOptions
{- | Reduce a column's raw schema path to the path used for naming it.

The path is followed through the schema tree one segment at a time:

* a @REPEATED@ node that has children is left out of the result; when it has
  exactly one child, that child is left out as well;
* a childless node ends the walk and contributes its own name;
* any other node contributes its name and the walk continues in its children.

If a segment does not name any node at the current level, the walk stops and
nothing further is produced.
-}
cleanColPath :: [SNode] -> [String] -> [String]
cleanColPath nodes path = walk nodes path False
  where
    -- The Bool says "omit the node matched at this level" (set by a
    -- single-child repeated parent).
    walk :: [SNode] -> [String] -> Bool -> [String]
    walk _ [] _ = []
    walk level (segment : rest) omit =
        case L.find ((== segment) . sName) level of
            Nothing -> []
            Just node -> step node (sChildren node)
      where
        step node children
            | sRep node == REPEATED && not (null children) =
                walk children rest (hasSingleElement children)
            | omit = walk children rest False
            | null children = [segment]
            | otherwise = segment : walk children rest False

    hasSingleElement :: [a] -> Bool
    hasSingleElement [_] = True
    hasSingleElement _ = False
{- | Read a single Parquet file into a 'DataFrame', honouring the given
'ParquetReadOptions'.

The file is loaded fully into memory by 'readMetadataFromPath'. Every column
chunk of every row group is then decoded and appended to a per-column buffer
that is allocated once with room for the total row count of the file.

Column selection: when 'selectedColumns' is set, only chunks whose leaf name
is in the selection (plus any column referenced by 'predicate') are decoded.
If any of those names is not a leaf name in the file's schema, a
'ColumnNotFoundException' is thrown before any column data is decoded. With
no selection, every column is decoded and predicate columns are not checked
up front.

The predicate, the column selection and the row range are finally applied to
the assembled frame by 'applyReadOptions'.
-}
readParquetWithOpts :: ParquetReadOptions -> FilePath -> IO DataFrame
readParquetWithOpts opts path = do
    (fileMetadata, contents) <- readMetadataFromPath path

    -- The first schema element is dropped before building the column tree.
    -- NOTE(review): presumably it is the schema root — confirm.
    let schemaElements = schema fileMetadata
        schemaTail = drop 1 schemaElements
        sNodes = parseAll schemaTail
        columnNames = map fst (getColumnPaths schemaTail)
        -- Leaf name = text after the last '.', or the whole name if none.
        leafNames = map (T.takeWhileEnd (/= '.')) columnNames
        availableSelectedColumns = L.nub leafNames
        predicateColumns = maybe [] (L.nub . getColumns) (predicate opts)
        -- Columns needed by the predicate must be read even if the caller
        -- did not select them; they are dropped again by applyReadOptions.
        selectedColumnsForRead =
            L.nub . (++ predicateColumns) <$> selectedColumns opts
        selectedColumnSet = S.fromList <$> selectedColumnsForRead
        shouldReadColumn colName =
            maybe True (S.member colName) selectedColumnSet

    -- Fail early when the caller asked for columns the file does not have.
    forM_ selectedColumnsForRead $ \requested -> do
        let missing = requested L.\\ availableSelectedColumns
        unless (null missing) $
            throwIO $
                ColumnNotFoundException
                    (T.pack (show missing))
                    "readParquetWithOpts"
                    availableSelectedColumns

    let totalRows =
            sum (map (fromIntegral . rowGroupNumRows) (rowGroups fileMetadata)) :: Int

    -- Per-column state, keyed by the column's full (dotted) name:
    -- destination buffer, next write offset, and logical type.
    colMutMap <- newIORef (M.empty :: M.Map T.Text DI.MutableColumn)
    colOffMap <- newIORef (M.empty :: M.Map T.Text Int)
    lTypeMap <- newIORef (M.empty :: M.Map T.Text LogicalType)

    let getTypeLength :: [String] -> Maybe Int32
        getTypeLength targetPath = findTypeLength schemaElements (0 :: Int)
          where
            findTypeLength [] _ = Nothing
            findTypeLength (s : ss) depth
                | map T.unpack (pathToElement s ss depth) == targetPath
                    && elementType s == STRING
                    && typeLength s > 0 =
                    Just (typeLength s)
                | otherwise =
                    findTypeLength ss (if numChildren s > 0 then depth + 1 else depth)

            -- NOTE(review): this is a stub that always yields the empty
            -- path, so the guard above can only succeed for an empty
            -- target path. Behaviour is kept as-is; it looks unfinished.
            pathToElement :: SchemaElement -> [SchemaElement] -> Int -> [T.Text]
            pathToElement _ _ _ = []

    forM_ (rowGroups fileMetadata) $ \rowGroup ->
        forM_ (zip (rowGroupColumns rowGroup) [0 :: Int ..]) $ \(colChunk, colIdx) -> do
            let metadata = columnMetaData colChunk
                colPath = columnPathInSchema metadata
                cleanPath = cleanColPath sNodes colPath
                -- Chunks whose path cannot be resolved get a positional name.
                (colLeafName, colFullName) = case cleanPath of
                    [] ->
                        let fallback = T.pack ("col_" ++ show colIdx)
                         in (fallback, fallback)
                    _ ->
                        ( T.pack (last cleanPath)
                        , T.intercalate "." (map T.pack cleanPath)
                        )

            when (shouldReadColumn colLeafName) $ do
                let colDataPageOffset = columnDataPageOffset metadata
                    colDictionaryPageOffset = columnDictionaryPageOffset metadata
                    -- Start at the dictionary page when there is one and it
                    -- precedes the first data page.
                    colStart
                        | colDictionaryPageOffset > 0
                            && colDataPageOffset > colDictionaryPageOffset =
                            colDictionaryPageOffset
                        | otherwise = colDataPageOffset
                    colLength = columnTotalCompressedSize metadata
                    columnBytes =
                        BSO.take
                            (fromIntegral colLength)
                            (BSO.drop (fromIntegral colStart) contents)

                pages <- readAllPages (columnCodec metadata) columnBytes

                let maybeTypeLength
                        | columnType metadata == PFIXED_LEN_BYTE_ARRAY =
                            getTypeLength colPath
                        | otherwise = Nothing
                    primaryEncoding = case columnEncodings metadata of
                        (firstEncoding : _) -> firstEncoding
                        [] -> EPLAIN
                    (maxDef, maxRep) = levelsForPath schemaTail colPath
                    lType =
                        maybe
                            LOGICAL_TYPE_UNKNOWN
                            logicalType
                            (findLeafSchema schemaTail colPath)

                column <-
                    processColumnPages
                        (maxDef, maxRep)
                        pages
                        (columnType metadata)
                        primaryEncoding
                        maybeTypeLength
                        lType

                mutMapSnap <- readIORef colMutMap
                case M.lookup colFullName mutMapSnap of
                    Nothing -> do
                        -- First chunk of this column: allocate the full
                        -- buffer and write at offset 0.
                        mc <- DI.newMutableColumn totalRows column
                        DI.copyIntoMutableColumn mc 0 column
                        modifyIORef' colMutMap (M.insert colFullName mc)
                        modifyIORef' colOffMap (M.insert colFullName (DI.columnLength column))
                    Just mc -> do
                        -- An offset is recorded whenever a buffer is, so
                        -- this lookup cannot miss.
                        off <- (M.! colFullName) <$> readIORef colOffMap
                        DI.copyIntoMutableColumn mc off column
                        modifyIORef' colOffMap (M.adjust (+ DI.columnLength column) colFullName)
                modifyIORef' lTypeMap (M.insert colFullName lType)

    finalMutMap <- readIORef colMutMap
    finalColMap <- traverse DI.freezeMutableColumn finalMutMap
    finalLTypeMap <- readIORef lTypeMap

    -- Emit columns in schema order; names that were never read are skipped.
    let orderedColumns =
            [ ( name
              , applyLogicalType
                    (M.findWithDefault LOGICAL_TYPE_UNKNOWN name finalLTypeMap)
                    col
              )
            | name <- columnNames
            , Just col <- [M.lookup name finalColMap]
            ]
    pure $ applyReadOptions opts (DI.fromNamedColumns orderedColumns)
-- | Read every Parquet file matched by a directory or glob pattern using
-- 'defaultParquetReadOptions'. See 'readParquetFilesWithOpts'.
readParquetFiles :: FilePath -> IO DataFrame
readParquetFiles = readParquetFilesWithOpts defaultParquetReadOptions
{- | Read several Parquet files and concatenate them into one 'DataFrame'.

If @path@ is an existing directory, the files matching @*.parquet@ directly
inside it are read; otherwise @path@ itself is used as a glob pattern.
Directories among the matches are ignored.

Each file is read with the row range disabled; the row range from @opts@ is
applied once, to the concatenated frame. Column selection and the predicate
are applied per file by 'readParquetWithOpts'.

Calls 'error' when no file matches.
-}
readParquetFilesWithOpts :: ParquetReadOptions -> FilePath -> IO DataFrame
readParquetFilesWithOpts opts path = do
    isDir <- doesDirectoryExist path
    let pat
            | isDir = path </> "*.parquet"
            | otherwise = path
    matches <- glob pat
    files <- filterM (fmap not . doesDirectoryExist) matches
    when (null files) $
        error ("readParquetFiles: no parquet files found for " ++ path)
    -- A per-file row range would be wrong: the range is meant for the
    -- combined result.
    let perFileOpts = opts{rowRange = Nothing}
    dfs <- mapM (readParquetWithOpts perFileOpts) files
    pure (applyRowRange opts (mconcat dfs))
-- | Restrict the frame with @DS.range@ when 'rowRange' is set; otherwise
-- return it unchanged.
applyRowRange :: ParquetReadOptions -> DataFrame -> DataFrame
applyRowRange opts df = case rowRange opts of
    Nothing -> df
    Just bounds -> DS.range bounds df
-- | Keep only the columns named in 'selectedColumns' (via @DS.select@);
-- return the frame unchanged when no selection is set.
applySelectedColumns :: ParquetReadOptions -> DataFrame -> DataFrame
applySelectedColumns opts df = case selectedColumns opts of
    Nothing -> df
    Just names -> DS.select names df
-- | Filter rows with 'predicate' (via @DS.filterWhere@); return the frame
-- unchanged when no predicate is set.
applyPredicate :: ParquetReadOptions -> DataFrame -> DataFrame
applyPredicate opts df = case predicate opts of
    Nothing -> df
    Just condition -> DS.filterWhere condition df
{- | Apply all read options to an already-loaded frame.

Order matters: rows are filtered by the predicate first (it may reference
columns that are not selected), then columns are selected, and the row range
is applied last, to the filtered rows.
-}
applyReadOptions :: ParquetReadOptions -> DataFrame -> DataFrame
applyReadOptions opts df = ranged
  where
    filtered = applyPredicate opts df
    projected = applySelectedColumns opts filtered
    ranged = applyRowRange opts projected
{- | Load a Parquet file and decode its footer metadata.

The whole file is read strictly into memory. The trailing magic bytes are
checked against @PAR1@ before the metadata is decoded; on mismatch the
function calls 'error' with @"Invalid Parquet file"@.

Returns the decoded metadata together with the raw file contents.
-}
readMetadataFromPath :: FilePath -> IO (FileMetadata, BSO.ByteString)
readMetadataFromPath path = do
    contents <- BSO.readFile path
    let (size, magicString) = contents `seq` readMetadataSizeFromFooter contents
    when (magicString /= "PAR1") $
        error "Invalid Parquet file"
    meta <- readMetadata contents size
    pure (meta, contents)
{- | Decode the 8-byte trailer of a Parquet file.

The trailer is the last eight bytes of the input: a 4-byte little-endian
length followed by a 4-byte magic string. The length is assembled as a
32-bit signed value before being widened to 'Int'.

Returns @(length, magic)@. Input shorter than eight bytes has no trailer;
in that case @(0, empty)@ is returned so that a caller comparing the magic
against @PAR1@ rejects the file instead of hitting an out-of-range index.
-}
readMetadataSizeFromFooter :: BSO.ByteString -> (Int, BSO.ByteString)
readMetadataSizeFromFooter contents
    | BSO.length contents < footerLength = (0, BSO.empty)
    | otherwise = (fromIntegral size, magicString)
  where
    footerLength :: Int
    footerLength = 8

    footer = BSO.drop (BSO.length contents - footerLength) contents
    (sizeBytes, magicString) = BSO.splitAt 4 footer

    -- Right fold visits the most significant (last) byte first, so each
    -- step shifts what has been accumulated up by one byte.
    size :: Int32
    size =
        BSO.foldr
            (\byte acc -> (acc `shiftL` 8) .|. fromIntegral byte)
            0
            sizeBytes
{- | List the leaf columns of a schema as @(dotted name, leaf index)@ pairs.

The schema elements are turned into a tree with @parseAll@ and walked in
order; leaves are numbered consecutively from 0 in the order they are met.
Names are the node names along the way joined with @"."@, except that:

* a @REPEATED@ node with children does not contribute its name, and when it
  has exactly one child that child does not contribute its name either;
* a node marked to be skipped by such a parent is left out of the name.
-}
getColumnPaths :: [SchemaElement] -> [(T.Text, Int)]
getColumnPaths schemaElements = go (parseAll schemaElements) 0 [] False
  where
    -- Arguments: sibling nodes, index of the next leaf, names collected so
    -- far, and whether nodes at this level must omit their own name.
    go :: [SNode] -> Int -> [T.Text] -> Bool -> [(T.Text, Int)]
    go [] _ _ _ = []
    go (n : ns) idx path skipThis
        | null children =
            (T.intercalate "." leafPath, idx) : go ns (idx + 1) path skipThis
        | otherwise =
            -- Siblings continue numbering after this subtree's leaves.
            childLeaves ++ go ns (idx + length childLeaves) path skipThis
      where
        children = sChildren n
        name = T.pack (sName n)
        leafPath
            | skipThis = path
            | otherwise = path ++ [name]
        childLeaves
            | sRep n == REPEATED = go children idx path (length children == 1)
            | skipThis = go children idx path False
            | otherwise = go children idx (path ++ [name]) False
{- | Look up the schema element for the column addressed by @path@.

The flat element list is first turned into a tree with 'parseAll'; the path
is then followed one segment at a time, matching each segment against the
'sName' of the nodes at the current level and descending into 'sChildren'.
When the path resolves to a node, the result is the first entry of @elems@
whose 'elementName' equals that node's name.

Returns 'Nothing' when the path is empty, when any segment has no matching
node, or when no element carries the resolved node's name.

NOTE(review): the final lookup is by bare name over the whole element list,
so if two elements share a name the first one in @elems@ is returned
regardless of the path taken -- confirm whether callers rely on unique names.
-}
findLeafSchema :: [SchemaElement] -> [String] -> Maybe SchemaElement
findLeafSchema elems path = do
    leaf <- descend (parseAll elems) path
    L.find ((== sName leaf) . T.unpack . elementName) elems
  where
    -- Walk the tree one path segment per level; an exhausted path or an
    -- empty level yields Nothing.
    descend _ [] = Nothing
    descend level (segment : rest) = do
        hit <- L.find ((== segment) . sName) level
        if null rest
            then Just hit
            else descend (sChildren hit) rest
{- | Decode every data page of a column chunk and concatenate the results.

Arguments, in order:

* @(maxDef, maxRep)@ -- maximum definition and repetition levels, passed to
  the level readers and to 'decodePageData'.
* the pages of the chunk; the first dictionary page (if any) supplies the
  dictionary values, and every data page is decoded in order.
* the physical type of the column.
* a column-level encoding, which is ignored (each page header carries its
  own encoding).
* the optional type length, forwarded to 'readDictVals' (except for boolean
  columns, where the dictionary page's value count is used instead) and to
  'decodePageData'.
* the logical type, which this function does not use.

Calls 'error' if a page that passed 'isDataPage' turns out to carry a
dictionary, index or unknown page header.
-}
processColumnPages ::
    (Int, Int) ->
    [Page] ->
    ParquetType ->
    ParquetEncoding ->
    Maybe Int32 ->
    LogicalType ->
    IO DI.Column
processColumnPages (maxDef, maxRep) pages pType _ maybeTypeLength _lType =
    DI.concatManyColumns <$> mapM decodePage (filter isDataPage pages)
  where
    -- Dictionary values come from the first dictionary page only.
    dictValsM =
        case filter isDictionaryPage pages of
            dictPage : _
                | DictionaryPageHeader{dictionaryPageHeaderNumValues = dictCount} <-
                    pageTypeHeader (pageHeader dictPage) ->
                    Just (readDictVals pType (pageBytes dictPage) (dictLength dictCount))
            _ -> Nothing

    -- Boolean dictionaries are sized by the page's value count; every other
    -- type uses the column's type length.
    dictLength dictCount
        | pType == PBOOLEAN = Just dictCount
        | otherwise = maybeTypeLength

    -- Number of entries whose definition level reaches the maximum.
    presentCount = length . filter (== maxDef)

    decodePage page =
        case pageTypeHeader (pageHeader page) of
            DataPageHeader
                { dataPageHeaderNumValues = numValues
                , dataPageHeaderEncoding = pageEncoding
                } ->
                    let (defLvls, repLvls, payload) =
                            readLevelsV1 (fromIntegral numValues) maxDef maxRep (pageBytes page)
                     in decodePageData
                            dictValsM
                            (maxDef, maxRep)
                            pType
                            maybeTypeLength
                            pageEncoding
                            defLvls
                            repLvls
                            (presentCount defLvls)
                            payload
                            "v1"
            DataPageHeaderV2
                { dataPageHeaderV2NumValues = numValues
                , dataPageHeaderV2NumNulls = numNulls
                , dataPageHeaderV2Encoding = pageEncoding
                , definitionLevelByteLength = defBytes
                , repetitionLevelByteLength = repBytes
                } ->
                    let (defLvls, repLvls, payload) =
                            readLevelsV2
                                (fromIntegral numValues)
                                maxDef
                                maxRep
                                defBytes
                                repBytes
                                (pageBytes page)
                        -- Trust the header's null count when it is positive;
                        -- otherwise count from the definition levels.
                        present
                            | numNulls > 0 = fromIntegral (numValues - numNulls)
                            | otherwise = presentCount defLvls
                     in decodePageData
                            dictValsM
                            (maxDef, maxRep)
                            pType
                            maybeTypeLength
                            pageEncoding
                            defLvls
                            repLvls
                            present
                            payload
                            "v2"
            DictionaryPageHeader{} ->
                error "processColumnPages: impossible DictionaryPageHeader"
            INDEX_PAGE_HEADER ->
                error "processColumnPages: impossible INDEX_PAGE_HEADER"
            PAGE_TYPE_HEADER_UNKNOWN ->
                error "processColumnPages: impossible PAGE_TYPE_HEADER_UNKNOWN"
{- | Decode the value section of one data page into a column.

Arguments, in order: the optional dictionary values, the
@(maxDef, maxRep)@ level bounds, the physical type, the optional type length
(required for @PFIXED_LEN_BYTE_ARRAY@), the page encoding, the definition
levels, the repetition levels, the number of values to read from the payload,
the payload bytes that follow the levels, and a version label that is only
used in the "unsupported encoding" error message.

For @EPLAIN@ the values are read according to the physical type. When
@maxRep > 0@ they are assembled with the matching @stitchForRep*@ function,
otherwise with the matching @toMaybe*@ function. 32/64-bit integers, floats
and doubles with @maxDef == 0@ and @maxRep == 0@ are read straight into an
unboxed vector instead. Byte arrays are decoded to text with
'decodeUtf8Lenient'.

@ERLE_DICTIONARY@ and @EPLAIN_DICTIONARY@ are delegated to 'decodeDictV1'.
Any other encoding, an unknown physical type, or a fixed-length byte array
without a type length calls 'error'.
-}
decodePageData ::
    Maybe DictVals ->
    (Int, Int) ->
    ParquetType ->
    Maybe Int32 ->
    ParquetEncoding ->
    [Int] ->
    [Int] ->
    Int ->
    BSO.ByteString ->
    String ->
    IO DI.Column
decodePageData dictValsM (maxDef, maxRep) pType maybeTypeLength encoding defLvls repLvls nPresent afterLvls versionLabel =
    case encoding of
        EPLAIN -> decodePlain
        ERLE_DICTIONARY -> viaDictionary
        EPLAIN_DICTIONARY -> viaDictionary
        other -> error ("Unsupported " ++ versionLabel ++ " encoding: " ++ show other)
  where
    viaDictionary =
        decodeDictV1 dictValsM maxDef maxRep repLvls defLvls nPresent afterLvls

    -- No definition or repetition levels: values can be read directly into
    -- an unboxed vector.
    isFlat = maxDef == 0 && maxRep == 0

    -- Shared tail of every plain branch: pick the repeated or the nullable
    -- assembler depending on maxRep.
    assemble ::
        (Int -> Int -> [Int] -> [Int] -> [a] -> DI.Column) ->
        (Int -> [Int] -> [a] -> DI.Column) ->
        [a] ->
        IO DI.Column
    assemble stitch nullable vals =
        pure $
            if maxRep > 0
                then stitch maxRep maxDef repLvls defLvls vals
                else nullable maxDef defLvls vals

    textColumn raws =
        assemble stitchForRepText toMaybeText (map decodeUtf8Lenient raws)

    decodePlain =
        case pType of
            PBOOLEAN ->
                assemble stitchForRepBool toMaybeBool (fst (readNBool nPresent afterLvls))
            PINT32
                | isFlat ->
                    pure (DI.fromUnboxedVector (readNInt32Vec nPresent afterLvls))
                | otherwise ->
                    assemble stitchForRepInt32 toMaybeInt32 (fst (readNInt32 nPresent afterLvls))
            PINT64
                | isFlat ->
                    pure (DI.fromUnboxedVector (readNInt64Vec nPresent afterLvls))
                | otherwise ->
                    assemble stitchForRepInt64 toMaybeInt64 (fst (readNInt64 nPresent afterLvls))
            PINT96 ->
                assemble
                    stitchForRepUTCTime
                    toMaybeUTCTime
                    (fst (readNInt96Times nPresent afterLvls))
            PFLOAT
                | isFlat ->
                    pure (DI.fromUnboxedVector (readNFloatVec nPresent afterLvls))
                | otherwise ->
                    assemble stitchForRepFloat toMaybeFloat (fst (readNFloat nPresent afterLvls))
            PDOUBLE
                | isFlat ->
                    pure (DI.fromUnboxedVector (readNDoubleVec nPresent afterLvls))
                | otherwise ->
                    assemble stitchForRepDouble toMaybeDouble (fst (readNDouble nPresent afterLvls))
            PBYTE_ARRAY ->
                textColumn (fst (readNByteArrays nPresent afterLvls))
            PFIXED_LEN_BYTE_ARRAY ->
                case maybeTypeLength of
                    Just len ->
                        textColumn (fst (splitFixed nPresent (fromIntegral len) afterLvls))
                    Nothing ->
                        error "FIXED_LEN_BYTE_ARRAY requires type length"
            PARQUET_TYPE_UNKNOWN ->
                error "Cannot read unknown Parquet type"
{- | Reinterpret a physically decoded column according to its logical type.

* @TimestampType _ unit@: maps an 'Int64' column of ticks since the Unix
  epoch to 'UTCTime'. The tick count is divided by the number of ticks per
  second given by 'unitDivisor', so milli-, micro- and nanosecond columns are
  all handled. An earlier version scaled to microseconds with an integer
  factor of @1_000_000 `div` unitDivisor unit@, which is 0 for nanoseconds
  and collapsed every such timestamp to the epoch.
* @DecimalType precision scale@: for @precision <= 9@ the column is read as
  'Int32', for @precision <= 18@ as 'Int64'; each unscaled value is divided
  by @10 ^ scale@ to give a 'Double'. Larger precisions are returned
  untouched.
* Any other logical type leaves the column as it is.

If the column does not have the expected element type (the underlying
conversion returns 'Left'), the input column is returned unchanged.
-}
applyLogicalType :: LogicalType -> DI.Column -> DI.Column
applyLogicalType (TimestampType _ unit) col =
    fromRight col (DI.mapColumn toUTC col)
  where
    -- The annotation pins the column element type mapColumn should expect.
    toUTC :: Int64 -> UTCTime
    toUTC ticks = posixSecondsToUTCTime (fromIntegral ticks / ticksPerSecond)

    -- Dividing as NominalDiffTime avoids both the zero factor for
    -- nanoseconds and any Int64 overflow from pre-multiplying the raw value.
    ticksPerSecond :: NominalDiffTime
    ticksPerSecond = fromIntegral (unitDivisor unit)
applyLogicalType (DecimalType precision scale) col
    | precision <= 9 =
        case DI.toVector @Int32 @VU.Vector col of
            Right xs ->
                DI.fromUnboxedVector $
                    VU.map (\raw -> fromIntegral @Int32 @Double raw / divisor) xs
            Left _ -> col
    | precision <= 18 =
        case DI.toVector @Int64 @VU.Vector col of
            Right xs ->
                DI.fromUnboxedVector $
                    VU.map (\raw -> fromIntegral @Int64 @Double raw / divisor) xs
            Left _ -> col
    | otherwise = col
  where
    -- Computed once rather than once per element.
    divisor :: Double
    divisor = 10 ^ scale
applyLogicalType _ col = col
-- | Convert a count of microseconds since the Unix epoch into a 'UTCTime'.
microsecondsToUTCTime :: Int64 -> UTCTime
microsecondsToUTCTime us = posixSecondsToUTCTime seconds
  where
    seconds = fromIntegral us / microsPerSecond
    microsPerSecond = 1_000_000
{- | Number of ticks of the given unit in one second.

An unknown unit yields 1.
-}
unitDivisor :: TimeUnit -> Int64
unitDivisor unit =
    case unit of
        MILLISECONDS -> 1_000
        MICROSECONDS -> 1_000_000
        NANOSECONDS -> 1_000_000_000
        TIME_UNIT_UNKNOWN -> 1
{- | Turn an unscaled decimal value into a 'Double'.

The first argument is the decimal scale, the second the raw value; the
result is the raw value divided by @10 ^ scale@.
-}
applyScale :: Int32 -> Int32 -> Double
applyScale scale rawValue = unscaled / divisor
  where
    unscaled = fromIntegral rawValue
    divisor = 10 ^ scale