#
# Copyright (C) 2010-2017 Samuel Abels
# The MIT License (MIT)
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files
# (the "Software"), to deal in the Software without restriction,
# including without limitation the rights to use, copy, modify, merge,
# publish, distribute, sublicense, and/or sell copies of the Software,
# and to permit persons to whom the Software is furnished to do so,
# subject to the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
# IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
# CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
# TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""
The Telnet protocol.
"""
from __future__ import absolute_import, unicode_literals
from future import standard_library
standard_library.install_aliases()
from ..util.tty import get_terminal_size
from . import telnetlib
from .protocol import Protocol
from .exception import ProtocolException, TimeoutException, \
DriverReplacedException, ExpectCancelledException
[docs]class Telnet(Protocol):
"""
The Telnet protocol adapter.
"""
[docs] def __init__(self, **kwargs):
Protocol.__init__(self, **kwargs)
self.tn = None
def _telnetlib_received(self, data):
self._receive_cb(data, False)
self.buffer.append(data)
def _connect_hook(self, hostname, port):
assert self.tn is None
rows, cols = get_terminal_size()
self.tn = telnetlib.Telnet(hostname,
port or 23,
encoding=self.encoding,
connect_timeout=self.connect_timeout,
termsize=(rows, cols),
termtype=self.termtype,
stderr=self.stderr,
receive_callback=self._telnetlib_received)
if self.debug >= 5:
self.tn.set_debuglevel(1)
if self.tn is None:
return False
return True
[docs] def send(self, data):
self._dbg(4, 'Sending %s' % repr(data))
try:
self.tn.write(data)
except Exception:
self._dbg(1, 'Error while writing to connection')
raise
def _domatch(self, prompt, flush):
if flush:
func = self.tn.expect
else:
func = self.tn.waitfor
# Wait for a prompt.
clean = self.get_driver().clean_response_for_re_match
self.response = None
try:
result, match, self.response = func(
prompt, self.timeout, cleanup=clean)
except Exception:
self._dbg(1, 'Error while waiting for ' + repr(prompt))
raise
if match:
self._dbg(2, "Got a prompt, match was %s" % repr(match.group()))
self.buffer.pop(len(self.response))
self._dbg(5, "Response was %s" % repr(self.response))
if result == -1:
error = 'Error while waiting for response from device'
raise TimeoutException(error)
if result == -2:
if self.driver_replaced:
self.driver_replaced = False
raise DriverReplacedException()
else:
raise ExpectCancelledException()
if self.response is None:
raise ProtocolException('whoops - response is None')
return result, match
[docs] def cancel_expect(self):
self.tn.cancel_expect = True
def _set_terminal_size(self, rows, cols):
self.tn.set_window_size(rows, cols)
[docs] def interact(self, key_handlers=None, handle_window_size=True):
return self._open_shell(self.tn.sock, key_handlers, handle_window_size)
[docs] def close(self, force=False):
if self.tn is None:
return
if not force:
try:
self.response = self.tn.read_all()
except Exception:
pass
self.tn.close()
self.tn = None
self.buffer.clear()
super(Telnet, self).close()