diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml index 22080c95b..d7f3464ff 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/test.yaml @@ -32,7 +32,9 @@ jobs: uvx typos . - name: Unit test run: | - uvx --with . --with pytest coverage run -m pytest tests/ + uvx --with . --with pytest coverage[toml] run -m pytest tests/ + uvx coverage[toml] combine + uvx coverage[toml] report - name: Type Checking if: ${{ matrix.python-version != '3.8' }} run: | diff --git a/pyproject.toml b/pyproject.toml index 1d4e169b0..26ac7a1f7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -22,7 +22,7 @@ classifiers = [ ] requires-python = ">=3.8" dependencies = [ - "wcwidth>=0.1.4", + "wcwidth>=0.5.0", ] [project.urls] @@ -86,6 +86,9 @@ extend-ignore-re = [ # Lorem ipsum. "Nam", "varius", + # Partial words in grapheme clustering tests (niño, café). + "nin", + "caf", ] locale = 'en-us' # US English. @@ -118,6 +121,10 @@ warn_return_any = true warn_unused_configs = true warn_unused_ignores = true +[tool.coverage.run] +source = ["src/prompt_toolkit"] +parallel = true + [build-system] requires = ["setuptools>=68"] build-backend = "setuptools.build_meta" diff --git a/src/prompt_toolkit/buffer.py b/src/prompt_toolkit/buffer.py index f5847d4ab..f14cb44ca 100644 --- a/src/prompt_toolkit/buffer.py +++ b/src/prompt_toolkit/buffer.py @@ -18,6 +18,8 @@ from functools import wraps from typing import Any, Callable, Coroutine, Iterable, TypeVar, cast +import wcwidth + from .application.current import get_app from .application.run_in_terminal import run_in_terminal from .auto_suggest import AutoSuggest, Suggestion @@ -764,20 +766,24 @@ def auto_down( def delete_before_cursor(self, count: int = 1) -> str: """ - Delete specified number of characters before cursor and return the - deleted text. + Delete specified number of grapheme clusters before cursor and return + the deleted text. 
""" assert count >= 0 deleted = "" if self.cursor_position > 0: - deleted = self.text[self.cursor_position - count : self.cursor_position] - - new_text = ( - self.text[: self.cursor_position - count] - + self.text[self.cursor_position :] - ) - new_cursor_position = self.cursor_position - len(deleted) + # Find position after deleting `count` grapheme clusters. + # Loop is required since grapheme clusters have variable length. + pos = self.cursor_position + for _ in range(count): + if pos <= 0: + break + pos = wcwidth.grapheme_boundary_before(self.text, pos) + + deleted = self.text[pos : self.cursor_position] + new_text = self.text[:pos] + self.text[self.cursor_position :] + new_cursor_position = pos # Set new Document atomically. self.document = Document(new_text, new_cursor_position) @@ -786,14 +792,19 @@ def delete_before_cursor(self, count: int = 1) -> str: def delete(self, count: int = 1) -> str: """ - Delete specified number of characters and Return the deleted text. + Delete specified number of grapheme clusters and return the deleted text. """ if self.cursor_position < len(self.text): - deleted = self.document.text_after_cursor[:count] - self.text = ( - self.text[: self.cursor_position] - + self.text[self.cursor_position + len(deleted) :] - ) + # Find position after `count` grapheme clusters. 
+ text_after = self.text[self.cursor_position :] + pos = 0 + for i, grapheme in enumerate(wcwidth.iter_graphemes(text_after)): + if i >= count: + break + pos += len(grapheme) + + deleted = text_after[:pos] + self.text = self.text[: self.cursor_position] + text_after[pos:] return deleted else: return "" diff --git a/src/prompt_toolkit/document.py b/src/prompt_toolkit/document.py index d2657a50e..40c508e27 100644 --- a/src/prompt_toolkit/document.py +++ b/src/prompt_toolkit/document.py @@ -10,6 +10,8 @@ import weakref from typing import Callable, Dict, Iterable, List, NoReturn, Pattern, cast +import wcwidth + from .clipboard import ClipboardData from .filters import vi_mode from .selection import PasteMode, SelectionState, SelectionType @@ -158,13 +160,49 @@ def selection(self) -> SelectionState | None: @property def current_char(self) -> str: - """Return character under cursor or an empty string.""" - return self._get_char_relative_to_cursor(0) or "" + """ + Return grapheme cluster at cursor position, or empty string at end. + + Note: Returns a grapheme cluster which may contain multiple code points. + If cursor is inside a grapheme cluster (e.g., on a combining character), + returns the complete grapheme containing the cursor. + """ + if self.cursor_position >= len(self.text): + return "" + grapheme_start = wcwidth.grapheme_boundary_before( + self.text, self.cursor_position + 1 + ) + for g in wcwidth.iter_graphemes(self.text[grapheme_start:]): + return g + return "" @property def char_before_cursor(self) -> str: - """Return character before the cursor or an empty string.""" - return self._get_char_relative_to_cursor(-1) or "" + """ + Return grapheme cluster before the cursor, or empty string at start. + + Note: Returns a grapheme cluster which may contain multiple code points. + If cursor is inside a grapheme cluster (e.g., on a combining character), + returns the grapheme before the one containing the cursor. 
+ """ + if self.cursor_position == 0: + return "" + + text = self.text + cursor = self.cursor_position + + # Find reference point: cursor position or start of containing grapheme. + if cursor >= len(text): + reference = len(text) + else: + grapheme_start = wcwidth.grapheme_boundary_before(text, cursor + 1) + reference = grapheme_start if grapheme_start < cursor else cursor + + if reference == 0: + return "" + + prev_start = wcwidth.grapheme_boundary_before(text, reference) + return text[prev_start:reference] @property def text_before_cursor(self) -> str: @@ -251,15 +289,6 @@ def leading_whitespace_in_current_line(self) -> str: length = len(current_line) - len(current_line.lstrip()) return current_line[:length] - def _get_char_relative_to_cursor(self, offset: int = 0) -> str: - """ - Return character relative to cursor position, or empty string - """ - try: - return self.text[self.cursor_position + offset] - except IndexError: - return "" - @property def on_first_line(self) -> bool: """ @@ -692,21 +721,44 @@ def find_previous_matching_line( def get_cursor_left_position(self, count: int = 1) -> int: """ - Relative position for cursor left. + Relative position for cursor left (grapheme cluster aware). """ if count < 0: return self.get_cursor_right_position(-count) - return -min(self.cursor_position_col, count) + line_before = self.current_line_before_cursor + if not line_before: + return 0 + + pos = len(line_before) + for _ in range(count): + if pos <= 0: + break + new_pos = wcwidth.grapheme_boundary_before(line_before, pos) + if new_pos == pos: + break + pos = new_pos + + return pos - len(line_before) def get_cursor_right_position(self, count: int = 1) -> int: """ - Relative position for cursor_right. + Relative position for cursor right (grapheme cluster aware). 
""" if count < 0: return self.get_cursor_left_position(-count) - return min(count, len(self.current_line_after_cursor)) + line_after = self.current_line_after_cursor + if not line_after: + return 0 + + pos = 0 + for i, grapheme in enumerate(wcwidth.iter_graphemes(line_after)): + if i >= count: + break + pos += len(grapheme) + + return pos def get_cursor_up_position( self, count: int = 1, preferred_column: int | None = None diff --git a/src/prompt_toolkit/formatted_text/utils.py b/src/prompt_toolkit/formatted_text/utils.py index a6f78cb4e..4dfcfcf7c 100644 --- a/src/prompt_toolkit/formatted_text/utils.py +++ b/src/prompt_toolkit/formatted_text/utils.py @@ -9,7 +9,7 @@ from typing import Iterable, cast -from prompt_toolkit.utils import get_cwidth +import wcwidth from .base import ( AnyFormattedText, @@ -48,17 +48,15 @@ def fragment_list_len(fragments: StyleAndTextTuples) -> int: def fragment_list_width(fragments: StyleAndTextTuples) -> int: """ Return the character width of this text fragment list. - (Take double width characters into account.) + (Take double width characters and grapheme clusters into account.) :param fragments: List of ``(style_str, text)`` or ``(style_str, text, mouse_handler)`` tuples. 
""" - ZeroWidthEscape = "[ZeroWidthEscape]" return sum( - get_cwidth(c) + wcwidth.width(item[1], control_codes="ignore") for item in fragments - for c in item[1] - if ZeroWidthEscape not in item[0] + if "[ZeroWidthEscape]" not in item[0] ) diff --git a/src/prompt_toolkit/layout/containers.py b/src/prompt_toolkit/layout/containers.py index f6fe381f5..ffabde018 100644 --- a/src/prompt_toolkit/layout/containers.py +++ b/src/prompt_toolkit/layout/containers.py @@ -10,6 +10,8 @@ from functools import partial from typing import TYPE_CHECKING, Callable, Sequence, Union, cast +import wcwidth + from prompt_toolkit.application.current import get_app from prompt_toolkit.cache import SimpleCache from prompt_toolkit.data_structures import Point @@ -2014,7 +2016,7 @@ def copy_line( new_screen.zero_width_escapes[y + ypos][x + xpos] += text continue - for c in text: + for c in wcwidth.iter_graphemes(text): char = _CHAR_CACHE[c, style] char_width = char.width @@ -2052,26 +2054,7 @@ def copy_line( for i in range(1, char_width): new_buffer_row[x + xpos + i] = empty_char - # If this is a zero width characters, then it's - # probably part of a decomposed unicode character. - # See: https://en.wikipedia.org/wiki/Unicode_equivalence - # Merge it in the previous cell. - elif char_width == 0: - # Handle all character widths. If the previous - # character is a multiwidth character, then - # merge it two positions back. - for pw in [2, 1]: # Previous character width. - if ( - x - pw >= 0 - and new_buffer_row[x + xpos - pw].width == pw - ): - prev_char = new_buffer_row[x + xpos - pw] - char2 = _CHAR_CACHE[ - prev_char.char + c, prev_char.style - ] - new_buffer_row[x + xpos - pw] = char2 - - # Keep track of write position for each character. + # Keep track of write position for each grapheme. 
current_rowcol_to_yx[lineno, col + skipped] = ( y + ypos, x + xpos, diff --git a/src/prompt_toolkit/layout/controls.py b/src/prompt_toolkit/layout/controls.py index 5083c8286..fd6f71dcb 100644 --- a/src/prompt_toolkit/layout/controls.py +++ b/src/prompt_toolkit/layout/controls.py @@ -8,6 +8,8 @@ from abc import ABCMeta, abstractmethod from typing import TYPE_CHECKING, Callable, Hashable, Iterable, NamedTuple +import wcwidth + from prompt_toolkit.application.current import get_app from prompt_toolkit.buffer import Buffer from prompt_toolkit.cache import SimpleCache @@ -674,19 +676,29 @@ def transform( ) -> _ProcessedLine: "Transform the fragments for a given line number." - # Get cursor position at this line. - def source_to_display(i: int) -> int: - """X position from the buffer to the x position in the - processed fragment list. By default, we start from the 'identity' - operation.""" - return i + # Build code point to grapheme index mapping for cursor positioning. + line_text = fragment_list_to_text(fragments) + codepoint_to_grapheme: dict[int, int] = {} + grapheme_idx = 0 + codepoint_idx = 0 + for grapheme in wcwidth.iter_graphemes(line_text): + for _ in grapheme: + codepoint_to_grapheme[codepoint_idx] = grapheme_idx + codepoint_idx += 1 + grapheme_idx += 1 + + def grapheme_source_to_display(i: int) -> int: + """Map code point index to grapheme index.""" + if i >= codepoint_idx: + return grapheme_idx + (i - codepoint_idx) + return codepoint_to_grapheme.get(i, grapheme_idx) transformation = merged_processor.apply_transformation( TransformationInput( self, document, lineno, - source_to_display, + grapheme_source_to_display, fragments, width, height, @@ -694,9 +706,15 @@ def source_to_display(i: int) -> int: ) ) + # Compose grapheme mapping with processor transformations. 
+ proc_s2d = transformation.source_to_display + + def final_source_to_display(i: int) -> int: + return proc_s2d(grapheme_source_to_display(i)) + return _ProcessedLine( transformation.fragments, - transformation.source_to_display, + final_source_to_display, transformation.display_to_source, ) diff --git a/src/prompt_toolkit/layout/utils.py b/src/prompt_toolkit/layout/utils.py index 373fe52a5..161a8c170 100644 --- a/src/prompt_toolkit/layout/utils.py +++ b/src/prompt_toolkit/layout/utils.py @@ -2,6 +2,8 @@ from typing import TYPE_CHECKING, Iterable, List, TypeVar, cast, overload +import wcwidth + from prompt_toolkit.formatted_text.base import OneStyleAndTextTuple if TYPE_CHECKING: @@ -60,7 +62,7 @@ def __setitem__( def explode_text_fragments(fragments: Iterable[_T]) -> _ExplodedList[_T]: """ Turn a list of (style_str, text) tuples into another list where each string is - exactly one character. + exactly one grapheme cluster. It should be fine to call this function several times. Calling this on a list that is already exploded, is a null operation. 
@@ -74,7 +76,7 @@ def explode_text_fragments(fragments: Iterable[_T]) -> _ExplodedList[_T]: result: list[_T] = [] for style, string, *rest in fragments: - for c in string: - result.append((style, c, *rest)) # type: ignore + for grapheme in wcwidth.iter_graphemes(string): + result.append((style, grapheme, *rest)) # type: ignore return _ExplodedList(result) diff --git a/src/prompt_toolkit/utils.py b/src/prompt_toolkit/utils.py index 1a99a2868..8daea7e60 100644 --- a/src/prompt_toolkit/utils.py +++ b/src/prompt_toolkit/utils.py @@ -11,16 +11,19 @@ Dict, Generator, Generic, + Iterator, TypeVar, Union, ) -from wcwidth import wcwidth +import wcwidth __all__ = [ "Event", "DummyContext", "get_cwidth", + "iter_grapheme_clusters", + "grapheme_cluster_count", "suspend_to_background_supported", "is_conemu_ansi", "is_windows", @@ -138,15 +141,7 @@ def __init__(self) -> None: self._long_strings: deque[str] = deque() def __missing__(self, string: str) -> int: - # Note: We use the `max(0, ...` because some non printable control - # characters, like e.g. Ctrl-underscore get a -1 wcwidth value. - # It can be possible that these characters end up in the input - # text. - result: int - if len(string) == 1: - result = max(0, wcwidth(string)) - else: - result = sum(self[c] for c in string) + result = wcwidth.width(string, control_codes="ignore") # Store in cache. self[string] = result @@ -175,6 +170,20 @@ def get_cwidth(string: str) -> int: return _CHAR_SIZES_CACHE[string] +def iter_grapheme_clusters(text: str) -> Iterator[str]: + """ + Iterate over grapheme clusters in text. Wrapper around ``wcwidth.iter_graphemes``. + """ + return wcwidth.iter_graphemes(text) + + +def grapheme_cluster_count(text: str) -> int: + """ + Return the number of grapheme clusters in text. 
+ """ + return sum(1 for _ in wcwidth.iter_graphemes(text)) + + def suspend_to_background_supported() -> bool: """ Returns `True` when the Python implementation supports diff --git a/tests/pty_accessories.py b/tests/pty_accessories.py new file mode 100644 index 000000000..a95868637 --- /dev/null +++ b/tests/pty_accessories.py @@ -0,0 +1,165 @@ +"""PTY testing utilities, ported from 'blessed' by Jeff Quast.""" + +from __future__ import annotations + +import codecs +import contextlib +import os +import platform +import signal +import struct +import sys +import time +import warnings + +IS_WINDOWS = platform.system() == "Windows" + + +def init_subproc_coverage(run_note: str | None = None): + """ + Initialize coverage tracking in a forked subprocess. + + Ported from blessed library's test accessories. Call this at the start + of any script executed via PTY fork/exec to enable coverage tracking. + + :param run_note: Optional note for coverage context (unused). + :returns: Coverage instance or None if coverage not available. 
+ """ + try: + import coverage + except ImportError: + return None + + # Look for pyproject.toml or tox.ini as coverage config + test_dir = os.path.dirname(__file__) + for config_name in ("pyproject.toml", "tox.ini"): + config_path = os.path.join(test_dir, os.pardir, config_name) + if os.path.exists(config_path): + break + else: + config_path = None + + cov = coverage.Coverage(config_file=config_path) + cov.start() + return cov + + +if not IS_WINDOWS: + import fcntl + import pty + import termios + +# note how the tty driver translates '\n' output to '\r\n' +SEND_SEMAPHORE = b"SEMAPHORE\n" +RECV_SEMAPHORE = b"SEMAPHORE\r\n" + + +def _setwinsize(fd: int, rows: int, cols: int) -> None: + """Set PTY window size via TIOCSWINSZ ioctl.""" + TIOCSWINSZ = getattr(termios, "TIOCSWINSZ", -2146929561) + fcntl.ioctl(fd, TIOCSWINSZ, struct.pack("HHHH", rows, cols, 0, 0)) + + +def read_until_marker(fd: int, marker: str, timeout: float = 5.0) -> str: + """Read from fd until marker found or timeout.""" + decoder = codecs.getincrementaldecoder("utf8")() + output = "" + start = time.time() + while marker not in output: + if time.time() - start > timeout: + raise TimeoutError(f"Marker {marker!r} not found. Got: {output!r}") + try: + chunk = os.read(fd, 1) + except OSError: + break + if not chunk: + break + output += decoder.decode(chunk, final=False) + return output + + +@contextlib.contextmanager +def echo_off(fd: int): + """Disable PTY echo.""" + attrs = termios.tcgetattr(fd) + try: + attrs[3] = attrs[3] & ~termios.ECHO + termios.tcsetattr(fd, termios.TCSANOW, attrs) + yield + finally: + attrs[3] = attrs[3] | termios.ECHO + termios.tcsetattr(fd, termios.TCSANOW, attrs) + + +def spawn_pty_process(script: str, rows: int = 24, cols: int = 80) -> tuple[int, int]: + """Spawn script in PTY with given size. 
Returns (master_fd, pid).""" + with warnings.catch_warnings(): + # modern python 3.14+ raises a DeprecationWarning, I guess they may plan to delete pty + # module someday and we will have to manage our own backport? + warnings.filterwarnings("ignore", category=DeprecationWarning) + pid, master_fd = pty.fork() + if pid == 0: + # note how sys.executable is used, to ensure the given script is executed with exactly the + # same python interpreter as used for the parent process, receiving all of its environment + # variables, site-packages, PATH and PYTHONPATH that got it here. + os.execv(sys.executable, [sys.executable, script]) + attrs = termios.tcgetattr(master_fd) + attrs[3] = attrs[3] & ~termios.ECHO + termios.tcsetattr(master_fd, termios.TCSANOW, attrs) + _setwinsize(master_fd, rows, cols) + return master_fd, pid + + +def cleanup_child(pid: int, master_fd: int, timeout: float = 5.0) -> int: + """Wait for child, kill if needed. Returns exit status.""" + start = time.time() + while True: + result, status = os.waitpid(pid, os.WNOHANG) + if result != 0: + # the child has already exited: close our end of the pty and return its exit status. + os.close(master_fd) + return os.WEXITSTATUS(status) + if time.time() - start > timeout: + # after the timeout, we have a "locked up" client, "not responding", most likely we made + # an error in our "call, reply" pattern of the tests. Kill the program so that the + # MainProcess can become unblocked reading their side of the pty and move on. 
+ try: + os.kill(pid, signal.SIGKILL) + os.waitpid(pid, 0) + except OSError: + pass + os.close(master_fd) + raise TimeoutError(f"Child {pid} did not exit within {timeout}s") + time.sleep(0.05) + + +def extract_output(text: str, start: str = "OUTPUT:", end: str = ":END") -> str: + """Extract text between markers.""" + if start not in text: + return "" + after = text.split(start, 1)[1] + return after.split(end, 1)[0] if end in after else after + + +@contextlib.contextmanager +def pty_session(script: str, rows: int = 24, cols: int = 80): + """ + Context manager for PTY test sessions. + + Spawns ``script``, waits for READY, yields master_fd, then sends QUIT + and cleans up the child process. + + Usage:: + + with pty_session(repl_script, rows=24, cols=80) as fd: + os.write(fd, b"some input\\r") + output = read_until_marker(fd, ":END") + """ + master_fd, pid = spawn_pty_process(script, rows, cols) + try: + read_until_marker(master_fd, "READY") + yield master_fd + finally: + os.write(master_fd, b"QUIT\r") + time.sleep(0.1) + cleanup_child(pid, master_fd) diff --git a/tests/pty_repl.py b/tests/pty_repl.py new file mode 100644 index 000000000..08fe99d31 --- /dev/null +++ b/tests/pty_repl.py @@ -0,0 +1,75 @@ +#!/usr/bin/env python +""" +Minimal REPL for PTY integration tests. + +This offers code coverage without mocks by exercising the real tty features of a natural "live" +call to PromptSession() and session.prompt(). prompt_toolkit sees a real terminal through the +shared pty_accessories module. 
+""" + +from __future__ import annotations + +import os +import sys +import termios +import tty + +from pty_accessories import init_subproc_coverage + +from prompt_toolkit import PromptSession +from prompt_toolkit.input import create_input +from prompt_toolkit.output import create_output + + +def main() -> None: + """Run REPL: SIZE/TERMIOS/QUIT commands, else echo with OUTPUT:...:END.""" + # Signal readiness before creating session (avoids race with PTY setup) + os.write(sys.stdout.fileno(), b"READY\n") + + # Use natural stdin/stdout - a PTY is indistinguishable from a real tty; it provides the same + # facility as a real terminal "emulator", allowing us to write tests (and coverage) + # for natural "live" calls of PromptSession(). + session = PromptSession(input=create_input(), output=create_output()) + + try: + while True: + result = session.prompt("> ") + cmd = result.strip().upper() + + if cmd == "SIZE": + size = os.get_terminal_size(sys.stdin.fileno()) + os.write( + sys.stdout.fileno(), + f"SIZE:{size.lines}x{size.columns}:END\n".encode(), + ) + elif cmd == "TERMIOS": + attrs = termios.tcgetattr(sys.stdin.fileno()) + lflag, iflag = attrs[tty.LFLAG], attrs[tty.IFLAG] + flags = { + "ECHO": bool(lflag & termios.ECHO), + "ICANON": bool(lflag & termios.ICANON), + "ISIG": bool(lflag & termios.ISIG), + "IEXTEN": bool(lflag & termios.IEXTEN), + "ICRNL": bool(iflag & termios.ICRNL), + "IXON": bool(iflag & termios.IXON), + "VMIN": attrs[tty.CC][termios.VMIN], + } + flag_str = ",".join(f"{k}={v}" for k, v in sorted(flags.items())) + os.write(sys.stdout.fileno(), f"TERMIOS:{flag_str}:END\n".encode()) + elif cmd in ("QUIT", "EXIT"): + break + else: + os.write(sys.stdout.fileno(), f"OUTPUT:{result}:END\n".encode()) + break # Single-shot mode for grapheme tests + except (EOFError, KeyboardInterrupt): + pass + + +if __name__ == "__main__": + cov = init_subproc_coverage("pty_repl") + try: + main() + finally: + if cov is not None: + cov.stop() + cov.save() diff --git 
a/tests/test_formatted_text.py b/tests/test_formatted_text.py index a111a7f20..72ba69164 100644 --- a/tests/test_formatted_text.py +++ b/tests/test_formatted_text.py @@ -9,7 +9,7 @@ merge_formatted_text, to_formatted_text, ) -from prompt_toolkit.formatted_text.utils import split_lines +from prompt_toolkit.formatted_text.utils import fragment_list_width, split_lines def test_basic_html(): @@ -336,3 +336,15 @@ def test_split_lines_4(): [("class:a", "line1")], [("class:a", "")], ] + + +def test_fragment_list_width(): + family = "\U0001f468\u200d\U0001f469\u200d\U0001f467" # ZWJ sequence + heart = "\u2764\ufe0f" # VS-16 emoji + assert fragment_list_width([("", "hello")]) == 5 + assert fragment_list_width([("", family)]) == 2 + assert fragment_list_width([("", heart)]) == 2 + + +def test_fragment_list_width_zero_width_escape(): + assert fragment_list_width([("[ZeroWidthEscape]", "arbitrary")]) == 0 diff --git a/tests/test_pty_basic.py b/tests/test_pty_basic.py new file mode 100644 index 000000000..f407bcbb7 --- /dev/null +++ b/tests/test_pty_basic.py @@ -0,0 +1,59 @@ +r"""PTY-based tests for terminal size and termios state.""" + +from __future__ import annotations + +import fcntl +import os +import platform +import struct +import termios +import time + +import pytest +from pty_accessories import extract_output, pty_session, read_until_marker + +pytestmark = pytest.mark.skipif( + platform.system() == "Windows", reason="PTY tests not supported on Windows" +) + + +@pytest.fixture +def repl_script(): + return os.path.join(os.path.dirname(__file__), "pty_repl.py") + + +def _setwinsize(fd: int, rows: int, cols: int) -> None: + TIOCSWINSZ = getattr(termios, "TIOCSWINSZ", -2146929561) + fcntl.ioctl(fd, TIOCSWINSZ, struct.pack("HHHH", rows, cols, 0, 0)) + + +def _get_size(fd: int) -> tuple[int, int]: + # manage call and reply of 'SIZE' command to pty_repl.py + os.write(fd, b"SIZE\r") + size_str = extract_output(read_until_marker(fd, ":END"), "SIZE:", ":END") + r, c = 
size_str.split("x") + return int(r), int(c) + + +@pytest.mark.parametrize("rows,cols", [(25, 80), (3, 10), (100, 200), (1, 1)]) +def test_size_detection(repl_script, rows, cols): + with pty_session(repl_script, rows=rows, cols=cols) as fd: + assert _get_size(fd) == (rows, cols) + + +def test_dynamic_size_change(repl_script): + with pty_session(repl_script, rows=24, cols=80) as fd: + assert _get_size(fd) == (24, 80) + # resize and verify each change is detected + for target in [(40, 120), (10, 40), (3, 10)]: + _setwinsize(fd, *target) + time.sleep(0.05) + assert _get_size(fd) == target + + +def test_termios_flags(repl_script): + with pty_session(repl_script) as fd: + os.write(fd, b"TERMIOS\r") + flags_str = extract_output(read_until_marker(fd, ":END"), "TERMIOS:", ":END") + flags = dict(pair.split("=") for pair in flags_str.split(",")) + assert all(k in flags for k in ("ECHO", "ICANON", "ISIG", "VMIN")) diff --git a/tests/test_pty_grapheme.py b/tests/test_pty_grapheme.py new file mode 100644 index 000000000..4c69ace94 --- /dev/null +++ b/tests/test_pty_grapheme.py @@ -0,0 +1,106 @@ +r"""PTY-based grapheme clustering tests. + +Tests that cursor movement and editing operations respect grapheme cluster +boundaries for complex Unicode sequences like emoji with ZWJ, skin tones, +combining characters, and regional indicators. + +The pty helpers were written for these tests while improving support for +grapheme clustering. Many errors were only found by interactive testing, but +they are systematic to test: move the cursor, insert and erase text, and +check the result. 
+""" + +from __future__ import annotations + +import os +import platform +import time + +import pytest +from pty_accessories import extract_output, pty_session, read_until_marker + +pytestmark = pytest.mark.skipif( + platform.system() == "Windows", reason="PTY tests not supported on Windows" +) + +# Key sequences +LEFT = "\x1b[D" # cursor left +RIGHT = "\x1b[C" # cursor right +HOME = "\x1b[H" # home +BS = "\x7f" # backspace +DEL = "\x1b[3~" # forward delete +ENTER = "\r" + +# Grapheme clusters covering major Unicode complexity classes +GRAPHEMES = [ + # U+1F468 U+200D U+1F469 U+200D U+1F467 + ( + "\U0001f468\u200d\U0001f469\u200d\U0001f467", + "zwj_family", + ), + # U+2764 U+FE0F (VS-16) + ("\u2764\ufe0f", "vs16_heart"), + # U+1F1E8 U+1F1E6 (C+A regional indicators) + ("\U0001f1e8\U0001f1e6", "flag_ca"), + # U+0065 U+0301 + ("e\u0301", "combining_acute"), + # U+1100 U+1161 + ("\u1100\u1161", "hangul_lv"), + # Devanagari conjunct + ("\u0915\u094d\u0937\u093f", "devanagari"), + # U+1F44B U+1F3FB + ("\U0001f44b\U0001f3fb", "skin_tone"), +] + + +@pytest.fixture +def repl_script(): + return os.path.join(os.path.dirname(__file__), "pty_repl.py") + + +@pytest.mark.parametrize("grapheme,name", GRAPHEMES) +def test_backspace_deletes_grapheme(repl_script, grapheme, name): + """Type 3 graphemes, backspace twice, verify 1 remains.""" + with pty_session(repl_script, rows=3, cols=40) as fd: + os.write(fd, (grapheme * 3).encode()) + time.sleep(0.1) + os.write(fd, (BS * 2 + ENTER).encode()) + assert extract_output(read_until_marker(fd, ":END")) == grapheme + + +@pytest.mark.parametrize("grapheme,name", GRAPHEMES) +def test_cursor_movement_respects_grapheme(repl_script, grapheme, name): + """Type 3 graphemes, LEFT, insert 'x' -> pattern is 2+x+1.""" + with pty_session(repl_script, rows=3, cols=40) as fd: + os.write(fd, (grapheme * 3).encode()) + time.sleep(0.1) + os.write(fd, (LEFT + "x" + ENTER).encode()) + assert ( + extract_output(read_until_marker(fd, ":END")) + == grapheme * 2 
+ "x" + grapheme + ) + + +@pytest.mark.parametrize("grapheme,name", GRAPHEMES) +def test_forward_delete_removes_grapheme(repl_script, grapheme, name): + """Type 3 graphemes, HOME, DELETE -> 2 remain.""" + with pty_session(repl_script, rows=3, cols=40) as fd: + os.write(fd, (grapheme * 3).encode()) + time.sleep(0.1) + os.write(fd, (HOME + DEL + ENTER).encode()) + assert extract_output(read_until_marker(fd, ":END")) == grapheme * 2 + + +@pytest.mark.parametrize( + "grapheme,name", [GRAPHEMES[0], GRAPHEMES[2]] +) # ZWJ family, CA flag +def test_grapheme_in_tiny_window(repl_script, grapheme, name): + """Type 10 graphemes in 10-col window, backspace 5, verify 5 remain.""" + with pty_session(repl_script, rows=3, cols=10) as fd: + os.write(fd, (grapheme * 10).encode()) + time.sleep(0.2) + os.write(fd, (BS * 5 + ENTER).encode()) + time.sleep(0.2) + assert ( + extract_output(read_until_marker(fd, ":END", timeout=10.0)) == grapheme * 5 + ) diff --git a/tests/test_wcwidth_integration.py b/tests/test_wcwidth_integration.py new file mode 100644 index 000000000..656e44a8a --- /dev/null +++ b/tests/test_wcwidth_integration.py @@ -0,0 +1,167 @@ +from prompt_toolkit.document import Document +from prompt_toolkit.layout.utils import explode_text_fragments +from prompt_toolkit.utils import ( + get_cwidth, + grapheme_cluster_count, + iter_grapheme_clusters, +) + +# Test constants +FAMILY = "\U0001f468\u200d\U0001f469\u200d\U0001f467" # ZWJ sequence +FLAG = "\U0001f1fa\U0001f1f8" # Regional indicators +CAFE = "cafe\u0301" # Combining accent +NINO = "nin\u0303o" # n + i + n + combining tilde + o = niño + + +def test_get_cwidth(): + # ASCII + assert get_cwidth("") == 0 + assert get_cwidth("hello") == 5 + + # CJK wide characters + assert get_cwidth("\u4e2d") == 2 + assert get_cwidth("\u4e2d\u6587") == 4 + + # Emoji sequences (ZWJ, flags, skin tones, VS-16) + assert get_cwidth(FAMILY) == 2 + assert get_cwidth(FLAG) == 2 + assert get_cwidth("\U0001f44b\U0001f3fb") == 2 # skin tone + assert 
get_cwidth("\u2764\ufe0f") == 2 # VS-16 + + # Combining characters + assert get_cwidth("e\u0301") == 1 + assert get_cwidth(CAFE) == 4 + + +def test_grapheme_cluster_iteration(): + assert list(iter_grapheme_clusters("hello")) == ["h", "e", "l", "l", "o"] + assert list(iter_grapheme_clusters(FAMILY)) == [FAMILY] + assert list(iter_grapheme_clusters(FLAG)) == [FLAG] + assert list(iter_grapheme_clusters(CAFE)) == ["c", "a", "f", "e\u0301"] + + +def test_grapheme_cluster_count(): + assert grapheme_cluster_count("hello") == 5 + assert grapheme_cluster_count(FAMILY) == 1 + assert grapheme_cluster_count(CAFE) == 4 + + +def test_cursor_right_grapheme(): + # ASCII unchanged + assert Document("hello", 0).get_cursor_right_position() == 1 + assert Document("hello", 0).get_cursor_right_position(2) == 2 + + # Skips entire grapheme cluster + assert Document(FAMILY + "x", 0).get_cursor_right_position() == len(FAMILY) + assert Document(FLAG + "x", 0).get_cursor_right_position() == len(FLAG) + + # At position 3, 'e\u0301' is one grapheme but 2 code points + assert Document(CAFE, 3).get_cursor_right_position() == 2 + + +def test_cursor_left_grapheme(): + # ASCII unchanged + assert Document("hello", 5).get_cursor_left_position() == -1 + assert Document("hello", 5).get_cursor_left_position(2) == -2 + + # Skips entire grapheme cluster + assert Document(FAMILY + "x", len(FAMILY)).get_cursor_left_position() == -len( + FAMILY + ) + assert Document(FLAG + "x", len(FLAG)).get_cursor_left_position() == -len(FLAG) + + # 'e\u0301' is one grapheme but 2 code points, so -2 + assert Document(CAFE, len(CAFE)).get_cursor_left_position() == -2 + + +def test_current_char_grapheme(): + assert Document(FAMILY + "x", 0).current_char == FAMILY + assert Document(CAFE, 3).current_char == "e\u0301" # position 3 = 'e' + accent + + +def test_current_char_inside_grapheme(): + """Cursor on combining tilde returns full grapheme.""" + assert Document(NINO, 3).current_char == "n\u0303" + + +def 
test_current_char_at_end(): + """Cursor at end of text returns empty string.""" + assert Document("hello", 5).current_char == "" + assert Document("", 0).current_char == "" + + +def test_char_before_cursor_grapheme(): + assert Document(FAMILY + "x", len(FAMILY)).char_before_cursor == FAMILY + assert Document(CAFE, len(CAFE)).char_before_cursor == "e\u0301" + + +def test_char_before_cursor_inside_grapheme(): + """Cursor on combining tilde returns previous grapheme.""" + assert Document(NINO, 3).char_before_cursor == "i" + + +def test_char_before_cursor_at_start(): + """Cursor at start of text returns empty string.""" + assert Document("hello", 0).char_before_cursor == "" + assert Document("", 0).char_before_cursor == "" + + +def test_explode_text_fragments_grapheme(): + # Family emoji should stay as single fragment + fragments = [("", FAMILY + "x")] + exploded = explode_text_fragments(fragments) + assert len(exploded) == 2 + assert exploded[0][1] == FAMILY + assert exploded[1][1] == "x" + + # Combining accent should stay with base character + fragments = [("", CAFE)] + exploded = explode_text_fragments(fragments) + assert len(exploded) == 4 + assert exploded[3][1] == "e\u0301" + + # Flag should stay as single fragment + fragments = [("", FLAG + "!")] + exploded = explode_text_fragments(fragments) + assert len(exploded) == 2 + assert exploded[0][1] == FLAG + assert exploded[1][1] == "!" 
+ + +def test_delete_before_cursor_grapheme(): + from prompt_toolkit.buffer import Buffer + + # Deleting skin tone modifier should delete entire emoji + WAVE_DARK = "\U0001f44b\U0001f3ff" # 👋🏿 + buf = Buffer() + buf.text = WAVE_DARK + "x" + buf.cursor_position = len(WAVE_DARK) + deleted = buf.delete_before_cursor(count=1) + assert deleted == WAVE_DARK + assert buf.text == "x" + + # Deleting combining accent should delete entire grapheme + buf.text = CAFE + buf.cursor_position = len(CAFE) + deleted = buf.delete_before_cursor(count=1) + assert deleted == "e\u0301" + assert buf.text == "caf" + + +def test_delete_grapheme(): + from prompt_toolkit.buffer import Buffer + + # Forward delete on emoji should delete entire grapheme + buf = Buffer() + buf.text = FAMILY + "x" + buf.cursor_position = 0 + deleted = buf.delete(count=1) + assert deleted == FAMILY + assert buf.text == "x" + + # Forward delete on combining character + buf.text = CAFE + buf.cursor_position = 3 # Before 'e' + accent + deleted = buf.delete(count=1) + assert deleted == "e\u0301" + assert buf.text == "caf"