module Network.GRPC.Client.StreamType (
ClientHandler'
, ClientHandler
, nonStreaming
, clientStreaming
, clientStreaming_
, serverStreaming
, biDiStreaming
, CanCallRPC(..)
, rpc
, rpcWith
) where
import Control.Monad.Catch
import Control.Monad.IO.Class
import Control.Monad.Reader
import Data.Proxy
import Network.GRPC.Client.Call
import Network.GRPC.Client.Connection
import Network.GRPC.Common
import Network.GRPC.Common.NextElem qualified as NextElem
import Network.GRPC.Common.StreamType
import Network.GRPC.Spec
-- | Wrap a plain request/response function as a non-streaming client handler.
--
-- Internal constructor: it is not part of this module's export list.
mkNonStreaming :: forall rpc m.
     SupportsStreamingType rpc NonStreaming
  => (    Input rpc
       -> m (Output rpc)
     )
  -> ClientHandler' NonStreaming m rpc
mkNonStreaming h = ClientHandler $
    h
-- | Build a client-streaming handler from a function in continuation style.
--
-- The supplied function receives a callback that is itself handed the
-- \"send next input\" action; it must return the final output paired with the
-- callback's result. The function is stored as a 'Negative' value.
--
-- Internal constructor: it is not part of this module's export list.
mkClientStreaming :: forall rpc m.
     SupportsStreamingType rpc ClientStreaming
  => ( forall r.
          (    (NextElem (Input rpc) -> IO ())
            -> m r
          )
       -> m (Output rpc, r)
     )
  -> ClientHandler' ClientStreaming m rpc
mkClientStreaming h = ClientHandler $
    Negative h
-- | Build a server-streaming handler from a function in continuation style.
--
-- The supplied function takes the single input and a callback that is handed
-- the \"receive next output\" action. Since there is no separate final
-- output, the callback's result is paired with @()@ to fit the shape
-- 'Negative' expects.
--
-- Internal constructor: it is not part of this module's export list.
mkServerStreaming :: forall rpc m.
     (SupportsStreamingType rpc ServerStreaming, Functor m)
  => ( forall r.
          Input rpc
       -> (    IO (NextElem (Output rpc))
            -> m r
          )
       -> m r
     )
  -> ClientHandler' ServerStreaming m rpc
mkServerStreaming h = ClientHandler $ \input ->
    Negative $ \k -> ((),) <$> h input k
-- | Build a bidirectional-streaming handler from a function in continuation
-- style.
--
-- The supplied function expects a /curried/ callback taking the \"send\" and
-- \"receive\" actions as two arguments, whereas 'Negative' passes them as a
-- pair; 'curry' adapts between the two. The callback's result is paired with
-- @()@ to fit the shape 'Negative' expects.
--
-- Internal constructor: it is not part of this module's export list.
mkBiDiStreaming :: forall rpc m.
     (SupportsStreamingType rpc BiDiStreaming, Functor m)
  => ( forall r.
          (    (NextElem (Input rpc) -> IO ())
            -> IO (NextElem (Output rpc))
            -> m r
          )
       -> m r
     )
  -> ClientHandler' BiDiStreaming m rpc
mkBiDiStreaming h = ClientHandler $
    Negative $ \k -> fmap ((),) $ h (curry k)
-- | Run a non-streaming handler: unwraps the 'ClientHandler' and exposes the
-- underlying function from a single input to a single output.
nonStreaming :: forall rpc m.
     ClientHandler' NonStreaming m rpc
  -> Input rpc
  -> m (Output rpc)
nonStreaming (ClientHandler h) = h
-- | Run a client-streaming handler.
--
-- The callback is given an action for sending the next input element. The
-- result pairs the handler's final output with whatever the callback
-- returned.
clientStreaming :: forall rpc m r.
     ClientHandler' ClientStreaming m rpc
  -> (    (NextElem (Input rpc) -> IO ())
       -> m r
     )
  -> m (Output rpc, r)
clientStreaming (ClientHandler h) k = runNegative h k
-- | Variant of 'clientStreaming' for callbacks that return @()@: only the
-- handler's final output is returned.
clientStreaming_ :: forall rpc m.
     Functor m
  => ClientHandler' ClientStreaming m rpc
  -> (    (NextElem (Input rpc) -> IO ())
       -> m ()
     )
  -> m (Output rpc)
clientStreaming_ h k = fst <$> clientStreaming h k
-- | Run a server-streaming handler.
--
-- The handler is applied to the single input; the callback is given an action
-- for receiving the next output element. The @()@ component produced by the
-- underlying 'Negative' is dropped, so only the callback's result is returned.
serverStreaming :: forall rpc m r.
     Functor m
  => ClientHandler' ServerStreaming m rpc
  -> Input rpc
  -> (    IO (NextElem (Output rpc))
       -> m r
     )
  -> m r
serverStreaming (ClientHandler h) input k = snd <$> runNegative (h input) k
-- | Run a bidirectional-streaming handler.
--
-- The callback takes the \"send next input\" and \"receive next output\"
-- actions as two separate arguments; 'uncurry' adapts it to the pair that the
-- underlying 'Negative' supplies. The @()@ component of the result is
-- dropped, so only the callback's result is returned.
biDiStreaming :: forall rpc m r.
     Functor m
  => ClientHandler' BiDiStreaming m rpc
  -> (    (NextElem (Input rpc) -> IO ())
       -> IO (NextElem (Output rpc))
       -> m r
     )
  -> m r
