{-# LANGUAGE ExplicitNamespaces #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications #-}

module DataFrame.Operations.Aggregation where

import qualified Data.List as L
import qualified Data.Map as M
import qualified Data.Text as T
import qualified Data.Vector as V
import qualified Data.Vector.Algorithms.Merge as VA
import qualified Data.Vector.Generic as VG
import qualified Data.Vector.Unboxed as VU

import Control.Exception (throw)
import Control.Monad.ST (runST)
import Data.Hashable
import Data.Type.Equality (TestEquality (..), type (:~:) (Refl))
import DataFrame.Errors
import DataFrame.Internal.Column (
    Column (..),
    Columnable,
    TypedColumn (..),
    atIndicesStable,
    getIndices,
    getIndicesUnboxed,
 )
import DataFrame.Internal.DataFrame (DataFrame (..), GroupedDataFrame (..))
import DataFrame.Internal.Expression
import DataFrame.Operations.Core
import DataFrame.Operations.Subset
import Type.Reflection (typeOf, typeRep)

{- | O(k * n) groups the dataframe by the given rows aggregating the remaining rows
into vector that should be reduced later.
-}
groupBy ::
    [T.Text] ->
    DataFrame ->
    GroupedDataFrame
groupBy :: [Text] -> DataFrame -> GroupedDataFrame
groupBy [Text]
names DataFrame
df
    | (Text -> Bool) -> [Text] -> Bool
forall (t :: * -> *) a. Foldable t => (a -> Bool) -> t a -> Bool
any (Text -> [Text] -> Bool
forall (t :: * -> *) a. (Foldable t, Eq a) => a -> t a -> Bool
`notElem` DataFrame -> [Text]
columnNames DataFrame
df) [Text]
names =
        DataFrameException -> GroupedDataFrame
forall a e. Exception e => e -> a
throw (DataFrameException -> GroupedDataFrame)
-> DataFrameException -> GroupedDataFrame
forall a b. (a -> b) -> a -> b
$
            Text -> Text -> [Text] -> DataFrameException
ColumnNotFoundException
                (String -> Text
T.pack (String -> Text) -> String -> Text
forall a b. (a -> b) -> a -> b
$ [Text] -> String
forall a. Show a => a -> String
show ([Text] -> String) -> [Text] -> String
forall a b. (a -> b) -> a -> b
$ [Text]
names [Text] -> [Text] -> [Text]
forall a. Eq a => [a] -> [a] -> [a]
L.\\ DataFrame -> [Text]
columnNames DataFrame
df)
                Text
"groupBy"
                (DataFrame -> [Text]
columnNames DataFrame
df)
    | Bool
otherwise =
        DataFrame -> [Text] -> Vector Int -> Vector Int -> GroupedDataFrame
Grouped
            DataFrame
df
            [Text]
names
            (((Int, Int) -> Int) -> Vector (Int, Int) -> Vector Int
forall (v :: * -> *) a b.
(Vector v a, Vector v b) =>
(a -> b) -> v a -> v b
VG.map (Int, Int) -> Int
forall a b. (a, b) -> a
fst Vector (Int, Int)
valueIndices)
            ([Int] -> Vector Int
forall a. Unbox a => [a] -> Vector a
VU.fromList ([Int] -> [Int]
forall a. [a] -> [a]
reverse (Vector (Int, Int) -> [Int]
forall a. (Eq a, Unbox a) => Vector (Int, a) -> [Int]
changingPoints Vector (Int, Int)
valueIndices)))
  where
    indicesToGroup :: [Int]
indicesToGroup = Map Text Int -> [Int]
forall k a. Map k a -> [a]
M.elems (Map Text Int -> [Int]) -> Map Text Int -> [Int]
forall a b. (a -> b) -> a -> b
$ (Text -> Int -> Bool) -> Map Text Int -> Map Text Int
forall k a. (k -> a -> Bool) -> Map k a -> Map k a
M.filterWithKey (\Text
k Int
_ -> Text
k Text -> [Text] -> Bool
forall a. Eq a => a -> [a] -> Bool
forall (t :: * -> *) a. (Foldable t, Eq a) => a -> t a -> Bool
`elem` [Text]
names) (DataFrame -> Map Text Int
columnIndices DataFrame
df)
    rowRepresentations :: Vector Int
rowRepresentations = Int -> (Int -> Int) -> Vector Int
forall a. Unbox a => Int -> (Int -> a) -> Vector a
VU.generate ((Int, Int) -> Int
forall a b. (a, b) -> a
fst (DataFrame -> (Int, Int)
dimensions DataFrame
df)) ([Int] -> DataFrame -> Int -> Int
mkRowRep [Int]
indicesToGroup DataFrame
df)

    valueIndices :: Vector (Int, Int)
valueIndices = (forall s. ST s (Vector (Int, Int))) -> Vector (Int, Int)
forall a. (forall s. ST s a) -> a
runST ((forall s. ST s (Vector (Int, Int))) -> Vector (Int, Int))
-> (forall s. ST s (Vector (Int, Int))) -> Vector (Int, Int)
forall a b. (a -> b) -> a -> b
$ do
        MVector s (Int, Int)
withIndexes <- Vector (Int, Int)
-> ST s (Mutable Vector (PrimState (ST s)) (Int, Int))
forall (m :: * -> *) (v :: * -> *) a.
(PrimMonad m, Vector v a) =>
v a -> m (Mutable v (PrimState m) a)
VG.thaw (Vector (Int, Int)
 -> ST s (Mutable Vector (PrimState (ST s)) (Int, Int)))
-> Vector (Int, Int)
-> ST s (Mutable Vector (PrimState (ST s)) (Int, Int))
forall a b. (a -> b) -> a -> b
$ Vector Int -> Vector (Int, Int)
forall (v :: * -> *) a.
(Vector v a, Vector v (Int, a)) =>
v a -> v (Int, a)
VG.indexed Vector Int
rowRepresentations
        Comparison (Int, Int)
-> MVector (PrimState (ST s)) (Int, Int) -> ST s ()
forall (m :: * -> *) (v :: * -> * -> *) e.
(PrimMonad m, MVector v e) =>
Comparison e -> v (PrimState m) e -> m ()
VA.sortBy (\(Int
a, Int
b) (Int
a', Int
b') -> Int -> Int -> Ordering
forall a. Ord a => a -> a -> Ordering
compare Int
b Int
b') MVector s (Int, Int)
MVector (PrimState (ST s)) (Int, Int)
withIndexes
        Mutable Vector (PrimState (ST s)) (Int, Int)
-> ST s (Vector (Int, Int))
forall (m :: * -> *) (v :: * -> *) a.
(PrimMonad m, Vector v a) =>
Mutable v (PrimState m) a -> m (v a)
VG.unsafeFreeze MVector s (Int, Int)
Mutable Vector (PrimState (ST s)) (Int, Int)
withIndexes

changingPoints :: (Eq a, VU.Unbox a) => VU.Vector (Int, a) -> [Int]
changingPoints :: forall a. (Eq a, Unbox a) => Vector (Int, a) -> [Int]
changingPoints Vector (Int, a)
vs = Vector (Int, a) -> Int
forall (v :: * -> *) a. Vector v a => v a -> Int
VG.length Vector (Int, a)
vs Int -> [Int] -> [Int]
forall a. a -> [a] -> [a]
: (([Int], a) -> [Int]
forall a b. (a, b) -> a
fst ((([Int], a) -> Int -> (Int, a) -> ([Int], a))
-> ([Int], a) -> Vector (Int, a) -> ([Int], a)
forall b a. Unbox b => (a -> Int -> b -> a) -> a -> Vector b -> a
VU.ifoldl ([Int], a) -> Int -> (Int, a) -> ([Int], a)
forall {b} {a} {a}. Eq b => ([a], b) -> a -> (a, b) -> ([a], b)
findChangePoints ([Int], a)
initialState Vector (Int, a)
vs))
  where
    initialState :: ([Int], a)
initialState = ([Int
0], (Int, a) -> a
forall a b. (a, b) -> b
snd (Vector (Int, a) -> (Int, a)
forall (v :: * -> *) a. Vector v a => v a -> a
VG.head Vector (Int, a)
vs))
    findChangePoints :: ([a], b) -> a -> (a, b) -> ([a], b)
findChangePoints ([a]
offsets, b
currentVal) a
index (a
_, b
newVal)
        | b
currentVal b -> b -> Bool
forall a. Eq a => a -> a -> Bool
== b
newVal = ([a]
offsets, b
currentVal)
        | Bool
otherwise = (a
index a -> [a] -> [a]
forall a. a -> [a] -> [a]
: [a]
offsets, b
newVal)

mkRowRep :: [Int] -> DataFrame -> Int -> Int
mkRowRep :: [Int] -> DataFrame -> Int -> Int
mkRowRep [Int]
groupColumnIndices DataFrame
df Int
i = case [Int]
h of
    (Int
x : []) -> Int
x
    [Int]
xs -> [Int] -> Int
forall a. Hashable a => a -> Int
hash [Int]
h
  where
    h :: [Int]
h = ((Int -> Int) -> [Int] -> [Int]
forall a b. (a -> b) -> [a] -> [b]
map Int -> Int
mkHash [Int]
groupColumnIndices)
    getHashedElem :: Column -> Int -> Int
    getHashedElem :: Column -> Int -> Int
getHashedElem (BoxedColumn (Vector a
c :: V.Vector a)) Int
j = forall a. Columnable a => a -> Int
hash' @a (Vector a
c Vector a -> Int -> a
forall a. Vector a -> Int -> a
V.! Int
j)
    getHashedElem (UnboxedColumn (Vector a
c :: VU.Vector a)) Int
j = forall a. Columnable a => a -> Int
hash' @a (Vector a
c Vector a -> Int -> a
forall a. Unbox a => Vector a -> Int -> a
VU.! Int
j)
    getHashedElem (OptionalColumn (Vector (Maybe a)
c :: V.Vector a)) Int
j = forall a. Columnable a => a -> Int
hash' @a (Vector (Maybe a)
c Vector (Maybe a) -> Int -> Maybe a
forall a. Vector a -> Int -> a
V.! Int
j)
    mkHash :: Int -> Int
