{-# LANGUAGE GADTs #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE InstanceSigs #-}
{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE AllowAmbiguousTypes #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE NumericUnderscores #-}

-- | A minimal lazy data frame: a description of an input file plus a queue of
-- operations that is only executed, batch by batch, when 'runDataFrame' is
-- called.
module DataFrame.Lazy.Internal.DataFrame where

-- NOTE(review): several of these imports are not referenced by the
-- definitions below; they are kept because other tooling may rely on them.
import Control.Monad (forM, foldM)
import Data.IORef
import Data.Kind
import qualified Data.List as L
import qualified Data.Map as M
import qualified Data.Text as T
import qualified Data.Vector as V
import qualified DataFrame.Internal.DataFrame as D
import qualified DataFrame.Internal.Column as C
import qualified DataFrame.Internal.Expression as E
import qualified DataFrame.Operations.Core as D
import DataFrame.Operations.Merge
import qualified DataFrame.Operations.Subset as D
import qualified DataFrame.Operations.Transformations as D
import qualified DataFrame.Lazy.IO.CSV as D
import System.FilePath

-- | A deferred operation recorded on a 'LazyDataFrame'.
data LazyOperation where
  -- | Bind the result of an expression to the given column name.
  Derive :: C.Columnable a => T.Text -> E.Expr a -> LazyOperation
  -- | Keep only the named columns.
  Select :: [T.Text] -> LazyOperation
  -- | Keep only the rows for which the boolean expression holds.
  Filter :: E.Expr Bool -> LazyOperation

instance Show LazyOperation where
  show :: LazyOperation -> String
  show (Derive name expr) = T.unpack name ++ " := " ++ show expr
  show (Select columns) = "select(" ++ show columns ++ ")"
  show (Filter expr) = "filter(" ++ show expr ++ ")"

-- | The kind of input a 'LazyDataFrame' reads from. Only CSV is defined here.
data InputType = ICSV
  deriving Show

-- | A description of a deferred computation over an input file.
data LazyDataFrame = LazyDataFrame
  { -- | Path of the file to read.
    inputPath :: FilePath
    -- | Format of the file at 'inputPath'.
  , inputType :: InputType
    -- | Operations applied to every batch, in list order.
  , operations :: [LazyOperation]
    -- | Step between consecutive batch start rows (see 'batchRanges'); also
    -- passed to the reader as the second component of its row range.
  , batchSize :: Int
  }
  deriving Show

-- | Interpret a single 'LazyOperation' as a transformation on an in-memory
-- data frame.
eval :: LazyOperation -> D.DataFrame -> D.DataFrame
eval (Derive name expr) = D.derive name expr
eval (Select columns) = D.select columns
eval (Filter expr) = D.filterWhere expr

-- | Execute a 'LazyDataFrame'.
--
-- The rows of the input are counted, split into ranges with 'batchRanges',
-- and each range is read with @D.readSeparated@ using @','@ as the separator.
-- Every batch has all recorded operations applied (in the order they were
-- added) and is then appended to the accumulated result with '<>'.
--
-- A progress line is printed to standard output for every batch.
--
-- If 'batchSize' is zero there are no batches and the result is @D.empty@.
--
-- NOTE(review): the type variable @a@ does not occur in the argument or the
-- result, so it is ambiguous (hence @AllowAmbiguousTypes@). The constraint is
-- kept unchanged so that existing call sites keep compiling.
-- NOTE(review): 'inputType' is not inspected; the input is always read as
-- comma separated.
runDataFrame :: forall a. (C.Columnable a) => LazyDataFrame -> IO D.DataFrame
runDataFrame df = do
  let path = inputPath df
  totalRows <- D.countRows ',' path
  let batches = batchRanges totalRows (batchSize df)
      -- The fold state is the accumulated frame plus what the reader handed
      -- back from the previous batch: the seek position, the unconsumed
      -- input and the running count of rows read.
      step (accDf, (pos, unused, r)) (start, end) = do
        mapM_
          putStr
          [ "Scanning: "
          , show start
          , " to "
          , show end
          , " rows out of "
          , show totalRows
          , "\n"
          ]
        -- 'end' is only used for the progress message above; the reader is
        -- given the batch size rather than the end of the range.
        (sdf, (pos', unconsumed, rowsRead)) <-
          D.readSeparated
            ','
            ( D.defaultOptions
                { D.rowRange = Just (start, batchSize df)
                , D.totalRows = Just totalRows
                , D.seekPos = pos
                , D.rowsRead = r
                , D.leftOver = unused
                }
            )
            path
        let rdf = L.foldl' (flip eval) sdf (operations df)
        return (accDf <> rdf, (Just pos', unconsumed, rowsRead + r))
  (df', _) <- foldM step (D.empty, (Nothing, "", 0)) batches
  return df'

-- | Split the interval from @0@ to @n@ into consecutive @(start, end)@ pairs
-- whose starts are @inc@ apart. The final pair always ends at @n@.
--
-- > batchRanges 10 4 == [(0,4),(4,8),(8,10)]
-- > batchRanges 10 5 == [(0,5),(5,10),(10,10)]
-- > batchRanges 0 5  == [(0,0)]
--
-- An increment of zero yields no ranges. Without that guard the enumeration
-- @[0, 0 .. n]@ is infinite for every @n >= 0@ and the caller never
-- terminates.
batchRanges :: Int -> Int -> [(Int, Int)]
batchRanges n inc
  | inc == 0 = []
  | otherwise = go [0, inc .. n]
  where
    go [] = []
    go [x] = [(x, n)]
    go (f : rest@(s : _)) = (f, s) : go rest

-- | Describe a scan over the CSV file at the given path, with no operations
-- queued and a batch size of 512,000.
scanCsv :: T.Text -> LazyDataFrame
scanCsv path = LazyDataFrame (T.unpack path) ICSV [] 512_000

-- | Append an operation to the end of the queue, so operations run in the
-- order in which they were added.
addOperation :: LazyOperation -> LazyDataFrame -> LazyDataFrame
addOperation op df = df { operations = operations df ++ [op] }

-- | Queue a 'Derive' operation for the given column name and expression.
derive :: C.Columnable a => T.Text -> E.Expr a -> LazyDataFrame -> LazyDataFrame
derive name expr = addOperation (Derive name expr)

-- | Queue a 'Select' operation for the given columns.
--
-- NOTE(review): the @Columnable a@ constraint is not used by the body and
-- makes @a@ ambiguous; it is kept for interface compatibility.
select :: C.Columnable a => [T.Text] -> LazyDataFrame -> LazyDataFrame
select columns = addOperation (Select columns)

-- | Queue a 'Filter' operation for the given condition.
--
-- NOTE(review): the @Columnable a@ constraint is not used by the body and
-- makes @a@ ambiguous; it is kept for interface compatibility.
filter :: C.Columnable a => E.Expr Bool -> LazyDataFrame -> LazyDataFrame
filter cond = addOperation (Filter cond)