Viewing file: pop3testserver.py (8.03 KB) -rwxr-xr-x Select action/file-type: (+) | (+) | (+) | Code (+) | Session (+) | (+) | SDB (+) | (+) | (+) | (+) | (+) | (+) |
#! /usr/bin/python3 # -*- test-case-name: twisted.mail.test.test_pop3client -*-
# Copyright (c) Twisted Matrix Laboratories. # See LICENSE for details.
import sys
from twisted.internet import reactor from twisted.internet.protocol import Factory from twisted.protocols import basic
# Credentials accepted by the USER / PASS exchange.
USER = "test"
PASS = "twisted"

# TCP port that main() listens on.
PORT = 1100

# Behaviour switches.  processArg() flips these from the command line and the
# protocol class reads them while handling a connection.
SSL_SUPPORT = True
UIDL_SUPPORT = True
INVALID_SERVER_RESPONSE = False
INVALID_CAPABILITY_RESPONSE = False
INVALID_LOGIN_RESPONSE = False
DENY_CONNECTION = False
DROP_CONNECTION = False
BAD_TLS_RESPONSE = False
TIMEOUT_RESPONSE = False
TIMEOUT_DEFERRED = False
SLOW_GREETING = False

# Commands: canned byte strings the server sends back to the client.
CONNECTION_MADE = b"+OK POP3 localhost v2003.83 server ready"

# Capabilities that are always advertised; STLS and UIDL are added on top
# depending on SSL_SUPPORT / UIDL_SUPPORT.
CAPABILITIES = [b"TOP", b"LOGIN-DELAY 180", b"USER", b"SASL LOGIN"]

CAPABILITIES_SSL = b"STLS"
CAPABILITIES_UIDL = b"UIDL"

