{-# LANGUAGE NumericUnderscores #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications #-}

module DataFrame.IO.Parquet where

import Control.Exception (throw)
import Control.Monad
import Data.Bits
import qualified Data.ByteString as BSO
import Data.Either
import Data.IORef
import Data.Int
import qualified Data.List as L
import qualified Data.Map as M
import qualified Data.Set as S
import qualified Data.Text as T
import Data.Text.Encoding
import Data.Time
import Data.Time.Clock.POSIX (posixSecondsToUTCTime)
import Data.Word
import DataFrame.Errors (DataFrameException (ColumnNotFoundException))
import qualified DataFrame.Internal.Column as DI
import DataFrame.Internal.DataFrame (DataFrame)
import DataFrame.Internal.Expression (Expr, getColumns)
import qualified DataFrame.Operations.Core as DI
import DataFrame.Operations.Merge ()
import qualified DataFrame.Operations.Subset as DS
import System.FilePath.Glob (glob)

import DataFrame.IO.Parquet.Dictionary
import DataFrame.IO.Parquet.Levels
import DataFrame.IO.Parquet.Page
import DataFrame.IO.Parquet.Thrift
import DataFrame.IO.Parquet.Types
import System.Directory (doesDirectoryExist)

import qualified Data.Vector.Unboxed as VU
import System.FilePath ((</>))

-- Options -----------------------------------------------------------------

{- | Options for reading Parquet data.

These options are applied in this order:

1. predicate filtering
2. column projection
3. row range

Column selection for @selectedColumns@ uses leaf column names only.
-}
data ParquetReadOptions = ParquetReadOptions
    { ParquetReadOptions -> Maybe [Text]
selectedColumns :: Maybe [T.Text]
    {- ^ Columns to keep in the final dataframe. If set, only these columns are returned.
    Predicate-referenced columns are read automatically when needed and projected out after filtering.
    -}
    , ParquetReadOptions -> Maybe (Expr Bool)
predicate :: Maybe (Expr Bool)
    -- ^ Optional row filter expression applied before projection.
    , ParquetReadOptions -> Maybe (Int, Int)
rowRange :: Maybe (Int, Int)
    -- ^ Optional row slice @(start, end)@ with start-inclusive/end-exclusive semantics.
    }
    deriving (ParquetReadOptions -> ParquetReadOptions -> Bool
(ParquetReadOptions -> ParquetReadOptions -> Bool)
-> (ParquetReadOptions -> ParquetReadOptions -> Bool)
-> Eq ParquetReadOptions
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: ParquetReadOptions -> ParquetReadOptions -> Bool
== :: ParquetReadOptions -> ParquetReadOptions -> Bool
$c/= :: ParquetReadOptions -> ParquetReadOptions -> Bool
/= :: ParquetReadOptions -> ParquetReadOptions -> Bool
Eq, Int -> ParquetReadOptions -> ShowS
[ParquetReadOptions] -> ShowS
ParquetReadOptions -> [Char]
(Int -> ParquetReadOptions -> ShowS)
-> (ParquetReadOptions -> [Char])
-> ([ParquetReadOptions] -> ShowS)
-> Show ParquetReadOptions
forall a.
(Int -> a -> ShowS) -> (a -> [Char]) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> ParquetReadOptions -> ShowS
showsPrec :: Int -> ParquetReadOptions -> ShowS
$cshow :: ParquetReadOptions -> [Char]
show :: ParquetReadOptions -> [Char]
$cshowList :: [ParquetReadOptions] -> ShowS
showList :: [ParquetReadOptions] -> ShowS
Show)

{- | Default Parquet read options.

Equivalent to:

@
ParquetReadOptions
    { selectedColumns = Nothing
    , predicate = Nothing
    , rowRange = Nothing
    }
@
-}
defaultParquetReadOptions :: ParquetReadOptions
defaultParquetReadOptions :: ParquetReadOptions
defaultParquetReadOptions =
    ParquetReadOptions
        { selectedColumns :: Maybe [Text]
selectedColumns = Maybe [Text]
forall a. Maybe a
Nothing
        , predicate :: Maybe (Expr Bool)
predicate = Maybe (Expr Bool)
forall a. Maybe a
Nothing
        , rowRange :: Maybe (Int, Int)
rowRange = Maybe (Int, Int)
forall a. Maybe a
Nothing
        }

-- Public API --------------------------------------------------------------

{- | Read a parquet file from path and load it into a dataframe.

==== __Example__
@
ghci> D.readParquet ".\/data\/mtcars.parquet"
@
-}
readParquet :: FilePath -> IO DataFrame
readParquet :: [Char] -> IO DataFrame
readParquet = ParquetReadOptions -> [Char] -> IO DataFrame
readParquetWithOpts ParquetReadOptions
defaultParquetReadOptions

{- | Read a Parquet file using explicit read options.

==== __Example__
@
ghci> D.readParquetWithOpts
ghci|   (D.defaultParquetReadOptions{D.selectedColumns = Just ["id"], D.rowRange = Just (0, 10)})
ghci|   "./tests/data/alltypes_plain.parquet"
@

When @selectedColumns@ is set and @predicate@ references other columns, those predicate columns
are auto-included for decoding, then projected back to the requested output columns.
-}

{- | Strip Parquet encoding artifact names (REPEATED wrappers and their single
  list-element children) from a raw column path, leaving user-visible names.
-}
cleanColPath :: [SNode] -> [String] -> [String]
cleanColPath :: [SNode] -> [[Char]] -> [[Char]]
cleanColPath [SNode]
nodes [[Char]]
path = [SNode] -> [[Char]] -> Bool -> [[Char]]
go [SNode]
nodes [[Char]]
path Bool
False
  where
    go :: [SNode] -> [[Char]] -> Bool -> [[Char]]
go [SNode]
_ [] Bool
_ = []
    go [SNode]
ns ([Char]
p : [[Char]]
ps) Bool
skipThis =
        case (SNode -> Bool) -> [SNode] -> Maybe SNode
forall (t :: * -> *) a. Foldable t => (a -> Bool) -> t a -> Maybe a
L.find (\SNode
n -> SNode -> [Char]
sName SNode
n [Char] -> [Char] -> Bool
forall a. Eq a => a -> a -> Bool
== [Char]
p) [SNode]
ns of
            Maybe SNode
Nothing -> []
            Just SNode
n
                | SNode -> RepetitionType
sRep SNode
n RepetitionType -> RepetitionType -> Bool
forall a. Eq a => a -> a -> Bool
== RepetitionType
REPEATED Bool -> Bool -> Bool
&& Bool -> Bool
not ([SNode] -> Bool
forall a. [a] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null (SNode -> [SNode]
sChildren SNode
n)) ->
                    let skipChildren :: Bool
skipChildren = [SNode] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length (SNode -> [SNode]
sChildren SNode
n) Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
1
                     in [SNode] -> [[Char]] -> Bool -> [[Char]]
go (SNode -> [SNode]
sChildren SNode
n) [[Char]]
ps Bool
skipChildren
                | Bool
skipThis ->
                    [SNode] -> [[Char]] -> Bool -> [[Char]]
go (SNode -> [SNode]
sChildren SNode
n) [[Char]]
ps Bool
False
                | [SNode] -> Bool
forall a. [a] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null (SNode -> [SNode]
sChildren SNode
n) ->
                    [[Char]
p]
                | Bool
otherwise ->
                    [Char]
p [Char] -> [[Char]] -> [[Char]]
forall a. a -> [a] -> [a]
: [SNode] -> [[Char]] -> Bool -> [[Char]]
go (SNode -> [SNode]
sChildren SNode
n) [[Char]]
ps Bool
False

readParquetWithOpts :: ParquetReadOptions -> FilePath -> IO DataFrame
readParquetWithOpts :: ParquetReadOptions -> [Char] -> IO DataFrame
readParquetWithOpts ParquetReadOptions
opts [Char]
path = do
    (FileMetadata
fileMetadata, ByteString
contents) <- [Char] -> IO (FileMetadata, ByteString)
readMetadataFromPath [Char]
path
    let columnPaths :: [(Text, Int)]
columnPaths = [SchemaElement] -> [(Text, Int)]
getColumnPaths (Int -> [SchemaElement] -> [SchemaElement]
forall a. Int -> [a] -> [a]
drop Int
1 ([SchemaElement] -> [SchemaElement])
-> [SchemaElement] -> [SchemaElement]
forall a b. (a -> b) -> a -> b
$ FileMetadata -> [SchemaElement]
schema FileMetadata
fileMetadata)
    let columnNames :: [Text]
columnNames = ((Text, Int) -> Text) -> [(Text, Int)] -> [Text]
forall a b. (a -> b) -> [a] -> [b]
map (Text, Int) -> Text
forall a b. (a, b) -> a
fst [(Text, Int)]
columnPaths
    let leafNames :: [Text]
leafNames = (Text -> Text) -> [Text] -> [Text]
forall a b. (a -> b) -> [a] -> [b]
map ([Text] -> Text
forall a. HasCallStack => [a] -> a
last ([Text] -> Text) -> (Text -> [Text]) -> Text -> Text
forall b c a. (b -> c) -> (a -> b) -> a -> c
. HasCallStack => Text -> Text -> [Text]
Text -> Text -> [Text]
T.splitOn Text
".") [Text]
columnNames
    let availableSelectedColumns :: [Text]
availableSelectedColumns = [Text] -> [Text]
forall a. Eq a => [a] -> [a]
L.nub [Text]
leafNames
    let predicateColumns :: [Text]
predicateColumns = [Text] -> (Expr Bool -> [Text]) -> Maybe (Expr Bool) -> [Text]
forall b a. b -> (a -> b) -> Maybe a -> b
maybe [] ([Text] -> [Text]
forall a. Eq a => [a] -> [a]
L.nub ([Text] -> [Text]) -> (Expr Bool -> [Text]) -> Expr Bool -> [Text]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Expr Bool -> [Text]
forall a. Expr a -> [Text]
getColumns) (ParquetReadOptions -> Maybe (Expr Bool)
predicate ParquetReadOptions
opts)
    let selectedColumnsForRead :: Maybe [Text]
selectedColumnsForRead = case ParquetReadOptions -> Maybe [Text]
selectedColumns ParquetReadOptions
opts of
            Maybe [Text]
Nothing -> Maybe [Text]
forall a. Maybe a
Nothing
            Just [Text]
selected -> [Text] -> Maybe [Text]
forall a. a -> Maybe a
Just ([Text] -> [Text]
forall a. Eq a => [a] -> [a]
L.nub ([Text]
selected [Text] -> [Text] -> [Text]
forall a. [a] -> [a] -> [a]
++ [Text]
predicateColumns))
    let selectedColumnSet :: Maybe (Set Text)
selectedColumnSet = [Text] -> Set Text
forall a. Ord a => [a] -> Set a
S.fromList ([Text] -> Set Text) -> Maybe [Text] -> Maybe (Set Text)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Maybe [Text]
selectedColumnsForRead
    let shouldReadColumn :: Text -> p -> Bool
shouldReadColumn Text
colName p
_ =
            case Maybe (Set Text)
selectedColumnSet of
                Maybe (Set Text)
Nothing -> Bool
True
                Just Set Text
selected -> Text
colName Text -> Set Text -> Bool
forall a. Ord a => a -> Set a -> Bool
`S.member` Set Text
selected

    case Maybe [Text]
selectedColumnsForRead of
        Maybe [Text]
Nothing -> () -> IO ()
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
        Just [Text]
requested ->
            let missing :: [Text]
missing = [Text]
requested [Text] -> [Text] -> [Text]
forall a. Eq a => [a] -> [a] -> [a]
L.\\ [Text]
availableSelectedColumns
             in Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless
                    ([Text] -> Bool
forall a. [a] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
L.null [Text]
missing)
                    ( DataFrameException -> IO ()
forall a e. Exception e => e -> a
throw
                        ( Text -> Text -> [Text] -> DataFrameException
ColumnNotFoundException
                            ([Char] -> Text
T.pack ([Char] -> Text) -> [Char] -> Text
forall a b. (a -> b) -> a -> b
$ [Text] -> [Char]
forall a. Show a => a -> [Char]
show [Text]
missing)
                            Text
"readParquetWithOpts"
                            [Text]
availableSelectedColumns
                        )
                    )

    let totalRows :: Int
totalRows = [Int] -> Int
forall a. Num a => [a] -> a
forall (t :: * -> *) a. (Foldable t, Num a) => t a -> a
sum ((RowGroup -> Int) -> [RowGroup] -> [Int]
forall a b. (a -> b) -> [a] -> [b]
map (Int64 -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Int64 -> Int) -> (RowGroup -> Int64) -> RowGroup -> Int
forall b c a. (b -> c) -> (a -> b) -> a -> c
. RowGroup -> Int64
rowGroupNumRows) (FileMetadata -> [RowGroup]
rowGroups FileMetadata
fileMetadata)) :: Int
    IORef (Map Text MutableColumn)
