module DataFrame.IO.Parquet.Encoding where

import Data.Bits
import Data.List (foldl')
import Data.Word
import DataFrame.IO.Parquet.Binary

-- | Ceiling of the base-2 logarithm: the smallest @k@ such that @2 ^ k >= x@.
--
-- Inputs less than or equal to 1 (including zero and negatives) yield 0.
ceilLog2 :: Int -> Int
ceilLog2 x
    | x <= 1 = 0
    -- Halve while rounding up, so non-powers of two still count the extra bit.
    | otherwise = 1 + ceilLog2 ((x + 1) `div` 2)

-- | Number of bits needed to represent every level in @[0 .. maxLevel]@.
--
-- There are @maxLevel + 1@ distinct values, so the width is
-- @ceilLog2 (maxLevel + 1)@; a @maxLevel@ of 0 therefore needs 0 bits.
bitWidthForMaxLevel :: Int -> Int
bitWidthForMaxLevel maxLevel = ceilLog2 (maxLevel + 1)

-- | Number of whole bytes required to hold a value of @bw@ bits
-- (that is, @bw / 8@ rounded up).
bytesForBW :: Int -> Int
bytesForBW bw = (bw + 7) `div` 8

-- | Unpack up to @count@ values of @bw@ bits each from the front of @bs@.
--
-- Bits are consumed least-significant-bit first within each byte, and the
-- first bit consumed for a value becomes that value's bit 0.
--
-- Returns the decoded values together with the bytes that follow the
-- @ceiling (bw * count / 8)@ bytes reserved for the packed run. If @count@
-- is not positive or the input is empty, no values are produced and the
-- input is returned untouched. If the input is shorter than the run, only
-- the values that are fully present are returned.
unpackBitPacked :: Int -> Int -> [Word8] -> ([Word32], [Word8])
unpackBitPacked bw count bs
    | count <= 0 = ([], bs)
    | null bs = ([], bs)
    | otherwise = (extractValues count bits, rest)
  where
    totalBytes :: Int
    totalBytes = (bw * count + 7) `div` 8

    (chunk, rest) = splitAt totalBytes bs

    -- Individual bits are widened to Word32 *before* being shifted into
    -- place. Assembling values in Word8 would silently discard every bit at
    -- position 8 or above, corrupting any value wider than 8 bits.
    bits :: [Word32]
    bits = concatMap byteBits chunk

    byteBits :: Word8 -> [Word32]
    byteBits b = [fromIntegral ((b `shiftR` i) .&. 1) | i <- [0 .. 7]]

    -- Combine a little-endian list of single bits into one value.
    toN :: [Word32] -> Word32
    toN = foldl' (\acc (i, bit) -> acc .|. (bit `shiftL` i)) 0 . zip [0 ..]

    extractValues :: Int -> [Word32] -> [Word32]
    extractValues _ [] = []
    extractValues n bitsLeft
        | n <= 0 = []
        -- Only the split-off prefix is measured, so the remaining bit list
        -- is not re-traversed for every value.
        | length this < bw = []
        | otherwise = toN this : extractValues (n - 1) bitsLeft'
      where
        (this, bitsLeft') = splitAt bw bitsLeft

-- | Decode up to @need@ values of bit width @bw@ from a stream of
-- RLE / bit-packed runs, returning the values and the unconsumed bytes.
--
-- Each run starts with a header read by 'readUVarInt'. When the header's
-- lowest bit is 1 the run is bit-packed and holds @(header >> 1) * 8@
-- values; otherwise it is a repeated run of @header >> 1@ copies of a single
-- value stored in @bytesForBW bw@ bytes.
--
-- A bit width of 0 consumes no input and yields @need@ zeros. Decoding stops
-- early, returning fewer than @need@ values, if the input runs out.
decodeRLEBitPackedHybrid :: Int -> Int -> [Word8] -> ([Word32], [Word8])
decodeRLEBitPackedHybrid bw need bs
    | bw == 0 = (replicate need 0, bs)
    | otherwise = go need bs []
  where
    -- Keeps only the low @bw@ bits of a repeated value. The 32-bit case is
    -- spelled out rather than relying on the shift result at full width.
    mask :: Word32
    mask
        | bw == 32 = maxBound
        | otherwise = (1 `shiftL` bw) - 1

    -- @go remaining input acc@: @acc@ holds the decoded values in reverse.
    go :: Int -> [Word8] -> [Word32] -> ([Word32], [Word8])
    go 0 rest acc = (reverse acc, rest)
    go n rest acc
        | null rest = (reverse acc, rest)
        | isPacked = packedRun
        | otherwise = repeatedRun
      where
        (hdr64, afterHdr) = readUVarInt rest

        isPacked :: Bool
        isPacked = (hdr64 .&. 1) == 1

        -- Number of 8-value groups (packed) or repetitions (repeated run).
        runCount :: Int
        runCount = fromIntegral (hdr64 `shiftR` 1)

        packedRun =
            let totalVals = runCount * 8
                (valsAll, afterRun) = unpackBitPacked bw totalVals afterHdr
                takeN = min n totalVals
             in go (n - takeN) afterRun (reverse (take takeN valsAll) ++ acc)

        repeatedRun =
            let -- Up to four bytes are handed to the reader regardless of
                -- width; the mask then discards bits beyond @bw@.
                -- NOTE(review): behaviour of littleEndianWord32 on fewer
                -- than four bytes is defined elsewhere - confirm.
                word32 = littleEndianWord32 (take 4 afterHdr)
                afterV = drop (bytesForBW bw) afterHdr
                takeN = min n runCount
             in go (n - takeN) afterV (replicate takeN (word32 .&. mask) ++ acc)

-- | Decode @need@ dictionary indices from @bs@, returning the indices and
-- the unconsumed bytes.
--
-- The first byte is treated as a bit-width prefix when it is at most 32 and
-- is either non-zero or the dictionary has at most one entry. If decoding
-- the rest of the stream with that width yields exactly @need@ values, that
-- result is used. Otherwise the whole stream (including its first byte) is
-- decoded with the width derived from the dictionary cardinality,
-- @ceilLog2 dictCard@.
--
-- Calls 'error' when the input stream is empty.
decodeDictIndicesV1 :: Int -> Int -> [Word8] -> ([Int], [Word8])
decodeDictIndicesV1 _ _ [] = error "empty dictionary index stream"
decodeDictIndicesV1 need dictCard bs@(w0 : rest0)
    | looksLikeWidth, length prefixed == need = (prefixed, afterPrefixed)
    | otherwise = decodeWith (ceilLog2 dictCard) bs
  where
    looksLikeWidth :: Bool
    looksLikeWidth = w0 <= 32 && (w0 /= 0 || dictCard <= 1)

    -- Result of interpreting the first byte as the bit width.
    (prefixed, afterPrefixed) = decodeWith (fromIntegral w0) rest0

    -- Decode @need@ values of the given width and widen them to 'Int'.
    decodeWith :: Int -> [Word8] -> ([Int], [Word8])
    decodeWith width input =
        let (u32s, remaining) = decodeRLEBitPackedHybrid width need input
         in (map fromIntegral u32s, remaining)