diff --git a/docs/history.rst b/docs/history.rst index a1948670..bb9c2680 100644 --- a/docs/history.rst +++ b/docs/history.rst @@ -1,5 +1,10 @@ History ======= +4.0.4 + * bugfix: servers using ``robot_check=True`` with ``encoding=False`` raised ``TypeError: buf + expected bytes, got ``. ``telnetlib3-server`` now also accepts ``--encoding=False`` + CLI argument. ``latin1`` encoding is used the default server shell and robot check. + 4.0.3 * bugfix: long-running servers leaked memory through :class:`~telnetlib3.server.Server` ``_protocols`` list and ``_new_client`` asyncio.Queue. Both are now bounded diff --git a/pyproject.toml b/pyproject.toml index a766a723..8efca6d0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" [project] name = "telnetlib3" -version = "4.0.3" # Keep in sync with telnetlib3/accessories.py::get_version ! +version = "4.0.4" # Keep in sync with telnetlib3/accessories.py::get_version ! description = " Python Telnet server and client CLI and Protocol library" readme = "README.rst" license = "ISC" diff --git a/telnetlib3/accessories.py b/telnetlib3/accessories.py index 9ba91975..6233f0cc 100644 --- a/telnetlib3/accessories.py +++ b/telnetlib3/accessories.py @@ -42,7 +42,7 @@ def get_version() -> str: """Return the current version of telnetlib3.""" - return "4.0.3" # keep in sync with pyproject.toml ! + return "4.0.4" # keep in sync with pyproject.toml ! def encoding_from_lang(lang: str) -> Optional[str]: diff --git a/telnetlib3/guard_shells.py b/telnetlib3/guard_shells.py index 3091799d..a76c20d7 100644 --- a/telnetlib3/guard_shells.py +++ b/telnetlib3/guard_shells.py @@ -18,7 +18,7 @@ import re import asyncio import logging -from typing import Tuple, Union, Optional, Generator, cast +from typing import Tuple, Union, Optional, Generator from contextlib import contextmanager # local @@ -64,6 +64,14 @@ def _latin1_reading( reader._decoder = None +def _writer_write(writer: Union[TelnetWriter, TelnetWriterUnicode], data: str) -> None: + """Write string data to writer in the appropriate type (str or bytes).""" + if writer.is_binary_writer: + writer.write(data.encode("latin-1")) + else: + writer.write(data) # type: ignore[arg-type] + + class ConnectionCounter: """Simple shared counter for limiting concurrent connections.""" @@ -100,12 +108,13 @@ def count(self) -> int: async def _read_line_inner(reader: Union[TelnetReader, TelnetReaderUnicode], max_len: int) -> str: """Inner loop for _read_line, separated for wait_for compatibility.""" - _reader = cast(TelnetReaderUnicode, reader) buf = "" while len(buf) < max_len: - char = await _reader.read(1) + char = await reader.read(1) if not char: break + if isinstance(char, bytes): + char = char.decode("latin-1") if char in ("\r", "\n"): break buf += char @@ -166,8 +175,7 @@ async def _get_cursor_position( :returns: (row, col) tuple or (None, None) on timeout/failure. """ # Send Device Status Report request - _writer = cast(TelnetWriterUnicode, writer) - _writer.write("\x1b[6n") + _writer_write(writer, "\x1b[6n") await writer.drain() # Read response: ESC [ row ; col R @@ -189,21 +197,20 @@ async def _measure_width( :returns: Width in columns, or None on failure. """ - _writer = cast(TelnetWriterUnicode, writer) _, x1 = await _get_cursor_position(reader, writer, timeout) if x1 is None: return None - _writer.write(text) - await _writer.drain() + _writer_write(writer, text) + await writer.drain() _, x2 = await _get_cursor_position(reader, writer, timeout) if x2 is None: return None # Clear the test character - _writer.write(f"\x1b[{x1}G" + " " * (x2 - x1) + f"\x1b[{x1}G") - await _writer.drain() + _writer_write(writer, f"\x1b[{x1}G" + " " * (x2 - x1) + f"\x1b[{x1}G") + await writer.drain() return x2 - x1 @@ -230,10 +237,9 @@ async def _ask_question( timeout: float = 10.0, ) -> Optional[str]: """Ask a question, echoing input and repeating prompt on blank input.""" - _writer = cast(TelnetWriterUnicode, writer) while True: - _writer.write(prompt) - await _writer.drain() + _writer_write(writer, prompt) + await writer.drain() line = await _readline_with_echo(reader, writer, timeout) if line is None: @@ -242,7 +248,7 @@ async def _ask_question( if line.strip(): return line # Blank input - repeat prompt - _writer.write("\r\n") + _writer_write(writer, "\r\n") async def robot_shell( @@ -254,7 +260,6 @@ async def robot_shell( Asks philosophical questions, logs responses, and disconnects. """ - writer = cast(TelnetWriterUnicode, writer) peername = writer.get_extra_info("peername") logger.info("robot_shell: connection from %s", peername) @@ -275,7 +280,7 @@ async def robot_shell( return answers.append(line2) - writer.write("\r\n") + _writer_write(writer, "\r\n") await writer.drain() finally: if answers: @@ -291,10 +296,9 @@ async def busy_shell( Displays busy message, logs any input, and disconnects. """ - writer = cast(TelnetWriterUnicode, writer) logger.info("busy_shell: connection from %s (limit reached)", writer.get_extra_info("peername")) - writer.write("Machine is busy, do not touch! ") + _writer_write(writer, "Machine is busy, do not touch! ") await writer.drain() with _latin1_reading(reader): @@ -302,12 +306,12 @@ async def busy_shell( if line1 is not None: logger.info("busy_shell: input1=%r", line1) - writer.write("\r\nYou hear a distant explosion... ") + _writer_write(writer, "\r\nYou hear a distant explosion... ") await writer.drain() line2 = await _read_line(reader, timeout=30.0) if line2 is not None: logger.info("busy_shell: input2=%r", line2) - writer.write("\r\n") + _writer_write(writer, "\r\n") await writer.drain() diff --git a/telnetlib3/server.py b/telnetlib3/server.py index a722e349..56bb74d5 100755 --- a/telnetlib3/server.py +++ b/telnetlib3/server.py @@ -913,7 +913,7 @@ def begin_advanced_negotiation(self) -> None: self.writer.iac(DO, LINEMODE) def _negotiate_echo(self) -> None: - """Skip ``WILL ECHO`` — LINEMODE EDIT client handles local echo.""" + """Skip ``WILL ECHO``: LINEMODE EDIT client handles local echo.""" if self._echo_negotiated: return self._echo_negotiated = True @@ -1297,7 +1297,12 @@ def parse_server_args( default=_config.connect_maxwait, help="timeout for pending negotiation", ) - parser.add_argument("--encoding", default=_config.encoding, help="encoding name") + parser.add_argument( + "--encoding", + default=_config.encoding, + type=lambda val: False if val.lower() == "false" else val, + help="encoding name, or 'false'/'False' to disable unicode", + ) parser.add_argument( "--force-binary", action="store_true", @@ -1398,9 +1403,12 @@ def parse_server_args( result["pty_raw"] = False # Auto-enable force_binary for any non-ASCII encoding that uses high-bit bytes. - enc_key = result["encoding"].lower().replace("-", "_") - if enc_key not in ("us_ascii", "ascii"): + if result["encoding"] is False: result["force_binary"] = True + else: + enc_key = result["encoding"].lower().replace("-", "_") + if enc_key not in ("us_ascii", "ascii"): + result["force_binary"] = True # Build SSLContext from --ssl-certfile / --ssl-keyfile ssl_certfile = result.pop("ssl_certfile", None) diff --git a/telnetlib3/server_shell.py b/telnetlib3/server_shell.py index 35a75ed8..8abc9a7d 100644 --- a/telnetlib3/server_shell.py +++ b/telnetlib3/server_shell.py @@ -5,7 +5,7 @@ # std imports import types import asyncio -from typing import Union, Optional, Generator, cast +from typing import Any, Union, Optional, Coroutine, Generator # 3rd party from wcwidth import wcswidth as _wcswidth @@ -34,36 +34,69 @@ _SS3 = "O" -async def filter_ansi(reader: TelnetReaderUnicode, _writer: TelnetWriterUnicode) -> str: +def _write(writer: Union[TelnetWriter, TelnetWriterUnicode], data: str) -> None: + """Write string data to writer in the appropriate type (str or bytes).""" + if writer.is_binary_writer: + writer.write(data.encode("latin-1")) + else: + writer.write(data) # type: ignore[arg-type] + + +def _echo(writer: Union[TelnetWriter, TelnetWriterUnicode], data: str) -> None: + """Echo string data to writer in the appropriate type (str or bytes).""" + if writer.is_binary_writer: + writer.echo(data.encode("latin-1")) + else: + writer.echo(data) # type: ignore[arg-type] + + +async def filter_ansi( + reader: Union[TelnetReader, TelnetReaderUnicode], + _writer: Union[TelnetWriter, TelnetWriterUnicode], +) -> str: """ Read and return the next non-ANSI-escape character from reader. When wcwidth is available, handles CSI, OSC, DCS, APC, PM, charset designation, Fe, Fp, and SS3 sequences. Otherwise falls back to CSI and SS3 only. """ - while True: - char = await reader.read(1) - if not char: + binary = reader.is_binary_reader + + def _rch() -> "Coroutine[Any, Any, Union[str, bytes]]": + """Read one character.""" + return reader.read(1) + + async def _next() -> str: + ch = await _rch() + if not ch: return "" + return ch.decode("latin-1") if binary else ch # type: ignore[return-value] + + char = await _next() + + while True: if char != ESC: return char - next_char = await reader.read(1) + next_char = await _next() if not next_char: return "" - # SS3: ESC O + one final byte (F1-F4, keypad, app-mode arrows). - # Handled before wcwidth's ZERO_WIDTH_PATTERN which would match - # ESC O as a 2-byte Fe sequence, missing the third byte. if next_char == _SS3: - await reader.read(1) + # SS3: ESC O + one final byte (F1-F4, keypad, app-mode arrows). + # Handled before wcwidth's ZERO_WIDTH_PATTERN which would match + # ESC O as a 2-byte Fe sequence, missing the third byte. + await _rch() # consume SS3 final byte + char = await _next() + if not char: + return "" continue buf = ESC + next_char if next_char in _SEQ_STARTERS: # Multi-byte: CSI, OSC, DCS, APC, PM, or charset while len(buf) < 256: - seq_char = await reader.read(1) + seq_char = await _next() if not seq_char: break buf += seq_char @@ -81,6 +114,10 @@ async def filter_ansi(reader: TelnetReaderUnicode, _writer: TelnetWriterUnicode) if not match: return next_char + char = await _next() + if not char: + return "" + def _backspace_grapheme(command: str) -> tuple[str, str]: """Remove last grapheme cluster, return (new_command, echo_str).""" @@ -157,56 +194,53 @@ async def telnet_server_shell( This shell provides a very simple REPL, allowing introspection and state toggling of the connected client session. """ - _reader = cast(TelnetReaderUnicode, reader) - writer = cast(TelnetWriterUnicode, writer) - ssl_obj = writer.get_extra_info("ssl_object") if ssl_obj is not None: version = ssl_obj.version() or "TLS" - writer.write(f"Ready (secure: {version})." + CR + LF) + _write(writer, f"Ready (secure: {version})." + CR + LF) else: - writer.write("Ready." + CR + LF) + _write(writer, "Ready." + CR + LF) command = None while not writer.is_closing(): if command: - writer.write(CR + LF) - writer.write("tel:sh> ") + _write(writer, CR + LF) + _write(writer, "tel:sh> ") if not getattr(writer.protocol, "never_send_ga", False): writer.send_ga() await writer.drain() - command = await readline_async(_reader, writer) + command = await readline_async(reader, writer) if command is None: return - writer.write(CR + LF) + _write(writer, CR + LF) if command == "quit": # server hangs up on client - writer.write("Goodbye." + CR + LF) + _write(writer, "Goodbye." + CR + LF) break if command == "help": - writer.write("quit, writer, slc, linemode, toggle [option|all], reader, proto, dump") + _write(writer, "quit, writer, slc, linemode, toggle [option|all], reader, proto, dump") elif command == "writer": # show 'writer' status - writer.write(repr(writer)) + _write(writer, repr(writer)) elif command == "reader": # show 'reader' status - writer.write(repr(reader)) + _write(writer, repr(reader)) elif command == "proto": # show 'proto' details of writer - writer.write(repr(writer.protocol)) + _write(writer, repr(writer.protocol)) elif command == "version": - writer.write(accessories.get_version()) + _write(writer, accessories.get_version()) elif command == "slc": # show 'slc' support and data tables - writer.write(get_slcdata(writer)) + _write(writer, get_slcdata(writer)) elif command == "linemode": - writer.write(get_linemode(writer)) + _write(writer, get_linemode(writer)) elif command.startswith("toggle"): # toggle specified options option = command[len("toggle ") :] or None - writer.write(do_toggle(writer, option)) + _write(writer, do_toggle(writer, option)) elif command.startswith("dump"): # dump [kb] [ms_delay] [drain|nodrain] [close|noclose] # @@ -232,22 +266,22 @@ async def telnet_server_shell( except IndexError: do_close = False msg = f"kb_limit={kb_limit}, delay={delay}," f" drain={drain}, do_close={do_close}:\r\n" - writer.write(msg) + _write(writer, msg) for lineout in character_dump(kb_limit): if writer.is_closing(): break - writer.write(lineout) + _write(writer, lineout) if drain: await writer.drain() if delay: await asyncio.sleep(delay) if not writer.is_closing(): - writer.write(f"\r\n{kb_limit} OK") + _write(writer, f"\r\n{kb_limit} OK") if do_close: break elif command: - writer.write("no such command.") + _write(writer, "no such command.") writer.close() @@ -275,13 +309,15 @@ def readline( Uses ``_LineEditor`` for grapheme-aware backspace and max_visible_width support. """ - _writer = cast(TelnetWriterUnicode, writer) editor = _LineEditor(max_visible_width=max_visible_width) inp = yield None while True: echo, cmd = editor.feed(inp) if echo: - _writer.echo(echo) + if writer.is_binary_writer: + writer.echo(echo.encode("latin-1")) + else: + writer.echo(echo) # type: ignore[arg-type] inp = yield cmd @@ -296,11 +332,9 @@ async def readline_async( Uses ``filter_ansi()`` to strip escape sequences and ``_LineEditor`` for grapheme-aware backspace and max_visible_width support. """ - _reader = cast(TelnetReaderUnicode, reader) - _writer = cast(TelnetWriterUnicode, writer) editor = _LineEditor(max_visible_width=max_visible_width) while True: - next_char = await filter_ansi(_reader, _writer) + next_char = await filter_ansi(reader, writer) if not next_char: return None # Skip leading LF/NUL on empty buffer -- accounts for @@ -309,7 +343,10 @@ async def readline_async( continue echo, cmd = editor.feed(next_char) if echo: - _writer.echo(echo) + if writer.is_binary_writer: + writer.echo(echo.encode("latin-1")) + else: + writer.echo(echo) # type: ignore[arg-type] if cmd is not None: return cmd diff --git a/telnetlib3/stream_reader.py b/telnetlib3/stream_reader.py index 1cc3b0cf..436056fc 100644 --- a/telnetlib3/stream_reader.py +++ b/telnetlib3/stream_reader.py @@ -24,6 +24,9 @@ class TelnetReader: A copy of :class:`asyncio.StreamReader` with telnet-aware readline(). """ + #: Whether this reader returns raw bytes (True) or unicode strings (False). + is_binary_reader: bool = True + _source_traceback = None def __init__(self, limit: int = _DEFAULT_LIMIT) -> None: @@ -552,6 +555,9 @@ class TelnetReaderUnicode(TelnetReader): configurable encoding determined by callback function. """ + #: Unicode readers return strings, not raw bytes. + is_binary_reader: bool = False + #: Late-binding instance of :class:`codecs.IncrementalDecoder`, some #: bytes may be lost if the protocol's encoding is changed after #: previously receiving a partial multibyte. This isn't common in diff --git a/telnetlib3/stream_writer.py b/telnetlib3/stream_writer.py index 4b0f7492..94f349d8 100644 --- a/telnetlib3/stream_writer.py +++ b/telnetlib3/stream_writer.py @@ -116,6 +116,9 @@ class TelnetWriter: A copy of :class:`asyncio.StreamWriter` with IAC interpretation. """ + #: Whether this writer expects raw bytes (True) or unicode strings (False). + is_binary_writer: bool = True + #: Total bytes sent to :meth:`~.feed_byte` byte_count = 0 @@ -3283,6 +3286,9 @@ class TelnetWriterUnicode(TelnetWriter): discovered by ``LANG`` environment variables by NEW_ENVIRON, :rfc:`1572`. """ + #: Unicode writers receive strings, not raw bytes. + is_binary_writer: bool = False + def __init__( self, transport: asyncio.Transport, diff --git a/telnetlib3/tests/test_client_unit.py b/telnetlib3/tests/test_client_unit.py index b832245d..70cce62b 100644 --- a/telnetlib3/tests/test_client_unit.py +++ b/telnetlib3/tests/test_client_unit.py @@ -277,6 +277,8 @@ async def test_guard_shells_busy_shell(): from telnetlib3.guard_shells import busy_shell class MockWriter: + is_binary_writer = False + def __init__(self): self.output = [] self._extra = {"peername": ("127.0.0.1", 12345)} @@ -307,6 +309,8 @@ async def test_guard_shells_robot_check_timeout(): from telnetlib3.guard_shells import robot_check class MockWriter: + is_binary_writer = False + def __init__(self): self.output = [] self._extra = {"peername": ("127.0.0.1", 12345)} diff --git a/telnetlib3/tests/test_guard_integration.py b/telnetlib3/tests/test_guard_integration.py index 5fa61d91..d621ee4c 100644 --- a/telnetlib3/tests/test_guard_integration.py +++ b/telnetlib3/tests/test_guard_integration.py @@ -97,6 +97,8 @@ async def test_guarded_shell_pattern_busy_shell(): shell_done = asyncio.Event() class MockWriter: + is_binary_writer = False + def __init__(self): self._closing = False @@ -116,6 +118,8 @@ def get_extra_info(self, key, default=None): return ("127.0.0.1", 12345) if key == "peername" else default class MockReader: + is_binary_reader = False + def __init__(self): self._data = list("response\r") self._idx = 0 @@ -169,6 +173,8 @@ async def test_guarded_shell_pattern_robot_check(): robot_shell_calls = [] class MockWriter: + is_binary_writer = False + def __init__(self): self._closing = False @@ -188,6 +194,8 @@ def get_extra_info(self, key, default=None): return ("127.0.0.1", 12345) if key == "peername" else default class MockReader: + is_binary_reader = False + def __init__(self): self._data = list("response\r") self._idx = 0 @@ -249,6 +257,8 @@ async def test_full_guarded_shell_flow(): robot_calls = [] class MockWriter: + is_binary_writer = False + def __init__(self): self._closing = False self.output = [] @@ -272,6 +282,8 @@ def get_extra_info(self, key, default=None): return ("127.0.0.1", 12345) if key == "peername" else default class MockReader: + is_binary_reader = False + def __init__(self, responses=None): self._data = responses or list("response\r") self._idx = 0 @@ -446,3 +458,99 @@ async def guarded_shell(reader, writer): assert measured_width, "server shell never ran" assert measured_width[0] == 1, f"expected width=1, got {measured_width[0]}" + + +async def test_robot_check_encoding_false_binary_writer(): + """robot_check works with encoding=False (binary TelnetReader/TelnetWriter).""" + from telnetlib3.guard_shells import robot_check as do_robot_check + + class MockBinaryWriter: + is_binary_writer = True + + def __init__(self): + self._closing = False + self.written = [] + + def write(self, data): + assert isinstance(data, bytes), f"expected bytes, got {type(data)}" + self.written.append(data) + + async def drain(self): + pass + + def is_closing(self): + return self._closing + + def close(self): + self._closing = True + + def get_extra_info(self, key, default=None): + return ("127.0.0.1", 12345) if key == "peername" else default + + class MockBinaryReader: + def __init__(self): + self._chunks = [b"\x1b[1;1R", b"\x1b[1;2R"] + self._idx = 0 + self._pos = 0 + + async def read(self, n): + while self._idx < len(self._chunks): + chunk = self._chunks[self._idx] + if self._pos < len(chunk): + result = chunk[self._pos : self._pos + n] + self._pos += n + return result + self._idx += 1 + self._pos = 0 + return b"" + + reader = MockBinaryReader() + writer = MockBinaryWriter() + result = await do_robot_check(reader, writer, timeout=2.0) + assert result is True, f"robot_check should pass, got {result}" + + +@pytest.mark.asyncio +async def test_encoding_false_with_robot_check_passes(unused_tcp_port): + """Server with encoding=False + robot_check passes a CPR-capable client.""" + import functools + + import telnetlib3 + from telnetlib3.guard_shells import robot_check as do_robot_check + from telnetlib3.guard_shells import robot_shell + from telnetlib3.tests.accessories import create_server + + check_results = [] + + async def shell(reader, writer): + passed = await do_robot_check(reader, writer, timeout=5.0) + check_results.append(passed) + if passed: + writer.write("PASS\r\n") + else: + await robot_shell(reader, writer) + await writer.drain() + writer.close() + await writer.wait_closed() + + async with create_server( + host="127.0.0.1", port=unused_tcp_port, shell=shell, encoding=False, connect_maxwait=0.5 + ): + import telnetlib3 + from telnetlib3 import server_fingerprinting as sfp + + client_shell = functools.partial( + sfp.fingerprinting_client_shell, + host="127.0.0.1", + port=unused_tcp_port, + silent=True, + banner_quiet_time=1.0, + banner_max_wait=5.0, + ) + reader, writer = await telnetlib3.open_connection( + host="127.0.0.1", port=unused_tcp_port, encoding=False, shell=client_shell + ) + await asyncio.wait_for(writer.protocol.waiter_closed, timeout=10.0) + + assert check_results, "server shell never ran" + assert check_results[0] is True, f"robot_check should pass, got {check_results[0]}" diff --git a/telnetlib3/tests/test_relay_server.py b/telnetlib3/tests/test_relay_server.py index 22ce921b..6133864d 100644 --- a/telnetlib3/tests/test_relay_server.py +++ b/telnetlib3/tests/test_relay_server.py @@ -9,6 +9,8 @@ class FakeWriter: + is_binary_writer = False + def __init__(self): self.buffer = [] self.closed = False diff --git a/telnetlib3/tests/test_server.py b/telnetlib3/tests/test_server.py index b8cc79eb..67f4d2ff 100644 --- a/telnetlib3/tests/test_server.py +++ b/telnetlib3/tests/test_server.py @@ -350,6 +350,22 @@ def test_parse_server_args_ascii_no_force_binary(): assert result["force_binary"] is False +@pytest.mark.parametrize( + "arg,expected_encoding,expected_force_binary", + [ + pytest.param("false", False, True, id="false_lowercase"), + pytest.param("False", False, True, id="False_capitalized"), + pytest.param("FALSE", False, True, id="FALSE_uppercase"), + ], +) +def test_parse_server_args_encoding_false(arg, expected_encoding, expected_force_binary): + """parse_server_args converts --encoding=false/False to boolean False.""" + with patch("sys.argv", ["test", "--encoding", arg]): + result = parse_server_args() + assert result["encoding"] is expected_encoding + assert result["force_binary"] is expected_force_binary + + @pytest.mark.asyncio async def test_run_server_guarded_shell_wrapping(): """run_server wraps shell with robot_check and pty_fork_limit guards.""" diff --git a/telnetlib3/tests/test_server_shell_unit.py b/telnetlib3/tests/test_server_shell_unit.py index 459873a5..69de3269 100644 --- a/telnetlib3/tests/test_server_shell_unit.py +++ b/telnetlib3/tests/test_server_shell_unit.py @@ -15,6 +15,8 @@ class DummyWriter: + is_binary_writer = False + def __init__(self, slctab=None): self.echos = [] self.slctab = slctab or slc_mod.generate_slctab() @@ -41,6 +43,8 @@ def _run_readline(sequence, max_visible_width=0): class MockReader: + is_binary_reader = False + def __init__(self, data): self._data = list(data) self._idx = 0 @@ -54,6 +58,8 @@ async def read(self, n): class SlowReader: + is_binary_reader = False + async def read(self, n): await asyncio.sleep(1.0) return "" @@ -65,6 +71,8 @@ def __init__(self, never_send_ga=False): class MockWriter: + is_binary_writer = False + def __init__(self, protocol=None): self.written = [] self._closing = False @@ -700,3 +708,372 @@ async def test_filter_ansi_esc_then_eof(): reader = MockReader(["\x1b", ""]) result = await ss.filter_ansi(reader, MockWriter()) assert not result + + +class MockBinaryReader: + is_binary_reader = True + + def __init__(self, data): + self._data = [d.encode("latin-1") if isinstance(d, str) else d for d in data] + self._idx = 0 + + async def read(self, n): + if self._idx >= len(self._data): + return b"" + result = self._data[self._idx] + self._idx += 1 + return result + + +class MockBinaryWriter: + is_binary_writer = True + + def __init__(self): + self.written = [] + + def write(self, data): + assert isinstance(data, bytes), f"expected bytes, got {type(data)}" + self.written.append(data) + + def echo(self, data): + assert isinstance(data, bytes), f"expected bytes, got {type(data)}" + self.written.append(data) + + async def drain(self): + pass + + def is_closing(self): + return False + + +@pytest.mark.parametrize( + "input_chars,expected", + [ + pytest.param([b"\x1b", b"[", b"A", b"x"], "x", id="csi_bytes"), + pytest.param([b"\x1b", b"!", b"x"], "!", id="esc_non_sequence_bytes"), + pytest.param([b"a"], "a", id="normal_char_bytes"), + pytest.param([b""], "", id="eof_bytes"), + pytest.param([b"\x1b", b"O", b"P", b"x"], "x", id="ss3_f1_bytes"), + pytest.param([b"\x1b", b"(", b"B", b"z"], "z", id="charset_bytes"), + pytest.param([b"\x1b", b"[", b"1", b";", b"2", b"H", b"z"], "z", id="csi_params_bytes"), + pytest.param([b"\x1b", b"[", b""], "", id="csi_no_final_bytes"), + pytest.param( + [b"\x1b", b"]", b"0", b";", b"t", b"\x07", b"x"], "x", id="osc_sequence_bytes" + ), + pytest.param([b"\x1b", b"D", b"w"], "w", id="fe_sequence_bytes"), + pytest.param([b"\x1b", b"X", b"v"], "v", id="fe_sos_sequence_bytes"), + pytest.param([b"\x1b", b"P", b"q", b"\x1b", b"\\", b"y"], "y", id="dcs_sequence_bytes"), + pytest.param([b"\x1b", b"[", b"1", b"m", b"z"], "z", id="match_mid_buffer_bytes"), + ], +) +@pytest.mark.asyncio +async def test_filter_ansi_binary_reader(input_chars, expected): + result = await ss.filter_ansi(MockBinaryReader(input_chars), MockBinaryWriter()) + assert result == expected + + +@pytest.mark.parametrize( + "input_chars,expected", + [ + pytest.param([b"h", b"e", b"l", b"l", b"o", b"\r"], "hello", id="basic_bytes"), + pytest.param([b"h", b"x", b"\x7f", b"i", b"\r"], "hi", id="backspace_bytes"), + pytest.param([b"\n", b"\x00", b"a", b"\r"], "a", id="skip_lf_nul_bytes"), + pytest.param([b""], None, id="eof_bytes"), + ], +) +@pytest.mark.asyncio +async def test_readline_async_binary_reader(input_chars, expected): + result = await ss.readline_async(MockBinaryReader(input_chars), MockBinaryWriter()) + assert result == expected + + +def test_readline_binary_writer(): + """Readline (sync) with binary writer writes bytes for echo.""" + writer = MockBinaryWriter() + gen = ss.readline(None, writer, max_visible_width=0) + gen.send(None) + for ch in "hello\r": + out = gen.send(ch) + if out is not None: + assert out == "hello" + assert len(writer.written) > 0 + + +def test_readline_binary_writer_no_echo(): + """Readline (sync) with binary writer: terminators produce no echo.""" + writer = MockBinaryWriter() + gen = ss.readline(None, writer) + gen.send(None) + out = gen.send("\r") + assert out == "" + assert writer.written == [] + + +def test_write_binary_writer(): + writer = MockBinaryWriter() + ss._write(writer, "hello") + assert writer.written == [b"hello"] + + +def test_echo_non_binary_writer(): + writer = MockWriter() + ss._echo(writer, "test") + assert writer.written == ["test"] + + +def test_echo_binary_writer(): + writer = MockBinaryWriter() + ss._echo(writer, "test") + assert writer.written == [b"test"] + + +class MockWriterClosing(MockWriter): + def __init__(self, closing=False): + super().__init__() + self._closing = closing + + +class MockWriterFull(MockWriter): + def __init__(self): + super().__init__() + self.local_option = types.SimpleNamespace(enabled=lambda opt: False) + self.remote_option = types.SimpleNamespace(enabled=lambda opt: False) + self.outbinary = False + self.inbinary = False + self.xon_any = False + self.lflow = True + self.linemode = types.SimpleNamespace( + edit=False, trapsig=False, soft_tab=False, lit_echo=False, ack=False + ) + self._iac_calls = [] + self._send_lineflow_calls = [] + self._request_linemode_calls = [] + + def iac(self, cmd, opt): + self._iac_calls.append((cmd, opt)) + + def send_lineflow_mode(self): + self._send_lineflow_calls.append(True) + + def request_linemode_change(self, edit=None, trapsig=None): + self._request_linemode_calls.append({"edit": edit, "trapsig": trapsig}) + + +def test_do_toggle_no_option(): + writer = MockWriterFull() + result = ss.do_toggle(writer, None) + for opt in ( + "echo", + "goahead", + "outbinary", + "inbinary", + "binary", + "xon-any", + "lflow", + "linemode", + ): + assert opt in result + + +@pytest.mark.parametrize( + "option,expected_substrs,iac_count,lineflow_count", + [ + pytest.param("echo", ["echo"], 1, 0, id="echo"), + pytest.param("goahead", ["go-ahead"], 1, 0, id="goahead"), + pytest.param("outbinary", ["outbinary"], 1, 0, id="outbinary"), + pytest.param("inbinary", ["inbinary"], 1, 0, id="inbinary"), + pytest.param("binary", ["outbinary", "inbinary"], 2, 0, id="binary"), + pytest.param("xon-any", ["xon-any"], 0, 1, id="xon-any"), + pytest.param("lflow", ["lineflow"], 0, 1, id="lflow"), + pytest.param("linemode", ["linemode"], 1, 0, id="linemode"), + ], +) +def test_do_toggle_option(option, expected_substrs, iac_count, lineflow_count): + writer = MockWriterFull() + result = ss.do_toggle(writer, option) + for s in expected_substrs: + assert s in result.lower() + assert len(writer._iac_calls) == iac_count + assert len(writer._send_lineflow_calls) == lineflow_count + + +@pytest.mark.parametrize( + "option,linemode_active,expected_substr", + [ + pytest.param("linemode-edit", False, "linemode not active", id="edit_inactive"), + pytest.param("linemode-edit", True, "linemode-edit dis", id="edit_active"), + pytest.param("linemode-trapsig", False, "linemode not active", id="trapsig_inactive"), + pytest.param("linemode-trapsig", True, "linemode-trapsig dis", id="trapsig_active"), + ], +) +def test_do_toggle_linemode_sub(option, linemode_active, expected_substr): + writer = MockWriterFull() + writer.remote_option = types.SimpleNamespace(enabled=lambda opt: linemode_active) + if linemode_active: + attr = "edit" if "edit" in option else "trapsig" + base = {"edit": False, "trapsig": False, "soft_tab": False, "lit_echo": False, "ack": False} + base[attr] = True + writer.linemode = types.SimpleNamespace(**base) + result = ss.do_toggle(writer, option) + assert expected_substr in result.lower() + + +def test_do_toggle_unknown_option(): + writer = MockWriterFull() + result = ss.do_toggle(writer, "nonexistent") + assert "not an option" in result + + +def test_do_toggle_all(): + writer = MockWriterFull() + result = ss.do_toggle(writer, "all") + for s in ("echo", "go-ahead", "outbinary", "inbinary", "xon-any", "lineflow"): + assert s in result.lower() + assert len(writer._iac_calls) >= 4 + + +@pytest.mark.asyncio +async def test_telnet_server_shell_already_closing(): + writer = MockWriterClosing(closing=True) + reader = MockReader([]) + await ss.telnet_server_shell(reader, writer) + written = "".join(writer.written) + assert "Ready" in written + + +@pytest.mark.asyncio +async def test_telnet_server_shell_eof(): + writer = MockWriter() + reader = MockReader([""]) + await ss.telnet_server_shell(reader, writer) + written = "".join(writer.written) + assert "Ready" in written + + +@pytest.mark.asyncio +async def test_telnet_server_shell_unknown_command(): + writer = MockWriter() + reader = MockReader(list("bogus\r") + list("quit\r")) + await ss.telnet_server_shell(reader, writer) + written = "".join(writer.written) + assert "no such command" in written + + +@pytest.mark.asyncio +async def test_telnet_server_shell_dump_with_close(): + writer = MockWriter() + reader = MockReader(list("dump 0 0 drain close\r")) + await ss.telnet_server_shell(reader, writer) + written = "".join(writer.written) + assert "kb_limit=0" in written + assert "do_close=True" in written + assert "quit" not in written + + +class MockBinaryWriterShell(MockBinaryWriter): + def __init__(self, protocol=None): + super().__init__() + self._closing = False + self._extra = {"peername": ("127.0.0.1", 12345)} + self.protocol = protocol or _MockProtocol() + self.ga_calls = [] + + def get_extra_info(self, key, default=None): + return self._extra.get(key, default) + + def is_closing(self): + return self._closing + + def close(self): + self._closing = True + + def send_ga(self): + self.ga_calls.append(True) + return True + + +@pytest.mark.asyncio +async def test_telnet_server_shell_binary_writer(): + reader = MockBinaryReader([b"quit\r"]) + writer = MockBinaryWriterShell() + await ss.telnet_server_shell(reader, writer) + assert len(writer.written) > 0 + assert all(isinstance(w, bytes) for w in writer.written) + + +@pytest.mark.asyncio +async def test_readline_async_binary_writer_no_echo(): + writer = MockBinaryWriter() + reader = MockBinaryReader([b"\r"]) + result = await ss.readline_async(reader, writer) + assert result == "" + assert writer.written == [] + + +def test_readline_binary_writer_echo_backspace(): + writer = MockBinaryWriter() + gen = ss.readline(None, writer) + gen.send(None) + for ch in "a\x7fb\r": + gen.send(ch) + assert len(writer.written) > 0 + assert all(isinstance(w, bytes) for w in writer.written) + + +@pytest.mark.parametrize( + "input_data,max_len,expected", + [ + pytest.param([b"h", b"i", b"\r"], 100, "hi", id="cr_terminated"), + pytest.param([b"a", b"b"], 100, "ab", id="eof_no_terminator"), + ], +) +@pytest.mark.asyncio +async def test_read_line_inner_bytes(input_data, max_len, expected): + assert await gs._read_line_inner(MockBinaryReader(input_data), max_len) == expected + + +@pytest.mark.parametrize( + "command,expected_substr,setup", + [ + pytest.param("writer", "MockWriter", None, id="writer"), + pytest.param("reader", "MockReader", None, id="reader"), + pytest.param( + "slc", + "Special Line Characters", + lambda w: setattr(w, "slctab", slc_mod.generate_slctab()), + id="slc", + ), + ], +) +@pytest.mark.asyncio +async def test_telnet_server_shell_cmd(command, expected_substr, setup): + writer = MockWriter() + if setup: + setup(writer) + reader = MockReader(list(f"{command}\r")) + await ss.telnet_server_shell(reader, writer) + assert expected_substr in "".join(writer.written) + + +@pytest.mark.asyncio +async def test_filter_ansi_binary_long_sequence(): + reader = MockBinaryReader([b"\x1b", b"["] + [b"0"] * 254 + [b"x"]) + result = await ss.filter_ansi(reader, MockBinaryWriter()) + assert result == "x" + + +@pytest.mark.asyncio +async def test_busy_shell_second_timeout(monkeypatch): + call_count = [0] + + async def mock_read_line(reader, timeout, max_len=gs._MAX_INPUT): + call_count[0] += 1 + return "hello" if call_count[0] == 1 else None + + monkeypatch.setattr(gs, "_read_line", mock_read_line) + writer = MockWriter() + await gs.busy_shell(MockReader([]), writer) + written = "".join(writer.written) + assert "Machine is busy" in written + assert "distant explosion" in written + assert call_count[0] == 2