{-# LANGUAGE GADTs #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE InstanceSigs #-}
{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE AllowAmbiguousTypes #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE NumericUnderscores #-}
module DataFrame.Lazy.Internal.DataFrame where

import           Control.Monad (forM, foldM)
import           Data.IORef
import           Data.Kind
import qualified Data.List as L
import qualified Data.Map as M
import qualified Data.Text as T
import qualified Data.Vector as V
import qualified DataFrame.Internal.DataFrame as D
import qualified DataFrame.Internal.Column as C
import qualified DataFrame.Internal.Expression as E
import qualified DataFrame.Operations.Core as D
import           DataFrame.Operations.Merge
import qualified DataFrame.Operations.Subset as D
import qualified DataFrame.Operations.Transformations as D
import qualified DataFrame.Lazy.IO.CSV as D
import           System.FilePath

-- | A single deferred step in a lazy pipeline.
--
-- The steps are interpreted by 'eval', which maps each constructor onto the
-- corresponding eager data-frame operation.
data LazyOperation
  = -- | Bind the result of an expression to a column name. The expression's
    -- element type is hidden; only its 'C.Columnable' evidence is kept.
    forall a. C.Columnable a => Derive T.Text (E.Expr a)
  | -- | Keep only the listed columns.
    Select [T.Text]
  | -- | Keep only the rows for which the boolean expression holds.
    Filter (E.Expr Bool)

-- | Human-readable rendering of a pipeline step.
--
-- * @Derive name expr@ renders as @name := expr@
-- * @Select columns@ renders as @select(columns)@
-- * @Filter expr@ renders as @filter(expr)@
instance Show LazyOperation where
  show :: LazyOperation -> String
  show (Derive name expr) = T.unpack name ++ " := " ++ show expr
  show (Select columns)   = "select(" ++ show columns ++ ")"
  show (Filter expr)      = "filter(" ++ show expr ++ ")"

-- | The kind of file a lazy data frame is read from.
-- 'ICSV' is the only input type defined here.
data InputType = ICSV deriving Show

-- | A description of a data frame that has not been read yet: where the data
-- lives plus the steps to apply once it is loaded.
data LazyDataFrame = LazyDataFrame
  { inputPath  :: FilePath
    -- ^ Path of the file to read.
  , inputType  :: InputType
    -- ^ Format of the file at 'inputPath'.
  , operations :: [LazyOperation]
    -- ^ Pending steps, stored in the order they are to be applied.
  , batchSize  :: Int
    -- ^ Number of rows requested per read when the frame is executed.
  } deriving Show

-- | Interpret one deferred step as a transformation on an eager data frame.
--
-- Each constructor is forwarded to its eager counterpart: 'Derive' to
-- @D.derive@, 'Select' to @D.select@ and 'Filter' to @D.filterWhere@.
eval :: LazyOperation -> D.DataFrame -> D.DataFrame
eval (Derive name expr) = D.derive name expr
eval (Select columns)   = D.select columns
eval (Filter expr)      = D.filterWhere expr

-- | Execute a lazy data frame and return the materialised result.
--
-- The input file is read as comma-separated text in batches:
--
-- 1. The total row count is obtained with @D.countRows@.
-- 2. 'batchRanges' splits that count into @(start, end)@ ranges using the
--    frame's 'batchSize'.
-- 3. Each batch is read with @D.readSeparated@, every pending operation is
--    applied to it in order, and the result is appended to the accumulated
--    frame with '<>'.
--
-- One progress line is written to standard output per batch.
--
-- The type variable @a@ (and its 'C.Columnable' constraint) does not occur in
-- the rest of the signature; it is retained so that existing call sites that
-- supply it keep compiling.
runDataFrame :: forall a . (C.Columnable a) => LazyDataFrame -> IO D.DataFrame
runDataFrame df = do
  totalRows <- D.countRows ',' path
  let -- Read one batch, run the pipeline over it and fold it into the
      -- accumulator. The second tuple component carries the reader state
      -- (position, unconsumed text, rows read so far) from one read to the
      -- next.
      readBatch (accDf, (pos, unused, r)) (start, end) = do
        putStrLn $
          "Scanning: " ++ show start ++ " to " ++ show end
            ++ " rows out of " ++ show totalRows
        -- The requested range is (start, batchSize); `end` is only used in
        -- the progress message above.
        let options = D.defaultOptions
              { D.rowRange  = Just (start, batchSize df)
              , D.totalRows = Just totalRows
              , D.seekPos   = pos
              , D.rowsRead  = r
              , D.leftOver  = unused
              }
        (sdf, (pos', unconsumed, rowsRead)) <- D.readSeparated ',' options path
        let rdf = L.foldl' (flip eval) sdf (operations df)
        return (accDf <> rdf, (Just pos', unconsumed, rowsRead + r))
  fst <$> foldM readBatch (D.empty, (Nothing, "", 0)) (batchRanges totalRows (batchSize df))
  where
    path = inputPath df

-- | Split the interval from @0@ to @n@ into consecutive @(start, end)@ ranges
-- that are @inc@ wide, with the final range ending at @n@.
--
-- >>> batchRanges 10 4
-- [(0,4),(4,8),(8,10)]
--
-- When @inc@ divides @n@ exactly the last range is the empty @(n, n)@:
--
-- >>> batchRanges 10 5
-- [(0,5),(5,10),(10,10)]
--
-- A non-positive @inc@ cannot partition the interval (a step of zero would
-- enumerate forever), so the whole interval is returned as a single range.
batchRanges :: Int -> Int -> [(Int, Int)]
batchRanges n inc
  | inc <= 0  = [(0, n)]
  | otherwise = pairUp [0, inc .. n]
  where
    -- Pair every boundary with its successor; the last boundary is closed
    -- off with the overall upper bound.
    pairUp []                  = []
    pairUp [lastStart]         = [(lastStart, n)]
    pairUp (lo : hi : more)    = (lo, hi) : pairUp (hi : more)

-- | Create a lazy data frame over the CSV file at the given path.
--
-- Nothing is read at this point. The frame starts with no pending operations
-- and a batch size of 512,000 rows.
scanCsv :: T.Text -> LazyDataFrame
scanCsv path = LazyDataFrame
  { inputPath  = T.unpack path
  , inputType  = ICSV
  , operations = []
  , batchSize  = 512_000
  }

-- | Append a step to the end of a lazy data frame's pending operations, so
-- that it is applied after every step already recorded.
addOperation :: LazyOperation -> LazyDataFrame -> LazyDataFrame
addOperation op df = df { operations = operations df ++ [op] }

-- | Schedule a 'Derive' step that binds the given expression to the given
-- column name.
derive :: C.Columnable a => T.Text -> E.Expr a -> LazyDataFrame -> LazyDataFrame
derive name expr = addOperation (Derive name expr)

-- | Schedule a 'Select' step for the given column names.
--
-- The @C.Columnable a@ constraint mentions a type variable that appears
-- nowhere else in the signature; it is kept unchanged for compatibility with
-- existing call sites.
select :: C.Columnable a => [T.Text] -> LazyDataFrame -> LazyDataFrame
select columns = addOperation (Select columns)

-- | Schedule a 'Filter' step with the given boolean condition.
--
-- The @C.Columnable a@ constraint mentions a type variable that appears
-- nowhere else in the signature; it is kept unchanged for compatibility with
-- existing call sites. Note that this name clashes with the Prelude's
-- @filter@ wherever both are in scope unqualified.
filter :: C.Columnable a => E.Expr Bool -> LazyDataFrame -> LazyDataFrame
filter cond = addOperation (Filter cond)