colMutMap <- Map Text MutableColumn -> IO (IORef (Map Text MutableColumn))
forall a. a -> IO (IORef a)
newIORef (Map Text MutableColumn
forall k a. Map k a
M.empty :: M.Map T.Text DI.MutableColumn)
    IORef (Map Text Int)
colOffMap <- Map Text Int -> IO (IORef (Map Text Int))
forall a. a -> IO (IORef a)
newIORef (Map Text Int
forall k a. Map k a
M.empty :: M.Map T.Text Int)
    IORef (Map Text LogicalType)
lTypeMap <- Map Text LogicalType -> IO (IORef (Map Text LogicalType))
forall a. a -> IO (IORef a)
newIORef (Map Text LogicalType
forall k a. Map k a
M.empty :: M.Map T.Text LogicalType)

    let schemaElements :: [SchemaElement]
schemaElements = FileMetadata -> [SchemaElement]
schema FileMetadata
fileMetadata
    let sNodes :: [SNode]
sNodes = [SchemaElement] -> [SNode]
parseAll (Int -> [SchemaElement] -> [SchemaElement]
forall a. Int -> [a] -> [a]
drop Int
1 [SchemaElement]
schemaElements)
    let getTypeLength :: [String] -> Maybe Int32
        getTypeLength :: [[Char]] -> Maybe Int32
getTypeLength [[Char]]
path = [SchemaElement] -> [[Char]] -> Integer -> Maybe Int32
forall {t}.
Num t =>
[SchemaElement] -> [[Char]] -> t -> Maybe Int32
findTypeLength [SchemaElement]
schemaElements [[Char]]
path Integer
0
          where
            findTypeLength :: [SchemaElement] -> [[Char]] -> t -> Maybe Int32
findTypeLength [] [[Char]]
_ t
_ = Maybe Int32
forall a. Maybe a
Nothing
            findTypeLength (SchemaElement
s : [SchemaElement]
ss) [[Char]]
targetPath t
depth
                | (Text -> [Char]) -> [Text] -> [[Char]]
forall a b. (a -> b) -> [a] -> [b]
map Text -> [Char]
T.unpack (SchemaElement -> [SchemaElement] -> t -> [Text]
forall {p} {p} {p} {a}. p -> p -> p -> [a]
pathToElement SchemaElement
s [SchemaElement]
ss t
depth) [[Char]] -> [[Char]] -> Bool
forall a. Eq a => a -> a -> Bool
== [[Char]]
targetPath
                    Bool -> Bool -> Bool
&& SchemaElement -> TType
elementType SchemaElement
s TType -> TType -> Bool
forall a. Eq a => a -> a -> Bool
== TType
STRING
                    Bool -> Bool -> Bool
&& SchemaElement -> Int32
typeLength SchemaElement
s Int32 -> Int32 -> Bool
forall a. Ord a => a -> a -> Bool
> Int32
0 =
                    Int32 -> Maybe Int32
forall a. a -> Maybe a
Just (SchemaElement -> Int32
typeLength SchemaElement
s)
                | Bool
otherwise =
                    [SchemaElement] -> [[Char]] -> t -> Maybe Int32
findTypeLength [SchemaElement]
ss [[Char]]
targetPath (if SchemaElement -> Int32
numChildren SchemaElement
s Int32 -> Int32 -> Bool
forall a. Ord a => a -> a -> Bool
> Int32
0 then t
depth t -> t -> t
forall a. Num a => a -> a -> a
+ t
1 else t
depth)

            pathToElement :: p -> p -> p -> [a]
