Ausgabe der neuen DB Einträge

This commit is contained in:
hubobel 2022-01-02 21:50:48 +01:00
parent bad48e1627
commit cfbbb9ee3d
2399 changed files with 843193 additions and 43 deletions

View file

@ -0,0 +1,7 @@
# -*- test-case-name: twisted.cred.test -*-
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
"""
Unit tests for C{twisted.cred}.
"""

View file

@ -0,0 +1,89 @@
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
"""
Tests for L{twisted.cred}'s implementation of CRAM-MD5.
"""
from __future__ import division, absolute_import
from hmac import HMAC
from binascii import hexlify
from twisted.trial.unittest import TestCase
from twisted.cred.credentials import CramMD5Credentials
from twisted.cred.credentials import IUsernameHashedPassword
class CramMD5CredentialsTests(TestCase):
"""
Tests for L{CramMD5Credentials}.
"""
def test_idempotentChallenge(self):
"""
The same L{CramMD5Credentials} will always provide the same challenge,
no matter how many times it is called.
"""
c = CramMD5Credentials()
chal = c.getChallenge()
self.assertEqual(chal, c.getChallenge())
def test_checkPassword(self):
"""
When a valid response (which is a hex digest of the challenge that has
been encrypted by the user's shared secret) is set on the
L{CramMD5Credentials} that created the challenge, and C{checkPassword}
is called with the user's shared secret, it will return L{True}.
"""
c = CramMD5Credentials()
chal = c.getChallenge()
c.response = hexlify(HMAC(b'secret', chal).digest())
self.assertTrue(c.checkPassword(b'secret'))
def test_noResponse(self):
"""
When there is no response set, calling C{checkPassword} will return
L{False}.
"""
c = CramMD5Credentials()
self.assertFalse(c.checkPassword(b'secret'))
def test_wrongPassword(self):
"""
When an invalid response is set on the L{CramMD5Credentials} (one that
is not the hex digest of the challenge, encrypted with the user's shared
secret) and C{checkPassword} is called with the user's correct shared
secret, it will return L{False}.
"""
c = CramMD5Credentials()
chal = c.getChallenge()
c.response = hexlify(HMAC(b'thewrongsecret', chal).digest())
self.assertFalse(c.checkPassword(b'secret'))
def test_setResponse(self):
"""
When C{setResponse} is called with a string that is the username and
the hashed challenge separated with a space, they will be set on the
L{CramMD5Credentials}.
"""
c = CramMD5Credentials()
chal = c.getChallenge()
c.setResponse(b" ".join(
(b"squirrel",
hexlify(HMAC(b'supersecret', chal).digest()))))
self.assertTrue(c.checkPassword(b'supersecret'))
self.assertEqual(c.username, b"squirrel")
def test_interface(self):
"""
L{CramMD5Credentials} implements the L{IUsernameHashedPassword}
interface.
"""
self.assertTrue(
IUsernameHashedPassword.implementedBy(CramMD5Credentials))

View file

