{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE NumericUnderscores #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications #-}

module DataFrame.Operations.Join where

import Control.Applicative ((<|>))
import Control.Monad (forM_, when)
import Control.Monad.ST (ST, runST)
import qualified Data.HashMap.Strict as HM
import Data.List (foldl')
import qualified Data.Map.Strict as M
import Data.Maybe (fromMaybe)
import Data.STRef (newSTRef, readSTRef, writeSTRef)
import qualified Data.Set as S
import qualified Data.Text as T
import Data.Type.Equality (TestEquality (..))
import qualified Data.Vector as VB
import qualified Data.Vector.Algorithms.Merge as VA
import qualified Data.Vector.Unboxed as VU
import qualified Data.Vector.Unboxed.Mutable as VUM
import DataFrame.Internal.Column as D
import DataFrame.Internal.DataFrame as D
import DataFrame.Operations.Aggregation as D
import DataFrame.Operations.Core as D
import Type.Reflection

{- | The kind of join to perform, named after the equivalent SQL join types.
Used by 'join' to select which join implementation is run.
-}
data JoinType
    = INNER
    | LEFT
    | RIGHT
    | FULL_OUTER
    deriving (Show)

{- | Join two dataframes using SQL join semantics.

Dispatches on the 'JoinType' to 'innerJoin', 'leftJoin', 'rightJoin' or
'fullOuterJoin', forwarding the key column names and both frames unchanged
and in the same order.

NOTE(review): the first frame is labelled "Right hand side" below, yet it is
forwarded as the first frame argument of the underlying join, which
'innerJoin' names @left@. Confirm which orientation callers rely on before
changing either the labels or the argument order.
-}
join ::
    JoinType ->
    [T.Text] ->
    DataFrame -> -- Right hand side
    DataFrame -> -- Left hand side
    DataFrame
join INNER xs right = innerJoin xs right
join LEFT xs right = leftJoin xs right
join RIGHT xs right = rightJoin xs right
join FULL_OUTER xs right = fullOuterJoin xs right

{- | Row-count threshold for the build side.
When the build side (the smaller of the two inputs) has more rows than this,
sort-merge join is used instead of hash join to avoid L3 cache thrashing.
-}
joinStrategyThreshold :: Int
joinStrategyThreshold = 500_000

{- | A compact index mapping hash values to contiguous slices of
original row indices. All indices live in a single unboxed vector;
the HashMap stores @(offset, length)@ into that vector.
-}
data CompactIndex = CompactIndex
    { ciSortedIndices :: {-# UNPACK #-} !(VU.Vector Int)
    -- ^ Original row indices, arranged so that rows sharing a hash are adjacent.
    , ciOffsets :: !(HM.HashMap Int (Int, Int))
    -- ^ For each distinct hash, the @(offset, length)@ of its slice in 'ciSortedIndices'.
    }

{- | Build a compact index from a vector of row hashes.

The hashes are sorted together with their original row indices (see
'sortWithIndices'); the sorted hashes are then scanned once, and every
contiguous run of equal hashes is recorded in the offset map as
@(runStart, runLength)@.
-}
buildCompactIndex :: VU.Vector Int -> CompactIndex
buildCompactIndex hashes =
    let n = VU.length hashes
        (sortedHashes, sortedIndices) = sortWithIndices hashes
        -- Forced here so the map is fully built before the index is returned.
        !offsets = buildOffsets sortedHashes n 0 HM.empty
     in CompactIndex sortedIndices offsets
  where
    -- Walk the sorted hashes run by run, starting at position @i@, and
    -- accumulate one map entry per distinct hash.
    buildOffsets ::
        VU.Vector Int ->
        Int ->
        Int ->
        HM.HashMap Int (Int, Int) ->
        HM.HashMap Int (Int, Int)
    buildOffsets !sh !n !i !acc
        | i >= n = acc
        | otherwise =
            let !h = sh `VU.unsafeIndex` i
                -- First position after the run of @h@ that begins at @i@.
                !end = findGroupEnd sh h (i + 1) n
             in buildOffsets sh n end (HM.insert h (i, end - i) acc)

{- | Find the end of a contiguous run of equal values starting at @j@.

@findGroupEnd v h j n@ returns the first index @k >= j@ such that either
@k >= n@ or @v[k] /= h@. The vector is read with 'VU.unsafeIndex', so the
caller must ensure @n@ does not exceed the length of @v@.
-}
findGroupEnd :: VU.Vector Int -> Int -> Int -> Int -> Int
findGroupEnd !v !h !j !n
    | j >= n = j
    | v `VU.unsafeIndex` j == h = findGroupEnd v h (j + 1) n
    | otherwise = j
{-# INLINE findGroupEnd #-}

{- | Sort a hash vector, returning sorted hashes and corresponding original indices.

Sorts an index array using hash values as the comparison key, avoiding the
intermediate pair vector used by the naive zip-then-sort approach.

The result is @(sortedHashes, sortedIndices)@, where @sortedIndices[k]@ is the
position in the input of the hash found at @sortedHashes[k]@.
-}
sortWithIndices :: VU.Vector Int -> (VU.Vector Int, VU.Vector Int)
sortWithIndices hashes = runST $ do
    let n = VU.length hashes
    -- Start from the identity permutation [0 .. n - 1] and sort it in place,
    -- comparing the hashes the indices point at.
    mv <- VU.thaw (VU.enumFromN 0 n)
    VA.sortBy
        (\i j -> compare (hashes `VU.unsafeIndex` i) (hashes `VU.unsafeIndex` j))
        mv
    -- 'mv' is not touched again, so freezing without a copy is safe.
    sortedIdxs <- VU.unsafeFreeze mv
    -- Every entry of 'sortedIdxs' is in [0, n), so the unchecked gather is in bounds.
    return (VU.unsafeBackpermute hashes sortedIdxs, sortedIdxs)

{- | Write the cross product of two index ranges into mutable vectors.

For every position @li@ in @[lStart, lEnd)@ of @leftSI@ and every position
@ri@ in @[rStart, rEnd)@ of @rightSI@, one output row is written: the left
value goes to @lv@ and the right value to @rv@ at the same offset. Output
starts at @pos@ and is laid out left-major, so each left entry occupies
@rEnd - rStart@ consecutive slots.

All reads and writes are unchecked; the caller must guarantee that both
output vectors have room for @(lEnd - lStart) * (rEnd - rStart)@ entries
starting at @pos@.
-}
fillCrossProduct ::
    VU.Vector Int ->
    VU.Vector Int ->
    Int ->
    Int ->
    Int ->
    Int ->
    VUM.MVector s Int ->
    VUM.MVector s Int ->
    Int ->
    ST s ()
fillCrossProduct !leftSI !rightSI !lStart !lEnd !rStart !rEnd !lv !rv !pos =
    goL lStart pos
  where
    -- Number of output slots consumed by each left entry.
    !rLen = rEnd - rStart
    -- Outer loop over the left range; @p@ is the output offset for this entry.
    goL !li !p
        | li >= lEnd = return ()
        | otherwise = do
            let !lOrigIdx = leftSI `VU.unsafeIndex` li
            goR lOrigIdx rStart p
            goL (li + 1) (p + rLen)
    -- Inner loop over the right range, pairing each entry with @lOrigIdx@.
    goR !lOrigIdx !ri !q
        | ri >= rEnd = return ()
        | otherwise = do
            VUM.unsafeWrite lv q lOrigIdx
            VUM.unsafeWrite rv q (rightSI `VU.unsafeIndex` ri)
            goR lOrigIdx (ri + 1) (q + 1)
{-# INLINE fillCrossProduct #-}

{- | Compute key-column indices from the column index map.

Restricts the frame's name-to-index map to the requested key names and
returns the surviving indices. Because the result comes from 'M.elems', the
indices are ordered by column /name/, not by the order in which the keys were
supplied; names that are not present in the frame are silently dropped.
-}
keyColIndices :: S.Set T.Text -> DataFrame -> [Int]
keyColIndices csSet df = M.elems $ M.restrictKeys (D.columnIndices df) csSet

-- ============================================================
-- Inner Join
-- ============================================================

{- | Performs an inner join on two dataframes using the specified key columns.
Returns only rows where the key values exist in both dataframes.
If either input is empty the result is 'D.empty'.

The kernel is chosen from the row counts: when the smaller input has more
than 'joinStrategyThreshold' rows a sort-merge kernel is used; otherwise a
hash kernel is used, building its index on the smaller input.

NOTE(review): the kernels called here receive only row hashes of the key
columns. Whether rows whose keys differ but whose hashes collide are filtered
out later (e.g. in @assembleInner@) is not visible from this function — confirm.

==== __Example__
@
ghci> df = D.fromNamedColumns [("key", D.fromList ["K0", "K1", "K2", "K3"]), ("A", D.fromList ["A0", "A1", "A2", "A3"])]
ghci> other = D.fromNamedColumns [("key", D.fromList ["K0", "K1", "K2"]), ("B", D.fromList ["B0", "B1", "B2"])]
ghci> D.innerJoin ["key"] df other

-----------------
 key  |  A  |  B
------|-----|----
 Text | Text| Text
------|-----|----
 K0   | A0  | B0
 K1   | A1  | B1
 K2   | A2  | B2

@
-}
innerJoin :: [T.Text] -> DataFrame -> DataFrame -> DataFrame
innerJoin cs left right
    | D.null right || D.null left = D.empty
    | otherwise =
        let
            csSet = S.fromList cs
            leftRows = fst (D.dimensions left)
            rightRows = fst (D.dimensions right)

            leftKeyIdxs = keyColIndices csSet left
            rightKeyIdxs = keyColIndices csSet right
            leftHashes = D.computeRowHashes leftKeyIdxs left
            rightHashes = D.computeRowHashes rightKeyIdxs right

            -- The build side is always the smaller of the two inputs.
            buildRows = min leftRows rightRows
            (leftIxs, rightIxs)
                | buildRows > joinStrategyThreshold =
                    sortMergeInnerKernel leftHashes rightHashes
                | rightRows <= leftRows =
                    -- Build on right (smaller or equal), probe with left
                    hashInnerKernel leftHashes rightHashes
                | otherwise =
                    -- Build on left (smaller), probe with right, swap result
                    let (!rIxs, !lIxs) = hashInnerKernel rightHashes leftHashes
                     in (lIxs, rIxs)
         in
            assembleInner csSet left right leftIxs rightIxs

{- | Compute hashes for the given key column names in a DataFrame.

The names are resolved to column indices with 'keyColIndices' and the
resulting indices are passed to 'D.computeRowHashes', which yields one hash
per row.
-}
buildHashColumn :: [T.Text] -> DataFrame -> VU.Vector Int
buildHashColumn keys df =
    let csSet = S.fromList keys
        keyIdxs = keyColIndices csSet df
     in D.computeRowHashes keyIdxs df

{- | Probe one batch of rows against a pre-built 'CompactIndex'.
Returns @(probeExpandedIxs, buildExpandedIxs)@: for every probe row whose hash
is present in the index, one output pair is emitted per build row stored
under that hash. Probe indices are positions within the supplied batch.

Unlike 'hashInnerKernel', does not build the index (it is pre-built once)
and has no cross-product row guard — the caller controls probe batch size.
-}
hashProbeKernel ::
    -- | Built once from the full right\/build side.
    CompactIndex ->
    -- | Probe hashes (one batch).
    VU.Vector Int ->
    (VU.Vector Int, VU.Vector Int)
hashProbeKernel ci probeHashes =
    let ciIxs = ciSortedIndices ci
        ciOff = ciOffsets ci
        (pFrozen, bFrozen) = runST $ do
            let !probeN = VU.length probeHashes
                -- Start with at most one million slots; the buffers grow on demand.
                initCap = max 1 (min probeN 1_000_000)

            initPv <- VUM.unsafeNew initCap
            initBv <- VUM.unsafeNew initCap
            pvRef <- newSTRef initPv
            bvRef <- newSTRef initBv
            capRef <- newSTRef initCap
            posRef <- newSTRef (0 :: Int)

            let -- Grow both output buffers (at least doubling) so that they
                -- can hold @needed@ entries.
                ensureCapacity needed = do
                    cap <- readSTRef capRef
                    when (needed > cap) $ do
                        let newCap = max needed (cap * 2)
                            delta = newCap - cap
                        pv <- readSTRef pvRef
                        bv <- readSTRef bvRef
                        newPv <- VUM.unsafeGrow pv delta
                        newBv <- VUM.unsafeGrow bv delta
                        writeSTRef pvRef newPv
                        writeSTRef bvRef newBv
                        writeSTRef capRef newCap

                -- Visit every probe row; rows whose hash is absent emit nothing.
                go !i
                    | i >= probeN = return ()
                    | otherwise = do
                        let !h = probeHashes `VU.unsafeIndex` i
                        case HM.lookup h ciOff of
                            Nothing -> go (i + 1)
                            Just (!start, !len) -> do
                                !p <- readSTRef posRef
                                ensureCapacity (p + len)
                                -- Re-read the buffers: growing may have replaced them.
                                pv <- readSTRef pvRef
                                bv <- readSTRef bvRef
                                fillBuild i start len p 0 pv bv
                                writeSTRef posRef (p + len)
                                go (i + 1)
                -- Emit @len@ pairs for one probe row, starting at output offset @p@.
                fillBuild !probeIdx !start !len !p !j !pv !bv
                    | j >= len = return ()
                    | otherwise = do
                        VUM.unsafeWrite pv (p + j) probeIdx
                        VUM.unsafeWrite bv (p + j) (ciIxs `VU.unsafeIndex` (start + j))
                        fillBuild probeIdx start len p (j + 1) pv bv
            go 0

            !total <- readSTRef posRef
            pv <- readSTRef pvRef
            bv <- readSTRef bvRef
            -- Only the first @total@ slots were written; freeze just that prefix.
            (,)
                <$> VU.unsafeFreeze (VUM.slice 0 total pv)
                <*> VU.unsafeFreeze (VUM.slice 0 total bv)
     in -- 'VU.force' copies each slice so the over-allocated buffers can be released.
        (VU.force pFrozen, VU.force bFrozen)

{- | Maximum number of output rows allowed from a join kernel.
Exceeding this limit indicates a cross-product explosion (e.g. low-cardinality keys).
-}
maxJoinOutputRows :: Int
maxJoinOutputRows = 500_000_000

{- | Hash-based inner join kernel.
Builds compact index on @buildHashes@ (second arg), probes with
@probeHashes@ (first arg).
Returns @(probeExpandedIndices, buildExpandedIndices)@.
Uses a dynamically growing output buffer to avoid pre-allocating the full
cross-product size (which can be astronomically large for low-cardinality keys).
-}

hashInnerKernel ::
    VU.Vector Int -> VU.Vector Int -> (VU.Vector Int, VU.Vector Int)
hashInnerKernel :: Vector Int -> Vector Int -> (Vector Int, Vector Int)
hashInnerKernel Vector Int
probeHashes Vector Int
buildHashes =
    let (Vector Int
pFrozen, Vector Int
bFrozen) = (forall s. ST s (Vector Int, Vector Int))
-> (Vector Int, Vector Int)
forall a. (forall s. ST s a) -> a
runST ((forall s. ST s (Vector Int, Vector Int))
 -> (Vector Int, Vector Int))
-> (forall s. ST s (Vector Int, Vector Int))
-> (Vector Int, Vector Int)
forall a b. (a -> b) -> a -> b
$ do
            let ci :: CompactIndex
ci = Vector Int -> CompactIndex
buildCompactIndex Vector Int
buildHashes
                ciIxs :: Vector Int
ciIxs = CompactIndex -> Vector Int
ciSortedIndices CompactIndex
ci
                ciOff :: HashMap Int (Int, Int)
ciOff = CompactIndex -> HashMap Int (Int, Int)
ciOffsets CompactIndex
ci
                !probeN :: Int
probeN = Vector Int -> Int
forall a. Unbox a => Vector a -> Int
VU.length Vector Int
probeHashes
                !buildN :: Int
buildN = Vector Int -> Int
forall a. Unbox a => Vector a -> Int
VU.length Vector Int
buildHashes
                initCap :: Int
initCap = Int -> Int -> Int
forall a. Ord a => a -> a -> a
max Int
1 (Int -> Int -> Int
forall a. Ord a => a -> a -> a
min (Int
probeN Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
buildN) Int
1_000_000)

            MVector s Int
initPv <- Int -> ST s (MVector (PrimState (ST s)) Int)
forall (m :: * -> *) a.
(PrimMonad m, Unbox a) =>
Int -> m (MVector (PrimState m) a)
VUM.unsafeNew Int
initCap
            MVector s Int
initBv <- Int -> ST s (MVector (PrimState (ST s)) Int)
forall (m :: * -> *) a.
(PrimMonad m, Unbox a) =>
Int -> m (MVector (PrimState m) a)
VUM.unsafeNew Int
initCap
            STRef s (MVector s Int)
pvRef <- MVector s Int -> ST s (STRef s (MVector s Int))
forall a s. a -> ST s (STRef s a)
newSTRef MVector s Int
initPv
            STRef s (MVector s Int)
bvRef <- MVector s Int -> ST s (STRef s (MVector s Int))
forall a s. a -> ST s (STRef s a)
newSTRef MVector s Int
initBv
            STRef s Int
capRef <- Int -> ST s (STRef s Int)
forall a s. a -> ST s (STRef s a)
newSTRef Int
initCap
            STRef s Int
posRef <- Int -> ST s (STRef s Int)
forall a s. a -> ST s (STRef s a)
newSTRef (Int
0 :: Int)

            let ensureCapacity :: Int -> ST s ()
ensureCapacity Int
needed = do
                    Int
cap <- STRef s Int -> ST s Int
forall s a. STRef s a -> ST s a
readSTRef STRef s Int
capRef
                    Bool -> ST s () -> ST s ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Int
needed Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
cap) (ST s () -> ST s ()) -> ST s () -> ST s ()
forall a b. (a -> b) -> a -> b
$ do
                        let newCap :: Int
newCap = Int -> Int -> Int
forall a. Ord a => a -> a -> a
max Int
needed (Int
cap Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
2)
                            delta :: Int
delta = Int
newCap Int -> Int -> Int
forall a. Num a => a -> a -> a
- Int
cap
                        MVector s Int
pv <- STRef s (MVector s Int) -> ST s (MVector s Int)
forall s a. STRef s a -> ST s a
readSTRef STRef s (MVector s Int)
pvRef
                        MVector s Int
bv <- STRef s (MVector s Int) -> ST s (MVector s Int)
forall s a. STRef s a -> ST s a
readSTRef STRef s (MVector s Int)
bvRef
                        MVector s Int
newPv <- MVector (PrimState (ST s)) Int
-> Int -> ST s (MVector (PrimState (ST s)) Int)
forall (m :: * -> *) a.
(PrimMonad m, Unbox a) =>
MVector (PrimState m) a -> Int -> m (MVector (PrimState m) a)
VUM.unsafeGrow MVector s Int
MVector (PrimState (ST s)) Int
pv Int
delta
                        MVector s Int
newBv <- MVector (PrimState (ST s)) Int
-> Int -> ST s (MVector (PrimState (ST s)) Int)
forall (m :: * -> *) a.
(PrimMonad m, Unbox a) =>
MVector (PrimState m) a -> Int -> m (MVector (PrimState m) a)
VUM.unsafeGrow MVector s Int
MVector (PrimState (ST s)) Int
bv Int
delta
                        STRef s (MVector s Int) -> MVector s Int -> ST s ()
forall s a. STRef s a -> a -> ST s ()
writeSTRef STRef s (MVector s Int)
pvRef MVector s Int
newPv
                        STRef s (MVector s Int) -> MVector s Int -> ST s ()
forall s a. STRef s a -> a -> ST s ()
writeSTRef STRef s (MVector s Int)
bvRef MVector s Int
newBv
                        STRef s Int -> Int -> ST s ()
forall s a. STRef s a -> a -> ST s ()
writeSTRef STRef s Int
capRef Int
newCap

                go :: Int -> ST s ()
go !Int
i
                    | Int
i Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
>= Int
probeN = () -> ST s ()
forall a. a -> ST s a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
                    | Bool
otherwise = do
                        let !h :: Int
h = Vector Int
probeHashes Vector Int -> Int -> Int
forall a. Unbox a => Vector a -> Int -> a
`VU.unsafeIndex` Int
i
                        case Int -> HashMap Int (Int, Int) -> Maybe (Int, Int)
forall k v. Hashable k => k -> HashMap k v -> Maybe v
HM.lookup Int
h HashMap Int (Int, Int)
ciOff of
                            Maybe (Int, Int)
Nothing -> Int -> ST s ()
go (Int
i Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1)
                            Just (!Int
start, !Int
len) -> do
                                !Int
p <- STRef s Int -> ST s Int
forall s a. STRef s a -> ST s a
readSTRef STRef s Int
posRef
                                Bool -> ST s () -> ST s ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Int
p Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
len Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
maxJoinOutputRows) (ST s () -> ST s ()) -> ST s () -> ST s ()
forall a b. (a -> b) -> a -> b
$
                                    String -> ST s ()
forall a. HasCallStack => String -> a
error (String -> ST s ()) -> String -> ST s ()
forall a b. (a -> b) -> a -> b
$
                                        String
"Join output would exceed "
                                            String -> ShowS
forall a. [a] -> [a] -> [a]
++ Int -> String
forall a. Show a => a -> String
show Int
maxJoinOutputRows
                                            String -> ShowS
forall a. [a] -> [a] -> [a]
++ String
" rows (cross-product explosion). "
                                            String -> ShowS
forall a. [a] -> [a] -> [a]
++ String
"Consider filtering or using higher-cardinality join keys or using the lazy API."
                                Int -> ST s ()
ensureCapacity (Int
p Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
len)
                                MVector s Int
pv <- STRef s (MVector s Int) -> ST s (MVector s Int)
forall s a. STRef s a -> ST s a
readSTRef STRef s (MVector s Int)
pvRef
                                MVector s Int
bv <- STRef s (MVector s Int) -> ST s (MVector s Int)
forall s a. STRef s a -> ST s a
readSTRef STRef s (MVector s Int)
bvRef
                                Int
-> Int
-> Int
-> Int
-> Int
-> MVector s Int
-> MVector s Int
-> ST s ()
fillBuild Int
i Int
start Int
len Int
p Int
0 MVector s Int
pv MVector s Int
bv
                                STRef s Int -> Int -> ST s ()
forall s a. STRef s a -> a -> ST s ()
writeSTRef STRef s Int
posRef (Int
p Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
len)
                                Int -> ST s ()
go (Int
i Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1)
                fillBuild :: Int
-> Int
-> Int
-> Int
-> Int
-> MVector s Int
-> MVector s Int
-> ST s ()
fillBuild !Int
probeIdx !Int
start !Int
len !Int
p !Int
j !MVector s Int
pv !MVector s Int
bv
                    | Int
j Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
>= Int
len = () -> ST s ()
forall a. a -> ST s a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
                    | Bool
otherwise = do
                        MVector (PrimState (ST s)) Int -> Int -> Int -> ST s ()
forall (m :: * -> *) a.
(PrimMonad m, Unbox a) =>
MVector (PrimState m) a -> Int -> a -> m ()
VUM.unsafeWrite MVector s Int
MVector (PrimState (ST s)) Int
pv (Int
p Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
j) Int
probeIdx
                        MVector (PrimState (ST s)) Int -> Int -> Int -> ST s ()
forall (m :: * -> *) a.
(PrimMonad m, Unbox a) =>
MVector (PrimState m) a -> Int -> a -> m ()
VUM.unsafeWrite MVector s Int
MVector (PrimState (ST s)) Int
bv (Int
p Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
j) (Vector Int
ciIxs Vector Int -> Int -> Int
forall a. Unbox a => Vector a -> Int -> a
`VU.unsafeIndex` (Int
start Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
j))
                        Int
-> Int
-> Int
-> Int
-> Int
-> MVector s Int
-> MVector s Int
-> ST s ()
fillBuild Int
probeIdx Int
start Int
len Int
p (Int
j Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1) MVector s Int
pv MVector s Int
bv
            Int -> ST s ()
go Int
0

            !Int
total <- STRef s Int -> ST s Int
forall s a. STRef s a -> ST s a
readSTRef STRef s Int
posRef
            MVector s Int
pv <- STRef s (MVector s Int) -> ST s (MVector s Int)
forall s a. STRef s a -> ST s a
readSTRef STRef s (MVector s Int)
pvRef
            MVector s Int
bv <- STRef s (MVector s Int) -> ST s (MVector s Int)
forall s a. STRef s a -> ST s a
readSTRef STRef s (MVector s Int)
bvRef
            (,)
                (Vector Int -> Vector Int -> (Vector Int, Vector Int))
-> ST s (Vector Int)
-> ST s (Vector Int -> (Vector Int, Vector Int))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> MVector (PrimState (ST s)) Int -> ST s (Vector Int)
forall a (m :: * -> *).
(Unbox a, PrimMonad m) =>
MVector (PrimState m) a -> m (Vector a)
VU.unsafeFreeze (Int -> Int -> MVector s Int -> MVector s Int
forall a s. Unbox a => Int -> Int -> MVector s a -> MVector s a
VUM.slice Int
0 Int
total MVector s Int
pv)
                ST s (Vector Int -> (Vector Int, Vector Int))
-> ST s (Vector Int) -> ST s (Vector Int, Vector Int)
forall a b. ST s (a -> b) -> ST s a -> ST s b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> MVector (PrimState (ST s)) Int -> ST s (Vector Int)
forall a (m :: * -> *).
(Unbox a, PrimMonad m) =>
MVector (PrimState m) a -> m (Vector a)
VU.unsafeFreeze (Int -> Int -> MVector s Int -> MVector s Int
forall a s. Unbox a => Int -> Int -> MVector s a -> MVector s a
VUM.slice Int
0 Int
total MVector s Int
bv)
     in -- VU.force copies the slice into a compact array, releasing the oversized
        -- backing buffer allocated by the doubling strategy.
        (Vector Int -> Vector Int
forall a. Unbox a => Vector a -> Vector a
VU.force Vector Int
pFrozen, Vector Int -> Vector Int
forall a. Unbox a => Vector a -> Vector a
VU.force Vector Int
bFrozen)

{- | Sort-merge inner join kernel.

Both hash vectors are passed through 'sortWithIndices', which yields the
sorted hashes together with the row index each hash came from. The two sorted
sequences are then walked in lockstep; every run of equal hashes on the left
is paired with the matching run on the right (a per-key cross product).

Returns @(leftExpandedIndices, rightExpandedIndices)@: two vectors of equal
length where position @k@ holds the left and right row index of the @k@-th
output row.

Uses a dynamically growing output buffer instead of a two-pass
count-then-allocate strategy, which OOMs when low-cardinality keys produce
large cross products.

Calls 'error' if the output would exceed 'maxJoinOutputRows' rows.
-}
sortMergeInnerKernel ::
    VU.Vector Int -> VU.Vector Int -> (VU.Vector Int, VU.Vector Int)
sortMergeInnerKernel leftHashes rightHashes =
    let (lFrozen, rFrozen) = runST $ do
            let (leftSH, leftSI) = sortWithIndices leftHashes
                (rightSH, rightSI) = sortWithIndices rightHashes
                !leftN = VU.length leftHashes
                !rightN = VU.length rightHashes
                -- Start small (at most 1M slots, at least 1) and grow on demand.
                initCap = max 1 (min (leftN + rightN) 1_000_000)

            initLv <- VUM.unsafeNew initCap
            initRv <- VUM.unsafeNew initCap
            lvRef <- newSTRef initLv
            rvRef <- newSTRef initRv
            capRef <- newSTRef initCap
            posRef <- newSTRef (0 :: Int)

            let -- Grow both output buffers so they can hold at least @needed@
                -- entries. Capacity at least doubles on every growth step.
                ensureCapacity needed = do
                    cap <- readSTRef capRef
                    when (needed > cap) $ do
                        let newCap = max needed (cap * 2)
                            delta = newCap - cap
                        lv <- readSTRef lvRef
                        rv <- readSTRef rvRef
                        newLv <- VUM.unsafeGrow lv delta
                        newRv <- VUM.unsafeGrow rv delta
                        writeSTRef lvRef newLv
                        writeSTRef rvRef newRv
                        writeSTRef capRef newCap

                -- Emit the cross product of the left run [li, lEnd) and the
                -- right run [ri, rEnd) of the sorted inputs.
                fillGroup !li !lEnd !ri !rEnd = do
                    let !lLen = lEnd - li
                        !rLen = rEnd - ri
                    !p <- readSTRef posRef
                    -- Overflow-safe form of @p + lLen * rLen > maxJoinOutputRows@:
                    -- for positive run lengths, @lLen * rLen > room@ holds exactly
                    -- when @lLen > room `div` rLen@, and the division cannot wrap
                    -- around the way the multiplication can.
                    let !room = maxJoinOutputRows - p
                    when (lLen > 0 && rLen > 0 && lLen > (room `div` rLen)) $
                        error $
                            "Join output would exceed "
                                ++ show maxJoinOutputRows
                                ++ " rows (cross-product explosion with group sizes "
                                ++ show lLen
                                ++ " × "
                                ++ show rLen
                                ++ "). Consider filtering or using higher-cardinality join keys."
                    -- Safe to multiply now: the product fits below the row limit.
                    let !groupSize = lLen * rLen
                    ensureCapacity (p + groupSize)
                    -- Re-read the buffers: ensureCapacity may have replaced them.
                    lv <- readSTRef lvRef
                    rv <- readSTRef rvRef
                    let -- Outer loop over the left run; each left row owns a
                        -- contiguous stripe of rLen output slots.
                        goL !lIdx !pos
                            | lIdx >= lEnd = return ()
                            | otherwise = do
                                let !lOrig = leftSI `VU.unsafeIndex` lIdx
                                goR lOrig ri pos
                                goL (lIdx + 1) (pos + rLen)
                        -- Inner loop over the right run for one fixed left row.
                        goR !lOrig !rIdx !pos
                            | rIdx >= rEnd = return ()
                            | otherwise = do
                                VUM.unsafeWrite lv pos lOrig
                                VUM.unsafeWrite rv pos (rightSI `VU.unsafeIndex` rIdx)
                                goR lOrig (rIdx + 1) (pos + 1)
                    goL li p
                    writeSTRef posRef (p + groupSize)

                -- Lockstep walk over both sorted hash vectors.
                fill !li !ri
                    | li >= leftN || ri >= rightN = return ()
                    | otherwise = do
                        -- The hashes are read only after the bounds check above.
                        -- Binding them as strict @where@ bindings would force the
                        -- unchecked 'VU.unsafeIndex' before any guard runs, i.e.
                        -- also at li == leftN / ri == rightN (out of bounds).
                        let !lh = leftSH `VU.unsafeIndex` li
                            !rh = rightSH `VU.unsafeIndex` ri
                        case compare lh rh of
                            LT -> fill (li + 1) ri
                            GT -> fill li (ri + 1)
                            EQ -> do
                                let !lEnd = findGroupEnd leftSH lh (li + 1) leftN
                                    !rEnd = findGroupEnd rightSH rh (ri + 1) rightN
                                fillGroup li lEnd ri rEnd
                                fill lEnd rEnd

            fill 0 0

            !total <- readSTRef posRef
            lv <- readSTRef lvRef
            rv <- readSTRef rvRef
            (,)
                <$> VU.unsafeFreeze (VUM.slice 0 total lv)
                <*> VU.unsafeFreeze (VUM.slice 0 total rv)
     in -- VU.force copies the slice into a compact array, releasing the oversized
        -- backing buffer allocated by the doubling strategy.
        (VU.force lFrozen, VU.force rFrozen)

{- | Build the inner-join result from the two expanded row-index vectors.

The result starts as the left frame with every column gathered at @leftIxs@.
The right frame's columns (gathered at @rightIxs@) are then folded in by name:

* a name contained in @csSet@ (a join key) is skipped, the left copy is kept;
* a name that also exists on the left is combined via 'D.mergeColumns';
* any other name is inserted as a new column.

The row count of the result is the length of @leftIxs@.
-}
assembleInner ::
    S.Set T.Text ->
    DataFrame ->
    DataFrame ->
    VU.Vector Int ->
    VU.Vector Int ->
    DataFrame
assembleInner csSet left right leftIxs rightIxs =
    D.fold addRightColumn (D.columnNames right) expandedLeft
  where
    !rowCount = VU.length leftIxs
    leftNames = S.fromList (D.columnNames left)

    -- Every column is gathered once up front; lookups below index into these.
    leftGathered = VB.map (D.atIndicesStable leftIxs) (D.columns left)
    rightGathered = VB.map (D.atIndicesStable rightIxs) (D.columns right)

    -- Look a column up by name through the frame's own name -> position map.
    gatheredFrom cols frame name =
        VB.unsafeIndex cols <$> M.lookup name (D.columnIndices frame)
    fromLeft = gatheredFrom leftGathered left
    fromRight = gatheredFrom rightGathered right

    -- Starting point of the fold: all left columns, expanded.
    expandedLeft =
        left
            { columns = leftGathered
            , dataframeDimensions = (rowCount, snd (D.dataframeDimensions left))
            , derivingExpressions = M.empty
            }

    -- Insert the column when there is one, otherwise leave the frame alone.
    attach name mCol frame = maybe frame (\col -> D.insertColumn name col frame) mCol

    addRightColumn name frame
        -- Key column already present from the left side.
        | S.member name csSet = frame
        -- Overlapping non-key column: merge left and right values.
        | S.member name leftNames =
            attach name (D.mergeColumns <$> fromLeft name <*> fromRight name) frame
        -- Right-only column.
        | otherwise = attach name (fromRight name) frame

-- ============================================================
-- Left Join
-- ============================================================

{- | Left join of two dataframes on the given key columns.

Every row of the left dataframe is kept and paired with the matching rows of
the right dataframe. Left rows without a match get Nothing/null values in the
columns coming from the right dataframe.

Empty inputs short-circuit: if the right dataframe is empty the left dataframe
is returned unchanged (no right-hand columns are added), and if the left
dataframe is empty the result is 'D.empty'.

==== __Example__
@
ghci> df = D.fromNamedColumns [("key", D.fromList ["K0", "K1", "K2", "K3"]), ("A", D.fromList ["A0", "A1", "A2", "A3"])]
ghci> other = D.fromNamedColumns [("key", D.fromList ["K0", "K1", "K2"]), ("B", D.fromList ["B0", "B1", "B2"])]
ghci> D.leftJoin ["key"] df other

------------------------
 key  |  A  |     B
------|-----|----------
 Text | Text| Maybe Text
------|-----|----------
 K0   | A0  | Just "B0"
 K1   | A1  | Just "B1"
 K2   | A2  | Just "B2"
 K3   | A3  | Nothing

@
-}
leftJoin :: [T.Text] -> DataFrame -> DataFrame -> DataFrame
leftJoin cs left right
    | hasNoRows right = left
    | hasNoRows left = D.empty
    | otherwise =
        -- matchedRight uses -1 as sentinel for "no match"
        assembleLeft keySet left right matchedLeft matchedRight
  where
    hasNoRows frame = D.null frame || D.nRows frame == 0

    keySet = S.fromList cs

    -- One hash per row, computed over the key columns only.
    keyHashes frame = D.computeRowHashes (keyColIndices keySet frame) frame

    -- Right is always the build side for left join, so its row count alone
    -- decides between the sort-merge and the hash kernel.
    kernel
        | fst (D.dimensions right) > joinStrategyThreshold = sortMergeLeftKernel
        | otherwise = hashLeftKernel

    (matchedLeft, matchedRight) = kernel (keyHashes left) (keyHashes right)

{- | Hash-based left join kernel.
Returns @(leftExpandedIndices, rightExpandedIndices)@ where
right indices use @-1@ as sentinel for unmatched rows.
Uses a dynamically growing output buffer to avoid pre-allocating the full
cross-product size (which can be astronomically large for low-cardinality keys).
-}
hashLeftKernel ::
    VU.Vector Int -> VU.Vector Int -> (VU.Vector Int, VU.Vector Int)
hashLeftKernel :: Vector Int -> Vector Int -> (Vector Int, Vector Int)
hashLeftKernel Vector Int
leftHashes Vector Int
rightHashes = (forall s. ST s (Vector Int, Vector Int))
-> (Vector Int, Vector Int)
forall a. (forall s. ST s a) -> a
runST ((forall s. ST s (Vector Int, Vector Int))
 -> (Vector Int, Vector Int))
-> (forall s. ST s (Vector Int, Vector Int))
-> (Vector Int, Vector Int)
forall a b. (a -> b) -> a -> b
$ do
    let ci :: CompactIndex
ci = Vector Int -> CompactIndex
buildCompactIndex Vector Int
rightHashes
        ciIxs :: Vector Int
ciIxs = CompactIndex -> Vector Int
ciSortedIndices CompactIndex
ci
        ciOff :: HashMap Int (Int, Int)
ciOff = CompactIndex -> HashMap Int (Int, Int)
ciOffsets CompactIndex
ci
        !leftN :: Int
leftN = Vector Int -> Int
forall a. Unbox a => Vector a -> Int
VU.length Vector Int
leftHashes
        !rightN :: Int
rightN = Vector Int -> Int
forall a. Unbox a => Vector a -> Int
VU.length Vector Int
rightHashes
        initCap :: Int
initCap = Int -> Int -> Int
forall a. Ord a => a -> a -> a
max Int
1 (Int -> Int -> Int
forall a. Ord a => a -> a -> a
min (Int
leftN Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
rightN) Int
1_000_000)

    MVector s Int
initLv <- Int -> ST s (MVector (PrimState (ST s)) Int)
forall (m :: * -> *) a.
(PrimMonad m, Unbox a) =>
Int -> m (MVector (PrimState m) a)
VUM.unsafeNew Int
initCap
    MVector s Int
initRv <- Int -> ST s (MVector (PrimState (ST s)) Int)
forall (m :: * -> *) a.
(PrimMonad m, Unbox a) =>
Int -> m (MVector (PrimState m) a)
VUM.unsafeNew Int
initCap
    STRef s (MVector s Int)
lvRef <- MVector s Int -> ST s (STRef s (MVector s Int))
forall a s. a -> ST s (STRef s a)
newSTRef MVector s Int
initLv
    STRef s (MVector s Int)
rvRef <- MVector s Int -> ST s (STRef s (MVector s Int))
forall a s. a -> ST s (STRef s a)
newSTRef MVector s Int
initRv
    STRef s Int
capRef <- Int -> ST s (STRef s Int)
forall a s. a -> ST s (STRef s a)
newSTRef Int
initCap
    STRef s Int
posRef <- Int -> ST s (STRef s Int)
forall a s. a -> ST s (STRef s a)
newSTRef (Int
0 :: Int)

    let ensureCapacity :: Int -> ST s ()
ensureCapacity Int
needed = do
            Int
cap <- STRef s Int -> ST s Int
forall s a. STRef s a -> ST s a
readSTRef STRef s Int
capRef
            Bool -> ST s () -> ST s ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Int
needed Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
cap) (ST s () -> ST s ()) -> ST s () -> ST s ()
forall a b. (a -> b) -> a -> b
$ do
                let newCap :: Int
newCap = Int -> Int -> Int
forall a. Ord a => a -> a -> a
max Int
needed (Int
cap Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
2)
                    delta :: Int
delta = Int
newCap Int -> Int -> Int
forall a. Num a => a -> a -> a
- Int
cap
                MVector s Int
lv <- STRef s (MVector s Int) -> ST s (MVector s Int)
forall s a. STRef s a -> ST s a
readSTRef STRef s (MVector s Int)
lvRef
                MVector s Int
rv <- STRef s (MVector s Int) -> ST s (MVector s Int)
forall s a. STRef s a -> ST s a
readSTRef STRef s (MVector s Int)
rvRef
                MVector s Int
newLv <- MVector (PrimState (ST s)) Int
-> Int -> ST s (MVector (PrimState (ST s)) Int)
forall (m :: * -> *) a.
(PrimMonad m, Unbox a) =>
MVector (PrimState m) a -> Int -> m (MVector (PrimState m) a)
VUM.unsafeGrow MVector s Int
MVector (PrimState (ST s)) Int
lv Int
delta
                MVector s Int
newRv <- MVector (PrimState (ST s)) Int
-> Int -> ST s (MVector (PrimState (ST s)) Int)
forall (m :: * -> *) a.
(PrimMonad m, Unbox a) =>
MVector (PrimState m) a -> Int -> m (MVector (PrimState m) a)
VUM.unsafeGrow MVector s Int
MVector (PrimState (ST s)) Int
rv Int
delta
                STRef s (MVector s Int) -> MVector s Int -> ST s ()
forall s a. STRef s a -> a -> ST s ()
writeSTRef STRef s (MVector s Int)
lvRef MVector s Int
newLv
                STRef s (MVector s Int) -> MVector s Int -> ST s ()
forall s a. STRef s a -> a -> ST s ()
writeSTRef STRef s (MVector s Int)
rvRef MVector s Int
newRv
                STRef s Int -> Int -> ST s ()
forall s a. STRef s a -> a -> ST s ()
writeSTRef STRef s Int
capRef Int
newCap

        go :: Int -> ST s ()
go !Int
i
            | Int
i Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
>= Int
leftN = () -> ST s ()
forall a. a -> ST s a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
            | Bool
otherwise = do
                let !h :: Int
h = Vector Int
leftHashes Vector Int -> Int -> Int
forall a. Unbox a => Vector a -> Int -> a
`VU.unsafeIndex` Int
i
                !Int
p <- STRef s Int -> ST s Int
forall s a. STRef s a -> ST s a
readSTRef STRef s Int
posRef
                case Int -> HashMap Int (Int, Int) -> Maybe (Int, Int)
forall k v. Hashable k => k -> HashMap k v -> Maybe v
HM.lookup Int
h HashMap Int (Int, Int)
ciOff of
                    Maybe (Int, Int)
Nothing -> do
                        Int -> ST s ()
ensureCapacity (Int
p Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1)
                        MVector s Int
lv <- STRef s (MVector s Int) -> ST s (MVector s Int)
forall s a. STRef s a -> ST s a
readSTRef STRef s (MVector s Int)
lvRef
                        MVector s Int
rv <- STRef s (MVector s Int) -> ST s (MVector s Int)
forall s a. STRef s a -> ST s a
readSTRef STRef s (MVector s Int)
rvRef
                        MVector (PrimState (ST s)) Int -> Int -> Int -> ST s ()
forall (m :: * -> *) a.
(PrimMonad m, Unbox a) =>
MVector (PrimState m) a -> Int -> a -> m ()
VUM.unsafeWrite MVector s Int
MVector (PrimState (ST s)) Int
lv Int
p Int
i
                        MVector (PrimState (ST s)) Int -> Int -> Int -> ST s ()
forall (m :: * -> *) a.
(PrimMonad m, Unbox a) =>
MVector (PrimState m) a -> Int -> a -> m ()
VUM.unsafeWrite MVector s Int
MVector (PrimState (ST s)) Int
rv Int
p (-Int
1)
                        STRef s Int -> Int -> ST s ()
forall s a. STRef s a -> a -> ST s ()
writeSTRef STRef s Int
posRef (Int
p Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1)
                    Just (!Int
start, !Int
len) -> do
                        Int -> ST s ()
ensureCapacity (Int
p Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
len)
                        MVector s Int
lv <- STRef s (MVector s Int) -> ST s (MVector s Int)
forall s a. STRef s a -> ST s a
readSTRef STRef s (MVector s Int)
lvRef
                        MVector s Int
rv <- STRef s (MVector s Int) -> ST s (MVector s Int)
forall s a. STRef s a -> ST s a
readSTRef STRef s (MVector s Int)
rvRef
                        Int
-> Int
-> Int
-> Int
-> Int
-> MVector s Int
-> MVector s Int
-> ST s ()
fillBuild Int
i Int
start Int
len Int
p Int
0 MVector s Int
lv MVector s Int
rv
                        STRef s Int -> Int -> ST s ()
forall s a. STRef s a -> a -> ST s ()
writeSTRef STRef s Int
posRef (Int
p Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
len)
                Int -> ST s ()
go (Int
i Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1)
        fillBuild :: Int
-> Int
-> Int
-> Int
-> Int
-> MVector s Int
-> MVector s Int
-> ST s ()
fillBuild !Int
leftIdx !Int
start !Int
len !Int
p !Int
j !MVector s Int
lv !MVector s Int
rv
            | Int
j Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
>= Int
len = () -> ST s ()
forall a. a -> ST s a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
            | Bool
otherwise = do
                MVector (PrimState (ST s)) Int -> Int -> Int -> ST s ()
forall (m :: * -> *) a.
(PrimMonad m, Unbox a) =>
MVector (PrimState m) a -> Int -> a -> m ()
VUM.unsafeWrite MVector s Int
MVector (PrimState (ST s)) Int
lv (Int
p Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
j) Int
leftIdx
                MVector (PrimState (ST s)) Int -> Int -> Int -> ST s ()
forall (m :: * -> *) a.
(PrimMonad m, Unbox a) =>
MVector (PrimState m) a -> Int -> a -> m ()
VUM.unsafeWrite MVector s Int
MVector (PrimState (ST s)) Int
rv (Int
p Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
j) (Vector Int
ciIxs Vector Int -> Int -> Int
forall a. Unbox a => Vector a -> Int -> a
`VU.unsafeIndex` (Int
start Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
j))
                Int
-> Int
-> Int
-> Int
-> Int
-> MVector s Int
-> MVector s Int
-> ST s ()
fillBuild Int
leftIdx Int
start Int
len Int
p (Int
j Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1) MVector s Int
lv MVector s Int
rv
    Int -> ST s ()
go Int
0

    !Int
total <- STRef s Int -> ST s Int
forall s a. STRef s a -> ST s a
readSTRef STRef s Int
posRef
    MVector s Int
lv <- STRef s (MVector s Int) -> ST s (MVector s Int)
forall s a. STRef s a -> ST s a
readSTRef STRef s (MVector s Int)
lvRef
    MVector s Int
rv <- STRef s (MVector s Int) -> ST s (MVector s Int)
forall s a. STRef s a -> ST s a
readSTRef STRef s (MVector s Int)
rvRef
    (,)
        (Vector Int -> Vector Int -> (Vector Int, Vector Int))
-> ST s (Vector Int)
-> ST s (Vector Int -> (Vector Int, Vector Int))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> MVector (PrimState (ST s)) Int -> ST s (Vector Int)
forall a (m :: * -> *).
(Unbox a, PrimMonad m) =>
MVector (PrimState m) a -> m (Vector a)
VU.unsafeFreeze (Int -> Int -> MVector s Int -> MVector s Int
forall a s. Unbox a => Int -> Int -> MVector s a -> MVector s a
VUM.slice Int
0 Int
total MVector s Int
lv)
        ST s (Vector Int -> (Vector Int, Vector Int))
-> ST s (Vector Int) -> ST s (Vector Int, Vector Int)
forall a b. ST s (a -> b) -> ST s a -> ST s b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> MVector (PrimState (ST s)) Int -> ST s (Vector Int)
forall a (m :: * -> *).
(Unbox a, PrimMonad m) =>
MVector (PrimState m) a -> m (Vector a)
VU.unsafeFreeze (Int -> Int -> MVector s Int -> MVector s Int
forall a s. Unbox a => Int -> Int -> MVector s a -> MVector s a
VUM.slice Int
0 Int
total MVector s Int
rv)

{- | Sort-merge left join kernel.

Takes the per-row key hashes of the left and right frames and returns
@(leftExpandedIndices, rightExpandedIndices)@: two equally long vectors of
row indices, where a right index of @-1@ marks a left row with no match.

Both inputs are passed through 'sortWithIndices' and then walked with two
cursors. Rows are emitted in the order of the sorted left hashes, not in the
original left row order. Matching is done on the hash values only; this
kernel never looks at the underlying key columns.

Uses a dynamically growing output buffer instead of a two-pass
count-then-allocate strategy, which OOMs when low-cardinality keys produce
large cross products.
-}
sortMergeLeftKernel ::
    VU.Vector Int -> VU.Vector Int -> (VU.Vector Int, VU.Vector Int)
sortMergeLeftKernel leftHashes rightHashes = runST $ do
    -- leftSH/rightSH are compared as hashes and leftSI/rightSI are emitted as
    -- row indices; presumably 'sortWithIndices' returns (sorted hashes,
    -- original row index of each sorted position) -- confirm at its definition.
    let (leftSH, leftSI) = sortWithIndices leftHashes
        (rightSH, rightSI) = sortWithIndices rightHashes
        !leftN = VU.length leftHashes
        !rightN = VU.length rightHashes
        -- Start small (at most one million slots, at least one) and grow on demand.
        initCap = max 1 (min (leftN + rightN) 1_000_000)

    initLv <- VUM.unsafeNew initCap
    initRv <- VUM.unsafeNew initCap
    lvRef <- newSTRef initLv
    rvRef <- newSTRef initRv
    capRef <- newSTRef initCap
    posRef <- newSTRef (0 :: Int)

    let -- Grow both output buffers so that at least @needed@ slots exist.
        -- Capacity at least doubles on every growth.
        ensureCapacity needed = do
            cap <- readSTRef capRef
            when (needed > cap) $ do
                let newCap = max needed (cap * 2)
                    delta = newCap - cap
                lv <- readSTRef lvRef
                rv <- readSTRef rvRef
                newLv <- VUM.unsafeGrow lv delta
                newRv <- VUM.unsafeGrow rv delta
                writeSTRef lvRef newLv
                writeSTRef rvRef newRv
                writeSTRef capRef newCap

        -- Emit the cross product of sorted left positions [li, lEnd) with
        -- sorted right positions [ri, rEnd), left-major.
        fillGroup !li !lEnd !ri !rEnd = do
            let !lLen = lEnd - li
                !rLen = rEnd - ri
                !groupSize = lLen * rLen
            !p <- readSTRef posRef
            ensureCapacity (p + groupSize)
            -- Re-read the buffers: ensureCapacity may have replaced them.
            lv <- readSTRef lvRef
            rv <- readSTRef rvRef
            let goL !lIdx !pos
                    | lIdx >= lEnd = return ()
                    | otherwise = do
                        let !lOrig = leftSI `VU.unsafeIndex` lIdx
                        goR lOrig ri pos
                        goL (lIdx + 1) (pos + rLen)
                goR !lOrig !rIdx !pos
                    | rIdx >= rEnd = return ()
                    | otherwise = do
                        VUM.unsafeWrite lv pos lOrig
                        VUM.unsafeWrite rv pos (rightSI `VU.unsafeIndex` rIdx)
                        goR lOrig (rIdx + 1) (pos + 1)
            goL li p
            writeSTRef posRef (p + groupSize)

        -- Two-cursor merge over the sorted hashes.
        fill !li !ri
            | li >= leftN = return ()
            | ri >= rightN = fillRemainingLeft li
            | otherwise = do
                -- The hashes are read only here, after both bounds checks.
                -- Binding them strictly in a `where` clause would force the
                -- unchecked reads before the guards run and index one past
                -- the end of the vectors.
                let !lh = leftSH `VU.unsafeIndex` li
                    !rh = rightSH `VU.unsafeIndex` ri
                case compare lh rh of
                    LT -> do
                        -- Left row without a partner: pair it with the sentinel.
                        !p <- readSTRef posRef
                        ensureCapacity (p + 1)
                        lv <- readSTRef lvRef
                        rv <- readSTRef rvRef
                        VUM.unsafeWrite lv p (leftSI `VU.unsafeIndex` li)
                        VUM.unsafeWrite rv p (-1)
                        writeSTRef posRef (p + 1)
                        fill (li + 1) ri
                    -- Right row without a partner is dropped in a left join.
                    GT -> fill li (ri + 1)
                    EQ -> do
                        -- lEnd/rEnd are used as exclusive ends of the runs
                        -- sharing this hash.
                        let !lEnd = findGroupEnd leftSH lh (li + 1) leftN
                            !rEnd = findGroupEnd rightSH rh (ri + 1) rightN
                        fillGroup li lEnd ri rEnd
                        fill lEnd rEnd

        -- The right side is exhausted: every left row from sorted position
        -- @i@ onwards is unmatched.
        fillRemainingLeft !i = do
            let !remaining = leftN - i
            when (remaining > 0) $ do
                !p <- readSTRef posRef
                ensureCapacity (p + remaining)
                lv <- readSTRef lvRef
                rv <- readSTRef rvRef
                let go !j
                        | j >= remaining = return ()
                        | otherwise = do
                            VUM.unsafeWrite lv (p + j) (leftSI `VU.unsafeIndex` (i + j))
                            VUM.unsafeWrite rv (p + j) (-1)
                            go (j + 1)
                go 0
                writeSTRef posRef (p + remaining)

    fill 0 0

    -- Freeze only the prefix that was actually written.
    !total <- readSTRef posRef
    lv <- readSTRef lvRef
    rv <- readSTRef rvRef
    (,)
        <$> VU.unsafeFreeze (VUM.slice 0 total lv)
        <*> VU.unsafeFreeze (VUM.slice 0 total rv)

{- | Assemble the result DataFrame for a left join.

The left frame's columns are gathered with 'D.atIndicesStable' using
@leftIxs@; the right frame's columns are gathered with
'D.gatherWithSentinel' using @rightIxs@, whose @-1@ entries mark rows without
a right-hand match.

The right frame's column names are then folded into the gathered left frame:

* names in @csSet@ (the join keys) are skipped;
* names that also exist in the left frame are combined with 'D.mergeColumns';
* all other names are inserted as new columns.
-}
assembleLeft ::
    S.Set T.Text ->
    DataFrame ->
    DataFrame ->
    VU.Vector Int ->
    VU.Vector Int ->
    DataFrame
assembleLeft csSet left right leftIxs rightIxs =
    D.fold addRightColumn (D.columnNames right) gatheredLeftDf
  where
    -- One output row per entry of the expanded left index vector.
    !rowCount = VU.length leftIxs
    leftNames = S.fromList (D.columnNames left)

    gatheredLeft = VB.map (D.atIndicesStable leftIxs) (D.columns left)
    gatheredRight = VB.map (D.gatherWithSentinel rightIxs) (D.columns right)

    -- Look a gathered column up by name; Nothing when the frame lacks it.
    leftColumn name =
        VB.unsafeIndex gatheredLeft <$> M.lookup name (D.columnIndices left)
    rightColumn name =
        VB.unsafeIndex gatheredRight <$> M.lookup name (D.columnIndices right)

    -- Starting point of the fold: the left frame with gathered columns, the
    -- new row count and no deriving expressions.
    gatheredLeftDf =
        left
            { columns = gatheredLeft
            , dataframeDimensions = (rowCount, snd (D.dataframeDimensions left))
            , derivingExpressions = M.empty
            }

    -- Insert the column when there is one, otherwise leave the frame alone.
    insertWhenJust name mCol df = maybe df (\col -> D.insertColumn name col df) mCol

    addRightColumn name df
        | name `S.member` csSet = df
        | name `S.member` leftNames =
            insertWhenJust
                name
                (D.mergeColumns <$> leftColumn name <*> rightColumn name)
                df
        | otherwise = insertWhenJust name (rightColumn name) df

{- | Performs a right join on two dataframes using the specified key columns.
Returns all rows from the right dataframe, with matching rows from the left dataframe.
Non-matching rows will have Nothing/null values for columns from the left dataframe.

Implemented as 'leftJoin' with the two dataframes swapped.

==== __Example__
@
ghci> df = D.fromNamedColumns [("key", D.fromList ["K0", "K1", "K2", "K3"]), ("A", D.fromList ["A0", "A1", "A2", "A3"])]
ghci> other = D.fromNamedColumns [("key", D.fromList ["K0", "K1"]), ("B", D.fromList ["B0", "B1"])]
ghci> D.rightJoin ["key"] df other

-----------------
 key  |  A  |  B
------|-----|----
 Text | Text| Text
------|-----|----
 K0   | A0  | B0
 K1   | A1  | B1

@
-}
rightJoin ::
    [T.Text] -> DataFrame -> DataFrame -> DataFrame
rightJoin cs = flip (leftJoin cs)

{- | Performs a full outer join on two dataframes using the specified key columns.

If the right dataframe is null or has zero rows the left dataframe is
returned unchanged, and vice versa. Otherwise the key columns of both sides
are hashed row by row, one of two kernels pairs the rows up, and
'assembleFullOuter' builds the result from the two index vectors.

The sort-merge kernel is used when the larger side has more than
'joinStrategyThreshold' rows; the hash kernel is used otherwise.
-}
fullOuterJoin ::
    [T.Text] -> DataFrame -> DataFrame -> DataFrame
fullOuterJoin cs left right
    | D.null right || D.nRows right == 0 = left
    | D.null left || D.nRows left == 0 = right
    | otherwise =
        -- Both index vectors use -1 as sentinel
        assembleFullOuter keySet left right leftIxs rightIxs
  where
    keySet = S.fromList cs

    leftHashes = D.computeRowHashes (keyColIndices keySet left) left
    rightHashes = D.computeRowHashes (keyColIndices keySet right) right

    -- Pick the kernel from the row count of the larger side.
    largerSide = max (fst (D.dimensions left)) (fst (D.dimensions right))
    kernel
        | largerSide > joinStrategyThreshold = sortMergeFullOuterKernel
        | otherwise = hashFullOuterKernel

    -- Both sides can have nulls in full outer
    (leftIxs, rightIxs) = kernel leftHashes rightHashes

{- | Hash-based full outer join kernel.
Builds compact indices on both sides.
Returns @(leftExpandedIndices, rightExpandedIndices)@ with @-1@ sentinels.
-}
hashFullOuterKernel ::
    VU.Vector Int -> VU.Vector Int -> (VU.Vector Int, VU.Vector Int)
hashFullOuterKernel :: Vector Int -> Vector Int -> (Vector Int, Vector Int)
hashFullOuterKernel Vector Int
leftHashes Vector Int
rightHashes = (forall s. ST s (Vector Int, Vector Int))
-> (Vector Int, Vector Int)
forall a. (forall s. ST s a) -> a
runST ((forall s. ST s (Vector Int, Vector Int))
 -> (Vector Int, Vector Int))
-> (forall s. ST s (Vector Int, Vector Int))
-> (Vector Int, Vector Int)
forall a b. (a -> b) -> a -> b
$ do
    let leftCI :: CompactIndex
leftCI = Vector Int -> CompactIndex
buildCompactIndex Vector Int
leftHashes
        rightCI :: CompactIndex
rightCI = Vector Int -> CompactIndex
buildCompactIndex Vector Int
rightHashes
        leftOff :: HashMap Int (Int, Int)
leftOff = CompactIndex -> HashMap Int (Int, Int)
ciOffsets CompactIndex
leftCI
        rightOff :: HashMap Int (Int, Int)
rightOff = CompactIndex -> HashMap Int (Int, Int)
ciOffsets CompactIndex
rightCI
        leftSI :: Vector Int
leftSI = CompactIndex -> Vector Int
ciSortedIndices CompactIndex
leftCI
        rightSI :: Vector Int
rightSI = CompactIndex -> Vector Int
ciSortedIndices CompactIndex
rightCI

    -- Count: matched + left-only + right-only
    let leftEntries :: [(Int, (Int, Int))]
leftEntries = HashMap Int (Int, Int) -> [(Int, (Int, Int))]
forall k v. HashMap k v -> [(k, v)]
HM.toList HashMap Int (Int, Int)
leftOff
        rightEntries :: [(Int, (Int, Int))]
rightEntries = HashMap Int (Int, Int) -> [(Int, (Int, Int))]
forall k v. HashMap k v -> [(k, v)]
HM.toList HashMap Int (Int, Int)
rightOff

        !matchedCount :: Int
matchedCount =
            (Int -> (Int, (Int, Int)) -> Int)
-> Int -> [(Int, (Int, Int))] -> Int
forall b a. (b -> a -> b) -> b -> [a] -> b
forall (t :: * -> *) b a.
Foldable t =>
(b -> a -> b) -> b -> t a -> b
foldl'
                ( \Int
acc (Int
h, (Int
_, Int
ll)) ->
                    case Int -> HashMap Int (Int, Int) -> Maybe (Int, Int)
forall k v. Hashable k => k -> HashMap k v -> Maybe v
HM.lookup Int
h HashMap Int (Int, Int)
rightOff of
                        Maybe (Int, Int)
Nothing -> Int
acc
                        Just (Int
_, Int
rl) -> Int
acc Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
ll Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
rl
                )
                Int
0
                [(Int, (Int, Int))]
leftEntries

        !leftOnlyCount :: Int
leftOnlyCount =
            (Int -> (Int, (Int, Int)) -> Int)
-> Int -> [(Int, (Int, Int))] -> Int
forall b a. (b -> a -> b) -> b -> [a] -> b
forall (t :: * -> *) b a.
Foldable t =>
(b -> a -> b) -> b -> t a -> b
foldl'
                ( \Int
acc (Int
h, (Int
_, Int
ll)) ->
                    if Int -> HashMap Int (Int, Int) -> Bool
forall k a. Hashable k => k -> HashMap k a -> Bool
HM.member Int
h HashMap Int (Int, Int)
rightOff then Int
acc else Int
acc Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
ll
                )
                Int
0
                [(Int, (Int, Int))]
leftEntries

        !rightOnlyCount :: Int
rightOnlyCount =
            (Int -> (Int, (Int, Int)) -> Int)
-> Int -> [(Int, (Int, Int))] -> Int
forall b a. (b -> a -> b) -> b -> [a] -> b
forall (t :: * -> *) b a.
Foldable t =>
(b -> a -> b) -> b -> t a -> b
foldl'
                ( \Int
acc (Int
h, (Int
_, Int
rl)) ->
                    if Int -> HashMap Int (Int, Int) -> Bool
forall k a. Hashable k => k -> HashMap k a -> Bool
HM.member Int
h HashMap Int (Int, Int)
leftOff then Int
acc else Int
acc Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
rl
                )
                Int
0
                [(Int, (Int, Int))]
rightEntries

        !totalCount :: Int
totalCount = Int
matchedCount Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
leftOnlyCount Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
rightOnlyCount

    MVector s Int
lv <- Int -> ST s (MVector (PrimState (ST s)) Int)
forall (m :: * -> *) a.
(PrimMonad m, Unbox a) =>
Int -> m (MVector (PrimState m) a)
VUM.unsafeNew Int
totalCount
    MVector s Int
rv <- Int -> ST s (MVector (PrimState (ST s)) Int)
forall (m :: * -> *) a.
(PrimMonad m, Unbox a) =>
Int -> m (MVector (PrimState m) a)
VUM.unsafeNew Int
totalCount
    STRef s Int
posRef <- Int -> ST s (STRef s Int)
forall a s. a -> ST s (STRef s a)
newSTRef (Int
0 :: Int)

    -- Fill matched + left-only (iterate left keys)
    [(Int, (Int, Int))] -> ((Int, (Int, Int)) -> ST s ()) -> ST s ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ [(Int, (Int, Int))]
leftEntries (((Int, (Int, Int)) -> ST s ()) -> ST s ())
-> ((Int, (Int, Int)) -> ST s ()) -> ST s ()
forall a b. (a -> b) -> a -> b
$ \(Int
h, (Int
lStart, Int
lLen)) -> do
        !Int
p <- STRef s Int -> ST s Int
forall s a. STRef s a -> ST s a
readSTRef STRef s Int
posRef
        case Int -> HashMap Int (Int, Int) -> Maybe (Int, Int)
forall k v. Hashable k => k -> HashMap k v -> Maybe v
HM.lookup Int
h HashMap Int (Int, Int)
rightOff of
            Maybe (Int, Int)
Nothing -> do
                -- Left-only rows
                let goL :: Int -> Int -> ST s ()
goL !Int
j !Int
q
                        | Int
j Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
>= Int
lLen = () -> ST s ()
forall a. a -> ST s a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
                        | Bool
otherwise = do
                            MVector (PrimState (ST s)) Int -> Int -> Int -> ST s ()
forall (m :: * -> *) a.
(PrimMonad m, Unbox a) =>
MVector (PrimState m) a -> Int -> a -> m ()
VUM.unsafeWrite MVector s Int
MVector (PrimState (ST s)) Int
lv Int
q (Vector Int
leftSI Vector Int -> Int -> Int
forall a. Unbox a => Vector a -> Int -> a
`VU.unsafeIndex` (Int
lStart Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
j))
                            MVector (PrimState (ST s)) Int -> Int -> Int -> ST s ()
forall (m :: * -> *) a.
(PrimMonad m, Unbox a) =>
MVector (PrimState m) a -> Int -> a -> m ()
VUM.unsafeWrite MVector s Int
MVector (PrimState (ST s)) Int
rv Int
q (-Int
1)
                            Int -> Int -> ST s ()
goL (Int
j Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1) (Int
q Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1)
                Int -> Int -> ST s ()
goL Int
0 Int
p
                STRef s Int -> Int -> ST s ()
forall s a. STRef s a -> a -> ST s ()
writeSTRef STRef s Int
posRef (Int
p Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
lLen)
            Just (!Int
rStart, !Int
rLen) -> do
                -- Cross product
                Vector Int
-> Vector Int
-> Int
-> Int
-> Int
-> Int
-> MVector s Int
-> MVector s Int
-> Int
-> ST s ()
forall s.
Vector Int
-> Vector Int
-> Int
-> Int
-> Int
-> Int
-> MVector s Int
-> MVector s Int
-> Int
-> ST s ()
fillCrossProduct
                    Vector Int
leftSI
                    Vector Int
rightSI
                    Int
lStart
                    (Int
lStart Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
lLen)
                    Int
rStart
                    (Int
rStart Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
rLen)
                    MVector s Int
lv
                    MVector s Int
rv
                    Int
p
                STRef s Int -> Int -> ST s ()
forall s a. STRef s a -> a -> ST s ()
writeSTRef STRef s Int
posRef (Int
p Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
lLen Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
rLen)

    -- Fill right-only (iterate right keys not in left)
    [(Int, (Int, Int))] -> ((Int, (Int, Int)) -> ST s ()) -> ST s ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ [(Int, (Int, Int))]
rightEntries (((Int, (Int, Int)) -> ST s ()) -> ST s ())
-> ((Int, (Int, Int)) -> ST s ()) -> ST s ()
forall a b. (a -> b) -> a -> b
$ \(Int
h, (Int
rStart, Int
rLen)) ->
        case Int -> HashMap Int (Int, Int) -> Maybe (Int, Int)
forall k v. Hashable k => k -> HashMap k v -> Maybe v
HM.lookup Int
h HashMap Int (Int, Int)
leftOff of
            Just (Int, Int)
_ -> () -> ST s ()
forall a. a -> ST s a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
            Maybe (Int, Int)
Nothing -> do
                !Int
p <- STRef s Int -> ST s Int
forall s a. STRef s a -> ST s a
readSTRef STRef s Int
posRef
                let goR :: Int -> Int -> ST s ()
goR !Int
j !Int
q
                        | Int
j Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
>= Int
rLen = () -> ST s ()
forall a. a -> ST s a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
                        | Bool
otherwise = do
                            MVector (PrimState (ST s)) Int -> Int -> Int -> ST s ()
forall (m :: * -> *) a.
(PrimMonad m, Unbox a) =>
MVector (PrimState m) a -> Int -> a -> m ()
VUM.unsafeWrite MVector s Int
MVector (PrimState (ST s)) Int
lv Int
q (-Int
1)
                            MVector (PrimState (ST s)) Int -> Int -> Int -> ST s ()
forall (m :: * -> *) a.
(PrimMonad m, Unbox a) =>
MVector (PrimState m) a -> Int -> a -> m ()
VUM.unsafeWrite MVector s Int
MVector (PrimState (ST s)) Int
rv Int
q (Vector Int
rightSI Vector Int -> Int -> Int
forall a. Unbox a => Vector a -> Int -> a
`VU.unsafeIndex` (Int
rStart Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
j))
                            Int -> Int -> ST s ()
goR (Int
j Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1) (Int
q Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1)
                Int -> Int -> ST s ()
goR Int
0 Int
p
                STRef s Int -> Int -> ST s ()
forall s a. STRef s a -> a -> ST s ()
writeSTRef STRef s Int
posRef (Int
p Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
rLen)

    (,) (Vector Int -> Vector Int -> (Vector Int, Vector Int))
-> ST s (Vector Int)
-> ST s (Vector Int -> (Vector Int, Vector Int))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> MVector (PrimState (ST s)) Int -> ST s (Vector Int)
forall a (m :: * -> *).
(Unbox a, PrimMonad m) =>
MVector (PrimState m) a -> m (Vector a)
VU.unsafeFreeze MVector s Int
MVector (PrimState (ST s)) Int
lv ST s (Vector Int -> (Vector Int, Vector Int))
-> ST s (Vector Int) -> ST s (Vector Int, Vector Int)
forall a b. ST s (a -> b) -> ST s a -> ST s b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> MVector (PrimState (ST s)) Int -> ST s (Vector Int)
forall a (m :: * -> *).
(Unbox a, PrimMonad m) =>
MVector (PrimState m) a -> m (Vector a)
VU.unsafeFreeze MVector s Int
MVector (PrimState (ST s)) Int
rv

{- | Sort-merge full outer join kernel.

Both hash vectors are sorted with 'sortWithIndices' and then walked in
lockstep by two cursors:

* a hash that occurs only on the left emits one row per occurrence, with @-1@
  in the right slot;
* a hash that occurs only on the right emits one row per occurrence, with @-1@
  in the left slot;
* a hash that occurs on both sides emits the cross product of the two runs
  (via 'fillCrossProduct').

The work is done in two passes over the sorted hashes: the first only counts
output rows so the result buffers can be allocated at their exact size, the
second writes the index pairs.

Returns @(leftExpandedIndices, rightExpandedIndices)@ with @-1@ sentinels.
Both vectors have the same length, and the non-sentinel entries are taken from
the index vectors returned by 'sortWithIndices'.

NOTE(review): rows are matched on hash equality only; any collision handling
must happen outside this kernel — confirm against the caller.
-}
sortMergeFullOuterKernel ::
    VU.Vector Int -> VU.Vector Int -> (VU.Vector Int, VU.Vector Int)
sortMergeFullOuterKernel leftHashes rightHashes = runST $ do
    let (leftSH, leftSI) = sortWithIndices leftHashes
        (rightSH, rightSI) = sortWithIndices rightHashes
        !leftN = VU.length leftHashes
        !rightN = VU.length rightHashes

    -- Pass 1: count the number of output rows.
    let countLoop :: Int -> Int -> Int -> Int
        countLoop !li !ri !c
            | li >= leftN && ri >= rightN = c
            -- One side is exhausted: every remaining row on the other side
            -- is unmatched and contributes exactly one output row.
            | li >= leftN = c + (rightN - ri)
            | ri >= rightN = c + (leftN - li)
            | otherwise =
                -- Both cursors are in range here, so the unchecked reads are
                -- safe.  They must NOT be hoisted into a strict @where@
                -- binding: that would force them before the bounds guards
                -- above and read past the end of an exhausted/empty vector.
                let !lh = leftSH `VU.unsafeIndex` li
                    !rh = rightSH `VU.unsafeIndex` ri
                 in case compare lh rh of
                        LT -> countLoop (li + 1) ri (c + 1)
                        GT -> countLoop li (ri + 1) (c + 1)
                        EQ ->
                            let !lEnd = findGroupEnd leftSH lh (li + 1) leftN
                                !rEnd = findGroupEnd rightSH rh (ri + 1) rightN
                             in countLoop lEnd rEnd (c + (lEnd - li) * (rEnd - ri))
        !totalRows = countLoop 0 0 0

    -- Pass 2: fill.  Every slot in [0, totalRows) is written below, which is
    -- why uninitialised buffers are acceptable.
    lv <- VUM.unsafeNew totalRows
    rv <- VUM.unsafeNew totalRows

    let fill !li !ri !pos
            | li >= leftN && ri >= rightN = return ()
            | li >= leftN = fillRemainingRight ri pos
            | ri >= rightN = fillRemainingLeft li pos
            | otherwise = do
                -- Same reasoning as in 'countLoop': read only after both
                -- bounds checks have passed.
                let !lh = leftSH `VU.unsafeIndex` li
                    !rh = rightSH `VU.unsafeIndex` ri
                case compare lh rh of
                    LT -> do
                        -- Left-only row.
                        VUM.unsafeWrite lv pos (leftSI `VU.unsafeIndex` li)
                        VUM.unsafeWrite rv pos (-1)
                        fill (li + 1) ri (pos + 1)
                    GT -> do
                        -- Right-only row.
                        VUM.unsafeWrite lv pos (-1)
                        VUM.unsafeWrite rv pos (rightSI `VU.unsafeIndex` ri)
                        fill li (ri + 1) (pos + 1)
                    EQ -> do
                        -- Matching runs: emit their cross product.
                        let !lEnd = findGroupEnd leftSH lh (li + 1) leftN
                            !rEnd = findGroupEnd rightSH rh (ri + 1) rightN
                            !groupSize = (lEnd - li) * (rEnd - ri)
                        fillCrossProduct leftSI rightSI li lEnd ri rEnd lv rv pos
                        fill lEnd rEnd (pos + groupSize)

        -- Emit the unmatched tail of the left side.
        fillRemainingLeft !i !pos
            | i >= leftN = return ()
            | otherwise = do
                VUM.unsafeWrite lv pos (leftSI `VU.unsafeIndex` i)
                VUM.unsafeWrite rv pos (-1)
                fillRemainingLeft (i + 1) (pos + 1)

        -- Emit the unmatched tail of the right side.
        fillRemainingRight !i !pos
            | i >= rightN = return ()
            | otherwise = do
                VUM.unsafeWrite lv pos (-1)
                VUM.unsafeWrite rv pos (rightSI `VU.unsafeIndex` i)
                fillRemainingRight (i + 1) (pos + 1)

    fill 0 0 0
    (,) <$> VU.unsafeFreeze lv <*> VU.unsafeFreeze rv

{- | Build the result 'DataFrame' of a full outer join from the two expanded
index vectors.

Every column of both inputs is first gathered with 'D.gatherWithSentinel'
(left columns through @leftIxs@, right columns through @rightIxs@; both use
the @-1@ sentinel).  The gathered left columns form the starting frame, whose
row count is the length of @leftIxs@ and whose deriving expressions are
cleared.  The right frame's column names are then folded in with 'D.fold':

* a name in @csSet@ (a join key) that exists on both sides is replaced by the
  row-wise coalesce of the two gathered columns (first non-null wins);
* any other name that also exists on the left is replaced by
  'D.mergeColumns' of the two gathered columns;
* a name that exists only on the right is inserted as-is.

Calls 'error' if a key column is not an 'OptionalColumn' after gathering, if
the two key columns have different element types, or if a key is null on both
sides of some row.
-}
assembleFullOuter ::
    S.Set T.Text ->
    DataFrame ->
    DataFrame ->
    VU.Vector Int ->
    VU.Vector Int ->
    DataFrame
assembleFullOuter csSet left right leftIxs rightIxs =
    D.fold addRightColumn (D.columnNames right) seedFrame
  where
    -- Forced up front, exactly like the original strict let-binding.
    !rowCount = VU.length leftIxs

    leftNames = S.fromList (D.columnNames left)

    gatheredLeft = VB.map (D.gatherWithSentinel leftIxs) (D.columns left)
    gatheredRight = VB.map (D.gatherWithSentinel rightIxs) (D.columns right)

    -- Look a gathered column up by name; 'Nothing' if that side lacks it.
    leftExpanded colName =
        VB.unsafeIndex gatheredLeft <$> M.lookup colName (D.columnIndices left)
    rightExpanded colName =
        VB.unsafeIndex gatheredRight <$> M.lookup colName (D.columnIndices right)

    -- Starting point of the fold: the left frame with its columns expanded.
    seedFrame =
        left
            { columns = gatheredLeft
            , dataframeDimensions = (rowCount, snd (D.dataframeDimensions left))
            , derivingExpressions = M.empty
            }

    -- One fold step: merge the right-hand column called @colName@ into @acc@.
    addRightColumn colName acc
        | S.member colName csSet =
            -- Key column: coalesce left and right.
            case (leftExpanded colName, rightExpanded colName) of
                (Just lc, Just rc) ->
                    D.insertColumn colName (coalesceKeys lc rc) acc
                _ -> acc
        | S.member colName leftNames =
            insertWhenPresent
                (D.mergeColumns <$> leftExpanded colName <*> rightExpanded colName)
        | otherwise = insertWhenPresent (rightExpanded colName)
      where
        insertWhenPresent = maybe acc (\col -> D.insertColumn colName col acc)

    -- First non-'Nothing' of the pair; a key must be present on one side.
    firstPresent :: Maybe v -> Maybe v -> v
    firstPresent l r =
        case l <|> r of
            Just v -> v
            Nothing -> error "fullOuterJoin: null on both sides of key column"

    -- Coalesce two OptionalColumns row by row into a non-optional column.
    coalesceKeys :: Column -> Column -> Column
    coalesceKeys
        (OptionalColumn (ls :: VB.Vector (Maybe a)))
        (OptionalColumn (rs :: VB.Vector (Maybe b))) =
            case testEquality (typeRep @a) (typeRep @b) of
                Nothing -> error "Cannot join columns of different types"
                Just Refl -> D.fromVector (VB.zipWith firstPresent ls rs)
    coalesceKeys _ _ =
        error "fullOuterJoin: expected OptionalColumn for key columns"