diff --git a/distributed-process.cabal b/distributed-process.cabal index 5f3871b0..4d6ad482 100644 --- a/distributed-process.cabal +++ b/distributed-process.cabal @@ -72,6 +72,7 @@ Library Control.Distributed.Process.Internal.StrictContainerAccessors, Control.Distributed.Process.Internal.StrictList, Control.Distributed.Process.Internal.StrictMVar, + Control.Distributed.Process.Internal.ThreadPool, Control.Distributed.Process.Internal.Types, Control.Distributed.Process.Internal.WeakTQueue, Control.Distributed.Process.Management, diff --git a/src/Control/Distributed/Process/Internal/ThreadPool.hs b/src/Control/Distributed/Process/Internal/ThreadPool.hs new file mode 100644 index 00000000..333f3e1c --- /dev/null +++ b/src/Control/Distributed/Process/Internal/ThreadPool.hs @@ -0,0 +1,78 @@ +-- | An implementation of a pool of threads +-- +{-# LANGUAGE RecursiveDo #-} +module Control.Distributed.Process.Internal.ThreadPool + ( newThreadPool + , submitTask + , lookupWorker + , ThreadPool + ) where + +import Control.Exception +import Control.Monad +import Data.IORef +import qualified Data.Map as Map + + +-- | A pool of worker threads that execute tasks. +-- +-- Each worker thread is named with a key @k@. Tasks are submitted to a +-- specific worker using its key. While the worker is busy the tasks are queued. +-- When there are no more queued tasks the worker ceases to exist. +-- +-- The next time a task is submitted the worker will be respawned. +-- +newtype ThreadPool k w = ThreadPool (IORef (Map.Map k (Maybe (IO ()), w))) + +-- Each worker has an entry in the map with a closure that contains all +-- queued actions fot it. +-- +-- No entry in the map is kept for defunct workers. + +-- | Creates a pool with no workers. +newThreadPool :: IO (ThreadPool k w) +newThreadPool = fmap ThreadPool $ newIORef Map.empty + +-- | @submitTask pool fork k task@ submits a task for the worker @k@. +-- +-- If worker @k@ is busy, then the task is queued until the worker is available. +-- +-- If worker @k@ does not exist, then the given @fork@ operation is used to +-- spawn the worker. @fork@ returns whatever information is deemed useful for +-- later retrieval via 'lookupWorker'. +-- +submitTask :: Ord k + => ThreadPool k w + -> (IO () -> IO w) + -> k -> IO () -> IO () +submitTask (ThreadPool mapRef) fork k task = mdo + m' <- join $ atomicModifyIORef mapRef $ \m -> + case Map.lookup k m of + -- There is no worker for this key, create one. + Nothing -> ( m' + , do w <- fork $ flip onException terminateWorker $ do + task + continue + return $ Map.insert k (Nothing, w) m + ) + -- Queue an action for the existing worker. + Just (mp, w) -> + (m', return $ Map.insert k (Just $ maybe task (>> task) mp, w) m) + return () + where + continue = join $ atomicModifyIORef mapRef $ \m -> + case Map.lookup k m of + -- Execute the next batch of queued actions. + Just (Just p, w) -> (Map.insert k (Nothing, w) m, p >> continue) + -- There are no more queued actions. Terminate the worker. + Just (Nothing, w) -> (Map.delete k m, return ()) + -- The worker key was removed already (?) + Nothing -> (m, return ()) + -- Remove the worker key regardless of whether there are more queued + -- actions. + terminateWorker = atomicModifyIORef mapRef $ \m -> (Map.delete k m, ()) + +-- | Looks up a worker with the given key. +lookupWorker :: Ord k => ThreadPool k w -> k -> IO (Maybe w) +lookupWorker (ThreadPool mapRef) k = + atomicModifyIORef mapRef $ \m -> (m, fmap snd $ Map.lookup k m) diff --git a/src/Control/Distributed/Process/Internal/Types.hs b/src/Control/Distributed/Process/Internal/Types.hs index 4acf74a7..c7eb3abf 100644 --- a/src/Control/Distributed/Process/Internal/Types.hs +++ b/src/Control/Distributed/Process/Internal/Types.hs @@ -130,6 +130,7 @@ import Control.Distributed.Process.Internal.StrictMVar , withMVar , modifyMVar_ ) +import Control.Distributed.Process.Internal.ThreadPool import Control.Distributed.Process.Internal.WeakTQueue (TQueue) import Control.Distributed.Static (RemoteTable, Closure) import qualified Control.Distributed.Process.Internal.StrictContainerAccessors as DAC (mapMaybe) @@ -259,6 +260,8 @@ data LocalNode = LocalNode -- | Runtime lookup table for supporting closures -- TODO: this should be part of the CH state, not the local endpoint state , remoteTable :: !RemoteTable + -- The pool of threads and queues to send messages + , localSendPool :: ThreadPool NodeId ThreadId } data ImplicitReconnect = WithImplicitReconnect | NoImplicitReconnect diff --git a/src/Control/Distributed/Process/Node.hs b/src/Control/Distributed/Process/Node.hs index 5830f6fe..26f91cad 100644 --- a/src/Control/Distributed/Process/Node.hs +++ b/src/Control/Distributed/Process/Node.hs @@ -77,6 +77,12 @@ import Control.Distributed.Process.Internal.StrictMVar , putMVar , takeMVar ) +import Control.Distributed.Process.Internal.ThreadPool + ( newThreadPool + , submitTask + , lookupWorker + , ThreadPool + ) import Control.Concurrent.Chan (newChan, writeChan, readChan) import qualified Control.Concurrent.MVar as MVar (newEmptyMVar, takeMVar) import Control.Concurrent.STM @@ -242,12 +248,14 @@ createBareLocalNode endPoint rtable = do , _localConnections = Map.empty } ctrlChan <- newChan + sendPool <- newThreadPool let node = LocalNode { localNodeId = NodeId $ NT.address endPoint , localEndPoint = endPoint , localState = state , localCtrlChan = ctrlChan , localEventBus = MxEventBusInitialising , remoteTable = rtable + , localSendPool = sendPool } tracedNode <- startMxAgent node @@ -560,6 +568,8 @@ handleIncomingMessages node = go initConnectionState , ctrlMsgSignal = Died nid DiedDisconnect } let notLost k = not (k `Set.member` (st ^. incomingFrom theirAddr)) + liftIO $ lookupWorker (localSendPool node) (NodeId theirAddr) + >>= maybe (return ()) killThread closeImplicitReconnections node nid go ( (incomingFrom theirAddr ^= Set.empty) . (incoming ^: Map.filterWithKey (const . notLost)) @@ -614,6 +624,9 @@ data NCState = NCState , _registeredOnNodes :: !(Map ProcessId [(NodeId,Int)]) } +submitSendPool :: LocalNode -> NodeId -> IO () -> IO () +submitSendPool node nid task = submitTask (localSendPool node) forkIO nid task + newtype NC a = NC { unNC :: StateT NCState (ReaderT LocalNode IO) a } deriving ( Applicative , Functor @@ -624,11 +637,12 @@ newtype NC a = NC { unNC :: StateT NCState (ReaderT LocalNode IO) a } ) initNCState :: NCState -initNCState = NCState { _links = Map.empty - , _monitors = Map.empty - , _registeredHere = Map.empty - , _registeredOnNodes = Map.empty - } +initNCState = NCState + { _links = Map.empty + , _monitors = Map.empty + , _registeredHere = Map.empty + , _registeredOnNodes = Map.empty + } -- | Thrown in response to the user invoking 'kill' (see Primitives.hs). This -- type is deliberately not exported so it cannot be caught explicitly. @@ -683,7 +697,7 @@ nodeController = do -- [Unified: Table 7, rule nc_forward] case destNid (ctrlMsgSignal msg) of Just nid' | nid' /= localNodeId node -> - liftIO $ sendBinary node + liftIO $ submitSendPool node nid' $ sendBinary node (ctrlMsgSender msg) (NodeIdentifier nid') WithImplicitReconnect @@ -754,7 +768,7 @@ ncEffectMonitor from them mRef = do -- TODO: this is the right sender according to the Unified semantics, -- but perhaps having 'them' as the sender would make more sense -- (see also: notifyDied) - liftIO $ sendBinary node + liftIO $ submitSendPool node (processNodeId from) $ sendBinary node (NodeIdentifier $ localNodeId node) (NodeIdentifier $ processNodeId from) WithImplicitReconnect @@ -823,7 +837,7 @@ ncEffectDied ident reason = do modify' $ registeredOnNodes ^= (Map.fromList (catMaybes remaining)) where forwardNameDeath node nid = - liftIO $ sendBinary node + liftIO $ submitSendPool node nid $ sendBinary node (NodeIdentifier $ localNodeId node) (NodeIdentifier $ nid) WithImplicitReconnect @@ -844,7 +858,7 @@ ncEffectSpawn pid cProc ref = do Right p -> p node <- ask pid' <- liftIO $ forkProcess node proc - liftIO $ sendMessage node + liftIO $ submitSendPool node (processNodeId pid) $ sendMessage node (NodeIdentifier (localNodeId node)) (ProcessIdentifier pid) WithImplicitReconnect @@ -872,7 +886,8 @@ ncEffectRegister from label atnode mPid reregistration = do case mPid of (Just p) -> liftIO $ trace node (MxRegistered p label) Nothing -> liftIO $ trace node (MxUnRegistered (fromJust currentVal) label) - liftIO $ sendMessage node + liftIO $ submitSendPool node (processNodeId from) $ + sendMessage node (NodeIdentifier (localNodeId node)) (ProcessIdentifier from) WithImplicitReconnect @@ -906,7 +921,7 @@ ncEffectRegister from label atnode mPid reregistration = do decList (x:xs) tag = x:decList xs tag forward node to reg = when (not $ isLocal node (NodeIdentifier to)) $ - liftIO $ sendBinary node + liftIO $ submitSendPool node to $ sendBinary node (ProcessIdentifier from) (NodeIdentifier to) WithImplicitReconnect @@ -921,7 +936,7 @@ ncEffectWhereIs :: ProcessId -> String -> NC () ncEffectWhereIs from label = do node <- ask mPid <- gets (^. registeredHereFor label) - liftIO $ sendMessage node + liftIO $ submitSendPool node (processNodeId from) $ sendMessage node (NodeIdentifier (localNodeId node)) (ProcessIdentifier from) WithImplicitReconnect @@ -1022,7 +1037,7 @@ ncEffectGetInfo from pid = -> NC () dispatch True dest _ pInfo = postAsMessage dest $ pInfo dispatch False dest node pInfo = do - liftIO $ sendMessage node + liftIO $ submitSendPool node (processNodeId dest) $ sendMessage node (NodeIdentifier (localNodeId node)) (ProcessIdentifier dest) WithImplicitReconnect @@ -1073,7 +1088,7 @@ notifyDied dest src reason mRef = do throwException dest $ PortLinkException pid reason (False, _, _) -> -- The change in sender comes from [Unified: Table 10] - liftIO $ sendBinary node + liftIO $ submitSendPool node (processNodeId dest) $ sendBinary node (NodeIdentifier $ localNodeId node) (NodeIdentifier $ processNodeId dest) WithImplicitReconnect