{-# LANGUAGE AllowAmbiguousTypes #-}
{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE InstanceSigs #-}
{-# LANGUAGE NumericUnderscores #-}
{-# LANGUAGE OverloadedStrings #-}

-- | A deferred, batch-at-a-time pipeline over a CSV file.
--
-- A 'LazyDataFrame' records where the data lives and which operations
-- should be applied to it; nothing is read until 'runDataFrame' is called.
module DataFrame.Lazy.Internal.DataFrame where

import Control.Monad (foldM)
import qualified Data.List as L
import qualified Data.Text as T
import qualified DataFrame.Internal.Column as C
import qualified DataFrame.Internal.DataFrame as D
import qualified DataFrame.Internal.Expression as E
import qualified DataFrame.Lazy.IO.CSV as D
import DataFrame.Operations.Merge ()
import qualified DataFrame.Operations.Subset as D
import qualified DataFrame.Operations.Transformations as D

-- | One recorded step of a lazy pipeline.
data LazyOperation where
    -- | Column name plus the expression handed to @D.derive@ by 'eval'.
    Derive :: (C.Columnable a) => T.Text -> E.Expr a -> LazyOperation
    -- | Column names handed to @D.select@ by 'eval'.
    Select :: [T.Text] -> LazyOperation
    -- | Boolean expression handed to @D.filterWhere@ by 'eval'.
    Filter :: E.Expr Bool -> LazyOperation

-- | Debug rendering: @name := expr@, @select(...)@ or @filter(...)@.
instance Show LazyOperation where
    show :: LazyOperation -> String
    show (Derive name expr) = T.unpack name ++ " := " ++ show expr
    show (Select columns) = "select(" ++ show columns ++ ")"
    show (Filter expr) = "filter(" ++ show expr ++ ")"

-- | Kind of input a 'LazyDataFrame' points at. Only CSV is declared here.
data InputType = ICSV deriving (Show)

-- | Description of a deferred computation over a file.
data LazyDataFrame = LazyDataFrame
    { inputPath :: FilePath
    -- ^ File that 'runDataFrame' reads.
    , inputType :: InputType
    -- ^ Declared input format. NOTE: 'runDataFrame' does not consult this
    -- field; it always reads the file as comma-separated.
    , operations :: [LazyOperation]
    -- ^ Recorded operations, oldest first (see 'addOperation').
    , batchSize :: Int
    -- ^ Step used by 'batchRanges' and the row count requested per read.
    }
    deriving (Show)

-- | Apply a single recorded operation to an in-memory frame by delegating
-- to @D.derive@, @D.select@ or @D.filterWhere@.
eval :: LazyOperation -> D.DataFrame -> D.DataFrame
eval (Derive name expr) = D.derive name expr
eval (Select columns) = D.select columns
eval (Filter expr) = D.filterWhere expr

-- | Execute the pipeline batch by batch and return the combined result.
--
-- The rows of 'inputPath' are counted with @D.countRows@, split into ranges
-- with 'batchRanges', and for every range:
--
-- 1. a progress line is written to stdout;
-- 2. one batch is read with @D.readSeparated@, which is given the range
--    start together with the configured 'batchSize' (not the range end)
--    and the reader state returned by the previous batch;
-- 3. every recorded operation is applied in order via 'eval';
-- 4. the batch result is appended to the accumulator with '<>'.
--
-- A 'batchSize' of zero yields no ranges, so the result is @D.empty@.
--
-- The type variable @a@ appears in no argument or result. It is retained
-- only so that existing call sites that supply it keep compiling.
runDataFrame :: forall a. (C.Columnable a) => LazyDataFrame -> IO D.DataFrame
runDataFrame df = do
    rowCount <- D.countRows ',' path
    fst
        <$> foldM
            (step rowCount)
            (D.empty, (Nothing, "", 0))
            (batchRanges rowCount (batchSize df))
  where
    path = inputPath df

    -- Read and transform one batch. The second tuple component is the
    -- reader state threaded between batches: seek position, leftover
    -- text and the running count of rows read so far.
    step rowCount (accDf, (pos, unused, r)) (start, end) = do
        putStrLn $
            concat
                [ "Scanning: "
                , show start
                , " to "
                , show end
                , " rows out of "
                , show rowCount
                ]
        (sdf, (pos', unconsumed, rowsRead)) <-
            D.readSeparated
                ','
                ( D.defaultOptions
                    { D.rowRange = Just (start, batchSize df)
                    , D.totalRows = Just rowCount
                    , D.seekPos = pos
                    , D.rowsRead = r
                    , D.leftOver = unused
                    }
                )
                path
        let rdf = L.foldl' (flip eval) sdf (operations df)
        return (accDf <> rdf, (Just pos', unconsumed, rowsRead + r))

-- | Split @0 .. n@ into consecutive @(start, end)@ pairs of width @inc@.
--
-- The final pair always ends at @n@, so when @inc@ divides @n@ exactly the
-- list finishes with an empty @(n, n)@ range:
--
-- > batchRanges 10 4 == [(0, 4), (4, 8), (8, 10)]
-- > batchRanges 10 5 == [(0, 5), (5, 10), (10, 10)]
-- > batchRanges 0 5  == [(0, 0)]
--
-- A step of zero produces no ranges. Without this guard the enumeration
-- @[0, 0 .. n]@ is infinite for non-negative @n@ and the caller never
-- terminates.
--
-- > batchRanges 10 0 == []
batchRanges :: Int -> Int -> [(Int, Int)]
batchRanges n inc
    | inc == 0 = []
    | otherwise = zip starts (drop 1 starts ++ [n])
  where
    starts = [0, inc .. n]

-- | Start a lazy pipeline over the CSV file at @path@, with no operations
-- and a batch size of 512,000.
scanCsv :: T.Text -> LazyDataFrame
scanCsv path = LazyDataFrame (T.unpack path) ICSV [] 512_000

-- | Record @op@ after all previously recorded operations.
addOperation :: LazyOperation -> LazyDataFrame -> LazyDataFrame
addOperation op df = df{operations = operations df ++ [op]}

-- | Record a 'Derive' step for column @name@ computed from @expr@.
derive :: (C.Columnable a) => T.Text -> E.Expr a -> LazyDataFrame -> LazyDataFrame
derive name expr = addOperation (Derive name expr)

-- | Record a 'Select' step for the given column names.
--
-- The @Columnable a@ constraint is not used by any argument or result; it
-- is kept so the signature stays compatible with existing callers.
select :: (C.Columnable a) => [T.Text] -> LazyDataFrame -> LazyDataFrame
select columns = addOperation (Select columns)

-- | Record a 'Filter' step for the given condition.
--
-- The @Columnable a@ constraint is not used by any argument or result; it
-- is kept so the signature stays compatible with existing callers.
filter :: (C.Columnable a) => E.Expr Bool -> LazyDataFrame -> LazyDataFrame
filter cond = addOperation (Filter cond)