module DataFrame.IO.Parquet.Levels where

import Data.Int
import Data.List
import qualified Data.Text as T
import Data.Word

import DataFrame.IO.Parquet.Binary
import DataFrame.IO.Parquet.Encoding
import DataFrame.IO.Parquet.Thrift
import DataFrame.IO.Parquet.Types

readLevelsV1 :: Int -> Int -> Int -> [Word8] -> ([Int], [Int], [Word8])
readLevelsV1 :: Int -> Int -> Int -> [Word8] -> ([Int], [Int], [Word8])
readLevelsV1 Int
n Int
maxDef Int
maxRep [Word8]
bs =
    let bwDef :: Int
bwDef = Int -> Int
bitWidthForMaxLevel Int
maxDef
        bwRep :: Int
bwRep = Int -> Int
bitWidthForMaxLevel Int
maxRep

        ([Word32]
repLvlsU32, [Word8]
afterRep) =
            if Int
bwRep Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
0
                then (Int -> Word32 -> [Word32]
forall a. Int -> a -> [a]
replicate Int
n Word32
0, [Word8]
bs)
                else
                    let repLength :: Word32
repLength = [Word8] -> Word32
littleEndianWord32 (Int -> [Word8] -> [Word8]
forall a. Int -> [a] -> [a]
take Int
4 [Word8]
bs)
                        repData :: [Word8]
repData = Int -> [Word8] -> [Word8]
forall a. Int -> [a] -> [a]
take (Word32 -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral Word32
repLength) (Int -> [Word8] -> [Word8]
forall a. Int -> [a] -> [a]
drop Int
4 [Word8]
bs)
                        afterRepData :: [Word8]
afterRepData = Int -> [Word8] -> [Word8]
forall a. Int -> [a] -> [a]
drop (Int
4 Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Word32 -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral Word32
repLength) [Word8]
bs
                        ([Word32]
repVals, [Word8]
_) = Int -> Int -> [Word8] -> ([Word32], [Word8])
decodeRLEBitPackedHybrid Int
bwRep Int
n [Word8]
repData
                     in ([Word32]
repVals, [Word8]
afterRepData)

        ([Word32]
defLvlsU32, [Word8]
afterDef) =
            if Int
bwDef Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
0
                then (Int -> Word32 -> [Word32]
forall a. Int -> a -> [a]
replicate Int
n Word32
0, [Word8]
afterRep)
                else
                    let defLength :: Word32
defLength = [Word8] -> Word32
littleEndianWord32 (Int -> [Word8] -> [Word8]
forall a. Int -> [a] -> [a]
take Int
4 [Word8]
afterRep)
                        defData :: [Word8]
defData = Int -> [Word8] -> [Word8]
forall a. Int -> [a] -> [a]
take (Word32 -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral Word32
defLength) (Int -> [Word8] -> [Word8]
forall a. Int -> [a] -> [a]
drop Int
4 [Word8]
afterRep)
                        afterDefData :: [Word8]
afterDefData = Int -> [Word8] -> [Word8]
forall a. Int -> [a] -> [a]
drop (Int
4 Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Word32 -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral Word32
defLength) [Word8]
afterRep
                        ([Word32]
defVals, [Word8]
_) = Int -> Int -> [Word8] -> ([Word32], [Word8])
decodeRLEBitPackedHybrid Int
bwDef Int
n [Word8]
defData
                     in ([Word32]
defVals, [Word8]
afterDefData)
     in ((Word32 -> Int) -> [Word32] -> [Int]
forall a b. (a -> b) -> [a] -> [b]
map Word32 -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral [Word32]
defLvlsU32, (Word32 -> Int) -> [Word32] -> [Int]
forall a b. (a -> b) -> [a] -> [b]
map Word32 -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral [Word32]
repLvlsU32, [Word8]
afterDef)

readLevelsV2 ::
    Int -> Int -> Int -> Int32 -> Int32 -> [Word8] -> ([Int], [Int], [Word8])
readLevelsV2 :: Int
-> Int
-> Int
-> Int32
-> Int32
-> [Word8]
-> ([Int], [Int], [Word8])
readLevelsV2 Int
n Int
maxDef Int
maxRep Int32
defLen Int32
repLen [Word8]
bs =
    let ([Word8]
repBytes, [Word8]
afterRepBytes) = Int -> [Word8] -> ([Word8], [Word8])
forall a. Int -> [a] -> ([a], [a])
splitAt (Int32 -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int32
repLen) [Word8]
bs
        ([Word8]
defBytes, [Word8]
afterDefBytes) = Int -> [Word8] -> ([Word8], [Word8])
forall a. Int -> [a] -> ([a], [a])
splitAt (Int32 -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int32
defLen) [Word8]
afterRepBytes
        bwDef :: Int