@ -0,0 +1,441 @@
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
"""
Tests for L{twisted.cred}, now with 30% more starch.
"""
from __future__ import absolute_import, division
from zope.interface import implementer, Interface
from binascii import hexlify, unhexlify
from twisted.trial import unittest
from twisted.python.compat import nativeString, networkString
from twisted.python import components
from twisted.internet import defer
from twisted.cred import checkers, credentials, portal, error
try:
from crypt import crypt
except ImportError:
crypt = None
class ITestable(Interface):
"""
An interface for a theoretical protocol.
"""
pass
class TestAvatar(object):
"""
A test avatar.
"""
def __init__(self, name):
self.name = name
self.loggedIn = False
self.loggedOut = False
def login(self):
assert not self.loggedIn
self.loggedIn = True
def logout(self):
self.loggedOut = True
@implementer(ITestable)
class Testable(components.Adapter):
"""
A theoretical protocol for testing.
"""
pass
components.registerAdapter(Testable, TestAvatar, ITestable)
class IDerivedCredentials(credentials.IUsernamePassword):
pass
@implementer(IDerivedCredentials, ITestable)
class DerivedCredentials(object):
def __init__(self, username, password):
self.username = username
self.password = password
def checkPassword(self, password):
return password == self.password
@implementer(portal.IRealm)
class TestRealm(object):
"""
A basic test realm.
"""
def __init__(self):
self.avatars = {}
def requestAvatar(self, avatarId, mind, *interfaces):
if avatarId in self.avatars:
avatar = self.avatars[avatarId]
else:
avatar = TestAvatar(avatarId)
self.avatars[avatarId] = avatar
avatar.login()
return (interfaces[0], interfaces[0](avatar),
avatar.logout)
class CredTests(unittest.TestCase):
"""
Tests for the meat of L{twisted.cred} -- realms, portals, avatars, and
checkers.
"""
def setUp(self):
self.realm = TestRealm()
self.portal = portal.Portal(self.realm)
self.checker = checkers.InMemoryUsernamePasswordDatabaseDontUse()
self.checker.addUser(b"bob", b"hello")
self.portal.registerChecker(self.checker)
def test_listCheckers(self):
"""
The checkers in a portal can check only certain types of credentials.
Since this portal has
L{checkers.InMemoryUsernamePasswordDatabaseDontUse} registered, it
"""
expected = [credentials.IUsernamePassword,
credentials.IUsernameHashedPassword]
got = self.portal.listCredentialsInterfaces()
self.assertEqual(sorted(got), sorted(expected))
def test_basicLogin(self):
"""
Calling C{login} on a portal with correct credentials and an interface
that the portal's realm supports works.
"""
login = self.successResultOf(self.portal.login(
credentials.UsernamePassword(b"bob", b"hello"), self, ITestable))
iface, impl, logout = login
# whitebox
self.assertEqual(iface, ITestable)
self.assertTrue(iface.providedBy(impl),
"%s does not implement %s" % (impl, iface))
# greybox
self.assertTrue(impl.original.loggedIn)
self.assertTrue(not impl.original.loggedOut)
logout()
self.assertTrue(impl.original.loggedOut)
def test_derivedInterface(self):
"""
Logging in with correct derived credentials and an interface
that the portal's realm supports works.
"""
login = self.successResultOf(self.portal.login(
DerivedCredentials(b"bob", b"hello"), self, ITestable))
iface, impl, logout = login
# whitebox
self.assertEqual(iface, ITestable)
self.assertTrue(iface.providedBy(impl),
"%s does not implement %s" % (impl, iface))
# greybox
self.assertTrue(impl.original.loggedIn)
self.assertTrue(not impl.original.loggedOut)
logout()
self.assertTrue(impl.original.loggedOut)
def test_failedLoginPassword(self):
"""
Calling C{login} with incorrect credentials (in this case a wrong
password) causes L{error.UnauthorizedLogin} to be raised.
"""
login = self.failureResultOf(self.portal.login(
credentials.UsernamePassword(b"bob", b"h3llo"), self, ITestable))
self.assertTrue(login)
self.assertEqual(error.UnauthorizedLogin, login.type)
def test_failedLoginName(self):
"""
Calling C{login} with incorrect credentials (in this case no known
user) causes L{error.UnauthorizedLogin} to be raised.
"""
login = self.failureResultOf(self.portal.login(
credentials.UsernamePassword(b"jay", b"hello"), self, ITestable))
self.assertTrue(login)
self.assertEqual(error.UnauthorizedLogin, login.type)
class OnDiskDatabaseTests(unittest.TestCase):
users = [
(b'user1', b'pass1'),
(b'user2', b'pass2'),
(b'user3', b'pass3'),
]
def setUp(self):
self.dbfile = self.mktemp()
with open(self.dbfile, 'wb') as f:
for (u, p) in self.users:
f.write(u + b":" + p + b"\n")
def test_getUserNonexistentDatabase(self):
"""
A missing db file will cause a permanent rejection of authorization
attempts.
"""
self.db = checkers.FilePasswordDB('test_thisbetternoteverexist.db')
self.assertRaises(error.UnauthorizedLogin, self.db.getUser, 'user')
def testUserLookup(self):
self.db = checkers.FilePasswordDB(self.dbfile)
for (u, p) in self.users:
self.assertRaises(KeyError, self.db.getUser, u.upper())
self.assertEqual(self.db.getUser(u), (u, p))
def testCaseInSensitivity(self):
self.db = checkers.FilePasswordDB(self.dbfile, caseSensitive=False)
for (u, p) in self.users:
self.assertEqual(self.db.getUser(u.upper()), (u, p))
def testRequestAvatarId(self):
self.db = checkers.FilePasswordDB(self.dbfile)
creds = [credentials.UsernamePassword(u, p) for u, p in self.users]
d = defer.gatherResults(
[defer.maybeDeferred(self.db.requestAvatarId, c) for c in creds])
d.addCallback(self.assertEqual, [u for u, p in self.users])
return d
def testRequestAvatarId_hashed(self):
self.db = checkers.FilePasswordDB(self.dbfile)
creds = [credentials.UsernameHashedPassword(u, p)
for u, p in self.users]
d = defer.gatherResults(
[defer.maybeDeferred(self.db.requestAvatarId, c) for c in creds])
d.addCallback(self.assertEqual, [u for u, p in self.users])
return d
class HashedPasswordOnDiskDatabaseTests(unittest.TestCase):
users = [
(b'user1', b'pass1'),
(b'user2', b'pass2'),
(b'user3', b'pass3'),
]
def setUp(self):
dbfile = self.mktemp()
self.db = checkers.FilePasswordDB(dbfile, hash=self.hash)
with open(dbfile, 'wb') as f:
for (u, p) in self.users:
f.write(u + b":" + self.hash(u, p, u[:2]) + b"\n")
r = TestRealm()
self.port = portal.Portal(r)
self.port.registerChecker(self.db)
def hash(self, u, p, s):
return networkString(crypt(nativeString(p), nativeString(s)))
def testGoodCredentials(self):
goodCreds = [credentials.UsernamePassword(u, p) for u, p in self.users]
d = defer.gatherResults([self.db.requestAvatarId(c)
for c in goodCreds])
d.addCallback(self.assertEqual, [u for u, p in self.users])
return d
def testGoodCredentials_login(self):
goodCreds = [credentials.UsernamePassword(u, p) for u, p in self.users]
d = defer.gatherResults([self.port.login(c, None, ITestable)
for c in goodCreds])
d.addCallback(lambda x: [a.original.name for i, a, l in x])
d.addCallback(self.assertEqual, [u for u, p in self.users])
return d
def testBadCredentials(self):
badCreds = [credentials.UsernamePassword(u, 'wrong password')
for u, p in self.users]
d = defer.DeferredList([self.port.login(c, None, ITestable)
for c in badCreds], consumeErrors=True)
d.addCallback(self._assertFailures, error.UnauthorizedLogin)
return d
def testHashedCredentials(self):
hashedCreds = [credentials.UsernameHashedPassword(
u, self.hash(None, p, u[:2])) for u, p in self.users]
d = defer.DeferredList([self.port.login(c, None, ITestable)
for c in hashedCreds], consumeErrors=True)
d.addCallback(self._assertFailures, error.UnhandledCredentials)
return d
def _assertFailures(self, failures, *expectedFailures):
for flag, failure in failures:
self.assertEqual(flag, defer.FAILURE)
failure.trap(*expectedFailures)
return None
if crypt is None:
skip = "crypt module not available"
class CheckersMixin(object):
"""
L{unittest.TestCase} mixin for testing that some checkers accept
and deny specified credentials.
Subclasses must provide
- C{getCheckers} which returns a sequence of
L{checkers.ICredentialChecker}
- C{getGoodCredentials} which returns a list of 2-tuples of
credential to check and avaterId to expect.
- C{getBadCredentials} which returns a list of credentials
which are expected to be unauthorized.
"""
@defer.inlineCallbacks
def test_positive(self):
"""
The given credentials are accepted by all the checkers, and give
the expected C{avatarID}s
"""
for chk in self.getCheckers():
for (cred, avatarId) in self.getGoodCredentials():
r = yield chk.requestAvatarId(cred)
self.assertEqual(r, avatarId)
@defer.inlineCallbacks
def test_negative(self):
"""
The given credentials are rejected by all the checkers.
"""
for chk in self.getCheckers():
for cred in self.getBadCredentials():
d = chk.requestAvatarId(cred)
yield self.assertFailure(d, error.UnauthorizedLogin)
class HashlessFilePasswordDBMixin(object):
credClass = credentials.UsernamePassword
diskHash = None
networkHash = staticmethod(lambda x: x)
_validCredentials = [
(b'user1', b'password1'),
(b'user2', b'password2'),
(b'user3', b'password3')]
def getGoodCredentials(self):
for u, p in self._validCredentials:
yield self.credClass(u, self.networkHash(p)), u
def getBadCredentials(self):
for u, p in [(b'user1', b'password3'),
(b'user2', b'password1'),
(b'bloof', b'blarf')]:
yield self.credClass(u, self.networkHash(p))
def getCheckers(self):
diskHash = self.diskHash or (lambda x: x)
hashCheck = self.diskHash and (lambda username, password,
stored: self.diskHash(password))
for cache in True, False:
fn = self.mktemp()
with open(fn, 'wb') as fObj:
for u, p in self._validCredentials:
fObj.write(u + b":" + diskHash(p) + b"\n")
yield checkers.FilePasswordDB(fn, cache=cache, hash=hashCheck)
fn = self.mktemp()
with open(fn, 'wb') as fObj:
for u, p in self._validCredentials:
fObj.write(diskHash(p) + b' dingle dongle ' + u + b'\n')
yield checkers.FilePasswordDB(fn, b' ', 3, 0,
cache=cache, hash=hashCheck)
fn = self.mktemp()
with open(fn, 'wb') as fObj:
for u, p in self._validCredentials:
fObj.write(b'zip,zap,' + u.title() + b',zup,'\
+ diskHash(p) + b'\n',)
yield checkers.FilePasswordDB(fn, b',', 2, 4, False,
cache=cache, hash=hashCheck)
class LocallyHashedFilePasswordDBMixin(HashlessFilePasswordDBMixin):
diskHash = staticmethod(lambda x: hexlify(x))
class NetworkHashedFilePasswordDBMixin(HashlessFilePasswordDBMixin):
networkHash = staticmethod(lambda x: hexlify(x))
class credClass(credentials.UsernameHashedPassword):
def checkPassword(self, password):
return unhexlify(self.hashed) == password
class HashlessFilePasswordDBCheckerTests(HashlessFilePasswordDBMixin,
CheckersMixin, unittest.TestCase):
pass
class LocallyHashedFilePasswordDBCheckerTests(LocallyHashedFilePasswordDBMixin,
CheckersMixin,
unittest.TestCase):
pass
class NetworkHashedFilePasswordDBCheckerTests(NetworkHashedFilePasswordDBMixin,
CheckersMixin,
unittest.TestCase):
pass

View file

