{-# LANGUAGE CPP #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TupleSections #-}
module Kafka.Avro.SchemaRegistry
( schemaRegistry, loadSchema, sendSchema
, schemaRegistry_
, schemaRegistryWithHeaders
, loadSubjectSchema
, getGlobalConfig, getSubjectConfig
, getVersions, isCompatible
, getSubjects
, defaultSchemaRegistryConfig
, cfgAuth
, cfgHeaders
, cfgAutoRegisterSchemas
, SchemaId(..), Subject(..)
, SchemaRegistryConfig
, SchemaRegistry, SchemaRegistryError(..)
, Schema(..)
, Compatibility(..), Version(..)
) where
import Control.Arrow (first)
import Control.Exception (SomeException (SomeException), throwIO)
import Control.Exception.Safe (MonadCatch, try)
import Control.Lens (view, (%~), (&), (.~), (^.))
import Control.Monad (void)
import Control.Monad.Except (liftEither)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Control.Monad.Trans.Except (ExceptT (ExceptT), except, runExceptT, withExceptT)
import Data.Aeson
import qualified Data.Aeson.Key as A
import qualified Data.Aeson.KeyMap as KM
import Data.Aeson.Types (typeMismatch)
import Data.Avro.Schema.Schema (Schema (..), typeName)
import Data.Bifunctor (bimap)
import Data.Cache as C
import Data.Foldable (traverse_)
import Data.Functor (($>))
import Data.Hashable (Hashable)
import qualified Data.HashMap.Lazy as HM
import Data.Int (Int32)
import Data.List (find)
import Data.String (IsString)
import Data.Text (Text, append, cons, unpack)
import qualified Data.Text.Encoding as Text
import qualified Data.Text.Lazy.Encoding as LText
import Data.Word (Word32)
import GHC.Exception (SomeException, displayException, fromException)
import GHC.Generics (Generic)
import Network.HTTP.Client (HttpException (..), HttpExceptionContent (..), Manager, defaultManagerSettings, newManager, responseStatus)
import Network.HTTP.Types.Header (Header)
import Network.HTTP.Types.Status (notFound404)
import qualified Network.Wreq as Wreq
newtype SchemaId = SchemaId { SchemaId -> Int32
unSchemaId :: Int32} deriving (SchemaId -> SchemaId -> Bool
(SchemaId -> SchemaId -> Bool)
-> (SchemaId -> SchemaId -> Bool) -> Eq SchemaId
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: SchemaId -> SchemaId -> Bool
== :: SchemaId -> SchemaId -> Bool
$c/= :: SchemaId -> SchemaId -> Bool
/= :: SchemaId -> SchemaId -> Bool
Eq, Eq SchemaId
Eq SchemaId =>
(SchemaId -> SchemaId -> Ordering)
-> (SchemaId -> SchemaId -> Bool)
-> (SchemaId -> SchemaId -> Bool)
-> (SchemaId -> SchemaId -> Bool)
-> (SchemaId -> SchemaId -> Bool)
-> (SchemaId -> SchemaId -> SchemaId)
-> (SchemaId -> SchemaId -> SchemaId)
-> Ord SchemaId
SchemaId -> SchemaId -> Bool
SchemaId -> SchemaId -> Ordering
SchemaId -> SchemaId -> SchemaId
forall a.
Eq a =>
(a -> a -> Ordering)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> a)
-> (a -> a -> a)
-> Ord a
$ccompare :: SchemaId -> SchemaId -> Ordering
compare :: SchemaId -> SchemaId -> Ordering
$c< :: SchemaId -> SchemaId -> Bool
< :: SchemaId -> SchemaId -> Bool
$c<= :: SchemaId -> SchemaId -> Bool
<= :: SchemaId -> SchemaId -> Bool
$c> :: SchemaId -> SchemaId -> Bool
> :: SchemaId -> SchemaId -> Bool
$c>= :: SchemaId -> SchemaId -> Bool
>= :: SchemaId -> SchemaId -> Bool
$cmax :: SchemaId -> SchemaId -> SchemaId
max :: SchemaId -> SchemaId -> SchemaId
$cmin :: SchemaId -> SchemaId -> SchemaId
min :: SchemaId -> SchemaId -> SchemaId
Ord, Int -> SchemaId -> ShowS
[SchemaId] -> ShowS
SchemaId -> [Char]
(Int -> SchemaId -> ShowS)
-> (SchemaId -> [Char]) -> ([SchemaId] -> ShowS) -> Show SchemaId
forall a.
(Int -> a -> ShowS) -> (a -> [Char]) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> SchemaId -> ShowS
showsPrec :: Int -> SchemaId -> ShowS
$cshow :: SchemaId -> [Char]
show :: SchemaId -> [Char]
$cshowList :: [SchemaId] -> ShowS
showList :: [SchemaId] -> ShowS
Show, Eq SchemaId
Eq SchemaId =>
(Int -> SchemaId -> Int) -> (SchemaId -> Int) -> Hashable SchemaId
Int -> SchemaId -> Int
SchemaId -> Int
forall a. Eq a => (Int -> a -> Int) -> (a -> Int) -> Hashable a
$chashWithSalt :: Int -> SchemaId -> Int
hashWithSalt :: Int -> SchemaId -> Int
$chash :: SchemaId -> Int
hash :: SchemaId -> Int
Hashable)
newtype SchemaName = SchemaName Text deriving (SchemaName -> SchemaName -> Bool
(SchemaName -> SchemaName -> Bool)
-> (SchemaName -> SchemaName -> Bool) -> Eq SchemaName
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: SchemaName -> SchemaName -> Bool
== :: SchemaName -> SchemaName -> Bool
$c/= :: SchemaName -> SchemaName -> Bool
/= :: SchemaName -> SchemaName -> Bool
Eq, Eq SchemaName
Eq SchemaName =>
(SchemaName -> SchemaName -> Ordering)
-> (SchemaName -> SchemaName -> Bool)
-> (SchemaName -> SchemaName -> Bool)
-> (SchemaName -> SchemaName -> Bool)
-> (SchemaName -> SchemaName -> Bool)
-> (SchemaName -> SchemaName -> SchemaName)
-> (SchemaName -> SchemaName -> SchemaName)
-> Ord SchemaName
SchemaName -> SchemaName -> Bool
SchemaName -> SchemaName -> Ordering
SchemaName -> SchemaName -> SchemaName
forall a.
Eq a =>
(a -> a -> Ordering)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> a)
-> (a -> a -> a)
-> Ord a
$ccompare :: SchemaName -> SchemaName -> Ordering
compare :: SchemaName -> SchemaName -> Ordering
$c< :: SchemaName -> SchemaName -> Bool
< :: SchemaName -> SchemaName -> Bool
$c<= :: SchemaName -> SchemaName -> Bool
<= :: SchemaName -> SchemaName -> Bool
$c> :: SchemaName -> SchemaName -> Bool
> :: SchemaName -> SchemaName -> Bool
$c>= :: SchemaName -> SchemaName -> Bool
>= :: SchemaName -> SchemaName -> Bool
$cmax :: SchemaName -> SchemaName -> SchemaName
max :: SchemaName -> SchemaName -> SchemaName
$cmin :: SchemaName -> SchemaName -> SchemaName
min :: SchemaName -> SchemaName -> SchemaName
Ord, [Char] -> SchemaName
([Char] -> SchemaName) -> IsString SchemaName
forall a. ([Char] -> a) -> IsString a
$cfromString :: [Char] -> SchemaName
fromString :: [Char] -> SchemaName
IsString, Int -> SchemaName -> ShowS
[SchemaName] -> ShowS
SchemaName -> [Char]
(Int -> SchemaName -> ShowS)
-> (SchemaName -> [Char])
-> ([SchemaName] -> ShowS)
-> Show SchemaName
forall a.
(Int -> a -> ShowS) -> (a -> [Char]) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> SchemaName -> ShowS
showsPrec :: Int -> SchemaName -> ShowS
$cshow :: SchemaName -> [Char]
show :: SchemaName -> [Char]
$cshowList :: [SchemaName] -> ShowS
showList :: [SchemaName] -> ShowS
Show, Eq SchemaName
Eq SchemaName =>
(Int -> SchemaName -> Int)
-> (SchemaName -> Int) -> Hashable SchemaName
Int -> SchemaName -> Int
SchemaName -> Int
forall a. Eq a => (Int -> a -> Int) -> (a -> Int) -> Hashable a
$chashWithSalt :: Int -> SchemaName -> Int
hashWithSalt :: Int -> SchemaName -> Int
$chash :: SchemaName -> Int
hash :: SchemaName -> Int
Hashable)
newtype Subject = Subject { Subject -> Text
unSubject :: Text} deriving (Subject -> Subject -> Bool
(Subject -> Subject -> Bool)
-> (Subject -> Subject -> Bool) -> Eq Subject
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: Subject -> Subject -> Bool
== :: Subject -> Subject -> Bool
$c/= :: Subject -> Subject -> Bool
/= :: Subject -> Subject -> Bool
Eq, Int -> Subject -> ShowS
[Subject] -> ShowS
Subject -> [Char]
(Int -> Subject -> ShowS)
-> (Subject -> [Char]) -> ([Subject] -> ShowS) -> Show Subject
forall a.
(Int -> a -> ShowS) -> (a -> [Char]) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> Subject -> ShowS
showsPrec :: Int -> Subject -> ShowS
$cshow :: Subject -> [Char]
show :: Subject -> [Char]
$cshowList :: [Subject] -> ShowS
showList :: [Subject] -> ShowS
Show, [Char] -> Subject
([Char] -> Subject) -> IsString Subject
forall a. ([Char] -> a) -> IsString a
$cfromString :: [Char] -> Subject
fromString :: [Char] -> Subject
IsString, Eq Subject
Eq Subject =>
(Subject -> Subject -> Ordering)
-> (Subject -> Subject -> Bool)
-> (Subject -> Subject -> Bool)
-> (Subject -> Subject -> Bool)
-> (Subject -> Subject -> Bool)
-> (Subject -> Subject -> Subject)
-> (Subject -> Subject -> Subject)
-> Ord Subject
Subject -> Subject -> Bool
Subject -> Subject -> Ordering
Subject -> Subject -> Subject
forall a.
Eq a =>
(a -> a -> Ordering)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> a)
-> (a -> a -> a)
-> Ord a
$ccompare :: Subject -> Subject -> Ordering
compare :: Subject -> Subject -> Ordering
$c< :: Subject -> Subject -> Bool
< :: Subject -> Subject -> Bool
$c<= :: Subject -> Subject -> Bool
<= :: Subject -> Subject -> Bool
$c> :: Subject -> Subject -> Bool
> :: Subject -> Subject -> Bool
$c>= :: Subject -> Subject -> Bool
>= :: Subject -> Subject -> Bool
$cmax :: Subject -> Subject -> Subject
max :: Subject -> Subject -> Subject
$cmin :: Subject -> Subject -> Subject
min :: Subject -> Subject -> Subject
Ord, (forall x. Subject -> Rep Subject x)
-> (forall x. Rep Subject x -> Subject) -> Generic Subject
forall x. Rep Subject x -> Subject
forall x. Subject -> Rep Subject x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cfrom :: forall x. Subject -> Rep Subject x
from :: forall x. Subject -> Rep Subject x
$cto :: forall x. Rep Subject x -> Subject
to :: forall x. Rep Subject x -> Subject
Generic, Eq Subject
Eq Subject =>
(Int -> Subject -> Int) -> (Subject -> Int) -> Hashable Subject
Int -> Subject -> Int
Subject -> Int
forall a. Eq a => (Int -> a -> Int) -> (a -> Int) -> Hashable a
$chashWithSalt :: Int -> Subject -> Int
hashWithSalt :: Int -> Subject -> Int
$chash :: Subject -> Int
hash :: Subject -> Int
Hashable)
newtype RegisteredSchema = RegisteredSchema { RegisteredSchema -> Schema
unRegisteredSchema :: Schema} deriving ((forall x. RegisteredSchema -> Rep RegisteredSchema x)
-> (forall x. Rep RegisteredSchema x -> RegisteredSchema)
-> Generic RegisteredSchema
forall x. Rep RegisteredSchema x -> RegisteredSchema
forall x. RegisteredSchema -> Rep RegisteredSchema x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cfrom :: forall x. RegisteredSchema -> Rep RegisteredSchema x
from :: forall x. RegisteredSchema -> Rep RegisteredSchema x
$cto :: forall x. Rep RegisteredSchema x -> RegisteredSchema
to :: forall x. Rep RegisteredSchema x -> RegisteredSchema
Generic, Int -> RegisteredSchema -> ShowS
[RegisteredSchema] -> ShowS
RegisteredSchema -> [Char]
(Int -> RegisteredSchema -> ShowS)
-> (RegisteredSchema -> [Char])
-> ([RegisteredSchema] -> ShowS)
-> Show RegisteredSchema
forall a.
(Int -> a -> ShowS) -> (a -> [Char]) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> RegisteredSchema -> ShowS
showsPrec :: Int -> RegisteredSchema -> ShowS
$cshow :: RegisteredSchema -> [Char]
show :: RegisteredSchema -> [Char]
$cshowList :: [RegisteredSchema] -> ShowS
showList :: [RegisteredSchema] -> ShowS
Show)
newtype Version = Version { Version -> Word32
unVersion :: Word32 } deriving (Version -> Version -> Bool
(Version -> Version -> Bool)
-> (Version -> Version -> Bool) -> Eq Version
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: Version -> Version -> Bool
== :: Version -> Version -> Bool
$c/= :: Version -> Version -> Bool
/= :: Version -> Version -> Bool
Eq, Eq Version
Eq Version =>
(Version -> Version -> Ordering)
-> (Version -> Version -> Bool)
-> (Version -> Version -> Bool)
-> (Version -> Version -> Bool)
-> (Version -> Version -> Bool)
-> (Version -> Version -> Version)
-> (Version -> Version -> Version)
-> Ord Version
Version -> Version -> Bool
Version -> Version -> Ordering
Version -> Version -> Version
forall a.
Eq a =>
(a -> a -> Ordering)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> a)
-> (a -> a -> a)
-> Ord a
$ccompare :: Version -> Version -> Ordering
compare :: Version -> Version -> Ordering
$c< :: Version -> Version -> Bool
< :: Version -> Version -> Bool
$c<= :: Version -> Version -> Bool
<= :: Version -> Version -> Bool
$c> :: Version -> Version -> Bool
> :: Version -> Version -> Bool
$c>= :: Version -> Version -> Bool
>= :: Version -> Version -> Bool
$cmax :: Version -> Version -> Version
max :: Version -> Version -> Version
$cmin :: Version -> Version -> Version
min :: Version -> Version -> Version
Ord, Int -> Version -> ShowS
[Version] -> ShowS
Version -> [Char]
(Int -> Version -> ShowS)
-> (Version -> [Char]) -> ([Version] -> ShowS) -> Show Version
forall a.
(Int -> a -> ShowS) -> (a -> [Char]) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> Version -> ShowS
showsPrec :: Int -> Version -> ShowS
$cshow :: Version -> [Char]
show :: Version -> [Char]
$cshowList :: [Version] -> ShowS
showList :: [Version] -> ShowS
Show, Eq Version
Eq Version =>
(Int -> Version -> Int) -> (Version -> Int) -> Hashable Version
Int -> Version -> Int
Version -> Int
forall a. Eq a => (Int -> a -> Int) -> (a -> Int) -> Hashable a
$chashWithSalt :: Int -> Version -> Int
hashWithSalt :: Int -> Version -> Int
$chash :: Version -> Int
hash :: Version -> Int
Hashable)
data Compatibility = NoCompatibility
| FullCompatibility
| ForwardCompatibility
| BackwardCompatibility
deriving (Compatibility -> Compatibility -> Bool
(Compatibility -> Compatibility -> Bool)
-> (Compatibility -> Compatibility -> Bool) -> Eq Compatibility
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: Compatibility -> Compatibility -> Bool
== :: Compatibility -> Compatibility -> Bool
$c/= :: Compatibility -> Compatibility -> Bool
/= :: Compatibility -> Compatibility -> Bool
Eq, Int -> Compatibility -> ShowS
[Compatibility] -> ShowS
Compatibility -> [Char]
(Int -> Compatibility -> ShowS)
-> (Compatibility -> [Char])
-> ([Compatibility] -> ShowS)
-> Show Compatibility
forall a.
(Int -> a -> ShowS) -> (a -> [Char]) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> Compatibility -> ShowS
showsPrec :: Int -> Compatibility -> ShowS
$cshow :: Compatibility -> [Char]
show :: Compatibility -> [Char]
$cshowList :: [Compatibility] -> ShowS
showList :: [Compatibility] -> ShowS
Show, Eq Compatibility
Eq Compatibility =>
(Compatibility -> Compatibility -> Ordering)
-> (Compatibility -> Compatibility -> Bool)
-> (Compatibility -> Compatibility -> Bool)
-> (Compatibility -> Compatibility -> Bool)
-> (Compatibility -> Compatibility -> Bool)
-> (Compatibility -> Compatibility -> Compatibility)
-> (Compatibility -> Compatibility -> Compatibility)
-> Ord Compatibility
Compatibility -> Compatibility -> Bool
Compatibility -> Compatibility -> Ordering
Compatibility -> Compatibility -> Compatibility
forall a.
Eq a =>
(a -> a -> Ordering)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> a)
-> (a -> a -> a)
-> Ord a
$ccompare :: Compatibility -> Compatibility -> Ordering
compare :: Compatibility -> Compatibility -> Ordering
$c< :: Compatibility -> Compatibility -> Bool
< :: Compatibility -> Compatibility -> Bool
$c<= :: Compatibility -> Compatibility -> Bool
<= :: Compatibility -> Compatibility -> Bool
$c> :: Compatibility -> Compatibility -> Bool
> :: Compatibility -> Compatibility -> Bool
$c>= :: Compatibility -> Compatibility -> Bool
>= :: Compatibility -> Compatibility -> Bool
$cmax :: Compatibility -> Compatibility -> Compatibility
max :: Compatibility -> Compatibility -> Compatibility
$cmin :: Compatibility -> Compatibility -> Compatibility
min :: Compatibility -> Compatibility -> Compatibility
Ord)
data SchemaRegistryConfig = SchemaRegistryConfig
{ SchemaRegistryConfig -> Maybe Auth
cAuth :: Maybe Wreq.Auth
, :: [Header]
, SchemaRegistryConfig -> Bool
cAutoRegisterSchemas :: Bool
}
data SchemaRegistry = SchemaRegistry
{ SchemaRegistry -> Cache SchemaId Schema
srCache :: Cache SchemaId Schema
, SchemaRegistry -> Cache (Subject, SchemaName) SchemaId
srReverseCache :: Cache (Subject, SchemaName) SchemaId
, SchemaRegistry -> [Char]
srBaseUrl :: String
, SchemaRegistry -> SchemaRegistryConfig
srConfig :: SchemaRegistryConfig
}
data SchemaRegistryError = SchemaRegistryConnectError String
| SchemaRegistryLoadError SchemaId
| SchemaRegistrySchemaNotFound SchemaId
| SchemaRegistrySubjectNotFound Subject
| SchemaRegistryNoCompatibleSchemaFound Schema
| SchemaRegistryUrlNotFound String
| SchemaRegistrySendError String
| SchemaRegistryCacheError
deriving (Int -> SchemaRegistryError -> ShowS
[SchemaRegistryError] -> ShowS
SchemaRegistryError -> [Char]
(Int -> SchemaRegistryError -> ShowS)
-> (SchemaRegistryError -> [Char])
-> ([SchemaRegistryError] -> ShowS)
-> Show SchemaRegistryError
forall a.
(Int -> a -> ShowS) -> (a -> [Char]) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> SchemaRegistryError -> ShowS
showsPrec :: Int -> SchemaRegistryError -> ShowS
$cshow :: SchemaRegistryError -> [Char]
show :: SchemaRegistryError -> [Char]
$cshowList :: [SchemaRegistryError] -> ShowS
showList :: [SchemaRegistryError] -> ShowS
Show, SchemaRegistryError -> SchemaRegistryError -> Bool
(SchemaRegistryError -> SchemaRegistryError -> Bool)
-> (SchemaRegistryError -> SchemaRegistryError -> Bool)
-> Eq SchemaRegistryError
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: SchemaRegistryError -> SchemaRegistryError -> Bool
== :: SchemaRegistryError -> SchemaRegistryError -> Bool
$c/= :: SchemaRegistryError -> SchemaRegistryError -> Bool
/= :: SchemaRegistryError -> SchemaRegistryError -> Bool
Eq)
defaultSchemaRegistryConfig :: SchemaRegistryConfig
defaultSchemaRegistryConfig :: SchemaRegistryConfig
defaultSchemaRegistryConfig = SchemaRegistryConfig
{ cAuth :: Maybe Auth
cAuth = Maybe Auth
forall a. Maybe a
Nothing
, cExtraHeaders :: [Header]
cExtraHeaders = []
, cAutoRegisterSchemas :: Bool
cAutoRegisterSchemas = Bool
True
}
schemaRegistry :: MonadIO m => String -> m SchemaRegistry
schemaRegistry :: forall (m :: * -> *). MonadIO m => [Char] -> m SchemaRegistry
schemaRegistry = Maybe Auth -> [Char] -> m SchemaRegistry
forall (m :: * -> *).
MonadIO m =>
Maybe Auth -> [Char] -> m SchemaRegistry
schemaRegistry_ Maybe Auth
forall a. Maybe a
Nothing
schemaRegistry_ :: MonadIO m => Maybe Wreq.Auth -> String -> m SchemaRegistry
schemaRegistry_ :: forall (m :: * -> *).
MonadIO m =>
Maybe Auth -> [Char] -> m SchemaRegistry
schemaRegistry_ Maybe Auth
auth = Maybe Auth -> [Header] -> [Char] -> m SchemaRegistry
forall (m :: * -> *).
MonadIO m =>
Maybe Auth -> [Header] -> [Char] -> m SchemaRegistry
schemaRegistryWithHeaders Maybe Auth
auth []
schemaRegistryWithHeaders :: MonadIO m => Maybe Wreq.Auth -> [Header] -> String -> m SchemaRegistry
Maybe Auth
auth [Header]
headers [Char]
url
= [Char] -> SchemaRegistryConfig -> m SchemaRegistry
forall (m :: * -> *).
MonadIO m =>
[Char] -> SchemaRegistryConfig -> m SchemaRegistry
schemaRegistryWithConfig [Char]
url (SchemaRegistryConfig -> m SchemaRegistry)
-> SchemaRegistryConfig -> m SchemaRegistry
forall a b. (a -> b) -> a -> b
$ Maybe Auth -> SchemaRegistryConfig -> SchemaRegistryConfig
cfgAuth Maybe Auth
auth (SchemaRegistryConfig -> SchemaRegistryConfig)
-> SchemaRegistryConfig -> SchemaRegistryConfig
forall a b. (a -> b) -> a -> b
$ [Header] -> SchemaRegistryConfig -> SchemaRegistryConfig
cfgHeaders [Header]
headers SchemaRegistryConfig
defaultSchemaRegistryConfig
schemaRegistryWithConfig :: MonadIO m => String -> SchemaRegistryConfig -> m SchemaRegistry
schemaRegistryWithConfig :: forall (m :: * -> *).
MonadIO m =>
[Char] -> SchemaRegistryConfig -> m SchemaRegistry
schemaRegistryWithConfig [Char]
url SchemaRegistryConfig
config = IO SchemaRegistry -> m SchemaRegistry
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO SchemaRegistry -> m SchemaRegistry)
-> IO SchemaRegistry -> m SchemaRegistry
forall a b. (a -> b) -> a -> b
$
Cache SchemaId Schema
-> Cache (Subject, SchemaName) SchemaId
-> [Char]
-> SchemaRegistryConfig
-> SchemaRegistry
SchemaRegistry
(Cache SchemaId Schema
-> Cache (Subject, SchemaName) SchemaId
-> [Char]
-> SchemaRegistryConfig
-> SchemaRegistry)
-> IO (Cache SchemaId Schema)
-> IO
(Cache (Subject, SchemaName) SchemaId
-> [Char] -> SchemaRegistryConfig -> SchemaRegistry)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Maybe TimeSpec -> IO (Cache SchemaId Schema)
forall k v. Maybe TimeSpec -> IO (Cache k v)
newCache Maybe TimeSpec
forall a. Maybe a
Nothing
IO
(Cache (Subject, SchemaName) SchemaId
-> [Char] -> SchemaRegistryConfig -> SchemaRegistry)
-> IO (Cache (Subject, SchemaName) SchemaId)
-> IO ([Char] -> SchemaRegistryConfig -> SchemaRegistry)
forall a b. IO (a -> b) -> IO a -> IO b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Maybe TimeSpec -> IO (Cache (Subject, SchemaName) SchemaId)
forall k v. Maybe TimeSpec -> IO (Cache k v)
newCache Maybe TimeSpec
forall a. Maybe a
Nothing
IO ([Char] -> SchemaRegistryConfig -> SchemaRegistry)
-> IO [Char] -> IO (SchemaRegistryConfig -> SchemaRegistry)
forall a b. IO (a -> b) -> IO a -> IO b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> [Char] -> IO [Char]
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure [Char]
url
IO (SchemaRegistryConfig -> SchemaRegistry)
-> IO SchemaRegistryConfig -> IO SchemaRegistry
forall a b. IO (a -> b) -> IO a -> IO b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> SchemaRegistryConfig -> IO SchemaRegistryConfig
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure SchemaRegistryConfig
config
cfgAuth :: Maybe Wreq.Auth -> SchemaRegistryConfig -> SchemaRegistryConfig
cfgAuth :: Maybe Auth -> SchemaRegistryConfig -> SchemaRegistryConfig
cfgAuth Maybe Auth
auth SchemaRegistryConfig
config = SchemaRegistryConfig
config { cAuth = auth }
cfgHeaders :: [Header] -> SchemaRegistryConfig -> SchemaRegistryConfig
[Header]
headers SchemaRegistryConfig
config = SchemaRegistryConfig
config { cExtraHeaders = headers }
cfgAutoRegisterSchemas :: Bool -> SchemaRegistryConfig -> SchemaRegistryConfig
cfgAutoRegisterSchemas :: Bool -> SchemaRegistryConfig -> SchemaRegistryConfig
cfgAutoRegisterSchemas Bool
autoRegisterSchemas SchemaRegistryConfig
config = SchemaRegistryConfig
config { cAutoRegisterSchemas = autoRegisterSchemas }
loadSchema :: MonadIO m => SchemaRegistry -> SchemaId -> m (Either SchemaRegistryError Schema)
loadSchema :: forall (m :: * -> *).
MonadIO m =>
SchemaRegistry -> SchemaId -> m (Either SchemaRegistryError Schema)
loadSchema SchemaRegistry
sr SchemaId
sid = do
Maybe Schema
sc <- SchemaRegistry -> SchemaId -> m (Maybe Schema)
forall (m :: * -> *).
MonadIO m =>
SchemaRegistry -> SchemaId -> m (Maybe Schema)
cachedSchema SchemaRegistry
sr SchemaId
sid
case Maybe Schema
sc of
Just Schema
s -> Either SchemaRegistryError Schema
-> m (Either SchemaRegistryError Schema)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Schema -> Either SchemaRegistryError Schema
forall a b. b -> Either a b
Right Schema
s)
Maybe Schema
Nothing -> IO (Either SchemaRegistryError Schema)
-> m (Either SchemaRegistryError Schema)
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Either SchemaRegistryError Schema)
-> m (Either SchemaRegistryError Schema))
-> IO (Either SchemaRegistryError Schema)
-> m (Either SchemaRegistryError Schema)
forall a b. (a -> b) -> a -> b
$ do
Either SchemaRegistryError RegisteredSchema
res <- SchemaRegistry
-> SchemaId -> IO (Either SchemaRegistryError RegisteredSchema)
getSchemaById SchemaRegistry
sr SchemaId
sid
(RegisteredSchema -> IO Schema)
-> Either SchemaRegistryError RegisteredSchema
-> IO (Either SchemaRegistryError Schema)
forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
forall (f :: * -> *) a b.
Applicative f =>
(a -> f b)
-> Either SchemaRegistryError a -> f (Either SchemaRegistryError b)
traverse ((\Schema
schema -> Schema
schema Schema -> IO () -> IO Schema
forall a b. a -> IO b -> IO a
forall (f :: * -> *) a b. Functor f => a -> f b -> f a
<$ SchemaRegistry -> SchemaId -> Schema -> IO ()
forall (m :: * -> *).
MonadIO m =>
SchemaRegistry -> SchemaId -> Schema -> m ()
cacheSchema SchemaRegistry
sr SchemaId
sid Schema
schema) (Schema -> IO Schema)
-> (RegisteredSchema -> Schema) -> RegisteredSchema -> IO Schema
forall b c a. (b -> c) -> (a -> b) -> a -> c
. RegisteredSchema -> Schema
unRegisteredSchema) Either SchemaRegistryError RegisteredSchema
res
loadSubjectSchema :: MonadIO m => SchemaRegistry -> Subject -> Version -> m (Either SchemaRegistryError Schema)
loadSubjectSchema :: forall (m :: * -> *).
MonadIO m =>
SchemaRegistry
-> Subject -> Version -> m (Either SchemaRegistryError Schema)
loadSubjectSchema SchemaRegistry
sr (Subject Text
sbj) (Version Word32
version) = do
let url :: [Char]
url = SchemaRegistry -> [Char]
srBaseUrl SchemaRegistry
sr [Char] -> ShowS
forall a. [a] -> [a] -> [a]
++ [Char]
"/subjects/" [Char] -> ShowS
forall a. [a] -> [a] -> [a]
++ Text -> [Char]
unpack Text
sbj [Char] -> ShowS
forall a. [a] -> [a] -> [a]
++ [Char]
"/versions/" [Char] -> ShowS
forall a. [a] -> [a] -> [a]
++ Word32 -> [Char]
forall a. Show a => a -> [Char]
show Word32
version
Either SomeException (Response ByteString)
respE <- IO (Either SomeException (Response ByteString))
-> m (Either SomeException (Response ByteString))
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Either SomeException (Response ByteString))
-> m (Either SomeException (Response ByteString)))
-> (IO (Response ByteString)
-> IO (Either SomeException (Response ByteString)))
-> IO (Response ByteString)
-> m (Either SomeException (Response ByteString))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. IO (Response ByteString)
-> IO (Either SomeException (Response ByteString))
forall (m :: * -> *) e a.
(HasCallStack, MonadCatch m, Exception e) =>
m a -> m (Either e a)
try (IO (Response ByteString)
-> m (Either SomeException (Response ByteString)))
-> IO (Response ByteString)
-> m (Either SomeException (Response ByteString))
forall a b. (a -> b) -> a -> b
$ Options -> [Char] -> IO (Response ByteString)
Wreq.getWith (SchemaRegistry -> Options
wreqOpts SchemaRegistry
sr) [Char]
url
case Either SomeException (Response ByteString)
respE of
Left SomeException
exc -> Either SchemaRegistryError Schema
-> m (Either SchemaRegistryError Schema)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Either SchemaRegistryError Schema
-> m (Either SchemaRegistryError Schema))
-> (SchemaRegistryError -> Either SchemaRegistryError Schema)
-> SchemaRegistryError
-> m (Either SchemaRegistryError Schema)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SchemaRegistryError -> Either SchemaRegistryError Schema
forall a b. a -> Either a b
Left (SchemaRegistryError -> m (Either SchemaRegistryError Schema))
-> SchemaRegistryError -> m (Either SchemaRegistryError Schema)
forall a b. (a -> b) -> a -> b
$ [Char] -> SomeException -> SchemaRegistryError
wrapErrorWithUrl [Char]
url SomeException
exc
Right Response ByteString
resp -> do
let wrapped :: Either SchemaRegistryError Value
wrapped = (SomeException -> SchemaRegistryError)
-> (Response Value -> Value)
-> Either SomeException (Response Value)
-> Either SchemaRegistryError Value
forall a b c d. (a -> b) -> (c -> d) -> Either a c -> Either b d
forall (p :: * -> * -> *) a b c d.
Bifunctor p =>
(a -> b) -> (c -> d) -> p a c -> p b d
bimap SomeException -> SchemaRegistryError
wrapError (Getting Value (Response Value) Value -> Response Value -> Value
forall s (m :: * -> *) a. MonadReader s m => Getting a s a -> m a
view Getting Value (Response Value) Value
forall body0 body1 (f :: * -> *).
Functor f =>
(body0 -> f body1) -> Response body0 -> f (Response body1)
Wreq.responseBody) (Response ByteString -> Either SomeException (Response Value)
forall (m :: * -> *).
MonadThrow m =>
Response ByteString -> m (Response Value)
Wreq.asValue Response ByteString
resp)
Either SchemaRegistryError RegisteredSchema
schema <- [Char]
-> Either SchemaRegistryError Value
-> m (Either SchemaRegistryError RegisteredSchema)
forall (m :: * -> *) a e.
(MonadIO m, FromJSON a) =>
[Char] -> Either e Value -> m (Either e a)
getData [Char]
"schema" Either SchemaRegistryError Value
wrapped
Either SchemaRegistryError SchemaId
schemaId <- [Char]
-> Either SchemaRegistryError Value
-> m (Either SchemaRegistryError SchemaId)
forall (m :: * -> *) a e.
(MonadIO m, FromJSON a) =>
[Char] -> Either e Value -> m (Either e a)
getData [Char]
"id" Either SchemaRegistryError Value
wrapped
case (,) (RegisteredSchema -> SchemaId -> (RegisteredSchema, SchemaId))
-> Either SchemaRegistryError RegisteredSchema
-> Either
SchemaRegistryError (SchemaId -> (RegisteredSchema, SchemaId))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Either SchemaRegistryError RegisteredSchema
schema Either
SchemaRegistryError (SchemaId -> (RegisteredSchema, SchemaId))
-> Either SchemaRegistryError SchemaId
-> Either SchemaRegistryError (RegisteredSchema, SchemaId)
forall a b.
Either SchemaRegistryError (a -> b)
-> Either SchemaRegistryError a -> Either SchemaRegistryError b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Either SchemaRegistryError SchemaId
schemaId of
Left SchemaRegistryError
err -> Either SchemaRegistryError Schema
-> m (Either SchemaRegistryError Schema)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Either SchemaRegistryError Schema
-> m (Either SchemaRegistryError Schema))
-> Either SchemaRegistryError Schema
-> m (Either SchemaRegistryError Schema)
forall a b. (a -> b) -> a -> b
$ SchemaRegistryError -> Either SchemaRegistryError Schema
forall a b. a -> Either a b
Left SchemaRegistryError
err
Right (RegisteredSchema Schema
schema, SchemaId
schemaId) -> SchemaRegistry -> SchemaId -> Schema -> m ()
forall (m :: * -> *).
MonadIO m =>
SchemaRegistry -> SchemaId -> Schema -> m ()
cacheSchema SchemaRegistry
sr SchemaId
schemaId Schema
schema m ()
-> Either SchemaRegistryError Schema
-> m (Either SchemaRegistryError Schema)
forall (f :: * -> *) a b. Functor f => f a -> b -> f b
$> Schema -> Either SchemaRegistryError Schema
forall a b. b -> Either a b
Right Schema
schema
where
getData :: (MonadIO m, FromJSON a) => String -> Either e Value -> m (Either e a)
getData :: forall (m :: * -> *) a e.
(MonadIO m, FromJSON a) =>
[Char] -> Either e Value -> m (Either e a)
getData [Char]
key = (e -> m (Either e a))
-> (Value -> m (Either e a)) -> Either e Value -> m (Either e a)
forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either (Either e a -> m (Either e a)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Either e a -> m (Either e a))
-> (e -> Either e a) -> e -> m (Either e a)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. e -> Either e a
forall a b. a -> Either a b
Left) ([Char] -> Value -> m (Either e a)
forall (m :: * -> *) a e.
(MonadIO m, FromJSON a) =>
[Char] -> Value -> m (Either e a)
viewData [Char]
key)
viewData :: (MonadIO m, FromJSON a) => String -> Value -> m (Either e a)
viewData :: forall (m :: * -> *) a e.
(MonadIO m, FromJSON a) =>
[Char] -> Value -> m (Either e a)
viewData [Char]
key Value
value = IO (Either e a) -> m (Either e a)
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Either e a) -> m (Either e a))
-> IO (Either e a) -> m (Either e a)
forall a b. (a -> b) -> a -> b
$ ([Char] -> IO (Either e a))
-> (a -> IO (Either e a)) -> Either [Char] a -> IO (Either e a)
forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either (JSONError -> IO (Either e a)
forall e a. Exception e => e -> IO a
throwIO (JSONError -> IO (Either e a))
-> ([Char] -> JSONError) -> [Char] -> IO (Either e a)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. [Char] -> JSONError
Wreq.JSONError)
(Either e a -> IO (Either e a)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Either e a -> IO (Either e a))
-> (a -> Either e a) -> a -> IO (Either e a)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> Either e a
forall a. a -> Either e a
forall (m :: * -> *) a. Monad m => a -> m a
return)
(Value -> Either [Char] a
forall a. FromJSON a => Value -> Either [Char] a
toData Value
value)
toData :: FromJSON a => Value -> Either String a
toData :: forall a. FromJSON a => Value -> Either [Char] a
toData Value
value = case Value -> Result a
forall a. FromJSON a => Value -> Result a
fromJSON Value
value of
Success a
a -> a -> Either [Char] a
forall a b. b -> Either a b
Right a
a
Error [Char]
e -> [Char] -> Either [Char] a
forall a b. a -> Either a b
Left [Char]
e
sendSchema :: MonadIO m => SchemaRegistry -> Subject -> Schema -> m (Either SchemaRegistryError SchemaId)
sendSchema :: forall (m :: * -> *).
MonadIO m =>
SchemaRegistry
-> Subject -> Schema -> m (Either SchemaRegistryError SchemaId)
sendSchema SchemaRegistry
sr Subject
subj Schema
sc = do
let schemaName :: SchemaName
schemaName = Schema -> SchemaName
fullTypeName Schema
sc
Maybe SchemaId
sid <- SchemaRegistry -> Subject -> SchemaName -> m (Maybe SchemaId)
forall (m :: * -> *).
MonadIO m =>
SchemaRegistry -> Subject -> SchemaName -> m (Maybe SchemaId)
cachedId SchemaRegistry
sr Subject
subj SchemaName
schemaName
case Maybe SchemaId
sid of
Just SchemaId
sid' -> Either SchemaRegistryError SchemaId
-> m (Either SchemaRegistryError SchemaId)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (SchemaId -> Either SchemaRegistryError SchemaId
forall a b. b -> Either a b
Right SchemaId
sid')
Maybe SchemaId
Nothing -> if SchemaRegistryConfig -> Bool
cAutoRegisterSchemas (SchemaRegistry -> SchemaRegistryConfig
srConfig SchemaRegistry
sr)
then SchemaRegistry
-> Subject -> Schema -> m (Either SchemaRegistryError SchemaId)
forall (m :: * -> *).
MonadIO m =>
SchemaRegistry
-> Subject -> Schema -> m (Either SchemaRegistryError SchemaId)
registerSchema SchemaRegistry
sr Subject
subj Schema
sc
else SchemaRegistry
-> Subject -> Schema -> m (Either SchemaRegistryError SchemaId)
forall (m :: * -> *).
MonadIO m =>
SchemaRegistry
-> Subject -> Schema -> m (Either SchemaRegistryError SchemaId)
getCompatibleSchema SchemaRegistry
sr Subject
subj Schema
sc
registerSchema :: MonadIO m => SchemaRegistry -> Subject -> Schema -> m (Either SchemaRegistryError SchemaId)
registerSchema :: forall (m :: * -> *).
MonadIO m =>
SchemaRegistry
-> Subject -> Schema -> m (Either SchemaRegistryError SchemaId)
registerSchema SchemaRegistry
sr Subject
subj Schema
sc = do
let schemaName :: SchemaName
schemaName = Schema -> SchemaName
fullTypeName Schema
sc
Either SchemaRegistryError SchemaId
res <- IO (Either SchemaRegistryError SchemaId)
-> m (Either SchemaRegistryError SchemaId)
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Either SchemaRegistryError SchemaId)
-> m (Either SchemaRegistryError SchemaId))
-> IO (Either SchemaRegistryError SchemaId)
-> m (Either SchemaRegistryError SchemaId)
forall a b. (a -> b) -> a -> b
$ SchemaRegistry
-> Subject
-> RegisteredSchema
-> IO (Either SchemaRegistryError SchemaId)
putSchema SchemaRegistry
sr Subject
subj (Schema -> RegisteredSchema
RegisteredSchema Schema
sc)
(SchemaId -> m ()) -> Either SchemaRegistryError SchemaId -> m ()
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
(a -> f b) -> t a -> f ()
traverse_ (SchemaRegistry -> Subject -> SchemaName -> SchemaId -> m ()
forall (m :: * -> *).
MonadIO m =>
SchemaRegistry -> Subject -> SchemaName -> SchemaId -> m ()
cacheId SchemaRegistry
sr Subject
subj SchemaName
schemaName) Either SchemaRegistryError SchemaId
res
(SchemaId -> m ()) -> Either SchemaRegistryError SchemaId -> m ()
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
(a -> f b) -> t a -> f ()
traverse_ (\SchemaId
sid' -> SchemaRegistry -> SchemaId -> Schema -> m ()
forall (m :: * -> *).
MonadIO m =>
SchemaRegistry -> SchemaId -> Schema -> m ()
cacheSchema SchemaRegistry
sr SchemaId
sid' Schema
sc) Either SchemaRegistryError SchemaId
res
Either SchemaRegistryError SchemaId
-> m (Either SchemaRegistryError SchemaId)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Either SchemaRegistryError SchemaId
res
getCompatibleSchema :: MonadIO m => SchemaRegistry -> Subject -> Schema -> m (Either SchemaRegistryError SchemaId)
getCompatibleSchema :: forall (m :: * -> *).
MonadIO m =>
SchemaRegistry
-> Subject -> Schema -> m (Either SchemaRegistryError SchemaId)
getCompatibleSchema SchemaRegistry
sr Subject
subj Schema
sc = IO (Either SchemaRegistryError SchemaId)
-> m (Either SchemaRegistryError SchemaId)
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Either SchemaRegistryError SchemaId)
-> m (Either SchemaRegistryError SchemaId))
-> (ExceptT SchemaRegistryError IO SchemaId
-> IO (Either SchemaRegistryError SchemaId))
-> ExceptT SchemaRegistryError IO SchemaId
-> m (Either SchemaRegistryError SchemaId)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ExceptT SchemaRegistryError IO SchemaId
-> IO (Either SchemaRegistryError SchemaId)
forall e (m :: * -> *) a. ExceptT e m a -> m (Either e a)
runExceptT (ExceptT SchemaRegistryError IO SchemaId
-> m (Either SchemaRegistryError SchemaId))
-> ExceptT SchemaRegistryError IO SchemaId
-> m (Either SchemaRegistryError SchemaId)
forall a b. (a -> b) -> a -> b
$ do
let schemaName :: SchemaName
schemaName = Schema -> SchemaName
fullTypeName Schema
sc
[Version]
versions <- Either SchemaRegistryError [Version]
-> ExceptT SchemaRegistryError IO [Version]
forall e (m :: * -> *) a. MonadError e m => Either e a -> m a
liftEither (Either SchemaRegistryError [Version]
-> ExceptT SchemaRegistryError IO [Version])
-> ExceptT
SchemaRegistryError IO (Either SchemaRegistryError [Version])
-> ExceptT SchemaRegistryError IO [Version]
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< SchemaRegistry
-> Subject
-> ExceptT
SchemaRegistryError IO (Either SchemaRegistryError [Version])
forall (m :: * -> *).
MonadIO m =>
SchemaRegistry
-> Subject -> m (Either SchemaRegistryError [Version])
getVersions SchemaRegistry
sr Subject
subj
[(Bool, Version)]
compatibilites <- Either SchemaRegistryError [(Bool, Version)]
-> ExceptT SchemaRegistryError IO [(Bool, Version)]
forall e (m :: * -> *) a. MonadError e m => Either e a -> m a
liftEither (Either SchemaRegistryError [(Bool, Version)]
-> ExceptT SchemaRegistryError IO [(Bool, Version)])
-> ([Either SchemaRegistryError (Bool, Version)]
-> Either SchemaRegistryError [(Bool, Version)])
-> [Either SchemaRegistryError (Bool, Version)]
-> ExceptT SchemaRegistryError IO [(Bool, Version)]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. [Either SchemaRegistryError (Bool, Version)]
-> Either SchemaRegistryError [(Bool, Version)]
forall (t :: * -> *) (f :: * -> *) a.
(Traversable t, Applicative f) =>
t (f a) -> f (t a)
forall (f :: * -> *) a. Applicative f => [f a] -> f [a]
sequenceA
([Either SchemaRegistryError (Bool, Version)]
-> ExceptT SchemaRegistryError IO [(Bool, Version)])
-> ExceptT
SchemaRegistryError IO [Either SchemaRegistryError (Bool, Version)]
-> ExceptT SchemaRegistryError IO [(Bool, Version)]
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< (Version
-> ExceptT
SchemaRegistryError
IO
(Either SchemaRegistryError (Bool, Version)))
-> [Version]
-> ExceptT
SchemaRegistryError IO [Either SchemaRegistryError (Bool, Version)]
forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
forall (f :: * -> *) a b.
Applicative f =>
(a -> f b) -> [a] -> f [b]
traverse (\Version
ver -> (Bool -> (Bool, Version))
-> Either SchemaRegistryError Bool
-> Either SchemaRegistryError (Bool, Version)
forall a b.
(a -> b)
-> Either SchemaRegistryError a -> Either SchemaRegistryError b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (,Version
ver) (Either SchemaRegistryError Bool
-> Either SchemaRegistryError (Bool, Version))
-> ExceptT SchemaRegistryError IO (Either SchemaRegistryError Bool)
-> ExceptT
SchemaRegistryError IO (Either SchemaRegistryError (Bool, Version))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> SchemaRegistry
-> Subject
-> Version
-> Schema
-> ExceptT SchemaRegistryError IO (Either SchemaRegistryError Bool)
forall (m :: * -> *).
MonadIO m =>
SchemaRegistry
-> Subject
-> Version
-> Schema
-> m (Either SchemaRegistryError Bool)
isCompatible SchemaRegistry
sr Subject
subj Version
ver Schema
sc) [Version]
versions
let mCompatibleVersion :: Maybe Version
mCompatibleVersion = (Bool, Version) -> Version
forall a b. (a, b) -> b
snd ((Bool, Version) -> Version)
-> Maybe (Bool, Version) -> Maybe Version
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> ((Bool, Version) -> Bool)
-> [(Bool, Version)] -> Maybe (Bool, Version)
forall (t :: * -> *) a. Foldable t => (a -> Bool) -> t a -> Maybe a
find (Bool, Version) -> Bool
forall a b. (a, b) -> a
fst [(Bool, Version)]
compatibilites
Version
compatibleVersion <- Either SchemaRegistryError Version
-> ExceptT SchemaRegistryError IO Version
forall e (m :: * -> *) a. MonadError e m => Either e a -> m a
liftEither
(Either SchemaRegistryError Version
-> ExceptT SchemaRegistryError IO Version)
-> Either SchemaRegistryError Version
-> ExceptT SchemaRegistryError IO Version
forall a b. (a -> b) -> a -> b
$ case Maybe Version
mCompatibleVersion of
Just Version
version -> Version -> Either SchemaRegistryError Version
forall a b. b -> Either a b
Right Version
version
Maybe Version
Nothing -> SchemaRegistryError -> Either SchemaRegistryError Version
forall a b. a -> Either a b
Left (SchemaRegistryError -> Either SchemaRegistryError Version)
-> SchemaRegistryError -> Either SchemaRegistryError Version
forall a b. (a -> b) -> a -> b
$ Schema -> SchemaRegistryError
SchemaRegistryNoCompatibleSchemaFound Schema
sc
Schema
_ <- Either SchemaRegistryError Schema
-> ExceptT SchemaRegistryError IO Schema
forall e (m :: * -> *) a. MonadError e m => Either e a -> m a
liftEither (Either SchemaRegistryError Schema
-> ExceptT SchemaRegistryError IO Schema)
-> ExceptT
SchemaRegistryError IO (Either SchemaRegistryError Schema)
-> ExceptT SchemaRegistryError IO Schema
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< SchemaRegistry
-> Subject
-> Version
-> ExceptT
SchemaRegistryError IO (Either SchemaRegistryError Schema)
forall (m :: * -> *).
MonadIO m =>
SchemaRegistry
-> Subject -> Version -> m (Either SchemaRegistryError Schema)
loadSubjectSchema SchemaRegistry
sr Subject
subj Version
compatibleVersion
Maybe SchemaId
mSid <- SchemaRegistry
-> Subject
-> SchemaName
-> ExceptT SchemaRegistryError IO (Maybe SchemaId)
forall (m :: * -> *).
MonadIO m =>
SchemaRegistry -> Subject -> SchemaName -> m (Maybe SchemaId)
cachedId SchemaRegistry
sr Subject
subj SchemaName
schemaName
Either SchemaRegistryError SchemaId
-> ExceptT SchemaRegistryError IO SchemaId
forall e (m :: * -> *) a. MonadError e m => Either e a -> m a
liftEither (Either SchemaRegistryError SchemaId
-> ExceptT SchemaRegistryError IO SchemaId)
-> Either SchemaRegistryError SchemaId
-> ExceptT SchemaRegistryError IO SchemaId
forall a b. (a -> b) -> a -> b
$ case Maybe SchemaId
mSid of
Just SchemaId
sid' -> SchemaId -> Either SchemaRegistryError SchemaId
forall a. a -> Either SchemaRegistryError a
forall (f :: * -> *) a. Applicative f => a -> f a
pure SchemaId
sid'
Maybe SchemaId
Nothing -> SchemaRegistryError -> Either SchemaRegistryError SchemaId
forall a b. a -> Either a b
Left SchemaRegistryError
SchemaRegistryCacheError
getVersions :: MonadIO m => SchemaRegistry -> Subject -> m (Either SchemaRegistryError [Version])
getVersions :: forall (m :: * -> *).
MonadIO m =>
SchemaRegistry
-> Subject -> m (Either SchemaRegistryError [Version])
getVersions SchemaRegistry
sr subj :: Subject
subj@(Subject Text
sbj) = IO (Either SchemaRegistryError [Version])
-> m (Either SchemaRegistryError [Version])
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Either SchemaRegistryError [Version])
-> m (Either SchemaRegistryError [Version]))
-> (ExceptT SchemaRegistryError IO [Version]
-> IO (Either SchemaRegistryError [Version]))
-> ExceptT SchemaRegistryError IO [Version]
-> m (Either SchemaRegistryError [Version])
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ExceptT SchemaRegistryError IO [Version]
-> IO (Either SchemaRegistryError [Version])
forall e (m :: * -> *) a. ExceptT e m a -> m (Either e a)
runExceptT (ExceptT SchemaRegistryError IO [Version]
-> m (Either SchemaRegistryError [Version]))
-> ExceptT SchemaRegistryError IO [Version]
-> m (Either SchemaRegistryError [Version])
forall a b. (a -> b) -> a -> b
$ do
let url :: [Char]
url = SchemaRegistry -> [Char]
srBaseUrl SchemaRegistry
sr [Char] -> ShowS
forall a. [a] -> [a] -> [a]
++ [Char]
"/subjects/" [Char] -> ShowS
forall a. [a] -> [a] -> [a]
++ Text -> [Char]
unpack Text
sbj [Char] -> ShowS
forall a. [a] -> [a] -> [a]
++ [Char]
"/versions"
Response ByteString
resp <- (SomeException -> SchemaRegistryError)
-> IO (Response ByteString)
-> ExceptT SchemaRegistryError IO (Response ByteString)
forall (m :: * -> *) e a.
MonadCatch m =>
(SomeException -> e) -> m a -> ExceptT e m a
tryWith (Subject -> SomeException -> SchemaRegistryError
wrapErrorWithSubject Subject
subj) (IO (Response ByteString)
-> ExceptT SchemaRegistryError IO (Response ByteString))
-> IO (Response ByteString)
-> ExceptT SchemaRegistryError IO (Response ByteString)
forall a b. (a -> b) -> a -> b
$ Options -> [Char] -> IO (Response ByteString)
Wreq.getWith (SchemaRegistry -> Options
wreqOpts SchemaRegistry
sr) [Char]
url
Either SchemaRegistryError [Version]
-> ExceptT SchemaRegistryError IO [Version]
forall (m :: * -> *) e a. Monad m => Either e a -> ExceptT e m a
except (Either SchemaRegistryError [Version]
-> ExceptT SchemaRegistryError IO [Version])
-> Either SchemaRegistryError [Version]
-> ExceptT SchemaRegistryError IO [Version]
forall a b. (a -> b) -> a -> b
$ (SomeException -> SchemaRegistryError)
-> (Response [Word32] -> [Version])
-> Either SomeException (Response [Word32])
-> Either SchemaRegistryError [Version]
forall a b c d. (a -> b) -> (c -> d) -> Either a c -> Either b d
forall (p :: * -> * -> *) a b c d.
Bifunctor p =>
(a -> b) -> (c -> d) -> p a c -> p b d
bimap SomeException -> SchemaRegistryError
wrapError ((Word32 -> Version) -> [Word32] -> [Version]
forall a b. (a -> b) -> [a] -> [b]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap Word32 -> Version
Version ([Word32] -> [Version])
-> (Response [Word32] -> [Word32])
-> Response [Word32]
-> [Version]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Getting [Word32] (Response [Word32]) [Word32]
-> Response [Word32] -> [Word32]
forall s (m :: * -> *) a. MonadReader s m => Getting a s a -> m a
view Getting [Word32] (Response [Word32]) [Word32]
forall body0 body1 (f :: * -> *).
Functor f =>
(body0 -> f body1) -> Response body0 -> f (Response body1)
Wreq.responseBody) (Response ByteString -> Either SomeException (Response [Word32])
forall (m :: * -> *) a.
(MonadThrow m, FromJSON a) =>
Response ByteString -> m (Response a)
Wreq.asJSON Response ByteString
resp)
isCompatible :: MonadIO m => SchemaRegistry -> Subject -> Version -> Schema -> m (Either SchemaRegistryError Bool)
isCompatible :: forall (m :: * -> *).
MonadIO m =>
SchemaRegistry
-> Subject
-> Version
-> Schema
-> m (Either SchemaRegistryError Bool)
isCompatible SchemaRegistry
sr (Subject Text
sbj) (Version Word32
version) Schema
schema = do
let url :: [Char]
url = SchemaRegistry -> [Char]
srBaseUrl SchemaRegistry
sr [Char] -> ShowS
forall a. [a] -> [a] -> [a]
++ [Char]
"/compatibility/subjects/" [Char] -> ShowS
forall a. [a] -> [a] -> [a]
++ Text -> [Char]
unpack Text
sbj [Char] -> ShowS
forall a. [a] -> [a] -> [a]
++ [Char]
"/versions/" [Char] -> ShowS
forall a. [a] -> [a] -> [a]
++ Word32 -> [Char]
forall a. Show a => a -> [Char]
show Word32
version
Either SomeException (Response ByteString)
respE <- IO (Either SomeException (Response ByteString))
-> m (Either SomeException (Response ByteString))
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Either SomeException (Response ByteString))
-> m (Either SomeException (Response ByteString)))
-> (IO (Response ByteString)
-> IO (Either SomeException (Response ByteString)))
-> IO (Response ByteString)
-> m (Either SomeException (Response ByteString))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. IO (Response ByteString)
-> IO (Either SomeException (Response ByteString))
forall (m :: * -> *) e a.
(HasCallStack, MonadCatch m, Exception e) =>
m a -> m (Either e a)
try (IO (Response ByteString)
-> m (Either SomeException (Response ByteString)))
-> IO (Response ByteString)
-> m (Either SomeException (Response ByteString))
forall a b. (a -> b) -> a -> b
$ Options -> [Char] -> Value -> IO (Response ByteString)
forall a.
Postable a =>
Options -> [Char] -> a -> IO (Response ByteString)
Wreq.postWith (SchemaRegistry -> Options
wreqOpts SchemaRegistry
sr) [Char]
url (RegisteredSchema -> Value
forall a. ToJSON a => a -> Value
toJSON (RegisteredSchema -> Value) -> RegisteredSchema -> Value
forall a b. (a -> b) -> a -> b
$ Schema -> RegisteredSchema
RegisteredSchema Schema
schema)
case Either SomeException (Response ByteString)
respE of
Left SomeException
exc -> Either SchemaRegistryError Bool
-> m (Either SchemaRegistryError Bool)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Either SchemaRegistryError Bool
-> m (Either SchemaRegistryError Bool))
-> (SchemaRegistryError -> Either SchemaRegistryError Bool)
-> SchemaRegistryError
-> m (Either SchemaRegistryError Bool)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SchemaRegistryError -> Either SchemaRegistryError Bool
forall a b. a -> Either a b
Left (SchemaRegistryError -> m (Either SchemaRegistryError Bool))
-> SchemaRegistryError -> m (Either SchemaRegistryError Bool)
forall a b. (a -> b) -> a -> b
$ [Char] -> SomeException -> SchemaRegistryError
wrapErrorWithUrl [Char]
url SomeException
exc
Right Response ByteString
resp -> do
let wrapped :: Either SchemaRegistryError Value
wrapped = (SomeException -> SchemaRegistryError)
-> (Response Value -> Value)
-> Either SomeException (Response Value)
-> Either SchemaRegistryError Value
forall a b c d. (a -> b) -> (c -> d) -> Either a c -> Either b d
forall (p :: * -> * -> *) a b c d.
Bifunctor p =>
(a -> b) -> (c -> d) -> p a c -> p b d
bimap SomeException -> SchemaRegistryError
wrapError (Getting Value (Response Value) Value -> Response Value -> Value
forall s (m :: * -> *) a. MonadReader s m => Getting a s a -> m a
view Getting Value (Response Value) Value
forall body0 body1 (f :: * -> *).
Functor f =>
(body0 -> f body1) -> Response body0 -> f (Response body1)
Wreq.responseBody) (Response ByteString -> Either SomeException (Response Value)
forall (m :: * -> *).
MonadThrow m =>
Response ByteString -> m (Response Value)
Wreq.asValue Response ByteString
resp)
(SchemaRegistryError -> m (Either SchemaRegistryError Bool))
-> (Value -> m (Either SchemaRegistryError Bool))
-> Either SchemaRegistryError Value
-> m (Either SchemaRegistryError Bool)
forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either (Either SchemaRegistryError Bool
-> m (Either SchemaRegistryError Bool)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Either SchemaRegistryError Bool
-> m (Either SchemaRegistryError Bool))
-> (SchemaRegistryError -> Either SchemaRegistryError Bool)
-> SchemaRegistryError
-> m (Either SchemaRegistryError Bool)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SchemaRegistryError -> Either SchemaRegistryError Bool
forall a b. a -> Either a b
Left) Value -> m (Either SchemaRegistryError Bool)
forall (m :: * -> *) e. MonadIO m => Value -> m (Either e Bool)
getCompatibility Either SchemaRegistryError Value
wrapped
where
getCompatibility :: MonadIO m => Value -> m (Either e Bool)
getCompatibility :: forall (m :: * -> *) e. MonadIO m => Value -> m (Either e Bool)
getCompatibility = IO (Either e Bool) -> m (Either e Bool)
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Either e Bool) -> m (Either e Bool))
-> (Value -> IO (Either e Bool)) -> Value -> m (Either e Bool)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. IO (Either e Bool)
-> (Bool -> IO (Either e Bool)) -> Maybe Bool -> IO (Either e Bool)
forall b a. b -> (a -> b) -> Maybe a -> b
maybe (JSONError -> IO (Either e Bool)
forall e a. Exception e => e -> IO a
throwIO (JSONError -> IO (Either e Bool))
-> JSONError -> IO (Either e Bool)
forall a b. (a -> b) -> a -> b
$ [Char] -> JSONError
Wreq.JSONError [Char]
"Missing key 'is_compatible' in Schema Registry response") (Either e Bool -> IO (Either e Bool)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Either e Bool -> IO (Either e Bool))
-> (Bool -> Either e Bool) -> Bool -> IO (Either e Bool)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Bool -> Either e Bool
forall a. a -> Either e a
forall (m :: * -> *) a. Monad m => a -> m a
return) (Maybe Bool -> IO (Either e Bool))
-> (Value -> Maybe Bool) -> Value -> IO (Either e Bool)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Value -> Maybe Bool
viewCompatibility
viewCompatibility :: Value -> Maybe Bool
viewCompatibility :: Value -> Maybe Bool
viewCompatibility (Object Object
obj) = Key -> Object -> Maybe Value
forall v. Key -> KeyMap v -> Maybe v
KM.lookup Key
"is_compatible" Object
obj Maybe Value -> (Value -> Maybe Bool) -> Maybe Bool
forall a b. Maybe a -> (a -> Maybe b) -> Maybe b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= Value -> Maybe Bool
toBool
viewCompatibility Value
_ = Maybe Bool
forall a. Maybe a
Nothing
toBool :: Value -> Maybe Bool
toBool :: Value -> Maybe Bool
toBool (Bool Bool
b) = Bool -> Maybe Bool
forall a. a -> Maybe a
Just Bool
b
toBool Value
_ = Maybe Bool
forall a. Maybe a
Nothing
getGlobalConfig :: MonadIO m => SchemaRegistry -> m (Either SchemaRegistryError Compatibility)
getGlobalConfig :: forall (m :: * -> *).
MonadIO m =>
SchemaRegistry -> m (Either SchemaRegistryError Compatibility)
getGlobalConfig SchemaRegistry
sr = do
let url :: [Char]
url = SchemaRegistry -> [Char]
srBaseUrl SchemaRegistry
sr [Char] -> ShowS
forall a. [a] -> [a] -> [a]
++ [Char]
"/config"
Either SomeException (Response ByteString)
respE <- IO (Either SomeException (Response ByteString))
-> m (Either SomeException (Response ByteString))
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Either SomeException (Response ByteString))
-> m (Either SomeException (Response ByteString)))
-> (IO (Response ByteString)
-> IO (Either SomeException (Response ByteString)))
-> IO (Response ByteString)
-> m (Either SomeException (Response ByteString))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. IO (Response ByteString)
-> IO (Either SomeException (Response ByteString))
forall (m :: * -> *) e a.
(HasCallStack, MonadCatch m, Exception e) =>
m a -> m (Either e a)
try (IO (Response ByteString)
-> m (Either SomeException (Response ByteString)))
-> IO (Response ByteString)
-> m (Either SomeException (Response ByteString))
forall a b. (a -> b) -> a -> b
$ Options -> [Char] -> IO (Response ByteString)
Wreq.getWith (SchemaRegistry -> Options
wreqOpts SchemaRegistry
sr) [Char]
url
Either SchemaRegistryError Compatibility
-> m (Either SchemaRegistryError Compatibility)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Either SchemaRegistryError Compatibility
-> m (Either SchemaRegistryError Compatibility))
-> Either SchemaRegistryError Compatibility
-> m (Either SchemaRegistryError Compatibility)
forall a b. (a -> b) -> a -> b
$ case Either SomeException (Response ByteString)
respE of
Left SomeException
exc -> SchemaRegistryError -> Either SchemaRegistryError Compatibility
forall a b. a -> Either a b
Left (SchemaRegistryError -> Either SchemaRegistryError Compatibility)
-> SchemaRegistryError -> Either SchemaRegistryError Compatibility
forall a b. (a -> b) -> a -> b
$ SomeException -> SchemaRegistryError
wrapError SomeException
exc
Right Response ByteString
resp -> (SomeException -> SchemaRegistryError)
-> (Response Compatibility -> Compatibility)
-> Either SomeException (Response Compatibility)
-> Either SchemaRegistryError Compatibility
forall a b c d. (a -> b) -> (c -> d) -> Either a c -> Either b d
forall (p :: * -> * -> *) a b c d.
Bifunctor p =>
(a -> b) -> (c -> d) -> p a c -> p b d
bimap SomeException -> SchemaRegistryError
wrapError (Getting Compatibility (Response Compatibility) Compatibility
-> Response Compatibility -> Compatibility
forall s (m :: * -> *) a. MonadReader s m => Getting a s a -> m a
view Getting Compatibility (Response Compatibility) Compatibility
forall body0 body1 (f :: * -> *).
Functor f =>
(body0 -> f body1) -> Response body0 -> f (Response body1)
Wreq.responseBody) (Response ByteString
-> Either SomeException (Response Compatibility)
forall (m :: * -> *) a.
(MonadThrow m, FromJSON a) =>
Response ByteString -> m (Response a)
Wreq.asJSON Response ByteString
resp)
getSubjectConfig :: MonadIO m => SchemaRegistry -> Subject -> m (Either SchemaRegistryError Compatibility)
getSubjectConfig :: forall (m :: * -> *).
MonadIO m =>
SchemaRegistry
-> Subject -> m (Either SchemaRegistryError Compatibility)
getSubjectConfig SchemaRegistry
sr subj :: Subject
subj@(Subject Text
sbj) = IO (Either SchemaRegistryError Compatibility)
-> m (Either SchemaRegistryError Compatibility)
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Either SchemaRegistryError Compatibility)
-> m (Either SchemaRegistryError Compatibility))
-> (ExceptT SchemaRegistryError IO Compatibility
-> IO (Either SchemaRegistryError Compatibility))
-> ExceptT SchemaRegistryError IO Compatibility
-> m (Either SchemaRegistryError Compatibility)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ExceptT SchemaRegistryError IO Compatibility
-> IO (Either SchemaRegistryError Compatibility)
forall e (m :: * -> *) a. ExceptT e m a -> m (Either e a)
runExceptT (ExceptT SchemaRegistryError IO Compatibility
-> m (Either SchemaRegistryError Compatibility))
-> ExceptT SchemaRegistryError IO Compatibility
-> m (Either SchemaRegistryError Compatibility)
forall a b. (a -> b) -> a -> b
$ do
let url :: [Char]
url = SchemaRegistry -> [Char]
srBaseUrl SchemaRegistry
sr [Char] -> ShowS
forall a. [a] -> [a] -> [a]
++ [Char]
"/config/" [Char] -> ShowS
forall a. [a] -> [a] -> [a]
++ Text -> [Char]
unpack Text
sbj
Response ByteString
resp <- (SomeException -> SchemaRegistryError)
-> IO (Response ByteString)
-> ExceptT SchemaRegistryError IO (Response ByteString)
forall (m :: * -> *) e a.
MonadCatch m =>
(SomeException -> e) -> m a -> ExceptT e m a
tryWith (Subject -> SomeException -> SchemaRegistryError
wrapErrorWithSubject Subject
subj) (IO (Response ByteString)
-> ExceptT SchemaRegistryError IO (Response ByteString))
-> IO (Response ByteString)
-> ExceptT SchemaRegistryError IO (Response ByteString)
forall a b. (a -> b) -> a -> b
$ Options -> [Char] -> IO (Response ByteString)
Wreq.getWith (SchemaRegistry -> Options
wreqOpts SchemaRegistry
sr) [Char]
url
Either SchemaRegistryError Compatibility
-> ExceptT SchemaRegistryError IO Compatibility
forall (m :: * -> *) e a. Monad m => Either e a -> ExceptT e m a
except (Either SchemaRegistryError Compatibility
-> ExceptT SchemaRegistryError IO Compatibility)
-> Either SchemaRegistryError Compatibility
-> ExceptT SchemaRegistryError IO Compatibility
forall a b. (a -> b) -> a -> b
$ (SomeException -> SchemaRegistryError)
-> (Response Compatibility -> Compatibility)
-> Either SomeException (Response Compatibility)
-> Either SchemaRegistryError Compatibility
forall a b c d. (a -> b) -> (c -> d) -> Either a c -> Either b d
forall (p :: * -> * -> *) a b c d.
Bifunctor p =>
(a -> b) -> (c -> d) -> p a c -> p b d
bimap SomeException -> SchemaRegistryError
wrapError (Getting Compatibility (Response Compatibility) Compatibility
-> Response Compatibility -> Compatibility
forall s (m :: * -> *) a. MonadReader s m => Getting a s a -> m a
view Getting Compatibility (Response Compatibility) Compatibility
forall body0 body1 (f :: * -> *).
Functor f =>
(body0 -> f body1) -> Response body0 -> f (Response body1)
Wreq.responseBody) (Response ByteString
-> Either SomeException (Response Compatibility)
forall (m :: * -> *) a.
(MonadThrow m, FromJSON a) =>
Response ByteString -> m (Response a)
Wreq.asJSON Response ByteString
resp)
getSubjects :: MonadIO m => SchemaRegistry -> m (Either SchemaRegistryError [Subject])
getSubjects :: forall (m :: * -> *).
MonadIO m =>
SchemaRegistry -> m (Either SchemaRegistryError [Subject])
getSubjects SchemaRegistry
sr = IO (Either SchemaRegistryError [Subject])
-> m (Either SchemaRegistryError [Subject])
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Either SchemaRegistryError [Subject])
-> m (Either SchemaRegistryError [Subject]))
-> (ExceptT SchemaRegistryError IO [Subject]
-> IO (Either SchemaRegistryError [Subject]))
-> ExceptT SchemaRegistryError IO [Subject]
-> m (Either SchemaRegistryError [Subject])
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ExceptT SchemaRegistryError IO [Subject]
-> IO (Either SchemaRegistryError [Subject])
forall e (m :: * -> *) a. ExceptT e m a -> m (Either e a)
runExceptT (ExceptT SchemaRegistryError IO [Subject]
-> m (Either SchemaRegistryError [Subject]))
-> ExceptT SchemaRegistryError IO [Subject]
-> m (Either SchemaRegistryError [Subject])
forall a b. (a -> b) -> a -> b
$ do
let url :: [Char]
url = SchemaRegistry -> [Char]
srBaseUrl SchemaRegistry
sr [Char] -> ShowS
forall a. [a] -> [a] -> [a]
++ [Char]
"/subjects"
Response ByteString
resp <- (SomeException -> SchemaRegistryError)
-> IO (Response ByteString)
-> ExceptT SchemaRegistryError IO (Response ByteString)
forall (m :: * -> *) e a.
MonadCatch m =>
(SomeException -> e) -> m a -> ExceptT e m a
tryWith SomeException -> SchemaRegistryError
wrapError (IO (Response ByteString)
-> ExceptT SchemaRegistryError IO (Response ByteString))
-> IO (Response ByteString)
-> ExceptT SchemaRegistryError IO (Response ByteString)
forall a b. (a -> b) -> a -> b
$ Options -> [Char] -> IO (Response ByteString)
Wreq.getWith (SchemaRegistry -> Options
wreqOpts SchemaRegistry
sr) [Char]
url
Either SchemaRegistryError [Subject]
-> ExceptT SchemaRegistryError IO [Subject]
forall (m :: * -> *) e a. Monad m => Either e a -> ExceptT e m a
except (Either SchemaRegistryError [Subject]
-> ExceptT SchemaRegistryError IO [Subject])
-> Either SchemaRegistryError [Subject]
-> ExceptT SchemaRegistryError IO [Subject]
forall a b. (a -> b) -> a -> b
$ (SomeException -> SchemaRegistryError)
-> (Response [Text] -> [Subject])
-> Either SomeException (Response [Text])
-> Either SchemaRegistryError [Subject]
forall a b c d. (a -> b) -> (c -> d) -> Either a c -> Either b d
forall (p :: * -> * -> *) a b c d.
Bifunctor p =>
(a -> b) -> (c -> d) -> p a c -> p b d
bimap SomeException -> SchemaRegistryError
wrapError ((Text -> Subject) -> [Text] -> [Subject]
forall a b. (a -> b) -> [a] -> [b]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap Text -> Subject
Subject ([Text] -> [Subject])
-> (Response [Text] -> [Text]) -> Response [Text] -> [Subject]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Getting [Text] (Response [Text]) [Text]
-> Response [Text] -> [Text]
forall s (m :: * -> *) a. MonadReader s m => Getting a s a -> m a
view Getting [Text] (Response [Text]) [Text]
forall body0 body1 (f :: * -> *).
Functor f =>
(body0 -> f body1) -> Response body0 -> f (Response body1)
Wreq.responseBody) (Response ByteString -> Either SomeException (Response [Text])
forall (m :: * -> *) a.
(MonadThrow m, FromJSON a) =>
Response ByteString -> m (Response a)
Wreq.asJSON Response ByteString
resp)
wreqOpts :: SchemaRegistry -> Wreq.Options
wreqOpts :: SchemaRegistry -> Options
wreqOpts SchemaRegistry
sr =
let
accept :: [ByteString]
accept = [ByteString
"application/vnd.schemaregistry.v1+json", ByteString
"application/vnd.schemaregistry+json", ByteString
"application/json"]
acceptHeader :: Options -> Options
acceptHeader = HeaderName -> Lens' Options [ByteString]
Wreq.header HeaderName
"Accept" (([ByteString] -> Identity [ByteString])
-> Options -> Identity Options)
-> [ByteString] -> Options -> Options
forall s t a b. ASetter s t a b -> b -> s -> t
.~ [ByteString]
accept
authHeader :: Options -> Options
authHeader = (Maybe Auth -> Identity (Maybe Auth))
-> Options -> Identity Options
Lens' Options (Maybe Auth)
Wreq.auth ((Maybe Auth -> Identity (Maybe Auth))
-> Options -> Identity Options)
-> Maybe Auth -> Options -> Options
forall s t a b. ASetter s t a b -> b -> s -> t
.~ SchemaRegistryConfig -> Maybe Auth
cAuth (SchemaRegistry -> SchemaRegistryConfig
srConfig SchemaRegistry
sr)
extraHeaders :: Options -> Options
extraHeaders = ([Header] -> Identity [Header]) -> Options -> Identity Options
Lens' Options [Header]
Wreq.headers (([Header] -> Identity [Header]) -> Options -> Identity Options)
-> ([Header] -> [Header]) -> Options -> Options
forall s t a b. ASetter s t a b -> (a -> b) -> s -> t
%~ ([Header] -> [Header] -> [Header]
forall a. [a] -> [a] -> [a]
++ SchemaRegistryConfig -> [Header]
cExtraHeaders (SchemaRegistry -> SchemaRegistryConfig
srConfig SchemaRegistry
sr))
in Options
Wreq.defaults Options -> (Options -> Options) -> Options
forall a b. a -> (a -> b) -> b
& Options -> Options
acceptHeader Options -> (Options -> Options) -> Options
forall a b. a -> (a -> b) -> b
& Options -> Options
authHeader Options -> (Options -> Options) -> Options
forall a b. a -> (a -> b) -> b
& Options -> Options
extraHeaders
getSchemaById :: SchemaRegistry -> SchemaId -> IO (Either SchemaRegistryError RegisteredSchema)
getSchemaById :: SchemaRegistry
-> SchemaId -> IO (Either SchemaRegistryError RegisteredSchema)
getSchemaById SchemaRegistry
sr sid :: SchemaId
sid@(SchemaId Int32
i) = ExceptT SchemaRegistryError IO RegisteredSchema
-> IO (Either SchemaRegistryError RegisteredSchema)
forall e (m :: * -> *) a. ExceptT e m a -> m (Either e a)
runExceptT (ExceptT SchemaRegistryError IO RegisteredSchema
-> IO (Either SchemaRegistryError RegisteredSchema))
-> ExceptT SchemaRegistryError IO RegisteredSchema
-> IO (Either SchemaRegistryError RegisteredSchema)
forall a b. (a -> b) -> a -> b
$ do
let
baseUrl :: [Char]
baseUrl = SchemaRegistry -> [Char]
srBaseUrl SchemaRegistry
sr
schemaUrl :: [Char]
schemaUrl = [Char]
baseUrl [Char] -> ShowS
forall a. [a] -> [a] -> [a]
++ [Char]
"/schemas/ids/" [Char] -> ShowS
forall a. [a] -> [a] -> [a]
++ Int32 -> [Char]
forall a. Show a => a -> [Char]
show Int32
i
Response ByteString
resp <- (SomeException -> SchemaRegistryError)
-> IO (Response ByteString)
-> ExceptT SchemaRegistryError IO (Response ByteString)
forall (m :: * -> *) e a.
MonadCatch m =>
(SomeException -> e) -> m a -> ExceptT e m a
tryWith (SchemaId -> SomeException -> SchemaRegistryError
wrapErrorWithSchemaId SchemaId
sid) (IO (Response ByteString)
-> ExceptT SchemaRegistryError IO (Response ByteString))
-> IO (Response ByteString)
-> ExceptT SchemaRegistryError IO (Response ByteString)
forall a b. (a -> b) -> a -> b
$ Options -> [Char] -> IO (Response ByteString)
Wreq.getWith (SchemaRegistry -> Options
wreqOpts SchemaRegistry
sr) [Char]
schemaUrl
Either SchemaRegistryError RegisteredSchema
-> ExceptT SchemaRegistryError IO RegisteredSchema
forall (m :: * -> *) e a. Monad m => Either e a -> ExceptT e m a
except (Either SchemaRegistryError RegisteredSchema
-> ExceptT SchemaRegistryError IO RegisteredSchema)
-> Either SchemaRegistryError RegisteredSchema
-> ExceptT SchemaRegistryError IO RegisteredSchema
forall a b. (a -> b) -> a -> b
$ (SomeException -> SchemaRegistryError)
-> (Response RegisteredSchema -> RegisteredSchema)
-> Either SomeException (Response RegisteredSchema)
-> Either SchemaRegistryError RegisteredSchema
forall a b c d. (a -> b) -> (c -> d) -> Either a c -> Either b d
forall (p :: * -> * -> *) a b c d.
Bifunctor p =>
(a -> b) -> (c -> d) -> p a c -> p b d
bimap (SchemaRegistryError -> SomeException -> SchemaRegistryError
forall a b. a -> b -> a
const (SchemaId -> SchemaRegistryError
SchemaRegistryLoadError SchemaId
sid)) (Getting
RegisteredSchema (Response RegisteredSchema) RegisteredSchema
-> Response RegisteredSchema -> RegisteredSchema
forall s (m :: * -> *) a. MonadReader s m => Getting a s a -> m a
view Getting
RegisteredSchema (Response RegisteredSchema) RegisteredSchema
forall body0 body1 (f :: * -> *).
Functor f =>
(body0 -> f body1) -> Response body0 -> f (Response body1)
Wreq.responseBody) (Response ByteString
-> Either SomeException (Response RegisteredSchema)
forall (m :: * -> *) a.
(MonadThrow m, FromJSON a) =>
Response ByteString -> m (Response a)
Wreq.asJSON Response ByteString
resp)
putSchema :: SchemaRegistry -> Subject -> RegisteredSchema -> IO (Either SchemaRegistryError SchemaId)
putSchema :: SchemaRegistry
-> Subject
-> RegisteredSchema
-> IO (Either SchemaRegistryError SchemaId)
putSchema SchemaRegistry
sr subj :: Subject
subj@(Subject Text
sbj) RegisteredSchema
schema = ExceptT SchemaRegistryError IO SchemaId
-> IO (Either SchemaRegistryError SchemaId)
forall e (m :: * -> *) a. ExceptT e m a -> m (Either e a)
runExceptT (ExceptT SchemaRegistryError IO SchemaId
-> IO (Either SchemaRegistryError SchemaId))
-> ExceptT SchemaRegistryError IO SchemaId
-> IO (Either SchemaRegistryError SchemaId)
forall a b. (a -> b) -> a -> b
$ do
let
baseUrl :: [Char]
baseUrl = SchemaRegistry -> [Char]
srBaseUrl SchemaRegistry
sr
schemaUrl :: [Char]
schemaUrl = [Char]
baseUrl [Char] -> ShowS
forall a. [a] -> [a] -> [a]
++ [Char]
"/subjects/" [Char] -> ShowS
forall a. [a] -> [a] -> [a]
++ Text -> [Char]
unpack Text
sbj [Char] -> ShowS
forall a. [a] -> [a] -> [a]
++ [Char]
"/versions"
Response ByteString
resp <- (SomeException -> SchemaRegistryError)
-> IO (Response ByteString)
-> ExceptT SchemaRegistryError IO (Response ByteString)
forall (m :: * -> *) e a.
MonadCatch m =>
(SomeException -> e) -> m a -> ExceptT e m a
tryWith (Subject -> SomeException -> SchemaRegistryError
wrapErrorWithSubject Subject
subj) (IO (Response ByteString)
-> ExceptT SchemaRegistryError IO (Response ByteString))
-> IO (Response ByteString)
-> ExceptT SchemaRegistryError IO (Response ByteString)
forall a b. (a -> b) -> a -> b
$ Options -> [Char] -> Value -> IO (Response ByteString)
forall a.
Postable a =>
Options -> [Char] -> a -> IO (Response ByteString)
Wreq.postWith (SchemaRegistry -> Options
wreqOpts SchemaRegistry
sr) [Char]
schemaUrl (RegisteredSchema -> Value
forall a. ToJSON a => a -> Value
toJSON RegisteredSchema
schema)
Either SchemaRegistryError SchemaId
-> ExceptT SchemaRegistryError IO SchemaId
forall (m :: * -> *) e a. Monad m => Either e a -> ExceptT e m a
except (Either SchemaRegistryError SchemaId
-> ExceptT SchemaRegistryError IO SchemaId)
-> Either SchemaRegistryError SchemaId
-> ExceptT SchemaRegistryError IO SchemaId
forall a b. (a -> b) -> a -> b
$ (SomeException -> SchemaRegistryError)
-> (Response SchemaId -> SchemaId)
-> Either SomeException (Response SchemaId)
-> Either SchemaRegistryError SchemaId
forall a b c d. (a -> b) -> (c -> d) -> Either a c -> Either b d
forall (p :: * -> * -> *) a b c d.
Bifunctor p =>
(a -> b) -> (c -> d) -> p a c -> p b d
bimap SomeException -> SchemaRegistryError
wrapError (Getting SchemaId (Response SchemaId) SchemaId
-> Response SchemaId -> SchemaId
forall s (m :: * -> *) a. MonadReader s m => Getting a s a -> m a
view Getting SchemaId (Response SchemaId) SchemaId
forall body0 body1 (f :: * -> *).
Functor f =>
(body0 -> f body1) -> Response body0 -> f (Response body1)
Wreq.responseBody) (Response ByteString -> Either SomeException (Response SchemaId)
forall (m :: * -> *) a.
(MonadThrow m, FromJSON a) =>
Response ByteString -> m (Response a)
Wreq.asJSON Response ByteString
resp)
fromHttpError :: HttpException -> (HttpExceptionContent -> SchemaRegistryError) -> SchemaRegistryError
fromHttpError :: HttpException
-> (HttpExceptionContent -> SchemaRegistryError)
-> SchemaRegistryError
fromHttpError HttpException
err HttpExceptionContent -> SchemaRegistryError
f = case HttpException
err of
InvalidUrlException [Char]
fld [Char]
err' -> [Char] -> SchemaRegistryError
SchemaRegistryConnectError ([Char]
fld [Char] -> ShowS
forall a. [a] -> [a] -> [a]
++ [Char]
": " [Char] -> ShowS
forall a. [a] -> [a] -> [a]
++ [Char]
err')
HttpExceptionRequest Request
_ (ConnectionFailure SomeException
err) -> [Char] -> SchemaRegistryError
SchemaRegistryConnectError (SomeException -> [Char]
forall e. Exception e => e -> [Char]
displayException SomeException
err)
HttpExceptionRequest Request
_ HttpExceptionContent
ConnectionTimeout -> [Char] -> SchemaRegistryError
SchemaRegistryConnectError (HttpException -> [Char]
forall e. Exception e => e -> [Char]
displayException HttpException
err)
HttpExceptionRequest Request
_ ProxyConnectException{} -> [Char] -> SchemaRegistryError
SchemaRegistryConnectError (HttpException -> [Char]
forall e. Exception e => e -> [Char]
displayException HttpException
err)
HttpExceptionRequest Request
_ HttpExceptionContent
ConnectionClosed -> [Char] -> SchemaRegistryError
SchemaRegistryConnectError (HttpException -> [Char]
forall e. Exception e => e -> [Char]
displayException HttpException
err)
HttpExceptionRequest Request
_ (InvalidDestinationHost ByteString
_) -> [Char] -> SchemaRegistryError
SchemaRegistryConnectError (HttpException -> [Char]
forall e. Exception e => e -> [Char]
displayException HttpException
err)
HttpExceptionRequest Request
_ HttpExceptionContent
TlsNotSupported -> [Char] -> SchemaRegistryError
SchemaRegistryConnectError (HttpException -> [Char]
forall e. Exception e => e -> [Char]
displayException HttpException
err)
HttpExceptionRequest Request
_ (InvalidProxySettings Text
_) -> [Char] -> SchemaRegistryError
SchemaRegistryConnectError (HttpException -> [Char]
forall e. Exception e => e -> [Char]
displayException HttpException
err)
HttpExceptionRequest Request
_ HttpExceptionContent
err' -> HttpExceptionContent -> SchemaRegistryError
f HttpExceptionContent
err'
wrapError :: SomeException -> SchemaRegistryError
wrapError :: SomeException -> SchemaRegistryError
wrapError SomeException
someErr = case SomeException -> Maybe HttpException
forall e. Exception e => SomeException -> Maybe e
fromException SomeException
someErr of
Maybe HttpException
Nothing -> [Char] -> SchemaRegistryError
SchemaRegistrySendError (SomeException -> [Char]
forall e. Exception e => e -> [Char]
displayException SomeException
someErr)
Just HttpException
httpErr -> HttpException
-> (HttpExceptionContent -> SchemaRegistryError)
-> SchemaRegistryError
fromHttpError HttpException
httpErr (\HttpExceptionContent
_ -> [Char] -> SchemaRegistryError
SchemaRegistrySendError (SomeException -> [Char]
forall e. Exception e => e -> [Char]
displayException SomeException
someErr))
wrapErrorWithSchemaId :: SchemaId -> SomeException -> SchemaRegistryError
wrapErrorWithSchemaId :: SchemaId -> SomeException -> SchemaRegistryError
wrapErrorWithSchemaId = (SchemaId -> SchemaRegistryError)
-> SchemaId -> SomeException -> SchemaRegistryError
forall a.
(a -> SchemaRegistryError)
-> a -> SomeException -> SchemaRegistryError
wrapErrorWith SchemaId -> SchemaRegistryError
SchemaRegistrySchemaNotFound
wrapErrorWithSubject :: Subject -> SomeException -> SchemaRegistryError
wrapErrorWithSubject :: Subject -> SomeException -> SchemaRegistryError
wrapErrorWithSubject = (Subject -> SchemaRegistryError)
-> Subject -> SomeException -> SchemaRegistryError
forall a.
(a -> SchemaRegistryError)
-> a -> SomeException -> SchemaRegistryError
wrapErrorWith Subject -> SchemaRegistryError
SchemaRegistrySubjectNotFound
wrapErrorWithUrl :: String -> SomeException -> SchemaRegistryError
wrapErrorWithUrl :: [Char] -> SomeException -> SchemaRegistryError
wrapErrorWithUrl = ([Char] -> SchemaRegistryError)
-> [Char] -> SomeException -> SchemaRegistryError
forall a.
(a -> SchemaRegistryError)
-> a -> SomeException -> SchemaRegistryError
wrapErrorWith [Char] -> SchemaRegistryError
SchemaRegistryUrlNotFound
wrapErrorWith :: (a -> SchemaRegistryError) -> a -> SomeException -> SchemaRegistryError
wrapErrorWith :: forall a.
(a -> SchemaRegistryError)
-> a -> SomeException -> SchemaRegistryError
wrapErrorWith a -> SchemaRegistryError
mkError a
x SomeException
exception = case SomeException -> Maybe HttpException
forall e. Exception e => SomeException -> Maybe e
fromException SomeException
exception of
Just (HttpExceptionRequest Request
_ (StatusCodeException Response ()
response ByteString
_)) | Response () -> Status
forall body. Response body -> Status
responseStatus Response ()
response Status -> Status -> Bool
forall a. Eq a => a -> a -> Bool
== Status
notFound404 -> a -> SchemaRegistryError
mkError a
x
Maybe HttpException
_ -> SomeException -> SchemaRegistryError
wrapError SomeException
exception
tryWith :: MonadCatch m => (SomeException -> e) -> m a -> ExceptT e m a
tryWith :: forall (m :: * -> *) e a.
MonadCatch m =>
(SomeException -> e) -> m a -> ExceptT e m a
tryWith SomeException -> e
wrapException = (SomeException -> e) -> ExceptT SomeException m a -> ExceptT e m a
forall (m :: * -> *) e e' a.
Functor m =>
(e -> e') -> ExceptT e m a -> ExceptT e' m a
withExceptT SomeException -> e
wrapException (ExceptT SomeException m a -> ExceptT e m a)
-> (m a -> ExceptT SomeException m a) -> m a -> ExceptT e m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. m (Either SomeException a) -> ExceptT SomeException m a
forall e (m :: * -> *) a. m (Either e a) -> ExceptT e m a
ExceptT (m (Either SomeException a) -> ExceptT SomeException m a)
-> (m a -> m (Either SomeException a))
-> m a
-> ExceptT SomeException m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. m a -> m (Either SomeException a)
forall (m :: * -> *) e a.
(HasCallStack, MonadCatch m, Exception e) =>
m a -> m (Either e a)
try
fullTypeName :: Schema -> SchemaName
fullTypeName :: Schema -> SchemaName
fullTypeName Schema
r = Text -> SchemaName
SchemaName (Text -> SchemaName) -> Text -> SchemaName
forall a b. (a -> b) -> a -> b
$ Schema -> Text
typeName Schema
r
cachedSchema :: MonadIO m => SchemaRegistry -> SchemaId -> m (Maybe Schema)
cachedSchema :: forall (m :: * -> *).
MonadIO m =>
SchemaRegistry -> SchemaId -> m (Maybe Schema)
cachedSchema SchemaRegistry
sr SchemaId
k = IO (Maybe Schema) -> m (Maybe Schema)
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Maybe Schema) -> m (Maybe Schema))
-> IO (Maybe Schema) -> m (Maybe Schema)
forall a b. (a -> b) -> a -> b
$ Cache SchemaId Schema -> SchemaId -> IO (Maybe Schema)
forall k v. (Eq k, Hashable k) => Cache k v -> k -> IO (Maybe v)
C.lookup (SchemaRegistry -> Cache SchemaId Schema
srCache SchemaRegistry
sr) SchemaId
k
{-# INLINE cachedSchema #-}
cacheSchema :: MonadIO m => SchemaRegistry -> SchemaId -> Schema -> m ()
cacheSchema :: forall (m :: * -> *).
MonadIO m =>
SchemaRegistry -> SchemaId -> Schema -> m ()
cacheSchema SchemaRegistry
sr SchemaId
k Schema
v = IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ Cache SchemaId Schema -> SchemaId -> Schema -> IO ()
forall k v. (Eq k, Hashable k) => Cache k v -> k -> v -> IO ()
C.insert (SchemaRegistry -> Cache SchemaId Schema
srCache SchemaRegistry
sr) SchemaId
k Schema
v
{-# INLINE cacheSchema #-}
cachedId :: MonadIO m => SchemaRegistry -> Subject -> SchemaName -> m (Maybe SchemaId)
cachedId :: forall (m :: * -> *).
MonadIO m =>
SchemaRegistry -> Subject -> SchemaName -> m (Maybe SchemaId)
cachedId SchemaRegistry
sr Subject
subj SchemaName
scn = IO (Maybe SchemaId) -> m (Maybe SchemaId)
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Maybe SchemaId) -> m (Maybe SchemaId))
-> IO (Maybe SchemaId) -> m (Maybe SchemaId)
forall a b. (a -> b) -> a -> b
$ Cache (Subject, SchemaName) SchemaId
-> (Subject, SchemaName) -> IO (Maybe SchemaId)
forall k v. (Eq k, Hashable k) => Cache k v -> k -> IO (Maybe v)
C.lookup (SchemaRegistry -> Cache (Subject, SchemaName) SchemaId
srReverseCache SchemaRegistry
sr) (Subject
subj, SchemaName
scn)
{-# INLINE cachedId #-}
cacheId :: MonadIO m => SchemaRegistry -> Subject -> SchemaName -> SchemaId -> m ()
cacheId :: forall (m :: * -> *).
MonadIO m =>
SchemaRegistry -> Subject -> SchemaName -> SchemaId -> m ()
cacheId SchemaRegistry
sr Subject
subj SchemaName
scn SchemaId
sid = IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ Cache (Subject, SchemaName) SchemaId
-> (Subject, SchemaName) -> SchemaId -> IO ()
forall k v. (Eq k, Hashable k) => Cache k v -> k -> v -> IO ()
C.insert (SchemaRegistry -> Cache (Subject, SchemaName) SchemaId
srReverseCache SchemaRegistry
sr) (Subject
subj, SchemaName
scn) SchemaId
sid
{-# INLINE cacheId #-}
instance FromJSON RegisteredSchema where
parseJSON :: Value -> Parser RegisteredSchema
parseJSON (Object Object
v) =
[Char]
-> (Object -> Parser RegisteredSchema)
-> Value
-> Parser RegisteredSchema
forall a. [Char] -> (Object -> Parser a) -> Value -> Parser a
withObject [Char]
"expected schema" (\Object
obj -> do
Text
sch <- Object
obj Object -> Key -> Parser Text
forall a. FromJSON a => Object -> Key -> Parser a
.: Key
"schema"
Parser RegisteredSchema
-> (Schema -> Parser RegisteredSchema)
-> Maybe Schema
-> Parser RegisteredSchema
forall b a. b -> (a -> b) -> Maybe a -> b
maybe Parser RegisteredSchema
forall a. Monoid a => a
mempty (RegisteredSchema -> Parser RegisteredSchema
forall a. a -> Parser a
forall (m :: * -> *) a. Monad m => a -> m a
return (RegisteredSchema -> Parser RegisteredSchema)
-> (Schema -> RegisteredSchema)
-> Schema
-> Parser RegisteredSchema
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Schema -> RegisteredSchema
RegisteredSchema) (ByteString -> Maybe Schema
forall a. FromJSON a => ByteString -> Maybe a
decode (ByteString -> Maybe Schema) -> ByteString -> Maybe Schema
forall a b. (a -> b) -> a -> b
$ Text -> ByteString
LText.encodeUtf8 Text
sch)
) (Object -> Value
Object Object
v)
parseJSON Value
_ = Parser RegisteredSchema
forall a. Monoid a => a
mempty
instance ToJSON RegisteredSchema where
toJSON :: RegisteredSchema -> Value
toJSON (RegisteredSchema Schema
v) = [Pair] -> Value
object [Key
"schema" Key -> Text -> Pair
forall v. ToJSON v => Key -> v -> Pair
forall e kv v. (KeyValue e kv, ToJSON v) => Key -> v -> kv
.= ByteString -> Text
LText.decodeUtf8 (Value -> ByteString
forall a. ToJSON a => a -> ByteString
encode (Value -> ByteString) -> Value -> ByteString
forall a b. (a -> b) -> a -> b
$ Schema -> Value
forall a. ToJSON a => a -> Value
toJSON Schema
v)]
instance FromJSON SchemaId where
parseJSON :: Value -> Parser SchemaId
parseJSON (Object Object
v) = Int32 -> SchemaId
SchemaId (Int32 -> SchemaId) -> Parser Int32 -> Parser SchemaId
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Object
v Object -> Key -> Parser Int32
forall a. FromJSON a => Object -> Key -> Parser a
.: Key
"id"
parseJSON Value
_ = Parser SchemaId
forall a. Monoid a => a
mempty
instance FromJSON Compatibility where
parseJSON :: Value -> Parser Compatibility
parseJSON = [Char]
-> (Object -> Parser Compatibility)
-> Value
-> Parser Compatibility
forall a. [Char] -> (Object -> Parser a) -> Value -> Parser a
withObject [Char]
"Compatibility" ((Object -> Parser Compatibility) -> Value -> Parser Compatibility)
-> (Object -> Parser Compatibility)
-> Value
-> Parser Compatibility
forall a b. (a -> b) -> a -> b
$ \Object
v -> do
Value
compatibility <- Object
v Object -> Key -> Parser Value
forall a. FromJSON a => Object -> Key -> Parser a
.: Key
"compatibilityLevel"
case Value
compatibility of
Value
"NONE" -> Compatibility -> Parser Compatibility
forall a. a -> Parser a
forall (m :: * -> *) a. Monad m => a -> m a
return (Compatibility -> Parser Compatibility)
-> Compatibility -> Parser Compatibility
forall a b. (a -> b) -> a -> b
$ Compatibility
NoCompatibility
Value
"FULL" -> Compatibility -> Parser Compatibility
forall a. a -> Parser a
forall (m :: * -> *) a. Monad m => a -> m a
return (Compatibility -> Parser Compatibility)
-> Compatibility -> Parser Compatibility
forall a b. (a -> b) -> a -> b
$ Compatibility
FullCompatibility
Value
"FORWARD" -> Compatibility -> Parser Compatibility
forall a. a -> Parser a
forall (m :: * -> *) a. Monad m => a -> m a
return (Compatibility -> Parser Compatibility)
-> Compatibility -> Parser Compatibility
forall a b. (a -> b) -> a -> b
$ Compatibility
ForwardCompatibility
Value
"BACKWARD" -> Compatibility -> Parser Compatibility
forall a. a -> Parser a
forall (m :: * -> *) a. Monad m => a -> m a
return (Compatibility -> Parser Compatibility)
-> Compatibility -> Parser Compatibility
forall a b. (a -> b) -> a -> b
$ Compatibility
BackwardCompatibility
Value
_ -> [Char] -> Value -> Parser Compatibility
forall a. [Char] -> Value -> Parser a
typeMismatch [Char]
"Compatibility" Value
compatibility