811 lines
26 KiB
Python
811 lines
26 KiB
Python
# -*- test-case-name: twisted.conch.test.test_telnet -*-
|
|
# Copyright (c) Twisted Matrix Laboratories.
|
|
# See LICENSE for details.
|
|
|
|
"""
|
|
Tests for L{twisted.conch.telnet}.
|
|
"""
|
|
|
|
from __future__ import absolute_import, division
|
|
|
|
from zope.interface import implementer
|
|
from zope.interface.verify import verifyObject
|
|
|
|
from twisted.internet import defer
|
|
|
|
from twisted.conch import telnet
|
|
|
|
from twisted.trial import unittest
|
|
from twisted.test import proto_helpers
|
|
from twisted.python.compat import iterbytes
|
|
|
|
|
|
@implementer(telnet.ITelnetProtocol)
|
|
class TestProtocol:
|
|
localEnableable = ()
|
|
remoteEnableable = ()
|
|
|
|
def __init__(self):
|
|
self.data = b''
|
|
self.subcmd = []
|
|
self.calls = []
|
|
|
|
self.enabledLocal = []
|
|
self.enabledRemote = []
|
|
self.disabledLocal = []
|
|
self.disabledRemote = []
|
|
|
|
|
|
def makeConnection(self, transport):
|
|
d = transport.negotiationMap = {}
|
|
d[b'\x12'] = self.neg_TEST_COMMAND
|
|
|
|
d = transport.commandMap = transport.commandMap.copy()
|
|
for cmd in ('NOP', 'DM', 'BRK', 'IP', 'AO', 'AYT', 'EC', 'EL', 'GA'):
|
|
d[getattr(telnet, cmd)] = lambda arg, cmd=cmd: self.calls.append(cmd)
|
|
|
|
|
|
def dataReceived(self, data):
|
|
self.data += data
|
|
|
|
|
|
def connectionLost(self, reason):
|
|
pass
|
|
|
|
|
|
def neg_TEST_COMMAND(self, payload):
|
|
self.subcmd = payload
|
|
|
|
|
|
def enableLocal(self, option):
|
|
if option in self.localEnableable:
|
|
self.enabledLocal.append(option)
|
|
return True
|
|
return False
|
|
|
|
|
|
def disableLocal(self, option):
|
|
self.disabledLocal.append(option)
|
|
|
|
|
|
def enableRemote(self, option):
|
|
if option in self.remoteEnableable:
|
|
self.enabledRemote.append(option)
|
|
return True
|
|
return False
|
|
|
|
|
|
def disableRemote(self, option):
|
|
self.disabledRemote.append(option)
|
|
|
|
|
|
|
|
class InterfacesTests(unittest.TestCase):
|
|
def test_interface(self):
|
|
"""
|
|
L{telnet.TelnetProtocol} implements L{telnet.ITelnetProtocol}
|
|
"""
|
|
p = telnet.TelnetProtocol()
|
|
verifyObject(telnet.ITelnetProtocol, p)
|
|
|
|
|
|
|
|
class TelnetTransportTests(unittest.TestCase):
|
|
"""
|
|
Tests for L{telnet.TelnetTransport}.
|
|
"""
|
|
def setUp(self):
|
|
self.p = telnet.TelnetTransport(TestProtocol)
|
|
self.t = proto_helpers.StringTransport()
|
|
self.p.makeConnection(self.t)
|
|
|
|
|
|
def testRegularBytes(self):
|
|
# Just send a bunch of bytes. None of these do anything
|
|
# with telnet. They should pass right through to the
|
|
# application layer.
|
|
h = self.p.protocol
|
|
|
|
L = [b"here are some bytes la la la",
|
|
b"some more arrive here",
|
|
b"lots of bytes to play with",
|
|
b"la la la",
|
|
b"ta de da",
|
|
b"dum"]
|
|
for b in L:
|
|
self.p.dataReceived(b)
|
|
|
|
self.assertEqual(h.data, b''.join(L))
|
|
|
|
|
|
def testNewlineHandling(self):
|
|
# Send various kinds of newlines and make sure they get translated
|
|
# into \n.
|
|
h = self.p.protocol
|
|
|
|
L = [b"here is the first line\r\n",
|
|
b"here is the second line\r\0",
|
|
b"here is the third line\r\n",
|
|
b"here is the last line\r\0"]
|
|
|
|
for b in L:
|
|
self.p.dataReceived(b)
|
|
|
|
self.assertEqual(h.data, L[0][:-2] + b'\n' +
|
|
L[1][:-2] + b'\r' +
|
|
L[2][:-2] + b'\n' +
|
|
L[3][:-2] + b'\r')
|
|
|
|
|
|
def testIACEscape(self):
|
|
# Send a bunch of bytes and a couple quoted \xFFs. Unquoted,
|
|
# \xFF is a telnet command. Quoted, one of them from each pair
|
|
# should be passed through to the application layer.
|
|
h = self.p.protocol
|
|
|
|
L = [b"here are some bytes\xff\xff with an embedded IAC",
|
|
b"and here is a test of a border escape\xff",
|
|
b"\xff did you get that IAC?"]
|
|
|
|
for b in L:
|
|
self.p.dataReceived(b)
|
|
|
|
self.assertEqual(h.data, b''.join(L).replace(b'\xff\xff', b'\xff'))
|
|
|
|
|
|
def _simpleCommandTest(self, cmdName):
|
|
# Send a single simple telnet command and make sure
|
|
# it gets noticed and the appropriate method gets
|
|
# called.
|
|
h = self.p.protocol
|
|
|
|
cmd = telnet.IAC + getattr(telnet, cmdName)
|
|
L = [b"Here's some bytes, tra la la",
|
|
b"But ono!" + cmd + b" an interrupt"]
|
|
|
|
for b in L:
|
|
self.p.dataReceived(b)
|
|
|
|
self.assertEqual(h.calls, [cmdName])
|
|
self.assertEqual(h.data, b''.join(L).replace(cmd, b''))
|
|
|
|
|
|
def testInterrupt(self):
|
|
self._simpleCommandTest("IP")
|
|
|
|
|
|
def testNoOperation(self):
|
|
self._simpleCommandTest("NOP")
|
|
|
|
|
|
def testDataMark(self):
|
|
self._simpleCommandTest("DM")
|
|
|
|
|
|
def testBreak(self):
|
|
self._simpleCommandTest("BRK")
|
|
|
|
|
|
def testAbortOutput(self):
|
|
self._simpleCommandTest("AO")
|
|
|
|
|
|
def testAreYouThere(self):
|
|
self._simpleCommandTest("AYT")
|
|
|
|
|
|
def testEraseCharacter(self):
|
|
self._simpleCommandTest("EC")
|
|
|
|
|
|
def testEraseLine(self):
|
|
self._simpleCommandTest("EL")
|
|
|
|
|
|
def testGoAhead(self):
|
|
self._simpleCommandTest("GA")
|
|
|
|
|
|
def testSubnegotiation(self):
|
|
# Send a subnegotiation command and make sure it gets
|
|
# parsed and that the correct method is called.
|
|
h = self.p.protocol
|
|
|
|
cmd = telnet.IAC + telnet.SB + b'\x12hello world' + telnet.IAC + telnet.SE
|
|
L = [b"These are some bytes but soon" + cmd,
|
|
b"there will be some more"]
|
|
|
|
for b in L:
|
|
self.p.dataReceived(b)
|
|
|
|
self.assertEqual(h.data, b''.join(L).replace(cmd, b''))
|
|
self.assertEqual(h.subcmd, list(iterbytes(b"hello world")))
|
|
|
|
|
|
def testSubnegotiationWithEmbeddedSE(self):
|
|
# Send a subnegotiation command with an embedded SE. Make sure
|
|
# that SE gets passed to the correct method.
|
|
h = self.p.protocol
|
|
|
|
cmd = (telnet.IAC + telnet.SB +
|
|
b'\x12' + telnet.SE +
|
|
telnet.IAC + telnet.SE)
|
|
|
|
L = [b"Some bytes are here" + cmd + b"and here",
|
|
b"and here"]
|
|
|
|
for b in L:
|
|
self.p.dataReceived(b)
|
|
|
|
self.assertEqual(h.data, b''.join(L).replace(cmd, b''))
|
|
self.assertEqual(h.subcmd, [telnet.SE])
|
|
|
|
|
|
def testBoundarySubnegotiation(self):
|
|
# Send a subnegotiation command. Split it at every possible byte boundary
|
|
# and make sure it always gets parsed and that it is passed to the correct
|
|
# method.
|
|
cmd = (telnet.IAC + telnet.SB +
|
|
b'\x12' + telnet.SE + b'hello' +
|
|
telnet.IAC + telnet.SE)
|
|
|
|
for i in range(len(cmd)):
|
|
h = self.p.protocol = TestProtocol()
|
|
h.makeConnection(self.p)
|
|
|
|
a, b = cmd[:i], cmd[i:]
|
|
L = [b"first part" + a,
|
|
b + b"last part"]
|
|
|
|
for data in L:
|
|
self.p.dataReceived(data)
|
|
|
|
self.assertEqual(h.data, b''.join(L).replace(cmd, b''))
|
|
self.assertEqual(h.subcmd, [telnet.SE] + list(iterbytes(b'hello')))
|
|
|
|
|
|
def _enabledHelper(self, o, eL=[], eR=[], dL=[], dR=[]):
|
|
self.assertEqual(o.enabledLocal, eL)
|
|
self.assertEqual(o.enabledRemote, eR)
|
|
self.assertEqual(o.disabledLocal, dL)
|
|
self.assertEqual(o.disabledRemote, dR)
|
|
|
|
|
|
def testRefuseWill(self):
|
|
# Try to enable an option. The server should refuse to enable it.
|
|
cmd = telnet.IAC + telnet.WILL + b'\x12'
|
|
|
|
data = b"surrounding bytes" + cmd + b"to spice things up"
|
|
self.p.dataReceived(data)
|
|
|
|
self.assertEqual(self.p.protocol.data, data.replace(cmd, b''))
|
|
self.assertEqual(self.t.value(), telnet.IAC + telnet.DONT + b'\x12')
|
|
self._enabledHelper(self.p.protocol)
|
|
|
|
|
|
def testRefuseDo(self):
|
|
# Try to enable an option. The server should refuse to enable it.
|
|
cmd = telnet.IAC + telnet.DO + b'\x12'
|
|
|
|
data = b"surrounding bytes" + cmd + b"to spice things up"
|
|
self.p.dataReceived(data)
|
|
|
|
self.assertEqual(self.p.protocol.data, data.replace(cmd, b''))
|
|
self.assertEqual(self.t.value(), telnet.IAC + telnet.WONT + b'\x12')
|
|
self._enabledHelper(self.p.protocol)
|
|
|
|
|
|
def testAcceptDo(self):
|
|
# Try to enable an option. The option is in our allowEnable
|
|
# list, so we will allow it to be enabled.
|
|
cmd = telnet.IAC + telnet.DO + b'\x19'
|
|
data = b'padding' + cmd + b'trailer'
|
|
|
|
h = self.p.protocol
|
|
h.localEnableable = (b'\x19',)
|
|
self.p.dataReceived(data)
|
|
|
|
self.assertEqual(self.t.value(), telnet.IAC + telnet.WILL + b'\x19')
|
|
self._enabledHelper(h, eL=[b'\x19'])
|
|
|
|
|
|
def testAcceptWill(self):
|
|
# Same as testAcceptDo, but reversed.
|
|
cmd = telnet.IAC + telnet.WILL + b'\x91'
|
|
data = b'header' + cmd + b'padding'
|
|
|
|
h = self.p.protocol
|
|
h.remoteEnableable = (b'\x91',)
|
|
self.p.dataReceived(data)
|
|
|
|
self.assertEqual(self.t.value(), telnet.IAC + telnet.DO + b'\x91')
|
|
self._enabledHelper(h, eR=[b'\x91'])
|
|
|
|
|
|
def testAcceptWont(self):
|
|
# Try to disable an option. The server must allow any option to
|
|
# be disabled at any time. Make sure it disables it and sends
|
|
# back an acknowledgement of this.
|
|
cmd = telnet.IAC + telnet.WONT + b'\x29'
|
|
|
|
# Jimmy it - after these two lines, the server will be in a state
|
|
# such that it believes the option to have been previously enabled
|
|
# via normal negotiation.
|
|
s = self.p.getOptionState(b'\x29')
|
|
s.him.state = 'yes'
|
|
|
|
data = b"fiddle dee" + cmd
|
|
self.p.dataReceived(data)
|
|
|
|
self.assertEqual(self.p.protocol.data, data.replace(cmd, b''))
|
|
self.assertEqual(self.t.value(), telnet.IAC + telnet.DONT + b'\x29')
|
|
self.assertEqual(s.him.state, 'no')
|
|
self._enabledHelper(self.p.protocol, dR=[b'\x29'])
|
|
|
|
|
|
def testAcceptDont(self):
|
|
# Try to disable an option. The server must allow any option to
|
|
# be disabled at any time. Make sure it disables it and sends
|
|
# back an acknowledgement of this.
|
|
cmd = telnet.IAC + telnet.DONT + b'\x29'
|
|
|
|
# Jimmy it - after these two lines, the server will be in a state
|
|
# such that it believes the option to have beenp previously enabled
|
|
# via normal negotiation.
|
|
s = self.p.getOptionState(b'\x29')
|
|
s.us.state = 'yes'
|
|
|
|
data = b"fiddle dum " + cmd
|
|
self.p.dataReceived(data)
|
|
|
|
self.assertEqual(self.p.protocol.data, data.replace(cmd, b''))
|
|
self.assertEqual(self.t.value(), telnet.IAC + telnet.WONT + b'\x29')
|
|
self.assertEqual(s.us.state, 'no')
|
|
self._enabledHelper(self.p.protocol, dL=[b'\x29'])
|
|
|
|
|
|
def testIgnoreWont(self):
|
|
# Try to disable an option. The option is already disabled. The
|
|
# server should send nothing in response to this.
|
|
cmd = telnet.IAC + telnet.WONT + b'\x47'
|
|
|
|
data = b"dum de dum" + cmd + b"tra la la"
|
|
self.p.dataReceived(data)
|
|
|
|
self.assertEqual(self.p.protocol.data, data.replace(cmd, b''))
|
|
self.assertEqual(self.t.value(), b'')
|
|
self._enabledHelper(self.p.protocol)
|
|
|
|
|
|
def testIgnoreDont(self):
|
|
# Try to disable an option. The option is already disabled. The
|
|
# server should send nothing in response to this. Doing so could
|
|
# lead to a negotiation loop.
|
|
cmd = telnet.IAC + telnet.DONT + b'\x47'
|
|
|
|
data = b"dum de dum" + cmd + b"tra la la"
|
|
self.p.dataReceived(data)
|
|
|
|
self.assertEqual(self.p.protocol.data, data.replace(cmd, b''))
|
|
self.assertEqual(self.t.value(), b'')
|
|
self._enabledHelper(self.p.protocol)
|
|
|
|
|
|
def testIgnoreWill(self):
|
|
# Try to enable an option. The option is already enabled. The
|
|
# server should send nothing in response to this. Doing so could
|
|
# lead to a negotiation loop.
|
|
cmd = telnet.IAC + telnet.WILL + b'\x56'
|
|
|
|
# Jimmy it - after these two lines, the server will be in a state
|
|
# such that it believes the option to have been previously enabled
|
|
# via normal negotiation.
|
|
s = self.p.getOptionState(b'\x56')
|
|
s.him.state = 'yes'
|
|
|
|
data = b"tra la la" + cmd + b"dum de dum"
|
|
self.p.dataReceived(data)
|
|
|
|
self.assertEqual(self.p.protocol.data, data.replace(cmd, b''))
|
|
self.assertEqual(self.t.value(), b'')
|
|
self._enabledHelper(self.p.protocol)
|
|
|
|
|
|
def testIgnoreDo(self):
|
|
# Try to enable an option. The option is already enabled. The
|
|
# server should send nothing in response to this. Doing so could
|
|
# lead to a negotiation loop.
|
|
cmd = telnet.IAC + telnet.DO + b'\x56'
|
|
|
|
# Jimmy it - after these two lines, the server will be in a state
|
|
# such that it believes the option to have been previously enabled
|
|
# via normal negotiation.
|
|
s = self.p.getOptionState(b'\x56')
|
|
s.us.state = 'yes'
|
|
|
|
data = b"tra la la" + cmd + b"dum de dum"
|
|
self.p.dataReceived(data)
|
|
|
|
self.assertEqual(self.p.protocol.data, data.replace(cmd, b''))
|
|
self.assertEqual(self.t.value(), b'')
|
|
self._enabledHelper(self.p.protocol)
|
|
|
|
|
|
def testAcceptedEnableRequest(self):
|
|
# Try to enable an option through the user-level API. This
|
|
# returns a Deferred that fires when negotiation about the option
|
|
# finishes. Make sure it fires, make sure state gets updated
|
|
# properly, make sure the result indicates the option was enabled.
|
|
d = self.p.do(b'\x42')
|
|
|
|
h = self.p.protocol
|
|
h.remoteEnableable = (b'\x42',)
|
|
|
|
self.assertEqual(self.t.value(), telnet.IAC + telnet.DO + b'\x42')
|
|
|
|
self.p.dataReceived(telnet.IAC + telnet.WILL + b'\x42')
|
|
|
|
d.addCallback(self.assertEqual, True)
|
|
d.addCallback(lambda _: self._enabledHelper(h, eR=[b'\x42']))
|
|
return d
|
|
|
|
|
|
def test_refusedEnableRequest(self):
|
|
"""
|
|
If the peer refuses to enable an option we request it to enable, the
|
|
L{Deferred} returned by L{TelnetProtocol.do} fires with an
|
|
L{OptionRefused} L{Failure}.
|
|
"""
|
|
# Try to enable an option through the user-level API. This returns a
|
|
# Deferred that fires when negotiation about the option finishes. Make
|
|
# sure it fires, make sure state gets updated properly, make sure the
|
|
# result indicates the option was enabled.
|
|
self.p.protocol.remoteEnableable = (b'\x42',)
|
|
d = self.p.do(b'\x42')
|
|
|
|
self.assertEqual(self.t.value(), telnet.IAC + telnet.DO + b'\x42')
|
|
|
|
s = self.p.getOptionState(b'\x42')
|
|
self.assertEqual(s.him.state, 'no')
|
|
self.assertEqual(s.us.state, 'no')
|
|
self.assertTrue(s.him.negotiating)
|
|
self.assertFalse(s.us.negotiating)
|
|
|
|
self.p.dataReceived(telnet.IAC + telnet.WONT + b'\x42')
|
|
|
|
d = self.assertFailure(d, telnet.OptionRefused)
|
|
d.addCallback(lambda ignored: self._enabledHelper(self.p.protocol))
|
|
d.addCallback(
|
|
lambda ignored: self.assertFalse(s.him.negotiating))
|
|
return d
|
|
|
|
|
|
def test_refusedEnableOffer(self):
|
|
"""
|
|
If the peer refuses to allow us to enable an option, the L{Deferred}
|
|
returned by L{TelnetProtocol.will} fires with an L{OptionRefused}
|
|
L{Failure}.
|
|
"""
|
|
# Try to offer an option through the user-level API. This returns a
|
|
# Deferred that fires when negotiation about the option finishes. Make
|
|
# sure it fires, make sure state gets updated properly, make sure the
|
|
# result indicates the option was enabled.
|
|
self.p.protocol.localEnableable = (b'\x42',)
|
|
d = self.p.will(b'\x42')
|
|
|
|
self.assertEqual(self.t.value(), telnet.IAC + telnet.WILL + b'\x42')
|
|
|
|
s = self.p.getOptionState(b'\x42')
|
|
self.assertEqual(s.him.state, 'no')
|
|
self.assertEqual(s.us.state, 'no')
|
|
self.assertFalse(s.him.negotiating)
|
|
self.assertTrue(s.us.negotiating)
|
|
|
|
self.p.dataReceived(telnet.IAC + telnet.DONT + b'\x42')
|
|
|
|
d = self.assertFailure(d, telnet.OptionRefused)
|
|
d.addCallback(lambda ignored: self._enabledHelper(self.p.protocol))
|
|
d.addCallback(
|
|
lambda ignored: self.assertFalse(s.us.negotiating))
|
|
return d
|
|
|
|
|
|
def testAcceptedDisableRequest(self):
|
|
# Try to disable an option through the user-level API. This
|
|
# returns a Deferred that fires when negotiation about the option
|
|
# finishes. Make sure it fires, make sure state gets updated
|
|
# properly, make sure the result indicates the option was enabled.
|
|
s = self.p.getOptionState(b'\x42')
|
|
s.him.state = 'yes'
|
|
|
|
d = self.p.dont(b'\x42')
|
|
|
|
self.assertEqual(self.t.value(), telnet.IAC + telnet.DONT + b'\x42')
|
|
|
|
self.p.dataReceived(telnet.IAC + telnet.WONT + b'\x42')
|
|
|
|
d.addCallback(self.assertEqual, True)
|
|
d.addCallback(lambda _: self._enabledHelper(self.p.protocol,
|
|
dR=[b'\x42']))
|
|
return d
|
|
|
|
|
|
def testNegotiationBlocksFurtherNegotiation(self):
|
|
# Try to disable an option, then immediately try to enable it, then
|
|
# immediately try to disable it. Ensure that the 2nd and 3rd calls
|
|
# fail quickly with the right exception.
|
|
s = self.p.getOptionState(b'\x24')
|
|
s.him.state = 'yes'
|
|
self.p.dont(b'\x24') # fires after the first line of _final
|
|
|
|
def _do(x):
|
|
d = self.p.do(b'\x24')
|
|
return self.assertFailure(d, telnet.AlreadyNegotiating)
|
|
|
|
|
|
def _dont(x):
|
|
d = self.p.dont(b'\x24')
|
|
return self.assertFailure(d, telnet.AlreadyNegotiating)
|
|
|
|
|
|
def _final(x):
|
|
self.p.dataReceived(telnet.IAC + telnet.WONT + b'\x24')
|
|
# an assertion that only passes if d2 has fired
|
|
self._enabledHelper(self.p.protocol, dR=[b'\x24'])
|
|
# Make sure we allow this
|
|
self.p.protocol.remoteEnableable = (b'\x24',)
|
|
d = self.p.do(b'\x24')
|
|
self.p.dataReceived(telnet.IAC + telnet.WILL + b'\x24')
|
|
d.addCallback(self.assertEqual, True)
|
|
d.addCallback(lambda _: self._enabledHelper(self.p.protocol,
|
|
eR=[b'\x24'],
|
|
dR=[b'\x24']))
|
|
return d
|
|
|
|
d = _do(None)
|
|
d.addCallback(_dont)
|
|
d.addCallback(_final)
|
|
return d
|
|
|
|
|
|
def testSuperfluousDisableRequestRaises(self):
|
|
# Try to disable a disabled option. Make sure it fails properly.
|
|
d = self.p.dont(b'\xab')
|
|
return self.assertFailure(d, telnet.AlreadyDisabled)
|
|
|
|
|
|
def testSuperfluousEnableRequestRaises(self):
|
|
# Try to disable a disabled option. Make sure it fails properly.
|
|
s = self.p.getOptionState(b'\xab')
|
|
s.him.state = 'yes'
|
|
d = self.p.do(b'\xab')
|
|
return self.assertFailure(d, telnet.AlreadyEnabled)
|
|
|
|
|
|
def testLostConnectionFailsDeferreds(self):
|
|
d1 = self.p.do(b'\x12')
|
|
d2 = self.p.do(b'\x23')
|
|
d3 = self.p.do(b'\x34')
|
|
|
|
class TestException(Exception):
|
|
pass
|
|
|
|
self.p.connectionLost(TestException("Total failure!"))
|
|
|
|
d1 = self.assertFailure(d1, TestException)
|
|
d2 = self.assertFailure(d2, TestException)
|
|
d3 = self.assertFailure(d3, TestException)
|
|
return defer.gatherResults([d1, d2, d3])
|
|
|
|
|
|
class TestTelnet(telnet.Telnet):
|
|
"""
|
|
A trivial extension of the telnet protocol class useful to unit tests.
|
|
"""
|
|
def __init__(self):
|
|
telnet.Telnet.__init__(self)
|
|
self.events = []
|
|
|
|
|
|
def applicationDataReceived(self, data):
|
|
"""
|
|
Record the given data in C{self.events}.
|
|
"""
|
|
self.events.append(('bytes', data))
|
|
|
|
|
|
def unhandledCommand(self, command, data):
|
|
"""
|
|
Record the given command in C{self.events}.
|
|
"""
|
|
self.events.append(('command', command, data))
|
|
|
|
|
|
def unhandledSubnegotiation(self, command, data):
|
|
"""
|
|
Record the given subnegotiation command in C{self.events}.
|
|
"""
|
|
self.events.append(('negotiate', command, data))
|
|
|
|
|
|
|
|
class TelnetTests(unittest.TestCase):
|
|
"""
|
|
Tests for L{telnet.Telnet}.
|
|
|
|
L{telnet.Telnet} implements the TELNET protocol (RFC 854), including option
|
|
and suboption negotiation, and option state tracking.
|
|
"""
|
|
def setUp(self):
|
|
"""
|
|
Create an unconnected L{telnet.Telnet} to be used by tests.
|
|
"""
|
|
self.protocol = TestTelnet()
|
|
|
|
|
|
def test_enableLocal(self):
|
|
"""
|
|
L{telnet.Telnet.enableLocal} should reject all options, since
|
|
L{telnet.Telnet} does not know how to implement any options.
|
|
"""
|
|
self.assertFalse(self.protocol.enableLocal(b'\0'))
|
|
|
|
|
|
def test_enableRemote(self):
|
|
"""
|
|
L{telnet.Telnet.enableRemote} should reject all options, since
|
|
L{telnet.Telnet} does not know how to implement any options.
|
|
"""
|
|
self.assertFalse(self.protocol.enableRemote(b'\0'))
|
|
|
|
|
|
def test_disableLocal(self):
|
|
"""
|
|
It is an error for L{telnet.Telnet.disableLocal} to be called, since
|
|
L{telnet.Telnet.enableLocal} will never allow any options to be enabled
|
|
locally. If a subclass overrides enableLocal, it must also override
|
|
disableLocal.
|
|
"""
|
|
self.assertRaises(NotImplementedError, self.protocol.disableLocal, b'\0')
|
|
|
|
|
|
def test_disableRemote(self):
|
|
"""
|
|
It is an error for L{telnet.Telnet.disableRemote} to be called, since
|
|
L{telnet.Telnet.enableRemote} will never allow any options to be
|
|
enabled remotely. If a subclass overrides enableRemote, it must also
|
|
override disableRemote.
|
|
"""
|
|
self.assertRaises(NotImplementedError, self.protocol.disableRemote, b'\0')
|
|
|
|
|
|
def test_requestNegotiation(self):
|
|
"""
|
|
L{telnet.Telnet.requestNegotiation} formats the feature byte and the
|
|
payload bytes into the subnegotiation format and sends them.
|
|
|
|
See RFC 855.
|
|
"""
|
|
transport = proto_helpers.StringTransport()
|
|
self.protocol.makeConnection(transport)
|
|
self.protocol.requestNegotiation(b'\x01', b'\x02\x03')
|
|
self.assertEqual(
|
|
transport.value(),
|
|
# IAC SB feature bytes IAC SE
|
|
b'\xff\xfa\x01\x02\x03\xff\xf0')
|
|
|
|
|
|
def test_requestNegotiationEscapesIAC(self):
|
|
"""
|
|
If the payload for a subnegotiation includes I{IAC}, it is escaped by
|
|
L{telnet.Telnet.requestNegotiation} with another I{IAC}.
|
|
|
|
See RFC 855.
|
|
"""
|
|
transport = proto_helpers.StringTransport()
|
|
self.protocol.makeConnection(transport)
|
|
self.protocol.requestNegotiation(b'\x01', b'\xff')
|
|
self.assertEqual(
|
|
transport.value(),
|
|
b'\xff\xfa\x01\xff\xff\xff\xf0')
|
|
|
|
|
|
def _deliver(self, data, *expected):
|
|
"""
|
|
Pass the given bytes to the protocol's C{dataReceived} method and
|
|
assert that the given events occur.
|
|
"""
|
|
received = self.protocol.events = []
|
|
self.protocol.dataReceived(data)
|
|
self.assertEqual(received, list(expected))
|
|
|
|
|
|
def test_oneApplicationDataByte(self):
|
|
"""
|
|
One application-data byte in the default state gets delivered right
|
|
away.
|
|
"""
|
|
self._deliver(b'a', ('bytes', b'a'))
|
|
|
|
|
|
def test_twoApplicationDataBytes(self):
|
|
"""
|
|
Two application-data bytes in the default state get delivered
|
|
together.
|
|
"""
|
|
self._deliver(b'bc', ('bytes', b'bc'))
|
|
|
|
|
|
def test_threeApplicationDataBytes(self):
|
|
"""
|
|
Three application-data bytes followed by a control byte get
|
|
delivered, but the control byte doesn't.
|
|
"""
|
|
self._deliver(b'def' + telnet.IAC, ('bytes', b'def'))
|
|
|
|
|
|
def test_escapedControl(self):
|
|
"""
|
|
IAC in the escaped state gets delivered and so does another
|
|
application-data byte following it.
|
|
"""
|
|
self._deliver(telnet.IAC)
|
|
self._deliver(telnet.IAC + b'g', ('bytes', telnet.IAC + b'g'))
|
|
|
|
|
|
def test_carriageReturn(self):
|
|
"""
|
|
A carriage return only puts the protocol into the newline state. A
|
|
linefeed in the newline state causes just the newline to be
|
|
delivered. A nul in the newline state causes a carriage return to
|
|
be delivered. An IAC in the newline state causes a carriage return
|
|
to be delivered and puts the protocol into the escaped state.
|
|
Anything else causes a carriage return and that thing to be
|
|
delivered.
|
|
"""
|
|
self._deliver(b'\r')
|
|
self._deliver(b'\n', ('bytes', b'\n'))
|
|
self._deliver(b'\r\n', ('bytes', b'\n'))
|
|
|
|
self._deliver(b'\r')
|
|
self._deliver(b'\0', ('bytes', b'\r'))
|
|
self._deliver(b'\r\0', ('bytes', b'\r'))
|
|
|
|
self._deliver(b'\r')
|
|
self._deliver(b'a', ('bytes', b'\ra'))
|
|
self._deliver(b'\ra', ('bytes', b'\ra'))
|
|
|
|
self._deliver(b'\r')
|
|
self._deliver(
|
|
telnet.IAC + telnet.IAC + b'x', ('bytes', b'\r' + telnet.IAC + b'x'))
|
|
|
|
|
|
def test_applicationDataBeforeSimpleCommand(self):
|
|
"""
|
|
Application bytes received before a command are delivered before the
|
|
command is processed.
|
|
"""
|
|
self._deliver(
|
|
b'x' + telnet.IAC + telnet.NOP,
|
|
('bytes', b'x'), ('command', telnet.NOP, None))
|
|
|
|
|
|
def test_applicationDataBeforeCommand(self):
|
|
"""
|
|
Application bytes received before a WILL/WONT/DO/DONT are delivered
|
|
before the command is processed.
|
|
"""
|
|
self.protocol.commandMap = {}
|
|
self._deliver(
|
|
b'y' + telnet.IAC + telnet.WILL + b'\x00',
|
|
('bytes', b'y'), ('command', telnet.WILL, b'\x00'))
|
|
|
|
|
|
def test_applicationDataBeforeSubnegotiation(self):
|
|
"""
|
|
Application bytes received before a subnegotiation command are
|
|
delivered before the negotiation is processed.
|
|
"""
|
|
self._deliver(
|
|
b'z' + telnet.IAC + telnet.SB + b'Qx' + telnet.IAC + telnet.SE,
|
|
('bytes', b'z'), ('negotiate', b'Q', [b'x']))
|