mkHash Int
j = Column -> Int -> Int
getHashedElem (Vector Column -> Int -> Column
forall a. Vector a -> Int -> a
(V.!) (DataFrame -> Vector Column
columns DataFrame
df) Int
j) Int
i

{- | This hash function returns the hash when given a non numeric type but
the value when given a numeric.
-}
hash' :: (Columnable a) => a -> Int
hash' :: forall a. Columnable a => a -> Int
hash' a
value = case TypeRep a -> TypeRep Double -> Maybe (a :~: Double)
forall a b. TypeRep a -> TypeRep b -> Maybe (a :~: b)
forall {k} (f :: k -> *) (a :: k) (b :: k).
TestEquality f =>
f a -> f b -> Maybe (a :~: b)
testEquality (a -> TypeRep a
forall a. Typeable a => a -> TypeRep a
typeOf a
value) (forall a. Typeable a => TypeRep a
forall {k} (a :: k). Typeable a => TypeRep a
typeRep @Double) of
    Just a :~: Double
Refl -> a -> Int
forall b. Integral b => a -> b
forall a b. (RealFrac a, Integral b) => a -> b
round (a -> Int) -> a -> Int
forall a b. (a -> b) -> a -> b
$ a
value a -> a -> a
forall a. Num a => a -> a -> a
* a
1000
    Maybe (a :~: Double)
