Viewing file: test_threadworker.py (7.8 KB) -rw-r--r-- Select action/file-type: (+) | (+) | (+) | Code (+) | Session (+) | (+) | SDB (+) | (+) | (+) | (+) | (+) | (+) |
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
""" Tests for L{twisted._threads._threadworker}. """
import gc import weakref from threading import ThreadError, local
from twisted.trial.unittest import SynchronousTestCase from .. import AlreadyQuit, LockWorker, ThreadWorker
class FakeQueueEmpty(Exception):
    """
    Raised by L{FakeQueue.get} when there are no items left to return.
    """
class WouldDeadlock(Exception):
    """
    Raised by L{FakeLock.acquire} when the lock is already acquired; with a
    real lock, such a double-acquire would leave the caller deadlocked.
    """
class FakeThread:
    """
    A stand-in for L{threading.Thread} which only records what it was asked
    to do.

    @ivar target: The function this thread was created to run.
    @type target: L{callable}

    @ivar started: Whether L{FakeThread.start} has been called.
    @type started: L{bool}
    """

    def __init__(self, target):
        """
        Remember C{target} and begin in the not-started state.

        @param target: the function this thread was created to run.
        """
        self.started = False
        self.target = target

    def start(self):
        """
        Record that this thread was started.  C{target} is not invoked here.
        """
        self.started = True
class FakeQueue:
    """
    An in-memory substitute for L{Queue} offering only C{put} and C{get}.

    @ivar items: The items added with C{put} that C{get} has not handed back
        yet, oldest first.
    @type items: L{list}
    """

    def __init__(self):
        """
        Start with nothing queued.
        """
        self.items = []

    def put(self, item):
        """
        Append an item so that a later L{FakeQueue.get} can return it.

        @param item: any object
        """
        self.items.append(item)

    def get(self):
        """
        Remove and return the oldest queued item.

        @return: the earliest item added with C{put} that has not been
            returned yet.

        @raise FakeQueueEmpty: if nothing is queued.
        """
        if self.items:
            return self.items.pop(0)
        raise FakeQueueEmpty()
class FakeLock:
    """
    A non-blocking substitute for L{threading.Lock} that raises instead of
    waiting.

    @ivar acquired: C{True} while the lock is held, C{False} otherwise.
    """

    def __init__(self):
        """
        Start out not held.
        """
        self.acquired = False

    def acquire(self):
        """
        Take the lock.

        @raise WouldDeadlock: if the lock is already held.
        """
        if self.acquired:
            raise WouldDeadlock()
        self.acquired = True

    def release(self):
        """
        Give the lock back.

        @raise ThreadError: if the lock is not currently held.
        """
        if self.acquired:
            self.acquired = False
        else:
            raise ThreadError()
class ThreadWorkerTests(SynchronousTestCase):
    """
    Tests for L{ThreadWorker}.
    """

    def setUp(self):
        """
        Build a L{ThreadWorker} on top of a L{FakeQueue} and a thread factory
        that hands out already-started L{FakeThread}s, keeping each one in
        C{self.fakeThreads}.
        """
        self.fakeQueue = FakeQueue()
        self.fakeThreads = []

        def startThread(target):
            thread = FakeThread(target=target)
            thread.start()
            self.fakeThreads.append(thread)
            return thread

        self.worker = ThreadWorker(startThread, self.fakeQueue)

    def test_startsThreadAndPerformsWork(self):
        """
        L{ThreadWorker} starts exactly one thread through the C{startThread}
        callable it was given; work passed to L{ThreadWorker.do} is not
        performed until that thread's target is run.
        """
        self.assertEqual(len(self.fakeThreads), 1)
        thread = self.fakeThreads[0]
        self.assertEqual(thread.started, True)

        performed = []

        def doIt():
            performed.append(True)

        self.worker.do(doIt)
        # Nothing has run the thread's target yet, so the work is pending.
        self.assertEqual(performed, [])
        # Running the target by hand performs the work and then raises once
        # the fake queue has nothing left to give.
        self.assertRaises(FakeQueueEmpty, thread.target)
        self.assertEqual(performed, [True])

    def test_quitPreventsFutureCalls(self):
        """
        After L{ThreadWorker.quit}, both L{ThreadWorker.quit} and
        L{ThreadWorker.do} raise L{AlreadyQuit}.
        """
        worker = self.worker
        worker.quit()
        self.assertRaises(AlreadyQuit, worker.quit)
        self.assertRaises(AlreadyQuit, worker.do, list)