biDiStreaming (ClientHandler h) k = fmap snd $ runNegative h $ uncurry k
-- | Monads in which an RPC can be made.
--
-- The only capability required beyond 'MonadIO' and 'MonadMask' is access to
-- the 'Connection' on which calls are opened.
class (MonadIO m, MonadMask m) => CanCallRPC m where
  -- | Obtain the connection to use for RPC calls.
  getConnection :: m Connection
-- | A 'ReaderT' whose environment is the 'Connection' itself: the connection
-- is simply read from the environment.
instance (MonadIO m, MonadMask m) => CanCallRPC (ReaderT Connection m) where
  getConnection :: ReaderT Connection m Connection
  getConnection = ask
-- | Construction of a client handler, indexed by streaming type.
--
-- Internal class (not exported); 'rpcWith' selects the instance by matching
-- on the streaming-type singleton.
class MkStreamingHandler (styp :: StreamingType) where
  -- | Build the handler for streaming type @styp@ from the call parameters.
  mkStreamingHandler ::
       ( CanCallRPC m
       , SupportsClientRpc rpc
       , SupportsStreamingType rpc styp
       )
    => CallParams rpc -> ClientHandler' styp m rpc
-- | Construct the client handler for an RPC using default call parameters.
--
-- This is 'rpcWith' applied to 'def'.
rpc :: forall rpc styp m.
     ( CanCallRPC m
     , SupportsClientRpc rpc
     , SupportsStreamingType rpc styp
     , Default (RequestMetadata rpc)
     )
  => ClientHandler' styp m rpc
rpc = rpcWith def
-- | Construct the client handler for an RPC with explicit call parameters.
--
-- The streaming type @styp@ is only known at the type level, so we match on
-- its singleton (obtained through 'validStreamingType'). Each branch refines
-- @styp@ to a concrete streaming type, which lets the corresponding
-- 'MkStreamingHandler' instance be resolved.
rpcWith :: forall rpc styp m.
     ( CanCallRPC m
     , SupportsClientRpc rpc
     , SupportsStreamingType rpc styp
     )
  => CallParams rpc -> ClientHandler' styp m rpc
rpcWith =
    case validStreamingType (Proxy @styp) of
      SNonStreaming    -> mkStreamingHandler
      SClientStreaming -> mkStreamingHandler
      SServerStreaming -> mkStreamingHandler
      SBiDiStreaming   -> mkStreamingHandler
-- | Non-streaming: send the single input, then wait for the single output.
instance MkStreamingHandler NonStreaming where
  mkStreamingHandler :: forall rpc m.
       ( CanCallRPC m
       , SupportsClientRpc rpc
       , SupportsStreamingType rpc NonStreaming
       )
    => CallParams rpc -> ClientHandler' NonStreaming m rpc
  mkStreamingHandler params = mkNonStreaming $ \input -> do
      conn <- getConnection
      withRPC conn params (Proxy @rpc) $ \call -> do
        sendFinalInput call input
        -- The trailing metadata is received but not exposed to the caller.
        (output, _trailers) <- recvFinalOutput call
        return output
-- | Client streaming: let the callback send inputs, then wait for the single
-- output.
instance MkStreamingHandler ClientStreaming where
  mkStreamingHandler :: forall rpc m.
       ( CanCallRPC m
       , SupportsClientRpc rpc
       , SupportsStreamingType rpc ClientStreaming
       )
    => CallParams rpc -> ClientHandler' ClientStreaming m rpc
  mkStreamingHandler params = mkClientStreaming $ \k -> do
      conn <- getConnection
      withRPC conn params (Proxy @rpc) $ \call -> do
        -- The callback sees 'NextElem'; 'sendInput' wants a 'StreamElem'.
        r <- k (sendInput call . fromNextElem)
        -- The trailing metadata is received but not exposed to the caller.
        (output, _trailers) <- recvFinalOutput call
        return (output, r)
-- | Server streaming: send the single input, then let the callback receive
-- the outputs.
instance MkStreamingHandler ServerStreaming where
  mkStreamingHandler :: forall rpc m.
       ( CanCallRPC m
       , SupportsClientRpc rpc
       , SupportsStreamingType rpc ServerStreaming
       )
    => CallParams rpc -> ClientHandler' ServerStreaming m rpc
  mkStreamingHandler params = mkServerStreaming $ \input k -> do
      conn <- getConnection
      withRPC conn params (Proxy @rpc) $ \call -> do
        sendFinalInput call input
        -- Hand the callback the action for receiving the next output element.
        k (recvNextOutputElem call)
-- | Bidirectional streaming: the callback is given both the send and the
-- receive action and decides how to interleave them.
instance MkStreamingHandler BiDiStreaming where
  mkStreamingHandler :: forall rpc m.
       ( CanCallRPC m
       , SupportsClientRpc rpc
       , SupportsStreamingType rpc BiDiStreaming
       )
    => CallParams rpc -> ClientHandler' BiDiStreaming m rpc
  mkStreamingHandler params = mkBiDiStreaming $ \k -> do
      conn <- getConnection
      withRPC conn params (Proxy @rpc) $ \call ->
        -- The callback sees 'NextElem'; 'sendInput' wants a 'StreamElem'.
        k (sendInput call . fromNextElem)
          (recvNextOutputElem call)
-- | Convert a 'NextElem' into a 'StreamElem', supplying 'NoMetadata' as the
-- metadata argument of 'NextElem.toStreamElem'.
fromNextElem :: NextElem inp -> StreamElem NoMetadata inp
fromNextElem = NextElem.toStreamElem NoMetadata