bwDef = Int -> Int
bitWidthForMaxLevel Int
maxDef
        bwRep :: Int
bwRep = Int -> Int
bitWidthForMaxLevel Int
maxRep
        ([Word32]
repLvlsU32, [Word8]
_) =
            if Int
bwRep Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
0
                then (Int -> Word32 -> [Word32]
forall a. Int -> a -> [a]
replicate Int
n Word32
0, [Word8]
repBytes)
                else Int -> Int -> [Word8] -> ([Word32], [Word8])
decodeRLEBitPackedHybrid Int
bwRep Int
n [Word8]
repBytes
        ([Word32]
defLvlsU32, [Word8]
_) =
            if Int
bwDef Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
0
                then (Int -> Word32 -> [Word32]
forall a. Int -> a -> [a]
replicate Int
n Word32
0, [Word8]
defBytes)
                else Int -> Int -> [Word8] -> ([Word32], [Word8])
decodeRLEBitPackedHybrid Int
bwDef Int
n [Word8]
defBytes
     in ((Word32 -> Int) -> [Word32] -> [Int]
forall a b. (a -> b) -> [a] -> [b]
map Word32 -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral [Word32]
defLvlsU32, (Word32 -> Int) -> [Word32] -> [Int]
forall a b. (a -> b) -> [a] -> [b]
map Word32 -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral [Word32]
repLvlsU32, [Word8]
afterDefBytes)

stitchNullable :: Int -> [Int] -> [a] -> [Maybe a]
stitchNullable :: forall a. Int -> [Int] -> [a] -> [Maybe a]
stitchNullable Int
maxDef [Int]
defLvls [a]
vals = [Int] -> [a] -> [Maybe a]
forall {a}. [Int] -> [a] -> [Maybe a]
go [Int]
defLvls [a]
vals
  where
    go :: [Int] -> [a] -> [Maybe a]
go [] [a]
_ = []
    go (Int
d : [Int]
ds) [a]
vs
        | Int
d Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
maxDef = case [a]
vs of
            (a
v : [a]
vs') -> a -> Maybe a
forall a. a -> Maybe a
Just a
v Maybe a -> [Maybe a] -> [Maybe a]
forall a. a -> [a] -> [a]
: [Int] -> [a] -> [Maybe a]
go [Int]
ds [a]
vs'
            [] -> [Char] -> [Maybe a]
forall a. HasCallStack => [Char] -> a
error [Char]
"value stream exhausted"
        | Bool
otherwise = Maybe a
forall a. Maybe a
Nothing Maybe a -> [Maybe a] -> [Maybe a]
forall a. a -> [a] -> [a]
: [Int] -> [a] -> [Maybe a]
go [Int]
ds [a]
vs

data SNode = SNode
    { SNode -> [Char]
sName :: String
    , SNode -> RepetitionType
sRep :: RepetitionType
    , SNode -> [SNode]
sChildren :: [SNode]
    }
    deriving (Int -> SNode -> ShowS
[SNode] -> ShowS
SNode -> [Char]
(Int -> SNode -> ShowS)
-> (SNode -> [Char]) -> ([SNode] -> ShowS) -> Show SNode
forall a.
(Int -> a -> ShowS) -> (a -> [Char]) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> SNode -> ShowS
showsPrec :: Int -> SNode -> ShowS
$cshow :: SNode -> [Char]
show :: SNode -> [Char]
$cshowList :: [SNode] -> ShowS
showList :: [SNode] -> ShowS
Show, SNode -> SNode -> Bool
(SNode -> SNode -> Bool) -> (SNode -> SNode -> Bool) -> Eq SNode
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: SNode -> SNode -> Bool
== :: SNode -> SNode -> Bool
$c/= :: SNode -> SNode -> Bool
/= :: SNode -> SNode -> Bool
Eq)

parseOne :: [SchemaElement] -> (SNode, [SchemaElement])
parseOne :: [SchemaElement] -> (SNode, [SchemaElement])
parseOne [] = [Char] -> (SNode, [SchemaElement])
forall a. HasCallStack => [Char] -> a
error [Char]
"parseOne: empty schema list"
parseOne (SchemaElement
se : [SchemaElement]
rest) =
    let childCount :: Int
childCount = Int32 -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral (SchemaElement -> Int32
numChildren SchemaElement
se)
        ([SNode]
kids, [SchemaElement]
rest') = Int -> [SchemaElement] -> ([SNode], [SchemaElement])
parseMany Int
childCount [SchemaElement]
rest
     in ( SNode
            { sName :: [Char]
sName = Text -> [Char]
T.unpack (SchemaElement -> Text
elementName SchemaElement
se)
            , sRep :: RepetitionType
sRep = SchemaElement -> RepetitionType
repetitionType SchemaElement
se
            , sChildren :: [SNode]
sChildren = [SNode]
kids
            }
        , [SchemaElement]
rest'
        )

parseMany :: Int -> [SchemaElement] -> ([SNode], [SchemaElement])
parseMany :: Int -> [SchemaElement] -> ([SNode], [SchemaElement])
parseMany Int
0 [SchemaElement]
xs = ([], [SchemaElement]
xs)
parseMany Int
n [SchemaElement]
xs =
    let (SNode
node, [SchemaElement]
xs') = [SchemaElement] -> (SNode, [SchemaElement])
parseOne [SchemaElement]
xs
        ([SNode]
nodes, [SchemaElement]
xs'') = Int -> [SchemaElement] -> ([SNode], [SchemaElement])
parseMany (Int
n Int -> Int -> Int
forall a. Num a => a -> a -> a
- Int
1) [SchemaElement]
xs'
     in (SNode
node SNode -> [SNode] -> [SNode]
forall a. a -> [a] -> [a]
: [SNode]
nodes, [SchemaElement]
xs'')

parseAll :: [SchemaElement] -> [SNode]
parseAll :: [SchemaElement] -> [SNode]
parseAll [] = []
parseAll [SchemaElement]
xs = let (SNode
n, [SchemaElement]
xs') = [SchemaElement] -> (SNode, [SchemaElement])
parseOne [SchemaElement]
xs in SNode
n SNode -> [SNode] -> [SNode]
forall a. a -> [a] -> [a]
: [SchemaElement] -> [SNode]
parseAll [SchemaElement]
xs'

levelsForPath :: [SchemaElement] -> [String] -> (Int, Int)
levelsForPath :: [SchemaElement] -> [[Char]] -> (Int, Int)
levelsForPath [SchemaElement]
schemaTail [[Char]]
path = Int -> Int -> [SNode] -> [[Char]] -> (Int, Int)
forall {t} {t}.
(Num t, Num t) =>
t -> t -> [SNode] -> [[Char]] -> (t, t)
go Int
0 Int
0 ([SchemaElement] -> [SNode]
parseAll [SchemaElement]
schemaTail) [[Char]]
path
  where
    go :: t -> t -> [SNode] -> [[Char]] -> (t, t)
go t
defC t
repC [SNode]
_ [] = (t
defC, t
repC)
    go t
defC t
repC [SNode]
nodes ([Char]
p : [[Char]]
ps) =
        case (SNode -> Bool) -> [SNode] -> Maybe SNode
forall (t :: * -> *) a. Foldable t => (a -> Bool) -> t a -> Maybe a
find (\SNode
n -> SNode -> [Char]
sName SNode
n [Char] -> [Char] -> Bool
forall a. Eq a => a -> a -> Bool
== [Char]
p) [SNode]
nodes of
            Maybe SNode
Nothing -> (t
defC, t
repC)
            Just SNode
n ->
                let defC' :: t
defC' = t
defC t -> t -> t
forall a. Num a => a -> a -> a
+ (if SNode -> RepetitionType
sRep SNode
n RepetitionType -> RepetitionType -> Bool
forall a. Eq a => a -> a -> Bool
== RepetitionType
OPTIONAL then t
1 else t
0)
                    repC' :: t
repC' = t
repC t -> t -> t
forall a. Num a => a -> a -> a
+ (if SNode -> RepetitionType
sRep SNode
n RepetitionType -> RepetitionType -> Bool
forall a. Eq a => a -> a -> Bool
== RepetitionType
REPEATED then t
1 else t
0)
                 in t -> t -> [SNode] -> [[Char]] -> (t, t)
go t
defC' t
repC' (SNode -> [SNode]
sChildren SNode
n) [[Char]]
ps