{-# LANGUAGE AllowAmbiguousTypes #-}
{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE InstanceSigs #-}
{-# LANGUAGE NumericUnderscores #-}
{-# LANGUAGE OverloadedStrings #-}

module DataFrame.Lazy.Internal.DataFrame where

import Control.Monad (foldM)
import qualified Data.List as L
import qualified Data.Text as T
import qualified DataFrame.Internal.Column as C
import qualified DataFrame.Internal.DataFrame as D
import qualified DataFrame.Internal.Expression as E
import qualified DataFrame.Lazy.IO.CSV as D
import DataFrame.Operations.Merge ()
import qualified DataFrame.Operations.Subset as D
import qualified DataFrame.Operations.Transformations as D

-- | One deferred step of a lazy query. Steps are stored in the 'operations'
-- field of a 'LazyDataFrame' and applied to each batch by 'eval'.
data LazyOperation where
    -- | A column name paired with the expression for it; 'eval' hands both
    -- to @D.derive@.
    Derive :: (C.Columnable a) => T.Text -> E.Expr a -> LazyOperation
    -- | A list of column names; 'eval' hands it to @D.select@.
    Select :: [T.Text] -> LazyOperation
    -- | A boolean expression; 'eval' hands it to @D.filterWhere@.
    Filter :: E.Expr Bool -> LazyOperation

-- | Textual rendering of a single lazy operation.
instance Show LazyOperation where
    show :: LazyOperation -> String
    show operation = case operation of
        -- Rendered as @<name> := <expr>@.
        Derive name expr -> concat [T.unpack name, " := ", show expr]
        -- Rendered as @select(<columns>)@.
        Select columns -> concat ["select(", show columns, ")"]
        -- Rendered as @filter(<expr>)@.
        Filter predicate -> concat ["filter(", show predicate, ")"]

-- | Kind of input a 'LazyDataFrame' reads from. 'ICSV' is the only
-- constructor.
data InputType
    = ICSV
    deriving (Show)

-- | A deferred computation over a file: where the data lives, how it is
-- encoded, which operations are pending, and how many rows to request per
-- read when the computation is finally run.
data LazyDataFrame = LazyDataFrame
    { inputPath :: FilePath
    -- ^ Path of the file to scan.
    , inputType :: InputType
    -- ^ Format tag of the input file.
    , operations :: [LazyOperation]
    -- ^ Pending operations, stored in the order they are applied.
    , batchSize :: Int
    -- ^ Row count requested for each read; also the stride between batch
    -- start offsets.
    }
    deriving (Show)

-- | Turn one 'LazyOperation' into the eager data frame transformation it
-- stands for.
eval :: LazyOperation -> D.DataFrame -> D.DataFrame
eval operation = case operation of
    Derive name expr -> D.derive name expr
    Select columns -> D.select columns
    Filter predicate -> D.filterWhere predicate

-- | Execute a 'LazyDataFrame' batch by batch and return the combined result.
--
-- The file at 'inputPath' is first passed to @D.countRows@ (comma
-- separator); the resulting count is split into ranges by 'batchRanges'
-- using 'batchSize'. For every range a progress line is written to stdout,
-- the batch is read with @D.readSeparated@, every pending operation is
-- applied to it in order, and the outcome is appended to the accumulated
-- frame with '<>'. The position, unconsumed text and row counter returned by
-- one read are fed into the options of the next.
--
-- NOTE(review): the type variable @a@ occurs only in the constraint, not in
-- the argument or result types, so callers presumably have to fix it with a
-- type application. It is kept as-is so existing call sites keep compiling.
runDataFrame :: forall a. (C.Columnable a) => LazyDataFrame -> IO D.DataFrame
runDataFrame df = do
    rowCount <- D.countRows ',' csvPath
    let
        -- Read options for the batch starting at @offset@, resuming from the
        -- state left behind by the previous read.
        optionsFor offset (seekTo, carried, consumed) =
            D.defaultOptions
                { D.rowRange = Just (offset, batchSize df)
                , D.totalRows = Just rowCount
                , D.seekPos = seekTo
                , D.rowsRead = consumed
                , D.leftOver = carried
                }

        -- Read one batch, run the pending operations on it and fold it into
        -- the accumulated frame.
        step (combined, cursor@(_, _, consumed)) (start, end) = do
            mapM_
                putStr
                [ "Scanning: "
                , show start
                , " to "
                , show end
                , " rows out of "
                , show rowCount
                , "\n"
                ]
            (chunk, (nextPos, remainder, chunkRows)) <-
                D.readSeparated ',' (optionsFor start cursor) csvPath
            let transformed = L.foldl' (flip eval) chunk (operations df)
            return
                ( combined <> transformed
                , (Just nextPos, remainder, chunkRows + consumed)
                )
    fst
        <$> foldM
            step
            (D.empty, (Nothing, "", 0))
            (batchRanges rowCount (batchSize df))
  where
    csvPath = inputPath df

-- | Split the interval from @0@ to @n@ into consecutive @(start, end)@ pairs
-- whose start offsets are @inc@ apart. The last pair always ends at @n@.
--
-- >>> batchRanges 10 4
-- [(0,4),(4,8),(8,10)]
--
-- When @n@ is an exact multiple of @inc@ the final pair is the empty range
-- @(n, n)@; this matches the historical behaviour and is kept so callers see
-- the same batches as before.
--
-- >>> batchRanges 10 5
-- [(0,5),(5,10),(10,10)]
--
-- A non-positive @inc@ cannot advance through the interval (an increment of
-- zero would enumerate @0@ forever), so the whole interval is returned as a
-- single batch instead of looping.
--
-- >>> batchRanges 10 0
-- [(0,10)]
batchRanges :: Int -> Int -> [(Int, Int)]
batchRanges n inc
    | inc <= 0 = [(0, n)]
    | otherwise = zip starts ends
  where
    -- Start offset of every batch; empty when @n@ is negative.
    starts = [0, inc .. n]
    -- Each batch ends where the next one starts; the last one ends at @n@.
    ends = drop 1 starts ++ [n]

-- | Start a lazy query over the CSV file at the given path. The query begins
-- with no pending operations and a batch size of 512,000 rows.
scanCsv :: T.Text -> LazyDataFrame
scanCsv path =
    LazyDataFrame
        { inputPath = T.unpack path
        , inputType = ICSV
        , operations = []
        , batchSize = defaultBatchSize
        }
  where
    defaultBatchSize = 512_000

-- | Append an operation to the end of the frame's pending list, so it runs
-- after every operation added before it.
addOperation :: LazyOperation -> LazyDataFrame -> LazyDataFrame
addOperation op df = df{operations = pending <> [op]}
  where
    pending = operations df

-- | Queue a 'Derive' operation for the given column name and expression.
derive ::
    (C.Columnable a) => T.Text -> E.Expr a -> LazyDataFrame -> LazyDataFrame
derive name expr = addOperation queued
  where
    queued = Derive name expr

-- | Queue a 'Select' operation for the given column names.
--
-- NOTE(review): the @Columnable a@ constraint is not used by the body and
-- @a@ does not occur in the argument or result types. It is kept only so the
-- public signature stays unchanged for existing call sites.
select :: (C.Columnable a) => [T.Text] -> LazyDataFrame -> LazyDataFrame
select = addOperation . Select

-- | Queue a 'Filter' operation for the given boolean expression.
--
-- NOTE(review): the @Columnable a@ constraint is not used by the body and
-- @a@ does not occur in the argument or result types. It is kept only so the
-- public signature stays unchanged for existing call sites.
filter :: (C.Columnable a) => E.Expr Bool -> LazyDataFrame -> LazyDataFrame
filter = addOperation . Filter