pathToElement p
_ p
_ p
_ = []

    [RowGroup] -> (RowGroup -> IO ()) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ (FileMetadata -> [RowGroup]
rowGroups FileMetadata
fileMetadata) ((RowGroup -> IO ()) -> IO ()) -> (RowGroup -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \RowGroup
rowGroup -> do
        [(ColumnChunk, Integer)]
-> ((ColumnChunk, Integer) -> IO ()) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ ([ColumnChunk] -> [Integer] -> [(ColumnChunk, Integer)]
forall a b. [a] -> [b] -> [(a, b)]
zip (RowGroup -> [ColumnChunk]
rowGroupColumns RowGroup
rowGroup) [Integer
0 ..]) (((ColumnChunk, Integer) -> IO ()) -> IO ())
-> ((ColumnChunk, Integer) -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \(ColumnChunk
colChunk, Integer
colIdx) -> do
            let metadata :: ColumnMetaData
metadata = ColumnChunk -> ColumnMetaData
columnMetaData ColumnChunk
colChunk
            let colPath :: [[Char]]
colPath = ColumnMetaData -> [[Char]]
columnPathInSchema ColumnMetaData
metadata
            let cleanPath :: [[Char]]
cleanPath = [SNode] -> [[Char]] -> [[Char]]
cleanColPath [SNode]
sNodes [[Char]]
colPath
            let colLeafName :: Text
colLeafName =
                    if [[Char]] -> Bool
forall a. [a] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null [[Char]]
cleanPath
                        then [Char] -> Text
T.pack ([Char] -> Text) -> [Char] -> Text
forall a b. (a -> b) -> a -> b
$ [Char]
"col_" [Char] -> ShowS
forall a. [a] -> [a] -> [a]
++ Integer -> [Char]
forall a. Show a => a -> [Char]
show Integer
colIdx
                        else [Char] -> Text
T.pack ([Char] -> Text) -> [Char] -> Text
forall a b. (a -> b) -> a -> b
$ [[Char]] -> [Char]
forall a. HasCallStack => [a] -> a
last [[Char]]
cleanPath
            let colFullName :: Text
colFullName =
                    if [[Char]] -> Bool
forall a. [a] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null [[Char]]
cleanPath
                        then Text
colLeafName
                        else Text -> [Text] -> Text
T.intercalate Text
"." ([Text] -> Text) -> [Text] -> Text
forall a b. (a -> b) -> a -> b
$ ([Char] -> Text) -> [[Char]] -> [Text]
forall a b. (a -> b) -> [a] -> [b]
map [Char] -> Text
T.pack [[Char]]
cleanPath

            Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Text -> [[Char]] -> Bool
forall {p}. Text -> p -> Bool
shouldReadColumn Text
colLeafName [[Char]]
colPath) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
                let colDataPageOffset :: Int64
colDataPageOffset = ColumnMetaData -> Int64
columnDataPageOffset ColumnMetaData
metadata
                let colDictionaryPageOffset :: Int64
colDictionaryPageOffset = ColumnMetaData -> Int64
columnDictionaryPageOffset ColumnMetaData
metadata
                let colStart :: Int64
colStart =
                        if Int64
colDictionaryPageOffset Int64 -> Int64 -> Bool
forall a. Ord a => a -> a -> Bool
> Int64
0 Bool -> Bool -> Bool
&& Int64
colDataPageOffset Int64 -> Int64 -> Bool
forall a. Ord a => a -> a -> Bool
> Int64
colDictionaryPageOffset
                            then Int64
colDictionaryPageOffset
                            else Int64
colDataPageOffset
                let colLength :: Int64
colLength = ColumnMetaData -> Int64
columnTotalCompressedSize ColumnMetaData
metadata

                let columnBytes :: ByteString
columnBytes = Int -> ByteString -> ByteString
BSO.take (Int64 -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int64
colLength) (Int -> ByteString -> ByteString
BSO.drop (Int64 -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int64
colStart) ByteString
contents)

                [Page]
pages <- CompressionCodec -> ByteString -> IO [Page]
readAllPages (ColumnMetaData -> CompressionCodec
columnCodec ColumnMetaData
metadata) ByteString
columnBytes

                let maybeTypeLength :: Maybe Int32
maybeTypeLength =
                        if ColumnMetaData -> ParquetType
columnType ColumnMetaData
metadata ParquetType -> ParquetType -> Bool
forall a. Eq a => a -> a -> Bool
== ParquetType
PFIXED_LEN_BYTE_ARRAY
                            then [[Char]] -> Maybe Int32
getTypeLength [[Char]]
colPath
                            else Maybe Int32
forall a. Maybe a
Nothing

                let primaryEncoding :: ParquetEncoding
primaryEncoding = ParquetEncoding
-> ((ParquetEncoding, [ParquetEncoding]) -> ParquetEncoding)
-> Maybe (ParquetEncoding, [ParquetEncoding])
-> ParquetEncoding
forall b a. b -> (a -> b) -> Maybe a -> b
maybe ParquetEncoding
EPLAIN (ParquetEncoding, [ParquetEncoding]) -> ParquetEncoding
forall a b. (a, b) -> a
fst ([ParquetEncoding] -> Maybe (ParquetEncoding, [ParquetEncoding])
forall a. [a] -> Maybe (a, [a])
L.uncons (ColumnMetaData -> [ParquetEncoding]
columnEncodings ColumnMetaData
metadata))

                let schemaTail :: [SchemaElement]
schemaTail = Int -> [SchemaElement] -> [SchemaElement]
forall a. Int -> [a] -> [a]
drop Int
1 (FileMetadata -> [SchemaElement]
schema FileMetadata
fileMetadata)
                let (Int
maxDef, Int
maxRep) = [SchemaElement] -> [[Char]] -> (Int, Int)
levelsForPath [SchemaElement]
schemaTail [[Char]]
colPath
                let lType :: LogicalType
lType =
                        LogicalType
-> (SchemaElement -> LogicalType)
-> Maybe SchemaElement
-> LogicalType
forall b a. b -> (a -> b) -> Maybe a -> b
maybe
                            LogicalType
LOGICAL_TYPE_UNKNOWN
                            SchemaElement -> LogicalType
logicalType
                            ([SchemaElement] -> [[Char]] -> Maybe SchemaElement
findLeafSchema [SchemaElement]
schemaTail [[Char]]
colPath)
                Column
column <-
                    (Int, Int)
-> [Page]
-> ParquetType
-> ParquetEncoding
-> Maybe Int32
-> LogicalType
-> IO Column
processColumnPages
                        (Int
maxDef, Int
maxRep)
                        [Page]
pages
                        (ColumnMetaData -> ParquetType
columnType ColumnMetaData
metadata)
                        ParquetEncoding
primaryEncoding
                        Maybe Int32
maybeTypeLength
                        LogicalType
lType

                Map Text MutableColumn
mutMapSnap <- IORef (Map Text MutableColumn) -> IO (Map Text MutableColumn)
forall a. IORef a -> IO a
readIORef IORef (Map Text MutableColumn)
colMutMap
                case Text -> Map Text MutableColumn -> Maybe MutableColumn
forall k a. Ord k => k -> Map k a -> Maybe a
M.lookup Text
colFullName Map Text MutableColumn
mutMapSnap of
                    Maybe MutableColumn
Nothing -> do
                        MutableColumn
mc <- Int -> Column -> IO MutableColumn
DI.newMutableColumn Int
totalRows Column
column
                        MutableColumn -> Int -> Column -> IO ()
DI.copyIntoMutableColumn MutableColumn
mc Int
0 Column
column
                        IORef (Map Text MutableColumn)
-> (Map Text MutableColumn -> Map Text MutableColumn) -> IO ()
forall a. IORef a -> (a -> a) -> IO ()
modifyIORef IORef (Map Text MutableColumn)
colMutMap (Text
-> MutableColumn
-> Map Text MutableColumn
-> Map Text MutableColumn
forall k a. Ord k => k -> a -> Map k a -> Map k a
M.insert Text
colFullName MutableColumn
mc)
                        IORef (Map Text Int) -> (Map Text Int -> Map Text Int) -> IO ()
forall a. IORef a -> (a -> a) -> IO ()
modifyIORef IORef (Map Text Int)
colOffMap (Text -> Int -> Map Text Int -> Map Text Int
forall k a. Ord k => k -> a -> Map k a -> Map k a
M.insert Text
colFullName (Column -> Int
DI.columnLength Column
column))
                    Just MutableColumn
mc -> do
                        Int
off <- (Map Text Int -> Text -> Int
forall k a. Ord k => Map k a -> k -> a
M.! Text
colFullName) (Map Text Int -> Int) -> IO (Map Text Int) -> IO Int
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IORef (Map Text Int) -> IO (Map Text Int)
forall a. IORef a -> IO a
readIORef IORef (Map Text Int)
colOffMap
                        MutableColumn -> Int -> Column -> IO ()
DI.copyIntoMutableColumn MutableColumn
mc Int
off Column
column
                        IORef (Map Text Int) -> (Map Text Int -> Map Text Int) -> IO ()
forall a. IORef a -> (a -> a) -> IO ()
modifyIORef IORef (Map Text Int)
colOffMap ((Int -> Int) -> Text -> Map Text Int -> Map Text Int
forall k a. Ord k => (a -> a) -> k -> Map k a -> Map k a
M.adjust (Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Column -> Int
DI.columnLength Column
column) Text
colFullName)
                IORef (Map Text LogicalType)
-> (Map Text LogicalType -> Map Text LogicalType) -> IO ()
forall a. IORef a -> (a -> a) -> IO ()
modifyIORef IORef (Map Text LogicalType)
lTypeMap (Text -> LogicalType -> Map Text LogicalType -> Map Text LogicalType
forall k a. Ord k => k -> a -> Map k a -> Map k a
M.insert Text
colFullName LogicalType
lType)

    Map Text MutableColumn
finalMutMap <- IORef (Map Text MutableColumn) -> IO (Map Text MutableColumn)
forall a. IORef a -> IO a
readIORef IORef (Map Text MutableColumn)
colMutMap
    Map Text Column
finalColMap <-
        (Text -> MutableColumn -> IO Column)
-> Map Text MutableColumn -> IO (Map Text Column)
forall (t :: * -> *) k a b.
Applicative t =>
(k -> a -> t b) -> Map k a -> t (Map k b)
M.traverseWithKey (\Text
_ MutableColumn
mc -> MutableColumn -> IO Column
DI.freezeMutableColumn MutableColumn
mc) Map Text MutableColumn
finalMutMap
    Map Text LogicalType
finalLTypeMap <- IORef (Map Text LogicalType) -> IO (Map Text LogicalType)
forall a. IORef a -> IO a
readIORef IORef (Map Text LogicalType)
lTypeMap
    let orderedColumns :: [(Text, Column)]
orderedColumns =
            (Text -> (Text, Column)) -> [Text] -> [(Text, Column)]
forall a b. (a -> b) -> [a] -> [b]
map
                ( \Text
name ->
                    ( Text
name
                    , LogicalType -> Column -> Column
applyLogicalType (Map Text LogicalType
finalLTypeMap Map Text LogicalType -> Text -> LogicalType
forall k a. Ord k => Map k a -> k -> a
M.! Text
name) (Column -> Column) -> Column -> Column
forall a b. (a -> b) -> a -> b
$ Map Text Column
finalColMap Map Text Column -> Text -> Column
forall k a. Ord k => Map k a -> k -> a
M.! Text
name
                    )
                )
                ((Text -> Bool) -> [Text] -> [Text]
forall a. (a -> Bool) -> [a] -> [a]
filter (Text -> Map Text Column -> Bool
forall k a. Ord k => k -> Map k a -> Bool
`M.member` Map Text Column
finalColMap) [Text]
columnNames)

    DataFrame -> IO DataFrame
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (DataFrame -> IO DataFrame) -> DataFrame -> IO DataFrame
forall a b. (a -> b) -> a -> b
$ ParquetReadOptions -> DataFrame -> DataFrame
applyReadOptions ParquetReadOptions
opts ([(Text, Column)] -> DataFrame
DI.fromNamedColumns [(Text, Column)]
orderedColumns)

{- | Read Parquet files from a directory or glob path.

This is equivalent to calling 'readParquetFilesWithOpts' with 'defaultParquetReadOptions'.
-}
readParquetFiles :: FilePath -> IO DataFrame
readParquetFiles :: [Char] -> IO DataFrame
readParquetFiles = ParquetReadOptions -> [Char] -> IO DataFrame
readParquetFilesWithOpts ParquetReadOptions
defaultParquetReadOptions

{- | Read multiple Parquet files (directory or glob) using explicit options.

If @path@ is a directory, all non-directory entries are read.
If @path@ is a glob, matching files are read.

For multi-file reads, @rowRange@ is applied once after concatenation (global range semantics).

==== __Example__
@
ghci> D.readParquetFilesWithOpts
ghci|   (D.defaultParquetReadOptions{D.selectedColumns = Just ["id"], D.rowRange = Just (0, 5)})
ghci|   "./tests/data/alltypes_plain*.parquet"
@
-}
readParquetFilesWithOpts :: ParquetReadOptions -> FilePath -> IO DataFrame
readParquetFilesWithOpts :: ParquetReadOptions -> [Char] -> IO DataFrame
readParquetFilesWithOpts ParquetReadOptions
opts [Char]
path = do
    Bool
isDir <- [Char] -> IO Bool
doesDirectoryExist [Char]
path

    let pat :: [Char]
pat = if Bool
isDir then [Char]
path [Char] -> ShowS
</> [Char]
"*.parquet" else [Char]
path

    [[Char]]
matches <- [Char] -> IO [[Char]]
glob [Char]
pat

    [[Char]]
files <- ([Char] -> IO Bool) -> [[Char]] -> IO [[Char]]
forall (m :: * -> *) a.
Applicative m =>
(a -> m Bool) -> [a] -> m [a]
filterM ((Bool -> Bool) -> IO Bool -> IO Bool
forall a b. (a -> b) -> IO a -> IO b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap Bool -> Bool
not (IO Bool -> IO Bool) -> ([Char] -> IO Bool) -> [Char] -> IO Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. [Char] -> IO Bool
doesDirectoryExist) [[Char]]
matches

    case [[Char]]
files of
        [] ->
            [Char] -> IO DataFrame
forall a. HasCallStack => [Char] -> a
error ([Char] -> IO DataFrame) -> [Char] -> IO DataFrame
forall a b. (a -> b) -> a -> b
$
                [Char]
"readParquetFiles: no parquet files found for " [Char] -> ShowS
forall a. [a] -> [a] -> [a]
++ [Char]
path
        [[Char]]
_ -> do
            let optsWithoutRowRange :: ParquetReadOptions
optsWithoutRowRange = ParquetReadOptions
opts{rowRange = Nothing}
            [DataFrame]
dfs <- ([Char] -> IO DataFrame) -> [[Char]] -> IO [DataFrame]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
(a -> m b) -> t a -> m (t b)
forall (m :: * -> *) a b. Monad m => (a -> m b) -> [a] -> m [b]
mapM (ParquetReadOptions -> [Char] -> IO DataFrame
readParquetWithOpts ParquetReadOptions
optsWithoutRowRange) [[Char]]
files
            DataFrame -> IO DataFrame
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (ParquetReadOptions -> DataFrame -> DataFrame
applyRowRange ParquetReadOptions
opts ([DataFrame] -> DataFrame
forall a. Monoid a => [a] -> a
mconcat [DataFrame]
dfs))

-- Options application -----------------------------------------------------

applyRowRange :: ParquetReadOptions -> DataFrame -> DataFrame
applyRowRange :: ParquetReadOptions -> DataFrame -> DataFrame
applyRowRange ParquetReadOptions
opts DataFrame
df =
    DataFrame
-> ((Int, Int) -> DataFrame) -> Maybe (Int, Int) -> DataFrame
forall b a. b -> (a -> b) -> Maybe a -> b
maybe DataFrame
df ((Int, Int) -> DataFrame -> DataFrame
`DS.range` DataFrame
df) (ParquetReadOptions -> Maybe (Int, Int)
rowRange ParquetReadOptions
opts)

applySelectedColumns :: ParquetReadOptions -> DataFrame -> DataFrame
applySelectedColumns :: ParquetReadOptions -> DataFrame -> DataFrame
applySelectedColumns ParquetReadOptions
opts DataFrame
df =
    DataFrame -> ([Text] -> DataFrame) -> Maybe [Text] -> DataFrame
forall b a. b -> (a -> b) -> Maybe a -> b
maybe DataFrame
df ([Text] -> DataFrame -> DataFrame
`DS.select` DataFrame
df) (ParquetReadOptions -> Maybe [Text]
selectedColumns ParquetReadOptions
opts)

applyPredicate :: ParquetReadOptions -> DataFrame -> DataFrame
applyPredicate :: ParquetReadOptions -> DataFrame -> DataFrame
applyPredicate ParquetReadOptions
opts DataFrame
df =
    DataFrame
-> (Expr Bool -> DataFrame) -> Maybe (Expr Bool) -> DataFrame
forall b a. b -> (a -> b) -> Maybe a -> b
maybe DataFrame
df (Expr Bool -> DataFrame -> DataFrame
`DS.filterWhere` DataFrame
df) (ParquetReadOptions -> Maybe (Expr Bool)
predicate ParquetReadOptions
opts)

applyReadOptions :: ParquetReadOptions -> DataFrame -> DataFrame
applyReadOptions :: ParquetReadOptions -> DataFrame -> DataFrame
applyReadOptions ParquetReadOptions
opts =
    ParquetReadOptions -> DataFrame -> DataFrame
applyRowRange ParquetReadOptions
opts
        (DataFrame -> DataFrame)
-> (DataFrame -> DataFrame) -> DataFrame -> DataFrame
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ParquetReadOptions -> DataFrame -> DataFrame
applySelectedColumns ParquetReadOptions
opts
        (DataFrame -> DataFrame)
-> (DataFrame -> DataFrame) -> DataFrame -> DataFrame
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ParquetReadOptions -> DataFrame -> DataFrame
applyPredicate ParquetReadOptions
opts

-- File and metadata parsing -----------------------------------------------

readMetadataFromPath :: FilePath -> IO (FileMetadata, BSO.ByteString)
readMetadataFromPath :: [Char] -> IO (FileMetadata, ByteString)
readMetadataFromPath [Char]
path = do
    ByteString
contents <- [Char] -> IO ByteString
BSO.readFile [Char]
path
    let (Int
size, ByteString
magicString) = ByteString
contents ByteString -> (Int, ByteString) -> (Int, ByteString)
forall a b. a -> b -> b
`seq` ByteString -> (Int, ByteString)
readMetadataSizeFromFooter ByteString
contents
    Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (ByteString
magicString ByteString -> ByteString -> Bool
forall a. Eq a => a -> a -> Bool
/= ByteString
"PAR1") (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ [Char] -> IO ()
forall a. HasCallStack => [Char] -> a
error [Char]
"Invalid Parquet file"
    FileMetadata
meta <- ByteString -> Int -> IO FileMetadata
readMetadata ByteString
contents Int
size
    (FileMetadata, ByteString) -> IO (FileMetadata, ByteString)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (FileMetadata
meta, ByteString
contents)

readMetadataSizeFromFooter :: BSO.ByteString -> (Int, BSO.ByteString)
readMetadataSizeFromFooter :: ByteString -> (Int, ByteString)
readMetadataSizeFromFooter ByteString
contents =
    let
        footerOffSet :: Int
footerOffSet = ByteString -> Int
BSO.length ByteString
contents Int -> Int -> Int
forall a. Num a => a -> a -> a
- Int
8
        sizeBytes :: [Int32]
sizeBytes =
            (Int -> Int32) -> [Int] -> [Int32]
forall a b. (a -> b) -> [a] -> [b]
map
                (forall a b. (Integral a, Num b) => a -> b
fromIntegral @Word8 @Int32 (Word8 -> Int32) -> (Int -> Word8) -> Int -> Int32
forall b c a. (b -> c) -> (a -> b) -> a -> c
. HasCallStack => ByteString -> Int -> Word8
ByteString -> Int -> Word8
BSO.index ByteString
contents)
                [Int
footerOffSet .. Int
footerOffSet Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
3]
        size :: Int
size = Int32 -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Int32 -> Int) -> Int32 -> Int
forall a b. (a -> b) -> a -> b
$ (Int32 -> Int32 -> Int32) -> Int32 -> [Int32] -> Int32
forall b a. (b -> a -> b) -> b -> [a] -> b
forall (t :: * -> *) b a.
Foldable t =>
(b -> a -> b) -> b -> t a -> b
L.foldl' Int32 -> Int32 -> Int32
forall a. Bits a => a -> a -> a
(.|.) Int32
0 ([Int32] -> Int32) -> [Int32] -> Int32
forall a b. (a -> b) -> a -> b
$ (Int32 -> Int -> Int32) -> [Int32] -> [Int] -> [Int32]
forall a b c. (a -> b -> c) -> [a] -> [b] -> [c]
zipWith Int32 -> Int -> Int32
forall a. Bits a => a -> Int -> a
shift [Int32]
sizeBytes [Int
0, Int
8, Int
16, Int
24]
        magicStringBytes :: [Word8]
magicStringBytes = (Int -> Word8) -> [Int] -> [Word8]
forall a b. (a -> b) -> [a] -> [b]
map (HasCallStack => ByteString -> Int -> Word8
ByteString -> Int -> Word8
BSO.index ByteString
contents) [Int
footerOffSet Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
4 .. Int
footerOffSet Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
7]
        magicString :: ByteString
magicString = [Word8] -> ByteString
BSO.pack [Word8]
magicStringBytes
     in
        (Int
size, ByteString
magicString)

-- Schema navigation -------------------------------------------------------

getColumnPaths :: [SchemaElement] -> [(T.Text, Int)]
getColumnPaths :: [SchemaElement] -> [(Text, Int)]
getColumnPaths [SchemaElement]
schemaElements =
    let nodes :: [SNode]
nodes = [SchemaElement] -> [SNode]
parseAll [SchemaElement]
schemaElements
     in [SNode] -> Int -> [Text] -> Bool -> [(Text, Int)]
go [SNode]
nodes Int
0 [] Bool
False
  where
    go :: [SNode] -> Int -> [Text] -> Bool -> [(Text, Int)]
go [] Int
_ [Text]
_ Bool
_ = []
    go (SNode
n : [SNode]
ns) Int
idx [Text]
path Bool
skipThis
        | [SNode] -> Bool
forall a. [a] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null (SNode -> [SNode]
sChildren SNode
n) =
            let newPath :: [Text]
newPath = if Bool
skipThis then [Text]
path else [Text]
path [Text] -> [Text] -> [Text]
forall a. [a] -> [a] -> [a]
++ [[Char] -> Text
T.pack (SNode -> [Char]
sName SNode
n)]
                fullPath :: Text
fullPath = Text -> [Text] -> Text
T.intercalate Text
"." [Text]
newPath
             in (Text
fullPath, Int
idx) (Text, Int) -> [(Text, Int)] -> [(Text, Int)]
forall a. a -> [a] -> [a]
: [SNode] -> Int -> [Text] -> Bool -> [(Text, Int)]
go [SNode]
ns (Int
idx Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1) [Text]
path Bool
skipThis
        | SNode -> RepetitionType
sRep SNode
n RepetitionType -> RepetitionType -> Bool
forall a. Eq a => a -> a -> Bool
== RepetitionType
REPEATED =
            let skipChildren :: Bool
skipChildren = [SNode] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length (SNode -> [SNode]
sChildren SNode
n) Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
1
                childLeaves :: [(Text, Int)]
childLeaves = [SNode] -> Int -> [Text] -> Bool -> [(Text, Int)]
go (SNode -> [SNode]
sChildren SNode
n) Int
idx [Text]
path Bool
skipChildren
             in [(Text, Int)]
childLeaves [(Text, Int)] -> [(Text, Int)] -> [(Text, Int)]
forall a. [a] -> [a] -> [a]
++ [SNode] -> Int -> [Text] -> Bool -> [(Text, Int)]
go [SNode]
ns (Int
idx Int -> Int -> Int
forall a. Num a => a -> a -> a
+ [(Text, Int)] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [(Text, Int)]
childLeaves) [Text]
path Bool
skipThis
        | Bool
skipThis =
            let childLeaves :: [(Text, Int)]
childLeaves = [SNode] -> Int -> [Text] -> Bool -> [(Text, Int)]
go (SNode -> [SNode]
sChildren SNode
n) Int
idx [Text]
path Bool
False
             in [(Text, Int)]
childLeaves [(Text, Int)] -> [(Text, Int)] -> [(Text, Int)]
forall a. [a] -> [a] -> [a]
++ [SNode] -> Int -> [Text] -> Bool -> [(Text, Int)]
go [SNode]
ns (Int
idx Int -> Int -> Int
forall a. Num a => a -> a -> a
+ [(Text, Int)] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [(Text, Int)]
childLeaves) [Text]
path Bool
skipThis
        | Bool
otherwise =
            let subPath :: [Text]
subPath = [Text]
path [Text] -> [Text] -> [Text]
forall a. [a] -> [a] -> [a]
++ [[Char] -> Text
T.pack (SNode -> [Char]
sName SNode
n)]
                childLeaves :: [(Text, Int)]
childLeaves = [SNode] -> Int -> [Text] -> Bool -> [(Text, Int)]
go (SNode -> [SNode]
sChildren SNode
n) Int
idx [Text]
subPath Bool
False
             in [(Text, Int)]
childLeaves [(Text, Int)] -> [(Text, Int)] -> [(Text, Int)]
forall a. [a] -> [a] -> [a]
++ [SNode] -> Int -> [Text] -> Bool -> [(Text, Int)]
go [SNode]
ns (Int
idx Int -> Int -> Int
forall a. Num a => a -> a -> a
+ [(Text, Int)] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [(Text, Int)]
childLeaves) [Text]
path Bool
skipThis

findLeafSchema :: [SchemaElement] -> [String] -> Maybe SchemaElement
findLeafSchema :: [SchemaElement] -> [[Char]] -> Maybe SchemaElement
findLeafSchema [SchemaElement]
elems [[Char]]
path =
    case [SNode] -> [[Char]] -> Maybe SNode
go ([SchemaElement] -> [SNode]
parseAll [SchemaElement]
elems) [[Char]]
path of
        Just SNode
node -> (SchemaElement -> Bool) -> [SchemaElement] -> Maybe SchemaElement
forall (t :: * -> *) a. Foldable t => (a -> Bool) -> t a -> Maybe a
L.find (\SchemaElement
e -> Text -> [Char]
T.unpack (SchemaElement -> Text
elementName SchemaElement
e) [Char] -> [Char] -> Bool
forall a. Eq a => a -> a -> Bool
== SNode -> [Char]
sName SNode
node) [SchemaElement]
elems
        Maybe SNode
Nothing -> Maybe SchemaElement
forall a. Maybe a
Nothing
  where
    go :: [SNode] -> [[Char]] -> Maybe SNode
go [] [[Char]]
_ = Maybe SNode
forall a. Maybe a
Nothing
    go [SNode]
_ [] = Maybe SNode
forall a. Maybe a
Nothing
    go [SNode]
nodes [[Char]
p] = (SNode -> Bool) -> [SNode] -> Maybe SNode
forall (t :: * -> *) a. Foldable t => (a -> Bool) -> t a -> Maybe a
L.find (\SNode
n -> SNode -> [Char]
sName SNode
n [Char] -> [Char] -> Bool
forall a. Eq a => a -> a -> Bool
== [Char]
p) [SNode]
nodes
    go [SNode]
nodes ([Char]
p : [[Char]]
ps) = (SNode -> Bool) -> [SNode] -> Maybe SNode
forall (t :: * -> *) a. Foldable t => (a -> Bool) -> t a -> Maybe a
L.find (\SNode
n -> SNode -> [Char]
sName SNode
n [Char] -> [Char] -> Bool
forall a. Eq a => a -> a -> Bool
== [Char]
p) [SNode]
nodes Maybe SNode -> (SNode -> Maybe SNode) -> Maybe SNode
forall a b. Maybe a -> (a -> Maybe b) -> Maybe b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \SNode
n -> [SNode] -> [[Char]] -> Maybe SNode
go (SNode -> [SNode]
sChildren SNode
n) [[Char]]
ps

-- Page decoding -----------------------------------------------------------

processColumnPages ::
    (Int, Int) ->
    [Page] ->
    ParquetType ->
    ParquetEncoding ->
    Maybe Int32 ->
    LogicalType ->
    IO DI.Column
processColumnPages :: (Int, Int)
-> [Page]
-> ParquetType
-> ParquetEncoding
-> Maybe Int32
-> LogicalType
-> IO Column
processColumnPages (Int
maxDef, Int
maxRep) [Page]
pages ParquetType
pType ParquetEncoding
_ Maybe Int32
maybeTypeLength LogicalType
lType = do
    let dictPages :: [Page]
dictPages = (Page -> Bool) -> [Page] -> [Page]
forall a. (a -> Bool) -> [a] -> [a]
filter Page -> Bool
isDictionaryPage [Page]
pages
    let dataPages :: [Page]
dataPages = (Page -> Bool) -> [Page] -> [Page]
forall a. (a -> Bool) -> [a] -> [a]
filter Page -> Bool
isDataPage [Page]
pages

    let dictValsM :: Maybe DictVals
dictValsM =
            case [Page]
dictPages of
                [] -> Maybe DictVals
forall a. Maybe a
Nothing
                (Page
dictPage : [Page]
_) ->
                    case PageHeader -> PageTypeHeader
pageTypeHeader (Page -> PageHeader
pageHeader Page
dictPage) of
                        DictionaryPageHeader{Bool
Int32
ParquetEncoding
dictionaryPageHeaderNumValues :: Int32
dictionaryPageHeaderEncoding :: ParquetEncoding
dictionaryPageIsSorted :: Bool
dictionaryPageIsSorted :: PageTypeHeader -> Bool
dictionaryPageHeaderEncoding :: PageTypeHeader -> ParquetEncoding
dictionaryPageHeaderNumValues :: PageTypeHeader -> Int32
..} ->
                            let countForBools :: Maybe Int32
countForBools =
                                    if ParquetType
pType ParquetType -> ParquetType -> Bool
forall a. Eq a => a -> a -> Bool
== ParquetType
PBOOLEAN
                                        then Int32 -> Maybe Int32
forall a. a -> Maybe a
Just Int32
dictionaryPageHeaderNumValues
                                        else Maybe Int32
maybeTypeLength
                             in DictVals -> Maybe DictVals
forall a. a -> Maybe a
Just (ParquetType -> ByteString -> Maybe Int32 -> DictVals
readDictVals ParquetType
pType (Page -> ByteString
pageBytes Page
dictPage) Maybe Int32
countForBools)
                        PageTypeHeader
_ -> Maybe DictVals
forall a. Maybe a
Nothing

    [Column]
cols <- [Page] -> (Page -> IO Column) -> IO [Column]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
t a -> (a -> m b) -> m (t b)
forM [Page]
dataPages ((Page -> IO Column) -> IO [Column])
-> (Page -> IO Column) -> IO [Column]
forall a b. (a -> b) -> a -> b
$ \Page
page -> do
        let bs0 :: ByteString
bs0 = Page -> ByteString
pageBytes Page
page
        case PageHeader -> PageTypeHeader
pageTypeHeader (Page -> PageHeader
pageHeader Page
page) of
            DataPageHeader{Int32
ColumnStatistics
ParquetEncoding
dataPageHeaderNumValues :: Int32
dataPageHeaderEncoding :: ParquetEncoding
definitionLevelEncoding :: ParquetEncoding
repetitionLevelEncoding :: ParquetEncoding
dataPageHeaderStatistics :: ColumnStatistics
dataPageHeaderStatistics :: PageTypeHeader -> ColumnStatistics
repetitionLevelEncoding :: PageTypeHeader -> ParquetEncoding
definitionLevelEncoding :: PageTypeHeader -> ParquetEncoding
dataPageHeaderEncoding :: PageTypeHeader -> ParquetEncoding
dataPageHeaderNumValues :: PageTypeHeader -> Int32
..} -> do
                let n :: Int
n = Int32 -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int32
dataPageHeaderNumValues
                    ([Int]
defLvls, [Int]
repLvls, ByteString
afterLvls) = Int -> Int -> Int -> ByteString -> ([Int], [Int], ByteString)
readLevelsV1 Int
n Int
maxDef Int
maxRep ByteString
bs0
                    nPresent :: Int
nPresent = [Int] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length ((Int -> Bool) -> [Int] -> [Int]
forall a. (a -> Bool) -> [a] -> [a]
filter (Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
maxDef) [Int]
defLvls)
                Maybe DictVals
-> (Int, Int)
-> ParquetType
-> Maybe Int32
-> ParquetEncoding
-> [Int]
-> [Int]
-> Int
-> ByteString
-> [Char]
-> IO Column
decodePageData
                    Maybe DictVals
dictValsM
                    (Int
maxDef, Int
maxRep)
                    ParquetType
pType
                    Maybe Int32
maybeTypeLength
                    ParquetEncoding
dataPageHeaderEncoding
                    [Int]
defLvls
                    [Int]
repLvls
                    Int
nPresent
                    ByteString
afterLvls
                    [Char]
"v1"
            DataPageHeaderV2{Bool
Int32
ColumnStatistics
ParquetEncoding
dataPageHeaderV2NumValues :: Int32
dataPageHeaderV2NumNulls :: Int32
dataPageHeaderV2NumRows :: Int32
dataPageHeaderV2Encoding :: ParquetEncoding
definitionLevelByteLength :: Int32
repetitionLevelByteLength :: Int32
dataPageHeaderV2IsCompressed :: Bool
dataPageHeaderV2Statistics :: ColumnStatistics
dataPageHeaderV2Statistics :: PageTypeHeader -> ColumnStatistics
dataPageHeaderV2IsCompressed :: PageTypeHeader -> Bool
repetitionLevelByteLength :: PageTypeHeader -> Int32
definitionLevelByteLength :: PageTypeHeader -> Int32
dataPageHeaderV2Encoding :: PageTypeHeader -> ParquetEncoding
dataPageHeaderV2NumRows :: PageTypeHeader -> Int32
dataPageHeaderV2NumNulls :: PageTypeHeader -> Int32
dataPageHeaderV2NumValues :: PageTypeHeader -> Int32
..} -> do
                let n :: Int
n = Int32 -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int32
dataPageHeaderV2NumValues
                    ([Int]
defLvls, [Int]
repLvls, ByteString
afterLvls) =
                        Int
-> Int
-> Int
-> Int32
-> Int32
-> ByteString
-> ([Int], [Int], ByteString)
readLevelsV2
                            Int
n
                            Int
maxDef
                            Int
maxRep
                            Int32
definitionLevelByteLength
                            Int32
repetitionLevelByteLength
                            ByteString
bs0
                    nPresent :: Int
nPresent
                        | Int32
dataPageHeaderV2NumNulls Int32 -> Int32 -> Bool
forall a. Ord a => a -> a -> Bool
> Int32
0 =
                            Int32 -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Int32
dataPageHeaderV2NumValues Int32 -> Int32 -> Int32
forall a. Num a => a -> a -> a
- Int32
dataPageHeaderV2NumNulls)
                        | Bool
