{-# OPTIONS_HADDOCK hide #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ScopedTypeVariables #-}

module Network.Xmpp.Concurrent.Threads where

import           Control.Applicative((<$>))
import           Control.Concurrent
import           Control.Concurrent.STM
import qualified Control.Exception.Lifted as Ex
import           Control.Monad
import           Control.Monad.Except
import           Control.Monad.IO.Class
import qualified Data.ByteString as BS
import           GHC.IO (unsafeUnmask)
import           Network.Xmpp.Concurrent.Types
import           Network.Xmpp.Stream
import           Network.Xmpp.Types
import           System.Log.Logger

-- Worker that reads elements from the stream and hands each one to the
-- supplied callback.
--
-- Arguments:
--
-- * @onElement@ is called with every element that was read successfully.
-- * @onCClosed@ is called with the failure when pulling an element fails; the
--   failure is logged and the stream is closed before the callback runs.
-- * @stateRef@ holds the stream to read from.
--
-- The loop never returns. Every iteration runs with asynchronous exceptions
-- masked; an 'Interrupt' exception makes the worker wait until the semaphore
-- carried by the exception has been filled before it starts the next
-- iteration.
readWorker :: (XmppElement -> IO ())
           -> (XmppFailure -> IO ())
           -> TMVar Stream
           -> IO a
readWorker onElement onCClosed stateRef = forever . Ex.mask_ $ do
    mbStream <- Ex.catches (Just <$> atomically waitForOpenStream)
                           [interruptHandler]
    case mbStream of
        -- Interrupted while waiting for a usable stream: start over.
        Nothing -> return ()
        Just s -> do
            mbElement <- Ex.catches (pullOne s) [interruptHandler]
            case mbElement of
                -- Either the read failed (already reported through
                -- onCClosed) or we were interrupted: nothing to deliver.
                Nothing  -> return ()
                Just sta -> onElement sta
  where
    -- Block (via 'retry') for as long as the connection state of the current
    -- stream is closed or finished, then return that stream.
    waitForOpenStream :: STM Stream
    waitForOpenStream = do
        s@(Stream con) <- readTMVar stateRef
        scs <- streamConnectionState <$> readTMVar con
        when (stateIsClosed scs) retry
        return s

    -- Pull a single element. On failure: log it, close the stream, notify
    -- onCClosed and yield Nothing.
    pullOne :: Stream -> IO (Maybe XmppElement)
    pullOne s = do
        -- We don't know whether pull will necessarily be interruptible, so
        -- give pending asynchronous exceptions a chance to arrive first.
        allowInterrupt
        pulled <- pullXmppElement s
        case pulled of
            Left e -> do
                errorM "Pontarius.Xmpp" $ "Read error: " ++ show e
                _ <- closeStreams s
                onCClosed e
                return Nothing
            Right r -> return $ Just r

    -- Shared handler for both phases: wait for the interrupt's semaphore and
    -- report that no value was produced.
    interruptHandler :: Ex.Handler IO (Maybe b)
    interruptHandler = Ex.Handler $ \(Interrupt t) -> do
        handleInterrupts [t]
        return Nothing

    -- Defining a Control.Exception.allowInterrupt equivalent for GHC 7
    -- compatibility.
    allowInterrupt :: IO ()
    allowInterrupt = unsafeUnmask $ return ()

    -- While waiting for the first semaphore(s) to flip we might receive
    -- another interrupt. When that happens we add its semaphore to the list
    -- and retry waiting.
    handleInterrupts :: [TMVar ()] -> IO ()
    handleInterrupts ts =
        Ex.catch (atomically $ mapM_ takeTMVar ts)
                 (\(Interrupt t) -> handleInterrupts (t : ts))

    stateIsClosed :: ConnectionState -> Bool
    stateIsClosed Closed   = True
    stateIsClosed Finished = True
    stateIsClosed _        = False

-- | Start the worker threads for a connection.
--
-- Forks a keep-alive thread ('connPersist') and a reader thread
-- ('readWorker'). The result is always 'Right' and contains, in order:
--
-- * an action that stops writes and kills both threads,
-- * the 'TMVar' holding the stream the reader works on, and
-- * the 'ThreadId' of the reader thread.
--
-- Arguments: the write lock holding the current send function, the callback
-- for incoming stanzas, the event handlers, the stream to read from, and the
-- optional keep-alive interval in seconds.
startThreadsWith :: TMVar (BS.ByteString -> IO (Either XmppFailure ()))
                 -> (XmppElement -> IO ())
                 -> TMVar EventHandlers
                 -> Stream
                 -> Maybe Int
                 -> IO (Either XmppFailure (IO (),
                                            TMVar Stream,
                                            ThreadId))
startThreadsWith writeSem stanzaHandler eh con keepAlive = do
    -- read' <- withStream' (gets $ streamSend . streamHandle) con
    -- writeSem <- newTMVarIO read'
    conS <- newTMVarIO con
    cp   <- forkIO $ connPersist keepAlive writeSem
    rdw  <- forkIO $ readWorker stanzaHandler onConClosed conS
    return $ Right (killConnection [rdw, cp], conS, rdw)
  where
    -- Run when the reader hits a read failure: block further writes, then
    -- notify the registered handler.
    onConClosed :: XmppFailure -> IO ()
    onConClosed failure = do
        stopWrites
        noCon eh failure

    -- Replace the send function with one that always reports XmppNoStream.
    -- Taking the TMVar first means this waits for any writer currently
    -- holding the lock.
    stopWrites :: IO ()
    stopWrites = atomically $ do
        _ <- takeTMVar writeSem
        putTMVar writeSem $ \_ -> return $ Left XmppNoStream

    -- Disable writes, then kill the given threads.
    killConnection threads = liftIO $ do
        debugM "Pontarius.Xmpp" "killing connection"
        stopWrites
        debugM "Pontarius.Xmpp" "killing threads"
        forM_ threads killThread

    -- Call the connection closed handler in a thread of its own.
    noCon :: TMVar EventHandlers -> XmppFailure -> IO ()
    noCon h e = do
        hands <- atomically $ readTMVar h
        void . forkIO $ connectionClosedHandler hands e

-- | Sends a blank space every @delay@ seconds to keep the connection alive.
--
-- With @Just delay@ this loops forever: it takes the send function out of the
-- 'TMVar' (acquiring the write lock), pushes a single space, puts the function
-- back (releasing the lock) and then sleeps for @delay@ seconds. With
-- 'Nothing' it returns immediately without touching the lock.
connPersist :: Maybe Int -> TMVar (BS.ByteString -> IO a) -> IO ()
connPersist Nothing _ = return ()
connPersist (Just delay) sem = forever $ do
    -- bracket guarantees the send function is put back even if the push
    -- throws; otherwise the TMVar would stay empty and every other user of
    -- the write lock would block forever.
    Ex.bracket (atomically $ takeTMVar sem)
               (atomically . putTMVar sem)
               (\pushBS -> void $ pushBS " ")
    threadDelay (delay * 1000000)