Nothing -> case TypeRep a -> TypeRep Int -> Maybe (a :~: Int)
forall a b. TypeRep a -> TypeRep b -> Maybe (a :~: b)
forall {k} (f :: k -> *) (a :: k) (b :: k).
TestEquality f =>
f a -> f b -> Maybe (a :~: b)
testEquality (a -> TypeRep a
forall a. Typeable a => a -> TypeRep a
typeOf a
value) (forall a. Typeable a => TypeRep a
forall {k} (a :: k). Typeable a => TypeRep a
typeRep @Int) of
        Just a :~: Int
Refl -> a
Int
value
        Maybe (a :~: Int)
Nothing -> case TypeRep a -> TypeRep Text -> Maybe (a :~: Text)
forall a b. TypeRep a -> TypeRep b -> Maybe (a :~: b)
forall {k} (f :: k -> *) (a :: k) (b :: k).
TestEquality f =>
f a -> f b -> Maybe (a :~: b)
testEquality (a -> TypeRep a
forall a. Typeable a => a -> TypeRep a
typeOf a
value) (forall a. Typeable a => TypeRep a
forall {k} (a :: k). Typeable a => TypeRep a
typeRep @T.Text) of
            Just a :~: Text
Refl -> a -> Int
forall a. Hashable a => a -> Int
hash a
value
            Maybe (a :~: Text)