otherwise = [Int] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length ((Int -> Bool) -> [Int] -> [Int]
forall a. (a -> Bool) -> [a] -> [a]
filter (Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
maxDef) [Int]
defLvls)
                Maybe DictVals
-> (Int, Int)
-> ParquetType
-> Maybe Int32
-> ParquetEncoding
-> [Int]
-> [Int]
-> Int
-> ByteString
-> [Char]
-> IO Column
decodePageData
                    Maybe DictVals
dictValsM
                    (Int
maxDef, Int
maxRep)
                    ParquetType
pType
                    Maybe Int32
maybeTypeLength
                    ParquetEncoding
dataPageHeaderV2Encoding
                    [Int]
defLvls
                    [Int]
repLvls
                    Int
nPresent
                    ByteString
afterLvls
                    [Char]
"v2"

            -- Cannot happen as these are filtered out by isDataPage above
            DictionaryPageHeader{} -> [Char] -> IO Column
forall a. HasCallStack => [Char] -> a
error [Char]
"processColumnPages: impossible DictionaryPageHeader"
            PageTypeHeader
INDEX_PAGE_HEADER -> [Char] -> IO Column
forall a. HasCallStack => [Char] -> a
error [Char]
"processColumnPages: impossible INDEX_PAGE_HEADER"
            PageTypeHeader
