{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE ExplicitNamespaces #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE NumericUnderscores #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE TypeApplications #-}

{- | Pull-based (iterator) execution engine.

Each operator returns a 'Stream' — an IO action that produces the next
'DataFrame' batch on each call and returns 'Nothing' when exhausted.
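Blocking operators (Sort, SortMergeJoin, and HashJoin for join types other
than INNER / LEFT) materialise their input before producing output.  INNER and
LEFT HashJoin materialise only the build (right) side and stream the probe
(left) side.  HashAggregate uses streaming partial aggregation when all
aggregate expressions support it.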
-}
module DataFrame.Lazy.Internal.Executor (
    ExecutorConfig (..),
    defaultExecutorConfig,
    execute,
    foldBatches,
) where

import Control.Concurrent (forkIO)
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TBQueue (newTBQueueIO, readTBQueue, writeTBQueue)
import Control.DeepSeq (force)
import Control.Exception (evaluate)
import Control.Monad (filterM, when)
import qualified Data.ByteString as BS
import Data.IORef
import qualified Data.Map as M
import qualified Data.Set as S
import qualified Data.Text as T
import Data.Type.Equality (TestEquality (testEquality), type (:~:) (Refl))
import qualified Data.Vector.Unboxed as VU
import qualified DataFrame.IO.Parquet as Parquet
import qualified DataFrame.Internal.Column as C
import qualified DataFrame.Internal.DataFrame as D
import qualified DataFrame.Internal.Expression as E
import DataFrame.Internal.Schema (elements)
import qualified DataFrame.Lazy.IO.Binary as Bin
import qualified DataFrame.Lazy.IO.CSV as LCSV
import DataFrame.Lazy.Internal.LogicalPlan (DataSource (..), SortOrder (..))
import DataFrame.Lazy.Internal.PhysicalPlan
import qualified DataFrame.Operations.Aggregation as Agg
import qualified DataFrame.Operations.Core as Core
import qualified DataFrame.Operations.Join as Join
import DataFrame.Operations.Merge ()
import qualified DataFrame.Operations.Permutation as Perm
import qualified DataFrame.Operations.Subset as Sub
import qualified DataFrame.Operations.Transformations as Trans
import System.Directory (doesDirectoryExist)
import System.FilePath ((</>))
import System.FilePath.Glob (glob)
import System.IO (hClose)
import Type.Reflection (typeRep)

-- ---------------------------------------------------------------------------
-- Configuration
-- ---------------------------------------------------------------------------

-- | Settings consulted by the executor while building operator streams.
data ExecutorConfig = ExecutorConfig
    { memoryBudgetBytes :: !Int
    -- ^ Per-node spill threshold (currently informational; not enforced yet).
    , spillDirectory :: FilePath
    -- ^ Directory for spill files.
    -- NOTE(review): no code visible in this part of the module reads this
    -- field; confirm where it is consumed.
    , defaultBatchSize :: !Int
    -- ^ Number of rows per batch when a pre-loaded 'D.DataFrame' source is
    -- split into batches.
    }

{- | Default executor settings: a 512 MiB memory budget, @\/tmp@ as the spill
directory and batches of one million rows.
-}
defaultExecutorConfig :: ExecutorConfig
defaultExecutorConfig =
    ExecutorConfig
        { memoryBudgetBytes = 512 * 1_048_576 -- 512 MiB
        , spillDirectory = "/tmp"
        , defaultBatchSize = 1_000_000
        }

-- ---------------------------------------------------------------------------
-- Stream abstraction
-- ---------------------------------------------------------------------------

{- | A pull-based stream of 'D.DataFrame' batches.

Running 'pullBatch' yields the next batch, or 'Nothing' once the stream is
exhausted.  Any iteration state lives in the closure of the wrapped action.
-}
newtype Stream = Stream {pullBatch :: IO (Maybe D.DataFrame)}

{- | Drain all batches from a stream and concatenate them into one DataFrame.

Batches are appended with '<>' in the order they are pulled, starting from
'D.empty'; a stream that yields no batch at all therefore produces 'D.empty'.
-}
collectStream :: Stream -> IO D.DataFrame
collectStream stream = go D.empty
  where
    -- Pull until the stream reports exhaustion, appending each batch to the
    -- frame accumulated so far.
    go acc = pullBatch stream >>= maybe (return acc) (go . (acc <>))

-- ---------------------------------------------------------------------------
-- Top-level entry point
-- ---------------------------------------------------------------------------

{- | Execute a physical plan, returning the complete result as a single
'DataFrame'.

Builds the operator stream for the plan with 'buildStream' and drains it with
'collectStream'.
-}
execute :: PhysicalPlan -> ExecutorConfig -> IO D.DataFrame
execute plan cfg = buildStream plan cfg >>= collectStream

{- | Fold a function over every batch produced by a physical plan.

The step function receives the accumulator and the next batch and returns the
new accumulator.  The accumulator is forced to weak head normal form before
each step and after each application of the step function; no batch is
retained by the fold itself once it has been passed to the step function.
-}
foldBatches ::
    (b -> D.DataFrame -> IO b) -> b -> PhysicalPlan -> ExecutorConfig -> IO b
foldBatches f seed plan cfg = buildStream plan cfg >>= drain seed
  where
    -- Pull batches until the stream is exhausted, threading the accumulator.
    drain !acc stream = do
        mb <- pullBatch stream
        case mb of
            Nothing -> return acc
            Just batch -> do
                !acc' <- f acc batch
                drain acc' stream

-- ---------------------------------------------------------------------------
-- Per-operator stream builders
-- ---------------------------------------------------------------------------

{- | Build the pull-based 'Stream' for a physical plan node.  Each equation
handles one operator; operators with children build their child streams
recursively (or run the child to completion via 'execute' when they block).
-}
buildStream :: PhysicalPlan -> ExecutorConfig -> IO Stream
-- Scan -----------------------------------------------------------------------
-- File scans are delegated to the format-specific scan helpers; only the
-- plan's own scan configuration is passed on, the executor configuration is
-- ignored here.
buildStream (PhysicalScan (CsvSource path sep) cfg) _ =
    executeCsvScan path sep cfg
buildStream (PhysicalScan (ParquetSource path) cfg) _ =
    executeParquetScan path cfg
-- Spill: run the child to completion, write the result to @path@, read it
-- back, and expose the re-read frame as a single-batch stream.
buildStream (PhysicalSpill child path) execCfg = do
    execute child execCfg >>= Bin.spillToDisk path
    reloaded <- Bin.readSpilled path
    ref <- newIORef (Just reloaded)
    -- One-shot stream: hand out the stored frame on the first pull and clear
    -- the slot so every later pull reports exhaustion.
    return . Stream $ readIORef ref <* writeIORef ref Nothing
-- Filter ---------------------------------------------------------------------
-- Applies the predicate to each child batch as it is pulled.  Exhaustion
-- ('Nothing') is passed through unchanged.
buildStream (PhysicalFilter p child) execCfg = do
    childStream <- buildStream child execCfg
    return . Stream $ fmap (Sub.filterWhere p) <$> pullBatch childStream
-- Project --------------------------------------------------------------------
-- Selects the requested columns from each child batch as it is pulled.
-- Exhaustion ('Nothing') is passed through unchanged.
buildStream (PhysicalProject cols child) execCfg = do
    childStream <- buildStream child execCfg
    return . Stream $ fmap (Sub.select cols) <$> pullBatch childStream
-- Derive ---------------------------------------------------------------------
-- Adds the single named expression to each child batch as it is pulled, via
-- 'Trans.deriveMany' with a one-element list.  Exhaustion ('Nothing') is
-- passed through unchanged.
buildStream (PhysicalDerive name uexpr child) execCfg = do
    childStream <- buildStream child execCfg
    return . Stream $
        fmap (Trans.deriveMany [(name, uexpr)]) <$> pullBatch childStream
-- Limit ----------------------------------------------------------------------
-- Emits at most @n@ rows in total.  A counter of rows already emitted is kept
-- in an 'IORef'; once it reaches @n@ the child is no longer pulled.
buildStream (PhysicalLimit n child) execCfg = do
    childStream <- buildStream child execCfg
    emittedRef <- newIORef (0 :: Int)
    return . Stream $ do
        emitted <- readIORef emittedRef
        if emitted >= n
            then return Nothing
            else do
                mb <- pullBatch childStream
                case mb of
                    Nothing -> return Nothing
                    Just df -> do
                        -- Take the whole batch unless that would exceed the
                        -- rows still allowed by the limit.
                        let toTake = min (Core.nRows df) (n - emitted)
                        modifyIORef' emittedRef (+ toTake)
                        return $ Just (Sub.take toTake df)
-- Sort (blocking) ------------------------------------------------------------
-- Runs the child to completion, sorts the collected frame, and exposes the
-- result as a single-batch stream.  The sort itself is a lazy value, so it is
-- evaluated when the consumer demands the batch.
buildStream (PhysicalSort cols child) execCfg = do
    df <- execute child execCfg
    let sorted = Perm.sortBy (map toPermSortOrder cols) df
    ref <- newIORef (Just sorted)
    -- One-shot stream: first pull returns the sorted frame, later pulls
    -- return 'Nothing'.
    return . Stream $ readIORef ref <* writeIORef ref Nothing
-- HashAggregate --------------------------------------------------------------
-- The child stream is consumed in full while this stream is being built; the
-- returned stream then hands out at most one batch holding the aggregate.
--
-- When every aggregate expression satisfies 'isStreamableAgg', batches are
-- folded one at a time into a running partial aggregate.  Otherwise the whole
-- child is collected first and aggregated in one go.
buildStream (PhysicalHashAggregate keys aggs child) execCfg = do
    childStream <- buildStream child execCfg
    mResult <-
        if all (isStreamableAgg . snd) aggs
            then streamingAggregate childStream
            else Just <$> blockingAggregate childStream
    ref <- newIORef mResult
    -- One-shot stream: first pull returns the result (if any), later pulls
    -- return 'Nothing'.
    return . Stream $ readIORef ref <* writeIORef ref Nothing
  where
    -- Group a frame by the aggregation keys and apply the given expressions.
    aggregateBy exprs = Agg.aggregate exprs . Agg.groupBy keys

    -- Fallback: materialise entire child (for CollectAgg etc.)
    blockingAggregate stream = aggregateBy aggs <$> collectStream stream

    -- Streaming partial aggregation: O(|groups|) memory.  Yields 'Nothing'
    -- when the child produced no batch at all.
    streamingAggregate stream = fmap finalizer <$> absorb Nothing
      where
        (partialAggs, mergeAggs, finalizer) = buildAggPlan aggs

        absorb mAcc = do
            mb <- pullBatch stream
            case mb of
                Nothing -> return mAcc
                Just batch -> do
                    -- Force to NF so the batch DataFrame can be GC'd
                    -- immediately.  evaluate . force breaks the thunk chain
                    -- that would otherwise keep every batch (~60 MB each)
                    -- alive until the end = OOM.
                    partial <-
                        evaluate . force $ aggregateBy partialAggs batch
                    merged <- case mAcc of
                        Nothing -> return partial
                        Just acc ->
                            -- Re-aggregate the running result together with
                            -- the new partial using the merge expressions.
                            evaluate . force $
                                aggregateBy mergeAggs (acc <> partial)
                    absorb (Just merged)
-- SourceDF (split pre-loaded DataFrame into batches) -------------------------
-- Slices the frame into consecutive row ranges of at most 'defaultBatchSize'
-- rows, tracking the next start row in an 'IORef'.
--
-- A non-positive 'defaultBatchSize' would never advance the cursor and so
-- never terminate; in that case the whole frame is emitted as one batch.
buildStream (PhysicalSourceDF df) execCfg = do
    let total = Core.nRows df
        configured = defaultBatchSize execCfg
        bs = if configured > 0 then configured else total
    posRef <- newIORef (0 :: Int)
    return . Stream $ do
        i <- readIORef posRef
        if i >= total
            then return Nothing
            else do
                -- Here total > i >= 0, hence bs >= 1 and n >= 1: every pull
                -- makes progress.
                let n = min bs (total - i)
                writeIORef posRef $! i + n
                return (Just (Sub.range (i, i + n) df))
-- HashJoin — streaming probe (INNER/LEFT) or blocking fallback ----------------
buildStream (PhysicalHashJoin JoinType
jt Text
leftKey Text
rightKey PhysicalPlan
leftPlan PhysicalPlan
rightPlan) ExecutorConfig
execCfg =
    case JoinType
jt of
        JoinType
Join.INNER -> (Set Text
 -> DataFrame -> DataFrame -> Vector Int -> Vector Int -> DataFrame)
-> IO Stream
streamingHashJoin Set Text
-> DataFrame -> DataFrame -> Vector Int -> Vector Int -> DataFrame
assembleInnerBatch
        JoinType
Join.LEFT -> (Set Text
 -> DataFrame -> DataFrame -> Vector Int -> Vector Int -> DataFrame)
-> IO Stream
streamingHashJoin Set Text
-> DataFrame -> DataFrame -> Vector Int -> Vector Int -> DataFrame
assembleLeftBatch
        JoinType
_ -> do
            -- Blocking fallback for RIGHT / FULL_OUTER
            DataFrame
leftDf <- PhysicalPlan -> ExecutorConfig -> IO DataFrame
execute PhysicalPlan
leftPlan ExecutorConfig
execCfg
            DataFrame
rightDf <- PhysicalPlan -> ExecutorConfig -> IO DataFrame
execute PhysicalPlan
rightPlan ExecutorConfig
execCfg
            let result :: DataFrame
result = JoinType -> Text -> Text -> DataFrame -> DataFrame -> DataFrame
performJoin JoinType
jt Text
leftKey Text
rightKey DataFrame
leftDf DataFrame
rightDf
            IORef (Maybe DataFrame)
ref <- Maybe DataFrame -> IO (IORef (Maybe DataFrame))
forall a. a -> IO (IORef a)
newIORef (DataFrame -> Maybe DataFrame
forall a. a -> Maybe a
Just DataFrame
result)
            Stream -> IO Stream
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Stream -> IO Stream)
-> (IO (Maybe DataFrame) -> Stream)
-> IO (Maybe DataFrame)
-> IO Stream
forall b c a. (b -> c) -> (a -> b) -> a -> c
. IO (Maybe DataFrame) -> Stream
Stream (IO (Maybe DataFrame) -> IO Stream)
-> IO (Maybe DataFrame) -> IO Stream
forall a b. (a -> b) -> a -> b
$ do
                Maybe DataFrame
mb <- IORef (Maybe DataFrame) -> IO (Maybe DataFrame)
forall a. IORef a -> IO a
readIORef IORef (Maybe DataFrame)
ref
                IORef (Maybe DataFrame) -> Maybe DataFrame -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef (Maybe DataFrame)
ref Maybe DataFrame
forall a. Maybe a
Nothing
                Maybe DataFrame -> IO (Maybe DataFrame)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe DataFrame
mb
  where
    streamingHashJoin :: (Set Text
 -> DataFrame -> DataFrame -> Vector Int -> Vector Int -> DataFrame)
-> IO Stream
streamingHashJoin Set Text
-> DataFrame -> DataFrame -> Vector Int -> Vector Int -> DataFrame
assembleFn = do
        -- Materialise build (right) side once and build the compact index.
        DataFrame
rightDf <- PhysicalPlan -> ExecutorConfig -> IO DataFrame
execute PhysicalPlan
rightPlan ExecutorConfig
execCfg
        let rightDf' :: DataFrame
rightDf' =
                if Text
leftKey Text -> Text -> Bool
forall a. Eq a => a -> a -> Bool
== Text
rightKey
                    then DataFrame
rightDf
                    else Text -> Text -> DataFrame -> DataFrame
Core.rename Text
rightKey Text
leftKey DataFrame
rightDf
            joinKey :: Text
joinKey = Text
leftKey
            csSet :: Set Text
csSet = [Text] -> Set Text
forall a. Ord a => [a] -> Set a
S.fromList [Text
joinKey]
            rightHashes :: Vector Int
rightHashes = [Text] -> DataFrame -> Vector Int
Join.buildHashColumn [Text
joinKey] DataFrame
rightDf'
            ci :: CompactIndex
ci = Vector Int -> CompactIndex
Join.buildCompactIndex Vector Int
rightHashes
        -- Stream probe (left) side batch by batch.
        Stream
leftStream <- PhysicalPlan -> ExecutorConfig -> IO Stream
buildStream PhysicalPlan
leftPlan ExecutorConfig
execCfg
        Stream -> IO Stream
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Stream -> IO Stream)
-> (IO (Maybe DataFrame) -> Stream)
-> IO (Maybe DataFrame)
-> IO Stream
forall b c a. (b -> c) -> (a -> b) -> a -> c
. IO (Maybe DataFrame) -> Stream
Stream (IO (Maybe DataFrame) -> IO Stream)
-> IO (Maybe DataFrame) -> IO Stream
forall a b. (a -> b) -> a -> b
$ do
            Maybe DataFrame
mBatch <- Stream -> IO (Maybe DataFrame)
pullBatch Stream
leftStream
            case Maybe DataFrame
mBatch of
                Maybe DataFrame
Nothing -> Maybe DataFrame -> IO (Maybe DataFrame)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe DataFrame
forall a. Maybe a
Nothing
                Just DataFrame
probeBatch -> do
                    let probeHashes :: Vector Int
probeHashes = [Text] -> DataFrame -> Vector Int
Join.buildHashColumn [Text
joinKey] DataFrame
probeBatch
                        (Vector Int
probeIxs, Vector Int
buildIxs) = CompactIndex -> Vector Int -> (Vector Int, Vector Int)
Join.hashProbeKernel CompactIndex
ci Vector Int
probeHashes
                    Maybe DataFrame -> IO (Maybe DataFrame)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe DataFrame -> IO (Maybe DataFrame))
-> (DataFrame -> Maybe DataFrame)
-> DataFrame
-> IO (Maybe DataFrame)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. DataFrame -> Maybe DataFrame
forall a. a -> Maybe a
Just (DataFrame -> IO (Maybe DataFrame))
-> DataFrame -> IO (Maybe DataFrame)
forall a b. (a -> b) -> a -> b
$ Set Text
-> DataFrame -> DataFrame -> Vector Int -> Vector Int -> DataFrame
assembleFn Set Text
csSet DataFrame
probeBatch DataFrame
rightDf' Vector Int
probeIxs Vector Int
buildIxs

    -- Left-join assembly for one probe batch.  Every probe row that produced
    -- no match is appended after the matched pairs, paired with build index
    -- -1 (presumably the "no build row" marker understood by
    -- 'Join.assembleLeft' — confirm against that function).
    assembleLeftBatch csSet probeBatch rightDf' probeIxs buildIxs =
        Join.assembleLeft csSet probeBatch rightDf' paddedProbeIxs paddedBuildIxs
      where
        -- One flag per probe row; a row that matched several times is simply
        -- set to True more than once.
        hitMask =
            VU.update
                (VU.replicate (Core.nRows probeBatch) False)
                (VU.map (,True) probeIxs)
        -- Positions of the probe rows that never matched.
        missIxs = VU.elemIndices False hitMask
        paddedProbeIxs = probeIxs VU.++ missIxs
        paddedBuildIxs = buildIxs VU.++ VU.replicate (VU.length missIxs) (-1)

    -- Inner-join assembly needs no post-processing of the index vectors, so
    -- this simply forwards every argument to 'Join.assembleInner'.
    assembleInnerBatch keyCols probeDf buildDf matchedProbeIxs matchedBuildIxs =
        Join.assembleInner keyCols probeDf buildDf matchedProbeIxs matchedBuildIxs

-- SortMergeJoin (blocking on both sides) -------------------------------------
-- Both child plans are executed to completion first.  The join result is then
-- handed out as a single batch: the first pull returns it, every later pull
-- returns Nothing.
buildStream (PhysicalSortMergeJoin jt leftKey rightKey leftPlan rightPlan) execCfg = do
    lhs <- execute leftPlan execCfg
    rhs <- execute rightPlan execCfg
    -- One-shot cell: holds the (lazily evaluated) join result until the first
    -- pull empties it.
    pending <- newIORef (Just (performJoin jt leftKey rightKey lhs rhs))
    pure (Stream (readIORef pending <* writeIORef pending Nothing))

-- ---------------------------------------------------------------------------
-- Streaming aggregation helpers
-- ---------------------------------------------------------------------------

{- | Decide whether an aggregate expression can be evaluated batch by batch,
i.e. whether per-batch partial results can be merged without keeping all
input rows around.

* 'E.CollectAgg' is never streamable.
* A seedless 'E.FoldAgg' is streamable only when its accumulator type equals
  its element type (the fold can then merge its own partial results).
* A seeded 'E.FoldAgg' is streamable when the accumulator is 'Int' (merged by
  summation) or when the accumulator type equals the element type.
* 'E.MergeAgg' is always streamable.
* Anything else is not.
-}
isStreamableAgg :: E.UExpr -> Bool
isStreamableAgg (E.UExpr expr) = case expr of
    E.Agg (E.CollectAgg _ _) _ -> False
    E.Agg (E.FoldAgg _ Nothing (_ :: a -> b -> a)) _
        -- self-merging: min, max, sum
        | Just Refl <- testEquality (typeRep @a) (typeRep @b) -> True
        | otherwise -> False
    E.Agg (E.FoldAgg _ (Just _) (_ :: a -> b -> a)) _
        -- seeded Int fold (old-style count): merge by sum
        | Just Refl <- testEquality (typeRep @a) (typeRep @Int) -> True
        -- seeded self-merging
        | Just Refl <- testEquality (typeRep @a) (typeRep @b) -> True
        | otherwise -> False
    E.Agg E.MergeAgg{} _ -> True
    _ -> False

{- | Split a list of streamable aggregate expressions into the three pieces
that streaming aggregation works with.

* @partialAggs@  — evaluated on every input batch, producing one row per group
* @mergeAggs@    — evaluated when two partial-result DataFrames are combined
* @finalizer@    — run once after the last batch; only 'MergeAgg' contributes
                   a non-identity step, because its accumulator type may
                   differ from its output type

The per-aggregate pieces are concatenated in input order and the individual
finalizers are composed into one function.
-}
buildAggPlan ::
    [(T.Text, E.UExpr)] ->
    ( [(T.Text, E.UExpr)]
    , [(T.Text, E.UExpr)]
    , D.DataFrame -> D.DataFrame
    )
buildAggPlan aggs = (concat partials, concat merges, foldr (.) id finalizers)
  where
    (partials, merges, finalizers) = unzip3 (map processAgg aggs)

    -- Plan for a single named aggregate: (partial exprs, merge exprs, finalizer).
    processAgg ::
        (T.Text, E.UExpr) ->
        ([(T.Text, E.UExpr)], [(T.Text, E.UExpr)], D.DataFrame -> D.DataFrame)
    processAgg (name, ue) = case ue of
        -- Seedless FoldAgg: min, max, sum (self-merging when a = b)
        E.UExpr (E.Agg (E.FoldAgg n Nothing (f :: a -> b -> a)) (_ :: E.Expr b))
            | Just Refl <- testEquality (typeRep @a) (typeRep @b) ->
                ( [(name, ue)]
                , [(name, E.UExpr (E.Agg (E.FoldAgg n Nothing f) (E.Col @a name)))]
                , id
                )
            -- a /= b but a = Int: merge by sum (backward compat)
            | Just Refl <- testEquality (typeRep @a) (typeRep @Int) -> mergeBySum
            | otherwise -> keepAsIs
        -- Seeded FoldAgg: old-style count (a = Int)
        -- NOTE(review): the Int check runs before the a = b check, so every
        -- seeded fold with an Int accumulator (e.g. a seeded min/max over an
        -- Int column) has its partials merged by summation — confirm that
        -- only count-style folds reach this branch.
        E.UExpr (E.Agg (E.FoldAgg n (Just _) (f :: a -> b -> a)) (_ :: E.Expr b))
            | Just Refl <- testEquality (typeRep @a) (typeRep @Int) -> mergeBySum
            | Just Refl <- testEquality (typeRep @a) (typeRep @b) ->
                -- The merge step reuses the fold function without its seed.
                ( [(name, ue)]
                , [(name, E.UExpr (E.Agg (E.FoldAgg n Nothing f) (E.Col @a name)))]
                , id
                )
            | otherwise -> keepAsIs
        -- MergeAgg: count, mean, etc.
        -- Partial step: accumulate into the acc type (id replaces the finalizer).
        -- Merge step: fold the merge function over acc-typed partial results.
        -- Finalizer: map fin over the acc column to obtain the output type.
        E.UExpr
            ( E.Agg
                ( E.MergeAgg
                    n
                    seed
                    (step :: acc -> b -> acc)
                    (merge :: acc -> acc -> acc)
                    (fin :: acc -> a)
                )
                (inner :: E.Expr b)
            ) ->
                let perBatch =
                        E.UExpr
                            (E.Agg (E.MergeAgg n seed step merge (id :: acc -> acc)) inner)
                    combinePartials =
                        E.UExpr
                            (E.Agg (E.FoldAgg ("merge_" <> n) Nothing merge) (E.Col @acc name))
                    -- Kept lazy: a failed conversion only surfaces when the
                    -- replaced column is actually demanded.
                    convertedCol df =
                        case C.mapColumn @acc @a fin (D.unsafeGetColumn name df) of
                            Right col -> col
                            Left _ -> error "buildAggPlan: MergeAgg finalize failed"
                    finish df = Core.insertColumn name (convertedCol df) df
                 in ([(name, perBatch)], [(name, combinePartials)], finish)
        _ -> keepAsIs
      where
        -- Fallback: use the original expression for both steps, no finalizer.
        keepAsIs = ([(name, ue)], [(name, ue)], id)
        -- Original expression per batch; partial results are added together.
        mergeBySum = ([(name, ue)], [(name, intSum)], id)
        intSum =
            E.UExpr
                (E.Agg (E.FoldAgg "sum" Nothing ((+) :: Int -> Int -> Int)) (E.Col @Int name))

-- ---------------------------------------------------------------------------
-- Parquet scan implementation
-- ---------------------------------------------------------------------------

{- | Scan a Parquet file, directory, or glob, yielding one batch per file.

When @path@ is an existing directory the pattern @path/*@ is globbed;
otherwise @path@ itself is used as the glob pattern.  Directories among the
matches are dropped; every remaining match is read, whatever its extension.
Calls 'error' when no file is left after that filtering.

The projected column list (the keys of the scan schema) and the pushdown
predicate are passed to 'Parquet.readParquetWithOpts' through the read
options.
-}
executeParquetScan :: FilePath -> ScanConfig -> IO Stream
executeParquetScan path cfg = do
    pathIsDir <- doesDirectoryExist path
    let globPattern
            | pathIsDir = path </> "*"
            | otherwise = path
    candidates <- glob globPattern
    files <- filterM (\candidate -> not <$> doesDirectoryExist candidate) candidates
    when (null files) $
        error ("executeParquetScan: no parquet files found for " ++ path)
    -- Files still to be read; each pull pops the head of this list.
    remaining <- newIORef files
    let readOpts =
            Parquet.defaultParquetReadOptions
                { Parquet.selectedColumns = Just (M.keys (elements (scanSchema cfg)))
                , Parquet.predicate = scanPushdownPredicate cfg
                }
        nextBatch =
            readIORef remaining >>= \pending -> case pending of
                [] -> pure Nothing
                nextFile : later -> do
                    writeIORef remaining later
                    Just <$> Parquet.readParquetWithOpts readOpts nextFile
    pure (Stream nextBatch)

-- ---------------------------------------------------------------------------
-- CSV scan implementation
-- ---------------------------------------------------------------------------

{- | CSV scan with pipeline parallelism: a dedicated reader thread fills a
bounded queue while the caller's thread applies the pushdown predicate and
hands batches to the rest of the pipeline.  The queue has depth 2, so at most
two raw batches wait in the queue at any time.

The reader closes the file handle and queues a @Nothing@ sentinel once
'LCSV.readBatch' reports end of input.

NOTE(review): no exception handler is visible around the reader loop — if
'LCSV.readBatch' throws, the handle is not closed and no sentinel is queued.
Consider guarding the loop with @finally@.
-}
executeCsvScan :: FilePath -> Char -> ScanConfig -> IO Stream
executeCsvScan path sep cfg = do
    (handle, colSpec) <- LCSV.openCsvStream sep (scanSchema cfg) path
    -- Queue carries raw batches; Nothing is the end-of-stream sentinel.
    -- Depth 2: each batch holds ~60 MB (1M Text + Double columns); 8 would be ~480 MB.
    queue <- newTBQueueIO 2
    let publish item = atomically (writeTBQueue queue item)
        -- Reader loop: the ByteString argument is whatever the previous
        -- readBatch call handed back and is passed on to the next call.
        pump leftover =
            LCSV.readBatch sep colSpec (scanBatchSize cfg) leftover handle >>= \step ->
                case step of
                    Just (batch, leftover') -> publish (Just batch) >> pump leftover'
                    Nothing -> hClose handle >> publish Nothing
        -- Identity when the scan has no pushdown predicate.
        applyPushdown = maybe id Sub.filterWhere (scanPushdownPredicate cfg)
    _ <- forkIO (pump BS.empty)
    pure . Stream $ do
        next <- atomically (readTBQueue queue)
        case next of
            Just batch -> pure (Just (applyPushdown batch))
            -- Re-insert the sentinel so repeated pulls after EOF stay Nothing.
            Nothing -> publish Nothing >> pure Nothing

-- ---------------------------------------------------------------------------
-- Join helper
-- ---------------------------------------------------------------------------

{- | Delegate a single-key join to 'Join.join'.

'Join.join' is keyed on one shared column name, so when the two key names
differ the right frame's key column is first renamed to @leftKey@.  Note the
argument order of the call: the right frame is passed before the left frame.
-}
performJoin ::
    Join.JoinType -> T.Text -> T.Text -> D.DataFrame -> D.DataFrame -> D.DataFrame
performJoin jt leftKey rightKey leftDf rightDf =
    Join.join jt [leftKey] alignedRight leftDf
  where
    -- Right frame with its key column named like the left key.
    alignedRight
        | leftKey == rightKey = rightDf
        | otherwise = Core.rename rightKey leftKey rightDf

-- ---------------------------------------------------------------------------
-- Sort order conversion
-- ---------------------------------------------------------------------------

{- | Convert a plan-level @(column, order)@ pair into the Permutation module's
'Perm.SortOrder'.

The column reference is always built as @E.Col \@T.Text@, whatever the
column's element type is.
NOTE(review): confirm that the Permutation module's sort does not depend on
that type index for non-Text columns.
-}
toPermSortOrder :: (T.Text, SortOrder) -> Perm.SortOrder
toPermSortOrder (col, direction) = case direction of
    Ascending -> Perm.Asc sortKey
    Descending -> Perm.Desc sortKey
  where
    sortKey = E.Col @T.Text col