1672 lines
46 KiB
Python
1672 lines
46 KiB
Python
# Copyright (c) Twisted Matrix Laboratories.
|
|
# See LICENSE for details.
|
|
|
|
"""
|
|
Test cases for Ltwisted.mail.pop3} module.
|
|
"""
|
|
|
|
from __future__ import print_function
|
|
|
|
import hmac
|
|
import base64
|
|
import itertools
|
|
|
|
from collections import OrderedDict
|
|
from io import BytesIO
|
|
|
|
from zope.interface import implementer
|
|
|
|
from twisted import cred
|
|
from twisted import internet
|
|
from twisted import mail
|
|
from twisted.internet import defer
|
|
from twisted.mail import pop3
|
|
from twisted.protocols import loopback
|
|
from twisted.python import failure
|
|
from twisted.python.compat import intToBytes
|
|
from twisted.test.proto_helpers import LineSendingProtocol
|
|
from twisted.trial import unittest, util
|
|
import twisted.cred.checkers
|
|
import twisted.cred.credentials
|
|
import twisted.cred.portal
|
|
import twisted.internet.protocol
|
|
import twisted.mail.pop3
|
|
import twisted.mail.protocols
|
|
|
|
|
|
class UtilityTests(unittest.TestCase):
|
|
"""
|
|
Test the various helper functions and classes used by the POP3 server
|
|
protocol implementation.
|
|
"""
|
|
|
|
def test_LineBuffering(self):
|
|
"""
|
|
Test creating a LineBuffer and feeding it some lines. The lines should
|
|
build up in its internal buffer for a while and then get spat out to
|
|
the writer.
|
|
"""
|
|
output = []
|
|
input = iter(itertools.cycle(['012', '345', '6', '7', '8', '9']))
|
|
c = pop3._IteratorBuffer(output.extend, input, 6)
|
|
i = iter(c)
|
|
self.assertEqual(output, []) # Nothing is buffer
|
|
next(i)
|
|
self.assertEqual(output, []) # '012' is buffered
|
|
next(i)
|
|
self.assertEqual(output, []) # '012345' is buffered
|
|
next(i)
|
|
self.assertEqual(output, ['012', '345', '6']) # Nothing is buffered
|
|
for n in range(5):
|
|
next(i)
|
|
self.assertEqual(output, ['012', '345', '6', '7', '8', '9', '012',
|
|
'345'])
|
|
|
|
|
|
def test_FinishLineBuffering(self):
|
|
"""
|
|
Test that a LineBuffer flushes everything when its iterator is
|
|
exhausted, and itself raises StopIteration.
|
|
"""
|
|
output = []
|
|
input = iter(['a', 'b', 'c'])
|
|
c = pop3._IteratorBuffer(output.extend, input, 5)
|
|
for i in c:
|
|
pass
|
|
self.assertEqual(output, ['a', 'b', 'c'])
|
|
|
|
|
|
def test_SuccessResponseFormatter(self):
|
|
"""
|
|
Test that the thing that spits out POP3 'success responses' works
|
|
right.
|
|
"""
|
|
self.assertEqual(
|
|
pop3.successResponse(b'Great.'),
|
|
b'+OK Great.\r\n')
|
|
|
|
|
|
def test_StatLineFormatter(self):
|
|
"""
|
|
Test that the function which formats stat lines does so appropriately.
|
|
"""
|
|
statLine = list(pop3.formatStatResponse([]))[-1]
|
|
self.assertEqual(statLine, b'+OK 0 0\r\n')
|
|
|
|
statLine = list(pop3.formatStatResponse([10, 31, 0, 10101]))[-1]
|
|
self.assertEqual(statLine, b'+OK 4 10142\r\n')
|
|
|
|
|
|
def test_ListLineFormatter(self):
|
|
"""
|
|
Test that the function which formats the lines in response to a LIST
|
|
command does so appropriately.
|
|
"""
|
|
listLines = list(pop3.formatListResponse([]))
|
|
self.assertEqual(
|
|
listLines,
|
|
[b'+OK 0\r\n', b'.\r\n'])
|
|
|
|
listLines = list(pop3.formatListResponse([1, 2, 3, 100]))
|
|
self.assertEqual(
|
|
listLines,
|
|
[b'+OK 4\r\n', b'1 1\r\n', b'2 2\r\n', b'3 3\r\n', b'4 100\r\n',
|
|
b'.\r\n'])
|
|
|
|
|
|
def test_UIDListLineFormatter(self):
|
|
"""
|
|
Test that the function which formats lines in response to a UIDL
|
|
command does so appropriately.
|
|
"""
|
|
uids = ['abc', 'def', 'ghi']
|
|
listLines = list(pop3.formatUIDListResponse([], uids.__getitem__))
|
|
self.assertEqual(
|
|
listLines,
|
|
[b'+OK \r\n', b'.\r\n'])
|
|
|
|
listLines = list(pop3.formatUIDListResponse([123, 431, 591],
|
|
uids.__getitem__))
|
|
self.assertEqual(
|
|
listLines,
|
|
[b'+OK \r\n', b'1 abc\r\n', b'2 def\r\n', b'3 ghi\r\n', b'.\r\n'])
|
|
|
|
listLines = list(pop3.formatUIDListResponse([0, None, 591],
|
|
uids.__getitem__))
|
|
self.assertEqual(
|
|
listLines,
|
|
[b'+OK \r\n', b'1 abc\r\n', b'3 ghi\r\n', b'.\r\n'])
|
|
|
|
|
|
|
|
class MyVirtualPOP3(mail.protocols.VirtualPOP3):
|
|
"""
|
|
A virtual-domain-supporting POP3 server.
|
|
"""
|
|
magic = b'<moshez>'
|
|
|
|
def authenticateUserAPOP(self, user, digest):
|
|
"""
|
|
Authenticate against a user against a virtual domain.
|
|
|
|
@param user: The username.
|
|
@param digest: The digested password.
|
|
|
|
@return: A three-tuple like the one returned by
|
|
L{IRealm.requestAvatar}. The mailbox will be for the user given
|
|
by C{user}.
|
|
"""
|
|
user, domain = self.lookupDomain(user)
|
|
return self.service.domains[b'baz.com'].authenticateUserAPOP(
|
|
user, digest, self.magic, domain)
|
|
|
|
|
|
|
|
class DummyDomain:
|
|
"""
|
|
A virtual domain for a POP3 server.
|
|
"""
|
|
|
|
def __init__(self):
|
|
self.users = {}
|
|
|
|
|
|
def addUser(self, name):
|
|
"""
|
|
Create a mailbox for a new user.
|
|
|
|
@param name: The username.
|
|
"""
|
|
self.users[name] = []
|
|
|
|
|
|
def addMessage(self, name, message):
|
|
"""
|
|
Add a message to the mailbox of the named user.
|
|
|
|
@param name: The username.
|
|
@param message: The contents of the message.
|
|
"""
|
|
self.users[name].append(message)
|
|
|
|
|
|
def authenticateUserAPOP(self, name, digest, magic, domain):
|
|
"""
|
|
Succeed with a L{ListMailbox}.
|
|
|
|
@param name: The name of the user authenticating.
|
|
@param digest: ignored
|
|
@param magic: ignored
|
|
@param domain: ignored
|
|
|
|
@return: A three-tuple like the one returned by
|
|
L{IRealm.requestAvatar}. The mailbox will be for the user given
|
|
by C{name}.
|
|
"""
|
|
return pop3.IMailbox, ListMailbox(self.users[name]), lambda: None
|
|
|
|
|
|
|
|
class ListMailbox:
|
|
"""
|
|
A simple in-memory list implementation of L{IMailbox}.
|
|
"""
|
|
def __init__(self, list):
|
|
"""
|
|
@param list: The messages.
|
|
"""
|
|
self.list = list
|
|
|
|
|
|
def listMessages(self, i=None):
|
|
"""
|
|
Get some message information.
|
|
|
|
@param i: See L{pop3.IMailbox.listMessages}.
|
|
@return: See L{pop3.IMailbox.listMessages}.
|
|
"""
|
|
if i is None:
|
|
return [len(l) for l in self.list]
|
|
return len(self.list[i])
|
|
|
|
|
|
def getMessage(self, i):
|
|
"""
|
|
Get the message content.
|
|
|
|
@param i: See L{pop3.IMailbox.getMessage}.
|
|
@return: See L{pop3.IMailbox.getMessage}.
|
|
"""
|
|
return BytesIO(self.list[i])
|
|
|
|
|
|
def getUidl(self, i):
|
|
"""
|
|
Construct a UID by using the given index value.
|
|
|
|
@param i: See L{pop3.IMailbox.getUidl}.
|
|
@return: See L{pop3.IMailbox.getUidl}.
|
|
"""
|
|
return i
|
|
|
|
|
|
def deleteMessage(self, i):
|
|
"""
|
|
Wipe the message at the given index.
|
|
|
|
@param i: See L{pop3.IMailbox.deleteMessage}.
|
|
"""
|
|
self.list[i] = b''
|
|
|
|
|
|
def sync(self):
|
|
"""
|
|
No-op.
|
|
|
|
@see: L{pop3.IMailbox.sync}
|
|
"""
|
|
|
|
|
|
|
|
class MyPOP3Downloader(pop3.POP3Client):
|
|
"""
|
|
A POP3 client which downloads all messages from the server.
|
|
"""
|
|
def handle_WELCOME(self, line):
|
|
"""
|
|
Authenticate.
|
|
|
|
@param line: The welcome response.
|
|
"""
|
|
pop3.POP3Client.handle_WELCOME(self, line)
|
|
self.apop(b'hello@baz.com', b'world')
|
|
|
|
|
|
def handle_APOP(self, line):
|
|
"""
|
|
Require an I{OK} response to I{APOP}.
|
|
|
|
@param line: The I{APOP} response.
|
|
"""
|
|
parts = line.split()
|
|
code = parts[0]
|
|
if code != b'+OK':
|
|
raise AssertionError('code is: %s , parts is: %s ' % (code, parts))
|
|
self.lines = []
|
|
self.retr(1)
|
|
|
|
|
|
def handle_RETR_continue(self, line):
|
|
"""
|
|
Record one line of message information.
|
|
|
|
@param line: A I{RETR} response line.
|
|
"""
|
|
self.lines.append(line)
|
|
|
|
|
|
def handle_RETR_end(self):
|
|
"""
|
|
Record the received message information.
|
|
"""
|
|
self.message = b'\n'.join(self.lines) + b'\n'
|
|
self.quit()
|
|
|
|
|
|
def handle_QUIT(self, line):
|
|
"""
|
|
Require an I{OK} response to I{QUIT}.
|
|
|
|
@param line: The I{QUIT} response.
|
|
"""
|
|
if line[:3] != b'+OK':
|
|
raise AssertionError(b'code is ' + line)
|
|
|
|
|
|
|
|
class POP3Tests(unittest.TestCase):
|
|
"""
|
|
Tests for L{pop3.POP3}.
|
|
"""
|
|
|
|
message = b'''\
|
|
Subject: urgent
|
|
|
|
Someone set up us the bomb!
|
|
'''
|
|
|
|
expectedOutput = (b'''\
|
|
+OK <moshez>\015
|
|
+OK Authentication succeeded\015
|
|
+OK \015
|
|
1 0\015
|
|
.\015
|
|
+OK ''' + intToBytes(len(message)) + b'''\015
|
|
Subject: urgent\015
|
|
\015
|
|
Someone set up us the bomb!\015
|
|
.\015
|
|
+OK \015
|
|
''')
|
|
|
|
def setUp(self):
|
|
"""
|
|
Set up a POP3 server with virtual domain support.
|
|
"""
|
|
self.factory = internet.protocol.Factory()
|
|
self.factory.domains = {}
|
|
self.factory.domains[b'baz.com'] = DummyDomain()
|
|
self.factory.domains[b'baz.com'].addUser(b'hello')
|
|
self.factory.domains[b'baz.com'].addMessage(b'hello', self.message)
|
|
|
|
|
|
def test_messages(self):
|
|
"""
|
|
Messages can be downloaded over a loopback TCP connection.
|
|
"""
|
|
client = LineSendingProtocol([
|
|
b'APOP hello@baz.com world',
|
|
b'UIDL',
|
|
b'RETR 1',
|
|
b'QUIT',
|
|
])
|
|
server = MyVirtualPOP3()
|
|
server.service = self.factory
|
|
def check(ignored):
|
|
output = b'\r\n'.join(client.response) + b'\r\n'
|
|
self.assertEqual(output, self.expectedOutput)
|
|
return loopback.loopbackTCP(server, client).addCallback(check)
|
|
|
|
|
|
def test_loopback(self):
|
|
"""
|
|
Messages can be downloaded over a loopback connection.
|
|
"""
|
|
protocol = MyVirtualPOP3()
|
|
protocol.service = self.factory
|
|
clientProtocol = MyPOP3Downloader()
|
|
def check(ignored):
|
|
self.assertEqual(clientProtocol.message, self.message)
|
|
protocol.connectionLost(
|
|
failure.Failure(Exception("Test harness disconnect")))
|
|
d = loopback.loopbackAsync(protocol, clientProtocol)
|
|
return d.addCallback(check)
|
|
test_loopback.suppress = [util.suppress(
|
|
message="twisted.mail.pop3.POP3Client is deprecated")]
|
|
|
|
|
|
def test_incorrectDomain(self):
|
|
"""
|
|
Look up a user in a domain which this server does not support.
|
|
"""
|
|
factory = internet.protocol.Factory()
|
|
factory.domains = {}
|
|
factory.domains[b'twistedmatrix.com'] = DummyDomain()
|
|
|
|
server = MyVirtualPOP3()
|
|
server.service = factory
|
|
exc = self.assertRaises(pop3.POP3Error,
|
|
server.authenticateUserAPOP, b'nobody@baz.com', b'password')
|
|
self.assertEqual(exc.args[0], 'no such domain baz.com')
|
|
|
|
|
|
|
|
class DummyPOP3(pop3.POP3):
|
|
"""
|
|
A simple POP3 server with a hard-coded mailbox for any user.
|
|
"""
|
|
magic = b'<moshez>'
|
|
|
|
def authenticateUserAPOP(self, user, password):
|
|
"""
|
|
Succeed with a L{DummyMailbox}.
|
|
|
|
@param user: ignored
|
|
@param password: ignored
|
|
|
|
@return: A three-tuple like the one returned by
|
|
L{IRealm.requestAvatar}.
|
|
"""
|
|
return pop3.IMailbox, DummyMailbox(ValueError), lambda: None
|
|
|
|
|
|
|
|
class DummyPOP3Auth(DummyPOP3):
|
|
"""
|
|
Class to test successful authentication in twisted.mail.pop3.POP3.
|
|
"""
|
|
def __init__(self, user, password):
|
|
self.portal = cred.portal.Portal(TestRealm())
|
|
ch = cred.checkers.InMemoryUsernamePasswordDatabaseDontUse()
|
|
ch.addUser(user, password)
|
|
self.portal.registerChecker(ch)
|
|
|
|
|
|
|
|
class DummyMailbox(pop3.Mailbox):
|
|
"""
|
|
An in-memory L{pop3.IMailbox} implementation.
|
|
|
|
@ivar messages: A sequence of L{bytes} defining the messages in this
|
|
mailbox.
|
|
|
|
@ivar exceptionType: The type of exception to raise when an out-of-bounds
|
|
index is addressed.
|
|
"""
|
|
messages = [b'From: moshe\nTo: moshe\n\nHow are you, friend?\n']
|
|
|
|
def __init__(self, exceptionType):
|
|
self.messages = DummyMailbox.messages[:]
|
|
self.exceptionType = exceptionType
|
|
|
|
|
|
def listMessages(self, i=None):
|
|
"""
|
|
Get some message information.
|
|
|
|
@param i: See L{pop3.IMailbox.listMessages}.
|
|
@return: See L{pop3.IMailbox.listMessages}.
|
|
"""
|
|
if i is None:
|
|
return [len(m) for m in self.messages]
|
|
if i >= len(self.messages):
|
|
raise self.exceptionType()
|
|
return len(self.messages[i])
|
|
|
|
|
|
def getMessage(self, i):
|
|
"""
|
|
Get the message content.
|
|
|
|
@param i: See L{pop3.IMailbox.getMessage}.
|
|
@return: See L{pop3.IMailbox.getMessage}.
|
|
"""
|
|
return BytesIO(self.messages[i])
|
|
|
|
|
|
def getUidl(self, i):
|
|
"""
|
|
Construct a UID which is simply the string representation of the given
|
|
index.
|
|
|
|
@param i: See L{pop3.IMailbox.getUidl}.
|
|
@return: See L{pop3.IMailbox.getUidl}.
|
|
"""
|
|
if i >= len(self.messages):
|
|
raise self.exceptionType()
|
|
return intToBytes(i)
|
|
|
|
|
|
def deleteMessage(self, i):
|
|
"""
|
|
Wipe the message at the given index.
|
|
|
|
@param i: See L{pop3.IMailbox.deleteMessage}.
|
|
"""
|
|
self.messages[i] = b''
|
|
|
|
|
|
|
|
class AnotherPOP3Tests(unittest.TestCase):
|
|
"""
|
|
Additional L{pop3.POP3} tests.
|
|
"""
|
|
def runTest(self, lines, expectedOutput, protocolInstance=None):
|
|
"""
|
|
Assert that when C{lines} are delivered to L{pop3.POP3} it responds
|
|
with C{expectedOutput}.
|
|
|
|
@param lines: A sequence of L{bytes} representing lines to deliver to
|
|
the server.
|
|
|
|
@param expectedOutput: A sequence of L{bytes} representing the
|
|
expected response from the server.
|
|
|
|
@param protocolInstance: Instance of L{twisted.mail.pop3.POP3} or
|
|
L{None}. If L{None}, a new DummyPOP3 will be used.
|
|
|
|
@return: A L{Deferred} that fires when the lines have been delivered
|
|
and the output checked.
|
|
"""
|
|
dummy = protocolInstance if protocolInstance else DummyPOP3()
|
|
client = LineSendingProtocol(lines)
|
|
d = loopback.loopbackAsync(dummy, client)
|
|
return d.addCallback(self._cbRunTest, client, dummy, expectedOutput)
|
|
|
|
|
|
def _cbRunTest(self, ignored, client, dummy, expectedOutput):
|
|
self.assertEqual(b'\r\n'.join(expectedOutput),
|
|
b'\r\n'.join(client.response))
|
|
dummy.connectionLost(failure.Failure(
|
|
Exception("Test harness disconnect")))
|
|
return ignored
|
|
|
|
|
|
def test_buffer(self):
|
|
"""
|
|
Test a lot of different POP3 commands in an extremely pipelined
|
|
scenario.
|
|
|
|
This test may cover legitimate behavior, but the intent and
|
|
granularity are not very good. It would likely be an improvement to
|
|
split it into a number of smaller, more focused tests.
|
|
"""
|
|
return self.runTest(
|
|
[b"APOP moshez dummy",
|
|
b"LIST",
|
|
b"UIDL",
|
|
b"RETR 1",
|
|
b"RETR 2",
|
|
b"DELE 1",
|
|
b"RETR 1",
|
|
b"QUIT"],
|
|
[b'+OK <moshez>',
|
|
b'+OK Authentication succeeded',
|
|
b'+OK 1',
|
|
b'1 44',
|
|
b'.',
|
|
b'+OK ',
|
|
b'1 0',
|
|
b'.',
|
|
b'+OK 44',
|
|
b'From: moshe',
|
|
b'To: moshe',
|
|
b'',
|
|
b'How are you, friend?',
|
|
b'.',
|
|
b'-ERR Bad message number argument',
|
|
b'+OK ',
|
|
b'-ERR message deleted',
|
|
b'+OK '])
|
|
|
|
|
|
def test_noop(self):
|
|
"""
|
|
Test the no-op command.
|
|
"""
|
|
return self.runTest(
|
|
[b'APOP spiv dummy',
|
|
b'NOOP',
|
|
b'QUIT'],
|
|
[b'+OK <moshez>',
|
|
b'+OK Authentication succeeded',
|
|
b'+OK ',
|
|
b'+OK '])
|
|
|
|
|
|
def test_badUTF8CharactersInCommand(self):
|
|
"""
|
|
Sending a command with invalid UTF-8 characters
|
|
will raise a L{pop3.POP3Error}.
|
|
"""
|
|
error = b'not authenticated yet: cannot do \x81PASS'
|
|
d = self.runTest(
|
|
[b'\x81PASS',
|
|
b'QUIT'],
|
|
[b'+OK <moshez>',
|
|
b"-ERR bad protocol or server: POP3Error: " +
|
|
error,
|
|
b'+OK '])
|
|
errors = self.flushLoggedErrors(pop3.POP3Error)
|
|
self.assertEqual(len(errors), 1)
|
|
return d
|
|
|
|
|
|
def test_authListing(self):
|
|
"""
|
|
L{pop3.POP3} responds to an I{AUTH} command with a list of supported
|
|
authentication types based on its factory's C{challengers}.
|
|
"""
|
|
p = DummyPOP3()
|
|
p.factory = internet.protocol.Factory()
|
|
p.factory.challengers = {b'Auth1': None, b'secondAuth': None,
|
|
b'authLast': None}
|
|
client = LineSendingProtocol([
|
|
b"AUTH",
|
|
b"QUIT",
|
|
])
|
|
|
|
d = loopback.loopbackAsync(p, client)
|
|
return d.addCallback(self._cbTestAuthListing, client)
|
|
|
|
|
|
def _cbTestAuthListing(self, ignored, client):
|
|
self.assertTrue(client.response[1].startswith(b'+OK'))
|
|
self.assertEqual(sorted(client.response[2:5]),
|
|
[b"AUTH1", b"AUTHLAST", b"SECONDAUTH"])
|
|
self.assertEqual(client.response[5], b".")
|
|
|
|
|
|
def run_PASS(self, real_user, real_password,
|
|
tried_user=None, tried_password=None,
|
|
after_auth_input=[], after_auth_output=[]):
|
|
"""
|
|
Test a login with PASS.
|
|
|
|
If L{real_user} matches L{tried_user} and L{real_password} matches
|
|
L{tried_password}, a successful login will be expected.
|
|
Otherwise an unsuccessful login will be expected.
|
|
|
|
@type real_user: L{bytes}
|
|
@param real_user: The user to test.
|
|
|
|
@type real_password: L{bytes}
|
|
@param real_password: The password of the test user.
|
|
|
|
@type tried_user: L{bytes} or L{None}
|
|
@param tried_user: The user to call USER with.
|
|
If None, real_user will be used.
|
|
|
|
@type tried_password: L{bytes} or L{None}
|
|
@param tried_password: The password to call PASS with.
|
|
If None, real_password will be used.
|
|
|
|
@type after_auth_input: L{list} of l{bytes}
|
|
@param after_auth_input: Extra protocol input after authentication.
|
|
|
|
@type after_auth_output: L{list} of l{bytes}
|
|
@param after_auth_output: Extra protocol output after authentication.
|
|
"""
|
|
if not tried_user:
|
|
tried_user = real_user
|
|
if not tried_password:
|
|
tried_password = real_password
|
|
response = [b'+OK <moshez>',
|
|
b'+OK USER accepted, send PASS',
|
|
b'-ERR Authentication failed']
|
|
if real_user == tried_user and real_password == tried_password:
|
|
response = [b'+OK <moshez>',
|
|
b'+OK USER accepted, send PASS',
|
|
b'+OK Authentication succeeded']
|
|
fullInput = [b' '.join([b'USER', tried_user]),
|
|
b' '.join([b'PASS', tried_password])]
|
|
|
|
fullInput += after_auth_input + [b'QUIT']
|
|
response += after_auth_output + [b'+OK ']
|
|
|
|
return self.runTest(
|
|
fullInput,
|
|
response,
|
|
protocolInstance=DummyPOP3Auth(real_user, real_password))
|
|
|
|
|
|
def run_PASS_before_USER(self, password):
|
|
"""
|
|
Test protocol violation produced by calling PASS before USER.
|
|
@type password: L{bytes}
|
|
@param password: A password to test.
|
|
"""
|
|
return self.runTest(
|
|
[b' '.join([b'PASS', password]),
|
|
b'QUIT'],
|
|
[b'+OK <moshez>',
|
|
b'-ERR USER required before PASS',
|
|
b'+OK '])
|
|
|
|
|
|
def test_illegal_PASS_before_USER(self):
|
|
"""
|
|
Test PASS before USER with a wrong password.
|
|
"""
|
|
return self.run_PASS_before_USER(b'fooz')
|
|
|
|
|
|
def test_empty_PASS_before_USER(self):
|
|
"""
|
|
Test PASS before USER with an empty password.
|
|
"""
|
|
return self.run_PASS_before_USER(b'')
|
|
|
|
|
|
def test_one_space_PASS_before_USER(self):
|
|
"""
|
|
Test PASS before USER with an password that is a space.
|
|
"""
|
|
return self.run_PASS_before_USER(b' ')
|
|
|
|
|
|
def test_space_PASS_before_USER(self):
|
|
"""
|
|
Test PASS before USER with a password containing a space.
|
|
"""
|
|
return self.run_PASS_before_USER(b'fooz barz')
|
|
|
|
|
|
def test_multiple_spaces_PASS_before_USER(self):
|
|
"""
|
|
Test PASS before USER with a password containing multiple spaces.
|
|
"""
|
|
return self.run_PASS_before_USER(b'fooz barz asdf')
|
|
|
|
|
|
def test_other_whitespace_PASS_before_USER(self):
|
|
"""
|
|
Test PASS before USER with a password containing tabs and spaces.
|
|
"""
|
|
return self.run_PASS_before_USER(b'fooz barz\tcrazy@! \t ')
|
|
|
|
|
|
def test_good_PASS(self):
|
|
"""
|
|
Test PASS with a good password.
|
|
"""
|
|
return self.run_PASS(b'testuser', b'fooz')
|
|
|
|
|
|
def test_space_PASS(self):
|
|
"""
|
|
Test PASS with a password containing a space.
|
|
"""
|
|
return self.run_PASS(b'testuser', b'fooz barz')
|
|
|
|
|
|
def test_multiple_spaces_PASS(self):
|
|
"""
|
|
Test PASS with a password containing a space.
|
|
"""
|
|
return self.run_PASS(b'testuser', b'fooz barz asdf')
|
|
|
|
|
|
def test_other_whitespace_PASS(self):
|
|
"""
|
|
Test PASS with a password containing tabs and spaces.
|
|
"""
|
|
return self.run_PASS(b'testuser', b'fooz barz\tcrazy@! \t ')
|
|
|
|
|
|
def test_pass_wrong_user(self):
|
|
"""
|
|
Test PASS with a wrong user.
|
|
"""
|
|
return self.run_PASS(b'testuser', b'fooz',
|
|
tried_user=b'wronguser')
|
|
|
|
|
|
def test_wrong_PASS(self):
|
|
"""
|
|
Test PASS with a wrong password.
|
|
"""
|
|
return self.run_PASS(b'testuser', b'fooz',
|
|
tried_password=b'barz')
|
|
|
|
|
|
def test_wrong_space_PASS(self):
|
|
"""
|
|
Test PASS with a password containing a space.
|
|
"""
|
|
return self.run_PASS(b'testuser', b'fooz barz',
|
|
tried_password=b'foozbarz ')
|
|
|
|
|
|
def test_wrong_multiple_spaces_PASS(self):
|
|
"""
|
|
Test PASS with a password containing a space.
|
|
"""
|
|
return self.run_PASS(b'testuser', b'fooz barz asdf',
|
|
tried_password=b'foozbarz ')
|
|
|
|
|
|
def test_wrong_other_whitespace_PASS(self):
|
|
"""
|
|
Test PASS with a password containing tabs and spaces.
|
|
"""
|
|
return self.run_PASS(b'testuser', b'fooz barz\tcrazy@! \t ')
|
|
|
|
|
|
def test_wrong_command(self):
|
|
"""
|
|
After logging in, test a dummy command that is not defined.
|
|
"""
|
|
extra_input = [b'DUMMY COMMAND']
|
|
extra_output = [b' '.join([b'-ERR bad protocol or server: POP3Error:',
|
|
b'Unknown protocol command: DUMMY'])]
|
|
|
|
return self.run_PASS(b'testuser', b'testpassword',
|
|
after_auth_input=extra_input,
|
|
after_auth_output=extra_output,
|
|
).addCallback(self.flushLoggedErrors,
|
|
pop3.POP3Error)
|
|
|
|
|
|
|
|
@implementer(pop3.IServerFactory)
|
|
class TestServerFactory:
|
|
"""
|
|
A L{pop3.IServerFactory} implementation, for use by the test suite, with
|
|
some behavior controlled by the values of (settable) public attributes and
|
|
other behavior based on values hard-coded both here and in some test
|
|
methods.
|
|
"""
|
|
def cap_IMPLEMENTATION(self):
|
|
"""
|
|
Return the hard-coded value.
|
|
|
|
@return: L{pop3.IServerFactory}
|
|
"""
|
|
return "Test Implementation String"
|
|
|
|
|
|
def cap_EXPIRE(self):
|
|
"""
|
|
Return the hard-coded value.
|
|
|
|
@return: L{pop3.IServerFactory}
|
|
"""
|
|
return 60
|
|
|
|
challengers = OrderedDict([(b"SCHEME_1", None), (b"SCHEME_2", None)])
|
|
|
|
def cap_LOGIN_DELAY(self):
|
|
"""
|
|
Return the hard-coded value.
|
|
|
|
@return: L{pop3.IServerFactory}
|
|
"""
|
|
return 120
|
|
|
|
pue = True
|
|
def perUserExpiration(self):
|
|
"""
|
|
Return the hard-coded value.
|
|
|
|
@return: L{pop3.IServerFactory}
|
|
"""
|
|
return self.pue
|
|
|
|
puld = True
|
|
def perUserLoginDelay(self):
|
|
"""
|
|
Return the hard-coded value.
|
|
|
|
@return: L{pop3.IServerFactory}
|
|
"""
|
|
return self.puld
|
|
|
|
|
|
|
|
class TestMailbox:
|
|
"""
|
|
An incomplete L{IMailbox} implementation with certain per-user values
|
|
hard-coded and known by tests in this module.
|
|
|
|
|
|
This is useful for testing the server's per-user capability
|
|
implementation.
|
|
"""
|
|
loginDelay = 100
|
|
messageExpiration = 25
|
|
|
|
|
|
|
|
def contained(testcase, s, *caps):
|
|
"""
|
|
Assert that the given capability is included in all of the capability
|
|
sets.
|
|
|
|
@param testcase: A L{unittest.TestCase} to use to make assertions.
|
|
|
|
@param s: The capability for which to check.
|
|
@type s: L{bytes}
|
|
|
|
@param caps: The capability sets in which to check.
|
|
@type caps: L{tuple} of iterable
|
|
"""
|
|
for c in caps:
|
|
testcase.assertIn(s, c)
|
|
|
|
|
|
|
|
class CapabilityTests(unittest.TestCase):
|
|
"""
|
|
Tests for L{pop3.POP3}'s per-user capability handling.
|
|
"""
|
|
def setUp(self):
|
|
"""
|
|
Create a POP3 server with some capabilities.
|
|
"""
|
|
s = BytesIO()
|
|
p = pop3.POP3()
|
|
p.factory = TestServerFactory()
|
|
p.transport = internet.protocol.FileWrapper(s)
|
|
p.connectionMade()
|
|
p.do_CAPA()
|
|
|
|
self.caps = p.listCapabilities()
|
|
self.pcaps = s.getvalue().splitlines()
|
|
|
|
s = BytesIO()
|
|
p.mbox = TestMailbox()
|
|
p.transport = internet.protocol.FileWrapper(s)
|
|
p.do_CAPA()
|
|
|
|
self.lpcaps = s.getvalue().splitlines()
|
|
p.connectionLost(failure.Failure(Exception("Test harness disconnect")))
|
|
|
|
|
|
def test_UIDL(self):
|
|
"""
|
|
The server can advertise the I{UIDL} capability.
|
|
"""
|
|
contained(self, b"UIDL", self.caps, self.pcaps, self.lpcaps)
|
|
|
|
|
|
def test_TOP(self):
|
|
"""
|
|
The server can advertise the I{TOP} capability.
|
|
"""
|
|
contained(self, b"TOP", self.caps, self.pcaps, self.lpcaps)
|
|
|
|
|
|
def test_USER(self):
|
|
"""
|
|
The server can advertise the I{USER} capability.
|
|
"""
|
|
contained(self, b"USER", self.caps, self.pcaps, self.lpcaps)
|
|
|
|
|
|
def test_EXPIRE(self):
|
|
"""
|
|
The server can advertise its per-user expiration as well as a global
|
|
expiration.
|
|
"""
|
|
contained(self, b"EXPIRE 60 USER", self.caps, self.pcaps)
|
|
contained(self, b"EXPIRE 25", self.lpcaps)
|
|
|
|
|
|
def test_IMPLEMENTATION(self):
|
|
"""
|
|
The server can advertise its implementation string.
|
|
"""
|
|
contained(
|
|
self,
|
|
b"IMPLEMENTATION Test Implementation String",
|
|
self.caps, self.pcaps, self.lpcaps
|
|
)
|
|
|
|
|
|
def test_SASL(self):
|
|
"""
|
|
The server can advertise the SASL schemes it supports.
|
|
"""
|
|
contained(
|
|
self,
|
|
b"SASL SCHEME_1 SCHEME_2",
|
|
self.caps, self.pcaps, self.lpcaps
|
|
)
|
|
|
|
|
|
def test_LOGIN_DELAY(self):
|
|
"""
|
|
The can advertise a per-user login delay as well as a global login
|
|
delay.
|
|
"""
|
|
contained(self, b"LOGIN-DELAY 120 USER", self.caps, self.pcaps)
|
|
self.assertIn(b"LOGIN-DELAY 100", self.lpcaps)
|
|
|
|
|
|
|
|
class GlobalCapabilitiesTests(unittest.TestCase):
|
|
"""
|
|
Tests for L{pop3.POP3}'s global capability handling.
|
|
"""
|
|
def setUp(self):
|
|
"""
|
|
Create a POP3 server with some capabilities.
|
|
"""
|
|
s = BytesIO()
|
|
p = pop3.POP3()
|
|
p.factory = TestServerFactory()
|
|
p.factory.pue = p.factory.puld = False
|
|
p.transport = internet.protocol.FileWrapper(s)
|
|
p.connectionMade()
|
|
p.do_CAPA()
|
|
|
|
self.caps = p.listCapabilities()
|
|
self.pcaps = s.getvalue().splitlines()
|
|
|
|
s = BytesIO()
|
|
p.mbox = TestMailbox()
|
|
p.transport = internet.protocol.FileWrapper(s)
|
|
p.do_CAPA()
|
|
|
|
self.lpcaps = s.getvalue().splitlines()
|
|
p.connectionLost(failure.Failure(Exception("Test harness disconnect")))
|
|
|
|
|
|
def test_EXPIRE(self):
|
|
"""
|
|
I{EXPIRE} is in the server's advertised capabilities.
|
|
"""
|
|
contained(self, b"EXPIRE 60", self.caps, self.pcaps, self.lpcaps)
|
|
|
|
|
|
def test_LOGIN_DELAY(self):
|
|
"""
|
|
I{LOGIN-DELAY} is in the server's advertised capabilities.
|
|
"""
|
|
contained(self, b"LOGIN-DELAY 120", self.caps, self.pcaps, self.lpcaps)
|
|
|
|
|
|
|
|
class TestRealm:
|
|
"""
|
|
An L{IRealm} which knows about a single test account's mailbox.
|
|
"""
|
|
def requestAvatar(self, avatarId, mind, *interfaces):
|
|
"""
|
|
Retrieve a mailbox for I{testuser} or fail.
|
|
|
|
@param avatarId: See L{IRealm.requestAvatar}.
|
|
@param mind: See L{IRealm.requestAvatar}.
|
|
@param interfaces: See L{IRealm.requestAvatar}.
|
|
|
|
@raises: L{AssertionError} when requesting an C{avatarId} other than
|
|
I{testuser}.
|
|
"""
|
|
if avatarId == b'testuser':
|
|
return pop3.IMailbox, DummyMailbox(ValueError), lambda: None
|
|
assert False
|
|
|
|
|
|
|
|
class SASLTests(unittest.TestCase):
|
|
"""
|
|
Tests for L{pop3.POP3}'s SASL implementation.
|
|
"""
|
|
def test_ValidLogin(self):
|
|
"""
|
|
A CRAM-MD5-based SASL login attempt succeeds if it uses a username and
|
|
a hashed password known to the server's credentials checker.
|
|
"""
|
|
p = pop3.POP3()
|
|
p.factory = TestServerFactory()
|
|
p.factory.challengers = {b'CRAM-MD5':
|
|
cred.credentials.CramMD5Credentials}
|
|
p.portal = cred.portal.Portal(TestRealm())
|
|
ch = cred.checkers.InMemoryUsernamePasswordDatabaseDontUse()
|
|
ch.addUser(b'testuser', b'testpassword')
|
|
p.portal.registerChecker(ch)
|
|
|
|
s = BytesIO()
|
|
p.transport = internet.protocol.FileWrapper(s)
|
|
p.connectionMade()
|
|
|
|
p.lineReceived(b"CAPA")
|
|
self.assertTrue(s.getvalue().find(b"SASL CRAM-MD5") >= 0)
|
|
|
|
p.lineReceived(b"AUTH CRAM-MD5")
|
|
chal = s.getvalue().splitlines()[-1][2:]
|
|
chal = base64.decodestring(chal)
|
|
response = hmac.HMAC(b'testpassword', chal).hexdigest().encode("ascii")
|
|
|
|
p.lineReceived(
|
|
base64.encodestring(b'testuser ' + response).rstrip(b'\n'))
|
|
self.assertTrue(p.mbox)
|
|
self.assertTrue(s.getvalue().splitlines()[-1].find(b"+OK") >= 0)
|
|
p.connectionLost(failure.Failure(Exception("Test harness disconnect")))
|
|
|
|
|
|
|
|
class CommandMixin:
|
|
"""
|
|
Tests for all the commands a POP3 server is allowed to receive.
|
|
"""
|
|
|
|
extraMessage = b'''\
|
|
From: guy
|
|
To: fellow
|
|
|
|
More message text for you.
|
|
'''
|
|
|
|
|
|
def setUp(self):
|
|
"""
|
|
Make a POP3 server protocol instance hooked up to a simple mailbox and
|
|
a transport that buffers output to a BytesIO.
|
|
"""
|
|
p = pop3.POP3()
|
|
p.mbox = self.mailboxType(self.exceptionType)
|
|
p.schedule = list
|
|
self.pop3Server = p
|
|
|
|
s = BytesIO()
|
|
p.transport = internet.protocol.FileWrapper(s)
|
|
p.connectionMade()
|
|
s.seek(0)
|
|
s.truncate(0)
|
|
self.pop3Transport = s
|
|
|
|
|
|
def tearDown(self):
|
|
"""
|
|
Disconnect the server protocol so it can clean up anything it might
|
|
need to clean up.
|
|
"""
|
|
self.pop3Server.connectionLost(failure.Failure(
|
|
Exception("Test harness disconnect")))
|
|
|
|
|
|
def _flush(self):
|
|
"""
|
|
Do some of the things that the reactor would take care of, if the
|
|
reactor were actually running.
|
|
"""
|
|
# Oh man FileWrapper is pooh.
|
|
self.pop3Server.transport._checkProducer()
|
|
|
|
|
|
def test_LIST(self):
|
|
"""
|
|
Test the two forms of list: with a message index number, which should
|
|
return a short-form response, and without a message index number, which
|
|
should return a long-form response, one line per message.
|
|
"""
|
|
p = self.pop3Server
|
|
s = self.pop3Transport
|
|
|
|
p.lineReceived(b"LIST 1")
|
|
self._flush()
|
|
self.assertEqual(s.getvalue(), b"+OK 1 44\r\n")
|
|
s.seek(0)
|
|
s.truncate(0)
|
|
|
|
p.lineReceived(b"LIST")
|
|
self._flush()
|
|
self.assertEqual(s.getvalue(), b"+OK 1\r\n1 44\r\n.\r\n")
|
|
|
|
|
|
def test_LISTWithBadArgument(self):
|
|
"""
|
|
Test that non-integers and out-of-bound integers produce appropriate
|
|
error responses.
|
|
"""
|
|
p = self.pop3Server
|
|
s = self.pop3Transport
|
|
|
|
p.lineReceived(b"LIST a")
|
|
self.assertEqual(
|
|
s.getvalue(),
|
|
b"-ERR Invalid message-number: a\r\n")
|
|
s.seek(0)
|
|
s.truncate(0)
|
|
|
|
p.lineReceived(b"LIST 0")
|
|
self.assertEqual(
|
|
s.getvalue(),
|
|
b"-ERR Invalid message-number: 0\r\n")
|
|
s.seek(0)
|
|
s.truncate(0)
|
|
|
|
p.lineReceived(b"LIST 2")
|
|
self.assertEqual(
|
|
s.getvalue(),
|
|
b"-ERR Invalid message-number: 2\r\n")
|
|
s.seek(0)
|
|
s.truncate(0)
|
|
|
|
|
|
def test_UIDL(self):
|
|
"""
|
|
Test the two forms of the UIDL command. These are just like the two
|
|
forms of the LIST command.
|
|
"""
|
|
p = self.pop3Server
|
|
s = self.pop3Transport
|
|
|
|
p.lineReceived(b"UIDL 1")
|
|
self.assertEqual(s.getvalue(), b"+OK 0\r\n")
|
|
s.seek(0)
|
|
s.truncate(0)
|
|
|
|
p.lineReceived(b"UIDL")
|
|
self._flush()
|
|
self.assertEqual(s.getvalue(), b"+OK \r\n1 0\r\n.\r\n")
|
|
|
|
|
|
def test_UIDLWithBadArgument(self):
|
|
"""
|
|
Test that UIDL with a non-integer or an out-of-bounds integer produces
|
|
the appropriate error response.
|
|
"""
|
|
p = self.pop3Server
|
|
s = self.pop3Transport
|
|
|
|
p.lineReceived(b"UIDL a")
|
|
self.assertEqual(
|
|
s.getvalue(),
|
|
b"-ERR Bad message number argument\r\n")
|
|
s.seek(0)
|
|
s.truncate(0)
|
|
|
|
p.lineReceived(b"UIDL 0")
|
|
self.assertEqual(
|
|
s.getvalue(),
|
|
b"-ERR Bad message number argument\r\n")
|
|
s.seek(0)
|
|
s.truncate(0)
|
|
|
|
p.lineReceived(b"UIDL 2")
|
|
self.assertEqual(
|
|
s.getvalue(),
|
|
b"-ERR Bad message number argument\r\n")
|
|
s.seek(0)
|
|
s.truncate(0)
|
|
|
|
|
|
def test_STAT(self):
|
|
"""
|
|
Test the single form of the STAT command, which returns a short-form
|
|
response of the number of messages in the mailbox and their total size.
|
|
"""
|
|
p = self.pop3Server
|
|
s = self.pop3Transport
|
|
|
|
p.lineReceived(b"STAT")
|
|
self._flush()
|
|
self.assertEqual(s.getvalue(), b"+OK 1 44\r\n")
|
|
|
|
|
|
def test_RETR(self):
|
|
"""
|
|
Test downloading a message.
|
|
"""
|
|
p = self.pop3Server
|
|
s = self.pop3Transport
|
|
|
|
p.lineReceived(b"RETR 1")
|
|
self._flush()
|
|
self.assertEqual(
|
|
s.getvalue(),
|
|
b"+OK 44\r\n"
|
|
b"From: moshe\r\n"
|
|
b"To: moshe\r\n"
|
|
b"\r\n"
|
|
b"How are you, friend?\r\n"
|
|
b".\r\n")
|
|
s.seek(0)
|
|
s.truncate(0)
|
|
|
|
|
|
def test_RETRWithBadArgument(self):
|
|
"""
|
|
Test that trying to download a message with a bad argument, either not
|
|
an integer or an out-of-bounds integer, fails with the appropriate
|
|
error response.
|
|
"""
|
|
p = self.pop3Server
|
|
s = self.pop3Transport
|
|
|
|
p.lineReceived(b"RETR a")
|
|
self.assertEqual(
|
|
s.getvalue(),
|
|
b"-ERR Bad message number argument\r\n")
|
|
s.seek(0)
|
|
s.truncate(0)
|
|
|
|
p.lineReceived(b"RETR 0")
|
|
self.assertEqual(
|
|
s.getvalue(),
|
|
b"-ERR Bad message number argument\r\n")
|
|
s.seek(0)
|
|
s.truncate(0)
|
|
|
|
p.lineReceived(b"RETR 2")
|
|
self.assertEqual(
|
|
s.getvalue(),
|
|
b"-ERR Bad message number argument\r\n")
|
|
s.seek(0)
|
|
s.truncate(0)
|
|
|
|
|
|
def test_TOP(self):
|
|
"""
|
|
Test downloading the headers and part of the body of a message.
|
|
"""
|
|
p = self.pop3Server
|
|
s = self.pop3Transport
|
|
p.mbox.messages.append(self.extraMessage)
|
|
|
|
p.lineReceived(b"TOP 1 0")
|
|
self._flush()
|
|
self.assertEqual(
|
|
s.getvalue(),
|
|
b"+OK Top of message follows\r\n"
|
|
b"From: moshe\r\n"
|
|
b"To: moshe\r\n"
|
|
b"\r\n"
|
|
b".\r\n")
|
|
|
|
|
|
def test_TOPWithBadArgument(self):
|
|
"""
|
|
Test that trying to download a message with a bad argument, either a
|
|
message number which isn't an integer or is an out-of-bounds integer or
|
|
a number of lines which isn't an integer or is a negative integer,
|
|
fails with the appropriate error response.
|
|
"""
|
|
p = self.pop3Server
|
|
s = self.pop3Transport
|
|
p.mbox.messages.append(self.extraMessage)
|
|
|
|
p.lineReceived(b"TOP 1 a")
|
|
self.assertEqual(
|
|
s.getvalue(),
|
|
b"-ERR Bad line count argument\r\n")
|
|
s.seek(0)
|
|
s.truncate(0)
|
|
|
|
p.lineReceived(b"TOP 1 -1")
|
|
self.assertEqual(
|
|
s.getvalue(),
|
|
b"-ERR Bad line count argument\r\n")
|
|
s.seek(0)
|
|
s.truncate(0)
|
|
|
|
p.lineReceived(b"TOP a 1")
|
|
self.assertEqual(
|
|
s.getvalue(),
|
|
b"-ERR Bad message number argument\r\n")
|
|
s.seek(0)
|
|
s.truncate(0)
|
|
|
|
p.lineReceived(b"TOP 0 1")
|
|
self.assertEqual(
|
|
s.getvalue(),
|
|
b"-ERR Bad message number argument\r\n")
|
|
s.seek(0)
|
|
s.truncate(0)
|
|
|
|
p.lineReceived(b"TOP 3 1")
|
|
self.assertEqual(
|
|
s.getvalue(),
|
|
b"-ERR Bad message number argument\r\n")
|
|
s.seek(0)
|
|
s.truncate(0)
|
|
|
|
|
|
def test_LAST(self):
|
|
"""
|
|
Test the exceedingly pointless LAST command, which tells you the
|
|
highest message index which you have already downloaded.
|
|
"""
|
|
p = self.pop3Server
|
|
s = self.pop3Transport
|
|
p.mbox.messages.append(self.extraMessage)
|
|
|
|
p.lineReceived(b'LAST')
|
|
self.assertEqual(
|
|
s.getvalue(),
|
|
b"+OK 0\r\n")
|
|
s.seek(0)
|
|
s.truncate(0)
|
|
|
|
|
|
def test_RetrieveUpdatesHighest(self):
|
|
"""
|
|
Test that issuing a RETR command updates the LAST response.
|
|
"""
|
|
p = self.pop3Server
|
|
s = self.pop3Transport
|
|
p.mbox.messages.append(self.extraMessage)
|
|
|
|
p.lineReceived(b'RETR 2')
|
|
self._flush()
|
|
s.seek(0)
|
|
s.truncate(0)
|
|
p.lineReceived(b'LAST')
|
|
self.assertEqual(
|
|
s.getvalue(),
|
|
b'+OK 2\r\n')
|
|
s.seek(0)
|
|
s.truncate(0)
|
|
|
|
|
|
def test_TopUpdatesHighest(self):
|
|
"""
|
|
Test that issuing a TOP command updates the LAST response.
|
|
"""
|
|
p = self.pop3Server
|
|
s = self.pop3Transport
|
|
p.mbox.messages.append(self.extraMessage)
|
|
|
|
p.lineReceived(b'TOP 2 10')
|
|
self._flush()
|
|
s.seek(0)
|
|
s.truncate(0)
|
|
p.lineReceived(b'LAST')
|
|
self.assertEqual(
|
|
s.getvalue(),
|
|
b'+OK 2\r\n')
|
|
|
|
|
|
def test_HighestOnlyProgresses(self):
|
|
"""
|
|
Test that downloading a message with a smaller index than the current
|
|
LAST response doesn't change the LAST response.
|
|
"""
|
|
p = self.pop3Server
|
|
s = self.pop3Transport
|
|
p.mbox.messages.append(self.extraMessage)
|
|
|
|
p.lineReceived(b'RETR 2')
|
|
self._flush()
|
|
p.lineReceived(b'TOP 1 10')
|
|
self._flush()
|
|
s.seek(0)
|
|
s.truncate(0)
|
|
p.lineReceived(b'LAST')
|
|
self.assertEqual(
|
|
s.getvalue(),
|
|
b'+OK 2\r\n')
|
|
|
|
|
|
def test_ResetClearsHighest(self):
|
|
"""
|
|
Test that issuing RSET changes the LAST response to 0.
|
|
"""
|
|
p = self.pop3Server
|
|
s = self.pop3Transport
|
|
p.mbox.messages.append(self.extraMessage)
|
|
|
|
p.lineReceived(b'RETR 2')
|
|
self._flush()
|
|
p.lineReceived(b'RSET')
|
|
s.seek(0)
|
|
s.truncate(0)
|
|
p.lineReceived(b'LAST')
|
|
self.assertEqual(
|
|
s.getvalue(),
|
|
b'+OK 0\r\n')
|
|
|
|
|
|
|
|
_listMessageDeprecation = (
|
|
"twisted.mail.pop3.IMailbox.listMessages may not "
|
|
"raise IndexError for out-of-bounds message numbers: "
|
|
"raise ValueError instead.")
|
|
_listMessageSuppression = util.suppress(
|
|
message=_listMessageDeprecation,
|
|
category=PendingDeprecationWarning)
|
|
|
|
_getUidlDeprecation = (
|
|
"twisted.mail.pop3.IMailbox.getUidl may not "
|
|
"raise IndexError for out-of-bounds message numbers: "
|
|
"raise ValueError instead.")
|
|
_getUidlSuppression = util.suppress(
|
|
message=_getUidlDeprecation,
|
|
category=PendingDeprecationWarning)
|
|
|
|
class IndexErrorCommandTests(CommandMixin, unittest.TestCase):
|
|
"""
|
|
Run all of the command tests against a mailbox which raises IndexError
|
|
when an out of bounds request is made. This behavior will be deprecated
|
|
shortly and then removed.
|
|
"""
|
|
exceptionType = IndexError
|
|
mailboxType = DummyMailbox
|
|
|
|
def test_LISTWithBadArgument(self):
|
|
"""
|
|
An attempt to get metadata about a message with a bad argument fails
|
|
with an I{ERR} response even if the mailbox implementation raises
|
|
L{IndexError}.
|
|
"""
|
|
return CommandMixin.test_LISTWithBadArgument(self)
|
|
test_LISTWithBadArgument.suppress = [_listMessageSuppression]
|
|
|
|
|
|
def test_UIDLWithBadArgument(self):
|
|
"""
|
|
An attempt to look up the UID of a message with a bad argument fails
|
|
with an I{ERR} response even if the mailbox implementation raises
|
|
L{IndexError}.
|
|
"""
|
|
return CommandMixin.test_UIDLWithBadArgument(self)
|
|
test_UIDLWithBadArgument.suppress = [_getUidlSuppression]
|
|
|
|
|
|
def test_TOPWithBadArgument(self):
|
|
"""
|
|
An attempt to download some of a message with a bad argument fails with
|
|
an I{ERR} response even if the mailbox implementation raises
|
|
L{IndexError}.
|
|
"""
|
|
return CommandMixin.test_TOPWithBadArgument(self)
|
|
test_TOPWithBadArgument.suppress = [_listMessageSuppression]
|
|
|
|
|
|
def test_RETRWithBadArgument(self):
|
|
"""
|
|
An attempt to download a message with a bad argument fails with an
|
|
I{ERR} response even if the mailbox implementation raises
|
|
L{IndexError}.
|
|
"""
|
|
return CommandMixin.test_RETRWithBadArgument(self)
|
|
test_RETRWithBadArgument.suppress = [_listMessageSuppression]
|
|
|
|
|
|
|
|
class ValueErrorCommandTests(CommandMixin, unittest.TestCase):
|
|
"""
|
|
Run all of the command tests against a mailbox which raises ValueError
|
|
when an out of bounds request is made. This is the correct behavior and
|
|
after support for mailboxes which raise IndexError is removed, this will
|
|
become just C{CommandTestCase}.
|
|
"""
|
|
exceptionType = ValueError
|
|
mailboxType = DummyMailbox
|
|
|
|
|
|
|
|
class SyncDeferredMailbox(DummyMailbox):
|
|
"""
|
|
Mailbox which has a listMessages implementation which returns a Deferred
|
|
which has already fired.
|
|
"""
|
|
def listMessages(self, n=None):
|
|
"""
|
|
Synchronously list messages.
|
|
|
|
@type n: L{int} or L{None}
|
|
@param n: The 0-based index of the message.
|
|
|
|
@return: A L{Deferred} which already has a message list result.
|
|
"""
|
|
return defer.succeed(DummyMailbox.listMessages(self, n))
|
|
|
|
|
|
|
|
class IndexErrorSyncDeferredCommandTests(IndexErrorCommandTests):
|
|
"""
|
|
Run all of the L{IndexErrorCommandTests} tests with a
|
|
synchronous-Deferred returning IMailbox implementation.
|
|
"""
|
|
mailboxType = SyncDeferredMailbox
|
|
|
|
|
|
|
|
class ValueErrorSyncDeferredCommandTests(ValueErrorCommandTests):
|
|
"""
|
|
Run all of the L{ValueErrorCommandTests} tests with a
|
|
synchronous-Deferred returning IMailbox implementation.
|
|
"""
|
|
mailboxType = SyncDeferredMailbox
|
|
|
|
|
|
|
|
class AsyncDeferredMailbox(DummyMailbox):
|
|
"""
|
|
Mailbox which has a listMessages implementation which returns a Deferred
|
|
which has not yet fired.
|
|
"""
|
|
def __init__(self, *a, **kw):
|
|
self.waiting = []
|
|
DummyMailbox.__init__(self, *a, **kw)
|
|
|
|
|
|
def listMessages(self, n=None):
|
|
"""
|
|
Record a new unfired L{Deferred} in C{self.waiting} and return it.
|
|
|
|
@type n: L{int} or L{None}
|
|
@param n: The 0-based index of the message.
|
|
|
|
@return: The L{Deferred}
|
|
"""
|
|
d = defer.Deferred()
|
|
# See AsyncDeferredMailbox._flush
|
|
self.waiting.append((d, DummyMailbox.listMessages(self, n)))
|
|
return d
|
|
|
|
|
|
|
|
class IndexErrorAsyncDeferredCommandTests(IndexErrorCommandTests):
|
|
"""
|
|
Run all of the L{IndexErrorCommandTests} tests with an
|
|
asynchronous-Deferred returning IMailbox implementation.
|
|
"""
|
|
mailboxType = AsyncDeferredMailbox
|
|
|
|
def _flush(self):
|
|
"""
|
|
Fire whatever Deferreds we've built up in our mailbox.
|
|
"""
|
|
while self.pop3Server.mbox.waiting:
|
|
d, a = self.pop3Server.mbox.waiting.pop()
|
|
d.callback(a)
|
|
IndexErrorCommandTests._flush(self)
|
|
|
|
|
|
|
|
class ValueErrorAsyncDeferredCommandTests(ValueErrorCommandTests):
|
|
"""
|
|
Run all of the L{IndexErrorCommandTests} tests with an
|
|
asynchronous-Deferred returning IMailbox implementation.
|
|
"""
|
|
mailboxType = AsyncDeferredMailbox
|
|
|
|
def _flush(self):
|
|
"""
|
|
Fire whatever Deferreds we've built up in our mailbox.
|
|
"""
|
|
while self.pop3Server.mbox.waiting:
|
|
d, a = self.pop3Server.mbox.waiting.pop()
|
|
d.callback(a)
|
|
ValueErrorCommandTests._flush(self)
|
|
|
|
|
|
|
|
class POP3MiscTests(unittest.TestCase):
|
|
"""
|
|
Miscellaneous tests more to do with module/package structure than
|
|
anything to do with the Post Office Protocol.
|
|
"""
|
|
def test_all(self):
|
|
"""
|
|
This test checks that all names listed in
|
|
twisted.mail.pop3.__all__ are actually present in the module.
|
|
"""
|
|
mod = twisted.mail.pop3
|
|
for attr in mod.__all__:
|
|
self.assertTrue(hasattr(mod, attr))
|