PAGE_TYPE_HEADER_UNKNOWN -> [Char] -> IO Column
forall a. HasCallStack => [Char] -> a
error [Char]
"processColumnPages: impossible PAGE_TYPE_HEADER_UNKNOWN"
    Column -> IO Column
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Column -> IO Column) -> Column -> IO Column
forall a b. (a -> b) -> a -> b
$ [Column] -> Column
DI.concatManyColumns [Column]
cols

decodePageData ::
    Maybe DictVals ->
    (Int, Int) ->
    ParquetType ->
    Maybe Int32 ->
    ParquetEncoding ->
    [Int] ->
    [Int] ->
    Int ->
    BSO.ByteString ->
    String ->
    IO DI.Column
decodePageData :: Maybe DictVals
-> (Int, Int)
-> ParquetType
-> Maybe Int32
-> ParquetEncoding
-> [Int]
-> [Int]
-> Int
-> ByteString
-> [Char]
-> IO Column
decodePageData Maybe DictVals
dictValsM (Int
maxDef, Int
maxRep) ParquetType
pType Maybe Int32
maybeTypeLength ParquetEncoding
encoding [Int]
defLvls [Int]
repLvls Int
nPresent ByteString
afterLvls [Char]
versionLabel =
    case ParquetEncoding
encoding of
        ParquetEncoding
