{-# LANGUAGE AllowAmbiguousTypes #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE NumericUnderscores #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ScopedTypeVariables #-}
module DataFrame.Lazy.Internal.DataFrame where
import qualified Data.Text as T

import qualified DataFrame.Internal.Column as C
import qualified DataFrame.Internal.DataFrame as D
import qualified DataFrame.Internal.Expression as E
import DataFrame.Internal.Schema (Schema)
import DataFrame.Lazy.Internal.Executor (
    ExecutorConfig (..),
    defaultExecutorConfig,
    execute,
 )
import DataFrame.Lazy.Internal.LogicalPlan (
    DataSource (..),
    LogicalPlan (..),
    SortOrder (..),
 )
import qualified DataFrame.Lazy.Internal.Optimizer as Opt
import DataFrame.Operations.Join (JoinType)
-- | A deferred query: a 'LogicalPlan' plus the batch size used when the
-- plan is finally optimised and executed by 'runDataFrame'.
--
-- The combinators in this module only wrap the plan in further nodes;
-- nothing is executed until 'runDataFrame' is called.
data LazyDataFrame = LazyDataFrame
    { plan :: LogicalPlan
    -- ^ The logical plan accumulated so far.
    , batchSize :: Int
    -- ^ Passed to 'Opt.optimize' and used as the executor's
    -- 'defaultBatchSize' in 'runDataFrame'.
    }
-- | Debug rendering of the form
-- @LazyDataFrame { batchSize = <n>, plan = <plan> }@, where @<plan>@ is the
-- 'Show' output of the underlying 'LogicalPlan'.
instance Show LazyDataFrame where
    show ldf =
        "LazyDataFrame { batchSize = "
            <> show (batchSize ldf)
            <> ", plan = "
            <> show (plan ldf)
            <> " }"
-- | Optimise the accumulated logical plan into a physical plan and execute
-- it, returning the resulting eager 'D.DataFrame'.
--
-- The frame's 'batchSize' is given to the optimiser and also overrides
-- 'defaultBatchSize' in the executor configuration. Every other executor
-- setting comes from 'defaultExecutorConfig'.
runDataFrame :: LazyDataFrame -> IO D.DataFrame
runDataFrame ldf = execute physPlan config
  where
    physPlan = Opt.optimize (batchSize ldf) (plan ldf)
    config = defaultExecutorConfig{defaultBatchSize = batchSize ldf}
-- | Lift an already-materialised 'D.DataFrame' into a lazy query. The plan
-- is a single 'SourceDF' leaf, and the batch size is 1_000_000.
fromDataFrame :: D.DataFrame -> LazyDataFrame
fromDataFrame df =
    LazyDataFrame
        { plan = SourceDF df
        , batchSize = 1_000_000
        }
-- | Lazily scan a comma-separated file at @path@ with the given 'Schema'.
--
-- This is exactly @'scanSeparated' ','@. It delegates so that the two
-- CSV entry points cannot drift apart (same 'CsvSource', same batch size).
-- Nothing is read here; the scan is only recorded in the plan.
scanCsv :: Schema -> T.Text -> LazyDataFrame
scanCsv = scanSeparated ','
-- | Lazily scan a delimited text file at @path@. Fields are split on @sep@
-- and interpreted with the given 'Schema'.
--
-- Only a 'Scan' over a 'CsvSource' leaf is recorded; nothing is read until
-- 'runDataFrame'. The batch size is 1_000_000.
scanSeparated :: Char -> Schema -> T.Text -> LazyDataFrame
scanSeparated sep schema path =
    LazyDataFrame
        { plan = Scan (CsvSource (T.unpack path) sep) schema
        , batchSize = 1_000_000
        }
-- | Lazily scan a Parquet source at @path@ with the given 'Schema'.
--
-- Only a 'Scan' over a 'ParquetSource' leaf is recorded; nothing is read
-- until 'runDataFrame'. The batch size is 1_000_000.
scanParquet :: Schema -> T.Text -> LazyDataFrame
scanParquet schema path =
    LazyDataFrame
        { plan = Scan (ParquetSource (T.unpack path)) schema
        , batchSize = 1_000_000
        }
-- | Add a derived column named @name@, computed from @expr@.
--
-- The typed expression is wrapped in 'E.UExpr' and recorded as a 'Derive'
-- node on top of the current plan. The batch size is unchanged.
derive ::
    (C.Columnable a) => T.Text -> E.Expr a -> LazyDataFrame -> LazyDataFrame
derive name expr ldf = ldf{plan = Derive name (E.UExpr expr) (plan ldf)}
-- | Project the query onto the named columns. This records a 'Project' node
-- on top of the current plan; the batch size is unchanged.
select :: [T.Text] -> LazyDataFrame -> LazyDataFrame
select cols ldf = ldf{plan = Project cols (plan ldf)}
-- | Filter the query by the boolean expression @cond@. This records a
-- 'Filter' node on top of the current plan; the batch size is unchanged.
-- Presumably rows where @cond@ is 'True' are kept; the executor defines
-- the exact semantics.
filter :: E.Expr Bool -> LazyDataFrame -> LazyDataFrame
filter cond ldf = ldf{plan = Filter cond (plan ldf)}
-- | Join two lazy queries. @leftKey@ is the key column of @left@ and
-- @rightKey@ the key column of @right@; @jt@ is the join type.
--
-- The result's plan is a 'Join' node over both input plans. The result
-- inherits the batch size of the /left/ input; @right@'s batch size is
-- ignored.
join ::
    JoinType ->
    T.Text ->
    T.Text ->
    LazyDataFrame ->
    LazyDataFrame ->
    LazyDataFrame
join jt leftKey rightKey left right =
    LazyDataFrame
        { plan = Join jt leftKey rightKey (plan left) (plan right)
        , batchSize = batchSize left
        }
-- | Group by the @keys@ columns and compute the aggregations @aggs@. This
-- records an 'Aggregate' node on top of the current plan; the batch size
-- is unchanged.
--
-- Each element of @aggs@ pairs a name with an untyped expression.
-- Presumably the name is the output column; verify against the executor.
groupBy ::
    [T.Text] ->
    [(T.Text, E.UExpr)] ->
    LazyDataFrame ->
    LazyDataFrame
groupBy keys aggs ldf = ldf{plan = Aggregate keys aggs (plan ldf)}
-- | Sort by the given columns, each with its own 'SortOrder'. This records a
-- 'Sort' node on top of the current plan; the batch size is unchanged.
sortBy :: [(T.Text, SortOrder)] -> LazyDataFrame -> LazyDataFrame
sortBy cols ldf = ldf{plan = Sort cols (plan ldf)}
-- | Limit the query's output by @n@. This records a 'Limit' node on top of
-- the current plan; the batch size is unchanged.
-- NOTE(review): presumably this keeps at most @n@ rows; how zero or
-- negative @n@ is handled is up to the executor. Confirm.
limit :: Int -> LazyDataFrame -> LazyDataFrame
limit n ldf = ldf{plan = Limit n (plan ldf)}