-- | Client handlers
--
-- This is an internal module only; see "Network.GRPC.Client.StreamType.IO"
-- for the main public module.
module Network.GRPC.Client.StreamType (
    -- * Handler type
    ClientHandler' -- opaque
  , ClientHandler
    -- * Run client handlers (part of the public API)
  , nonStreaming
  , clientStreaming
  , clientStreaming_
  , serverStreaming
  , biDiStreaming
    -- * Obtain handler for a specific type
  , CanCallRPC(..)
  , rpc
  , rpcWith
  ) where

import Control.Monad.Catch
import Control.Monad.IO.Class
import Control.Monad.Reader
import Data.Proxy

import Network.GRPC.Client.Call
import Network.GRPC.Client.Connection
import Network.GRPC.Common
import Network.GRPC.Common.NextElem qualified as NextElem
import Network.GRPC.Common.StreamType
import Network.GRPC.Spec

{------------------------------------------------------------------------------
  Constructing client handlers (used internally only)
-------------------------------------------------------------------------------}

mkNonStreaming :: forall rpc m.
     SupportsStreamingType rpc NonStreaming
  => (    Input rpc
       -> m (Output rpc)
     )
  -> ClientHandler' NonStreaming m rpc
mkNonStreaming :: forall {k} (rpc :: k) (m :: * -> *).
SupportsStreamingType rpc 'NonStreaming =>
(Input rpc -> m (Output rpc)) -> ClientHandler' 'NonStreaming m rpc
mkNonStreaming Input rpc -> m (Output rpc)
h = Handler 'Client 'NonStreaming m rpc
-> ClientHandler' 'NonStreaming m rpc
forall {k} (rpc :: k) (s :: StreamingType) (m :: * -> *).
SupportsStreamingType rpc s =>
Handler 'Client s m rpc -> ClientHandler' s m rpc
ClientHandler (Handler 'Client 'NonStreaming m rpc
 -> ClientHandler' 'NonStreaming m rpc)
-> Handler 'Client 'NonStreaming m rpc
-> ClientHandler' 'NonStreaming m rpc
forall a b. (a -> b) -> a -> b
$
    Handler 'Client 'NonStreaming m rpc
Input rpc -> m (Output rpc)
h

mkClientStreaming :: forall rpc m.
     SupportsStreamingType rpc ClientStreaming
  => ( forall r.
          (    (NextElem (Input rpc) -> IO ())
             -> m r
          )
       -> m (Output rpc, r)
     )
  -> ClientHandler' ClientStreaming m rpc
mkClientStreaming :: forall {k} (rpc :: k) (m :: * -> *).
SupportsStreamingType rpc 'ClientStreaming =>
(forall r.
 ((NextElem (Input rpc) -> IO ()) -> m r) -> m (Output rpc, r))
-> ClientHandler' 'ClientStreaming m rpc
mkClientStreaming forall r.
((NextElem (Input rpc) -> IO ()) -> m r) -> m (Output rpc, r)
h = Handler 'Client 'ClientStreaming m rpc
-> ClientHandler' 'ClientStreaming m rpc
forall {k} (rpc :: k) (s :: StreamingType) (m :: * -> *).
SupportsStreamingType rpc s =>
Handler 'Client s m rpc -> ClientHandler' s m rpc
ClientHandler (Handler 'Client 'ClientStreaming m rpc
 -> ClientHandler' 'ClientStreaming m rpc)
-> Handler 'Client 'ClientStreaming m rpc
-> ClientHandler' 'ClientStreaming m rpc
forall a b. (a -> b) -> a -> b
$
    (forall r.
 ((NextElem (Input rpc) -> IO ()) -> m r) -> m (Output rpc, r))
-> Negative m (NextElem (Input rpc) -> IO ()) (Output rpc)
forall (m :: * -> *) a b.
(forall r. (a -> m r) -> m (b, r)) -> Negative m a b
Negative ((NextElem (Input rpc) -> IO ()) -> m r) -> m (Output rpc, r)
forall r.
((NextElem (Input rpc) -> IO ()) -> m r) -> m (Output rpc, r)
h