EPLAIN ->
            case ParquetType
pType of
                ParquetType
PBOOLEAN ->
                    let ([Bool]
vals, ByteString
_) = Int -> ByteString -> ([Bool], ByteString)
readNBool Int
nPresent ByteString
afterLvls
                     in Column -> IO Column
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Column -> IO Column) -> Column -> IO Column
forall a b. (a -> b) -> a -> b
$
                            if Int
maxRep Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
0
                                then Int -> Int -> [Int] -> [Int] -> [Bool] -> Column
stitchForRepBool Int
maxRep Int
maxDef [Int]
repLvls [Int]
defLvls [Bool]
vals
                                else Int -> [Int] -> [Bool] -> Column
toMaybeBool Int
maxDef [Int]
defLvls [Bool]
vals
                ParquetType
PINT32
                    | Int
maxDef Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
0
                    , Int
maxRep Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
0 ->
                        Column -> IO Column
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Column -> IO Column) -> Column -> IO Column
forall a b. (a -> b) -> a -> b
$ Vector Int32 -> Column
forall a. (Columnable a, Unbox a) => Vector a -> Column
DI.fromUnboxedVector (Int -> ByteString -> Vector Int32
readNInt32Vec Int
nPresent ByteString
afterLvls)
                ParquetType
PINT32 ->
                    let ([Int32]
vals, ByteString
_) = Int -> ByteString -> ([Int32], ByteString)
readNInt32 Int
nPresent ByteString
afterLvls
                     in Column -> IO Column
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Column -> IO Column) -> Column -> IO Column
forall a b. (a -> b) -> a -> b
$
                            if Int
