{-# LANGUAGE NumericUnderscores #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications #-}

module DataFrame.IO.Parquet where

import Control.Exception (throw, try)
import Control.Monad
import qualified Data.ByteString as BSO
import Data.Either
import Data.IORef
import Data.Int
import qualified Data.List as L
import qualified Data.Map.Strict as M
import qualified Data.Set as S
import qualified Data.Text as T
import Data.Text.Encoding
import Data.Time
import Data.Time.Clock.POSIX (posixSecondsToUTCTime)
import DataFrame.Errors (DataFrameException (ColumnNotFoundException))
import DataFrame.Internal.Binary (littleEndianWord32)
import qualified DataFrame.Internal.Column as DI
import DataFrame.Internal.DataFrame (DataFrame)
import DataFrame.Internal.Expression (Expr, getColumns)
import qualified DataFrame.Operations.Core as DI
import DataFrame.Operations.Merge ()
import qualified DataFrame.Operations.Subset as DS
import System.FilePath.Glob (compile, glob, match)

import Data.Aeson (FromJSON (..), eitherDecodeStrict, withObject, (.:))
import DataFrame.IO.Parquet.Dictionary
import DataFrame.IO.Parquet.Levels
import DataFrame.IO.Parquet.Page
import DataFrame.IO.Parquet.Thrift
import DataFrame.IO.Parquet.Types
import Network.HTTP.Simple (
    getResponseBody,
    getResponseStatusCode,
    httpBS,
    parseRequest,
    setRequestHeader,
 )
import System.Directory (
    doesDirectoryExist,
    getHomeDirectory,
    getTemporaryDirectory,
 )
import System.Environment (lookupEnv)

import qualified Data.Vector.Unboxed as VU
import DataFrame.IO.Parquet.Seeking
import System.FilePath ((</>))
import System.IO (IOMode (ReadMode))

-- Options -----------------------------------------------------------------

{- | Options for reading Parquet data.

These options are applied in this order:

1. predicate filtering
2. column projection
3. row range

Column selection for @selectedColumns@ uses leaf column names only.
-}
data ParquetReadOptions = ParquetReadOptions
    { selectedColumns :: Maybe [T.Text]
    {- ^ Columns to keep in the final dataframe. If set, only these columns are returned.
    Predicate-referenced columns are read automatically when needed and projected out after filtering.
    -}
    , predicate :: Maybe (Expr Bool)
    -- ^ Optional row filter expression applied before projection.
    , rowRange :: Maybe (Int, Int)
    -- ^ Optional row slice @(start, end)@ with start-inclusive/end-exclusive semantics.
    }
    deriving (Eq, Show)

{- | Default Parquet read options.

Equivalent to:

@
ParquetReadOptions
    { selectedColumns = Nothing
    , predicate = Nothing
    , rowRange = Nothing
    }
@
-}
defaultParquetReadOptions :: ParquetReadOptions
defaultParquetReadOptions =
    ParquetReadOptions
        { selectedColumns = Nothing
        , predicate = Nothing
        , rowRange = Nothing
        }

-- Public API --------------------------------------------------------------

{- | Read a parquet file from path and load it into a dataframe.

==== __Example__
@
ghci> D.readParquet ".\/data\/mtcars.parquet"
@
-}
readParquet :: FilePath -> IO DataFrame
readParquet = readParquetWithOpts defaultParquetReadOptions

{- | Read a Parquet file using explicit read options.

==== __Example__
@
ghci> D.readParquetWithOpts
ghci|   (D.defaultParquetReadOptions{D.selectedColumns = Just ["id"], D.rowRange = Just (0, 10)})
ghci|   "./tests/data/alltypes_plain.parquet"
@

When @selectedColumns@ is set and @predicate@ references other columns, those predicate columns
are auto-included for decoding, then projected back to the requested output columns.
-}
readParquetWithOpts :: ParquetReadOptions -> FilePath -> IO DataFrame
readParquetWithOpts opts path
    | isHFUri path = do
        paths <- fetchHFParquetFiles path
        -- Each fetched file is read with the row range cleared; the range is
        -- applied once to the concatenated result so it spans all files.
        let optsNoRange = opts{rowRange = Nothing}
        dfs <- mapM (_readParquetWithOpts Nothing optsNoRange) paths
        pure (applyRowRange opts (mconcat dfs))
    | otherwise = _readParquetWithOpts Nothing opts path

{- | Strip Parquet encoding artifact names (REPEATED wrappers and their single
  list-element children) from a raw column path, leaving user-visible names.

The path is resolved segment by segment against the schema nodes:

* A segment with no matching node ends the walk; nothing further is emitted.
* A REPEATED node that has children is dropped from the output. If it has
  exactly one child, the following segment is dropped as well.
* A node without children is emitted and ends the walk, even if path
  segments remain.
* Any other node is emitted and the walk continues into its children.
-}
cleanColPath :: [SNode] -> [String] -> [String]
cleanColPath nodes path = go nodes path False
  where
    -- The Bool says whether the current segment should be dropped because it
    -- is the sole child of a REPEATED wrapper. The REPEATED guard is checked
    -- first, so a nested REPEATED wrapper takes precedence over that flag.
    go _ [] _ = []
    go ns (p : ps) skipThis =
        case L.find (\n -> sName n == p) ns of
            Nothing -> []
            Just n
                | sRep n == REPEATED && not (null (sChildren n)) ->
                    let skipChildren = length (sChildren n) == 1
                     in go (sChildren n) ps skipChildren
                | skipThis ->
                    go (sChildren n) ps False
                | null (sChildren n) ->
                    [p]
                | otherwise ->
                    p : go (sChildren n) ps False

{- | Internal function to pass testing parameters.

Opens @path@ in 'ReadMode' via 'withFileBufferedOrSeekable' (forwarding
@extraConfig@), reads the file metadata, decodes every column chunk that
passes the column selection, and assembles the result into a dataframe.

Column selection works on leaf names: when @selectedColumns@ is set, the
columns read are the requested ones plus any referenced by @predicate@.
If any of those names is not a leaf column of the schema, a
'ColumnNotFoundException' is thrown before any column data is read.

Columns spanning several row groups are accumulated into one mutable column
sized for the total row count of the file. The final dataframe is passed
through 'applyReadOptions'.
-}
_readParquetWithOpts ::
    ForceNonSeekable -> ParquetReadOptions -> FilePath -> IO DataFrame
_readParquetWithOpts extraConfig opts path = withFileBufferedOrSeekable extraConfig path ReadMode $ \file -> do
    fileMetadata <- readMetadataFromHandle file
    let schemaElements = schema fileMetadata
    -- The first schema element is skipped everywhere below
    -- (presumably the schema root -- TODO confirm against the Thrift decoder).
    let schemaTail = drop 1 schemaElements
    let columnPaths = getColumnPaths schemaTail
    let columnNames = map fst columnPaths
    -- 'T.splitOn' never returns an empty list, so 'last' is safe here.
    let leafNames = map (last . T.splitOn ".") columnNames
    let availableSelectedColumns = L.nub leafNames
    let predicateColumns = maybe [] (L.nub . getColumns) (predicate opts)
    -- Predicate columns must be decoded even if the caller did not select them.
    let selectedColumnsForRead = case selectedColumns opts of
            Nothing -> Nothing
            Just selected -> Just (L.nub (selected ++ predicateColumns))
    let selectedColumnSet = S.fromList <$> selectedColumnsForRead
    -- The second argument (the raw column path) is currently ignored.
    let shouldReadColumn colName _ =
            case selectedColumnSet of
                Nothing -> True
                Just selected -> colName `S.member` selected

    -- Fail early when a requested or predicate column does not exist.
    case selectedColumnsForRead of
        Nothing -> pure ()
        Just requested ->
            let missing = requested L.\\ availableSelectedColumns
             in unless
                    (L.null missing)
                    ( throw
                        ( ColumnNotFoundException
                            (T.pack $ show missing)
                            "readParquetWithOpts"
                            availableSelectedColumns
                        )
                    )

    let totalRows = sum (map (fromIntegral . rowGroupNumRows) (rowGroups fileMetadata)) :: Int
    -- Per-column accumulation state, keyed by the dotted full column name:
    -- the mutable column, the next write offset, and the logical type.
    colMutMap <- newIORef (M.empty :: M.Map T.Text DI.MutableColumn)
    colOffMap <- newIORef (M.empty :: M.Map T.Text Int)
    lTypeMap <- newIORef (M.empty :: M.Map T.Text LogicalType)

    let sNodes = parseAll schemaTail
    -- NOTE(review): 'pathToElement' always returns [], so the first guard of
    -- 'findTypeLength' can only hold for an empty target path and this lookup
    -- yields Nothing for every non-empty column path. Looks unfinished --
    -- confirm the intended schema walk before relying on the type length.
    let getTypeLength :: [String] -> Maybe Int32
        getTypeLength wantedPath = findTypeLength schemaElements wantedPath 0
          where
            findTypeLength [] _ _ = Nothing
            findTypeLength (s : ss) targetPath depth
                | map T.unpack (pathToElement s ss depth) == targetPath
                    && elementType s == STRING
                    && typeLength s > 0 =
                    Just (typeLength s)
                | otherwise =
                    findTypeLength ss targetPath (if numChildren s > 0 then depth + 1 else depth)

            pathToElement _ _ _ = []

    forM_ (rowGroups fileMetadata) $ \rowGroup -> do
        forM_ (zip (rowGroupColumns rowGroup) [0 ..]) $ \(colChunk, colIdx) -> do
            let metadata = columnMetaData colChunk
            let colPath = columnPathInSchema metadata
            let cleanPath = cleanColPath sNodes colPath
            -- Fall back to a positional name when the cleaned path is empty.
            let colLeafName =
                    if null cleanPath
                        then T.pack $ "col_" ++ show colIdx
                        else T.pack $ last cleanPath
            let colFullName =
                    if null cleanPath
                        then colLeafName
                        else T.intercalate "." $ map T.pack cleanPath

            when (shouldReadColumn colLeafName colPath) $ do
                let colDataPageOffset = columnDataPageOffset metadata
                let colDictionaryPageOffset = columnDictionaryPageOffset metadata
                -- Start at the dictionary page when one is recorded ahead of
                -- the first data page; otherwise start at the data page.
                let colStart =
                        if colDictionaryPageOffset > 0 && colDataPageOffset > colDictionaryPageOffset
                            then colDictionaryPageOffset
                            else colDataPageOffset
                let colLength = columnTotalCompressedSize metadata

                columnBytes <-
                    seekAndReadBytes
                        (Just (AbsoluteSeek, fromIntegral colStart))
                        (fromIntegral colLength)
                        file

                pages <- readAllPages (columnCodec metadata) columnBytes

                let maybeTypeLength =
                        if columnType metadata == PFIXED_LEN_BYTE_ARRAY
                            then getTypeLength colPath
                            else Nothing

                -- First listed encoding, defaulting to plain when none is listed.
                let primaryEncoding = maybe EPLAIN fst (L.uncons (columnEncodings metadata))

                let (maxDef, maxRep) = levelsForPath schemaTail colPath
                let lType =
                        maybe
                            LOGICAL_TYPE_UNKNOWN
                            logicalType
                            (findLeafSchema schemaTail colPath)
                column <-
                    processColumnPages
                        (maxDef, maxRep)
                        pages
                        (columnType metadata)
                        primaryEncoding
                        maybeTypeLength
                        lType

                mutMapSnap <- readIORef colMutMap
                case M.lookup colFullName mutMapSnap of
                    -- First chunk of this column: allocate for the whole file
                    -- and record where the next row group should be written.
                    Nothing -> do
                        mc <- DI.newMutableColumn totalRows column
                        DI.copyIntoMutableColumn mc 0 column
                        modifyIORef' colMutMap (M.insert colFullName mc)
                        modifyIORef' colOffMap (M.insert colFullName (DI.columnLength column))
                    -- Later chunk: append at the stored offset. The offset entry
                    -- is always inserted together with the mutable column above.
                    Just mc -> do
                        off <- (M.! colFullName) <$> readIORef colOffMap
                        DI.copyIntoMutableColumn mc off column
                        modifyIORef' colOffMap (M.adjust (+ DI.columnLength column) colFullName)
                modifyIORef' lTypeMap (M.insert colFullName lType)

    finalMutMap <- readIORef colMutMap
    finalColMap <-
        M.traverseWithKey (\_ mc -> DI.freezeMutableColumn mc) finalMutMap
    finalLTypeMap <- readIORef lTypeMap
    -- Emit columns in schema order, keeping only those that were decoded.
    -- Every key of finalColMap also has a logical-type entry (both are written
    -- under the same name in the loop above), so the lookups cannot fail.
    -- NOTE(review): this assumes the names from 'getColumnPaths' match the
    -- dotted cleaned paths used as map keys -- confirm.
    let orderedColumns =
            map
                ( \name ->
                    ( name
                    , applyLogicalType (finalLTypeMap M.! name) $ finalColMap M.! name
                    )
                )
                (filter (`M.member` finalColMap) columnNames)

    pure $ applyReadOptions opts (DI.fromNamedColumns orderedColumns)

{- | Read Parquet files from a directory or glob path.

This is equivalent to calling 'readParquetFilesWithOpts' with 'defaultParquetReadOptions'.
-}
readParquetFiles :: FilePath -> IO DataFrame
readParquetFiles = readParquetFilesWithOpts defaultParquetReadOptions

{- | Read several Parquet files with explicit options and concatenate them.

How @path@ is resolved:

* If 'isHFUri' accepts it, the file list comes from 'fetchHFParquetFiles'.
* If it names an existing directory, the glob @path\/*.parquet@ is expanded.
* Otherwise @path@ itself is expanded as a glob pattern.

For the two local cases, matches that are themselves directories are dropped,
and an error is raised when nothing is left to read.

Each file is read with @rowRange@ cleared; the caller's @rowRange@ is applied
once to the concatenated result (global range semantics).

==== __Example__
@
ghci> D.readParquetFilesWithOpts
ghci|   (D.defaultParquetReadOptions{D.selectedColumns = Just ["id"], D.rowRange = Just (0, 5)})
ghci|   "./tests/data/alltypes_plain*.parquet"
@
-}
readParquetFilesWithOpts :: ParquetReadOptions -> FilePath -> IO DataFrame
readParquetFilesWithOpts opts path
    | isHFUri path =
        fetchHFParquetFiles path >>= readAll (_readParquetWithOpts Nothing)
    | otherwise = do
        pathIsDirectory <- doesDirectoryExist path
        let globPattern
                | pathIsDirectory = path </> "*.parquet"
                | otherwise = path
        candidates <- glob globPattern
        -- A glob can match directories too; only keep non-directory entries.
        regularFiles <- filterM (fmap not . doesDirectoryExist) candidates
        if null regularFiles
            then error ("readParquetFiles: no parquet files found for " ++ path)
            else readAll readParquetWithOpts regularFiles
  where
    -- Options handed to each single-file read: the row range is withheld so
    -- it can be applied to the combined frame instead.
    perFileOpts :: ParquetReadOptions
    perFileOpts = opts{rowRange = Nothing}

    -- Read every file with the given reader, concatenate, then apply the
    -- caller's row range to the whole.
    readAll ::
        (ParquetReadOptions -> FilePath -> IO DataFrame) ->
        [FilePath] ->
        IO DataFrame
    readAll readOne files = do
        frames <- mapM (readOne perFileOpts) files
        pure (applyRowRange opts (mconcat frames))

-- Options application -----------------------------------------------------

-- | Restrict the frame with 'DS.range' when @rowRange@ is set; otherwise
-- return it unchanged.
applyRowRange :: ParquetReadOptions -> DataFrame -> DataFrame
applyRowRange opts df =
    case rowRange opts of
        Nothing -> df
        Just bounds -> DS.range bounds df

-- | Project the frame with 'DS.select' when @selectedColumns@ is set;
-- otherwise return it unchanged.
applySelectedColumns :: ParquetReadOptions -> DataFrame -> DataFrame
applySelectedColumns opts df =
    case selectedColumns opts of
        Nothing -> df
        Just wanted -> DS.select wanted df

-- | Filter the frame with 'DS.filterWhere' when a @predicate@ is set;
-- otherwise return it unchanged.
applyPredicate :: ParquetReadOptions -> DataFrame -> DataFrame
applyPredicate opts df =
    case predicate opts of
        Nothing -> df
        Just condition -> DS.filterWhere condition df

-- | Apply all read options to a frame, in order: predicate filtering first,
-- then column projection, then the row range.
applyReadOptions :: ParquetReadOptions -> DataFrame -> DataFrame
applyReadOptions opts df = applyRowRange opts projected
  where
    filtered = applyPredicate opts df
    projected = applySelectedColumns opts filtered

-- File and metadata parsing -----------------------------------------------

-- | Load the whole file into memory, check the trailing magic string, parse
-- the file metadata, and return it together with the full file contents.
--
-- Raises an error with the message @Invalid Parquet file@ when the magic
-- string is not @PAR1@.
readMetadataFromPath :: FilePath -> IO (FileMetadata, BSO.ByteString)
readMetadataFromPath path = do
    fileBytes <- BSO.readFile path
    let (metaSize, magic) = readMetadataSizeFromFooter fileBytes
    unless (magic == "PAR1") (error "Invalid Parquet file")
    metadata <- readMetadata fileBytes metaSize
    pure (metadata, fileBytes)

-- | Read only the trailing @footerSize@ bytes through 'readLastBytes', check
-- the magic string, and then parse the file metadata via
-- 'readMetadataByHandleMetaSize'.
--
-- Raises an error with the message @Invalid Parquet file@ when the magic
-- string is not @PAR1@.
readMetadataFromHandle :: FileBufferedOrSeekable -> IO FileMetadata
readMetadataFromHandle sh = do
    footerSlice <- readLastBytes (fromIntegral footerSize) sh
    let (metaSize, magic) = readMetadataSizeFromFooterSlice footerSlice
    unless (magic == "PAR1") (error "Invalid Parquet file")
    readMetadataByHandleMetaSize sh metaSize

-- | Split a footer slice (the last 8 bytes of a file) into the metadata size
-- and the magic string.
--
-- The size is whatever 'littleEndianWord32' decodes from the slice
-- (presumably its first four bytes -- confirm against that helper); the magic
-- string is bytes 4 through 7 of the slice.
readMetadataSizeFromFooterSlice :: BSO.ByteString -> (Int, BSO.ByteString)
readMetadataSizeFromFooterSlice contents = (metaSize, magic)
  where
    metaSize = fromIntegral (littleEndianWord32 contents)
    magic = (BSO.take 4 . BSO.drop 4) contents

-- | Extract the metadata size and magic string from complete file contents by
-- handing the final 8 bytes to 'readMetadataSizeFromFooterSlice'.
readMetadataSizeFromFooter :: BSO.ByteString -> (Int, BSO.ByteString)
readMetadataSizeFromFooter fileBytes =
    readMetadataSizeFromFooterSlice (BSO.takeEnd 8 fileBytes)

-- Schema navigation -------------------------------------------------------

{- | List every leaf of the schema tree as a dotted path paired with its
running leaf index (starting at 0, in traversal order).

Path construction rules, as implemented by the traversal:

* A leaf contributes its own name unless the "skip" flag is set for its level.
* A @REPEATED@ group never contributes its own name; its children are visited
  with the skip flag set exactly when the group has a single child.
* Any other group contributes its name, unless the skip flag is set for its
  level, in which case its children continue on the unchanged path.
-}
getColumnPaths :: [SchemaElement] -> [(T.Text, Int)]
getColumnPaths schemaElements = walk (parseAll schemaElements) 0 [] False
  where
    -- walk siblings nextLeafIndex pathSoFar skipNameAtThisLevel
    walk [] _ _ _ = []
    walk (node : siblings) idx path skipThis
        | null kids =
            (T.intercalate "." leafPath, idx) : walk siblings (idx + 1) path skipThis
        | otherwise =
            -- Siblings resume numbering after all leaves found below this node.
            nested ++ walk siblings (idx + length nested) path skipThis
      where
        kids = sChildren node
        ownName = T.pack (sName node)
        leafPath
            | skipThis = path
            | otherwise = path ++ [ownName]
        nested
            | sRep node == REPEATED = walk kids idx path (length kids == 1)
            | skipThis = walk kids idx path False
            | otherwise = walk kids idx (path ++ [ownName]) False

{- | Resolve a path of names through the schema tree and return the matching
'SchemaElement'.

The path is followed one name per tree level; 'Nothing' is returned when the
path is empty or any component has no match among the nodes at its level.

NOTE(review): once the tree node is found, the element is looked up in the
flat list by name only (first match wins). If two elements share a name this
may not be the element at the requested path -- confirm with callers.
-}
findLeafSchema :: [SchemaElement] -> [String] -> Maybe SchemaElement
findLeafSchema elems path = do
    target <- descend (parseAll elems) path
    L.find ((== sName target) . T.unpack . elementName) elems
  where
    descend _ [] = Nothing
    descend nodes (name : rest) = do
        hit <- L.find ((== name) . sName) nodes
        if null rest
            then Just hit
            else descend (sChildren hit) rest

-- Page decoding -----------------------------------------------------------

{- | Decode all pages of one column chunk into a single column.

Arguments, in order: the @(maxDefinitionLevel, maxRepetitionLevel)@ pair, the
chunk's pages, the physical type, an encoding (ignored here; each data page
header carries its own), the optional type length, and the logical type
(unused here).

The first dictionary page, if any, is decoded once and shared by every data
page. Each data page (V1 or V2) has its levels read and its values decoded by
'decodePageData'; the per-page columns are joined with
'DI.concatManyColumns'.
-}
processColumnPages ::
    (Int, Int) ->
    [Page] ->
    ParquetType ->
    ParquetEncoding ->
    Maybe Int32 ->
    LogicalType ->
    IO DI.Column
processColumnPages (maxDef, maxRep) pages pType _ maybeTypeLength _lType =
    DI.concatManyColumns <$> mapM decodeDataPage (filter isDataPage pages)
  where
    -- Dictionary values from the first dictionary page, when there is one.
    dictionary =
        case filter isDictionaryPage pages of
            [] -> Nothing
            (firstDict : _) ->
                case pageTypeHeader (pageHeader firstDict) of
                    DictionaryPageHeader{dictionaryPageHeaderNumValues = dictCount} ->
                        -- Booleans pass the dictionary's value count in the
                        -- slot that otherwise carries the type length.
                        let lengthOrCount
                                | pType == PBOOLEAN = Just dictCount
                                | otherwise = maybeTypeLength
                         in Just (readDictVals pType (pageBytes firstDict) lengthOrCount)
                    _ -> Nothing

    -- Number of entries whose definition level equals the maximum.
    countPresent = length . filter (== maxDef)

    -- 'decodePageData' with every per-chunk argument already supplied.
    decodeWith = decodePageData dictionary (maxDef, maxRep) pType maybeTypeLength

    decodeDataPage page =
        case pageTypeHeader (pageHeader page) of
            DataPageHeader
                { dataPageHeaderNumValues = numValues
                , dataPageHeaderEncoding = pageEncoding
                } ->
                    let (defLvls, repLvls, payload) =
                            readLevelsV1 (fromIntegral numValues) maxDef maxRep body
                     in decodeWith pageEncoding defLvls repLvls (countPresent defLvls) payload "v1"
            DataPageHeaderV2
                { dataPageHeaderV2NumValues = numValues
                , dataPageHeaderV2NumNulls = numNulls
                , dataPageHeaderV2Encoding = pageEncoding
                , definitionLevelByteLength = defBytes
                , repetitionLevelByteLength = repBytes
                } ->
                    let (defLvls, repLvls, payload) =
                            readLevelsV2
                                (fromIntegral numValues)
                                maxDef
                                maxRep
                                defBytes
                                repBytes
                                body
                        -- Use the header's null count when it is positive;
                        -- otherwise count from the definition levels.
                        present
                            | numNulls > 0 = fromIntegral (numValues - numNulls)
                            | otherwise = countPresent defLvls
                     in decodeWith pageEncoding defLvls repLvls present payload "v2"
            -- Cannot happen as these are filtered out by isDataPage above
            DictionaryPageHeader{} ->
                error "processColumnPages: impossible DictionaryPageHeader"
            INDEX_PAGE_HEADER ->
                error "processColumnPages: impossible INDEX_PAGE_HEADER"
            PAGE_TYPE_HEADER_UNKNOWN ->
                error "processColumnPages: impossible PAGE_TYPE_HEADER_UNKNOWN"
      where
        body = pageBytes page

{- | Decode the value section of a single data page into a column.

Arguments, in order: optional dictionary values, the
@(maxDefinitionLevel, maxRepetitionLevel)@ pair, the physical type, the
optional type length, the page's value encoding, the definition levels, the
repetition levels, the number of present values, the page bytes that follow
the levels, and a version label used only in the unsupported-encoding error
message.

* @EPLAIN@ pages are decoded according to the physical type.
* @ERLE_DICTIONARY@ and @EPLAIN_DICTIONARY@ pages are handed to
  'decodeDictV1'.
* Any other encoding raises an error.
-}
decodePageData ::
    Maybe DictVals ->
    (Int, Int) ->
    ParquetType ->
    Maybe Int32 ->
    ParquetEncoding ->
    [Int] ->
    [Int] ->
    Int ->
    BSO.ByteString ->
    String ->
    IO DI.Column
decodePageData dictValsM (maxDef, maxRep) pType maybeTypeLength encoding defLvls repLvls nPresent afterLvls versionLabel =
    case encoding of
        EPLAIN -> decodePlain
        ERLE_DICTIONARY -> decodeViaDictionary
        EPLAIN_DICTIONARY -> decodeViaDictionary
        other ->
            error ("Unsupported " ++ versionLabel ++ " encoding: " ++ show other)
  where
    -- With both maximum levels at zero, the numeric types below decode
    -- straight into an unboxed vector.
    isFlat :: Bool
    isFlat = maxDef == 0 && maxRep == 0

    -- Build the column from decoded values: the repetition-aware stitcher
    -- when maxRep > 0, otherwise the definition-level wrapper.
    assemble ::
        (Int -> Int -> [Int] -> [Int] -> [a] -> DI.Column) ->
        (Int -> [Int] -> [a] -> DI.Column) ->
        [a] ->
        IO DI.Column
    assemble stitchNested wrapOptional vals =
        pure $
            if maxRep > 0
                then stitchNested maxRep maxDef repLvls defLvls vals
                else wrapOptional maxDef defLvls vals

    decodeViaDictionary :: IO DI.Column
    decodeViaDictionary =
        decodeDictV1 dictValsM maxDef maxRep repLvls defLvls nPresent afterLvls

    decodePlain :: IO DI.Column
    decodePlain =
        case pType of
            PBOOLEAN ->
                assemble stitchForRepBool toMaybeBool (fst (readNBool nPresent afterLvls))
            PINT32
                | isFlat ->
                    pure (DI.fromUnboxedVector (readNInt32Vec nPresent afterLvls))
                | otherwise ->
                    assemble stitchForRepInt32 toMaybeInt32 (fst (readNInt32 nPresent afterLvls))
            PINT64
                | isFlat ->
                    pure (DI.fromUnboxedVector (readNInt64Vec nPresent afterLvls))
                | otherwise ->
                    assemble stitchForRepInt64 toMaybeInt64 (fst (readNInt64 nPresent afterLvls))
            PINT96 ->
                assemble stitchForRepUTCTime toMaybeUTCTime (fst (readNInt96Times nPresent afterLvls))
            PFLOAT
                | isFlat ->
                    pure (DI.fromUnboxedVector (readNFloatVec nPresent afterLvls))
                | otherwise ->
                    assemble stitchForRepFloat toMaybeFloat (fst (readNFloat nPresent afterLvls))
            PDOUBLE
                | isFlat ->
                    pure (DI.fromUnboxedVector (readNDoubleVec nPresent afterLvls))
                | otherwise ->
                    assemble stitchForRepDouble toMaybeDouble (fst (readNDouble nPresent afterLvls))
            PBYTE_ARRAY ->
                assembleText (fst (readNByteArrays nPresent afterLvls))
            PFIXED_LEN_BYTE_ARRAY ->
                case maybeTypeLength of
                    Just len ->
                        assembleText (fst (splitFixed nPresent (fromIntegral len) afterLvls))
                    Nothing -> error "FIXED_LEN_BYTE_ARRAY requires type length"
            PARQUET_TYPE_UNKNOWN -> error "Cannot read unknown Parquet type"

    -- Byte-array values are decoded leniently as UTF-8 text.
    assembleText :: [BSO.ByteString] -> IO DI.Column
    assembleText raws =
        assemble stitchForRepText toMaybeText (map decodeUtf8Lenient raws)

-- Logical type conversion -------------------------------------------------

{- | Convert a physically-decoded column into the representation implied by
its Parquet logical type.

* 'TimestampType': @Int64@ ticks since the Unix epoch become 'UTCTime'
  values, interpreted according to the timestamp's 'TimeUnit'.
* 'DecimalType' with precision @<= 9@ (read as @Int32@) or @<= 18@ (read as
  @Int64@): unscaled integers become 'Double's divided by @10 ^ scale@.

Every other logical type, and any column whose element type does not match
the expected physical type, is returned unchanged.
-}
applyLogicalType :: LogicalType -> DI.Column -> DI.Column
applyLogicalType (TimestampType _ unit) col =
    fromRight col $ DI.mapColumn ticksToUTCTime col
  where
    ticksPerSecond :: NominalDiffTime
    ticksPerSecond = fromIntegral (unitDivisor unit)

    -- Divide in 'NominalDiffTime' instead of rescaling inside 'Int64': the
    -- integer factor @1_000_000 `div` unitDivisor unit@ is 0 for nanosecond
    -- timestamps, which collapsed every such value onto the epoch.
    ticksToUTCTime :: Int64 -> UTCTime
    ticksToUTCTime ticks =
        posixSecondsToUTCTime (fromIntegral ticks / ticksPerSecond)
applyLogicalType (DecimalType precision scale) col
    | precision <= 9 = rescale (DI.toVector @Int32 @VU.Vector col)
    | precision <= 18 = rescale (DI.toVector @Int64 @VU.Vector col)
    | otherwise = col
  where
    -- Computed once per column rather than once per element.
    scaleFactor :: Double
    scaleFactor = 10 ^ scale

    -- Leave the column untouched when it could not be read at the
    -- expected integer width.
    rescale :: (Integral a, VU.Unbox a) => Either e (VU.Vector a) -> DI.Column
    rescale =
        either
            (const col)
            (DI.fromUnboxedVector . VU.map (\raw -> fromIntegral raw / scaleFactor))
applyLogicalType _ col = col

-- | Interpret a count of microseconds since the Unix epoch as a 'UTCTime'.
microsecondsToUTCTime :: Int64 -> UTCTime
microsecondsToUTCTime = posixSecondsToUTCTime . toSeconds
  where
    toSeconds micros = fromIntegral micros / 1_000_000

{- | Number of ticks of the given 'TimeUnit' in one second.

'TIME_UNIT_UNKNOWN' maps to 1, i.e. such values are treated as whole seconds.
-}
unitDivisor :: TimeUnit -> Int64
unitDivisor unit = case unit of
    MILLISECONDS -> 1_000
    MICROSECONDS -> 1_000_000
    NANOSECONDS -> 1_000_000_000
    TIME_UNIT_UNKNOWN -> 1

{- | Turn an unscaled decimal value into a 'Double' by dividing it by
@10 ^ scale@. The scale is the first argument, the raw value the second.
-}
applyScale :: Int32 -> Int32 -> Double
applyScale scale rawValue = unscaled / divisor
  where
    unscaled = fromIntegral rawValue
    divisor = 10 ^ scale

-- HuggingFace support -----------------------------------------------------

-- | The components of an @hf:\/\/datasets\/{owner}\/{dataset}\/{glob}@ URI.
data HFRef = HFRef
    { hfOwner :: T.Text
    -- ^ First path component after @hf:\/\/datasets\/@.
    , hfDataset :: T.Text
    -- ^ Second path component.
    , hfGlob :: T.Text
    -- ^ Remaining path components joined with @\/@; may contain wildcards.
    }

{- | One Parquet file entry, as decoded from the @parquet_files@ array of the
HuggingFace datasets-server response.
-}
data HFParquetFile = HFParquetFile
    { hfpUrl :: T.Text
    -- ^ Value of the @url@ key; the location the file is downloaded from.
    , hfpConfig :: T.Text
    -- ^ Value of the @config@ key.
    , hfpSplit :: T.Text
    -- ^ Value of the @split@ key.
    , hfpFilename :: T.Text
    -- ^ Value of the @filename@ key.
    }
    deriving (Show)

-- | Decodes an object with the keys @url@, @config@, @split@ and @filename@.
instance FromJSON HFParquetFile where
    parseJSON = withObject "HFParquetFile" $ \obj -> do
        url <- obj .: "url"
        config <- obj .: "config"
        split <- obj .: "split"
        filename <- obj .: "filename"
        pure
            HFParquetFile
                { hfpUrl = url
                , hfpConfig = config
                , hfpSplit = split
                , hfpFilename = filename
                }

-- | Wrapper around the @parquet_files@ list of a datasets-server response.
newtype HFParquetResponse = HFParquetResponse
    { hfParquetFiles :: [HFParquetFile]
    }

-- | Decodes an object carrying a @parquet_files@ array.
instance FromJSON HFParquetResponse where
    parseJSON = withObject "HFParquetResponse" $ \obj ->
        fmap HFParquetResponse (obj .: "parquet_files")

-- | True when the path starts with the @hf:\/\/@ scheme.
isHFUri :: FilePath -> Bool
isHFUri path = "hf://" `L.isPrefixOf` path

{- | Parse an @hf:\/\/datasets\/{owner}\/{dataset}\/{glob}@ URI into an 'HFRef'.

Returns 'Left' with a descriptive message when the URI does not start with
@hf:\/\/datasets\/@, or when the owner, dataset or glob component is missing
or empty. Everything after the dataset name is kept, slashes included, as
the glob.
-}
parseHFUri :: FilePath -> Either String HFRef
parseHFUri path =
    -- Check the prefix instead of blindly dropping its length, so that other
    -- hf:// namespaces (e.g. hf://models/...) are rejected, not misparsed.
    case L.stripPrefix datasetsPrefix path of
        Nothing -> invalid
        Just remainder ->
            case T.splitOn "/" (T.pack remainder) of
                (owner : dataset : rest)
                    | not (T.null owner)
                    , not (T.null dataset)
                    , let pathGlob = T.intercalate "/" rest
                    , not (T.null pathGlob) ->
                        Right (HFRef owner dataset pathGlob)
                _ -> invalid
  where
    datasetsPrefix :: String
    datasetsPrefix = "hf://datasets/"

    invalid :: Either String HFRef
    invalid =
        Left $
            "Invalid hf:// URI (expected hf://datasets/owner/dataset/glob): " ++ path

{- | Look up a HuggingFace access token.

The @HF_TOKEN@ environment variable is consulted first; if it is unset or
blank, the first line of @~\/.cache\/huggingface\/token@ is used. Surrounding
whitespace (including a trailing carriage return) is stripped, and a blank
token is reported as 'Nothing' so that no empty @Authorization@ header is
ever sent. Any 'IOError' while locating or reading the token file also
yields 'Nothing'.
-}
getHFToken :: IO (Maybe BSO.ByteString)
getHFToken = do
    envToken <- lookupEnv "HF_TOKEN"
    case envToken >>= cleanToken . encodeUtf8 . T.pack of
        Just tok -> pure (Just tok)
        Nothing -> do
            result <- try readTokenFile :: IO (Either IOError BSO.ByteString)
            pure (either (const Nothing) cleanToken result)
  where
    -- Kept inside the 'try' as a whole: 'getHomeDirectory' can itself fail.
    readTokenFile = do
        home <- getHomeDirectory
        BSO.readFile (home </> ".cache" </> "huggingface" </> "token")

    -- ASCII tab, line feed, carriage return and space.
    isSpaceByte w = w == 9 || w == 10 || w == 13 || w == 32

    -- First non-blank line, trimmed; 'Nothing' when nothing is left.
    cleanToken raw =
        let firstLine = BSO.takeWhile (/= 10) (BSO.dropWhile isSpaceByte raw)
            trimmed = BSO.dropWhileEnd isSpaceByte firstLine
         in if BSO.null trimmed then Nothing else Just trimmed

{- | Extract the repo-relative path from a HuggingFace download URL.
URL format: https://huggingface.co/datasets/{owner}/{dataset}/resolve/{ref}/{path}
Returns the {path} portion (e.g. "data/train-00000-of-00001.parquet").

When the URL contains no @\/resolve\/@ segment, the path is assembled from the
entry's config, split and filename instead.
-}
hfUrlRepoPath :: HFParquetFile -> String
hfUrlRepoPath f =
    maybe fallbackPath pathAfterRef (T.stripPrefix marker fromMarker)
  where
    marker :: T.Text
    marker = "/resolve/"

    -- Empty when the marker does not occur, otherwise starts with the marker.
    fromMarker = snd (T.breakOn marker (hfpUrl f))

    -- Skip the ref component and the slash that terminates it.
    pathAfterRef = T.unpack . T.drop 1 . T.dropWhile (/= '/')

    fallbackPath =
        T.unpack (hfpConfig f)
            </> T.unpack (hfpSplit f)
            </> T.unpack (hfpFilename f)

-- | True when the file's repo-relative path matches the given glob pattern.
matchesGlob :: T.Text -> HFParquetFile -> Bool
matchesGlob g f = match compiled repoPath
  where
    compiled = compile (T.unpack g)
    repoPath = hfUrlRepoPath f

{- | Ask the HuggingFace datasets-server for the Parquet files of a dataset
and keep those whose repo-relative path matches the reference's glob.

The optional token is sent as a bearer @Authorization@ header. Raises an
'IOError' when the server answers with a status other than 200 or when the
response body cannot be decoded.
-}
resolveHFUrls :: Maybe BSO.ByteString -> HFRef -> IO [HFParquetFile]
resolveHFUrls mToken ref = do
    baseReq <- parseRequest endpoint
    resp <- httpBS (authorise baseReq)
    let code = getResponseStatusCode resp
    unless (code == 200) . ioError . userError $
        "HuggingFace API returned status "
            ++ show code
            ++ " for dataset "
            ++ T.unpack repoId
    either
        parseFailure
        (pure . keepMatching . hfParquetFiles)
        (eitherDecodeStrict (getResponseBody resp))
  where
    repoId = hfOwner ref <> "/" <> hfDataset ref
    endpoint =
        "https://datasets-server.huggingface.co/parquet?dataset=" ++ T.unpack repoId
    authorise =
        maybe id (\tok -> setRequestHeader "Authorization" ["Bearer " <> tok]) mToken
    parseFailure err =
        ioError . userError $ "Failed to parse HF API response: " ++ err
    keepMatching = filter (matchesGlob (hfGlob ref))

{- | Download each file into the system temporary directory and return the
local paths, in the same order as the input.

The optional token is sent as a bearer @Authorization@ header. Raises an
'IOError' when a usable local file name cannot be derived or when a download
answers with a status other than 200.

The local name is @{config}_{split}_{filename}@ (or just the filename when
config and split are both empty). These values come from a remote response,
so path separators and drive colons are replaced by underscores: otherwise a
leading @\/@ would make '</>' discard the temporary directory and @..@
components could escape it.
-}
downloadHFFiles :: Maybe BSO.ByteString -> [HFParquetFile] -> IO [FilePath]
downloadHFFiles mToken files = do
    tmpDir <- getTemporaryDirectory
    forM files $ \f -> do
        let fname = localName f
        -- After sanitising, only these names could still resolve to a
        -- directory rather than to a file inside tmpDir.
        when (fname `elem` ["", ".", ".."]) $
            ioError . userError $
                "Cannot derive a local file name for " ++ T.unpack (hfpUrl f)
        let destPath = tmpDir </> fname
        req0 <- parseRequest (T.unpack (hfpUrl f))
        let req = case mToken of
                Nothing -> req0
                Just tok -> setRequestHeader "Authorization" ["Bearer " <> tok] req0
        resp <- httpBS req
        let status = getResponseStatusCode resp
        when (status /= 200) $
            ioError . userError $
                "Failed to download "
                    ++ T.unpack (hfpUrl f)
                    ++ " (HTTP "
                    ++ show status
                    ++ ")"
        BSO.writeFile destPath (getResponseBody resp)
        pure destPath
  where
    -- Flat file name built from the entry's config, split and filename.
    localName :: HFParquetFile -> FilePath
    localName f = map neutralise $ case (hfpConfig f, hfpSplit f) of
        (c, s)
            | T.null c && T.null s -> T.unpack (hfpFilename f)
            | otherwise ->
                T.unpack c <> "_" <> T.unpack s <> "_" <> T.unpack (hfpFilename f)

    -- Replace characters that would change which directory is written to.
    neutralise :: Char -> Char
    neutralise ch
        | ch `elem` ("/\\:" :: String) = '_'
        | otherwise = ch

-- | True when the text contains one of the wildcard characters @*@, @?@ or @[@.
hasGlob :: T.Text -> Bool
hasGlob = T.any isWildcard
  where
    isWildcard ch = ch `elem` ("*?[" :: String)

{- | Build the direct HF repo download URL for a path with no wildcards.
Format: https://huggingface.co/datasets/{owner}/{dataset}/resolve/main/{path}
-}
directHFUrl :: HFRef -> T.Text
directHFUrl ref =
    T.concat
        [ "https://huggingface.co/datasets/"
        , hfOwner ref
        , "/"
        , hfDataset ref
        , "/resolve/main/"
        , hfGlob ref
        ]

{- | Resolve an @hf:\/\/@ URI to local Parquet files.

A path containing wildcards is expanded through the datasets-server listing;
a path without wildcards is downloaded directly from the repository. Raises
an 'IOError' when the URI is malformed or when a wildcard matches no files.
-}
fetchHFParquetFiles :: FilePath -> IO [FilePath]
fetchHFParquetFiles uri = do
    ref <- either (ioError . userError) pure (parseHFUri uri)
    mToken <- getHFToken
    if hasGlob (hfGlob ref)
        then do
            matched <- resolveHFUrls mToken ref
            when (null matched) . ioError . userError $
                "No parquet files found for " ++ uri
            downloadHFFiles mToken matched
        else -- Direct repo file download — no datasets-server needed
            downloadHFFiles mToken [directFile ref]
  where
    -- Synthetic entry with empty config/split; the file name is whatever
    -- follows the last slash of the path.
    directFile ref =
        HFParquetFile
            { hfpUrl = directHFUrl ref
            , hfpConfig = ""
            , hfpSplit = ""
            , hfpFilename = T.takeWhileEnd (/= '/') (hfGlob ref)
            }