@ -0,0 +1,698 @@
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
"""
Tests for L{twisted.cred._digest} and the associated bits in
L{twisted.cred.credentials}.
"""
from __future__ import division, absolute_import
import base64
from binascii import hexlify
from hashlib import md5, sha1
from zope.interface.verify import verifyObject
from twisted.trial.unittest import TestCase
from twisted.internet.address import IPv4Address
from twisted.cred.error import LoginFailed
from twisted.cred.credentials import calcHA1, calcHA2, IUsernameDigestHash
from twisted.cred.credentials import calcResponse, DigestCredentialFactory
from twisted.python.compat import networkString
def b64encode(s):
return base64.b64encode(s).strip()
class FakeDigestCredentialFactory(DigestCredentialFactory):
"""
A Fake Digest Credential Factory that generates a predictable
nonce and opaque
"""
def __init__(self, *args, **kwargs):
super(FakeDigestCredentialFactory, self).__init__(*args, **kwargs)
self.privateKey = b"0"
def _generateNonce(self):
"""
Generate a static nonce
"""
return b'178288758716122392881254770685'
def _getTime(self):
"""
Return a stable time
"""
return 0
class DigestAuthTests(TestCase):
"""
L{TestCase} mixin class which defines a number of tests for
L{DigestCredentialFactory}. Because this mixin defines C{setUp}, it
must be inherited before L{TestCase}.
"""
def setUp(self):
"""
Create a DigestCredentialFactory for testing
"""
self.username = b"foobar"
self.password = b"bazquux"
self.realm = b"test realm"
self.algorithm = b"md5"
self.cnonce = b"29fc54aa1641c6fa0e151419361c8f23"
self.qop = b"auth"
self.uri = b"/write/"
self.clientAddress = IPv4Address('TCP', '10.2.3.4', 43125)
self.method = b'GET'
self.credentialFactory = DigestCredentialFactory(
self.algorithm, self.realm)
def test_MD5HashA1(self, _algorithm=b'md5', _hash=md5):
"""
L{calcHA1} accepts the C{'md5'} algorithm and returns an MD5 hash of
its parameters, excluding the nonce and cnonce.
"""
nonce = b'abc123xyz'
hashA1 = calcHA1(_algorithm, self.username, self.realm, self.password,
nonce, self.cnonce)
a1 = b":".join((self.username, self.realm, self.password))
expected = hexlify(_hash(a1).digest())
self.assertEqual(hashA1, expected)
def test_MD5SessionHashA1(self):
"""
L{calcHA1} accepts the C{'md5-sess'} algorithm and returns an MD5 hash
of its parameters, including the nonce and cnonce.
"""
nonce = b'xyz321abc'
hashA1 = calcHA1(b'md5-sess', self.username, self.realm, self.password,
nonce, self.cnonce)
a1 = self.username + b':' + self.realm + b':' + self.password
ha1 = hexlify(md5(a1).digest())
a1 = ha1 + b':' + nonce + b':' + self.cnonce
expected = hexlify(md5(a1).digest())
self.assertEqual(hashA1, expected)
def test_SHAHashA1(self):
"""
L{calcHA1} accepts the C{'sha'} algorithm and returns a SHA hash of its
parameters, excluding the nonce and cnonce.
"""
self.test_MD5HashA1(b'sha', sha1)
def test_MD5HashA2Auth(self, _algorithm=b'md5', _hash=md5):
"""
L{calcHA2} accepts the C{'md5'} algorithm and returns an MD5 hash of
its arguments, excluding the entity hash for QOP other than
C{'auth-int'}.
"""
method = b'GET'
hashA2 = calcHA2(_algorithm, method, self.uri, b'auth', None)
a2 = method + b':' + self.uri
expected = hexlify(_hash(a2).digest())
self.assertEqual(hashA2, expected)
def test_MD5HashA2AuthInt(self, _algorithm=b'md5', _hash=md5):
"""
L{calcHA2} accepts the C{'md5'} algorithm and returns an MD5 hash of
its arguments, including the entity hash for QOP of C{'auth-int'}.
"""
method = b'GET'
hentity = b'foobarbaz'
hashA2 = calcHA2(_algorithm, method, self.uri, b'auth-int', hentity)
a2 = method + b':' + self.uri + b':' + hentity
expected = hexlify(_hash(a2).digest())
self.assertEqual(hashA2, expected)
def test_MD5SessHashA2Auth(self):
"""
L{calcHA2} accepts the C{'md5-sess'} algorithm and QOP of C{'auth'} and
returns the same value as it does for the C{'md5'} algorithm.
"""
self.test_MD5HashA2Auth(b'md5-sess')
def test_MD5SessHashA2AuthInt(self):
"""
L{calcHA2} accepts the C{'md5-sess'} algorithm and QOP of C{'auth-int'}
and returns the same value as it does for the C{'md5'} algorithm.
"""
self.test_MD5HashA2AuthInt(b'md5-sess')
def test_SHAHashA2Auth(self):
"""
L{calcHA2} accepts the C{'sha'} algorithm and returns a SHA hash of
its arguments, excluding the entity hash for QOP other than
C{'auth-int'}.
"""
self.test_MD5HashA2Auth(b'sha', sha1)
def test_SHAHashA2AuthInt(self):
"""
L{calcHA2} accepts the C{'sha'} algorithm and returns a SHA hash of
its arguments, including the entity hash for QOP of C{'auth-int'}.
"""
self.test_MD5HashA2AuthInt(b'sha', sha1)
def test_MD5HashResponse(self, _algorithm=b'md5', _hash=md5):
"""
L{calcResponse} accepts the C{'md5'} algorithm and returns an MD5 hash
of its parameters, excluding the nonce count, client nonce, and QoP
value if the nonce count and client nonce are L{None}
"""
hashA1 = b'abc123'
hashA2 = b'789xyz'
nonce = b'lmnopq'
response = hashA1 + b':' + nonce + b':' + hashA2
expected = hexlify(_hash(response).digest())
digest = calcResponse(hashA1, hashA2, _algorithm, nonce, None, None,
None)
self.assertEqual(expected, digest)
def test_MD5SessionHashResponse(self):
"""
L{calcResponse} accepts the C{'md5-sess'} algorithm and returns an MD5
hash of its parameters, excluding the nonce count, client nonce, and
QoP value if the nonce count and client nonce are L{None}
"""
self.test_MD5HashResponse(b'md5-sess')
def test_SHAHashResponse(self):
"""
L{calcResponse} accepts the C{'sha'} algorithm and returns a SHA hash
of its parameters, excluding the nonce count, client nonce, and QoP
value if the nonce count and client nonce are L{None}
"""
self.test_MD5HashResponse(b'sha', sha1)
def test_MD5HashResponseExtra(self, _algorithm=b'md5', _hash=md5):
"""
L{calcResponse} accepts the C{'md5'} algorithm and returns an MD5 hash
of its parameters, including the nonce count, client nonce, and QoP
value if they are specified.
"""
hashA1 = b'abc123'
hashA2 = b'789xyz'
nonce = b'lmnopq'
nonceCount = b'00000004'
clientNonce = b'abcxyz123'
qop = b'auth'
response = hashA1 + b':' + nonce + b':' + nonceCount + b':' +\
clientNonce + b':' + qop + b':' + hashA2
expected = hexlify(_hash(response).digest())
digest = calcResponse(
hashA1, hashA2, _algorithm, nonce, nonceCount, clientNonce, qop)
self.assertEqual(expected, digest)
def test_MD5SessionHashResponseExtra(self):
"""
L{calcResponse} accepts the C{'md5-sess'} algorithm and returns an MD5
hash of its parameters, including the nonce count, client nonce, and
QoP value if they are specified.
"""
self.test_MD5HashResponseExtra(b'md5-sess')
def test_SHAHashResponseExtra(self):
"""
L{calcResponse} accepts the C{'sha'} algorithm and returns a SHA hash
of its parameters, including the nonce count, client nonce, and QoP
value if they are specified.
"""
self.test_MD5HashResponseExtra(b'sha', sha1)
def formatResponse(self, quotes=True, **kw):
"""
Format all given keyword arguments and their values suitably for use as
the value of an HTTP header.
@types quotes: C{bool}
@param quotes: A flag indicating whether to quote the values of each
field in the response.
@param **kw: Keywords and C{bytes} values which will be treated as field
name/value pairs to include in the result.
@rtype: C{bytes}
@return: The given fields formatted for use as an HTTP header value.
"""
if 'username' not in kw:
kw['username'] = self.username
if 'realm' not in kw:
kw['realm'] = self.realm
if 'algorithm' not in kw:
kw['algorithm'] = self.algorithm
if 'qop' not in kw:
kw['qop'] = self.qop
if 'cnonce' not in kw:
kw['cnonce'] = self.cnonce
if 'uri' not in kw:
kw['uri'] = self.uri
if quotes:
quote = b'"'
else:
quote = b''
return b', '.join([
b"".join((networkString(k), b"=", quote, v, quote))
for (k, v)
in kw.items()
if v is not None])
def getDigestResponse(self, challenge, ncount):
"""
Calculate the response for the given challenge
"""
nonce = challenge.get('nonce')
algo = challenge.get('algorithm').lower()
qop = challenge.get('qop')
ha1 = calcHA1(
algo, self.username, self.realm, self.password, nonce, self.cnonce)
ha2 = calcHA2(algo, b"GET", self.uri, qop, None)
expected = calcResponse(
ha1, ha2, algo, nonce, ncount, self.cnonce, qop)
return expected
def test_response(self, quotes=True):
"""
L{DigestCredentialFactory.decode} accepts a digest challenge response
and parses it into an L{IUsernameHashedPassword} provider.
"""
challenge = self.credentialFactory.getChallenge(
self.clientAddress.host)
nc = b"00000001"
clientResponse = self.formatResponse(
quotes=quotes,
nonce=challenge['nonce'],
response=self.getDigestResponse(challenge, nc),
nc=nc,
opaque=challenge['opaque'])
creds = self.credentialFactory.decode(
clientResponse, self.method, self.clientAddress.host)
self.assertTrue(creds.checkPassword(self.password))
self.assertFalse(creds.checkPassword(self.password + b'wrong'))
def test_responseWithoutQuotes(self):
"""
L{DigestCredentialFactory.decode} accepts a digest challenge response
which does not quote the values of its fields and parses it into an
L{IUsernameHashedPassword} provider in the same way it would a
response which included quoted field values.
"""
self.test_response(False)
def test_responseWithCommaURI(self):
"""
L{DigestCredentialFactory.decode} accepts a digest challenge response
which quotes the values of its fields and includes a C{b","} in the URI
field.
"""
self.uri = b"/some,path/"
self.test_response(True)
def test_caseInsensitiveAlgorithm(self):
"""
The case of the algorithm value in the response is ignored when
checking the credentials.
"""
self.algorithm = b'MD5'
self.test_response()
def test_md5DefaultAlgorithm(self):
"""
The algorithm defaults to MD5 if it is not supplied in the response.
"""
self.algorithm = None
self.test_response()
def test_responseWithoutClientIP(self):
"""
L{DigestCredentialFactory.decode} accepts a digest challenge response
even if the client address it is passed is L{None}.
"""
challenge = self.credentialFactory.getChallenge(None)
nc = b"00000001"
clientResponse = self.formatResponse(
nonce=challenge['nonce'],
response=self.getDigestResponse(challenge, nc),
nc=nc,
opaque=challenge['opaque'])
creds = self.credentialFactory.decode(clientResponse, self.method,
None)
self.assertTrue(creds.checkPassword(self.password))
self.assertFalse(creds.checkPassword(self.password + b'wrong'))
def test_multiResponse(self):
"""
L{DigestCredentialFactory.decode} handles multiple responses to a
single challenge.
"""
challenge = self.credentialFactory.getChallenge(
self.clientAddress.host)
nc = b"00000001"
clientResponse = self.formatResponse(
nonce=challenge['nonce'],
response=self.getDigestResponse(challenge, nc),
nc=nc,
opaque=challenge['opaque'])
creds = self.credentialFactory.decode(clientResponse, self.method,
self.clientAddress.host)
self.assertTrue(creds.checkPassword(self.password))
self.assertFalse(creds.checkPassword(self.password + b'wrong'))
nc = b"00000002"
clientResponse = self.formatResponse(
nonce=challenge['nonce'],
response=self.getDigestResponse(challenge, nc),
nc=nc,
opaque=challenge['opaque'])
creds = self.credentialFactory.decode(clientResponse, self.method,
self.clientAddress.host)
self.assertTrue(creds.checkPassword(self.password))
self.assertFalse(creds.checkPassword(self.password + b'wrong'))
def test_failsWithDifferentMethod(self):
"""
L{DigestCredentialFactory.decode} returns an L{IUsernameHashedPassword}
provider which rejects a correct password for the given user if the
challenge response request is made using a different HTTP method than
was used to request the initial challenge.
"""
challenge = self.credentialFactory.getChallenge(
self.clientAddress.host)
nc = b"00000001"
clientResponse = self.formatResponse(
nonce=challenge['nonce'],
response=self.getDigestResponse(challenge, nc),
nc=nc,
opaque=challenge['opaque'])
creds = self.credentialFactory.decode(clientResponse, b'POST',
self.clientAddress.host)
self.assertFalse(creds.checkPassword(self.password))
self.assertFalse(creds.checkPassword(self.password + b'wrong'))
def test_noUsername(self):
"""
L{DigestCredentialFactory.decode} raises L{LoginFailed} if the response
has no username field or if the username field is empty.
"""
# Check for no username
e = self.assertRaises(
LoginFailed,
self.credentialFactory.decode,
self.formatResponse(username=None),
self.method, self.clientAddress.host)
self.assertEqual(str(e), "Invalid response, no username given.")
# Check for an empty username
e = self.assertRaises(
LoginFailed,
self.credentialFactory.decode,
self.formatResponse(username=b""),
self.method, self.clientAddress.host)
self.assertEqual(str(e), "Invalid response, no username given.")
def test_noNonce(self):
"""
L{DigestCredentialFactory.decode} raises L{LoginFailed} if the response
has no nonce.
"""
e = self.assertRaises(
LoginFailed,
self.credentialFactory.decode,
self.formatResponse(opaque=b"abc123"),
self.method, self.clientAddress.host)
self.assertEqual(str(e), "Invalid response, no nonce given.")
def test_noOpaque(self):
"""
L{DigestCredentialFactory.decode} raises L{LoginFailed} if the response
has no opaque.
"""
e = self.assertRaises(
LoginFailed,
self.credentialFactory.decode,
self.formatResponse(),
self.method, self.clientAddress.host)
self.assertEqual(str(e), "Invalid response, no opaque given.")
def test_checkHash(self):
"""
L{DigestCredentialFactory.decode} returns an L{IUsernameDigestHash}
provider which can verify a hash of the form 'username:realm:password'.
"""
challenge = self.credentialFactory.getChallenge(
self.clientAddress.host)
nc = b"00000001"
clientResponse = self.formatResponse(
nonce=challenge['nonce'],
response=self.getDigestResponse(challenge, nc),
nc=nc,
opaque=challenge['opaque'])
creds = self.credentialFactory.decode(clientResponse, self.method,
self.clientAddress.host)
self.assertTrue(verifyObject(IUsernameDigestHash, creds))
cleartext = self.username + b":" + self.realm + b":" + self.password
hash = md5(cleartext)
self.assertTrue(creds.checkHash(hexlify(hash.digest())))
hash.update(b'wrong')
self.assertFalse(creds.checkHash(hexlify(hash.digest())))
def test_invalidOpaque(self):
"""
L{DigestCredentialFactory.decode} raises L{LoginFailed} when the opaque
value does not contain all the required parts.
"""
credentialFactory = FakeDigestCredentialFactory(self.algorithm,
self.realm)
challenge = credentialFactory.getChallenge(self.clientAddress.host)
exc = self.assertRaises(
LoginFailed,
credentialFactory._verifyOpaque,
b'badOpaque',
challenge['nonce'],
self.clientAddress.host)
self.assertEqual(str(exc), 'Invalid response, invalid opaque value')
badOpaque = b'foo-' + b64encode(b'nonce,clientip')
exc = self.assertRaises(
LoginFailed,
credentialFactory._verifyOpaque,
badOpaque,
challenge['nonce'],
self.clientAddress.host)
self.assertEqual(str(exc), 'Invalid response, invalid opaque value')
exc = self.assertRaises(
LoginFailed,
credentialFactory._verifyOpaque,
b'',
challenge['nonce'],
self.clientAddress.host)
self.assertEqual(str(exc), 'Invalid response, invalid opaque value')
badOpaque = b'foo-' + b64encode(
b",".join((challenge['nonce'],
networkString(self.clientAddress.host),
b"foobar")))
exc = self.assertRaises(
LoginFailed,
credentialFactory._verifyOpaque,
badOpaque,
challenge['nonce'],
self.clientAddress.host)
self.assertEqual(
str(exc), 'Invalid response, invalid opaque/time values')
def test_incompatibleNonce(self):
"""
L{DigestCredentialFactory.decode} raises L{LoginFailed} when the given
nonce from the response does not match the nonce encoded in the opaque.
"""
credentialFactory = FakeDigestCredentialFactory(self.algorithm,
self.realm)
challenge = credentialFactory.getChallenge(self.clientAddress.host)
badNonceOpaque = credentialFactory._generateOpaque(
b'1234567890',
self.clientAddress.host)
exc = self.assertRaises(
LoginFailed,
credentialFactory._verifyOpaque,
badNonceOpaque,
challenge['nonce'],
self.clientAddress.host)
self.assertEqual(
str(exc),
'Invalid response, incompatible opaque/nonce values')
exc = self.assertRaises(
LoginFailed,
credentialFactory._verifyOpaque,
badNonceOpaque,
b'',
self.clientAddress.host)
self.assertEqual(
str(exc),
'Invalid response, incompatible opaque/nonce values')
def test_incompatibleClientIP(self):
"""
L{DigestCredentialFactory.decode} raises L{LoginFailed} when the
request comes from a client IP other than what is encoded in the
opaque.
"""
credentialFactory = FakeDigestCredentialFactory(self.algorithm,
self.realm)
challenge = credentialFactory.getChallenge(self.clientAddress.host)
badAddress = '10.0.0.1'
# Sanity check
self.assertNotEqual(self.clientAddress.host, badAddress)
badNonceOpaque = credentialFactory._generateOpaque(
challenge['nonce'], badAddress)
self.assertRaises(
LoginFailed,
credentialFactory._verifyOpaque,
badNonceOpaque,
challenge['nonce'],
self.clientAddress.host)
def test_oldNonce(self):
"""
L{DigestCredentialFactory.decode} raises L{LoginFailed} when the given
opaque is older than C{DigestCredentialFactory.CHALLENGE_LIFETIME_SECS}
"""
credentialFactory = FakeDigestCredentialFactory(self.algorithm,
self.realm)
challenge = credentialFactory.getChallenge(self.clientAddress.host)
key = b",".join((challenge['nonce'],
networkString(self.clientAddress.host),
b'-137876876'))
digest = hexlify(md5(key + credentialFactory.privateKey).digest())
ekey = b64encode(key)
oldNonceOpaque = b"-".join((digest, ekey.strip(b'\n')))
self.assertRaises(
LoginFailed,
credentialFactory._verifyOpaque,
oldNonceOpaque,
challenge['nonce'],
self.clientAddress.host)
def test_mismatchedOpaqueChecksum(self):
"""
L{DigestCredentialFactory.decode} raises L{LoginFailed} when the opaque
checksum fails verification.
"""
credentialFactory = FakeDigestCredentialFactory(self.algorithm,
self.realm)
challenge = credentialFactory.getChallenge(self.clientAddress.host)
key = b",".join((challenge['nonce'],
networkString(self.clientAddress.host),
b'0'))
digest = hexlify(md5(key + b'this is not the right pkey').digest())
badChecksum = b"-".join((digest, b64encode(key)))
self.assertRaises(
LoginFailed,
credentialFactory._verifyOpaque,
badChecksum,
challenge['nonce'],
self.clientAddress.host)
def test_incompatibleCalcHA1Options(self):
"""
L{calcHA1} raises L{TypeError} when any of the pszUsername, pszRealm,
or pszPassword arguments are specified with the preHA1 keyword
argument.
"""
arguments = (
(b"user", b"realm", b"password", b"preHA1"),
(None, b"realm", None, b"preHA1"),
(None, None, b"password", b"preHA1"),
)
for pszUsername, pszRealm, pszPassword, preHA1 in arguments:
self.assertRaises(
TypeError,
calcHA1,
b"md5",
pszUsername,
pszRealm,
pszPassword,
b"nonce",
b"cnonce",
preHA1=preHA1)
def test_noNewlineOpaque(self):
"""
L{DigestCredentialFactory._generateOpaque} returns a value without
newlines, regardless of the length of the nonce.
"""
opaque = self.credentialFactory._generateOpaque(
b"long nonce " * 10, None)
self.assertNotIn(b'\n', opaque)