Nothing -> String -> Int
forall a. Hashable a => a -> Int
hash (a -> String
forall a. Show a => a -> String
show a
value)

mkGroupedColumns ::
    VU.Vector Int -> DataFrame -> DataFrame -> T.Text -> DataFrame
mkGroupedColumns :: Vector Int -> DataFrame -> DataFrame -> Text -> DataFrame
mkGroupedColumns Vector Int
indices DataFrame
df DataFrame
acc Text
name =
    case Vector Column -> Int -> Column
forall a. Vector a -> Int -> a
(V.!) (DataFrame -> Vector Column
columns DataFrame
df) (DataFrame -> Map Text Int
columnIndices DataFrame
df Map Text Int -> Text -> Int
forall k a. Ord k => Map k a -> k -> a
M.! Text
name) of
        BoxedColumn Vector a
column ->
            let vs :: Vector a
vs = Vector Int
indices Vector Int -> Vector a -> Vector a
forall a. Vector Int -> Vector a -> Vector a
`getIndices` Vector a
column
             in Text -> Vector a -> DataFrame -> DataFrame
forall a.
Columnable a =>
Text -> Vector a -> DataFrame -> DataFrame
insertVector Text
name Vector a
vs DataFrame
acc
        OptionalColumn Vector (Maybe a)
column ->
            let vs :: Vector (Maybe a)
vs = Vector Int
indices Vector Int -> Vector (Maybe a) -> Vector (Maybe a)
forall a. Vector Int -> Vector a -> Vector a
`getIndices` Vector (Maybe a)
column
             in Text -> Vector (Maybe a) -> DataFrame -> DataFrame
forall a.
Columnable a =>
Text -> Vector a -> DataFrame -> DataFrame
insertVector Text
name Vector (Maybe a)
vs DataFrame
acc
        UnboxedColumn Vector a
column ->
            let vs :: Vector a
vs = Vector Int
indices Vector Int -> Vector a -> Vector a
forall a. Unbox a => Vector Int -> Vector a -> Vector a
`getIndicesUnboxed` Vector a
column
             in Text -> Vector a -> DataFrame -> DataFrame
forall a.
(Columnable a, Unbox a) =>
Text -> Vector a -> DataFrame -> DataFrame
insertUnboxedVector Text
name Vector a
vs DataFrame
acc

