Viewing file: _team.py (6.98 KB) -rw-r--r-- Select action/file-type: (+) | (+) | (+) | Code (+) | Session (+) | (+) | SDB (+) | (+) | (+) | (+) | (+) | (+) |
# -*- test-case-name: twisted._threads.test.test_team -*- # Copyright (c) Twisted Matrix Laboratories. # See LICENSE for details.
""" Implementation of a L{Team} of workers; a thread-pool that can allocate work to workers. """
from collections import deque
from zope.interface import implementer
from . import IWorker from ._convenience import Quit
class Statistics: """ Statistics about a L{Team}'s current activity.
@ivar idleWorkerCount: The number of idle workers. @type idleWorkerCount: L{int}
@ivar busyWorkerCount: The number of busy workers. @type busyWorkerCount: L{int}
@ivar backloggedWorkCount: The number of work items passed to L{Team.do} which have not yet been sent to a worker to be performed because not enough workers are available. @type backloggedWorkCount: L{int} """
def __init__(self, idleWorkerCount, busyWorkerCount, backloggedWorkCount): self.idleWorkerCount = idleWorkerCount self.busyWorkerCount = busyWorkerCount self.backloggedWorkCount = backloggedWorkCount
@implementer(IWorker) class Team: """ A composite L{IWorker} implementation.
@ivar _quit: A L{Quit} flag indicating whether this L{Team} has been quit yet. This may be set by an arbitrary thread since L{Team.quit} may be called from anywhere.
@ivar _coordinator: the L{IExclusiveWorker} coordinating access to this L{Team}'s internal resources.
@ivar _createWorker: a callable that will create new workers.
@ivar _logException: a 0-argument callable called in an exception context when there is an unhandled error from a task passed to L{Team.do}
@ivar _idle: a L{set} of idle workers.
@ivar _busyCount: the number of workers currently busy.
@ivar _pending: a C{deque} of tasks - that is, 0-argument callables passed to L{Team.do} - that are outstanding.
@ivar _shouldQuitCoordinator: A flag indicating that the coordinator should be quit at the next available opportunity. Unlike L{Team._quit}, this flag is only set by the coordinator.
@ivar _toShrink: the number of workers to shrink this L{Team} by at the next available opportunity; set in the coordinator. """
def __init__(self, coordinator, createWorker, logException): """ @param coordinator: an L{IExclusiveWorker} which will coordinate access to resources on this L{Team}; that is to say, an L{IExclusiveWorker} whose C{do} method ensures that its given work will be executed in a mutually exclusive context, not in parallel with other work enqueued by C{do} (although possibly in parallel with the caller).
@param createWorker: A 0-argument callable that will create an L{IWorker} to perform work.
@param logException: A 0-argument callable called in an exception context when the work passed to C{do} raises an exception. """ self._quit = Quit() self._coordinator = coordinator self._createWorker = createWorker self._logException = logException
# Don't touch these except from the coordinator. self._idle = set() self._busyCount = 0 self._pending = deque() self._shouldQuitCoordinator = False self._toShrink = 0
def statistics(self): """ Gather information on the current status of this L{Team}.
@return: a L{Statistics} describing the current state of this L{Team}. """ return Statistics(len(self._idle), self._busyCount, len(self._pending))
def grow(self, n): """ Increase the the number of idle workers by C{n}.
@param n: The number of new idle workers to create. @type n: L{int} """ self._quit.check()
@self._coordinator.do def createOneWorker(): for x in range(n): worker = self._createWorker() if worker is None: return self._recycleWorker(worker)
def shrink(self, n=None): """ Decrease the number of idle workers by C{n}.
@param n: The number of idle workers to shut down, or L{None} (or unspecified) to shut down all workers. @type n: L{int} or L{None} """ self._quit.check() self._coordinator.do(lambda: self._quitIdlers(n))
def _quitIdlers(self, n=None): """ The implmentation of C{shrink}, performed by the coordinator worker.
@param n: see L{Team.shrink} """ if n is None: n = len(self._idle) + self._busyCount for x in range(n): if self._idle: self._idle.pop().quit() else: self._toShrink += 1 if self._shouldQuitCoordinator and self._busyCount == 0: self._coordinator.quit()
def do(self, task): """ Perform some work in a worker created by C{createWorker}.
@param task: the callable to run """ self._quit.check() self._coordinator.do(lambda: self._coordinateThisTask(task))
def _coordinateThisTask(self, task): """ Select a worker to dispatch to, either an idle one or a new one, and perform it.
This method should run on the coordinator worker.
@param task: the task to dispatch @type task: 0-argument callable """ worker = self._idle.pop() if self._idle else self._createWorker() if worker is None: # The createWorker method may return None if we're out of resources # to create workers. self._pending.append(task) return self._busyCount += 1
@worker.do def doWork(): try: task() except BaseException: self._logException()
@self._coordinator.do def idleAndPending(): self._busyCount -= 1 self._recycleWorker(worker)
def _recycleWorker(self, worker): """ Called only from coordinator.
Recycle the given worker into the idle pool.
@param worker: a worker created by C{createWorker} and now idle. @type worker: L{IWorker} """ self._idle.add(worker) if self._pending: # Re-try the first enqueued thing. # (Explicitly do _not_ honor _quit.) self._coordinateThisTask(self._pending.popleft()) elif self._shouldQuitCoordinator: self._quitIdlers() elif self._toShrink > 0: self._toShrink -= 1 self._idle.remove(worker) worker.quit()
def quit(self): """ Stop doing work and shut down all idle workers. """ self._quit.set() # In case all the workers are idle when we do this.
@self._coordinator.do def startFinishing(): self._shouldQuitCoordinator = True self._quitIdlers()
|