\begin{code}
{-# LANGUAGE StrictData #-}
module Tox.Transport.Stream where

import           Data.Map                  (Map)
import qualified Data.Map                  as Map
import           Data.Word                 (Word64)
import           Tox.Core.Time             (Timestamp, TimeDiff)
import qualified Tox.Core.Time             as Time
import           Tox.Transport.Reliability (SeqNum)

-- | State for Congestion Control (CC) and RTT tracking.
data StreamState = StreamState
  { StreamState -> Maybe TimeDiff
ssMinRTT             :: Maybe TimeDiff -- ^ Lowest observed RTT
  , StreamState -> Maybe TimeDiff
ssLastRTT            :: Maybe TimeDiff -- ^ Most recent observed RTT
  , StreamState -> Map SeqNum Timestamp
ssSendHistory        :: Map SeqNum Timestamp -- ^ When each lossless packet was sent
  , StreamState -> Int
ssSentLastInterval   :: Int             -- ^ Packets sent in the current 1.2s window
  , StreamState -> Timestamp
ssIntervalStart      :: Timestamp       -- ^ Start of the current 1.2s window
  , StreamState -> Int
ssSendQueueSizeStart :: Int             -- ^ Send queue size at start of interval
  , StreamState -> Double
ssCurrentSendRate    :: Double          -- ^ Packets per second
  , StreamState -> Maybe Timestamp
ssLastCongestion     :: Maybe Timestamp -- ^ Last time a congestion event occurred
  } deriving (StreamState -> StreamState -> Bool
(StreamState -> StreamState -> Bool)
-> (StreamState -> StreamState -> Bool) -> Eq StreamState
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: StreamState -> StreamState -> Bool
$c/= :: StreamState -> StreamState -> Bool
== :: StreamState -> StreamState -> Bool
$c== :: StreamState -> StreamState -> Bool
Eq, Int -> StreamState -> ShowS
[StreamState] -> ShowS
StreamState -> String
(Int -> StreamState -> ShowS)
-> (StreamState -> String)
-> ([StreamState] -> ShowS)
-> Show StreamState
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [StreamState] -> ShowS
$cshowList :: [StreamState] -> ShowS
show :: StreamState -> String
$cshow :: StreamState -> String
showsPrec :: Int -> StreamState -> ShowS
$cshowsPrec :: Int -> StreamState -> ShowS
Show)

-- | Initial CC state.
initState :: Timestamp -> StreamState
initState :: Timestamp -> StreamState
initState Timestamp
now = StreamState :: Maybe TimeDiff
-> Maybe TimeDiff
-> Map SeqNum Timestamp
-> Int
-> Timestamp
-> Int
-> Double
-> Maybe Timestamp
-> StreamState
StreamState
  { ssMinRTT :: Maybe TimeDiff
ssMinRTT             = Maybe TimeDiff
forall a. Maybe a
Nothing
  , ssLastRTT :: Maybe TimeDiff
ssLastRTT            = Maybe TimeDiff
forall a. Maybe a
Nothing
  , ssSendHistory :: Map SeqNum Timestamp
ssSendHistory        = Map SeqNum Timestamp
forall k a. Map k a
Map.empty
  , ssSentLastInterval :: Int
ssSentLastInterval   = Int
0
  , ssIntervalStart :: Timestamp
ssIntervalStart      = Timestamp
now
  , ssSendQueueSizeStart :: Int
ssSendQueueSizeStart = Int
0
  , ssCurrentSendRate :: Double
ssCurrentSendRate    = Double
8.0 -- Minimum 8 packets/sec
  , ssLastCongestion :: Maybe Timestamp
ssLastCongestion     = Maybe Timestamp
forall a. Maybe a
Nothing
  }

-- | Record when a lossless packet was sent.
recordPacketSent :: SeqNum -> Timestamp -> StreamState -> StreamState
recordPacketSent :: SeqNum -> Timestamp -> StreamState -> StreamState
recordPacketSent SeqNum
s Timestamp
now StreamState
state = StreamState
state
  { ssSendHistory :: Map SeqNum Timestamp
ssSendHistory      = SeqNum -> Timestamp -> Map SeqNum Timestamp -> Map SeqNum Timestamp
forall k a. Ord k => k -> a -> Map k a -> Map k a
Map.insert SeqNum
s Timestamp
now (StreamState -> Map SeqNum Timestamp
ssSendHistory StreamState
state)
  , ssSentLastInterval :: Int
ssSentLastInterval = StreamState -> Int
ssSentLastInterval StreamState
state Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1
  }

-- | Record when a packet was acknowledged by the peer.
-- Updates RTT metrics.
recordPacketAcked :: SeqNum -> Timestamp -> StreamState -> StreamState
recordPacketAcked :: SeqNum -> Timestamp -> StreamState -> StreamState
recordPacketAcked SeqNum
s Timestamp
now StreamState
state =
  case SeqNum -> Map SeqNum Timestamp -> Maybe Timestamp
forall k a. Ord k => k -> Map k a -> Maybe a
Map.lookup SeqNum
s (StreamState -> Map SeqNum Timestamp
ssSendHistory StreamState
state) of
    Maybe Timestamp
Nothing -> StreamState
state -- Already acked or too old
    Just Timestamp
sentTime ->
      let rtt :: TimeDiff
rtt = Timestamp
now Timestamp -> Timestamp -> TimeDiff
`Time.diffTime` Timestamp
sentTime
          newMin :: Maybe TimeDiff
newMin = case StreamState -> Maybe TimeDiff
ssMinRTT StreamState
state of
            Maybe TimeDiff
Nothing -> TimeDiff -> Maybe TimeDiff
forall a. a -> Maybe a
Just TimeDiff
rtt
            Just TimeDiff
oldMin -> TimeDiff -> Maybe TimeDiff
forall a. a -> Maybe a
Just (TimeDiff -> TimeDiff -> TimeDiff
forall a. Ord a => a -> a -> a
min TimeDiff
oldMin TimeDiff
rtt)
      in StreamState
state
        { ssLastRTT :: Maybe TimeDiff
ssLastRTT     = TimeDiff -> Maybe TimeDiff
forall a. a -> Maybe a
Just TimeDiff
rtt
        , ssMinRTT :: Maybe TimeDiff
ssMinRTT      = Maybe TimeDiff
newMin
        , ssSendHistory :: Map SeqNum Timestamp
ssSendHistory = SeqNum -> Map SeqNum Timestamp -> Map SeqNum Timestamp
forall k a. Ord k => k -> Map k a -> Map k a
Map.delete SeqNum
s (StreamState -> Map SeqNum Timestamp
ssSendHistory StreamState
state)
        }

ccInterval :: Time.TimeDiff
ccInterval :: TimeDiff
ccInterval = Integer -> TimeDiff
Time.milliseconds Integer
1200

congestionMemory :: Time.TimeDiff
congestionMemory :: TimeDiff
congestionMemory = Integer -> TimeDiff
Time.seconds Integer
2

-- | Periodically update the send rate based on throughput.
-- Should be called approximately every 1.2s.
updateSendRate :: Int -> Timestamp -> StreamState -> StreamState
updateSendRate :: Int -> Timestamp -> StreamState -> StreamState
updateSendRate Int
currentQueueSize Timestamp
now StreamState
state =
  let
    elapsed :: TimeDiff
elapsed = Timestamp
now Timestamp -> Timestamp -> TimeDiff
`Time.diffTime` StreamState -> Timestamp
ssIntervalStart StreamState
state
  in
    if TimeDiff
elapsed TimeDiff -> TimeDiff -> Bool
forall a. Ord a => a -> a -> Bool
< TimeDiff
ccInterval
    then StreamState
state
    else
      let
        -- Formula: (N - (Q_now - Q_prev)) / 1.2
        throughput :: Double
throughput = Int -> Double
forall a b. (Integral a, Num b) => a -> b
fromIntegral (StreamState -> Int
ssSentLastInterval StreamState
state Int -> Int -> Int
forall a. Num a => a -> a -> a
- (Int
currentQueueSize Int -> Int -> Int
forall a. Num a => a -> a -> a
- StreamState -> Int
ssSendQueueSizeStart StreamState
state)) Double -> Double -> Double
forall a. Fractional a => a -> a -> a
/ Double
1.2

        -- Apply floor of 8.0
        baseRate :: Double
baseRate = Double -> Double -> Double
forall a. Ord a => a -> a -> a
max Double
8.0 Double
throughput

        -- If no congestion in last 2s, increase rate by 25%
        hasCongestionRecently :: Bool
hasCongestionRecently = case StreamState -> Maybe Timestamp
ssLastCongestion StreamState
state of
          Maybe Timestamp
Nothing -> Bool
False
          Just Timestamp
t -> Timestamp
now Timestamp -> Timestamp -> TimeDiff
`Time.diffTime` Timestamp
t TimeDiff -> TimeDiff -> Bool
forall a. Ord a => a -> a -> Bool
< TimeDiff
congestionMemory

        newRate :: Double
newRate = if Bool
hasCongestionRecently then Double
baseRate else Double
baseRate Double -> Double -> Double
forall a. Num a => a -> a -> a
* Double
1.25
      in
        StreamState
state
          { ssIntervalStart :: Timestamp
ssIntervalStart      = Timestamp
now
          , ssSentLastInterval :: Int
ssSentLastInterval   = Int
0
          , ssSendQueueSizeStart :: Int
ssSendQueueSizeStart = Int
currentQueueSize
          , ssCurrentSendRate :: Double
ssCurrentSendRate    = Double
newRate
          }

-- | Record a congestion event (e.g. when peer requests many packets).
recordCongestion :: Timestamp -> StreamState -> StreamState
recordCongestion :: Timestamp -> StreamState -> StreamState
recordCongestion Timestamp
now StreamState
state = StreamState
state { ssLastCongestion :: Maybe Timestamp
ssLastCongestion = Timestamp -> Maybe Timestamp
forall a. a -> Maybe a
Just Timestamp
now }
\end{code}

The ping or rtt (round trip time) between two peers can be calculated by saving
the time each packet was sent and taking the difference between the time the
latest packet confirmed received by a request packet was sent and the time the
request packet was received.  The rtt can be calculated for every request
packet.  The lowest one (for all packets) will be the closest to the real ping.

This ping or rtt can be used to know if a request packet that requests a packet
we just sent should be resent right away or we should wait or not for the next
one (to know if the other side actually had time to receive the packet).

The congestion control algorithm has the goal of guessing how many packets can
be sent through the link every second before none can be sent through anymore.
How it works is basically to send packets faster and faster until none can go
through the link and then stop sending them faster than that.

Currently the congestion control uses the following formula in toxcore however
that is probably not the best way to do it.

The current formula is to take the difference between the current size of the
send queue and the size of the send queue 1.2 seconds ago, take the total
number of packets sent in the last 1.2 seconds and subtract the previous number
from it.

Then divide this number by 1.2 to get a packet speed per second.  If this speed
is lower than the minimum send rate of 8 packets per second, set it to 8.

A congestion event can be defined as an event when the number of requested
packets exceeds the number of packets the congestion control says can be sent
during this frame.  If a congestion event occurred during the last 2 seconds,
the packet send rate of the connection is set to the send rate previously
calculated, if not it is set to that send rate times 1.25 in order to increase
the speed.

Like I said this isn't perfect and a better solution can likely be found or the
numbers tweaked.

To fix the possible issue where it would be impossible to send very low
bandwidth data like text messages when sending high bandwidth data like files
it is possible to make priority packets ignore the congestion control
completely by placing them into the send packet queue and sending them even if
the congestion control says not to.  This is used in toxcore for all non file
transfer packets to prevent file transfers from preventing normal message
packets from being sent.