maxRep Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
0
                                then Int -> Int -> [Int] -> [Int] -> [Int32] -> Column
stitchForRepInt32 Int
maxRep Int
maxDef [Int]
repLvls [Int]
defLvls [Int32]
vals
                                else Int -> [Int] -> [Int32] -> Column
toMaybeInt32 Int
maxDef [Int]
defLvls [Int32]
vals
                ParquetType
PINT64
                    | Int
maxDef Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
0
                    , Int
maxRep Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
0 ->
                        Column -> IO Column
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Column -> IO Column) -> Column -> IO Column
forall a b. (a -> b) -> a -> b
$ Vector Int64 -> Column
forall a. (Columnable a, Unbox a) => Vector a -> Column
DI.fromUnboxedVector (Int -> ByteString -> Vector Int64
readNInt64Vec Int
nPresent ByteString
afterLvls)
                ParquetType
PINT64 ->
                    let ([Int64]
vals, ByteString
_) = Int -> ByteString -> ([Int64], ByteString)
readNInt64 Int
nPresent ByteString
afterLvls
                     in Column -> IO Column
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Column -> IO Column) -> Column -> IO Column
forall a b. (a -> b) -> a -> b
$
                            if Int
maxRep Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
0
                                then Int -> Int -> [Int] -> [Int] -> [Int64] -> Column
stitchForRepInt64 Int
maxRep Int
maxDef [Int]
repLvls [Int]
defLvls [Int64]
vals
                                else Int -> [Int] -> [Int64] -> Column
toMaybeInt64 Int
maxDef [Int]
defLvls [Int64]
vals
                ParquetType
PINT96 ->
                    let ([UTCTime]
vals, ByteString
_) = Int -> ByteString -> ([UTCTime], ByteString)
readNInt96Times Int
nPresent ByteString
afterLvls
                     in Column -> IO Column
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Column -> IO Column) -> Column -> IO Column
forall a b. (a -> b) -> a -> b
$
                            if Int
maxRep Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
0
                                then Int -> Int -> [Int] -> [Int] -> [UTCTime] -> Column
stitchForRepUTCTime Int
maxRep Int
maxDef [Int]
repLvls [Int]
defLvls [UTCTime]
vals
                                else Int -> [Int] -> [UTCTime] -> Column
toMaybeUTCTime Int
maxDef [Int]
defLvls [UTCTime]
vals
                ParquetType
PFLOAT
                    | Int
maxDef Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
0
                    , Int
maxRep Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
0 ->
                        Column -> IO Column
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Column -> IO Column) -> Column -> IO Column
forall a b. (a -> b) -> a -> b
$ Vector Float -> Column
forall a. (Columnable a, Unbox a) => Vector a -> Column
DI.fromUnboxedVector (Int -> ByteString -> Vector Float
readNFloatVec Int
nPresent ByteString
afterLvls)
                ParquetType
PFLOAT ->
                    let ([Float]
vals, ByteString
_) = Int -> ByteString -> ([Float], ByteString)
readNFloat Int
nPresent ByteString
afterLvls
                     in Column -> IO Column
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Column -> IO Column) -> Column -> IO Column
forall a b. (a -> b) -> a -> b
$
                            if Int
maxRep Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
0
                                then Int -> Int -> [Int] -> [Int] -> [Float] -> Column
stitchForRepFloat Int
maxRep Int
maxDef [Int]
repLvls [Int]
defLvls [Float]
vals
                                else Int -> [Int] -> [Float] -> Column
toMaybeFloat Int
maxDef [Int]
defLvls [Float]
vals
                ParquetType
PDOUBLE
                    | Int
maxDef Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
0
                    , Int
maxRep Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
0 ->
                        Column -> IO Column
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Column -> IO Column) -> Column -> IO Column
forall a b. (a -> b) -> a -> b
$ Vector Double -> Column
forall a. (Columnable a, Unbox a) => Vector a -> Column
DI.fromUnboxedVector (Int -> ByteString -> Vector Double
readNDoubleVec Int
nPresent ByteString
afterLvls)
                ParquetType
PDOUBLE ->
                    let ([Double]
vals, ByteString
_) = Int -> ByteString -> ([Double], ByteString)
readNDouble Int
nPresent ByteString
afterLvls
                     in Column -> IO Column
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Column -> IO Column) -> Column -> IO Column
forall a b. (a -> b) -> a -> b
$
                            if Int
maxRep Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
0
                                then Int -> Int -> [Int] -> [Int] -> [Double] -> Column
