{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE ExplicitNamespaces #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications #-}

module DataFrame.Lazy.IO.CSV where

import qualified Data.ByteString as BS
import qualified Data.Map as M
import qualified Data.Proxy as P
import qualified Data.Text as T
import qualified Data.Text.Encoding as TextEncoding
import qualified Data.Text.IO as TIO
import qualified Data.Vector as V
import qualified Data.Vector.Mutable as VM
import qualified Data.Vector.Unboxed.Mutable as VUM

import Control.Exception (bracketOnError)
import Control.Monad (forM_, unless, when, zipWithM_)
import Data.Attoparsec.Text (IResult (..), parseWith)
import Data.Char (intToDigit)
import Data.IORef
import Data.Maybe (fromMaybe, isJust)
import Data.Type.Equality (TestEquality (testEquality))
import Data.Word (Word8)
import DataFrame.Internal.Column (
    Column (..),
    MutableColumn (..),
    columnLength,
    freezeColumn',
    writeColumn,
 )
import DataFrame.Internal.DataFrame (DataFrame (..))
import DataFrame.Internal.Parsing
import DataFrame.Internal.Schema (Schema, SchemaType (..), elements)
import System.IO
import Type.Reflection
import Prelude hiding (takeWhile)

{- | Options controlling how a separated-value file is read, plus the
bookkeeping needed to resume a read part-way through a file.
-}
data ReadOptions = ReadOptions
    { hasHeader :: Bool
    -- ^ Treat the first line of the file as column names.
    , inferTypes :: Bool
    -- ^ Infer column types while reading.
    -- NOTE(review): not consulted by the readers in this part of the module.
    , safeRead :: Bool
    -- ^ Convert rows with nullish values into 'Maybe'.
    -- NOTE(review): not consulted by the readers in this part of the module.
    , rowRange :: !(Maybe (Int, Int))
    -- ^ Optional @(start, length)@ window of rows to read.
    , seekPos :: !(Maybe Integer)
    -- ^ Absolute handle position to seek to before reading data rows.
    , totalRows :: !(Maybe Int)
    -- ^ Total number of rows in the file, if already known.
    , leftOver :: !T.Text
    -- ^ Unparsed input carried over from a previous read.
    , rowsRead :: !Int
    -- ^ Number of rows consumed by earlier reads.
    }

{- | Default read options: the file has a header row, types are inferred,
and nullish values are turned into 'Maybe' (@safeRead@). No row window,
seek position or known row total is set, and there is no carried-over
input.
-}
defaultOptions :: ReadOptions
defaultOptions =
    ReadOptions
        { -- Parsing behaviour.
          hasHeader = True
        , inferTypes = True
        , safeRead = True
        , -- Windowing / resume state: start from scratch.
          rowRange = Nothing
        , seekPos = Nothing
        , totalRows = Nothing
        , leftOver = T.empty
        , rowsRead = 0
        }

{- | Read a comma-separated file into a 'DataFrame' using 'defaultOptions'.

The resume state that 'readSeparated' also returns is discarded.

NOTE(review): the previous comment mentioned intermediate temporary files;
nothing visible in this module creates any — confirm against the parsing
helpers before relying on that.
-}
readCsv :: FilePath -> IO DataFrame
readCsv = fmap fst . readSeparated ',' defaultOptions

{- | Read a tab-separated file into a 'DataFrame' using 'defaultOptions'.

The resume state that 'readSeparated' also returns is discarded.

NOTE(review): the previous comment mentioned intermediate temporary files;
nothing visible in this module creates any — confirm against the parsing
helpers before relying on that.
-}
readTsv :: FilePath -> IO DataFrame
readTsv = fmap fst . readSeparated '\t' defaultOptions

{- | Reads a character separated file into a dataframe using mutable vectors.

Arguments are the field separator, the read options and the file path.

The result pairs the dataframe with resume state @(pos, unconsumed, n)@:

* @pos@ is the handle position after the last row was read,
* @unconsumed@ is parser input that was fetched but not yet parsed,
* @n@ is one more than the number of rows parsed by 'fillColumns'
  (the first data row is read separately to seed the columns).

When the file has no header, columns are named by their zero-based decimal
index (@"0"@, @"1"@, ...).
-}
readSeparated ::
    Char -> ReadOptions -> FilePath -> IO (DataFrame, (Integer, T.Text, Int))
readSeparated c opts path = do
    -- Only count rows on disk when the caller did not supply a total.
    rawTotal <- maybe (countRows c path) return (totalRows opts)
    let dataRowCount = if hasHeader opts then rawTotal - 1 else rawTotal
        -- Only the length of the requested range is used here; it is capped
        -- by the number of rows that have not been read yet.
        numRows = case rowRange opts of
            Nothing -> dataRowCount
            Just (_, len) -> min len (dataRowCount - rowsRead opts)
    withFile path ReadMode $ \handle -> do
        firstRow <- fmap T.strip . parseSep c <$> TIO.hGetLine handle
        let columnNames
                | hasHeader opts = fmap (T.filter (/= '\"')) firstRow
                -- 'show' rather than 'intToDigit': the latter throws for
                -- indices above 15, so wide header-less files could not load.
                | otherwise = fmap (T.pack . show) [0 .. length firstRow - 1]
        -- If there was no header rewind the file cursor.
        unless (hasHeader opts) $ hSeek handle AbsoluteSeek 0

        -- Resume from a caller-supplied position, if any.
        forM_ (seekPos opts) (hSeek handle AbsoluteSeek)

        let numColumns = length columnNames
        -- Use this row to infer the types of the rest of the column.
        (dataRow, remainder) <- readSingleLine c (leftOver opts) handle

        -- This array will track the indices of all null values for each column.
        nullIndices <- VM.unsafeNew numColumns
        VM.set nullIndices []
        mutableCols <- VM.unsafeNew numColumns
        getInitialDataVectors numRows mutableCols dataRow

        -- Read rows into the mutable vectors
        (unconsumed, r) <-
            fillColumns numRows c mutableCols nullIndices remainder handle

        -- Freeze the mutable vectors into immutable ones
        nulls' <- V.unsafeFreeze nullIndices
        cols <-
            V.mapM
                (freezeColumn mutableCols nulls' opts)
                (V.generate numColumns id)
        pos <- hTell handle

        return
            ( DataFrame
                { columns = cols
                , columnIndices = M.fromList (zip columnNames [0 ..])
                , dataframeDimensions =
                    (maybe 0 columnLength (cols V.!? 0), V.length cols)
                , derivingExpressions = M.empty
                }
            , (pos, unconsumed, r + 1)
            )
{-# INLINE readSeparated #-}

{- | Allocate one mutable column per field of the first data row and store
that row's values at index 0.

The column representation is chosen from 'inferValueType' applied to the
field: @"Int"@ and @"Double"@ get unboxed vectors (unparseable values fall
back to 0), everything else is kept as boxed 'T.Text'.

@n@ is the capacity of each column. A non-positive @n@ yields empty columns
and skips the initial write, and fields beyond the length of @mCol@ are
ignored, so no write ever lands outside an allocated vector.
-}
getInitialDataVectors :: Int -> VM.IOVector MutableColumn -> [T.Text] -> IO ()
getInitialDataVectors n mCol xs =
    -- Bounding the index list keeps the final unsafeWrite inside mCol even
    -- when the row has more fields than there are columns.
    forM_ (zip [0 .. VM.length mCol - 1] xs) $ \(i, x) -> do
        col <- case inferValueType x of
            "Int" -> do
                vec <- VUM.unsafeNew capacity :: IO (VUM.IOVector Int)
                when hasRoom $ VUM.unsafeWrite vec 0 (fromMaybe 0 (readInt x))
                return (MUnboxedColumn vec)
            "Double" -> do
                vec <- VUM.unsafeNew capacity :: IO (VUM.IOVector Double)
                when hasRoom $ VUM.unsafeWrite vec 0 (fromMaybe 0 (readDouble x))
                return (MUnboxedColumn vec)
            _ -> do
                vec <- VM.unsafeNew capacity :: IO (VM.IOVector T.Text)
                when hasRoom $ VM.unsafeWrite vec 0 x
                return (MBoxedColumn vec)
        VM.unsafeWrite mCol i col
  where
    -- unsafeNew/unsafeWrite perform no checks, so clamp and guard here.
    capacity = max 0 n
    hasRoom = capacity > 0
{-# INLINE getInitialDataVectors #-}

{- | Reads rows from the handle and stores values in mutable vectors.

Rows are written at indices @1 .. n - 1@; index 0 is expected to have been
filled already from the first data row. @unused@ is input left over from
reading that first row and is parsed before anything new is fetched from
the handle.

Returns the input that was fetched but not parsed, together with the number
of rows parsed by this call. Fails in 'IO' when a row cannot be parsed.
Fields beyond the number of columns in @mutableCols@ are ignored.
-}
fillColumns ::
    Int ->
    Char ->
    VM.IOVector MutableColumn ->
    VM.IOVector [(Int, T.Text)] ->
    T.Text ->
    Handle ->
    IO (T.Text, Int)
fillColumns n c mutableCols nullIndices unused handle = do
    pending <- newIORef unused
    parsedRows <- newIORef (0 :: Int)
    -- writeValue indexes the column vectors unchecked, so never hand it an
    -- index past the last column even if a row has extra fields.
    let lastCol = VM.length mutableCols - 1
    forM_ [1 .. n - 1] $ \i -> do
        atEnd <- hIsEOF handle
        buffered <- readIORef pending
        -- Nothing left on disk and nothing buffered: skip this row slot.
        unless (atEnd && T.null buffered) $
            parseWith (TIO.hGetChunk handle) (parseRow c) buffered >>= \case
                Fail _ ctx er -> do
                    erpos <- hTell handle
                    fail $
                        "Failed to parse CSV file around "
                            <> show erpos
                            <> " byte; due: "
                            <> show er
                            <> "; context: "
                            <> show ctx
                Partial _ ->
                    fail "Partial handler is called"
                Done unconsumed row -> do
                    writeIORef pending unconsumed
                    -- Strict update: the lazy variant would build one thunk
                    -- per row that is only forced after the whole read.
                    modifyIORef' parsedRows (+ 1)
                    zipWithM_
                        (writeValue mutableCols nullIndices i)
                        [0 .. lastCol]
                        row
    (,) <$> readIORef pending <*> readIORef parsedRows
{-# INLINE fillColumns #-}

{- | Write one field into row @count@ of column @colIndex@ via 'writeColumn'.

When 'writeColumn' returns 'Left', the returned text is recorded together
with the row index in the per-column list held in @nullIndices@; a 'Right'
result needs no further action. Both vectors are indexed without bounds
checks.
-}
writeValue ::
    VM.IOVector MutableColumn ->
    VM.IOVector [(Int, T.Text)] ->
    Int ->
    Int ->
    T.Text ->
    IO ()
writeValue mutableCols nullIndices count colIndex value = do
    target <- VM.unsafeRead mutableCols colIndex
    outcome <- writeColumn count value target
    case outcome of
        Left raw -> VM.unsafeModify nullIndices ((count, raw) :) colIndex
        Right _ -> return ()
{-# INLINE writeValue #-}

{- | Turn the mutable column at @colIndex@ into an immutable 'Column' by
handing it, together with that column's recorded null entries, to
'freezeColumn''. The 'ReadOptions' argument is currently unused.
-}
freezeColumn ::
    VM.IOVector MutableColumn ->
    V.Vector [(Int, T.Text)] ->
    ReadOptions ->
    Int ->
    IO Column
freezeColumn mutableCols nulls _opts colIndex =
    VM.unsafeRead mutableCols colIndex >>= freezeColumn' (nulls V.! colIndex)
{-# INLINE freezeColumn #-}

-- ---------------------------------------------------------------------------
-- Streaming scan API
-- ---------------------------------------------------------------------------

{- | Open a CSV/separated file for streaming, returning an open handle
(positioned just after the header line) and the column specification
for the schema columns that appear in the file header.

Each entry of the specification is @(field index in the file, column name,
schema type)@. Header names are stripped of surrounding whitespace and of
double quotes before being looked up in the schema.

Fails in 'IO' if none of the schema columns appear in the header. If that
check or reading the header throws, the handle is closed before the
exception propagates.

On success the caller is responsible for closing the handle when done.
-}
openCsvStream ::
    Char ->
    Schema ->
    FilePath ->
    IO (Handle, [(Int, T.Text, SchemaType)])
openCsvStream sep schema path =
    -- bracketOnError releases the handle only on failure; on success it is
    -- handed to the caller still open.
    bracketOnError (openFile path ReadMode) hClose $ \handle -> do
        hSetBuffering handle (BlockBuffering (Just (8 * 1024 * 1024)))
        headerLine <- TIO.hGetLine handle
        let headerCols =
                fmap (T.filter (/= '"') . T.strip) (parseSep sep headerLine)
            schemaMap = elements schema
            -- Keep only header columns the schema knows about, remembering
            -- their position in the file.
            colSpec =
                [ (idx, name, stype)
                | (idx, name) <- zip [0 ..] headerCols
                , Just stype <- [M.lookup name schemaMap]
                ]
        when (null colSpec) $
            fail
                ( "openCsvStream: none of the schema columns appear in the header of "
                    <> path
                )
        return (handle, colSpec)

{- | Read up to @batchSz@ rows from the open handle, returning a batch
'DataFrame' and the unconsumed leftover text.  Returns 'Nothing' when
the handle is at EOF and there is no leftover input.

The caller must pass the leftover returned by the previous call (use @""@
for the first call).
-}
readBatch ::
    Char ->
    [(Int, T.Text, SchemaType)] ->
    Int ->
    BS.ByteString ->
    Handle ->
    IO (Maybe (DataFrame, BS.ByteString))
readBatch :: Char
-> [(Int, Text, SchemaType)]
-> Int
-> ByteString
-> Handle
-> IO (Maybe (DataFrame, ByteString))
readBatch Char
sep [(Int, Text, SchemaType)]
colSpec Int
batchSz ByteString
leftover Handle
handle = do
    let sepByte :: Word8
sepByte = Int -> Word8
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Char -> Int
forall a. Enum a => a -> Int
fromEnum Char
sep) :: Word8
        numCols :: Int
numCols = [(Int, Text, SchemaType)] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [(Int, Text, SchemaType)]
colSpec
        -- Read in 8 MB chunks; only the partial-line tail is copied on refill.
        chunkSize :: Int
chunkSize = Int
8 Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
1024 Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
1024
    IOVector [(Int, Text)]
nullsArr <- Int -> IO (MVector (PrimState IO) [(Int, Text)])
forall (m :: * -> *) a.
PrimMonad m =>
Int -> m (MVector (PrimState m) a)
VM.unsafeNew Int
numCols
    MVector (PrimState IO) [(Int, Text)] -> [(Int, Text)] -> IO ()
forall (m :: * -> *) a.
PrimMonad m =>
MVector (PrimState m) a -> a -> m ()
VM.set IOVector [(Int, Text)]
MVector (PrimState IO) [(Int, Text)]
nullsArr []
    IOVector MutableColumn
mCols <- Int -> IO (MVector (PrimState IO) MutableColumn)
forall (m :: * -> *) a.
PrimMonad m =>
Int -> m (MVector (PrimState m) a)
VM.unsafeNew Int
numCols
    [(Int, (Int, Text, SchemaType))]
-> ((Int, (Int, Text, SchemaType)) -> IO ()) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ ([Int]
-> [(Int, Text, SchemaType)] -> [(Int, (Int, Text, SchemaType))]
forall a b. [a] -> [b] -> [(a, b)]
zip [Int
0 ..] [(Int, Text, SchemaType)]
colSpec) (((Int, (Int, Text, SchemaType)) -> IO ()) -> IO ())
-> ((Int, (Int, Text, SchemaType)) -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \(Int
ci, (Int
_, Text
_, SchemaType
st)) ->
        MVector (PrimState IO) MutableColumn
-> Int -> MutableColumn -> IO ()
forall (m :: * -> *) a.
PrimMonad m =>
MVector (PrimState m) a -> Int -> a -> m ()
VM.unsafeWrite IOVector MutableColumn
MVector (PrimState IO) MutableColumn
mCols Int
ci (MutableColumn -> IO ()) -> IO MutableColumn -> IO ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< Int -> SchemaType -> IO MutableColumn
makeCol Int
batchSz SchemaType
st
    -- buf holds unprocessed bytes; refilled on demand when no newline is found.
    IORef ByteString
bufRef <- ByteString -> IO (IORef ByteString)
forall a. a -> IO (IORef a)
newIORef ByteString
leftover
    -- Row-by-row scan. When the buffer has no unquoted newline, fetch another chunk.
    -- The copy on refill is only the partial-line tail (≤ one row ≈ few hundred bytes).
    let loop :: Int -> IO (Int, ByteString)
loop !Int
rowIdx = do
            ByteString
remaining <- IORef ByteString -> IO ByteString
forall a. IORef a -> IO a
readIORef IORef ByteString
bufRef
            if Int
rowIdx Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
>= Int
batchSz
                then (Int, ByteString) -> IO (Int, ByteString)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Int
rowIdx, ByteString
remaining)
                else case ByteString -> Maybe Int
findUnquotedNewline ByteString
remaining of
                    Maybe Int
Nothing -> do
                        ByteString
chunk <- Handle -> Int -> IO ByteString
BS.hGet Handle
handle Int
chunkSize
                        if ByteString -> Bool
BS.null ByteString
chunk
                            then (Int, ByteString) -> IO (Int, ByteString)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Int
rowIdx, ByteString
remaining) -- EOF
                            else IORef ByteString -> ByteString -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef ByteString
bufRef (ByteString
remaining ByteString -> ByteString -> ByteString
forall a. Semigroup a => a -> a -> a
<> ByteString
chunk) IO () -> IO (Int, ByteString) -> IO (Int, ByteString)
forall a b. IO a -> IO b -> IO b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Int -> IO (Int, ByteString)
loop Int
rowIdx
                    Just Int
nlIdx -> do
                        let line :: ByteString
line = Int -> ByteString -> ByteString
BS.take Int
nlIdx ByteString
remaining
                            rest' :: ByteString
rest' = Int -> ByteString -> ByteString
BS.drop (Int
nlIdx Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1) ByteString
remaining
                            line' :: ByteString
line' =
                                if Bool -> Bool
not (ByteString -> Bool
BS.null ByteString
line) Bool -> Bool -> Bool
&& HasCallStack => ByteString -> Word8
ByteString -> Word8
BS.last ByteString
line Word8 -> Word8 -> Bool
forall a. Eq a => a -> a -> Bool
== Word8
0x0D
                                    then HasCallStack => ByteString -> ByteString
ByteString -> ByteString
BS.init ByteString
line
                                    else ByteString
line
                        IORef ByteString -> ByteString -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef ByteString
bufRef ByteString
rest'
                        [(Int, (Int, Text, SchemaType))]
-> ((Int, (Int, Text, SchemaType)) -> IO ()) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ ([Int]
-> [(Int, Text, SchemaType)] -> [(Int, (Int, Text, SchemaType))]
forall a b. [a] -> [b] -> [(a, b)]
zip [Int
0 ..] [(Int, Text, SchemaType)]
colSpec) (((Int, (Int, Text, SchemaType)) -> IO ()) -> IO ())
-> ((Int, (Int, Text, SchemaType)) -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \(Int
ci, (Int
fi, Text
_, SchemaType
_)) -> do
                            let fieldBs :: ByteString
fieldBs = Word8 -> Int -> ByteString -> ByteString
getNthFieldBs Word8
sepByte Int
fi ByteString
line'
                            MutableColumn
col <- MVector (PrimState IO) MutableColumn -> Int -> IO MutableColumn
forall (m :: * -> *) a.
PrimMonad m =>
MVector (PrimState m) a -> Int -> m a
VM.unsafeRead IOVector MutableColumn
MVector (PrimState IO) MutableColumn
mCols Int
ci
                            Either Text Bool
res <- Int -> ByteString -> MutableColumn -> IO (Either Text Bool)
writeColumnBs Int
rowIdx ByteString
fieldBs MutableColumn
col
                            case Either Text Bool
res of
                                Left Text
nv -> MVector (PrimState IO) [(Int, Text)]
-> ([(Int, Text)] -> [(Int, Text)]) -> Int -> IO ()
forall (m :: * -> *) a.
PrimMonad m =>
MVector (PrimState m) a -> (a -> a) -> Int -> m ()
VM.unsafeModify IOVector [(Int, Text)]
MVector (PrimState IO) [(Int, Text)]
nullsArr ((Int
rowIdx, Text
nv) (Int, Text) -> [(Int, Text)] -> [(Int, Text)]
forall a. a -> [a] -> [a]
:) Int
ci
                                Right Bool
_ -> () -> IO ()
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
                        Int -> IO (Int, ByteString)
loop (Int
rowIdx Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1)
    (Int
completeRows, ByteString
newLeftover) <- Int -> IO (Int, ByteString)
loop Int
0
    if Int
completeRows Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
0
        then Maybe (DataFrame, ByteString) -> IO (Maybe (DataFrame, ByteString))
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe (DataFrame, ByteString)
forall a. Maybe a
Nothing
        else do
            [Int] -> (Int -> IO ()) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ [Int
0 .. Int
numCols Int -> Int -> Int
forall a. Num a => a -> a -> a
- Int
1] ((Int -> IO ()) -> IO ()) -> (Int -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \Int
ci -> do
                MutableColumn
col <- MVector (PrimState IO) MutableColumn -> Int -> IO MutableColumn
forall (m :: * -> *) a.
PrimMonad m =>
MVector (PrimState m) a -> Int -> m a
VM.unsafeRead IOVector MutableColumn
MVector (PrimState IO) MutableColumn
mCols Int
ci
                MVector (PrimState IO) MutableColumn
-> Int -> MutableColumn -> IO ()
forall (m :: * -> *) a.
PrimMonad m =>
MVector (PrimState m) a -> Int -> a -> m ()
VM.unsafeWrite IOVector MutableColumn
MVector (PrimState IO) MutableColumn
mCols Int
ci (Int -> MutableColumn -> MutableColumn
sliceCol Int
completeRows MutableColumn
col)
            Vector [(Int, Text)]
nullsVec <- MVector (PrimState IO) [(Int, Text)] -> IO (Vector [(Int, Text)])
forall (m :: * -> *) a.
PrimMonad m =>
MVector (PrimState m) a -> m (Vector a)
V.unsafeFreeze IOVector [(Int, Text)]
MVector (PrimState IO) [(Int, Text)]
nullsArr
            Vector Column
cols <- Int -> (Int -> IO Column) -> IO (Vector Column)
forall (m :: * -> *) a.
Monad m =>
Int -> (Int -> m a) -> m (Vector a)
V.generateM Int
numCols ((Int -> IO Column) -> IO (Vector Column))
-> (Int -> IO Column) -> IO (Vector Column)
forall a b. (a -> b) -> a -> b
$ \Int
ci -> do
                MutableColumn
col <- MVector (PrimState IO) MutableColumn -> Int -> IO MutableColumn
forall (m :: * -> *) a.
PrimMonad m =>
MVector (PrimState m) a -> Int -> m a
VM.unsafeRead IOVector MutableColumn
MVector (PrimState IO) MutableColumn
mCols Int
ci
                [(Int, Text)] -> MutableColumn -> IO Column
freezeColumn' (Vector [(Int, Text)]
nullsVec Vector [(Int, Text)] -> Int -> [(Int, Text)]
forall a. Vector a -> Int -> a
V.! Int
ci) MutableColumn
col
            let colNames :: [Text]
colNames = [Text
name | (Int
_, Text
name, SchemaType
_) <- [(Int, Text, SchemaType)]
colSpec]
            Maybe (DataFrame, ByteString) -> IO (Maybe (DataFrame, ByteString))
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe (DataFrame, ByteString)
 -> IO (Maybe (DataFrame, ByteString)))
-> Maybe (DataFrame, ByteString)
-> IO (Maybe (DataFrame, ByteString))
forall a b. (a -> b) -> a -> b
$
                (DataFrame, ByteString) -> Maybe (DataFrame, ByteString)
forall a. a -> Maybe a
Just
                    ( DataFrame
                        { columns :: Vector Column
columns = Vector Column
cols
                        , columnIndices :: Map Text Int
columnIndices = [(Text, Int)] -> Map Text Int
forall k a. Ord k => [(k, a)] -> Map k a
M.fromList ([Text] -> [Int] -> [(Text, Int)]
forall a b. [a] -> [b] -> [(a, b)]
zip [Text]
colNames [Int
0 ..])
                        , dataframeDimensions :: (Int, Int)
dataframeDimensions = (Int
completeRows, Int
numCols)
                        , derivingExpressions :: Map Text UExpr
derivingExpressions = Map Text UExpr
forall k a. Map k a
M.empty
                        }
                    , ByteString
newLeftover
                    )

{- | Store one raw CSV field at row @i@ of a mutable column.

Text-typed columns (boxed or optional) receive the leniently UTF-8-decoded
field. 'Double' and 'Int' unboxed columns are parsed straight from the bytes,
so no 'T.Text' is built when parsing succeeds.

The result is @Right True@ when a real value was stored. Otherwise it is
@Left raw@, where @raw@ is the decoded field text:

  * nullish text: a placeholder ('T.empty' or 'Nothing') is written;
  * unparsable number: @0@ is written;
  * any other element type: nothing is written at all.
-}
writeColumnBs ::
    Int -> BS.ByteString -> MutableColumn -> IO (Either T.Text Bool)
writeColumnBs i bs mcol = case mcol of
    MBoxedColumn (vec :: VM.IOVector a)
        | Just Refl <- testEquality (typeRep @a) (typeRep @T.Text) ->
            if isNullish decoded
                then Left decoded <$ VM.unsafeWrite vec i T.empty
                else Right True <$ VM.unsafeWrite vec i decoded
        | otherwise -> rejected
    MOptionalColumn (vec :: VM.IOVector (Maybe a))
        | Just Refl <- testEquality (typeRep @a) (typeRep @T.Text) ->
            if isNullish decoded
                then Left decoded <$ VM.unsafeWrite vec i Nothing
                else Right True <$ VM.unsafeWrite vec i (Just decoded)
        | otherwise -> rejected
    MUnboxedColumn (vec :: VUM.IOVector a)
        | Just Refl <- testEquality (typeRep @a) (typeRep @Double) ->
            case readByteStringDouble bs of
                Just d -> Right True <$ VUM.unsafeWrite vec i d
                Nothing -> Left decoded <$ VUM.unsafeWrite vec i 0
        | Just Refl <- testEquality (typeRep @a) (typeRep @Int) ->
            case readByteStringInt bs of
                Just k -> Right True <$ VUM.unsafeWrite vec i k
                Nothing -> Left decoded <$ VUM.unsafeWrite vec i 0
        | otherwise -> rejected
  where
    -- Lazy on purpose: only forced on the text and failure paths.
    decoded :: T.Text
    decoded = TextEncoding.decodeUtf8Lenient bs

    -- Report the raw field without touching the column.
    rejected :: IO (Either T.Text Bool)
    rejected = pure (Left decoded)
{-# INLINE writeColumnBs #-}

{- | Extract the Nth (0-indexed) field of a single CSV line.

Arguments are the separator byte, the field index, and the line (without its
trailing newline). An index past the last field (or a negative one) yields
'BS.empty'.

Fast path: when the line contains no double quote at all, fields are skipped
with 'BS.elemIndex' and the result is a plain slice of the input.

Slow path: a quote-aware byte-by-byte scan in which separators inside quotes
do not split fields. A field that is wrapped in double quotes has the
enclosing pair removed, and every doubled quote (@""@) inside it is collapsed
to a single quote, as RFC 4180 prescribes for escaped quotes.
-}
getNthFieldBs :: Word8 -> Int -> BS.ByteString -> BS.ByteString
getNthFieldBs sep targetIdx bs
    | not (BS.any (== quoteChar) bs) = skipFast targetIdx bs
    | otherwise = go 0 0 False 0
  where
    quoteChar :: Word8
    quoteChar = 0x22

    len :: Int
    len = BS.length bs

    -- Fast path: hop from separator to separator; no quote handling needed.
    skipFast :: Int -> BS.ByteString -> BS.ByteString
    skipFast k s =
        case BS.elemIndex sep s of
            Nothing
                | k == 0 -> s
                | otherwise -> BS.empty
            Just i
                | k == 0 -> BS.take i s
                | otherwise -> skipFast (k - 1) (BS.drop (i + 1) s)

    -- Slow path. @idx@ is the index of the field currently being scanned,
    -- @start@ its first byte offset, @inQ@ whether we are inside quotes.
    go :: Int -> Int -> Bool -> Int -> BS.ByteString
    go !idx !start !inQ !pos
        | pos >= len =
            if idx == targetIdx then extract start pos else BS.empty
        | c == quoteChar = go idx start (not inQ) (pos + 1)
        | c == sep && not inQ =
            if idx == targetIdx
                then extract start pos
                else go (idx + 1) (pos + 1) False (pos + 1)
        | otherwise = go idx start inQ (pos + 1)
      where
        c = BS.index bs pos

    -- Slice bytes [s, e) and, if the slice is quote-wrapped, unwrap it.
    extract :: Int -> Int -> BS.ByteString
    extract s e
        | Just (firstByte, rest) <- BS.uncons field
        , Just (inner, lastByte) <- BS.unsnoc rest
        , firstByte == quoteChar
        , lastByte == quoteChar =
            unescape inner
        | otherwise = field
      where
        field = BS.take (e - s) (BS.drop s bs)

    -- Collapse escaped quotes; returns the input slice untouched (no copy)
    -- when it contains no quote byte.
    unescape :: BS.ByteString -> BS.ByteString
    unescape inner
        | BS.elem quoteChar inner = BS.concat (collapse inner)
        | otherwise = inner

    collapse :: BS.ByteString -> [BS.ByteString]
    collapse s =
        case BS.breakSubstring doubledQuote s of
            (before, after)
                | BS.null after -> [before]
                | otherwise ->
                    before : BS.singleton quoteChar : collapse (BS.drop 2 after)

    doubledQuote :: BS.ByteString
    doubledQuote = BS.pack [quoteChar, quoteChar]
{-# INLINE getNthFieldBs #-}

{- | Allocate a fresh 'MutableColumn' with room for @n@ rows.

The element type is chosen from the 'SchemaType': 'Int' and 'Double' get an
unboxed column, every other schema type falls back to a boxed 'T.Text'
column. Storage comes from @unsafeNew@, so slots hold no meaningful value
until they are written.
-}
makeCol :: Int -> SchemaType -> IO MutableColumn
makeCol n (SType (_ :: P.Proxy a))
    | Just Refl <- testEquality (typeRep @a) (typeRep @Int) = do
        ints <- VUM.unsafeNew n :: IO (VUM.IOVector Int)
        pure (MUnboxedColumn ints)
    | Just Refl <- testEquality (typeRep @a) (typeRep @Double) = do
        doubles <- VUM.unsafeNew n :: IO (VUM.IOVector Double)
        pure (MUnboxedColumn doubles)
    | otherwise = do
        texts <- VM.unsafeNew n :: IO (VM.IOVector T.Text)
        pure (MBoxedColumn texts)

{- | Restrict a 'MutableColumn' to its first @n@ elements.

The constructor is preserved and the underlying vector is narrowed with
@take@, i.e. the result is a view rather than a copy.
-}
sliceCol :: Int -> MutableColumn -> MutableColumn
sliceCol n mcol = case mcol of
    MBoxedColumn vec -> MBoxedColumn (VM.take n vec)
    MUnboxedColumn vec -> MUnboxedColumn (VUM.take n vec)
    MOptionalColumn vec -> MOptionalColumn (VM.take n vec)

{- | Find the index of the first newline byte (0x0A) that is not inside a
double-quoted region, or 'Nothing' if the buffer holds no such newline
(including the case of a quote that is still open at the end of the buffer).

Fast path: locate the first newline with 'BS.elemIndex' and accept it when
no double quote precedes it. The quote probe is limited to the prefix before
that newline ('BS.take' is an O(1) slice), so a quote-free buffer is not
rescanned in full for every line.

Slow path: a quote-aware linear scan that toggles an in-quotes flag on every
0x22 byte. It starts at the first quote, because every byte before it is
known to be neither a quote nor a newline.
-}
findUnquotedNewline :: BS.ByteString -> Maybe Int
findUnquotedNewline bs =
    case BS.elemIndex 0x0A bs of
        Nothing -> Nothing
        Just nlPos ->
            case BS.elemIndex 0x22 (BS.take nlPos bs) of
                -- No quote before the newline: it cannot be inside a field.
                Nothing -> Just nlPos
                -- A quote precedes it: the newline may be quoted; scan carefully.
                Just firstQuote -> slowScan firstQuote False
  where
    len :: Int
    len = BS.length bs

    slowScan :: Int -> Bool -> Maybe Int
    slowScan !pos !inQ
        | pos >= len = Nothing
        | c == 0x22 = slowScan (pos + 1) (not inQ)
        | c == 0x0A && not inQ = Just pos
        | otherwise = slowScan (pos + 1) inQ
      where
        c = BS.index bs pos
{-# INLINE findUnquotedNewline #-}