{- | Aggregate a grouped dataframe using the expressions given.
All ungrouped columns will be dropped.
-}
aggregate :: [(T.Text, UExpr)] -> GroupedDataFrame -> DataFrame
aggregate :: [(Text, UExpr)] -> GroupedDataFrame -> DataFrame
aggregate [(Text, UExpr)]
aggs gdf :: GroupedDataFrame
gdf@(Grouped DataFrame
df [Text]
groupingColumns Vector Int
valueIndices Vector Int
offsets) =
    let
        df' :: DataFrame
df' =
            Vector Int -> DataFrame -> DataFrame
selectIndices
                ((Int -> Int) -> Vector Int -> Vector Int
forall (v :: * -> *) a b.
(Vector v a, Vector v b) =>
(a -> b) -> v a -> v b
VG.map (Vector Int
valueIndices Vector Int -> Int -> Int
forall (v :: * -> *) a.
(HasCallStack, Vector v a) =>
v a -> Int -> a
VG.!) (Vector Int -> Vector Int
forall (v :: * -> *) a. Vector v a => v a -> v a
VG.init Vector Int
offsets))
                ([Text] -> DataFrame -> DataFrame
select [Text]
groupingColumns DataFrame
df)

        f :: (Text, UExpr) -> DataFrame -> DataFrame
f (Text
name, Wrap (Expr a
expr :: Expr a)) DataFrame
d =
            let
                value :: Column
value = case forall a.
Columnable a =>
GroupedDataFrame
-> Expr a -> Either DataFrameException (AggregationResult a)
interpretAggregation @a GroupedDataFrame
gdf Expr a
expr of
                    Left DataFrameException
e -> DataFrameException -> Column
forall a e. Exception e => e -> a
throw DataFrameException
e
                    Right (UnAggregated Column
_) -> DataFrameException -> Column
forall a e. Exception e => e -> a
throw (DataFrameException -> Column) -> DataFrameException -> Column
forall a b. (a -> b) -> a -> b
$ Text -> DataFrameException
UnaggregatedException (String -> Text
T.pack (String -> Text) -> String -> Text
forall a b. (a -> b) -> a -> b
$ Expr a -> String
forall a. Show a => a -> String
show Expr a
expr)
                    Right (Aggregated (TColumn Column
col)) -> Column
col
             in
                Text -> Column -> DataFrame -> DataFrame
insertColumn Text
name Column
value DataFrame
d
     in
        ((Text, UExpr) -> DataFrame -> DataFrame)
-> [(Text, UExpr)] -> DataFrame -> DataFrame
forall a.
(a -> DataFrame -> DataFrame) -> [a] -> DataFrame -> DataFrame
fold (Text, UExpr) -> DataFrame -> DataFrame
f [(Text, UExpr)]
aggs DataFrame
df'

selectIndices :: VU.Vector Int -> DataFrame -> DataFrame
selectIndices :: Vector Int -> DataFrame -> DataFrame
selectIndices Vector Int
xs DataFrame
df =
    DataFrame
df
        { columns = VG.map (atIndicesStable xs) (columns df)
        , dataframeDimensions = (VG.length xs, VG.length (columns df))
        }

-- | Filter out all non-unique values in a dataframe.
distinct :: DataFrame -> DataFrame
distinct :: DataFrame -> DataFrame
distinct DataFrame
df = Vector Int -> DataFrame -> DataFrame
selectIndices ((Int -> Int) -> Vector Int -> Vector Int
forall (v :: * -> *) a b.
(Vector v a, Vector v b) =>
(a -> b) -> v a -> v b
VG.map (Vector Int
indices Vector Int -> Int -> Int
forall (v :: * -> *) a.
(HasCallStack, Vector v a) =>
v a -> Int -> a
VG.!) (Vector Int -> Vector Int
forall (v :: * -> *) a. Vector v a => v a -> v a
VG.init Vector Int
os)) DataFrame
df
  where
    (Grouped DataFrame
_ [Text]
_ Vector Int
indices Vector Int
os) = [Text] -> DataFrame -> GroupedDataFrame
groupBy (DataFrame -> [Text]
columnNames DataFrame
df) DataFrame
df