{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE ExplicitNamespaces #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE NumericUnderscores #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE TypeApplications #-}
module DataFrame.Lazy.Internal.Executor (
ExecutorConfig (..),
defaultExecutorConfig,
execute,
foldBatches,
) where
import Control.Concurrent (forkIO)
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TBQueue (newTBQueueIO, readTBQueue, writeTBQueue)
import Control.DeepSeq (force)
import Control.Exception (evaluate)
import Control.Monad (filterM, when)
import qualified Data.ByteString as BS
import Data.IORef
import qualified Data.Map as M
import qualified Data.Set as S
import qualified Data.Text as T
import Data.Type.Equality (TestEquality (testEquality), type (:~:) (Refl))
import qualified Data.Vector.Unboxed as VU
import qualified DataFrame.IO.Parquet as Parquet
import qualified DataFrame.Internal.Column as C
import qualified DataFrame.Internal.DataFrame as D
import qualified DataFrame.Internal.Expression as E
import DataFrame.Internal.Schema (elements)
import qualified DataFrame.Lazy.IO.Binary as Bin
import qualified DataFrame.Lazy.IO.CSV as LCSV
import DataFrame.Lazy.Internal.LogicalPlan (DataSource (..), SortOrder (..))
import DataFrame.Lazy.Internal.PhysicalPlan
import qualified DataFrame.Operations.Aggregation as Agg
import qualified DataFrame.Operations.Core as Core
import qualified DataFrame.Operations.Join as Join
import DataFrame.Operations.Merge ()
import qualified DataFrame.Operations.Permutation as Perm
import qualified DataFrame.Operations.Subset as Sub
import qualified DataFrame.Operations.Transformations as Trans
import System.Directory (doesDirectoryExist)
import System.FilePath ((</>))
import System.FilePath.Glob (glob)
import System.IO (hClose)
import Type.Reflection (typeRep)
-- | Tunables for the batch executor.
data ExecutorConfig = ExecutorConfig
    { memoryBudgetBytes :: !Int
    -- ^ Memory budget in bytes. NOTE(review): not read by any code visible
    -- in this part of the module; confirm where it is consumed.
    , spillDirectory :: FilePath
    -- ^ Directory intended for spill files. NOTE(review): the spill
    -- operator visible here takes its path from the plan node instead.
    , defaultBatchSize :: !Int
    -- ^ Maximum number of rows per batch when an in-memory frame is
    -- sliced into a stream.
    }
-- | Default executor settings: a 512 MiB memory budget, @\/tmp@ as the
-- spill directory and batches of one million rows.
defaultExecutorConfig :: ExecutorConfig
defaultExecutorConfig =
    ExecutorConfig
        { defaultBatchSize = 1_000_000
        , spillDirectory = "/tmp"
        , memoryBudgetBytes = oneMiB * 512
        }
  where
    oneMiB :: Int
    oneMiB = 1_048_576
-- | A pull-based source of batches: every consumer in this module calls
-- 'pullBatch' repeatedly and treats 'Nothing' as end of stream.
newtype Stream = Stream
    { pullBatch :: IO (Maybe D.DataFrame)
    -- ^ Fetch the next batch, or 'Nothing' when the stream is exhausted.
    }
-- | Drain a stream, appending every batch in arrival order onto 'D.empty'
-- with '<>', and return the concatenated frame.
--
-- The accumulator is forced to weak head normal form after each batch, the
-- same way 'foldBatches' treats its accumulator. Otherwise the result would
-- be a chain of deferred '<>' applications, one per batch, that is only
-- collapsed when the caller finally inspects it.
collectStream :: Stream -> IO D.DataFrame
collectStream stream = go D.empty
  where
    go !acc = do
        mb <- pullBatch stream
        case mb of
            Nothing -> return acc
            Just df -> go (acc <> df)
-- | Run a physical plan to completion and return all of its output rows as
-- one 'D.DataFrame' (the batches concatenated by 'collectStream').
execute :: PhysicalPlan -> ExecutorConfig -> IO D.DataFrame
execute plan cfg = do
    stream <- buildStream plan cfg
    collectStream stream
-- | Fold a monadic step over the output batches of a plan, in stream order,
-- without concatenating them into one frame.
--
-- The accumulator is forced to weak head normal form before each pull and
-- after every step.
foldBatches ::
    (b -> D.DataFrame -> IO b) -> b -> PhysicalPlan -> ExecutorConfig -> IO b
foldBatches step initial plan cfg = buildStream plan cfg >>= flip drain initial
  where
    drain src !acc = do
        next <- pullBatch src
        case next of
            Nothing -> return acc
            Just batch -> do
                acc' <- step acc batch
                acc' `seq` drain src acc'
-- | Compile a physical plan node into a pull-based 'Stream'.
--
-- Row-wise operators (filter, project, derive, limit) transform child
-- batches as they are pulled. Sort, aggregation, spill and the joins run
-- their child plans while the stream is being built.
buildStream :: PhysicalPlan -> ExecutorConfig -> IO Stream
buildStream (PhysicalScan (CsvSource csvPath delimiter) scanCfg) _ =
    executeCsvScan csvPath delimiter scanCfg
buildStream (PhysicalScan (ParquetSource parquetPath) scanCfg) _ =
    executeParquetScan parquetPath scanCfg
-- Spill: run the child to completion, write it to @path@ with
-- 'Bin.spillToDisk', read it back with 'Bin.readSpilled', and emit the
-- reloaded frame as a single batch.
-- NOTE(review): the spill file is not removed here; confirm who cleans up.
buildStream (PhysicalSpill child path) execCfg = do
    execute child execCfg >>= Bin.spillToDisk path
    reloaded <- Bin.readSpilled path
    slot <- newIORef (Just reloaded)
    -- Hand out the stored batch once, then report end of stream.
    return . Stream $ atomicModifyIORef' slot (\current -> (Nothing, current))
-- Filter: apply the predicate to each child batch independently.
buildStream (PhysicalFilter p child) execCfg =
    Stream . fmap (fmap (Sub.filterWhere p)) . pullBatch
        <$> buildStream child execCfg
-- Project: keep only the requested columns in every child batch.
buildStream (PhysicalProject cols child) execCfg = do
    upstream <- buildStream child execCfg
    let next = fmap (Sub.select cols) <$> pullBatch upstream
    pure (Stream next)
-- Derive: compute column @name@ from @uexpr@ on every child batch via
-- 'Trans.deriveMany'.
buildStream (PhysicalDerive name uexpr child) execCfg = do
    upstream <- buildStream child execCfg
    let addColumn = Trans.deriveMany [(name, uexpr)]
    return Stream{pullBatch = fmap addColumn <$> pullBatch upstream}
-- Limit: pass child batches through until @n@ rows have been emitted,
-- truncating the batch that crosses the limit with 'Sub.take'. Once the
-- limit is reached the child is no longer pulled.
buildStream (PhysicalLimit n child) execCfg = do
    upstream <- buildStream child execCfg
    -- Number of rows handed downstream so far.
    emittedRef <- newIORef (0 :: Int)
    let emit emitted df = do
            let quota = min (Core.nRows df) (n - emitted)
            modifyIORef' emittedRef (+ quota)
            return (Just (Sub.take quota df))
        pullLimited = do
            emitted <- readIORef emittedRef
            if emitted < n
                then pullBatch upstream >>= maybe (return Nothing) (emit emitted)
                else return Nothing
    return (Stream pullLimited)
-- Sort: a blocking operator. The child is executed to completion, sorted
-- with 'Perm.sortBy', and emitted as a single batch.
buildStream (PhysicalSort cols child) execCfg = do
    df <- execute child execCfg
    let sorted = Perm.sortBy (map toPermSortOrder cols) df
    pending <- newIORef (Just sorted)
    return . Stream $ atomicModifyIORef' pending (\m -> (Nothing, m))
-- Hash aggregate, run while the stream is built; the result is emitted as
-- at most one batch.
--
-- * If every aggregation passes 'isStreamableAgg', each child batch is
--   partially aggregated and merged into a running accumulator (both fully
--   forced with 'force'). The finalizer from 'buildAggPlan' is applied at
--   the end. If the child produced no batches, nothing is emitted.
-- * Otherwise the child is collected in full and aggregated in one pass.
buildStream (PhysicalHashAggregate keys aggs child) execCfg = do
    upstream <- buildStream child execCfg
    outcome <-
        if all (isStreamableAgg . snd) aggs
            then foldPartials upstream Nothing
            else Just . aggregateBy aggs <$> collectStream upstream
    pending <- newIORef outcome
    return (Stream (atomicModifyIORef' pending (\m -> (Nothing, m))))
  where
    (partialAggs, mergeAggs, finalizer) = buildAggPlan aggs
    aggregateBy specs df = Agg.aggregate specs (Agg.groupBy keys df)
    -- Pull every batch, folding its partial aggregate into the accumulator.
    foldPartials upstream mAcc = do
        mb <- pullBatch upstream
        case mb of
            Nothing -> return (fmap finalizer mAcc)
            Just batch -> do
                !partial <- evaluate (force (aggregateBy partialAggs batch))
                !merged <- case mAcc of
                    Nothing -> return partial
                    Just acc ->
                        evaluate (force (aggregateBy mergeAggs (acc <> partial)))
                foldPartials upstream (Just merged)
-- In-memory source: slice the frame into consecutive row ranges of at most
-- 'defaultBatchSize' rows with 'Sub.range' (presumably half-open
-- @(start, end)@; confirm), emitting them in order.
--
-- The batch size is clamped to at least one row. With a zero or negative
-- size the cursor would never advance (or would move backwards), so the
-- stream would never reach its end.
buildStream (PhysicalSourceDF df) execCfg = do
    let bs = max 1 (defaultBatchSize execCfg)
        total = Core.nRows df
    posRef <- newIORef (0 :: Int)
    return . Stream $ do
        i <- readIORef posRef
        if i >= total
            then return Nothing
            else do
                let n = min bs (total - i)
                    batch = Sub.range (i, i + n) df
                writeIORef posRef (i + n)
                return (Just batch)
-- Hash join.
--
-- * INNER and LEFT joins stream: the right input is executed in full as the
--   build side (its key column renamed to @leftKey@ when the names differ)
--   and each left batch is probed against a compact hash index of it.
-- * Every other join type executes both inputs in full, joins them with
--   'performJoin', and emits the result as one batch.
buildStream (PhysicalHashJoin jt leftKey rightKey leftPlan rightPlan) execCfg
    | Join.INNER <- jt = probeLeftBatches Join.assembleInner
    | Join.LEFT <- jt = probeLeftBatches withUnmatchedLeftRows
    | otherwise = do
        leftDf <- execute leftPlan execCfg
        rightDf <- execute rightPlan execCfg
        joined <- newIORef (Just (performJoin jt leftKey rightKey leftDf rightDf))
        return (Stream (atomicModifyIORef' joined (\m -> (Nothing, m))))
  where
    -- Build the right side once, then map a pure probe over left batches.
    probeLeftBatches assemble = do
        rightDf <- execute rightPlan execCfg
        let buildSide
                | leftKey == rightKey = rightDf
                | otherwise = Core.rename rightKey leftKey rightDf
            keyCols = [leftKey]
            keySet = S.fromList keyCols
            compactIx =
                Join.buildCompactIndex (Join.buildHashColumn keyCols buildSide)
            probeOne probe =
                let (probeIxs, buildIxs) =
                        Join.hashProbeKernel compactIx (Join.buildHashColumn keyCols probe)
                 in assemble keySet probe buildSide probeIxs buildIxs
        leftStream <- buildStream leftPlan execCfg
        return (Stream (fmap probeOne <$> pullBatch leftStream))

    -- LEFT join assembly: after the matched pairs, append every probe row
    -- that found no match, paired with build index -1.
    withUnmatchedLeftRows keySet probe buildSide probeIxs buildIxs =
        Join.assembleLeft keySet probe buildSide
            (probeIxs VU.++ orphans)
            (buildIxs VU.++ VU.replicate (VU.length orphans) (-1))
      where
        hit =
            VU.update
                (VU.replicate (Core.nRows probe) False)
                (VU.map (\i -> (i, True)) probeIxs)
        orphans = VU.findIndices not hit
-- Sort-merge join: both inputs are executed in full (left first), joined
-- with 'performJoin', and the result is emitted as a single batch.
-- NOTE(review): this equation itself does no merge-based processing; any
-- such logic would have to live inside 'performJoin'.
buildStream (PhysicalSortMergeJoin jt leftKey rightKey leftPlan rightPlan) execCfg = do
    leftDf <- execute leftPlan execCfg
    rightDf <- execute rightPlan execCfg
    once <- newIORef (Just (performJoin jt leftKey rightKey leftDf rightDf))
    return Stream{pullBatch = atomicModifyIORef' once (\m -> (Nothing, m))}
-- | Whether an aggregation can be evaluated batch-by-batch and combined
-- afterwards.
--
-- * 'E.CollectAgg' is never streamable.
-- * An unseeded 'E.FoldAgg' is streamable only when its accumulator type is
--   the same as its input type.
-- * A seeded 'E.FoldAgg' is streamable when its accumulator type is 'Int' or
--   is the same as its input type.
-- * 'E.MergeAgg' is always streamable.
-- * Every other expression is reported as not streamable.
--
-- NOTE(review): for an unseeded 'E.FoldAgg' whose accumulator is 'Int' but
-- differs from the input type this answers 'False', although 'buildAggPlan'
-- would still produce a sum-merge plan for it. The stricter answer is the
-- safe one; confirm the asymmetry is intended.
isStreamableAgg :: E.UExpr -> Bool
isStreamableAgg expr = case expr of
    E.UExpr (E.Agg (E.CollectAgg _ _) _) -> False
    E.UExpr (E.Agg (E.FoldAgg _ seed (_ :: a -> b -> a)) _) ->
        let accIsInput = maybe False (const True) (testEquality (typeRep @a) (typeRep @b))
            accIsInt = maybe False (const True) (testEquality (typeRep @a) (typeRep @Int))
         in case seed of
                Nothing -> accIsInput
                Just _ -> accIsInt || accIsInput
    E.UExpr (E.Agg (E.MergeAgg{}) _) -> True
    _ -> False
-- | Split named aggregations into a two-phase plan: evaluate per batch, then
-- merge the per-batch results.
--
-- The result triple is:
--
-- 1. the expressions evaluated on each batch (partial results are stored
--    under each aggregation's output name);
-- 2. the expressions that combine those partial columns into one value;
-- 3. a finaliser applied to the merged frame. Only 'E.MergeAgg' contributes a
--    non-identity finaliser: it maps the accumulator column through the
--    aggregation's finish function.
--
-- Aggregations that cannot be split are passed through unchanged in both
-- phases, with an identity finaliser.
buildAggPlan ::
    [(T.Text, E.UExpr)] ->
    ( [(T.Text, E.UExpr)]
    , [(T.Text, E.UExpr)]
    , D.DataFrame -> D.DataFrame
    )
buildAggPlan aggs = (concat partials, concat merges, foldr (.) id finalizers)
  where
    -- One (partial, merge, finaliser) piece per aggregation. The finalisers
    -- are composed as f1 . f2 . ... . fn: the last aggregation's finaliser
    -- runs first.
    (partials, merges, finalizers) = unzip3 (map processAgg aggs)

    processAgg ::
        (T.Text, E.UExpr) ->
        ([(T.Text, E.UExpr)], [(T.Text, E.UExpr)], D.DataFrame -> D.DataFrame)
    processAgg (name, ue) = case ue of
        E.UExpr (E.Agg (E.FoldAgg n Nothing (f :: a -> b -> a)) (_ :: E.Expr b)) ->
            case testEquality (typeRep @a) (typeRep @b) of
                -- Accumulator and input share a type: fold the partial
                -- results with the aggregation's own step function.
                Just Refl ->
                    ( [(name, ue)]
                    , [(name, E.UExpr (E.Agg (E.FoldAgg n Nothing f) (E.Col @a name)))]
                    , id
                    )
                Nothing -> case testEquality (typeRep @a) (typeRep @Int) of
                    Just Refl -> sumOfPartials
                    Nothing -> unchanged
        E.UExpr (E.Agg (E.FoldAgg n (Just _) (f :: a -> b -> a)) (_ :: E.Expr b)) ->
            -- NOTE(review): an Int accumulator is merged by summing the
            -- per-batch results. That is right for additive folds such as
            -- counts or sums. It would be wrong for, e.g., a seeded Int max or
            -- min; confirm which aggregations reach this branch. The
            -- self-merge below restarts from the seed in every batch, so it
            -- presumably assumes the seed is an identity for f.
            case testEquality (typeRep @a) (typeRep @Int) of
                Just Refl -> sumOfPartials
                Nothing -> case testEquality (typeRep @a) (typeRep @b) of
                    Just Refl ->
                        ( [(name, ue)]
                        , [(name, E.UExpr (E.Agg (E.FoldAgg n Nothing f) (E.Col @a name)))]
                        , id
                        )
                    Nothing -> unchanged
        E.UExpr
            ( E.Agg
                ( E.MergeAgg
                    n
                    seed
                    (step :: acc -> b -> acc)
                    (merge :: acc -> acc -> acc)
                    (fin :: acc -> a)
                )
                (inner :: E.Expr b)
            ) ->
            let -- Per batch: run step/merge as usual but keep the raw
                -- accumulator (the finish step is replaced by 'id').
                partialExpr =
                    E.UExpr (E.Agg (E.MergeAgg n seed step merge (id :: acc -> acc)) inner)
                -- Across batches: fold the accumulator column with 'merge'.
                mergeExpr =
                    E.UExpr (E.Agg (E.FoldAgg ("merge_" <> n) Nothing merge) (E.Col @acc name))
                -- After merging: apply the finish step to the accumulator column.
                finalize df =
                    let accCol = D.unsafeGetColumn name df
                        finalCol =
                            either
                                ( \err ->
                                    error ("buildAggPlan: MergeAgg finalize failed: " ++ show err)
                                )
                                id
                                (C.mapColumn @acc @a fin accCol)
                     in Core.insertColumn name finalCol df
             in ([(name, partialExpr)], [(name, mergeExpr)], finalize)
        _ -> unchanged
      where
        -- Not splittable: evaluate as-is in both phases.
        unchanged ::
            ([(T.Text, E.UExpr)], [(T.Text, E.UExpr)], D.DataFrame -> D.DataFrame)
        unchanged = ([(name, ue)], [(name, ue)], id)

        -- Int accumulator: evaluate as-is per batch, then add up the partials.
        sumOfPartials ::
            ([(T.Text, E.UExpr)], [(T.Text, E.UExpr)], D.DataFrame -> D.DataFrame)
        sumOfPartials =
            ( [(name, ue)]
            ,
                [
                    ( name
                    , E.UExpr
                        (E.Agg (E.FoldAgg "sum" Nothing ((+) :: Int -> Int -> Int)) (E.Col @Int name))
                    )
                ]
            , id
            )
-- | Stream a Parquet source one file per batch.
--
-- @path@ is either a directory or a path/glob pattern:
--
-- * For a directory, every non-directory entry directly inside it is read.
-- * Otherwise, every non-directory path matching @path@ is read.
--
-- Each pull reads the next file with the scan's selected columns (the keys of
-- the scan schema) and its pushdown predicate. The stream ends after the last
-- file. Calls 'error' when nothing matches.
executeParquetScan :: FilePath -> ScanConfig -> IO Stream
executeParquetScan path cfg = do
    isDir <- doesDirectoryExist path
    let pat = if isDir then path </> "*" else path
    matches <- glob pat
    -- 'glob' does not define the order of its results. Sorting makes batches
    -- (and therefore output row order) follow file-name order on every
    -- platform.
    let ordered = S.toAscList (S.fromList matches)
    files <- filterM (fmap not . doesDirectoryExist) ordered
    when (null files) $
        error ("executeParquetScan: no parquet files found for " ++ path)
    let opts =
            Parquet.defaultParquetReadOptions
                { Parquet.selectedColumns = Just (M.keys (elements (scanSchema cfg)))
                , Parquet.predicate = scanPushdownPredicate cfg
                }
    remaining <- newIORef files
    pure . Stream $ do
        pendingFiles <- readIORef remaining
        case pendingFiles of
            [] -> pure Nothing
            (next : rest) -> do
                writeIORef remaining rest
                Just <$> Parquet.readParquetWithOpts opts next
-- | Stream a CSV file in batches of 'scanBatchSize' rows.
--
-- A background thread reads batches with 'LCSV.readBatch' into a bounded
-- queue of capacity 2. @Just df@ in the queue is a batch; @Nothing@ marks end
-- of input, after which the reader closes the handle. The consumer side
-- applies the scan's pushdown predicate (if any) to each batch. Once the end
-- marker has been seen, it is put back so every later pull also returns
-- 'Nothing'.
--
-- NOTE(review): if 'LCSV.readBatch' throws, the reader thread dies without
-- closing the handle or writing the end marker. The consumer would then
-- likely stay blocked on the empty queue. Likewise, if the consumer stops
-- pulling early, the reader blocks on the full queue and the handle is never
-- closed. Fixing either needs 'try'/'finally'/'throwIO' from
-- "Control.Exception".
executeCsvScan :: FilePath -> Char -> ScanConfig -> IO Stream
executeCsvScan path sep cfg = do
    (handle, colSpec) <- LCSV.openCsvStream sep (scanSchema cfg) path
    queue <- newTBQueueIO 2
    let send :: Maybe D.DataFrame -> IO ()
        send = atomically . writeTBQueue queue

        -- Reader loop: carries over the unconsumed bytes between batches.
        produce :: BS.ByteString -> IO ()
        produce leftover =
            LCSV.readBatch sep colSpec (scanBatchSize cfg) leftover handle
                >>= maybe
                    (hClose handle >> send Nothing)
                    (\(df, leftover') -> send (Just df) >> produce leftover')

        applyPushdown :: D.DataFrame -> D.DataFrame
        applyPushdown df =
            maybe df (\p -> Sub.filterWhere p df) (scanPushdownPredicate cfg)

        pull :: IO (Maybe D.DataFrame)
        pull = do
            next <- atomically (readTBQueue queue)
            case next of
                Just df -> pure (Just (applyPushdown df))
                Nothing -> send Nothing >> pure Nothing
    _ <- forkIO (produce BS.empty)
    pure (Stream pull)
-- | Join two materialised frames on one key column per side.
--
-- When the key names differ, the right frame's key column is first renamed
-- to the left key's name, so 'Join.join' matches on a single shared column.
-- 'Join.join' receives the right frame before the left one.
--
-- NOTE(review): if @rightDf@ already has a column named @leftKey@, the rename
-- collides with it. The outcome depends on 'Core.rename'; confirm.
performJoin ::
    Join.JoinType -> T.Text -> T.Text -> D.DataFrame -> D.DataFrame -> D.DataFrame
performJoin jt leftKey rightKey leftDf rightDf =
    Join.join jt [leftKey] keyAligned leftDf
  where
    keyAligned :: D.DataFrame
    keyAligned
        | leftKey == rightKey = rightDf
        | otherwise = Core.rename rightKey leftKey rightDf
-- | Convert a logical-plan sort key into the 'Perm.SortOrder' used by the
-- permutation (sorting) operations.
--
-- The column reference is always built with a @T.Text@ element type,
-- whatever the column actually holds.
--
-- NOTE(review): presumably the sort only reads the column name from the
-- expression, not its type; confirm against "DataFrame.Operations.Permutation".
toPermSortOrder :: (T.Text, SortOrder) -> Perm.SortOrder
toPermSortOrder (col, direction) =
    let key = E.Col @T.Text col
     in case direction of
            Ascending -> Perm.Asc key
            Descending -> Perm.Desc key