mkServerStreaming :: forall rpc m.
     (SupportsStreamingType rpc ServerStreaming, Functor m)
  => ( forall r.
            Input rpc
         -> (    IO (NextElem (Output rpc))
              -> m r
            )
         -> m r
     )
  -> ClientHandler' ServerStreaming m rpc
mkServerStreaming :: forall {k} (rpc :: k) (m :: * -> *).
(SupportsStreamingType rpc 'ServerStreaming, Functor m) =>
(forall r. Input rpc -> (IO (NextElem (Output rpc)) -> m r) -> m r)
-> ClientHandler' 'ServerStreaming m rpc
mkServerStreaming forall r. Input rpc -> (IO (NextElem (Output rpc)) -> m r) -> m r
h = Handler 'Client 'ServerStreaming m rpc
-> ClientHandler' 'ServerStreaming m rpc
forall {k} (rpc :: k) (s :: StreamingType) (m :: * -> *).
SupportsStreamingType rpc s =>
Handler 'Client s m rpc -> ClientHandler' s m rpc
ClientHandler (Handler 'Client 'ServerStreaming m rpc
 -> ClientHandler' 'ServerStreaming m rpc)
-> Handler 'Client 'ServerStreaming m rpc
-> ClientHandler' 'ServerStreaming m rpc
forall a b. (a -> b) -> a -> b
$ \Input rpc
input ->
    (forall r. (IO (NextElem (Output rpc)) -> m r) -> m ((), r))
-> Negative m (IO (NextElem (Output rpc))) ()
forall (m :: * -> *) a b.
(forall r. (a -> m r) -> m (b, r)) -> Negative m a b
Negative ((forall r. (IO (NextElem (Output rpc)) -> m r) -> m ((), r))
 -> Negative m (IO (NextElem (Output rpc))) ())
-> (forall r. (IO (NextElem (Output rpc)) -> m r) -> m ((), r))
-> Negative m (IO (NextElem (Output rpc))) ()
forall a b. (a -> b) -> a -> b
$ \IO (NextElem (Output rpc)) -> m r
k -> ((),) (r -> ((), r)) -> m r -> m ((), r)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Input rpc -> (IO (NextElem (Output rpc)) -> m r) -> m r
forall r. Input rpc -> (IO (NextElem (Output rpc)) -> m r) -> m r
h Input rpc
input IO (NextElem (Output rpc)) -> m r
k

mkBiDiStreaming :: forall rpc m.
     (SupportsStreamingType rpc BiDiStreaming, Functor m)
  => ( forall r.
           (    (NextElem (Input rpc) -> IO ())
             -> IO (NextElem (Output rpc))
             -> m r
           )
        -> m r
     )
  -> ClientHandler' BiDiStreaming m rpc
mkBiDiStreaming :: forall {k} (rpc :: k) (m :: * -> *).
(SupportsStreamingType rpc 'BiDiStreaming, Functor m) =>
(forall r.
 ((NextElem (Input rpc) -> IO ())
  -> IO (NextElem (Output rpc)) -> m r)
 -> m r)
-> ClientHandler' 'BiDiStreaming m rpc
mkBiDiStreaming forall r.
((NextElem (Input rpc) -> IO ())
 -> IO (NextElem (Output rpc)) -> m r)
-> m r
h = Handler 'Client 'BiDiStreaming m rpc
-> ClientHandler' 'BiDiStreaming m rpc
forall {k} (rpc :: k) (s :: StreamingType) (m :: * -> *).
SupportsStreamingType rpc s =>
Handler 'Client s m rpc -> ClientHandler' s m rpc
ClientHandler (Handler 'Client 'BiDiStreaming m rpc
 -> ClientHandler' 'BiDiStreaming m rpc)
-> Handler 'Client 'BiDiStreaming m rpc
-> ClientHandler' 'BiDiStreaming m rpc
forall a b. (a -> b) -> a -> b
$
    (forall r.
 ((NextElem (Input rpc) -> IO (), IO (NextElem (Output rpc)))
  -> m r)
 -> m ((), r))
-> Negative
     m (NextElem (Input rpc) -> IO (), IO (NextElem (Output rpc))) ()
forall (m :: * -> *) a b.
(forall r. (a -> m r) -> m (b, r)) -> Negative m a b
Negative ((forall r.
  ((NextElem (Input rpc) -> IO (), IO (NextElem (Output rpc)))
   -> m r)
  -> m ((), r))
 -> Negative
      m (NextElem (Input rpc) -> IO (), IO (NextElem (Output rpc))) ())
-> (forall r.
    ((NextElem (Input rpc) -> IO (), IO (NextElem (Output rpc)))
     -> m r)
    -> m ((), r))
-> Negative
     m (NextElem (Input rpc) -> IO (), IO (NextElem (Output rpc))) ()
forall a b. (a -> b) -> a -> b
$ \(NextElem (Input rpc) -> IO (), IO (NextElem (Output rpc))) -> m r
k -> (r -> ((), r)) -> m r -> m ((), r)
forall a b. (a -> b) -> m a -> m b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap ((),) (m r -> m ((), r)) -> m r -> m ((), r)
forall a b. (a -> b) -> a -> b
$ ((NextElem (Input rpc) -> IO ())
 -> IO (NextElem (Output rpc)) -> m r)
-> m r
forall r.
((NextElem (Input rpc) -> IO ())
 -> IO (NextElem (Output rpc)) -> m r)
-> m r
h (((NextElem (Input rpc) -> IO (), IO (NextElem (Output rpc)))
 -> m r)
-> (NextElem (Input rpc) -> IO ())
-> IO (NextElem (Output rpc))
-> m r
forall a b c. ((a, b) -> c) -> a -> b -> c
curry (NextElem (Input rpc) -> IO (), IO (NextElem (Output rpc))) -> m r
k)

{-------------------------------------------------------------------------------
  Running client handlers
-------------------------------------------------------------------------------}

-- | Execute non-streaming handler in any monad stack
nonStreaming :: forall rpc m.
     ClientHandler' NonStreaming m rpc
  -> Input rpc
  -> m (Output rpc)
nonStreaming :: forall {k} (rpc :: k) (m :: * -> *).
ClientHandler' 'NonStreaming m rpc -> Input rpc -> m (Output rpc)
nonStreaming (ClientHandler Handler 'Client 'NonStreaming m rpc
h) = Handler 'Client 'NonStreaming m rpc
Input rpc -> m (Output rpc)
h

-- | Generalization of 'clientStreaming_' with an additional result
clientStreaming :: forall rpc m r.
     ClientHandler' ClientStreaming m rpc
  -> (  (NextElem (Input rpc) -> IO ())
       -> m r
     )
  -> m (Output rpc, r)
clientStreaming :: forall {k} (rpc :: k) (m :: * -> *) r.
ClientHandler' 'ClientStreaming m rpc
-> ((NextElem (Input rpc) -> IO ()) -> m r) -> m (Output rpc, r)
clientStreaming (ClientHandler Handler 'Client 'ClientStreaming m rpc
h) (NextElem (Input rpc) -> IO ()) -> m r
k = Negative m (NextElem (Input rpc) -> IO ()) (Output rpc)
-> forall r.
   ((NextElem (Input rpc) -> IO ()) -> m r) -> m (Output rpc, r)
forall (m :: * -> *) a b.
Negative m a b -> forall r. (a -> m r) -> m (b, r)
runNegative Handler 'Client 'ClientStreaming m rpc
Negative m (NextElem (Input rpc) -> IO ()) (Output rpc)
h (NextElem (Input rpc) -> IO ()) -> m r
k

-- | Execute client-side streaming handler in any monad stack
clientStreaming_ :: forall rpc m.
     Functor m
  => ClientHandler' ClientStreaming m rpc
  -> (    (NextElem (Input rpc) -> IO ())
       -> m ()
     )
  -> m (Output rpc)
clientStreaming_ :: forall {k} (rpc :: k) (m :: * -> *).
Functor m =>
ClientHandler' 'ClientStreaming m rpc
-> ((NextElem (Input rpc) -> IO ()) -> m ()) -> m (Output rpc)
clientStreaming_ ClientHandler' 'ClientStreaming m rpc
h (NextElem (Input rpc) -> IO ()) -> m ()
k = (Output rpc, ()) -> Output rpc
forall a b. (a, b) -> a
fst ((Output rpc, ()) -> Output rpc)
-> m (Output rpc, ()) -> m (Output rpc)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> ClientHandler' 'ClientStreaming m rpc
-> ((NextElem (Input rpc) -> IO ()) -> m ()) -> m (Output rpc, ())
forall {k} (rpc :: k) (m :: * -> *) r.
ClientHandler' 'ClientStreaming m rpc
-> ((NextElem (Input rpc) -> IO ()) -> m r) -> m (Output rpc, r)
clientStreaming ClientHandler' 'ClientStreaming m rpc
h (NextElem (Input rpc) -> IO ()) -> m ()
k

-- | Execute server-side streaming handler in any monad stack
serverStreaming :: forall rpc m r.
     Functor m
  => ClientHandler' ServerStreaming m rpc
  -> Input rpc
  -> (    IO (NextElem (Output rpc))
       -> m r
     )
  -> m r
serverStreaming :: forall {k} (rpc :: k) (m :: * -> *) r.
Functor m =>
ClientHandler' 'ServerStreaming m rpc
-> Input rpc -> (IO (NextElem (Output rpc)) -> m r) -> m r
serverStreaming (ClientHandler Handler 'Client 'ServerStreaming m rpc
h) Input rpc
input IO (NextElem (Output rpc)) -> m r
k = ((), r) -> r
forall a b. (a, b) -> b
snd (((), r) -> r) -> m ((), r) -> m r
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Negative m (IO (NextElem (Output rpc))) ()
-> forall r. (IO (NextElem (Output rpc)) -> m r) -> m ((), r)
forall (m :: * -> *) a b.
Negative m a b -> forall r. (a -> m r) -> m (b, r)
runNegative (Handler 'Client 'ServerStreaming m rpc
Input rpc -> Negative m (IO (NextElem (Output rpc))) ()
h Input rpc
input) IO (NextElem (Output rpc)) -> m r
k

-- | Execute bidirectional streaming handler in any monad stack
biDiStreaming :: forall rpc m r.
     Functor m
  => ClientHandler' BiDiStreaming m rpc
  -> (    (NextElem (Input rpc) -> IO ())
       -> IO (NextElem (Output rpc))
       -> m r
     )
  -> m r
biDiStreaming :: forall {k} (rpc :: k) (m :: * -> *) r.
Functor m =>
ClientHandler' 'BiDiStreaming m rpc
-> ((NextElem (Input rpc) -> IO ())
    -> IO (NextElem (Output rpc)) -> m r)
-> m r
biDiStreaming (ClientHandler Handler 'Client 'BiDiStreaming m rpc
h) (NextElem (Input rpc) -> IO ())
-> IO (NextElem (Output rpc)) -> m r
k = (((), r) -> r) -> m ((), r) -> m r
forall a b. (a -> b) -> m a -> m b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap ((), r) -> r
forall a b. (a, b) -> b
snd (m ((), r) -> m r) -> m ((), r) -> m r
forall a b. (a -> b) -> a -> b
$ Negative
  m (NextElem (Input rpc) -> IO (), IO (NextElem (Output rpc))) ()
-> forall r.
   ((NextElem (Input rpc) -> IO (), IO (NextElem (Output rpc)))
    -> m r)
   -> m ((), r)
forall (m :: * -> *) a b.
Negative m a b -> forall r. (a -> m r) -> m (b, r)
runNegative Handler 'Client 'BiDiStreaming m rpc
Negative
  m (NextElem (Input rpc) -> IO (), IO (NextElem (Output rpc))) ()
h (((NextElem (Input rpc) -> IO (), IO (NextElem (Output rpc)))
  -> m r)
 -> m ((), r))
-> ((NextElem (Input rpc) -> IO (), IO (NextElem (Output rpc)))
    -> m r)
-> m ((), r)
forall a b. (a -> b) -> a -> b
$ ((NextElem (Input rpc) -> IO ())
 -> IO (NextElem (Output rpc)) -> m r)
-> (NextElem (Input rpc) -> IO (), IO (NextElem (Output rpc)))
-> m r
forall a b c. (a -> b -> c) -> (a, b) -> c
uncurry (NextElem (Input rpc) -> IO ())
-> IO (NextElem (Output rpc)) -> m r
k

{-------------------------------------------------------------------------------
  CanCallRPC
-------------------------------------------------------------------------------}

-- | Monads in which we make RPC calls
--
-- In order to be able to make an RPC call, we need
--
-- * 'MonadIO' (obviously)
-- * 'MonadMask' in order to ensure that the RPC call is terminated cleanly
-- * Access to the 'Connection' to the server
class (MonadIO m, MonadMask m) => CanCallRPC m where
  getConnection :: m Connection

instance (MonadIO m, MonadMask m) => CanCallRPC (ReaderT Connection m) where
  getConnection :: ReaderT Connection m Connection
getConnection = ReaderT Connection m Connection
forall r (m :: * -> *). MonadReader r m => m r
ask

{-------------------------------------------------------------------------------
  Obtain handler for specific RPC call
-------------------------------------------------------------------------------}

class MkStreamingHandler (styp :: StreamingType) where
  mkStreamingHandler ::
       ( CanCallRPC m
       , SupportsClientRpc rpc
       , SupportsStreamingType rpc styp
       )
    => CallParams rpc -> ClientHandler' styp m rpc

-- | Construct RPC handler
--
-- This has an ambiguous type, and is intended to be called using a type
-- application indicating the @rpc@ method to call, such as
--
-- > rpc @Ping
--
-- provided that @Ping@ is some type with an 'IsRPC' instance. In some cases
-- it may also be needed to provide a streaming type:
--
-- > rpc @Ping @NonStreaming
--
-- though in most cases the streaming type should be clear from the context or
-- from the choice of @rpc@.
--
-- See 'Network.GRPC.Client.StreamType.IO.nonStreaming' and co for examples.
-- See also 'rpcWith'.
rpc :: forall rpc styp m.
     ( CanCallRPC m
     , SupportsClientRpc rpc
     , SupportsStreamingType rpc styp
     , Default (RequestMetadata rpc)
     )
  => ClientHandler' styp m rpc
rpc :: forall {k} (rpc :: k) (styp :: StreamingType) (m :: * -> *).
(CanCallRPC m, SupportsClientRpc rpc,
 SupportsStreamingType rpc styp, Default (RequestMetadata rpc)) =>
ClientHandler' styp m rpc
rpc = CallParams rpc -> ClientHandler' styp m rpc
forall {k} (rpc :: k) (styp :: StreamingType) (m :: * -> *).
(CanCallRPC m, SupportsClientRpc rpc,
 SupportsStreamingType rpc styp) =>
CallParams rpc -> ClientHandler' styp m rpc
rpcWith CallParams rpc
forall a. Default a => a
def

-- | Generalization of 'rpc' with custom 'CallParams'
rpcWith :: forall rpc styp m.
     ( CanCallRPC m
     , SupportsClientRpc rpc
     , SupportsStreamingType rpc styp
     )
  => CallParams rpc -> ClientHandler' styp m rpc
rpcWith :: forall {k} (rpc :: k) (styp :: StreamingType) (m :: * -> *).
(CanCallRPC m, SupportsClientRpc rpc,
 SupportsStreamingType rpc styp) =>
CallParams rpc -> ClientHandler' styp m rpc
rpcWith =
    case Proxy styp -> SStreamingType styp
forall (styp :: StreamingType).
ValidStreamingType styp =>
Proxy styp -> SStreamingType styp
validStreamingType (forall {k} (t :: k). Proxy t
forall (t :: StreamingType). Proxy t
Proxy @styp) of
      SStreamingType styp
SNonStreaming    -> CallParams rpc -> ClientHandler' styp m rpc
forall {k} (m :: * -> *) (rpc :: k).
(CanCallRPC m, SupportsClientRpc rpc,
 SupportsStreamingType rpc styp) =>
CallParams rpc -> ClientHandler' styp m rpc
forall (styp :: StreamingType) {k} (m :: * -> *) (rpc :: k).
(MkStreamingHandler styp, CanCallRPC m, SupportsClientRpc rpc,
 SupportsStreamingType rpc styp) =>
CallParams rpc -> ClientHandler' styp m rpc
mkStreamingHandler
      SStreamingType styp
SClientStreaming -> CallParams rpc -> ClientHandler' styp m rpc
forall {k} (m :: * -> *) (rpc :: k).
(CanCallRPC m, SupportsClientRpc rpc,
 SupportsStreamingType rpc styp) =>
CallParams rpc -> ClientHandler' styp m rpc
forall (styp :: StreamingType) {k} (m :: * -> *) (rpc :: k).
(MkStreamingHandler styp, CanCallRPC m, SupportsClientRpc rpc,
 SupportsStreamingType rpc styp) =>
CallParams rpc -> ClientHandler' styp m rpc
mkStreamingHandler
      SStreamingType styp
SServerStreaming -> CallParams rpc -> ClientHandler' styp m rpc
forall {k} (m :: * -> *) (rpc :: k).
(CanCallRPC m, SupportsClientRpc rpc,
 SupportsStreamingType rpc styp) =>
CallParams rpc -> ClientHandler' styp m rpc
forall (styp :: StreamingType) {k} (m :: * -> *) (rpc :: k).
(MkStreamingHandler styp, CanCallRPC m, SupportsClientRpc rpc,
 SupportsStreamingType rpc styp) =>
CallParams rpc -> ClientHandler' styp m rpc
mkStreamingHandler
      SStreamingType styp
SBiDiStreaming   -> CallParams rpc -> ClientHandler' styp m rpc
forall {k} (m :: * -> *) (rpc :: k).
(CanCallRPC m, SupportsClientRpc rpc,
 SupportsStreamingType rpc styp) =>
CallParams rpc -> ClientHandler' styp m rpc
forall (styp :: StreamingType) {k} (m :: * -> *) (rpc :: k).
(MkStreamingHandler styp, CanCallRPC m, SupportsClientRpc rpc,
 SupportsStreamingType rpc styp) =>
CallParams rpc -> ClientHandler' styp m rpc
mkStreamingHandler

instance MkStreamingHandler NonStreaming where
  mkStreamingHandler :: forall rpc m.
       ( CanCallRPC m
       , SupportsClientRpc rpc
       , SupportsStreamingType rpc NonStreaming
       )
    => CallParams rpc -> ClientHandler' NonStreaming m rpc
  mkStreamingHandler :: forall {k} (rpc :: k) (m :: * -> *).
(CanCallRPC m, SupportsClientRpc rpc,
 SupportsStreamingType rpc 'NonStreaming) =>
CallParams rpc -> ClientHandler' 'NonStreaming m rpc
mkStreamingHandler CallParams rpc
params = (Input rpc -> m (Output rpc)) -> ClientHandler' 'NonStreaming m rpc
forall {k} (rpc :: k) (m :: * -> *).
SupportsStreamingType rpc 'NonStreaming =>
(Input rpc -> m (Output rpc)) -> ClientHandler' 'NonStreaming m rpc
mkNonStreaming ((Input rpc -> m (Output rpc))
 -> ClientHandler' 'NonStreaming m rpc)
-> (Input rpc -> m (Output rpc))
-> ClientHandler' 'NonStreaming m rpc
forall a b. (a -> b) -> a -> b
$ \Input rpc
input -> do
      conn <- m Connection
forall (m :: * -> *). CanCallRPC m => m Connection
getConnection
      withRPC conn params (Proxy @rpc) $ \Call rpc
call -> do
        Call rpc -> Input rpc -> m ()
forall {k} (m :: * -> *) (rpc :: k).
MonadIO m =>
Call rpc -> Input rpc -> m ()
sendFinalInput Call rpc
call Input rpc
input
        (output, _trailers) <- Call rpc -> m (Output rpc, ResponseTrailingMetadata rpc)
forall {k} (rpc :: k) (m :: * -> *).
(MonadIO m, HasCallStack) =>
Call rpc -> m (Output rpc, ResponseTrailingMetadata rpc)
recvFinalOutput Call rpc
call
        return output

instance MkStreamingHandler ClientStreaming where
  mkStreamingHandler :: forall rpc m.
       ( CanCallRPC m
       , SupportsClientRpc rpc
       , SupportsStreamingType rpc ClientStreaming
       )
    => CallParams rpc -> ClientHandler' ClientStreaming m rpc
  mkStreamingHandler :: forall {k} (rpc :: k) (m :: * -> *).
(CanCallRPC m, SupportsClientRpc rpc,
 SupportsStreamingType rpc 'ClientStreaming) =>
CallParams rpc -> ClientHandler' 'ClientStreaming m rpc
mkStreamingHandler CallParams rpc
params = (forall r.
 ((NextElem (Input rpc) -> IO ()) -> m r) -> m (Output rpc, r))
-> ClientHandler' 'ClientStreaming m rpc
forall {k} (rpc :: k) (m :: * -> *).
SupportsStreamingType rpc 'ClientStreaming =>
(forall r.
 ((NextElem (Input rpc) -> IO ()) -> m r) -> m (Output rpc, r))
-> ClientHandler' 'ClientStreaming m rpc
mkClientStreaming ((forall r.
  ((NextElem (Input rpc) -> IO ()) -> m r) -> m (Output rpc, r))
 -> ClientHandler' 'ClientStreaming m rpc)
-> (forall r.
    ((NextElem (Input rpc) -> IO ()) -> m r) -> m (Output rpc, r))
-> ClientHandler' 'ClientStreaming m rpc
forall a b. (a -> b) -> a -> b
$ \(NextElem (Input rpc) -> IO ()) -> m r
k -> do
      conn <- m Connection
forall (m :: * -> *). CanCallRPC m => m Connection
getConnection
      withRPC conn params (Proxy @rpc) $ \Call rpc
call -> do
        r <- (NextElem (Input rpc) -> IO ()) -> m r
k (Call rpc -> StreamElem NoMetadata (Input rpc) -> IO ()
forall {k} (m :: * -> *) (rpc :: k).
(HasCallStack, MonadIO m) =>
Call rpc -> StreamElem NoMetadata (Input rpc) -> m ()
sendInput Call rpc
call (StreamElem NoMetadata (Input rpc) -> IO ())
-> (NextElem (Input rpc) -> StreamElem NoMetadata (Input rpc))
-> NextElem (Input rpc)
-> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. NextElem (Input rpc) -> StreamElem NoMetadata (Input rpc)
forall inp. NextElem inp -> StreamElem NoMetadata inp
fromNextElem)
        (output, _trailers) <- recvFinalOutput call
        return (output, r)

instance MkStreamingHandler ServerStreaming where
  mkStreamingHandler :: forall rpc m.
       ( CanCallRPC m
       , SupportsClientRpc rpc
       , SupportsStreamingType rpc ServerStreaming
       )
    => CallParams rpc -> ClientHandler' ServerStreaming m rpc
  mkStreamingHandler :: forall {k} (rpc :: k) (m :: * -> *).
(CanCallRPC m, SupportsClientRpc rpc,
 SupportsStreamingType rpc 'ServerStreaming) =>
CallParams rpc -> ClientHandler' 'ServerStreaming m rpc
mkStreamingHandler CallParams rpc
params = (forall r. Input rpc -> (IO (NextElem (Output rpc)) -> m r) -> m r)
-> ClientHandler' 'ServerStreaming m rpc
forall {k} (rpc :: k) (m :: * -> *).
(SupportsStreamingType rpc 'ServerStreaming, Functor m) =>
(forall r. Input rpc -> (IO (NextElem (Output rpc)) -> m r) -> m r)
-> ClientHandler' 'ServerStreaming m rpc
mkServerStreaming ((forall r.
  Input rpc -> (IO (NextElem (Output rpc)) -> m r) -> m r)
 -> ClientHandler' 'ServerStreaming m rpc)
-> (forall r.
    Input rpc -> (IO (NextElem (Output rpc)) -> m r) -> m r)
-> ClientHandler' 'ServerStreaming m rpc
forall a b. (a -> b) -> a -> b
$ \Input rpc
input IO (NextElem (Output rpc)) -> m r
k -> do
      conn <- m Connection
forall (m :: * -> *). CanCallRPC m => m Connection
getConnection
      withRPC conn params (Proxy @rpc) $ \Call rpc
call -> do
        Call rpc -> Input rpc -> m ()
forall {k} (m :: * -> *) (rpc :: k).
MonadIO m =>
Call rpc -> Input rpc -> m ()
sendFinalInput Call rpc
call Input rpc
input
        IO (NextElem (Output rpc)) -> m r
k (Call rpc -> IO (NextElem (Output rpc))
forall {k} (m :: * -> *) (rpc :: k).
(MonadIO m, HasCallStack) =>
Call rpc -> m (NextElem (Output rpc))
recvNextOutputElem Call rpc
call)

instance MkStreamingHandler BiDiStreaming where
  mkStreamingHandler :: forall rpc m.
       ( CanCallRPC m
       , SupportsClientRpc rpc
       , SupportsStreamingType rpc BiDiStreaming
       )
    => CallParams rpc -> ClientHandler' BiDiStreaming m rpc
  mkStreamingHandler :: forall {k} (rpc :: k) (m :: * -> *).
(CanCallRPC m, SupportsClientRpc rpc,
 SupportsStreamingType rpc 'BiDiStreaming) =>
CallParams rpc -> ClientHandler' 'BiDiStreaming m rpc
mkStreamingHandler CallParams rpc
params = (forall r.
 ((NextElem (Input rpc) -> IO ())
  -> IO (NextElem (Output rpc)) -> m r)
 -> m r)
-> ClientHandler' 'BiDiStreaming m rpc
forall {k} (rpc :: k) (m :: * -> *).
(SupportsStreamingType rpc 'BiDiStreaming, Functor m) =>
(forall r.
 ((NextElem (Input rpc) -> IO ())
  -> IO (NextElem (Output rpc)) -> m r)
 -> m r)
-> ClientHandler' 'BiDiStreaming m rpc
mkBiDiStreaming ((forall r.
  ((NextElem (Input rpc) -> IO ())
   -> IO (NextElem (Output rpc)) -> m r)
  -> m r)
 -> ClientHandler' 'BiDiStreaming m rpc)
-> (forall r.
    ((NextElem (Input rpc) -> IO ())
     -> IO (NextElem (Output rpc)) -> m r)
    -> m r)
-> ClientHandler' 'BiDiStreaming m rpc
forall a b. (a -> b) -> a -> b
$ \(NextElem (Input rpc) -> IO ())
-> IO (NextElem (Output rpc)) -> m r
k -> do
      conn <- m Connection
forall (m :: * -> *). CanCallRPC m => m Connection
getConnection
      withRPC conn params (Proxy @rpc) $ \Call rpc
call ->
        (NextElem (Input rpc) -> IO ())
-> IO (NextElem (Output rpc)) -> m r
k (Call rpc -> StreamElem NoMetadata (Input rpc) -> IO ()
forall {k} (m :: * -> *) (rpc :: k).
(HasCallStack, MonadIO m) =>
Call rpc -> StreamElem NoMetadata (Input rpc) -> m ()
sendInput Call rpc
call (StreamElem NoMetadata (Input rpc) -> IO ())
-> (NextElem (Input rpc) -> StreamElem NoMetadata (Input rpc))
-> NextElem (Input rpc)
-> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. NextElem (Input rpc) -> StreamElem NoMetadata (Input rpc)
forall inp. NextElem inp -> StreamElem NoMetadata inp
fromNextElem)
          (Call rpc -> IO (NextElem (Output rpc))
forall {k} (m :: * -> *) (rpc :: k).
(MonadIO m, HasCallStack) =>
Call rpc -> m (NextElem (Output rpc))
recvNextOutputElem Call rpc
call)

{-------------------------------------------------------------------------------
  Internal: dealing with metadata
-------------------------------------------------------------------------------}

fromNextElem :: NextElem inp -> StreamElem NoMetadata inp
fromNextElem :: forall inp. NextElem inp -> StreamElem NoMetadata inp
fromNextElem = NoMetadata -> NextElem inp -> StreamElem NoMetadata inp
forall b a. b -> NextElem a -> StreamElem b a
NextElem.toStreamElem NoMetadata
NoMetadata