{-# LANGUAGE ExplicitNamespaces #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE FlexibleContexts #-}
module DataFrame.Operations.Aggregation where

import qualified Data.Set as S
import qualified DataFrame.Functions as F

import qualified Data.List as L
import qualified Data.Map as M
import qualified Data.Map.Strict as MS
import qualified Data.Text as T
import qualified Data.Vector.Generic as VG
import qualified Data.Vector as V
import qualified Data.Vector.Mutable as VM
import qualified Data.Vector.Unboxed as VU
import qualified Data.Vector.Algorithms.Merge as VA
import qualified Statistics.Quantile as SS
import qualified Statistics.Sample as SS

import Control.Exception (throw)
import Control.Monad (foldM_)
import Control.Monad.ST (runST)
import DataFrame.Internal.Column (Column(..), fromVector,
                                  getIndicesUnboxed, getIndices, 
                                  Columnable, unwrapTypedColumn,
                                  columnVersionString)
import DataFrame.Internal.DataFrame (DataFrame(..), empty, getColumn, unsafeGetColumn)
import DataFrame.Internal.Expression
import DataFrame.Internal.Parsing
import DataFrame.Internal.Types
import DataFrame.Errors
import DataFrame.Operations.Core
import DataFrame.Operations.Subset
import Data.Function ((&))
import Data.Hashable
import Data.List ((\\))
import Data.Maybe
import Data.Type.Equality (type (:~:)(Refl), TestEquality(..))
import Type.Reflection (typeRep, typeOf)

-- | Groups the rows of a dataframe by the given key columns.
--
-- Every row is reduced to a single 'Int' by 'mkRowRep'; rows whose
-- representations are equal land in the same group. The result has one row
-- per group: the key columns hold the values of the lowest-indexed row of the
-- group, and every remaining column becomes a grouped column holding all of
-- that group's values, to be reduced later (see 'aggregate').
--
-- Groups appear in ascending order of their integer row representation.
--
-- NOTE: grouping compares the integer representations rather than the
-- underlying values, so distinct rows that map to the same integer are
-- merged into one group.
--
-- Throws 'ColumnNotFoundException' when any requested name is not a column
-- of the dataframe.
groupBy ::
  [T.Text] ->
  DataFrame ->
  DataFrame
groupBy names df
  | all (`elem` available) names =
      L.foldl' (groupColumns groupRows df) keyFrame valueColumns
  | otherwise =
      throw $
        ColumnNotFoundException
          (T.pack (show (names L.\\ available)))
          "groupBy"
          available
  where
    available :: [T.Text]
    available = columnNames df

    -- Positions of the key columns inside the dataframe's column vector,
    -- in ascending order of column name.
    keyColumnIxs :: [Int]
    keyColumnIxs =
      [ix | (colName, ix) <- M.toList (columnIndices df), colName `elem` names]

    -- One integer per row summarising the values of its key columns.
    rowKeys :: VU.Vector Int
    rowKeys = VU.generate (fst (dimensions df)) (mkRowRep keyColumnIxs df)

    -- (row index, row key) pairs ordered by row key, so that rows belonging
    -- to the same group are adjacent.
    sortedByKey :: VU.Vector (Int, Int)
    sortedByKey = runST $ do
      buffer <- VG.thaw (VG.indexed rowKeys)
      VA.sortBy (\(_, k1) (_, k2) -> compare k1 k2) buffer
      VG.unsafeFreeze buffer

    -- For every group, the indices of the rows that belong to it.
    groupRows :: V.Vector (VU.Vector Int)
    groupRows =
      V.fromList
        [ VG.map fst run
        | run <- VG.groupBy (\(_, k1) (_, k2) -> k1 == k2) sortedByKey
        ]

    -- The representative (smallest) row index of each group; these rows
    -- supply the values of the key columns.
    firstRows :: VU.Vector Int
    firstRows =
      VU.generate (V.length groupRows) (\g -> VU.minimum (groupRows V.! g))

    -- Starting point of the fold: an otherwise empty dataframe containing
    -- only the key columns, one row per group.
    keyFrame :: DataFrame
    keyFrame = L.foldl' (mkGroupedColumns firstRows df) empty names

    -- Every column that is not a key; each is turned into a grouped column.
    valueColumns :: [T.Text]
    valueColumns = available L.\\ names

-- | Builds the integer representation of row @i@ from the columns at the
-- given positions.
--
-- Each selected cell is turned into an 'Int' with @hash'@. When exactly one
-- column is selected that value is used directly; otherwise the list of
-- per-cell values is hashed as a whole.
mkRowRep :: [Int] -> DataFrame -> Int -> Int
mkRowRep groupColumnIndices df i =
  case map cellHash groupColumnIndices of
    [single] -> single
    cellHashes -> hash cellHashes
  where
    -- Integer representation of row @i@ in the column at the given position.
    cellHash :: Int -> Int
    cellHash colIx = hashAt (columns df V.! colIx)

    -- Integer representation of row @i@ of a column; column representations
    -- other than the three handled here contribute a constant 0.
    hashAt :: Column -> Int
    hashAt (BoxedColumn (xs :: V.Vector a)) = hash' @a (xs V.! i)
    hashAt (UnboxedColumn (xs :: VU.Vector a)) = hash' @a (xs VU.! i)
    hashAt (OptionalColumn (xs :: V.Vector a)) = hash' @a (xs V.! i)
    hashAt _ = 0

-- | Maps a column value to an 'Int' used as its grouping key.
--
-- * 'Double': the value multiplied by 1000 and rounded, so two doubles that
--   round to the same integer after scaling share a key.
-- * 'Int': the value itself.
-- * 'T.Text': its 'hash'.
-- * anything else: the 'hash' of its 'show' output.
hash' :: Columnable a => a -> Int
hash' value
  | Just Refl <- testEquality valueRep (typeRep @Double) = round (value * 1000)
  | Just Refl <- testEquality valueRep (typeRep @Int) = value
  | Just Refl <- testEquality valueRep (typeRep @T.Text) = hash value
  | otherwise = hash (show value)
  where
    -- Runtime type of the argument, compared against each special case.
    valueRep = typeOf value

-- | Copies the column @name@ of @df@ into @acc@, keeping only the rows at
-- the given indices.
--
-- Arguments, in order:
--
-- * the row indices to keep,
-- * the dataframe the column is read from,
-- * the dataframe the selected rows are inserted into,
-- * the name of the column (used both for the lookup and for the insert).
--
-- Throws 'ColumnNotFoundException' when @name@ is not a column of @df@, and
-- calls 'error' when the column has a representation other than boxed,
-- optional or unboxed (previously these cases failed with an uninformative
-- map-lookup error and an anonymous pattern-match failure respectively).
mkGroupedColumns :: VU.Vector Int -> DataFrame -> DataFrame -> T.Text -> DataFrame
mkGroupedColumns indices df acc name =
  case M.lookup name (columnIndices df) of
    Nothing ->
      throw $ ColumnNotFoundException name "mkGroupedColumns" (columnNames df)
    Just ix ->
      case columns df V.! ix of
        BoxedColumn column ->
          insertVector name (indices `getIndices` column) acc
        OptionalColumn column ->
          insertVector name (indices `getIndices` column) acc
        UnboxedColumn column ->
          insertUnboxedVector name (indices `getIndicesUnboxed` column) acc
        -- Any other representation (e.g. an already grouped column) cannot
        -- be row-selected here; fail with the offending column's name.
        _ ->
          error $
            "mkGroupedColumns: unsupported column representation for column: "
              ++ T.unpack name

-- | Turns the column @name@ of @df@ into a grouped column and inserts it
-- into @acc@.
--
-- Arguments, in order:
--
-- * for every group, the indices of the rows belonging to it,
-- * the dataframe the column is read from,
-- * the dataframe the grouped column is inserted into,
-- * the name of the column (used both for the lookup and for the insert).
--
-- Boxed and optional columns become a 'GroupedBoxedColumn'; unboxed columns
-- become a 'GroupedUnboxedColumn'. Entry @g@ of the result holds the values
-- found at the row indices of group @g@.
--
-- Throws 'ColumnNotFoundException' when @name@ is not a column of @df@, and
-- calls 'error' when the column has a representation other than boxed,
-- optional or unboxed (previously these cases failed with an uninformative
-- map-lookup error and an anonymous pattern-match failure respectively).
groupColumns :: V.Vector (VU.Vector Int) -> DataFrame -> DataFrame -> T.Text -> DataFrame
groupColumns indices df acc name =
  case M.lookup name (columnIndices df) of
    Nothing ->
      throw $ ColumnNotFoundException name "groupColumns" (columnNames df)
    Just ix ->
      case columns df V.! ix of
        BoxedColumn column ->
          insertColumn
            name
            (GroupedBoxedColumn (V.map (`getIndices` column) indices))
            acc
        OptionalColumn column ->
          insertColumn
            name
            (GroupedBoxedColumn (V.map (`getIndices` column) indices))
            acc
        UnboxedColumn column ->
          insertColumn
            name
            (GroupedUnboxedColumn (V.map (`getIndicesUnboxed` column) indices))
            acc
        -- Any other representation (e.g. an already grouped column) cannot
        -- be grouped here; fail with the offending column's name.
        _ ->
          error $
            "groupColumns: unsupported column representation for column: "
              ++ T.unpack name

-- | Reduces a grouped dataframe with the given named expressions.
--
-- The starting frame keeps only the columns of @df@ whose
-- 'columnVersionString' does not begin with @"Grouped"@. Each expression is
-- then interpreted against the original dataframe @df@ and its result is
-- inserted under the paired name.
aggregate :: [(T.Text, UExpr)] -> DataFrame -> DataFrame
aggregate aggs df = fold addAggregate aggs keysOnly
  where
    -- Whether the named column of @df@ is one of the grouped representations.
    isGrouped :: T.Text -> Bool
    isGrouped colName =
      case getColumn colName df of
        -- Names come from 'columnNames', so the lookup is expected to succeed.
        Nothing -> error "Unexpected"
        Just col -> T.isPrefixOf "Grouped" (T.pack (columnVersionString col))

    -- The dataframe restricted to its non-grouped columns.
    keysOnly :: DataFrame
    keysOnly = select [c | c <- columnNames df, not (isGrouped c)] df

    -- Evaluates one expression on @df@ and stores it in the accumulator.
    addAggregate :: (T.Text, UExpr) -> DataFrame -> DataFrame
    addAggregate (name, Wrap (expr :: Expr a)) d =
      insertColumn name (unwrapTypedColumn (interpret @a df expr)) d

-- | Groups the dataframe by all of its columns, leaving one row for each
-- distinct group produced by 'groupBy'.
distinct :: DataFrame -> DataFrame
distinct df = groupBy everyColumn df
  where
    everyColumn = columnNames df

-- | Keeps one row per distinct combination of the given columns.
--
-- The dataframe is grouped by @names@ and every other column is reduced
-- with 'F.anyValue', keeping its original name. Calls 'error' if one of the
-- other columns is not a grouped column after grouping.
distinctBy :: [T.Text] -> DataFrame -> DataFrame
distinctBy names df = aggregate (map pickAny remaining) grouped
  where
    -- Columns that are not part of the key.
    remaining :: [T.Text]
    remaining = columnNames df \\ names

    grouped :: DataFrame
    grouped = groupBy names df

    -- Builds the "any value" aggregation for one non-key column; the match
    -- on the grouped column is only needed to recover its element type.
    pickAny :: T.Text -> (T.Text, UExpr)
    pickAny name =
      case unsafeGetColumn name grouped of
        GroupedBoxedColumn (_ :: V.Vector (V.Vector a)) ->
          F.anyValue (F.col @a name) `F.as` name
        GroupedUnboxedColumn (_ :: V.Vector (VU.Vector a)) ->
          F.anyValue (F.col @a name) `F.as` name
        _ -> error $ "Column isn't grouped: " ++ T.unpack name