From aed10aa93b8898ac5a65553297e1079f0c32a687 Mon Sep 17 00:00:00 2001 From: Jeff Quast Date: Tue, 27 Jan 2026 16:04:16 -0500 Subject: [PATCH] Add grapheme clustering support for cursor movement MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit **Problem** Test sequence (copy and paste into any REPL/edit area):: 👨‍👩‍👧 👩‍❤‍👨 👩‍💻👋🏿 ❤️⭐ 🇯🇵🇩🇪 café niño Åộ 中文!. Moving the cursor over and around emojis get strange. insertions become chaotic. Cursor position becomes indeterminate (even negative!), input result becomes more corrupted with user confusion as draws become corrupted. This is briefly described in #274 by @jonathanslenders: > Notice that it still requires multiple cursor movements (left/right arrow) to move across these characters. **Solution**: Close #274 "Handle decomposed unicode characters" (2018) through careful integration of new functions, [wcwidth.iter_graphemes](https://wcwidth.readthedocs.io/en/latest/intro.html#iter-graphemes) and [wcwidth.grapheme_boundary_before](https://wcwidth.readthedocs.io/en/latest/api.html#wcwidth.grapheme_boundary_before). getting there, working on a PTY test suite I don't feel comfortable changing so much code for a large library without also including more detailed tests -- i keep fixing all errors with TDD/automatic tests, but when using it interactively, the cursor position is out of control --- .github/workflows/test.yaml | 4 +- pyproject.toml | 9 +- src/prompt_toolkit/buffer.py | 41 +++-- src/prompt_toolkit/document.py | 86 ++++++++--- src/prompt_toolkit/formatted_text/utils.py | 10 +- src/prompt_toolkit/layout/containers.py | 25 +-- src/prompt_toolkit/layout/controls.py | 34 ++++- src/prompt_toolkit/layout/utils.py | 8 +- src/prompt_toolkit/utils.py | 29 ++-- tests/pty_accessories.py | 165 ++++++++++++++++++++ tests/pty_repl.py | 75 +++++++++ tests/test_formatted_text.py | 14 +- tests/test_pty_basic.py | 59 ++++++++ tests/test_pty_grapheme.py | 106 +++++++++++++ tests/test_wcwidth_integration.py | 167 +++++++++++++++++++++ 15 files changed, 749 insertions(+), 83 deletions(-) create mode 100644 tests/pty_accessories.py create mode 100644 tests/pty_repl.py create mode 100644 tests/test_pty_basic.py create mode 100644 tests/test_pty_grapheme.py create mode 100644 tests/test_wcwidth_integration.py diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml index 22080c95b..d7f3464ff 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/test.yaml @@ -32,7 +32,9 @@ jobs: uvx typos . - name: Unit test run: | - uvx --with . --with pytest coverage run -m pytest tests/ + uvx --with . --with pytest coverage[toml] run -m pytest tests/ + uvx coverage[toml] combine + uvx coverage[toml] report - name: Type Checking if: ${{ matrix.python-version != '3.8' }} run: | diff --git a/pyproject.toml b/pyproject.toml index 1d4e169b0..26ac7a1f7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -22,7 +22,7 @@ classifiers = [ ] requires-python = ">=3.8" dependencies = [ - "wcwidth>=0.1.4", + "wcwidth>=0.5.0", ] [project.urls] @@ -86,6 +86,9 @@ extend-ignore-re = [ # Lorem ipsum. "Nam", "varius", + # Partial words in grapheme clustering tests (niño, café). + "nin", + "caf", ] locale = 'en-us' # US English. @@ -118,6 +121,10 @@ warn_return_any = true warn_unused_configs = true warn_unused_ignores = true +[tool.coverage.run] +source = ["src/prompt_toolkit"] +parallel = true + [build-system] requires = ["setuptools>=68"] build-backend = "setuptools.build_meta" diff --git a/src/prompt_toolkit/buffer.py b/src/prompt_toolkit/buffer.py index f5847d4ab..f14cb44ca 100644 --- a/src/prompt_toolkit/buffer.py +++ b/src/prompt_toolkit/buffer.py @@ -18,6 +18,8 @@ from functools import wraps from typing import Any, Callable, Coroutine, Iterable, TypeVar, cast +import wcwidth + from .application.current import get_app from .application.run_in_terminal import run_in_terminal from .auto_suggest import AutoSuggest, Suggestion @@ -764,20 +766,24 @@ def auto_down( def delete_before_cursor(self, count: int = 1) -> str: """ - Delete specified number of characters before cursor and return the - deleted text. + Delete specified number of grapheme clusters before cursor and return + the deleted text. """ assert count >= 0 deleted = "" if self.cursor_position > 0: - deleted = self.text[self.cursor_position - count : self.cursor_position] - - new_text = ( - self.text[: self.cursor_position - count] - + self.text[self.cursor_position :] - ) - new_cursor_position = self.cursor_position - len(deleted) + # Find position after deleting `count` grapheme clusters. + # Loop is required since grapheme clusters have variable length. + pos = self.cursor_position + for _ in range(count): + if pos <= 0: + break + pos = wcwidth.grapheme_boundary_before(self.text, pos) + + deleted = self.text[pos : self.cursor_position] + new_text = self.text[:pos] + self.text[self.cursor_position :] + new_cursor_position = pos # Set new Document atomically. self.document = Document(new_text, new_cursor_position) @@ -786,14 +792,19 @@ def delete_before_cursor(self, count: int = 1) -> str: def delete(self, count: int = 1) -> str: """ - Delete specified number of characters and Return the deleted text. + Delete specified number of grapheme clusters and return the deleted text. """ if self.cursor_position < len(self.text): - deleted = self.document.text_after_cursor[:count] - self.text = ( - self.text[: self.cursor_position] - + self.text[self.cursor_position + len(deleted) :] - ) + # Find position after `count` grapheme clusters. + text_after = self.text[self.cursor_position :] + pos = 0 + for i, grapheme in enumerate(wcwidth.iter_graphemes(text_after)): + if i >= count: + break + pos += len(grapheme) + + deleted = text_after[:pos] + self.text = self.text[: self.cursor_position] + text_after[pos:] return deleted else: return "" diff --git a/src/prompt_toolkit/document.py b/src/prompt_toolkit/document.py index d2657a50e..40c508e27 100644 --- a/src/prompt_toolkit/document.py +++ b/src/prompt_toolkit/document.py @@ -10,6 +10,8 @@ import weakref from typing import Callable, Dict, Iterable, List, NoReturn, Pattern, cast +import wcwidth + from .clipboard import ClipboardData from .filters import vi_mode from .selection import PasteMode, SelectionState, SelectionType @@ -158,13 +160,49 @@ def selection(self) -> SelectionState | None: @property def current_char(self) -> str: - """Return character under cursor or an empty string.""" - return self._get_char_relative_to_cursor(0) or "" + """ + Return grapheme cluster at cursor position, or empty string at end. + + Note: Returns a grapheme cluster which may contain multiple code points. + If cursor is inside a grapheme cluster (e.g., on a combining character), + returns the complete grapheme containing the cursor. + """ + if self.cursor_position >= len(self.text): + return "" + grapheme_start = wcwidth.grapheme_boundary_before( + self.text, self.cursor_position + 1 + ) + for g in wcwidth.iter_graphemes(self.text[grapheme_start:]): + return g + return "" @property def char_before_cursor(self) -> str: - """Return character before the cursor or an empty string.""" - return self._get_char_relative_to_cursor(-1) or "" + """ + Return grapheme cluster before the cursor, or empty string at start. + + Note: Returns a grapheme cluster which may contain multiple code points. + If cursor is inside a grapheme cluster (e.g., on a combining character), + returns the grapheme before the one containing the cursor. + """ + if self.cursor_position == 0: + return "" + + text = self.text + cursor = self.cursor_position + + # Find reference point: cursor position or start of containing grapheme. + if cursor >= len(text): + reference = len(text) + else: + grapheme_start = wcwidth.grapheme_boundary_before(text, cursor + 1) + reference = grapheme_start if grapheme_start < cursor else cursor + + if reference == 0: + return "" + + prev_start = wcwidth.grapheme_boundary_before(text, reference) + return text[prev_start:reference] @property def text_before_cursor(self) -> str: @@ -251,15 +289,6 @@ def leading_whitespace_in_current_line(self) -> str: length = len(current_line) - len(current_line.lstrip()) return current_line[:length] - def _get_char_relative_to_cursor(self, offset: int = 0) -> str: - """ - Return character relative to cursor position, or empty string - """ - try: - return self.text[self.cursor_position + offset] - except IndexError: - return "" - @property def on_first_line(self) -> bool: """ @@ -692,21 +721,44 @@ def find_previous_matching_line( def get_cursor_left_position(self, count: int = 1) -> int: """ - Relative position for cursor left. + Relative position for cursor left (grapheme cluster aware). """ if count < 0: return self.get_cursor_right_position(-count) - return -min(self.cursor_position_col, count) + line_before = self.current_line_before_cursor + if not line_before: + return 0 + + pos = len(line_before) + for _ in range(count): + if pos <= 0: + break + new_pos = wcwidth.grapheme_boundary_before(line_before, pos) + if new_pos == pos: + break + pos = new_pos + + return pos - len(line_before) def get_cursor_right_position(self, count: int = 1) -> int: """ - Relative position for cursor_right. + Relative position for cursor right (grapheme cluster aware). """ if count < 0: return self.get_cursor_left_position(-count) - return min(count, len(self.current_line_after_cursor)) + line_after = self.current_line_after_cursor + if not line_after: + return 0 + + pos = 0 + for i, grapheme in enumerate(wcwidth.iter_graphemes(line_after)): + if i >= count: + break + pos += len(grapheme) + + return pos def get_cursor_up_position( self, count: int = 1, preferred_column: int | None = None diff --git a/src/prompt_toolkit/formatted_text/utils.py b/src/prompt_toolkit/formatted_text/utils.py index a6f78cb4e..4dfcfcf7c 100644 --- a/src/prompt_toolkit/formatted_text/utils.py +++ b/src/prompt_toolkit/formatted_text/utils.py @@ -9,7 +9,7 @@ from typing import Iterable, cast -from prompt_toolkit.utils import get_cwidth +import wcwidth from .base import ( AnyFormattedText, @@ -48,17 +48,15 @@ def fragment_list_len(fragments: StyleAndTextTuples) -> int: def fragment_list_width(fragments: StyleAndTextTuples) -> int: """ Return the character width of this text fragment list. - (Take double width characters into account.) + (Take double width characters and grapheme clusters into account.) :param fragments: List of ``(style_str, text)`` or ``(style_str, text, mouse_handler)`` tuples. """ - ZeroWidthEscape = "[ZeroWidthEscape]" return sum( - get_cwidth(c) + wcwidth.width(item[1], control_codes="ignore") for item in fragments - for c in item[1] - if ZeroWidthEscape not in item[0] + if "[ZeroWidthEscape]" not in item[0] ) diff --git a/src/prompt_toolkit/layout/containers.py b/src/prompt_toolkit/layout/containers.py index f6fe381f5..ffabde018 100644 --- a/src/prompt_toolkit/layout/containers.py +++ b/src/prompt_toolkit/layout/containers.py @@ -10,6 +10,8 @@ from functools import partial from typing import TYPE_CHECKING, Callable, Sequence, Union, cast +import wcwidth + from prompt_toolkit.application.current import get_app from prompt_toolkit.cache import SimpleCache from prompt_toolkit.data_structures import Point @@ -2014,7 +2016,7 @@ def copy_line( new_screen.zero_width_escapes[y + ypos][x + xpos] += text continue - for c in text: + for c in wcwidth.iter_graphemes(text): char = _CHAR_CACHE[c, style] char_width = char.width @@ -2052,26 +2054,7 @@ def copy_line( for i in range(1, char_width): new_buffer_row[x + xpos + i] = empty_char - # If this is a zero width characters, then it's - # probably part of a decomposed unicode character. - # See: https://en.wikipedia.org/wiki/Unicode_equivalence - # Merge it in the previous cell. - elif char_width == 0: - # Handle all character widths. If the previous - # character is a multiwidth character, then - # merge it two positions back. - for pw in [2, 1]: # Previous character width. - if ( - x - pw >= 0 - and new_buffer_row[x + xpos - pw].width == pw - ): - prev_char = new_buffer_row[x + xpos - pw] - char2 = _CHAR_CACHE[ - prev_char.char + c, prev_char.style - ] - new_buffer_row[x + xpos - pw] = char2 - - # Keep track of write position for each character. + # Keep track of write position for each grapheme. current_rowcol_to_yx[lineno, col + skipped] = ( y + ypos, x + xpos, diff --git a/src/prompt_toolkit/layout/controls.py b/src/prompt_toolkit/layout/controls.py index 5083c8286..fd6f71dcb 100644 --- a/src/prompt_toolkit/layout/controls.py +++ b/src/prompt_toolkit/layout/controls.py @@ -8,6 +8,8 @@ from abc import ABCMeta, abstractmethod from typing import TYPE_CHECKING, Callable, Hashable, Iterable, NamedTuple +import wcwidth + from prompt_toolkit.application.current import get_app from prompt_toolkit.buffer import Buffer from prompt_toolkit.cache import SimpleCache @@ -674,19 +676,29 @@ def transform( ) -> _ProcessedLine: "Transform the fragments for a given line number." - # Get cursor position at this line. - def source_to_display(i: int) -> int: - """X position from the buffer to the x position in the - processed fragment list. By default, we start from the 'identity' - operation.""" - return i + # Build code point to grapheme index mapping for cursor positioning. + line_text = fragment_list_to_text(fragments) + codepoint_to_grapheme: dict[int, int] = {} + grapheme_idx = 0 + codepoint_idx = 0 + for grapheme in wcwidth.iter_graphemes(line_text): + for _ in grapheme: + codepoint_to_grapheme[codepoint_idx] = grapheme_idx + codepoint_idx += 1 + grapheme_idx += 1 + + def grapheme_source_to_display(i: int) -> int: + """Map code point index to grapheme index.""" + if i >= codepoint_idx: + return grapheme_idx + (i - codepoint_idx) + return codepoint_to_grapheme.get(i, grapheme_idx) transformation = merged_processor.apply_transformation( TransformationInput( self, document, lineno, - source_to_display, + grapheme_source_to_display, fragments, width, height, @@ -694,9 +706,15 @@ def source_to_display(i: int) -> int: ) ) + # Compose grapheme mapping with processor transformations. + proc_s2d = transformation.source_to_display + + def final_source_to_display(i: int) -> int: + return proc_s2d(grapheme_source_to_display(i)) + return _ProcessedLine( transformation.fragments, - transformation.source_to_display, + final_source_to_display, transformation.display_to_source, ) diff --git a/src/prompt_toolkit/layout/utils.py b/src/prompt_toolkit/layout/utils.py index 373fe52a5..161a8c170 100644 --- a/src/prompt_toolkit/layout/utils.py +++ b/src/prompt_toolkit/layout/utils.py @@ -2,6 +2,8 @@ from typing import TYPE_CHECKING, Iterable, List, TypeVar, cast, overload +import wcwidth + from prompt_toolkit.formatted_text.base import OneStyleAndTextTuple if TYPE_CHECKING: @@ -60,7 +62,7 @@ def __setitem__( def explode_text_fragments(fragments: Iterable[_T]) -> _ExplodedList[_T]: """ Turn a list of (style_str, text) tuples into another list where each string is - exactly one character. + exactly one grapheme cluster. It should be fine to call this function several times. Calling this on a list that is already exploded, is a null operation. @@ -74,7 +76,7 @@ def explode_text_fragments(fragments: Iterable[_T]) -> _ExplodedList[_T]: result: list[_T] = [] for style, string, *rest in fragments: - for c in string: - result.append((style, c, *rest)) # type: ignore + for grapheme in wcwidth.iter_graphemes(string): + result.append((style, grapheme, *rest)) # type: ignore return _ExplodedList(result) diff --git a/src/prompt_toolkit/utils.py b/src/prompt_toolkit/utils.py index 1a99a2868..8daea7e60 100644 --- a/src/prompt_toolkit/utils.py +++ b/src/prompt_toolkit/utils.py @@ -11,16 +11,19 @@ Dict, Generator, Generic, + Iterator, TypeVar, Union, ) -from wcwidth import wcwidth +import wcwidth __all__ = [ "Event", "DummyContext", "get_cwidth", + "iter_grapheme_clusters", + "grapheme_cluster_count", "suspend_to_background_supported", "is_conemu_ansi", "is_windows", @@ -138,15 +141,7 @@ def __init__(self) -> None: self._long_strings: deque[str] = deque() def __missing__(self, string: str) -> int: - # Note: We use the `max(0, ...` because some non printable control - # characters, like e.g. Ctrl-underscore get a -1 wcwidth value. - # It can be possible that these characters end up in the input - # text. - result: int - if len(string) == 1: - result = max(0, wcwidth(string)) - else: - result = sum(self[c] for c in string) + result = wcwidth.width(string, control_codes="ignore") # Store in cache. self[string] = result @@ -175,6 +170,20 @@ def get_cwidth(string: str) -> int: return _CHAR_SIZES_CACHE[string] +def iter_grapheme_clusters(text: str) -> Iterator[str]: + """ + Iterate over grapheme clusters in text. Wrapper around ``wcwidth.iter_graphemes``. + """ + return wcwidth.iter_graphemes(text) + + +def grapheme_cluster_count(text: str) -> int: + """ + Return the number of grapheme clusters in text. + """ + return sum(1 for _ in wcwidth.iter_graphemes(text)) + + def suspend_to_background_supported() -> bool: """ Returns `True` when the Python implementation supports diff --git a/tests/pty_accessories.py b/tests/pty_accessories.py new file mode 100644 index 000000000..a95868637 --- /dev/null +++ b/tests/pty_accessories.py @@ -0,0 +1,165 @@ +"""PTY testing utilities, ported from 'blessed' by Jeff Quast.""" + +from __future__ import annotations + +import codecs +import contextlib +import os +import platform +import signal +import struct +import sys +import time +import warnings + +IS_WINDOWS = platform.system() == "Windows" + + +def init_subproc_coverage(run_note: str | None = None): + """ + Initialize coverage tracking in a forked subprocess. + + Ported from blessed library's test accessories. Call this at the start + of any script executed via PTY fork/exec to enable coverage tracking. + + :param run_note: Optional note for coverage context (unused). + :returns: Coverage instance or None if coverage not available. + """ + try: + import coverage + except ImportError: + return None + + # Look for pyproject.toml or tox.ini as coverage config + test_dir = os.path.dirname(__file__) + for config_name in ("pyproject.toml", "tox.ini"): + config_path = os.path.join(test_dir, os.pardir, config_name) + if os.path.exists(config_path): + break + else: + config_path = None + + cov = coverage.Coverage(config_file=config_path) + cov.start() + return cov + + +if not IS_WINDOWS: + import fcntl + import pty + import termios + +# note how the tty driver translates '\n' output to '\r\n' +SEND_SEMAPHORE = b"SEMAPHORE\n" +RECV_SEMAPHORE = b"SEMAPHORE\r\n" + + +def _setwinsize(fd: int, rows: int, cols: int) -> None: + """Set PTY window size via TIOCSWINSZ ioctl.""" + TIOCSWINSZ = getattr(termios, "TIOCSWINSZ", -2146929561) + fcntl.ioctl(fd, TIOCSWINSZ, struct.pack("HHHH", rows, cols, 0, 0)) + + +def read_until_marker(fd: int, marker: str, timeout: float = 5.0) -> str: + """Read from fd until marker found or timeout.""" + decoder = codecs.getincrementaldecoder("utf8")() + output = "" + start = time.time() + while marker not in output: + if time.time() - start > timeout: + raise TimeoutError(f"Marker {marker!r} not found. Got: {output!r}") + try: + chunk = os.read(fd, 1) + except OSError: + break + if not chunk: + break + output += decoder.decode(chunk, final=False) + return output + + +@contextlib.contextmanager +def echo_off(fd: int): + """Disable PTY echo.""" + attrs = termios.tcgetattr(fd) + try: + attrs[3] = attrs[3] & ~termios.ECHO + termios.tcsetattr(fd, termios.TCSANOW, attrs) + yield + finally: + attrs[3] = attrs[3] | termios.ECHO + termios.tcsetattr(fd, termios.TCSANOW, attrs) + + +def spawn_pty_process(script: str, rows: int = 24, cols: int = 80) -> tuple[int, int]: + """Spawn script in PTY with given size. Returns (master_fd, pid).""" + with warnings.catch_warnings(): + # modern python 3.14+ raises a DeprecationWarning, I guess they may plan to delete pty + # module someday and we will have to manage our own backport? + warnings.filterwarnings("ignore", category=DeprecationWarning) + pid, master_fd = pty.fork() + if pid == 0: + # note how sys.executable is used, to ensure the given script is executed with exactly the + # same python interpreter as used for the parent process, receiving all of its environment + # variables, site-packages, PATH and PYTHONPATH that got it here. + os.execv(sys.executable, [sys.executable, script]) + attrs = termios.tcgetattr(master_fd) + attrs[3] = attrs[3] & ~termios.ECHO + termios.tcsetattr(master_fd, termios.TCSANOW, attrs) + _setwinsize(master_fd, rows, cols) + return master_fd, pid + + +def cleanup_child(pid: int, master_fd: int, timeout: float = 5.0) -> int: + """Wait for child, kill if needed. Returns exit status.""" + start = time.time() + while True: + result, status = os.waitpid(pid, os.WNOHANG) + if result != 0: + # if the child is not ready to exit, send EOF, causes most programs to exit. + os.close(master_fd) + return os.WEXITSTATUS(status) + if time.time() - start > timeout: + # but after timeout, we have a "locked up" client, "not responding", most likely we made + # an error in our "call, reply" pattern of the tests. Kill the program so that the + # MainProcess can become unblocked reading their side of the pty and move on. + try: + os.kill(pid, signal.SIGKILL) + os.waitpid(pid, 0) + except OSError: + pass + os.close(master_fd) + raise TimeoutError(f"Child {pid} did not exit within {timeout}s") + time.sleep(0.05) + + +def extract_output(text: str, start: str = "OUTPUT:", end: str = ":END") -> str: + """Extract text between markers.""" + if start not in text: + return "" + after = text.split(start, 1)[1] + return after.split(end, 1)[0] if end in after else after + + +@contextlib.contextmanager +def pty_session(script: str, rows: int = 24, cols: int = 80): + """ + Context manager for PTY test sessions. + + Spawns pty_repl.py, waits for READY, yields master_fd, then sends QUIT + and cleans up the child process. + + Usage:: + + with pty_session(repl_script, rows=24, cols=80) as fd: + os.write(fd, b"some input\\r") + output = read_until_marker(fd, ":END") + """ + master_fd, pid = spawn_pty_process(script, rows, cols) + try: + read_until_marker(master_fd, "READY") + yield master_fd + finally: + os.write(master_fd, b"QUIT\r") + time.sleep(0.1) + cleanup_child(pid, master_fd) diff --git a/tests/pty_repl.py b/tests/pty_repl.py new file mode 100644 index 000000000..08fe99d31 --- /dev/null +++ b/tests/pty_repl.py @@ -0,0 +1,75 @@ +#!/usr/bin/env python +""" +Minimal REPL for PTY integration tests. + +This offers code coverage without mocks, but using real tty features of the natural "live" call to +PromptSession() and session.prompt. prompt_toolkit sees a real terminal through use of shared +pty_accessories module. +""" + +from __future__ import annotations + +import os +import sys +import termios +import tty + +from pty_accessories import init_subproc_coverage + +from prompt_toolkit import PromptSession +from prompt_toolkit.input import create_input +from prompt_toolkit.output import create_output + + +def main() -> None: + """Run REPL: SIZE/TERMIOS/QUIT commands, else echo with OUTPUT:...:END.""" + # Signal readiness before creating session (avoids race with PTY setup) + os.write(sys.stdout.fileno(), b"READY\n") + + # Use natural stdin/stdout - a PTY is indistinguishable from real tty a pty provides the same + # facility as a real terminal "emulator", allowing us to write tests (and coverage) + # for natural "live" calls of PromptSession(). + session = PromptSession(input=create_input(), output=create_output()) + + try: + while True: + result = session.prompt("> ") + cmd = result.strip().upper() + + if cmd == "SIZE": + size = os.get_terminal_size(sys.stdin.fileno()) + os.write( + sys.stdout.fileno(), + f"SIZE:{size.lines}x{size.columns}:END\n".encode(), + ) + elif cmd == "TERMIOS": + attrs = termios.tcgetattr(sys.stdin.fileno()) + lflag, iflag = attrs[tty.LFLAG], attrs[tty.IFLAG] + flags = { + "ECHO": bool(lflag & termios.ECHO), + "ICANON": bool(lflag & termios.ICANON), + "ISIG": bool(lflag & termios.ISIG), + "IEXTEN": bool(lflag & termios.IEXTEN), + "ICRNL": bool(iflag & termios.ICRNL), + "IXON": bool(iflag & termios.IXON), + "VMIN": attrs[tty.CC][termios.VMIN], + } + flag_str = ",".join(f"{k}={v}" for k, v in sorted(flags.items())) + os.write(sys.stdout.fileno(), f"TERMIOS:{flag_str}:END\n".encode()) + elif cmd in ("QUIT", "EXIT"): + break + else: + os.write(sys.stdout.fileno(), f"OUTPUT:{result}:END\n".encode()) + break # Single-shot mode for grapheme tests + except (EOFError, KeyboardInterrupt): + pass + + +if __name__ == "__main__": + cov = init_subproc_coverage("pty_repl") + try: + main() + finally: + if cov is not None: + cov.stop() + cov.save() diff --git a/tests/test_formatted_text.py b/tests/test_formatted_text.py index a111a7f20..72ba69164 100644 --- a/tests/test_formatted_text.py +++ b/tests/test_formatted_text.py @@ -9,7 +9,7 @@ merge_formatted_text, to_formatted_text, ) -from prompt_toolkit.formatted_text.utils import split_lines +from prompt_toolkit.formatted_text.utils import fragment_list_width, split_lines def test_basic_html(): @@ -336,3 +336,15 @@ def test_split_lines_4(): [("class:a", "line1")], [("class:a", "")], ] + + +def test_fragment_list_width(): + family = "\U0001f468\u200d\U0001f469\u200d\U0001f467" # ZWJ sequence + heart = "\u2764\ufe0f" # VS-16 emoji + assert fragment_list_width([("", "hello")]) == 5 + assert fragment_list_width([("", family)]) == 2 + assert fragment_list_width([("", heart)]) == 2 + + +def test_fragment_list_width_zero_width_escape(): + assert fragment_list_width([("[ZeroWidthEscape]", "arbitrary")]) == 0 diff --git a/tests/test_pty_basic.py b/tests/test_pty_basic.py new file mode 100644 index 000000000..f407bcbb7 --- /dev/null +++ b/tests/test_pty_basic.py @@ -0,0 +1,59 @@ +r"""PTY-based tests for terminal size and termios state.""" + +from __future__ import annotations + +import fcntl +import os +import platform +import struct +import termios +import time + +import pytest +from pty_accessories import extract_output, pty_session, read_until_marker + +pytestmark = pytest.mark.skipif( + platform.system() == "Windows", reason="PTY tests not supported on Windows" +) + + +@pytest.fixture +def repl_script(): + return os.path.join(os.path.dirname(__file__), "pty_repl.py") + + +def _setwinsize(fd: int, rows: int, cols: int) -> None: + TIOCSWINSZ = getattr(termios, "TIOCSWINSZ", -2146929561) + fcntl.ioctl(fd, TIOCSWINSZ, struct.pack("HHHH", rows, cols, 0, 0)) + + +def _get_size(fd: int) -> tuple[int, int]: + # manage call and reply of 'SIZE' command to pty_repl.py + os.write(fd, b"SIZE\r") + size_str = extract_output(read_until_marker(fd, ":END"), "SIZE:", ":END") + r, c = size_str.split("x") + return int(r), int(c) + + +@pytest.mark.parametrize("rows,cols", [(25, 80), (3, 10), (100, 200), (1, 1)]) +def test_size_detection(repl_script, rows, cols): + with pty_session(repl_script, rows=rows, cols=cols) as fd: + assert _get_size(fd) == (rows, cols) + + +def test_dynamic_size_change(repl_script): + with pty_session(repl_script, rows=24, cols=80) as fd: + assert _get_size(fd) == (24, 80) + # resize and verify each change is detected + for target in [(40, 120), (10, 40), (3, 10)]: + _setwinsize(fd, *target) + time.sleep(0.05) + assert _get_size(fd) == target + + +def test_termios_flags(repl_script): + with pty_session(repl_script) as fd: + os.write(fd, b"TERMIOS\r") + flags_str = extract_output(read_until_marker(fd, ":END"), "TERMIOS:", ":END") + flags = dict(pair.split("=") for pair in flags_str.split(",")) + assert all(k in flags for k in ("ECHO", "ICANON", "ISIG", "VMIN")) diff --git a/tests/test_pty_grapheme.py b/tests/test_pty_grapheme.py new file mode 100644 index 000000000..4c69ace94 --- /dev/null +++ b/tests/test_pty_grapheme.py @@ -0,0 +1,106 @@ +r"""PTY-based grapheme clustering tests. + +Tests that cursor movement and editing operations respect grapheme cluster +boundaries for complex Unicode sequences like emoji with ZWJ, skin tones, +combining characters, and regional indicators. + +The pty helpers are designed for this particular test, while improving support +for grapheme clustering, so many errors were only found by interactive testing, +but systematic to test--just move the cursor, inserting, and erasing text +and test the desired result. +""" + +from __future__ import annotations + +import os +import platform +import time + +import pytest +from pty_accessories import extract_output, pty_session, read_until_marker + +pytestmark = pytest.mark.skipif( + platform.system() == "Windows", reason="PTY tests not supported on Windows" +) + +# Key sequences +LEFT = "\x1b[D" # cursor left +RIGHT = "\x1b[C" # cursor right +HOME = "\x1b[H" # home +BS = "\x7f" # backspace +DEL = "\x1b[3~" # forward delete +ENTER = "\r" + +# Grapheme clusters covering major Unicode complexity classes +GRAPHEMES = [ + # U+1F468 U+200D U+1F469 U+200D U+1F467 + ( + "\U0001f468\u200d\U0001f469\u200d\U0001f467", + "zwj_family", + ), + # U+2764 U+FE0F (VS-16) + ("\u2764\ufe0f", "vs16_heart"), + # U+1F1E8 U+1F1E6 (C+A regional indicators) + ("\U0001f1e8\U0001f1e6", "flag_ca"), + # U+0065 U+0301 + ("e\u0301", "combining_acute"), + # U+1100 U+1161 + ("\u1100\u1161", "hangul_lv"), + # Devanagari conjunct + ("\u0915\u094d\u0937\u093f", "devanagari"), + # U+1F44B U+1F3FB + ("\U0001f44b\U0001f3fb", "skin_tone"), +] + + +@pytest.fixture +def repl_script(): + return os.path.join(os.path.dirname(__file__), "pty_repl.py") + + +@pytest.mark.parametrize("grapheme,name", GRAPHEMES) +def test_backspace_deletes_grapheme(repl_script, grapheme, name): + """Type 3 graphemes, backspace twice, verify 1 remains.""" + with pty_session(repl_script, rows=3, cols=40) as fd: + os.write(fd, (grapheme * 3).encode()) + time.sleep(0.1) + os.write(fd, (BS * 2 + ENTER).encode()) + assert extract_output(read_until_marker(fd, ":END")) == grapheme + + +@pytest.mark.parametrize("grapheme,name", GRAPHEMES) +def test_cursor_movement_respects_grapheme(repl_script, grapheme, name): + """Type 3 graphemes, LEFT, insert 'x' -> pattern is 2+x+1.""" + with pty_session(repl_script, rows=3, cols=40) as fd: + os.write(fd, (grapheme * 3).encode()) + time.sleep(0.1) + os.write(fd, (LEFT + "x" + ENTER).encode()) + assert ( + extract_output(read_until_marker(fd, ":END")) + == grapheme * 2 + "x" + grapheme + ) + + +@pytest.mark.parametrize("grapheme,name", GRAPHEMES) +def test_forward_delete_removes_grapheme(repl_script, grapheme, name): + """Type 3 graphemes, HOME, DELETE -> 2 remain.""" + with pty_session(repl_script, rows=3, cols=40) as fd: + os.write(fd, (grapheme * 3).encode()) + time.sleep(0.1) + os.write(fd, (HOME + DEL + ENTER).encode()) + assert extract_output(read_until_marker(fd, ":END")) == grapheme * 2 + + +@pytest.mark.parametrize( + "grapheme,name", [GRAPHEMES[0], GRAPHEMES[2]] +) # ZWJ family, CA flag +def test_grapheme_in_tiny_window(repl_script, grapheme, name): + """Type 10 graphemes in 10-col window, backspace 5, verify 5 remain.""" + with pty_session(repl_script, rows=3, cols=10) as fd: + os.write(fd, (grapheme * 10).encode()) + time.sleep(0.2) + os.write(fd, (BS * 5 + ENTER).encode()) + time.sleep(0.2) + assert ( + extract_output(read_until_marker(fd, ":END", timeout=10.0)) == grapheme * 5 + ) diff --git a/tests/test_wcwidth_integration.py b/tests/test_wcwidth_integration.py new file mode 100644 index 000000000..656e44a8a --- /dev/null +++ b/tests/test_wcwidth_integration.py @@ -0,0 +1,167 @@ +from prompt_toolkit.document import Document +from prompt_toolkit.layout.utils import explode_text_fragments +from prompt_toolkit.utils import ( + get_cwidth, + grapheme_cluster_count, + iter_grapheme_clusters, +) + +# Test constants +FAMILY = "\U0001f468\u200d\U0001f469\u200d\U0001f467" # ZWJ sequence +FLAG = "\U0001f1fa\U0001f1f8" # Regional indicators +CAFE = "cafe\u0301" # Combining accent +NINO = "nin\u0303o" # n + i + n + combining tilde + o = niño + + +def test_get_cwidth(): + # ASCII + assert get_cwidth("") == 0 + assert get_cwidth("hello") == 5 + + # CJK wide characters + assert get_cwidth("\u4e2d") == 2 + assert get_cwidth("\u4e2d\u6587") == 4 + + # Emoji sequences (ZWJ, flags, skin tones, VS-16) + assert get_cwidth(FAMILY) == 2 + assert get_cwidth(FLAG) == 2 + assert get_cwidth("\U0001f44b\U0001f3fb") == 2 # skin tone + assert get_cwidth("\u2764\ufe0f") == 2 # VS-16 + + # Combining characters + assert get_cwidth("e\u0301") == 1 + assert get_cwidth(CAFE) == 4 + + +def test_grapheme_cluster_iteration(): + assert list(iter_grapheme_clusters("hello")) == ["h", "e", "l", "l", "o"] + assert list(iter_grapheme_clusters(FAMILY)) == [FAMILY] + assert list(iter_grapheme_clusters(FLAG)) == [FLAG] + assert list(iter_grapheme_clusters(CAFE)) == ["c", "a", "f", "e\u0301"] + + +def test_grapheme_cluster_count(): + assert grapheme_cluster_count("hello") == 5 + assert grapheme_cluster_count(FAMILY) == 1 + assert grapheme_cluster_count(CAFE) == 4 + + +def test_cursor_right_grapheme(): + # ASCII unchanged + assert Document("hello", 0).get_cursor_right_position() == 1 + assert Document("hello", 0).get_cursor_right_position(2) == 2 + + # Skips entire grapheme cluster + assert Document(FAMILY + "x", 0).get_cursor_right_position() == len(FAMILY) + assert Document(FLAG + "x", 0).get_cursor_right_position() == len(FLAG) + + # At position 3, 'e\u0301' is one grapheme but 2 code points + assert Document(CAFE, 3).get_cursor_right_position() == 2 + + +def test_cursor_left_grapheme(): + # ASCII unchanged + assert Document("hello", 5).get_cursor_left_position() == -1 + assert Document("hello", 5).get_cursor_left_position(2) == -2 + + # Skips entire grapheme cluster + assert Document(FAMILY + "x", len(FAMILY)).get_cursor_left_position() == -len( + FAMILY + ) + assert Document(FLAG + "x", len(FLAG)).get_cursor_left_position() == -len(FLAG) + + # 'e\u0301' is one grapheme but 2 code points, so -2 + assert Document(CAFE, len(CAFE)).get_cursor_left_position() == -2 + + +def test_current_char_grapheme(): + assert Document(FAMILY + "x", 0).current_char == FAMILY + assert Document(CAFE, 3).current_char == "e\u0301" # position 3 = 'e' + accent + + +def test_current_char_inside_grapheme(): + """Cursor on combining tilde returns full grapheme.""" + assert Document(NINO, 3).current_char == "n\u0303" + + +def test_current_char_at_end(): + """Cursor at end of text returns empty string.""" + assert Document("hello", 5).current_char == "" + assert Document("", 0).current_char == "" + + +def test_char_before_cursor_grapheme(): + assert Document(FAMILY + "x", len(FAMILY)).char_before_cursor == FAMILY + assert Document(CAFE, len(CAFE)).char_before_cursor == "e\u0301" + + +def test_char_before_cursor_inside_grapheme(): + """Cursor on combining tilde returns previous grapheme.""" + assert Document(NINO, 3).char_before_cursor == "i" + + +def test_char_before_cursor_at_start(): + """Cursor at start of text returns empty string.""" + assert Document("hello", 0).char_before_cursor == "" + assert Document("", 0).char_before_cursor == "" + + +def test_explode_text_fragments_grapheme(): + # Family emoji should stay as single fragment + fragments = [("", FAMILY + "x")] + exploded = explode_text_fragments(fragments) + assert len(exploded) == 2 + assert exploded[0][1] == FAMILY + assert exploded[1][1] == "x" + + # Combining accent should stay with base character + fragments = [("", CAFE)] + exploded = explode_text_fragments(fragments) + assert len(exploded) == 4 + assert exploded[3][1] == "e\u0301" + + # Flag should stay as single fragment + fragments = [("", FLAG + "!")] + exploded = explode_text_fragments(fragments) + assert len(exploded) == 2 + assert exploded[0][1] == FLAG + assert exploded[1][1] == "!" + + +def test_delete_before_cursor_grapheme(): + from prompt_toolkit.buffer import Buffer + + # Deleting skin tone modifier should delete entire emoji + WAVE_DARK = "\U0001f44b\U0001f3ff" # 👋🏿 + buf = Buffer() + buf.text = WAVE_DARK + "x" + buf.cursor_position = len(WAVE_DARK) + deleted = buf.delete_before_cursor(count=1) + assert deleted == WAVE_DARK + assert buf.text == "x" + + # Deleting combining accent should delete entire grapheme + buf.text = CAFE + buf.cursor_position = len(CAFE) + deleted = buf.delete_before_cursor(count=1) + assert deleted == "e\u0301" + assert buf.text == "caf" + + +def test_delete_grapheme(): + from prompt_toolkit.buffer import Buffer + + # Forward delete on emoji should delete entire grapheme + buf = Buffer() + buf.text = FAMILY + "x" + buf.cursor_position = 0 + deleted = buf.delete(count=1) + assert deleted == FAMILY + assert buf.text == "x" + + # Forward delete on combining character + buf.text = CAFE + buf.cursor_position = 3 # Before 'e' + accent + deleted = buf.delete(count=1) + assert deleted == "e\u0301" + assert buf.text == "caf"