class LockWorkerTests(SynchronousTestCase):
    """
    Tests for L{LockWorker}.
    """

    def test_fakeDeadlock(self):
        """
        The L{FakeLock} test fixture will alert us if there's a potential
        deadlock.
        """
        lock = FakeLock()
        lock.acquire()
        self.assertRaises(WouldDeadlock, lock.acquire)

    def test_fakeDoubleRelease(self):
        """
        The L{FakeLock} test fixture will alert us if there's a potential
        double-release.
        """
        lock = FakeLock()
        self.assertRaises(ThreadError, lock.release)
        lock.acquire()
        self.assertEqual(None, lock.release())
        self.assertRaises(ThreadError, lock.release)

    def test_doExecutesImmediatelyWithLock(self):
        """
        L{LockWorker.do} immediately performs the work it's given, while the
        lock is acquired, and the lock is released again afterwards.
        """
        storage = local()
        lock = FakeLock()
        worker = LockWorker(lock, storage)

        def work():
            work.done = True
            # Capture the lock state as seen from inside the work.
            work.acquired = lock.acquired

        work.done = False
        worker.do(work)
        self.assertEqual(work.done, True)
        self.assertEqual(work.acquired, True)
        self.assertEqual(lock.acquired, False)

    def test_doUnwindsReentrancy(self):
        """
        If L{LockWorker.do} is called recursively, it postpones the inner call
        until the outer one is complete.
        """
        lock = FakeLock()
        worker = LockWorker(lock, local())
        levels = []
        acquired = []

        def work():
            work.level += 1
            levels.append(work.level)
            acquired.append(lock.acquired)
            if len(levels) < 2:
                worker.do(work)
            work.level -= 1

        work.level = 0
        worker.do(work)
        # Both invocations ran at nesting level 1: the inner call did not
        # start until the outer one had returned.
        self.assertEqual(levels, [1, 1])
        self.assertEqual(acquired, [True, True])

    def test_quit(self):
        """
        L{LockWorker.quit} frees the resources associated with its lock and
        causes further calls to C{do} and C{quit} to fail.
        """
        lock = FakeLock()
        ref = weakref.ref(lock)
        worker = LockWorker(lock, local())
        # Drop the test's own reference; only the worker may keep the lock
        # alive from here on.
        lock = None
        self.assertIsNot(ref(), None)
        worker.quit()
        gc.collect()
        self.assertIs(ref(), None)
        self.assertRaises(AlreadyQuit, worker.quit)
        self.assertRaises(AlreadyQuit, worker.do, list)

    def test_quitWhileWorking(self):
        """
        If L{LockWorker.quit} is invoked during a call to L{LockWorker.do},
        all recursive work scheduled with L{LockWorker.do} will be completed
        with the lock still acquired, and the lock will be released
        afterwards.
        """
        lock = FakeLock()
        ref = weakref.ref(lock)
        worker = LockWorker(lock, local())

        def phase1():
            # Schedule more work, then quit while still inside C{do}.
            worker.do(phase2)
            worker.quit()
            self.assertRaises(AlreadyQuit, worker.do, list)
            phase1.complete = True

        phase1.complete = False

        def phase2():
            phase2.complete = True
            phase2.acquired = lock.acquired

        phase2.complete = False
        worker.do(phase1)
        self.assertEqual(phase1.complete, True)
        self.assertEqual(phase2.complete, True)
        # The work scheduled before quitting must still have run under the
        # lock; previously this value was recorded but never checked.
        self.assertEqual(phase2.acquired, True)
        self.assertEqual(lock.acquired, False)
        lock = None
        gc.collect()
        self.assertIs(ref(), None)

    def test_quitWhileGettingLock(self):
        """
        If L{LockWorker.do} is called concurrently with L{LockWorker.quit},
        and C{quit} wins the race before C{do} gets the lock attribute, then
        L{AlreadyQuit} will be raised.
        """

        class RacyLockWorker(LockWorker):
            # Reading C{_lock} quits the worker first, simulating C{quit}
            # winning the race against the reader.
            @property
            def _lock(self):
                self.quit()
                return self.__dict__["_lock"]

            @_lock.setter
            def _lock(self, value):
                self.__dict__["_lock"] = value

        worker = RacyLockWorker(FakeLock(), local())
        self.assertRaises(AlreadyQuit, worker.do, list)
|