INVALID_RESPONSE = b"-ERR Unknown request"
VALID_RESPONSE = b"+OK Command Completed"
AUTH_DECLINED = b"-ERR LOGIN failed"
AUTH_ACCEPTED = b"+OK Mailbox open, 0 messages"
TLS_ERROR = b"-ERR server side error start TLS handshake"
LOGOUT_COMPLETE = b"+OK quit completed"
# Fixed spelling: this reply used to read "AUHORIZATION".
NOT_LOGGED_IN = b"-ERR Unknown AUTHORIZATION state command"
STAT = b"+OK 0 0"
# The multi-line replies carry their own terminating "." line; the final
# line ending is expected to come from whatever sends them.
UIDL = b"+OK Unique-ID listing follows\r\n."
LIST = b"+OK Mailbox scan listing follows\r\n."
CAP_START = b"+OK Capability list follows:"
class POP3TestServer(basic.LineReceiver):
    """
    A minimal POP3 server with an empty mailbox, for exercising POP3 clients.

    Every reply is one of the canned module-level byte strings.  The
    module-level switches (SSL_SUPPORT, DROP_CONNECTION, TIMEOUT_RESPONSE,
    ...) select which misbehaviour, if any, the server simulates.
    """

    def __init__(self, contextFactory=None):
        """
        @param contextFactory: the TLS context factory handed to
            C{transport.startTLS} when the client issues STLS.  When
            C{None}, one is created lazily by L{getContext}.
        """
        self.loggedIn = False
        # Capability lines sent by the most recent CAPA reply.
        self.caps = None
        # User name received by the last USER command, as bytes.
        self.tmpUser = None
        self.ctx = contextFactory

    @staticmethod
    def _asBytes(value):
        """
        Return C{value} as bytes so it can be compared with data read off
        the wire.  Text is encoded as UTF-8; bytes pass through unchanged.
        """
        if isinstance(value, bytes):
            return value
        return value.encode("utf-8")

    def sendSTATResp(self, req):
        """Send the canned STAT reply; C{req} is ignored."""
        self.sendLine(STAT)

    def sendUIDLResp(self, req):
        """Send the canned UIDL reply; C{req} is ignored."""
        self.sendLine(UIDL)

    def sendLISTResp(self, req):
        """Send the canned LIST reply; C{req} is ignored."""
        self.sendLine(LIST)

    def sendCapabilities(self):
        """
        Send the capability listing, including UIDL and STLS only when the
        corresponding module switch is enabled.
        """
        # Build the list from scratch on every call: appending to a cached
        # list made each repeated CAPA request duplicate every capability.
        caps = [CAP_START]

        if UIDL_SUPPORT:
            caps.append(CAPABILITIES_UIDL)

        if SSL_SUPPORT:
            caps.append(CAPABILITIES_SSL)

        caps.extend(CAPABILITIES)
        self.caps = caps

        # sendLine() adds the line ending after the terminating "." line.
        self.sendLine(b"\r\n".join(caps) + b"\r\n.")

    def connectionMade(self):
        """
        Greet the client, or drop / delay the greeting when DENY_CONNECTION
        or SLOW_GREETING is set.
        """
        if DENY_CONNECTION:
            self.disconnect()
            return

        if SLOW_GREETING:
            reactor.callLater(20, self.sendGreeting)
        else:
            self.sendGreeting()

    def sendGreeting(self):
        """Send the initial server greeting."""
        self.sendLine(CONNECTION_MADE)

    def lineReceived(self, line):
        """
        Handle one client request.

        Commands are recognised by a case-insensitive substring match on the
        whole line.  The branch order matters: the error-simulation switches
        are checked first, then the commands that work before login, and
        only then the commands that require a logged-in session.

        @param line: the request line as bytes, without its line ending.
        """
        uline = line.upper()

        def find(token):
            return token in uline

        if TIMEOUT_RESPONSE:
            # Do not respond to clients request
            return

        if DROP_CONNECTION:
            self.disconnect()
            return

        elif find(b"CAPA"):
            if INVALID_CAPABILITY_RESPONSE:
                self.sendLine(INVALID_RESPONSE)
            else:
                self.sendCapabilities()

        elif find(b"STLS") and SSL_SUPPORT:
            self.startTLS()

        elif find(b"USER"):
            if INVALID_LOGIN_RESPONSE:
                self.sendLine(INVALID_RESPONSE)
                return

            # The line is bytes, so it must be split on a bytes separator;
            # splitting on a str raised TypeError and declined every USER.
            try:
                self.tmpUser = line.split(b" ")[1]
            except IndexError:
                # No argument after the command word.
                resp = AUTH_DECLINED
            else:
                resp = VALID_RESPONSE

            self.sendLine(resp)

        elif find(b"PASS"):
            try:
                pwd = line.split(b" ")[1]
            except IndexError:
                resp = AUTH_DECLINED
            else:
                # Compare bytes with bytes; tmpUser is None (and so never
                # matches) when no USER command preceded this one.
                if (
                    self.tmpUser == self._asBytes(USER)
                    and pwd == self._asBytes(PASS)
                ):
                    resp = AUTH_ACCEPTED
                    self.loggedIn = True
                else:
                    resp = AUTH_DECLINED

            self.sendLine(resp)

        elif find(b"QUIT"):
            self.loggedIn = False
            self.sendLine(LOGOUT_COMPLETE)
            self.disconnect()

        elif INVALID_SERVER_RESPONSE:
            self.sendLine(INVALID_RESPONSE)

        elif not self.loggedIn:
            self.sendLine(NOT_LOGGED_IN)

        elif find(b"NOOP"):
            self.sendLine(VALID_RESPONSE)

        elif find(b"STAT"):
            if TIMEOUT_DEFERRED:
                return
            self.sendLine(STAT)

        elif find(b"LIST"):
            if TIMEOUT_DEFERRED:
                return
            self.sendLine(LIST)

        elif find(b"UIDL"):
            if TIMEOUT_DEFERRED:
                return
            elif not UIDL_SUPPORT:
                self.sendLine(INVALID_RESPONSE)
                return

            self.sendLine(UIDL)

    def startTLS(self):
        """
        Answer an STLS request: acknowledge it and switch the transport to
        TLS when a context factory is available, otherwise send an error.
        """
        # NOTE(review): BAD_TLS_RESPONSE and TLS_ERROR are not consulted
        # here, so the "bad_tls" mode currently changes nothing - confirm
        # the intended behaviour before wiring it in.
        if self.ctx is None:
            self.getContext()

        if SSL_SUPPORT and self.ctx is not None:
            self.sendLine(b"+OK Begin TLS negotiation now")
            self.transport.startTLS(self.ctx)
        else:
            self.sendLine(b"-ERR TLS not available")

    def disconnect(self):
        """Close the connection to the client."""
        self.transport.loseConnection()

    def getContext(self):
        """
        Create the default TLS context factory and store it on C{self.ctx}.
        C{self.ctx} is set to C{None} when Twisted's ssl module cannot be
        imported.
        """
        try:
            from twisted.internet import ssl
        except ImportError:
            self.ctx = None
        else:
            # NOTE(review): this is a client context factory pinned to
            # TLSv1 although it is used on the server side of the
            # handshake - confirm this is what the tests rely on.
            self.ctx = ssl.ClientContextFactory()
            self.ctx.method = ssl.SSL.TLSv1_METHOD
