{-# LANGUAGE OverloadedStrings #-}

module DataFrame.Lazy.Internal.Optimizer (optimize) where

import qualified Data.Map as M
import qualified Data.Set as S
import qualified Data.Text as T
import qualified DataFrame.Internal.Expression as E
import DataFrame.Internal.Schema (Schema (..), elements)
import DataFrame.Lazy.Internal.LogicalPlan
import DataFrame.Lazy.Internal.PhysicalPlan

{- | Optimise a logical plan and lower it to a physical plan.

The rewrite passes run in this order:

  1. Filter fusion       — merge directly nested Filter nodes into a conjunction
  2. Predicate pushdown  — move Filter below Derive/Project where that is safe
  3. Dead column elim    — drop Derive nodes whose output is never referenced
  4. Filter fusion again — pushdown can leave a Filter sitting directly on
                           another Filter; merging them lets 'toPhysical'
                           hand the whole conjunction to a Scan instead of
                           only the innermost predicate

Afterwards 'toPhysical' selects concrete operators.  The first argument is
the batch size that 'toPhysical' stores in each scan configuration.
-}
optimize :: Int -> LogicalPlan -> PhysicalPlan
optimize batchSz plan = toPhysical batchSz refused
  where
    fused = fuseFilters plan
    pushed = pushPredicates fused
    pruned = eliminateDeadColumns pushed
    -- 'fuseFilters' leaves an already-fused plan unchanged, so this second
    -- run only affects filters that the earlier passes made adjacent.
    refused = fuseFilters pruned

-- ---------------------------------------------------------------------------
-- Rule 1: Filter fusion
-- ---------------------------------------------------------------------------

{- | Rule 1: collapse directly nested @Filter@ nodes into a single one.

@Filter p1 (Filter p2 child)@ becomes @Filter (p1 && p2) child@, with the
conjunction built by 'andExpr'.  The merged node is inspected again, so a
stack of any height ends up as one @Filter@.  Every other constructor is
rebuilt unchanged around its rewritten children; leaves are returned as-is.
-}
fuseFilters :: LogicalPlan -> LogicalPlan
fuseFilters plan = case plan of
    -- Merge first, then recurse on the merged node.  The subtree below the
    -- filters is therefore walked once, however many filters were stacked.
    Filter p1 (Filter p2 child) ->
        fuseFilters (Filter (andExpr p1 p2) child)
    Filter p child ->
        Filter p (fuseFilters child)
    Project cols child ->
        Project cols (fuseFilters child)
    Derive name expr child ->
        Derive name expr (fuseFilters child)
    Join jt l r left right ->
        Join jt l r (fuseFilters left) (fuseFilters right)
    Aggregate keys aggs child ->
        Aggregate keys aggs (fuseFilters child)
    Sort cols child ->
        Sort cols (fuseFilters child)
    Limit n child ->
        Limit n (fuseFilters child)
    -- Remaining constructors have no child plan to rewrite.
    _ -> plan

{- | Logical AND of two @Bool@ expressions.

Builds an @E.Binary@ node whose operator applies '(&&)' and is described as
name @"and"@, symbol @"&&"@, commutative, with precedence @3@.
-}
andExpr :: E.Expr Bool -> E.Expr Bool -> E.Expr Bool
andExpr = E.Binary conjunction
  where
    conjunction =
        E.MkBinaryOp
            { E.binaryFn = (&&)
            , E.binaryName = "and"
            , E.binarySymbol = Just "&&"
            , E.binaryCommutative = True
            , E.binaryPrecedence = 3
            }

-- ---------------------------------------------------------------------------
-- Rule 2: Predicate pushdown
-- ---------------------------------------------------------------------------

{- | Rule 2: push @Filter@ nodes down towards the leaves of the plan.

* Past a @Derive@ when the predicate does not mention the derived column.
* Past a @Project@ when every column the predicate mentions is projected.

A @Filter@ whose child is anything else stays where it is and only the child
is rewritten; in particular a filter that lands directly on another
@Filter@ is not pushed any further.  This pass never touches scan
configuration: a @Filter@ that ends up directly on a @Scan@ is folded into
the scan by 'toPhysical'.
-}
pushPredicates :: LogicalPlan -> LogicalPlan
pushPredicates plan = case plan of
    Filter p (Derive name expr child)
        -- The predicate does not read the derived column, so evaluating it
        -- before the derivation selects the same rows.
        | name `notElem` E.getColumns p ->
            Derive name expr (pushPredicates (Filter p child))
        | otherwise ->
            Filter p (Derive name expr (pushPredicates child))
    Filter p (Project cols child)
        | all (`elem` cols) (E.getColumns p) ->
            Project cols (pushPredicates (Filter p child))
        | otherwise ->
            Filter p (Project cols (pushPredicates child))
    Filter p child ->
        Filter p (pushPredicates child)
    Project cols child ->
        Project cols (pushPredicates child)
    Derive name expr child ->
        Derive name expr (pushPredicates child)
    Join jt l r left right ->
        Join jt l r (pushPredicates left) (pushPredicates right)
    Aggregate keys aggs child ->
        Aggregate keys aggs (pushPredicates child)
    Sort cols child ->
        Sort cols (pushPredicates child)
    Limit n child ->
        Limit n (pushPredicates child)
    -- Remaining constructors have no child plan to rewrite.
    _ -> plan

-- ---------------------------------------------------------------------------
-- Rule 3: Dead column elimination
-- ---------------------------------------------------------------------------

{- | Over-approximate the set of columns that matter at the root of a plan.

Walking down from the root, the result collects the columns mentioned by
filter predicates, derived expressions, join keys, aggregate keys and
aggregate expressions, and sort keys, plus the name of every derived
column.  The walk stops at the first @Project@ (contributing its column
list) or @Scan@ (contributing every column of its schema).

Derived column names are included because, with no @Project@ above it, a
derived column is part of the plan's output even when no other node
mentions it.

Returns 'Nothing' when a @SourceDF@ leaf is reached without a @Project@ in
between (its columns are not known here); for a @Join@ that holds if either
side yields 'Nothing'.
-}
referencedCols :: LogicalPlan -> Maybe (S.Set T.Text)
referencedCols plan = case plan of
    Scan _ schema ->
        Just (M.keysSet (elements schema))
    Project cols _ ->
        Just (S.fromList cols)
    Filter p child ->
        withExtra (E.getColumns p) child
    Derive name expr child ->
        withExtra (name : uExprCols expr) child
    Join _ l r left right ->
        liftMaybe2 S.union (withExtra [l, r] left) (withExtra [l, r] right)
    Aggregate keys aggs child ->
        withExtra (keys <> concatMap (uExprCols . snd) aggs) child
    Sort cols child ->
        withExtra (map fst cols) child
    Limit _ child ->
        referencedCols child
    SourceDF _ ->
        Nothing
  where
    -- Columns of the sub-plan plus the given names; 'Nothing' is preserved.
    withExtra :: [T.Text] -> LogicalPlan -> Maybe (S.Set T.Text)
    withExtra names sub = S.union (S.fromList names) <$> referencedCols sub

{- | Apply a binary function to two optional values.

The result is 'Just' only when both arguments are 'Just'.  If the first
argument is 'Nothing' the second one is not inspected.
-}
liftMaybe2 :: (a -> b -> c) -> Maybe a -> Maybe b -> Maybe c
liftMaybe2 f ma mb = f <$> ma <*> mb

-- | Column names reported by 'E.getColumns' for the expression wrapped in a 'E.UExpr'.
uExprCols :: E.UExpr -> [T.Text]
uExprCols (E.UExpr inner) = E.getColumns inner

{- | Rule 3: drop @Derive@ nodes whose output column is not needed, and trim
@Scan@ schemas to the columns that are.

The plan is walked top-down with a "needed" set that starts as
@referencedCols plan@.  'Nothing' means "keep everything": no @Derive@ is
dropped and no schema is trimmed while it is in effect.

* @Derive@: removed when its name is not in the needed set; otherwise kept,
  and the columns its expression reads become needed below it.
* @Project@: replaces the needed set with exactly its own column list.
* @Filter@, @Join@, @Aggregate@, @Sort@: add the columns they mention.
* @Scan@: its schema is restricted to the needed columns.
-}
eliminateDeadColumns :: LogicalPlan -> LogicalPlan
eliminateDeadColumns plan = go (referencedCols plan) plan
  where
    go :: Maybe (S.Set T.Text) -> LogicalPlan -> LogicalPlan
    go needed node = case node of
        Derive name expr child
            -- Only a known needed set that lacks the name makes it dead.
            | maybe False (S.notMember name) needed ->
                go needed child
            | otherwise ->
                Derive name expr (go (also (uExprCols expr) needed) child)
        Filter p child ->
            Filter p (go (also (E.getColumns p) needed) child)
        Project cols child ->
            Project cols (go (Just (S.fromList cols)) child)
        Join jt l r left right ->
            -- Both inputs receive the same set, including both key names.
            let sideNeeds = also [l, r] needed
             in Join jt l r (go sideNeeds left) (go sideNeeds right)
        Aggregate keys aggs child ->
            let aggInputs = keys <> concatMap (uExprCols . snd) aggs
             in Aggregate keys aggs (go (also aggInputs needed) child)
        Sort cols child ->
            Sort cols (go (also (map fst cols) needed) child)
        Limit n child ->
            Limit n (go needed child)
        Scan ds schema ->
            Scan ds (maybe schema (Schema . M.restrictKeys (elements schema)) needed)
        SourceDF _ ->
            node

    -- Add the given names to a known needed set; 'Nothing' stays 'Nothing'.
    also :: [T.Text] -> Maybe (S.Set T.Text) -> Maybe (S.Set T.Text)
    also names = fmap (S.union (S.fromList names))

-- ---------------------------------------------------------------------------
-- Logical → Physical lowering
-- ---------------------------------------------------------------------------

{- | Lower the (already-optimised) logical plan to a physical plan.

The first argument is the batch size stored in every 'ScanConfig' built
here.

Scans: a @Scan@ of a CSV or Parquet source becomes a 'PhysicalScan' with no
pushdown predicate.  A @Filter@ sitting directly on such a @Scan@ is folded
into the scan as its pushdown predicate, so no separate filter operator is
emitted for it.

Join strategy: always 'PhysicalHashJoin'.  (The original note on this
function says the executor can fall back to SortMerge at runtime once
statistics are available; nothing in this function shows that.)

Every other node maps one-to-one onto its physical counterpart.
-}
toPhysical :: Int -> LogicalPlan -> PhysicalPlan
toPhysical batchSz = lower
  where
    lower plan = case plan of
        -- Special case: Filter directly on a Scan → push into ScanConfig.
        Filter p (Scan src@(CsvSource _ sep) schema) ->
            scan src sep schema (Just p)
        Scan src@(CsvSource _ sep) schema ->
            scan src sep schema Nothing
        Filter p (Scan src@(ParquetSource _) schema) ->
            scan src parquetSeparator schema (Just p)
        Scan src@(ParquetSource _) schema ->
            scan src parquetSeparator schema Nothing
        Project cols child ->
            PhysicalProject cols (lower child)
        Filter p child ->
            PhysicalFilter p (lower child)
        Derive name expr child ->
            PhysicalDerive name expr (lower child)
        Join jt l r left right ->
            PhysicalHashJoin jt l r (lower left) (lower right)
        Aggregate keys aggs child ->
            PhysicalHashAggregate keys aggs (lower child)
        Sort cols child ->
            PhysicalSort cols (lower child)
        Limit n child ->
            PhysicalLimit n (lower child)
        SourceDF df ->
            PhysicalSourceDF df

    -- Shared constructor for the four scan cases above.
    scan src sep schema pushed =
        PhysicalScan src (ScanConfig batchSz sep schema pushed)

    -- Parquet sources carry no separator, so the ScanConfig slot gets ','.
    -- NOTE(review): presumably ignored by the Parquet reader — confirm.
    parquetSeparator = ','