{-# OPTIONS_HADDOCK hide #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Network.Xmpp.Concurrent.Threads where
import Control.Applicative((<$>))
import Control.Concurrent
import Control.Concurrent.STM
import qualified Control.Exception.Lifted as Ex
import Control.Monad
import Control.Monad.Except
import Control.Monad.IO.Class
import qualified Data.ByteString as BS
import GHC.IO (unsafeUnmask)
import Network.Xmpp.Concurrent.Types
import Network.Xmpp.Stream
import Network.Xmpp.Types
import System.Log.Logger
-- | Endless reader loop: waits until the stream stored in @stateRef@ is not
-- in a closed state, pulls one element from it and passes the element to
-- @onElement@.
--
-- When pulling fails, the failure is logged, the stream is closed and
-- @onCClosed@ is called with the failure.
--
-- Every iteration runs with asynchronous exceptions masked. An 'Interrupt'
-- caught while waiting for the stream or while pulling makes the worker block
-- until the 'TMVar' carried by that interrupt can be taken; the iteration is
-- then abandoned and the loop starts over.
readWorker :: (XmppElement -> IO ())
           -> (XmppFailure -> IO ())
           -> TMVar Stream
           -> IO a
readWorker onElement onCClosed stateRef = forever (Ex.mask_ step)
  where
    -- One iteration of the loop: obtain a usable stream, then read from it.
    step :: IO ()
    step = do
        mbStream <- Ex.catches awaitOpenStream [Ex.Handler onInterrupt]
        maybe (return ()) readFrom mbStream

    -- Pull one element from the given stream and dispatch it, unless the
    -- pull failed or was interrupted.
    readFrom :: Stream -> IO ()
    readFrom stream = do
        mbElement <- Ex.catches (pullFrom stream) [Ex.Handler onInterrupt]
        maybe (return ()) (void . onElement) mbElement

    -- Blocks (via STM 'retry') for as long as the connection state of the
    -- current stream is 'Closed' or 'Finished'.
    awaitOpenStream :: IO (Maybe Stream)
    awaitOpenStream = atomically $ do
        stream@(Stream stateVar) <- readTMVar stateRef
        connState <- streamConnectionState <$> readTMVar stateVar
        when (stateIsClosed connState) retry
        return (Just stream)

    -- Read a single element. A read error is logged, the stream is closed
    -- and the connection-closed callback is run before 'Nothing' is returned.
    pullFrom :: Stream -> IO (Maybe XmppElement)
    pullFrom stream = do
        allowInterrupt
        pulled <- pullXmppElement stream
        case pulled of
            Right element -> return (Just element)
            Left failure -> do
                errorM "Pontarius.Xmpp" $ "Read error: " ++ show failure
                closeStreams stream
                onCClosed failure
                return Nothing

    -- Shared handler for both phases: wait for the interrupt's semaphore and
    -- report that this phase produced no result.
    onInterrupt :: Interrupt -> IO (Maybe b)
    onInterrupt (Interrupt sem) = Nothing <$ handleInterrupts [sem]

    -- Momentarily unmasks asynchronous exceptions (the loop body runs under
    -- 'Ex.mask_') so that a pending one can be delivered at this point.
    allowInterrupt :: IO ()
    allowInterrupt = unsafeUnmask (return ())

    -- Take every semaphore in the list in one transaction. If another
    -- 'Interrupt' arrives while waiting, its semaphore is added to the list
    -- and the wait starts over.
    handleInterrupts :: [TMVar ()] -> IO [()]
    handleInterrupts pending =
        atomically (mapM takeTMVar pending)
            `Ex.catch` \(Interrupt extra) -> handleInterrupts (extra : pending)

    -- 'Closed' and 'Finished' count as closed; every other state does not.
    stateIsClosed :: ConnectionState -> Bool
    stateIsClosed st = case st of
        Closed   -> True
        Finished -> True
        _        -> False
-- | Forks the two worker threads of a connection: a keep-alive thread
-- ('connPersist') and a reader thread ('readWorker').
--
-- Arguments, in order: the write semaphore holding the current send
-- function, the handler for incoming elements, the event handlers, the
-- stream to read from, and the keep-alive argument handed to 'connPersist'.
--
-- The result is always 'Right' and contains:
--
-- * an action that disables writing and kills both threads,
--
-- * the 'TMVar' holding the stream the reader works on,
--
-- * the 'ThreadId' of the reader thread.
startThreadsWith :: TMVar (BS.ByteString -> IO (Either XmppFailure ()))
                 -> (XmppElement -> IO ())
                 -> TMVar EventHandlers
                 -> Stream
                 -> Maybe Int
                 -> IO (Either XmppFailure (IO (),
                                            TMVar Stream,
                                            ThreadId))
startThreadsWith writeSem stanzaHandler eh con keepAlive = do
    streamVar    <- newTMVarIO con
    keepAliveTid <- forkIO (connPersist keepAlive writeSem)
    readerTid    <- forkIO (readWorker stanzaHandler onConClosed streamVar)
    return $ Right ( killConnection [readerTid, keepAliveTid]
                   , streamVar
                   , readerTid
                   )
  where
    -- Invoked by the reader when reading fails: block further writes, then
    -- notify the connection-closed handler.
    onConClosed :: XmppFailure -> IO ()
    onConClosed failure = stopWrites >> noCon eh failure

    -- Replace the send function in the write semaphore with one that always
    -- reports 'XmppNoStream'. Waits until the semaphore is available.
    stopWrites :: IO ()
    stopWrites = atomically $
        takeTMVar writeSem
            >> putTMVar writeSem (const (return (Left XmppNoStream)))

    -- Disable writing first, then kill the given threads in list order.
    killConnection threads = liftIO $ do
        debugM "Pontarius.Xmpp" "killing connection"
        stopWrites
        debugM "Pontarius.Xmpp" "killing threads"
        mapM_ killThread threads

    -- Run the registered connection-closed handler in a thread of its own.
    noCon :: TMVar EventHandlers -> XmppFailure -> IO ()
    noCon h e = do
        hands <- atomically (readTMVar h)
        void . forkIO $ connectionClosedHandler hands e
-- | Keep-alive loop.
--
-- With @Just delay@ this loops forever: it takes the send function out of
-- the write semaphore, uses it to send a single space, puts the function
-- back and then sleeps for @delay@ seconds. With 'Nothing' it returns
-- immediately.
--
-- The semaphore is held through 'Ex.bracket', so the send function is put
-- back even when sending throws or the thread receives an asynchronous
-- exception while sending. Otherwise the 'TMVar' would stay empty and every
-- later attempt to take it would block forever.
connPersist :: Maybe Int -> TMVar (BS.ByteString -> IO a) -> IO ()
connPersist Nothing _ = return ()
connPersist (Just delay) sem = forever $ do
    _ <- Ex.bracket (atomically $ takeTMVar sem)
                    (atomically . putTMVar sem)
                    (\pushBS -> pushBS " ")
    -- 'threadDelay' counts in microseconds.
    threadDelay (delay * 1000000)