# Help text printed by processArg() for "--help" and for unknown arguments.
# One option per line; the request names are the POP3 commands the server
# actually matches on.
usage = """popServer.py [arg] (default is Standard POP Server with no messages)
no_ssl - Start with no SSL support
no_uidl - Start with no UIDL support
bad_resp - Send a non-RFC compliant response to the Client
bad_cap_resp - send a non-RFC compliant response when the Client sends a 'CAPA' request
bad_login_resp - send a non-RFC compliant response when the Client sends a 'USER' request
deny - Deny the connection
drop - Drop the connection after sending the greeting
bad_tls - Send a bad response to a STLS
timeout - Do not return a response to a Client request
to_deferred - Do not return a response on a 'STAT', 'LIST' or 'UIDL' request. This will test Deferred callback handling
slow - Wait 20 seconds after the connection is made to return a Server Greeting
"""
def printMessage(msg):
    """Announce on standard output which mode the server is starting in."""
    banner = "Server Starting in %s mode" % msg
    print(banner)
def processArg(arg):
    """
    Apply one command-line option to the module-level behaviour switches.

    A recognised option (matched case-insensitively) sets its switch and
    announces the selected mode through printMessage().  "--help" and any
    unrecognised option print the usage text and exit the process.

    @param arg: a single command-line argument, e.g. C{"no_ssl"}.
    @raise SystemExit: for "--help" and for unknown arguments.
    """
    # option -> (name of the module switch, value to assign, mode label)
    modes = {
        "no_ssl": ("SSL_SUPPORT", False, "NON-SSL"),
        "no_uidl": ("UIDL_SUPPORT", False, "NON-UIDL"),
        "bad_resp": ("INVALID_SERVER_RESPONSE", True, "Invalid Server Response"),
        "bad_cap_resp": (
            "INVALID_CAPABILITY_RESPONSE",
            True,
            "Invalid Capability Response",
        ),
        # The label used to be a copy of the bad_cap_resp one.
        "bad_login_resp": ("INVALID_LOGIN_RESPONSE", True, "Invalid Login Response"),
        "deny": ("DENY_CONNECTION", True, "Deny Connection"),
        "drop": ("DROP_CONNECTION", True, "Drop Connection"),
        "bad_tls": ("BAD_TLS_RESPONSE", True, "Bad TLS Response"),
        "timeout": ("TIMEOUT_RESPONSE", True, "Timeout Response"),
        "to_deferred": ("TIMEOUT_DEFERRED", True, "Timeout Deferred Response"),
        "slow": ("SLOW_GREETING", True, "Slow Greeting"),
    }

    selected = modes.get(arg.lower())
    if selected is None:
        # Covers "--help" as well as anything unrecognised.
        print(usage)
        sys.exit()

    switch, value, label = selected
    # The switches are module globals read by the protocol class, so rebind
    # them at module level rather than locally.
    globals()[switch] = value
    printMessage(label)
def main():
    """
    Script entry point: apply every command-line option, then serve
    POP3TestServer on PORT until the reactor stops.
    """
    options = sys.argv[1:]
    if options:
        for option in options:
            processArg(option)
    else:
        # No options given: plain server with an empty mailbox.
        printMessage("POP3 with no messages")

    factory = Factory()
    factory.protocol = POP3TestServer
    reactor.listenTCP(PORT, factory)
    reactor.run()


if __name__ == "__main__":
    main()
|