View file

@ -0,0 +1,93 @@
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
"""
Tests for basic constructs of L{twisted.cred.credentials}.
"""
from __future__ import division, absolute_import
from twisted.trial.unittest import TestCase
from twisted.cred.credentials import UsernamePassword, IUsernamePassword
from twisted.cred.credentials import UsernameHashedPassword
from twisted.cred.credentials import IUsernameHashedPassword
class UsernamePasswordTests(TestCase):
"""
Tests for L{UsernamePassword}.
"""
def test_initialisation(self):
"""
The initialisation of L{UsernamePassword} will set C{username} and
C{password} on it.
"""
creds = UsernamePassword(b"foo", b"bar")
self.assertEqual(creds.username, b"foo")
self.assertEqual(creds.password, b"bar")
def test_correctPassword(self):
"""
Calling C{checkPassword} on a L{UsernamePassword} will return L{True}
when the password given is the password on the object.
"""
creds = UsernamePassword(b"user", b"pass")
self.assertTrue(creds.checkPassword(b"pass"))
def test_wrongPassword(self):
"""
Calling C{checkPassword} on a L{UsernamePassword} will return L{False}
when the password given is NOT the password on the object.
"""
creds = UsernamePassword(b"user", b"pass")
self.assertFalse(creds.checkPassword(b"someotherpass"))
def test_interface(self):
"""
L{UsernamePassword} implements L{IUsernamePassword}.
"""
self.assertTrue(IUsernamePassword.implementedBy(UsernamePassword))
class UsernameHashedPasswordTests(TestCase):
"""
Tests for L{UsernameHashedPassword}.
"""
def test_initialisation(self):
"""
The initialisation of L{UsernameHashedPassword} will set C{username}
and C{hashed} on it.
"""
creds = UsernameHashedPassword(b"foo", b"bar")
self.assertEqual(creds.username, b"foo")
self.assertEqual(creds.hashed, b"bar")
def test_correctPassword(self):
"""
Calling C{checkPassword} on a L{UsernameHashedPassword} will return
L{True} when the password given is the password on the object.
"""
creds = UsernameHashedPassword(b"user", b"pass")
self.assertTrue(creds.checkPassword(b"pass"))
def test_wrongPassword(self):
"""
Calling C{checkPassword} on a L{UsernameHashedPassword} will return
L{False} when the password given is NOT the password on the object.
"""
creds = UsernameHashedPassword(b"user", b"pass")
self.assertFalse(creds.checkPassword(b"someotherpass"))
def test_interface(self):
"""
L{UsernameHashedPassword} implements L{IUsernameHashedPassword}.
"""
self.assertTrue(
IUsernameHashedPassword.implementedBy(UsernameHashedPassword))

