Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/history.rst
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
History
=======
4.0.4
* bugfix: servers using ``robot_check=True`` with ``encoding=False`` raised ``TypeError: buf
expected bytes, got <class 'str'>``. ``telnetlib3-server`` now also accepts ``--encoding=False``
CLI argument. ``latin1`` encoding is used the default server shell and robot check.

4.0.3
* bugfix: long-running servers leaked memory through :class:`~telnetlib3.server.Server`
``_protocols`` list and ``_new_client`` asyncio.Queue. Both are now bounded
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "hatchling.build"

[project]
name = "telnetlib3"
version = "4.0.3" # Keep in sync with telnetlib3/accessories.py::get_version !
version = "4.0.4" # Keep in sync with telnetlib3/accessories.py::get_version !
description = " Python Telnet server and client CLI and Protocol library"
readme = "README.rst"
license = "ISC"
Expand Down
2 changes: 1 addition & 1 deletion telnetlib3/accessories.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@

def get_version() -> str:
"""Return the current version of telnetlib3."""
return "4.0.3" # keep in sync with pyproject.toml !
return "4.0.4" # keep in sync with pyproject.toml !


def encoding_from_lang(lang: str) -> Optional[str]:
Expand Down
44 changes: 24 additions & 20 deletions telnetlib3/guard_shells.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import re
import asyncio
import logging
from typing import Tuple, Union, Optional, Generator, cast
from typing import Tuple, Union, Optional, Generator
from contextlib import contextmanager

# local
Expand Down Expand Up @@ -64,6 +64,14 @@ def _latin1_reading(
reader._decoder = None


def _writer_write(writer: Union[TelnetWriter, TelnetWriterUnicode], data: str) -> None:
"""Write string data to writer in the appropriate type (str or bytes)."""
if writer.is_binary_writer:
writer.write(data.encode("latin-1"))
else:
writer.write(data) # type: ignore[arg-type]


class ConnectionCounter:
"""Simple shared counter for limiting concurrent connections."""

Expand Down Expand Up @@ -100,12 +108,13 @@ def count(self) -> int:

async def _read_line_inner(reader: Union[TelnetReader, TelnetReaderUnicode], max_len: int) -> str:
"""Inner loop for _read_line, separated for wait_for compatibility."""
_reader = cast(TelnetReaderUnicode, reader)
buf = ""
while len(buf) < max_len:
char = await _reader.read(1)
char = await reader.read(1)
if not char:
break
if isinstance(char, bytes):
char = char.decode("latin-1")
if char in ("\r", "\n"):
break
buf += char
Expand Down Expand Up @@ -166,8 +175,7 @@ async def _get_cursor_position(
:returns: (row, col) tuple or (None, None) on timeout/failure.
"""
# Send Device Status Report request
_writer = cast(TelnetWriterUnicode, writer)
_writer.write("\x1b[6n")
_writer_write(writer, "\x1b[6n")
await writer.drain()

# Read response: ESC [ row ; col R
Expand All @@ -189,21 +197,20 @@ async def _measure_width(

:returns: Width in columns, or None on failure.
"""
_writer = cast(TelnetWriterUnicode, writer)
_, x1 = await _get_cursor_position(reader, writer, timeout)
if x1 is None:
return None

_writer.write(text)
await _writer.drain()
_writer_write(writer, text)
await writer.drain()

_, x2 = await _get_cursor_position(reader, writer, timeout)
if x2 is None:
return None

# Clear the test character
_writer.write(f"\x1b[{x1}G" + " " * (x2 - x1) + f"\x1b[{x1}G")
await _writer.drain()
_writer_write(writer, f"\x1b[{x1}G" + " " * (x2 - x1) + f"\x1b[{x1}G")
await writer.drain()

return x2 - x1

Expand All @@ -230,10 +237,9 @@ async def _ask_question(
timeout: float = 10.0,
) -> Optional[str]:
"""Ask a question, echoing input and repeating prompt on blank input."""
_writer = cast(TelnetWriterUnicode, writer)
while True:
_writer.write(prompt)
await _writer.drain()
_writer_write(writer, prompt)
await writer.drain()

line = await _readline_with_echo(reader, writer, timeout)
if line is None:
Expand All @@ -242,7 +248,7 @@ async def _ask_question(
if line.strip():
return line
# Blank input - repeat prompt
_writer.write("\r\n")
_writer_write(writer, "\r\n")


async def robot_shell(
Expand All @@ -254,7 +260,6 @@ async def robot_shell(

Asks philosophical questions, logs responses, and disconnects.
"""
writer = cast(TelnetWriterUnicode, writer)
peername = writer.get_extra_info("peername")
logger.info("robot_shell: connection from %s", peername)

Expand All @@ -275,7 +280,7 @@ async def robot_shell(
return
answers.append(line2)

writer.write("\r\n")
_writer_write(writer, "\r\n")
await writer.drain()
finally:
if answers:
Expand All @@ -291,23 +296,22 @@ async def busy_shell(

Displays busy message, logs any input, and disconnects.
"""
writer = cast(TelnetWriterUnicode, writer)
logger.info("busy_shell: connection from %s (limit reached)", writer.get_extra_info("peername"))

writer.write("Machine is busy, do not touch! ")
_writer_write(writer, "Machine is busy, do not touch! ")
await writer.drain()

with _latin1_reading(reader):
line1 = await _read_line(reader, timeout=30.0)
if line1 is not None:
logger.info("busy_shell: input1=%r", line1)

writer.write("\r\nYou hear a distant explosion... ")
_writer_write(writer, "\r\nYou hear a distant explosion... ")
await writer.drain()

line2 = await _read_line(reader, timeout=30.0)
if line2 is not None:
logger.info("busy_shell: input2=%r", line2)

writer.write("\r\n")
_writer_write(writer, "\r\n")
await writer.drain()
16 changes: 12 additions & 4 deletions telnetlib3/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -913,7 +913,7 @@ def begin_advanced_negotiation(self) -> None:
self.writer.iac(DO, LINEMODE)

def _negotiate_echo(self) -> None:
"""Skip ``WILL ECHO`` LINEMODE EDIT client handles local echo."""
"""Skip ``WILL ECHO``: LINEMODE EDIT client handles local echo."""
if self._echo_negotiated:
return
self._echo_negotiated = True
Expand Down Expand Up @@ -1297,7 +1297,12 @@ def parse_server_args(
default=_config.connect_maxwait,
help="timeout for pending negotiation",
)
parser.add_argument("--encoding", default=_config.encoding, help="encoding name")
parser.add_argument(
"--encoding",
default=_config.encoding,
type=lambda val: False if val.lower() == "false" else val,
help="encoding name, or 'false'/'False' to disable unicode",
)
parser.add_argument(
"--force-binary",
action="store_true",
Expand Down Expand Up @@ -1398,9 +1403,12 @@ def parse_server_args(
result["pty_raw"] = False

# Auto-enable force_binary for any non-ASCII encoding that uses high-bit bytes.
enc_key = result["encoding"].lower().replace("-", "_")
if enc_key not in ("us_ascii", "ascii"):
if result["encoding"] is False:
result["force_binary"] = True
else:
enc_key = result["encoding"].lower().replace("-", "_")
if enc_key not in ("us_ascii", "ascii"):
result["force_binary"] = True

# Build SSLContext from --ssl-certfile / --ssl-keyfile
ssl_certfile = result.pop("ssl_certfile", None)
Expand Down
Loading
Loading