{-# LANGUAGE AllowAmbiguousTypes #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE NumericUnderscores #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ScopedTypeVariables #-}

module DataFrame.Lazy.Internal.DataFrame where

import qualified Data.Text as T
import qualified DataFrame.Internal.Column as C
import qualified DataFrame.Internal.DataFrame as D
import qualified DataFrame.Internal.Expression as E
import DataFrame.Internal.Schema (Schema)
import DataFrame.Lazy.Internal.Executor (
    ExecutorConfig (..),
    defaultExecutorConfig,
    execute,
 )
import DataFrame.Lazy.Internal.LogicalPlan (
    DataSource (..),
    LogicalPlan (..),
    SortOrder (..),
 )
import qualified DataFrame.Lazy.Internal.Optimizer as Opt
import DataFrame.Operations.Join (JoinType)

{- | A lazy query that has not been executed yet.

The query is represented as a 'LogicalPlan' tree; execution is deferred
until 'runDataFrame' is called.
-}
data LazyDataFrame = LazyDataFrame
    { plan :: LogicalPlan
    -- ^ Logical plan tree describing the query built so far.
    , batchSize :: Int
    -- ^ Batch size that 'runDataFrame' hands to the optimiser and to the
    -- executor configuration.
    }

-- | Renders the batch size followed by the 'Show' output of the logical plan.
instance Show LazyDataFrame where
    show ldf =
        "LazyDataFrame { batchSize = "
            <> show (batchSize ldf)
            <> ", plan = "
            <> show (plan ldf)
            <> " }"

-- ---------------------------------------------------------------------------
-- Entry point
-- ---------------------------------------------------------------------------

{- | Execute the lazy query: optimise the logical plan, then stream-execute
the resulting physical plan, returning a fully-materialised 'D.DataFrame'.

The query's 'batchSize' is passed to the optimiser and also overrides
'defaultBatchSize' in 'defaultExecutorConfig'.
-}
runDataFrame :: LazyDataFrame -> IO D.DataFrame
runDataFrame ldf =
    execute physPlan defaultExecutorConfig{defaultBatchSize = size}
  where
    -- Read once so the optimiser and the executor agree on the same value.
    size = batchSize ldf
    physPlan = Opt.optimize size (plan ldf)

-- ---------------------------------------------------------------------------
-- Builders that construct the logical plan tree
-- ---------------------------------------------------------------------------

{- | Lift an already-loaded eager 'D.DataFrame' into the lazy plan.

The frame becomes a 'SourceDF' leaf and the batch size is set to 1,000,000.
-}
fromDataFrame :: D.DataFrame -> LazyDataFrame
fromDataFrame df =
    LazyDataFrame
        { plan = SourceDF df
        , batchSize = 1_000_000
        }

{- | Scan a CSV file with the default comma separator.

Equivalent to 'scanSeparated' with @','@ as the separator.
-}
scanCsv :: Schema -> T.Text -> LazyDataFrame
scanCsv = scanSeparated ','

{- | Scan a character-separated file.

Builds a 'Scan' over a 'CsvSource' for the given path and separator, using
the supplied schema; the batch size is set to 1,000,000.
-}
scanSeparated :: Char -> Schema -> T.Text -> LazyDataFrame
scanSeparated sep schema path =
    LazyDataFrame
        { plan = Scan (CsvSource (T.unpack path) sep) schema
        , batchSize = 1_000_000
        }

{- | Scan a Parquet file, directory of files, or glob pattern.

Builds a 'Scan' over a 'ParquetSource' for the given path, using the
supplied schema; the batch size is set to 1,000,000.
-}
scanParquet :: Schema -> T.Text -> LazyDataFrame
scanParquet schema path =
    LazyDataFrame
        { plan = Scan (ParquetSource (T.unpack path)) schema
        , batchSize = 1_000_000
        }

{- | Add a computed column (or overwrite an existing one).

Wraps the current plan in a 'Derive' node; the typed expression is stored
as an 'E.UExpr'. The batch size is left unchanged.
-}
derive ::
    (C.Columnable a) => T.Text -> E.Expr a -> LazyDataFrame -> LazyDataFrame
derive name expr ldf =
    ldf{plan = Derive name (E.UExpr expr) (plan ldf)}

{- | Retain only the listed columns.

Wraps the current plan in a 'Project' node; the batch size is left unchanged.
-}
select :: [T.Text] -> LazyDataFrame -> LazyDataFrame
select cols ldf = ldf{plan = Project cols (plan ldf)}

{- | Keep rows that satisfy the predicate.

Wraps the current plan in a 'Filter' node; the batch size is left unchanged.
-}
filter :: E.Expr Bool -> LazyDataFrame -> LazyDataFrame
filter cond ldf = ldf{plan = Filter cond (plan ldf)}

{- | Join two lazy queries on the given key columns.

The resulting query keeps the 'batchSize' of the left sub-query; the right
sub-query's batch size is discarded.
-}
join ::
    JoinType ->
    -- | Left join key column name
    T.Text ->
    -- | Right join key column name
    T.Text ->
    -- | Left sub-query
    LazyDataFrame ->
    -- | Right sub-query
    LazyDataFrame ->
    LazyDataFrame
join jt leftKey rightKey left right =
    LazyDataFrame
        { plan = Join jt leftKey rightKey (plan left) (plan right)
        , batchSize = batchSize left
        }

{- | Group by a set of columns and compute aggregate expressions.

Each aggregate expression should use an 'Agg' node (e.g. @sumOf@, @meanOf@).
Wraps the current plan in an 'Aggregate' node; the batch size is left
unchanged.
-}
groupBy ::
    -- | Group-by key columns
    [T.Text] ->
    -- | @[(outputName, aggregateExpr)]@
    [(T.Text, E.UExpr)] ->
    LazyDataFrame ->
    LazyDataFrame
groupBy keys aggs ldf = ldf{plan = Aggregate keys aggs (plan ldf)}

{- | Sort the result by the given @(column, direction)@ pairs.

Wraps the current plan in a 'Sort' node; the batch size is left unchanged.
-}
sortBy :: [(T.Text, SortOrder)] -> LazyDataFrame -> LazyDataFrame
sortBy cols ldf = ldf{plan = Sort cols (plan ldf)}

{- | Retain at most @n@ rows.

Wraps the current plan in a 'Limit' node; the batch size is left unchanged.
-}
limit :: Int -> LazyDataFrame -> LazyDataFrame
limit n ldf = ldf{plan = Limit n (plan ldf)}