Viewing file: test_testing.py (15.86 KB) -rw-r--r-- Select action/file-type: (+) | (+) | (+) | Code (+) | Session (+) | (+) | SDB (+) | (+) | (+) | (+) | (+) | (+) |
# Copyright (c) Twisted Matrix Laboratories. # See LICENSE for details.
""" Tests for L{twisted.internet.testing}. """
from zope.interface.verify import verifyObject
from twisted.internet.address import IPv4Address from twisted.internet.interfaces import ( IAddress, IConnector, IConsumer, IListeningPort, IPushProducer, IReactorSSL, IReactorTCP, IReactorUNIX, ITransport, ) from twisted.internet.protocol import ClientFactory, Factory from twisted.internet.testing import ( MemoryReactor, NonStreamingProducer, RaisingMemoryReactor, StringTransport, ) from twisted.python.reflect import namedAny from twisted.trial.unittest import TestCase
class StringTransportTests(TestCase): """ Tests for L{twisted.internet.testing.StringTransport}. """
def setUp(self): self.transport = StringTransport()
def test_interfaces(self): """ L{StringTransport} instances provide L{ITransport}, L{IPushProducer}, and L{IConsumer}. """ self.assertTrue(verifyObject(ITransport, self.transport)) self.assertTrue(verifyObject(IPushProducer, self.transport)) self.assertTrue(verifyObject(IConsumer, self.transport))
def test_registerProducer(self): """ L{StringTransport.registerProducer} records the arguments supplied to it as instance attributes. """ producer = object() streaming = object() self.transport.registerProducer(producer, streaming) self.assertIs(self.transport.producer, producer) self.assertIs(self.transport.streaming, streaming)
def test_disallowedRegisterProducer(self): """ L{StringTransport.registerProducer} raises L{RuntimeError} if a producer is already registered. """ producer = object() self.transport.registerProducer(producer, True) self.assertRaises( RuntimeError, self.transport.registerProducer, object(), False ) self.assertIs(self.transport.producer, producer) self.assertTrue(self.transport.streaming)
def test_unregisterProducer(self): """ L{StringTransport.unregisterProducer} causes the transport to forget about the registered producer and makes it possible to register a new one. """ oldProducer = object() newProducer = object() self.transport.registerProducer(oldProducer, False) self.transport.unregisterProducer() self.assertIsNone(self.transport.producer) self.transport.registerProducer(newProducer, True) self.assertIs(self.transport.producer, newProducer) self.assertTrue(self.transport.streaming)
def test_invalidUnregisterProducer(self): """ L{StringTransport.unregisterProducer} raises L{RuntimeError} if called when no producer is registered. """ self.assertRaises(RuntimeError, self.transport.unregisterProducer)
def test_initialProducerState(self): """ L{StringTransport.producerState} is initially C{'producing'}. """ self.assertEqual(self.transport.producerState, "producing")
def test_pauseProducing(self): """ L{StringTransport.pauseProducing} changes the C{producerState} of the transport to C{'paused'}. """ self.transport.pauseProducing() self.assertEqual(self.transport.producerState, "paused")
def test_resumeProducing(self): """ L{StringTransport.resumeProducing} changes the C{producerState} of the transport to C{'producing'}. """ self.transport.pauseProducing() self.transport.resumeProducing() self.assertEqual(self.transport.producerState, "producing")
def test_stopProducing(self): """ L{StringTransport.stopProducing} changes the C{'producerState'} of the transport to C{'stopped'}. """ self.transport.stopProducing() self.assertEqual(self.transport.producerState, "stopped")
def test_stoppedTransportCannotPause(self): """ L{StringTransport.pauseProducing} raises L{RuntimeError} if the transport has been stopped. """ self.transport.stopProducing() self.assertRaises(RuntimeError, self.transport.pauseProducing)
def test_stoppedTransportCannotResume(self): """ L{StringTransport.resumeProducing} raises L{RuntimeError} if the transport has been stopped. """ self.transport.stopProducing() self.assertRaises(RuntimeError, self.transport.resumeProducing)
def test_disconnectingTransportCannotPause(self): """ L{StringTransport.pauseProducing} raises L{RuntimeError} if the transport is being disconnected. """ self.transport.loseConnection() self.assertRaises(RuntimeError, self.transport.pauseProducing)
def test_disconnectingTransportCannotResume(self): """ L{StringTransport.resumeProducing} raises L{RuntimeError} if the transport is being disconnected. """ self.transport.loseConnection() self.assertRaises(RuntimeError, self.transport.resumeProducing)
def test_loseConnectionSetsDisconnecting(self): """ L{StringTransport.loseConnection} toggles the C{disconnecting} instance variable to C{True}. """ self.assertFalse(self.transport.disconnecting) self.transport.loseConnection() self.assertTrue(self.transport.disconnecting)
def test_specifiedHostAddress(self): """ If a host address is passed to L{StringTransport.__init__}, that value is returned from L{StringTransport.getHost}. """ address = object() self.assertIs(StringTransport(address).getHost(), address)
def test_specifiedPeerAddress(self): """ If a peer address is passed to L{StringTransport.__init__}, that value is returned from L{StringTransport.getPeer}. """ address = object() self.assertIs(StringTransport(peerAddress=address).getPeer(), address)
def test_defaultHostAddress(self): """ If no host address is passed to L{StringTransport.__init__}, an L{IPv4Address} is returned from L{StringTransport.getHost}. """ address = StringTransport().getHost() self.assertIsInstance(address, IPv4Address)
def test_defaultPeerAddress(self): """ If no peer address is passed to L{StringTransport.__init__}, an L{IPv4Address} is returned from L{StringTransport.getPeer}. """ address = StringTransport().getPeer() self.assertIsInstance(address, IPv4Address)
class ReactorTests(TestCase): """ Tests for L{MemoryReactor} and L{RaisingMemoryReactor}. """
def test_memoryReactorProvides(self): """ L{MemoryReactor} provides all of the attributes described by the interfaces it advertises. """ memoryReactor = MemoryReactor() verifyObject(IReactorTCP, memoryReactor) verifyObject(IReactorSSL, memoryReactor) verifyObject(IReactorUNIX, memoryReactor)
def test_raisingReactorProvides(self): """ L{RaisingMemoryReactor} provides all of the attributes described by the interfaces it advertises. """ raisingReactor = RaisingMemoryReactor() verifyObject(IReactorTCP, raisingReactor) verifyObject(IReactorSSL, raisingReactor) verifyObject(IReactorUNIX, raisingReactor)
def test_connectDestination(self): """ L{MemoryReactor.connectTCP}, L{MemoryReactor.connectSSL}, and L{MemoryReactor.connectUNIX} will return an L{IConnector} whose C{getDestination} method returns an L{IAddress} with attributes which reflect the values passed. """ memoryReactor = MemoryReactor() for connector in [ memoryReactor.connectTCP("test.example.com", 8321, ClientFactory()), memoryReactor.connectSSL("test.example.com", 8321, ClientFactory(), None), ]: verifyObject(IConnector, connector) address = connector.getDestination() verifyObject(IAddress, address) self.assertEqual(address.host, "test.example.com") self.assertEqual(address.port, 8321) connector = memoryReactor.connectUNIX(b"/fake/path", ClientFactory()) verifyObject(IConnector, connector) address = connector.getDestination() verifyObject(IAddress, address) self.assertEqual(address.name, b"/fake/path")
def test_listenDefaultHost(self): """ L{MemoryReactor.listenTCP}, L{MemoryReactor.listenSSL} and L{MemoryReactor.listenUNIX} will return an L{IListeningPort} whose C{getHost} method returns an L{IAddress}; C{listenTCP} and C{listenSSL} will have a default host of C{'0.0.0.0'}, and a port that reflects the value passed, and C{listenUNIX} will have a name that reflects the path passed. """ memoryReactor = MemoryReactor() for port in [ memoryReactor.listenTCP(8242, Factory()), memoryReactor.listenSSL(8242, Factory(), None), ]: verifyObject(IListeningPort, port) address = port.getHost() verifyObject(IAddress, address) self.assertEqual(address.host, "0.0.0.0") self.assertEqual(address.port, 8242) port = memoryReactor.listenUNIX(b"/path/to/socket", Factory()) verifyObject(IListeningPort, port) address = port.getHost() verifyObject(IAddress, address) self.assertEqual(address.name, b"/path/to/socket")
def test_readers(self): """ Adding, removing, and listing readers works. """ reader = object() reactor = MemoryReactor()
reactor.addReader(reader) reactor.addReader(reader)
self.assertEqual(reactor.getReaders(), [reader])
reactor.removeReader(reader)
self.assertEqual(reactor.getReaders(), [])
def test_writers(self): """ Adding, removing, and listing writers works. """ writer = object() reactor = MemoryReactor()
reactor.addWriter(writer) reactor.addWriter(writer)
self.assertEqual(reactor.getWriters(), [writer])
reactor.removeWriter(writer)
self.assertEqual(reactor.getWriters(), [])
class TestConsumer: """ A very basic test consumer for use with the NonStreamingProducerTests. """
def __init__(self): self.writes = [] self.producer = None self.producerStreaming = None
def registerProducer(self, producer, streaming): """ Registers a single producer with this consumer. Just keeps track of it.
@param producer: The producer to register. @param streaming: Whether the producer is a streaming one or not. """ self.producer = producer self.producerStreaming = streaming
def unregisterProducer(self): """ Forget the producer we had previously registered. """ self.producer = None self.producerStreaming = None
def write(self, data): """ Some data was written to the consumer: stores it for later use.
@param data: The data to write. """ self.writes.append(data)
class NonStreamingProducerTests(TestCase): """ Tests for the L{NonStreamingProducer} to validate behaviour. """
def test_producesOnly10Times(self): """ When the L{NonStreamingProducer} has resumeProducing called 10 times, it writes the counter each time and then fails. """ consumer = TestConsumer() producer = NonStreamingProducer(consumer) consumer.registerProducer(producer, False)
self.assertIs(consumer.producer, producer) self.assertIs(producer.consumer, consumer) self.assertFalse(consumer.producerStreaming)
for _ in range(10): producer.resumeProducing()
# We should have unregistered the producer and printed the 10 results. expectedWrites = [b"0", b"1", b"2", b"3", b"4", b"5", b"6", b"7", b"8", b"9"] self.assertIsNone(consumer.producer) self.assertIsNone(consumer.producerStreaming) self.assertIsNone(producer.consumer) self.assertEqual(consumer.writes, expectedWrites)
# Another attempt to produce fails. self.assertRaises(RuntimeError, producer.resumeProducing)
def test_cannotPauseProduction(self): """ When the L{NonStreamingProducer} is paused, it raises a L{RuntimeError}. """ consumer = TestConsumer() producer = NonStreamingProducer(consumer) consumer.registerProducer(producer, False)
# Produce once, just to be safe. producer.resumeProducing()
self.assertRaises(RuntimeError, producer.pauseProducing)
class DeprecationTests(TestCase): """ Deprecations in L{twisted.test.proto_helpers}. """
def helper(self, test, obj): new_path = f"twisted.internet.testing.{obj.__name__}" warnings = self.flushWarnings([test]) self.assertEqual(DeprecationWarning, warnings[0]["category"]) self.assertEqual(1, len(warnings)) self.assertIn(new_path, warnings[0]["message"]) self.assertIs(obj, namedAny(new_path))
def test_accumulatingProtocol(self): from twisted.test.proto_helpers import AccumulatingProtocol
self.helper(self.test_accumulatingProtocol, AccumulatingProtocol)
def test_lineSendingProtocol(self): from twisted.test.proto_helpers import LineSendingProtocol
self.helper(self.test_lineSendingProtocol, LineSendingProtocol)
def test_fakeDatagramTransport(self): from twisted.test.proto_helpers import FakeDatagramTransport
self.helper(self.test_fakeDatagramTransport, FakeDatagramTransport)
def test_stringTransport(self): from twisted.test.proto_helpers import StringTransport
self.helper(self.test_stringTransport, StringTransport)
def test_stringTransportWithDisconnection(self): from twisted.test.proto_helpers import StringTransportWithDisconnection
self.helper( self.test_stringTransportWithDisconnection, StringTransportWithDisconnection )
def test_stringIOWithoutClosing(self): from twisted.test.proto_helpers import StringIOWithoutClosing
self.helper(self.test_stringIOWithoutClosing, StringIOWithoutClosing)
def test__fakeConnector(self): from twisted.test.proto_helpers import _FakeConnector
self.helper(self.test__fakeConnector, _FakeConnector)
def test__fakePort(self): from twisted.test.proto_helpers import _FakePort
self.helper(self.test__fakePort, _FakePort)
def test_memoryReactor(self): from twisted.test.proto_helpers import MemoryReactor
self.helper(self.test_memoryReactor, MemoryReactor)
def test_memoryReactorClock(self): from twisted.test.proto_helpers import MemoryReactorClock
self.helper(self.test_memoryReactorClock, MemoryReactorClock)
def test_raisingMemoryReactor(self): from twisted.test.proto_helpers import RaisingMemoryReactor
self.helper(self.test_raisingMemoryReactor, RaisingMemoryReactor)
def test_nonStreamingProducer(self): from twisted.test.proto_helpers import NonStreamingProducer
self.helper(self.test_nonStreamingProducer, NonStreamingProducer)
def test_waitUntilAllDisconnected(self): from twisted.test.proto_helpers import waitUntilAllDisconnected
self.helper(self.test_waitUntilAllDisconnected, waitUntilAllDisconnected)
def test_eventLoggingObserver(self): from twisted.test.proto_helpers import EventLoggingObserver
self.helper(self.test_eventLoggingObserver, EventLoggingObserver)
|