View file

@ -0,0 +1,748 @@
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
"""
L{twisted.cred.strcred}.
"""
from __future__ import absolute_import, division
import os
from twisted import plugin
from twisted.trial import unittest
from twisted.cred import credentials, checkers, error, strcred
from twisted.plugins import cred_file, cred_anonymous, cred_unix
from twisted.python import usage
from twisted.python.compat import NativeStringIO
from twisted.python.filepath import FilePath
from twisted.python.fakepwd import UserDatabase
from twisted.python.reflect import requireModule
try:
import crypt
except ImportError:
crypt = None
try:
import pwd
except ImportError:
pwd = None
try:
import spwd
except ImportError:
spwd = None
def getInvalidAuthType():
"""
Helper method to produce an auth type that doesn't exist.
"""
invalidAuthType = 'ThisPluginDoesNotExist'
while (invalidAuthType in
[factory.authType for factory in strcred.findCheckerFactories()]):
invalidAuthType += '_'
return invalidAuthType
class PublicAPITests(unittest.TestCase):
def test_emptyDescription(self):
"""
The description string cannot be empty.
"""
iat = getInvalidAuthType()
self.assertRaises(strcred.InvalidAuthType, strcred.makeChecker, iat)
self.assertRaises(
strcred.InvalidAuthType, strcred.findCheckerFactory, iat)
def test_invalidAuthType(self):
"""
An unrecognized auth type raises an exception.
"""
iat = getInvalidAuthType()
self.assertRaises(strcred.InvalidAuthType, strcred.makeChecker, iat)
self.assertRaises(
strcred.InvalidAuthType, strcred.findCheckerFactory, iat)
class StrcredFunctionsTests(unittest.TestCase):
def test_findCheckerFactories(self):
"""
L{strcred.findCheckerFactories} returns all available plugins.
"""
availablePlugins = list(strcred.findCheckerFactories())
for plg in plugin.getPlugins(strcred.ICheckerFactory):
self.assertIn(plg, availablePlugins)
def test_findCheckerFactory(self):
"""
L{strcred.findCheckerFactory} returns the first plugin
available for a given authentication type.
"""
self.assertIdentical(strcred.findCheckerFactory('file'),
cred_file.theFileCheckerFactory)
class MemoryCheckerTests(unittest.TestCase):
def setUp(self):
self.admin = credentials.UsernamePassword('admin', 'asdf')
self.alice = credentials.UsernamePassword('alice', 'foo')
self.badPass = credentials.UsernamePassword('alice', 'foobar')
self.badUser = credentials.UsernamePassword('x', 'yz')
self.checker = strcred.makeChecker('memory:admin:asdf:alice:foo')
def test_isChecker(self):
"""
Verifies that strcred.makeChecker('memory') returns an object
that implements the L{ICredentialsChecker} interface.
"""
self.assertTrue(checkers.ICredentialsChecker.providedBy(self.checker))
self.assertIn(credentials.IUsernamePassword,
self.checker.credentialInterfaces)
def test_badFormatArgString(self):
"""
An argument string which does not contain user:pass pairs
(i.e., an odd number of ':' characters) raises an exception.
"""
self.assertRaises(strcred.InvalidAuthArgumentString,
strcred.makeChecker, 'memory:a:b:c')
def test_memoryCheckerSucceeds(self):
"""
The checker works with valid credentials.
"""
def _gotAvatar(username):
self.assertEqual(username, self.admin.username)
return (self.checker
.requestAvatarId(self.admin)
.addCallback(_gotAvatar))
def test_memoryCheckerFailsUsername(self):
"""
The checker fails with an invalid username.
"""
return self.assertFailure(self.checker.requestAvatarId(self.badUser),
error.UnauthorizedLogin)
def test_memoryCheckerFailsPassword(self):
"""
The checker fails with an invalid password.
"""
return self.assertFailure(self.checker.requestAvatarId(self.badPass),
error.UnauthorizedLogin)
class AnonymousCheckerTests(unittest.TestCase):
def test_isChecker(self):
"""
Verifies that strcred.makeChecker('anonymous') returns an object
that implements the L{ICredentialsChecker} interface.
"""
checker = strcred.makeChecker('anonymous')
self.assertTrue(checkers.ICredentialsChecker.providedBy(checker))
self.assertIn(credentials.IAnonymous, checker.credentialInterfaces)
def testAnonymousAccessSucceeds(self):
"""
We can log in anonymously using this checker.
"""
checker = strcred.makeChecker('anonymous')
request = checker.requestAvatarId(credentials.Anonymous())
def _gotAvatar(avatar):
self.assertIdentical(checkers.ANONYMOUS, avatar)
return request.addCallback(_gotAvatar)
class UnixCheckerTests(unittest.TestCase):
users = {
'admin': 'asdf',
'alice': 'foo',
}
def _spwd_getspnam(self, username):
return spwd.struct_spwd((username,
crypt.crypt(self.users[username], 'F/'),
0, 0, 99999, 7, -1, -1, -1))
def setUp(self):
self.admin = credentials.UsernamePassword('admin', 'asdf')
self.alice = credentials.UsernamePassword('alice', 'foo')
self.badPass = credentials.UsernamePassword('alice', 'foobar')
self.badUser = credentials.UsernamePassword('x', 'yz')
self.checker = strcred.makeChecker('unix')
self.adminBytes = credentials.UsernamePassword(b'admin', b'asdf')
self.aliceBytes = credentials.UsernamePassword(b'alice', b'foo')
self.badPassBytes = credentials.UsernamePassword(b'alice', b'foobar')
self.badUserBytes = credentials.UsernamePassword(b'x', b'yz')
self.checkerBytes = strcred.makeChecker('unix')
# Hack around the pwd and spwd modules, since we can't really
# go about reading your /etc/passwd or /etc/shadow files
if pwd:
database = UserDatabase()
for username, password in self.users.items():
database.addUser(
username, crypt.crypt(password, 'F/'),
1000, 1000, username, '/home/' + username, '/bin/sh')
self.patch(pwd, 'getpwnam', database.getpwnam)
if spwd:
self.patch(spwd, 'getspnam', self._spwd_getspnam)
def test_isChecker(self):
"""
Verifies that strcred.makeChecker('unix') returns an object
that implements the L{ICredentialsChecker} interface.
"""
self.assertTrue(checkers.ICredentialsChecker.providedBy(self.checker))
self.assertIn(credentials.IUsernamePassword,
self.checker.credentialInterfaces)
self.assertTrue(checkers.ICredentialsChecker.providedBy(
self.checkerBytes))
self.assertIn(credentials.IUsernamePassword,
self.checkerBytes.credentialInterfaces)
def test_unixCheckerSucceeds(self):
"""
The checker works with valid credentials.
"""
def _gotAvatar(username):
self.assertEqual(username, self.admin.username)
return (self.checker
.requestAvatarId(self.admin)
.addCallback(_gotAvatar))
def test_unixCheckerSucceedsBytes(self):
"""
The checker works with valid L{bytes} credentials.
"""
def _gotAvatar(username):
self.assertEqual(username,
self.adminBytes.username.decode("utf-8"))
return (self.checkerBytes
.requestAvatarId(self.adminBytes)
.addCallback(_gotAvatar))
def test_unixCheckerFailsUsername(self):
"""
The checker fails with an invalid username.
"""
return self.assertFailure(self.checker.requestAvatarId(self.badUser),
error.UnauthorizedLogin)
def test_unixCheckerFailsUsernameBytes(self):
"""
The checker fails with an invalid L{bytes} username.
"""
return self.assertFailure(self.checkerBytes.requestAvatarId(
self.badUserBytes), error.UnauthorizedLogin)
def test_unixCheckerFailsPassword(self):
"""
The checker fails with an invalid password.
"""
return self.assertFailure(self.checker.requestAvatarId(self.badPass),
error.UnauthorizedLogin)
def test_unixCheckerFailsPasswordBytes(self):
"""
The checker fails with an invalid L{bytes} password.
"""
return self.assertFailure(self.checkerBytes.requestAvatarId(
self.badPassBytes), error.UnauthorizedLogin)
if None in (pwd, spwd, crypt):
availability = []
for module, name in ((pwd, "pwd"), (spwd, "spwd"), (crypt, "crypt")):
if module is None:
availability += [name]
for method in (test_unixCheckerSucceeds,
test_unixCheckerSucceedsBytes,
test_unixCheckerFailsUsername,
test_unixCheckerFailsUsernameBytes,
test_unixCheckerFailsPassword,
test_unixCheckerFailsPasswordBytes):
method.skip = ("Required module(s) are unavailable: " +
", ".join(availability))
class CryptTests(unittest.TestCase):
"""
L{crypt} has functions for encrypting password.
"""
if not crypt:
skip = "Required module is unavailable: crypt"
def test_verifyCryptedPassword(self):
"""
L{cred_unix.verifyCryptedPassword}
"""
password = "sample password ^%$"
for salt in (None, "ab"):
try:
cryptedCorrect = crypt.crypt(password, salt)
except TypeError:
# Older Python versions would throw a TypeError if
# a value of None was is used for the salt.
# Newer Python versions allow it.
continue
cryptedIncorrect = "$1x1234"
self.assertTrue(cred_unix.verifyCryptedPassword(cryptedCorrect,
password))
self.assertFalse(cred_unix.verifyCryptedPassword(cryptedIncorrect,
password))
# Python 3.3+ has crypt.METHOD_*, but not all
# platforms implement all methods.
for method in ("METHOD_SHA512", "METHOD_SHA256", "METHOD_MD5",
"METHOD_CRYPT"):
cryptMethod = getattr(crypt, method, None)
if not cryptMethod:
continue
password = "interesting password xyz"
crypted = crypt.crypt(password, cryptMethod)
incorrectCrypted = crypted + "blahfooincorrect"
result = cred_unix.verifyCryptedPassword(crypted, password)
self.assertTrue(result)
# Try to pass in bytes
result = cred_unix.verifyCryptedPassword(crypted.encode("utf-8"),
password.encode("utf-8"))
self.assertTrue(result)
result = cred_unix.verifyCryptedPassword(incorrectCrypted, password)
self.assertFalse(result)
# Try to pass in bytes
result = cred_unix.verifyCryptedPassword(incorrectCrypted.encode("utf-8"),
password.encode("utf-8"))
self.assertFalse(result)
class FileDBCheckerTests(unittest.TestCase):
"""
C{--auth=file:...} file checker.
"""
def setUp(self):
self.admin = credentials.UsernamePassword(b'admin', b'asdf')
self.alice = credentials.UsernamePassword(b'alice', b'foo')
self.badPass = credentials.UsernamePassword(b'alice', b'foobar')
self.badUser = credentials.UsernamePassword(b'x', b'yz')
self.filename = self.mktemp()
FilePath(self.filename).setContent(b'admin:asdf\nalice:foo\n')
self.checker = strcred.makeChecker('file:' + self.filename)
def _fakeFilename(self):
filename = '/DoesNotExist'
while os.path.exists(filename):
filename += '_'
return filename
def test_isChecker(self):
"""
Verifies that strcred.makeChecker('memory') returns an object
that implements the L{ICredentialsChecker} interface.
"""
self.assertTrue(checkers.ICredentialsChecker.providedBy(self.checker))
self.assertIn(credentials.IUsernamePassword,
self.checker.credentialInterfaces)
def test_fileCheckerSucceeds(self):
"""
The checker works with valid credentials.
"""
def _gotAvatar(username):
self.assertEqual(username, self.admin.username)
return (self.checker
.requestAvatarId(self.admin)
.addCallback(_gotAvatar))
def test_fileCheckerFailsUsername(self):
"""
The checker fails with an invalid username.
"""
return self.assertFailure(self.checker.requestAvatarId(self.badUser),
error.UnauthorizedLogin)
def test_fileCheckerFailsPassword(self):
"""
The checker fails with an invalid password.
"""
return self.assertFailure(self.checker.requestAvatarId(self.badPass),
error.UnauthorizedLogin)
def test_failsWithEmptyFilename(self):
"""
An empty filename raises an error.
"""
self.assertRaises(ValueError, strcred.makeChecker, 'file')
self.assertRaises(ValueError, strcred.makeChecker, 'file:')
def test_warnWithBadFilename(self):
"""
When the file auth plugin is given a file that doesn't exist, it
should produce a warning.
"""
oldOutput = cred_file.theFileCheckerFactory.errorOutput
newOutput = NativeStringIO()
cred_file.theFileCheckerFactory.errorOutput = newOutput
strcred.makeChecker('file:' + self._fakeFilename())
cred_file.theFileCheckerFactory.errorOutput = oldOutput
self.assertIn(cred_file.invalidFileWarning, newOutput.getvalue())
class SSHCheckerTests(unittest.TestCase):
"""
Tests for the C{--auth=sshkey:...} checker. The majority of the tests for the
ssh public key database checker are in
L{twisted.conch.test.test_checkers.SSHPublicKeyCheckerTestCase}.
"""
skip = None
if requireModule('cryptography') is None:
skip = 'cryptography is not available'
if requireModule('pyasn1') is None:
skip = 'pyasn1 is not available'
def test_isChecker(self):
"""
Verifies that strcred.makeChecker('sshkey') returns an object
that implements the L{ICredentialsChecker} interface.
"""
sshChecker = strcred.makeChecker('sshkey')
self.assertTrue(checkers.ICredentialsChecker.providedBy(sshChecker))
self.assertIn(
credentials.ISSHPrivateKey, sshChecker.credentialInterfaces)
class DummyOptions(usage.Options, strcred.AuthOptionMixin):
"""
Simple options for testing L{strcred.AuthOptionMixin}.
"""
class CheckerOptionsTests(unittest.TestCase):
def test_createsList(self):
"""
The C{--auth} command line creates a list in the
Options instance and appends values to it.
"""
options = DummyOptions()
options.parseOptions(['--auth', 'memory'])
self.assertEqual(len(options['credCheckers']), 1)
options = DummyOptions()
options.parseOptions(['--auth', 'memory', '--auth', 'memory'])
self.assertEqual(len(options['credCheckers']), 2)
def test_invalidAuthError(self):
"""
The C{--auth} command line raises an exception when it
gets a parameter it doesn't understand.
"""
options = DummyOptions()
# If someone adds a 'ThisPluginDoesNotExist' then this unit
# test should still run.
invalidParameter = getInvalidAuthType()
self.assertRaises(
usage.UsageError,
options.parseOptions, ['--auth', invalidParameter])
self.assertRaises(
usage.UsageError,
options.parseOptions, ['--help-auth-type', invalidParameter])
def test_createsDictionary(self):
"""
The C{--auth} command line creates a dictionary mapping supported
interfaces to the list of credentials checkers that support it.
"""
options = DummyOptions()
options.parseOptions(['--auth', 'memory', '--auth', 'anonymous'])
chd = options['credInterfaces']
self.assertEqual(len(chd[credentials.IAnonymous]), 1)
self.assertEqual(len(chd[credentials.IUsernamePassword]), 1)
chdAnonymous = chd[credentials.IAnonymous][0]
chdUserPass = chd[credentials.IUsernamePassword][0]
self.assertTrue(checkers.ICredentialsChecker.providedBy(chdAnonymous))
self.assertTrue(checkers.ICredentialsChecker.providedBy(chdUserPass))
self.assertIn(credentials.IAnonymous,
chdAnonymous.credentialInterfaces)
self.assertIn(credentials.IUsernamePassword,
chdUserPass.credentialInterfaces)
def test_credInterfacesProvidesLists(self):
"""
When two C{--auth} arguments are passed along which support the same
interface, a list with both is created.
"""
options = DummyOptions()
options.parseOptions(['--auth', 'memory', '--auth', 'unix'])
self.assertEqual(
options['credCheckers'],
options['credInterfaces'][credentials.IUsernamePassword])
def test_listDoesNotDisplayDuplicates(self):
"""
The list for C{--help-auth} does not duplicate items.
"""
authTypes = []
options = DummyOptions()
for cf in options._checkerFactoriesForOptHelpAuth():
self.assertNotIn(cf.authType, authTypes)
authTypes.append(cf.authType)
def test_displaysListCorrectly(self):
"""
The C{--help-auth} argument correctly displays all
available authentication plugins, then exits.
"""
newStdout = NativeStringIO()
options = DummyOptions()
options.authOutput = newStdout
self.assertRaises(SystemExit, options.parseOptions, ['--help-auth'])
for checkerFactory in strcred.findCheckerFactories():
self.assertIn(checkerFactory.authType, newStdout.getvalue())
def test_displaysHelpCorrectly(self):
"""
The C{--help-auth-for} argument will correctly display the help file for a
particular authentication plugin.
"""
newStdout = NativeStringIO()
options = DummyOptions()
options.authOutput = newStdout
self.assertRaises(
SystemExit, options.parseOptions, ['--help-auth-type', 'file'])
for line in cred_file.theFileCheckerFactory.authHelp:
if line.strip():
self.assertIn(line.strip(), newStdout.getvalue())
def test_unexpectedException(self):
"""
When the checker specified by C{--auth} raises an unexpected error, it
should be caught and re-raised within a L{usage.UsageError}.
"""
options = DummyOptions()
err = self.assertRaises(usage.UsageError, options.parseOptions,
['--auth', 'file'])
self.assertEqual(str(err),
"Unexpected error: 'file' requires a filename")
class OptionsForUsernamePassword(usage.Options, strcred.AuthOptionMixin):
supportedInterfaces = (credentials.IUsernamePassword,)
class OptionsForUsernameHashedPassword(usage.Options, strcred.AuthOptionMixin):
supportedInterfaces = (credentials.IUsernameHashedPassword,)
class OptionsSupportsAllInterfaces(usage.Options, strcred.AuthOptionMixin):
supportedInterfaces = None
class OptionsSupportsNoInterfaces(usage.Options, strcred.AuthOptionMixin):
supportedInterfaces = []
class LimitingInterfacesTests(unittest.TestCase):
"""
Tests functionality that allows an application to limit the
credential interfaces it can support. For the purposes of this
test, we use IUsernameHashedPassword, although this will never
really be used by the command line.
(I have, to date, not thought of a half-decent way for a user to
specify a hash algorithm via the command-line. Nor do I think it's
very useful.)
I should note that, at first, this test is counter-intuitive,
because we're using the checker with a pre-defined hash function
as the 'bad' checker. See the documentation for
L{twisted.cred.checkers.FilePasswordDB.hash} for more details.
"""
def setUp(self):
self.filename = self.mktemp()
with open(self.filename, 'wb') as f:
f.write(b'admin:asdf\nalice:foo\n')
self.goodChecker = checkers.FilePasswordDB(self.filename)
self.badChecker = checkers.FilePasswordDB(
self.filename, hash=self._hash)
self.anonChecker = checkers.AllowAnonymousAccess()
def _hash(self, networkUsername, networkPassword, storedPassword):
"""
A dumb hash that doesn't really do anything.
"""
return networkPassword
def test_supportsInterface(self):
"""
The supportsInterface method behaves appropriately.
"""
options = OptionsForUsernamePassword()
self.assertTrue(
options.supportsInterface(credentials.IUsernamePassword))
self.assertFalse(
options.supportsInterface(credentials.IAnonymous))
self.assertRaises(
strcred.UnsupportedInterfaces, options.addChecker,
self.anonChecker)
def test_supportsAllInterfaces(self):
"""
The supportsInterface method behaves appropriately
when the supportedInterfaces attribute is None.
"""
options = OptionsSupportsAllInterfaces()
self.assertTrue(
options.supportsInterface(credentials.IUsernamePassword))
self.assertTrue(
options.supportsInterface(credentials.IAnonymous))
def test_supportsCheckerFactory(self):
"""
The supportsCheckerFactory method behaves appropriately.
"""
options = OptionsForUsernamePassword()
fileCF = cred_file.theFileCheckerFactory
anonCF = cred_anonymous.theAnonymousCheckerFactory
self.assertTrue(options.supportsCheckerFactory(fileCF))
self.assertFalse(options.supportsCheckerFactory(anonCF))
def test_canAddSupportedChecker(self):
"""
When addChecker is called with a checker that implements at least one
of the interfaces our application supports, it is successful.
"""
options = OptionsForUsernamePassword()
options.addChecker(self.goodChecker)
iface = options.supportedInterfaces[0]
# Test that we did get IUsernamePassword
self.assertIdentical(
options['credInterfaces'][iface][0], self.goodChecker)
self.assertIdentical(options['credCheckers'][0], self.goodChecker)
# Test that we didn't get IUsernameHashedPassword
self.assertEqual(len(options['credInterfaces'][iface]), 1)
self.assertEqual(len(options['credCheckers']), 1)
def test_failOnAddingUnsupportedChecker(self):
"""
When addChecker is called with a checker that does not implement any
supported interfaces, it fails.
"""
options = OptionsForUsernameHashedPassword()
self.assertRaises(strcred.UnsupportedInterfaces,
options.addChecker, self.badChecker)
def test_unsupportedInterfaceError(self):
"""
The C{--auth} command line raises an exception when it
gets a checker we don't support.
"""
options = OptionsSupportsNoInterfaces()
authType = cred_anonymous.theAnonymousCheckerFactory.authType
self.assertRaises(
usage.UsageError,
options.parseOptions, ['--auth', authType])
def test_helpAuthLimitsOutput(self):
"""
C{--help-auth} will only list checkers that purport to
supply at least one of the credential interfaces our
application can use.
"""
options = OptionsForUsernamePassword()
for factory in options._checkerFactoriesForOptHelpAuth():
invalid = True
for interface in factory.credentialInterfaces:
if options.supportsInterface(interface):
invalid = False
if invalid:
raise strcred.UnsupportedInterfaces()
def test_helpAuthTypeLimitsOutput(self):
"""
C{--help-auth-type} will display a warning if you get
help for an authType that does not supply at least one of the
credential interfaces our application can use.
"""
options = OptionsForUsernamePassword()
# Find an interface that we can use for our test
invalidFactory = None
for factory in strcred.findCheckerFactories():
if not options.supportsCheckerFactory(factory):
invalidFactory = factory
break
self.assertNotIdentical(invalidFactory, None)
# Capture output and make sure the warning is there
newStdout = NativeStringIO()
options.authOutput = newStdout
self.assertRaises(SystemExit, options.parseOptions,
['--help-auth-type', 'anonymous'])
self.assertIn(strcred.notSupportedWarning, newStdout.getvalue())