stitchForRepDouble Int
maxRep Int
maxDef [Int]
repLvls [Int]
defLvls [Double]
vals
                                else Int -> [Int] -> [Double] -> Column
toMaybeDouble Int
maxDef [Int]
defLvls [Double]
vals
                ParquetType
PBYTE_ARRAY ->
                    let ([ByteString]
raws, ByteString
_) = Int -> ByteString -> ([ByteString], ByteString)
readNByteArrays Int
nPresent ByteString
afterLvls
                        texts :: [Text]
texts = (ByteString -> Text) -> [ByteString] -> [Text]
forall a b. (a -> b) -> [a] -> [b]
map ByteString -> Text
decodeUtf8Lenient [ByteString]
raws
                     in Column -> IO Column
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Column -> IO Column) -> Column -> IO Column
forall a b. (a -> b) -> a -> b
$
                            if Int
maxRep Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
0
                                then Int -> Int -> [Int] -> [Int] -> [Text] -> Column
stitchForRepText Int
maxRep Int
maxDef [Int]
repLvls [Int]
defLvls [Text]
texts
                                else Int -> [Int] -> [Text] -> Column
toMaybeText Int
maxDef [Int]
defLvls [Text]
texts
                ParquetType
PFIXED_LEN_BYTE_ARRAY ->
                    case Maybe Int32
maybeTypeLength of
                        Just Int32
len ->
                            let ([ByteString]
raws, ByteString
_) = Int -> Int -> ByteString -> ([ByteString], ByteString)
splitFixed Int
nPresent (Int32 -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int32
len) ByteString
afterLvls
                                texts :: [Text]
texts = (ByteString -> Text) -> [ByteString] -> [Text]
forall a b. (a -> b) -> [a] -> [b]
map ByteString -> Text
decodeUtf8Lenient [ByteString]
raws
                             in Column -> IO Column
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Column -> IO Column) -> Column -> IO Column
forall a b. (a -> b) -> a -> b
$
                                    if Int
maxRep Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
0
                                        then Int -> Int -> [Int] -> [Int] -> [Text] -> Column
stitchForRepText Int
maxRep Int
maxDef [Int]
repLvls [Int]
defLvls [Text]
texts
                                        else Int -> [Int] -> [Text] -> Column
toMaybeText Int
maxDef [Int]
defLvls [Text]
texts
                        Maybe Int32
Nothing -> [Char] -> IO Column
forall a. HasCallStack => [Char] -> a
error [Char]
"FIXED_LEN_BYTE_ARRAY requires type length"
                ParquetType
PARQUET_TYPE_UNKNOWN -> [Char] -> IO Column
forall a. HasCallStack => [Char] -> a
error [Char]
"Cannot read unknown Parquet type"
        ParquetEncoding
ERLE_DICTIONARY -> Maybe DictVals
-> Int -> Int -> [Int] -> [Int] -> Int -> ByteString -> IO Column
decodeDictV1 Maybe DictVals
dictValsM Int
maxDef Int
maxRep [Int]
repLvls [Int]
defLvls Int
nPresent ByteString
afterLvls
        ParquetEncoding
EPLAIN_DICTIONARY -> Maybe DictVals
-> Int -> Int -> [Int] -> [Int] -> Int -> ByteString -> IO Column
decodeDictV1 Maybe DictVals
dictValsM Int
maxDef Int
maxRep [Int]
repLvls [Int]
defLvls Int
nPresent ByteString
afterLvls
        ParquetEncoding
other -> [Char] -> IO Column
forall a. HasCallStack => [Char] -> a
error ([Char]
"Unsupported " [Char] -> ShowS
forall a. [a] -> [a] -> [a]
++ [Char]
versionLabel [Char] -> ShowS
forall a. [a] -> [a] -> [a]
++ [Char]
" encoding: " [Char] -> ShowS
forall a. [a] -> [a] -> [a]
++ ParquetEncoding -> [Char]
forall a. Show a => a -> [Char]
show ParquetEncoding
other)

-- Logical type conversion -------------------------------------------------

applyLogicalType :: LogicalType -> DI.Column -> DI.Column
applyLogicalType :: LogicalType -> Column -> Column
applyLogicalType (TimestampType Bool
_ TimeUnit
unit) Column
col =
    Column -> Either DataFrameException Column -> Column
forall b a. b -> Either a b -> b
fromRight Column
col (Either DataFrameException Column -> Column)
-> Either DataFrameException Column -> Column
forall a b. (a -> b) -> a -> b
$
        (Int64 -> UTCTime) -> Column -> Either DataFrameException Column
forall b c.
(Columnable b, Columnable c) =>
(b -> c) -> Column -> Either DataFrameException Column
DI.mapColumn
            (Int64 -> UTCTime
microsecondsToUTCTime (Int64 -> UTCTime) -> (Int64 -> Int64) -> Int64 -> UTCTime
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Int64 -> Int64 -> Int64
forall a. Num a => a -> a -> a
* (Int64
1_000_000 Int64 -> Int64 -> Int64
forall a. Integral a => a -> a -> a
`div` TimeUnit -> Int64
unitDivisor TimeUnit
unit)))
            Column
col
applyLogicalType (DecimalType Int32
precision Int32
scale) Column
col
    | Int32
precision Int32 -> Int32 -> Bool
forall a. Ord a => a -> a -> Bool
<= Int32
9 = case forall a (v :: * -> *).
(Vector v a, Columnable a) =>
Column -> Either DataFrameException (v a)
DI.toVector @Int32 @VU.Vector Column
col of
        Right Vector Int32
xs ->
            Vector Double -> Column
forall a. (Columnable a, Unbox a) => Vector a -> Column
DI.fromUnboxedVector (Vector Double -> Column) -> Vector Double -> Column
forall a b. (a -> b) -> a -> b
$
                (Int32 -> Double) -> Vector Int32 -> Vector Double
forall a b. (Unbox a, Unbox b) => (a -> b) -> Vector a -> Vector b
VU.map (\Int32
raw -> forall a b. (Integral a, Num b) => a -> b
fromIntegral @Int32 @Double Int32
raw Double -> Double -> Double
forall a. Fractional a => a -> a -> a
/ Double
10 Double -> Int32 -> Double
forall a b. (Num a, Integral b) => a -> b -> a
^ Int32
scale) Vector Int32
xs
        Left DataFrameException
_ -> Column
col
    | Int32
precision Int32 -> Int32 -> Bool
forall a. Ord a => a -> a -> Bool
<= Int32
18 = case forall a (v :: * -> *).
(Vector v a, Columnable a) =>
Column -> Either DataFrameException (v a)
DI.toVector @Int64 @VU.Vector Column
col of
        Right Vector Int64
xs ->
            Vector Double -> Column
forall a. (Columnable a, Unbox a) => Vector a -> Column
DI.fromUnboxedVector (Vector Double -> Column) -> Vector Double -> Column
forall a b. (a -> b) -> a -> b
$
                (Int64 -> Double) -> Vector Int64 -> Vector Double
forall a b. (Unbox a, Unbox b) => (a -> b) -> Vector a -> Vector b
VU.map (\Int64
raw -> forall a b. (Integral a, Num b) => a -> b
fromIntegral @Int64 @Double Int64
raw Double -> Double -> Double
forall a. Fractional a => a -> a -> a
/ Double
10 Double -> Int32 -> Double
forall a b. (Num a, Integral b) => a -> b -> a
^ Int32
scale) Vector Int64
xs
        Left DataFrameException
_ -> Column
col
    | Bool
otherwise = Column
col
applyLogicalType LogicalType
_ Column
col = Column
col

microsecondsToUTCTime :: Int64 -> UTCTime
microsecondsToUTCTime :: Int64 -> UTCTime
microsecondsToUTCTime Int64
us =
    POSIXTime -> UTCTime
posixSecondsToUTCTime (Int64 -> POSIXTime
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int64
us POSIXTime -> POSIXTime -> POSIXTime
forall a. Fractional a => a -> a -> a
/ POSIXTime
1_000_000)

unitDivisor :: TimeUnit -> Int64
unitDivisor :: TimeUnit -> Int64
unitDivisor TimeUnit
MILLISECONDS = Int64
1_000
unitDivisor TimeUnit
MICROSECONDS = Int64
1_000_000
unitDivisor TimeUnit
NANOSECONDS = Int64
1_000_000_000
unitDivisor TimeUnit
TIME_UNIT_UNKNOWN = Int64
1

applyScale :: Int32 -> Int32 -> Double
applyScale :: Int32 -> Int32 -> Double
applyScale Int32
scale Int32
rawValue =
    Int32 -> Double
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int32
rawValue Double -> Double -> Double
forall a. Fractional a => a -> a -> a
/ (Double
10 Double -> Int32 -> Double
forall a b. (Num a, Integral b) => a -> b -> a
^ Int32
scale)