diff --git a/src/orgparse/__init__.py b/src/orgparse/__init__.py index 110c474..f038364 100644 --- a/src/orgparse/__init__.py +++ b/src/orgparse/__init__.py @@ -106,13 +106,16 @@ """ # [[[end]]] +from __future__ import annotations + from collections.abc import Iterable +from itertools import chain from pathlib import Path from typing import Optional, TextIO, Union -from .node import OrgEnv, OrgNode, parse_lines # todo basenode?? +from .node import OrgBaseNode, OrgEnv, OrgNode, parse_lines # todo basenode?? -__all__ = ["load", "loadi", "loads"] +__all__ = ["dump", "dumps", "load", "loadi", "loads"] def load(path: Union[str, Path, TextIO], env: Optional[OrgEnv] = None) -> OrgNode: @@ -145,6 +148,34 @@ def load(path: Union[str, Path, TextIO], env: Optional[OrgEnv] = None) -> OrgNod return loadi(all_lines, filename=filename, env=env) +def dumps(nodes: OrgBaseNode | Iterable[OrgBaseNode]) -> str: + """ + Dump org-mode nodes to a string. + + :arg nodes: Org root node or iterable of nodes to serialize. + :rtype: str + """ + if isinstance(nodes, OrgBaseNode): + if nodes.is_root(): + lines = chain.from_iterable(node._render_lines() for node in nodes.env.nodes) + return "\n".join(lines) + return str(nodes) + return "\n".join(str(node) for node in nodes) + + +def dump(nodes: OrgBaseNode | Iterable[OrgBaseNode], path: Union[str, Path]) -> None: + """ + Dump org-mode nodes to a file. + + :type path: str or Path + :arg path: Path to write org-mode contents. + """ + content = dumps(nodes) + if isinstance(path, str): + path = Path(path) + path.write_text(content, encoding="utf8") + + def loads(string: str, filename: str = '', env: Optional[OrgEnv] = None) -> OrgNode: """ Load org-mode document from a string. diff --git a/src/orgparse/date.py b/src/orgparse/date.py index 1685f32..cc89ae8 100644 --- a/src/orgparse/date.py +++ b/src/orgparse/date.py @@ -692,19 +692,26 @@ class OrgDateRepeatedTask(OrgDate): _active_default = False - def __init__(self, start, before: str, after: str, active=None) -> None: + def __init__(self, start, before: str, after: str, active=None, *, comment: str | None = None) -> None: super().__init__(start, active=active) self._before = before self._after = after + self._comment = comment def __repr__(self) -> str: - args: list = [self._date_to_tuple(self.start), self.before, self.after] + args: list[str] = [ + repr(self._date_to_tuple(self.start)), + repr(self.before), + repr(self.after), + ] if self._active is not self._active_default: - args.append(self._active) - return '{}({})'.format(self.__class__.__name__, ', '.join(map(repr, args))) + args.append(repr(self._active)) + if self._comment is not None: + args.append(f"comment={self._comment!r}") + return "{}({})".format(self.__class__.__name__, ", ".join(args)) def __hash__(self) -> int: - return hash((self._before, self._after)) + return hash((self._before, self._after, self._comment)) def __eq__(self, other) -> bool: return ( @@ -712,6 +719,7 @@ def __eq__(self, other) -> bool: and isinstance(other, self.__class__) and self._before == other._before and self._after == other._after + and self._comment == other._comment ) @property @@ -737,3 +745,7 @@ def after(self) -> str: """ return self._after + + @property + def comment(self) -> str | None: + return self._comment diff --git a/src/orgparse/lines.py b/src/orgparse/lines.py new file mode 100644 index 0000000..598754d --- /dev/null +++ b/src/orgparse/lines.py @@ -0,0 +1,676 @@ +from __future__ import annotations + +import re +from collections.abc import Sequence +from typing import Optional, Union + +from .date import ( + TIMESTAMP_RE, + OrgDate, + OrgDateClock, + OrgDateClosed, + OrgDateDeadline, + OrgDateRepeatedTask, + OrgDateScheduled, +) + +PropertyValue = Union[str, int, float] + + +def parse_heading_level(heading: str) -> tuple[str, int] | None: + """ + Get star-stripped heading and its level + + >>> parse_heading_level('* Heading') + ('Heading', 1) + >>> parse_heading_level('******** Heading') + ('Heading', 8) + >>> parse_heading_level('*') # None since no space after star + >>> parse_heading_level('*bold*') # None + >>> parse_heading_level('not heading') # None + + """ + m = RE_HEADING_STARS.search(heading) + if m is not None: + return (m.group(2), len(m.group(1))) + return None + + +RE_HEADING_STARS = re.compile(r"^\s*(\*+)\s+(.*?)\s*$") + + +def parse_heading_tags(heading: str) -> tuple[str, list[str]]: + """ + Get first tags and heading without tags + + >>> parse_heading_tags('HEADING') + ('HEADING', []) + >>> parse_heading_tags('HEADING :TAG1:TAG2:') + ('HEADING', ['TAG1', 'TAG2']) + >>> parse_heading_tags('HEADING: this is still heading :TAG1:TAG2:') + ('HEADING: this is still heading', ['TAG1', 'TAG2']) + >>> parse_heading_tags('HEADING :@tag:_tag_:') + ('HEADING', ['@tag', '_tag_']) + + Here is the spec of tags from Org Mode manual: + + Tags are normal words containing letters, numbers, ``_``, and + ``@``. Tags must be preceded and followed by a single colon, + e.g., ``:work:``. + + -- (info "(org) Tags") + + """ + match = RE_HEADING_TAGS.search(heading) + if match: + heading = match.group(1) + tagstr = match.group(2) + tags = tagstr.split(':') + else: + tags = [] + return (heading, tags) + + +# Tags are normal words containing letters, numbers, '_', and '@'. https://orgmode.org/manual/Tags.html +RE_HEADING_TAGS = re.compile(r"(.*?)\s*:([\w@:]+):\s*$") + + +def parse_heading_todos(heading: str, todo_candidates: list[str]) -> tuple[str, Optional[str]]: + """ + Get TODO keyword and heading without TODO keyword. + + >>> todos = ['TODO', 'DONE'] + >>> parse_heading_todos('Normal heading', todos) + ('Normal heading', None) + >>> parse_heading_todos('TODO Heading', todos) + ('Heading', 'TODO') + + """ + for todo in todo_candidates: + if heading == todo: + return ('', todo) + if heading.startswith(todo + ' '): + return (heading[len(todo) + 1 :], todo) + return (heading, None) + + +def parse_heading_priority(heading: str) -> tuple[str, Optional[str]]: + """ + Get priority and heading without priority field. + + >>> parse_heading_priority('HEADING') + ('HEADING', None) + >>> parse_heading_priority('[#A] HEADING') + ('HEADING', 'A') + >>> parse_heading_priority('[#0] HEADING') + ('HEADING', '0') + >>> parse_heading_priority('[#A]') + ('', 'A') + + """ + match = RE_HEADING_PRIORITY.search(heading) + if match: + return (match.group(2), match.group(1)) + else: + return (heading, None) + + +RE_HEADING_PRIORITY = re.compile(r"^\s*\[#([A-Z0-9])\] ?(.*)$") + + +def parse_property(line: str) -> tuple[Optional[str], Optional[PropertyValue]]: + """ + Get property from given string. + + >>> parse_property(':Some_property: some value') + ('Some_property', 'some value') + >>> parse_property(':Effort: 1:10') + ('Effort', 70) + + """ + prop_key = None + prop_val: Optional[Union[str, int, float]] = None + match = RE_PROP.search(line) + if match: + prop_key = match.group(1) + prop_val = match.group(2) + if prop_key == 'Effort': + prop_val = parse_duration_to_minutes(prop_val) + return (prop_key, prop_val) + + +RE_PROP = re.compile(r"^\s*:(.*?):\s*(.*?)\s*$") +RE_PROP_LINE = re.compile(r"^(?P\s*):(?P[^:]+):\s*(?P.*?)\s*$") +RE_REPEAT_TASK_LINE = re.compile( + r"^(?P\s*)-\s+State\s+\"(?P[^\"]+)\"\s+from\s+\"(?P[^\"]+)\"\s+\[(?P[^\]]+)\](?:\s*\\\\(?P.*))?\s*$" +) + + +def parse_duration_to_minutes(duration: str) -> Union[float, int]: + """ + Parse duration minutes from given string. + Convert to integer if number has no decimal points + + >>> parse_duration_to_minutes('3:12') + 192 + >>> parse_duration_to_minutes('1:23:45') + 83.75 + >>> parse_duration_to_minutes('1y 3d 3h 4min') + 530464 + >>> parse_duration_to_minutes('1d3h5min') + 1625 + >>> parse_duration_to_minutes('3d 13:35') + 5135 + >>> parse_duration_to_minutes('2.35h') + 141 + >>> parse_duration_to_minutes('10') + 10 + >>> parse_duration_to_minutes('10.') + 10 + >>> parse_duration_to_minutes('1 h') + 60 + >>> parse_duration_to_minutes('') + 0 + """ + + minutes = parse_duration_to_minutes_float(duration) + return int(minutes) if minutes.is_integer() else minutes + + +def parse_duration_to_minutes_float(duration: str) -> float: + """ + Parse duration minutes from given string. + The following code is fully compatible with the 'org-duration-to-minutes' function in org mode: + https://github.com/emacs-mirror/emacs/blob/master/lisp/org/org-duration.el + + >>> parse_duration_to_minutes_float('3:12') + 192.0 + >>> parse_duration_to_minutes_float('1:23:45') + 83.75 + >>> parse_duration_to_minutes_float('1y 3d 3h 4min') + 530464.0 + >>> parse_duration_to_minutes_float('1d3h5min') + 1625.0 + >>> parse_duration_to_minutes_float('3d 13:35') + 5135.0 + >>> parse_duration_to_minutes_float('2.35h') + 141.0 + >>> parse_duration_to_minutes_float('10') + 10.0 + >>> parse_duration_to_minutes_float('10.') + 10.0 + >>> parse_duration_to_minutes_float('1 h') + 60.0 + >>> parse_duration_to_minutes_float('') + 0.0 + """ + + match: Optional[object] + if duration == "": + return 0.0 + if isinstance(duration, float): + return float(duration) + if RE_ORG_DURATION_H_MM.fullmatch(duration): + hours, minutes, *seconds_ = map(float, duration.split(":")) + seconds = seconds_[0] if seconds_ else 0 + return seconds / 60.0 + minutes + 60 * hours + if RE_ORG_DURATION_FULL.fullmatch(duration): + minutes = 0 + for match in RE_ORG_DURATION_UNIT.finditer(duration): + value = float(match.group(1)) + unit = match.group(2) + minutes += value * ORG_DURATION_UNITS[unit] + return float(minutes) + match = RE_ORG_DURATION_MIXED.fullmatch(duration) + if match: + units_part = match.groupdict()["A"] + hms_part = match.groupdict()["B"] + return parse_duration_to_minutes_float(units_part) + parse_duration_to_minutes_float(hms_part) + if RE_FLOAT.fullmatch(duration): + return float(duration) + raise ValueError(f"Invalid duration format {duration}") + + +# Conversion factor to minutes for a duration. +ORG_DURATION_UNITS = { + "min": 1, + "h": 60, + "d": 60 * 24, + "w": 60 * 24 * 7, + "m": 60 * 24 * 30, + "y": 60 * 24 * 365.25, +} +# Regexp matching for all units. +ORG_DURATION_UNITS_RE = r"({})".format(r"|".join(ORG_DURATION_UNITS.keys())) +# Regexp matching a duration expressed with H:MM or H:MM:SS format. +# Hours can use any number of digits. +ORG_DURATION_H_MM_RE = r"[ \t]*[0-9]+(?::[0-9]{2}){1,2}[ \t]*" +RE_ORG_DURATION_H_MM = re.compile(ORG_DURATION_H_MM_RE) +# Regexp matching a duration with an unit. +# Allowed units are defined in ORG_DURATION_UNITS. +# Match group 1 contains the bare number. +# Match group 2 contains the unit. +ORG_DURATION_UNIT_RE = r"([0-9]+(?:[.][0-9]*)?)[ \t]*" + ORG_DURATION_UNITS_RE +RE_ORG_DURATION_UNIT = re.compile(ORG_DURATION_UNIT_RE) +# Regexp matching a duration expressed with units. +# Allowed units are defined in ORG_DURATION_UNITS. +ORG_DURATION_FULL_RE = rf"(?:[ \t]*{ORG_DURATION_UNIT_RE})+[ \t]*" +RE_ORG_DURATION_FULL = re.compile(ORG_DURATION_FULL_RE) +# Regexp matching a duration expressed with units and H:MM or H:MM:SS format. +# Allowed units are defined in ORG_DURATION_UNITS. +# Match group A contains units part. +# Match group B contains H:MM or H:MM:SS part. +ORG_DURATION_MIXED_RE = rf"(?P([ \t]*{ORG_DURATION_UNIT_RE})+)[ \t]*(?P[0-9]+(?::[0-9][0-9]){{1,2}})[ \t]*" +RE_ORG_DURATION_MIXED = re.compile(ORG_DURATION_MIXED_RE) +# Regexp matching float numbers. +RE_FLOAT = re.compile(r"[0-9]+([.][0-9]*)?") + + +class LineItem: + def render(self) -> str: + raise NotImplementedError + + +class TextLine(LineItem): + def __init__(self, raw: str) -> None: + self._raw = raw + + def render(self) -> str: + return self._raw + + +class HeadingLine(LineItem): + def __init__( + self, + raw: str, + level: int, + todo: Optional[str], + priority: Optional[str], + heading: str, + tags: Sequence[str], + ) -> None: + self._raw = raw + self.level = level + self.todo = todo + self.priority = priority + self.heading = heading + self.tags = list(tags) + self._dirty = False + + @classmethod + def from_line(cls, line: str, todo_candidates: list[str]) -> HeadingLine: + heading_level = parse_heading_level(line) + if heading_level is None: + raise ValueError(f"Invalid heading line: {line!r}") + (heading, level) = heading_level + (heading, tags) = parse_heading_tags(heading) + (heading, todo) = parse_heading_todos(heading, todo_candidates) + (heading, priority) = parse_heading_priority(heading) + return cls( + raw=line, + level=level, + todo=todo, + priority=priority, + heading=heading, + tags=tags, + ) + + def mark_dirty(self) -> None: + self._dirty = True + + def render(self) -> str: + if not self._dirty: + return self._raw + + stars = "*" * self.level + tokens: list[str] = [] + if self.todo: + tokens.append(self.todo) + if self.priority: + tokens.append(f"[#{self.priority}]") + if self.heading: + tokens.append(self.heading) + + if not tokens and not self.tags: + rendered = f"{stars} " + self._raw = rendered + self._dirty = False + return rendered + + rendered = f"{stars} " + if tokens: + rendered += " ".join(tokens) + if self.tags: + if tokens: + rendered += " " + rendered += ":" + ":".join(self.tags) + ":" + + self._raw = rendered + self._dirty = False + return rendered + + +class SdcEntry(LineItem): + def __init__(self, label: str, date: OrgDate, raw: str) -> None: + self.label = label + self.date = date + self._raw = raw + self._dirty = False + + def update(self, date: OrgDate) -> None: + self.date = date + self._dirty = True + + def render(self) -> str: + if not self._dirty: + return self._raw + return f"{self.label}: {self.date}" + + +class SdcLine(LineItem): + _label_re = re.compile(r"(SCHEDULED|DEADLINE|CLOSED):\s+") + + def __init__(self, raw: str, parts: list[LineItem | str], entries: dict[str, SdcEntry]) -> None: + self._raw = raw + self._parts = parts + self._entries = entries + self._dirty = False + + @classmethod + def from_line(cls, line: str) -> SdcLine | None: + if line.lstrip().startswith("#"): + return None + parts: list[LineItem | str] = [] + entries: dict[str, SdcEntry] = {} + pos = 0 + for match in cls._label_re.finditer(line): + ts_match = TIMESTAMP_RE.match(line[match.end() :]) + if not ts_match: + continue + entry_start = match.start() + entry_end = match.end() + ts_match.end() + if entry_start > pos: + parts.append(line[pos:entry_start]) + label = match.group(1) + entry_text = line[entry_start:entry_end] + if label == "SCHEDULED": + date = OrgDateScheduled.from_str(entry_text) + elif label == "DEADLINE": + date = OrgDateDeadline.from_str(entry_text) + else: + date = OrgDateClosed.from_str(entry_text) + entry = SdcEntry(label, date, entry_text) + entries[label] = entry + parts.append(entry) + pos = entry_end + if not entries: + return None + if pos < len(line): + parts.append(line[pos:]) + return cls(line, parts, entries) + + @classmethod + def from_entries(cls, entries: dict[str, OrgDate]) -> SdcLine: + order = ["SCHEDULED", "DEADLINE", "CLOSED"] + parts: list[LineItem | str] = [] + raw_parts: list[str] = [] + entry_map: dict[str, SdcEntry] = {} + for label in order: + date = entries.get(label) + if date is None or not date: + continue + entry = SdcEntry(label, date, f"{label}: {date}") + entry_map[label] = entry + if raw_parts: + raw_parts.append(" ") + parts.append(" ") + raw_parts.append(entry.render()) + parts.append(entry) + raw = "".join(raw_parts) + return cls(raw, parts, entry_map) + + def update_entry(self, label: str, date: OrgDate | None) -> None: + if date is None or not date: + entry = self._entries.pop(label, None) + if entry is not None: + self._parts = [part for part in self._parts if part is not entry] + self._dirty = True + return + entry = self._entries.get(label) + if entry is None: + new_entry = SdcEntry(label, date, f"{label}: {date}") + if self._parts: + self._parts.append(" ") + self._parts.append(new_entry) + self._entries[label] = new_entry + else: + entry.update(date) + self._dirty = True + + def is_empty(self) -> bool: + return not self._entries + + def render(self) -> str: + if not self._dirty: + return self._raw + rendered_parts: list[str] = [] + for part in self._parts: + if isinstance(part, LineItem): + rendered = part.render() + if rendered: + rendered_parts.append(rendered) + else: + rendered_parts.append(part) + rendered = "".join(rendered_parts) + self._raw = rendered + self._dirty = False + return rendered + + +class ClockLine(LineItem): + _label_re = re.compile(r"^(?!#)(?P\s*CLOCK:\s+)") + + def __init__(self, raw: str, prefix: str, date: OrgDateClock) -> None: + self._raw = raw + self._prefix = prefix + self.date = date + self._dirty = False + + @classmethod + def _timestamp_span(cls, line: str, start: int) -> tuple[int, int] | None: + match = TIMESTAMP_RE.search(line, start) + if not match: + return None + span_start = match.start() + span_end = match.end() + if line[span_end : span_end + 2] == "--": + match2 = TIMESTAMP_RE.match(line[span_end + 2 :]) + if match2: + span_end = span_end + 2 + match2.end() + return (span_start, span_end) + + @classmethod + def from_line(cls, line: str) -> ClockLine | None: + match = cls._label_re.match(line) + if not match: + return None + span = cls._timestamp_span(line, match.end()) + if not span: + return None + date = OrgDateClock.from_str(line) + if not date: + return None + (ts_start, _ts_end) = span + prefix = line[:ts_start] + return cls(line, prefix, date) + + def update(self, date: OrgDateClock) -> None: + self.date = date + self._dirty = True + + @classmethod + def _compute_suffix(cls, date: OrgDateClock) -> str: + if not date.has_end(): + return "" + minutes = int(date.duration.total_seconds() // 60) + hours, mins = divmod(minutes, 60) + return f" => {hours}:{mins:02d}" + + def render(self) -> str: + if not self._dirty: + return self._raw + suffix = self._compute_suffix(self.date) + rendered = f"{self._prefix}{self.date}{suffix}" + self._raw = rendered + self._dirty = False + return rendered + + +class LogbookStartLine(LineItem): + def __init__(self, raw: str) -> None: + self._raw = raw + + def render(self) -> str: + return self._raw + + +class LogbookEndLine(LineItem): + def __init__(self, raw: str) -> None: + self._raw = raw + + def render(self) -> str: + return self._raw + + +class RepeatTaskLine(LineItem): + def __init__( + self, + raw: str, + repeat: OrgDateRepeatedTask, + indent: str, + comment: str | None, + ) -> None: + self._raw = raw + self.repeat = repeat + self.indent = indent + self.comment = comment + self._dirty = False + + @classmethod + def match(cls, line: str) -> re.Match[str] | None: + if line.lstrip().startswith("#"): + return None + return RE_REPEAT_TASK_LINE.match(line) + + @classmethod + def from_line(cls, line: str, comment: str | None = None) -> RepeatTaskLine | None: + if line.lstrip().startswith("#"): + return None + match = RE_REPEAT_TASK_LINE.match(line) + if not match: + return None + inline_comment = match.group("comment") + if comment is None and inline_comment is not None: + inline_comment = inline_comment.lstrip() + comment = inline_comment + date = OrgDate.from_str(match.group("date")) + repeat = OrgDateRepeatedTask(date.start, match.group("todo"), match.group("done"), comment=comment) + return cls( + raw=line, + repeat=repeat, + indent=match.group("indent"), + comment=comment, + ) + + @classmethod + def from_repeat(cls, repeat: OrgDateRepeatedTask, indent: str) -> RepeatTaskLine: + date_str = str(repeat) + raw = f"{indent}- State \"{repeat.after}\" from \"{repeat.before}\" {date_str}" + if repeat.comment is not None: + raw = f"{raw} \\\\" # comment lines are handled separately + if repeat.comment and "\n" not in repeat.comment: + raw = f"{raw} {repeat.comment}" + return cls(raw=raw, repeat=repeat, indent=indent, comment=repeat.comment) + + def update_repeat(self, repeat: OrgDateRepeatedTask) -> None: + self.repeat = repeat + self.comment = repeat.comment + self._dirty = True + + def render(self) -> str: + if not self._dirty: + return self._raw + date_str = str(self.repeat) + rendered = f"{self.indent}- State \"{self.repeat.after}\" from \"{self.repeat.before}\" {date_str}" + if self.comment is not None: + rendered = f"{rendered} \\\\" # comment lines are handled separately + if self.comment and "\n" not in self.comment: + rendered = f"{rendered} {self.comment}" + self._raw = rendered + self._dirty = False + return rendered + + +class PropertyDrawerStartLine(LineItem): + def __init__(self, raw: str) -> None: + self._raw = raw + + def render(self) -> str: + return self._raw + + +class PropertyDrawerEndLine(LineItem): + def __init__(self, raw: str) -> None: + self._raw = raw + + def render(self) -> str: + return self._raw + + +class PropertyEntryLine(LineItem): + def __init__( + self, + raw: str, + key: str, + value: PropertyValue, + render_value: str, + prefix: str, + ) -> None: + self._raw = raw + self.key = key + self.value = value + self._render_value = render_value + self._prefix = prefix + self._dirty = False + + @classmethod + def from_line(cls, line: str) -> PropertyEntryLine | None: + match = RE_PROP_LINE.match(line) + if not match: + return None + (key, value) = parse_property(line) + if key is None or value is None: + return None + return cls( + raw=line, + key=key, + value=value, + render_value=match.group("value"), + prefix=match.group("prefix"), + ) + + def update_value(self, value: PropertyValue, render_value: str) -> None: + self.value = value + self._render_value = render_value + self._dirty = True + + def render(self) -> str: + if not self._dirty: + return self._raw + if self._render_value == "": + rendered = f"{self._prefix}:{self.key}:" + else: + rendered = f"{self._prefix}:{self.key}: {self._render_value}" + self._raw = rendered + self._dirty = False + return rendered diff --git a/src/orgparse/node.py b/src/orgparse/node.py index 5794b43..4ba8994 100644 --- a/src/orgparse/node.py +++ b/src/orgparse/node.py @@ -3,24 +3,26 @@ import itertools import re from collections.abc import Iterable, Iterator, Sequence -from typing import ( - Any, - Optional, - Union, - cast, -) +from typing import Any, Optional, TypeVar, cast -from .date import ( - OrgDate, - OrgDateClock, - OrgDateClosed, - OrgDateDeadline, - OrgDateRepeatedTask, - OrgDateScheduled, - parse_sdc, -) +from .date import OrgDate, OrgDateClock, OrgDateClosed, OrgDateDeadline, OrgDateRepeatedTask, OrgDateScheduled from .extra import Rich, to_rich_text from .inline import to_plain_text +from .lines import ( + ClockLine, + HeadingLine, + LineItem, + LogbookEndLine, + LogbookStartLine, + PropertyDrawerEndLine, + PropertyDrawerStartLine, + PropertyEntryLine, + PropertyValue, + RepeatTaskLine, + SdcLine, + TextLine, + parse_duration_to_minutes, +) def lines_to_chunks(lines: Iterable[str]) -> Iterable[list[str]]: @@ -36,251 +38,38 @@ def lines_to_chunks(lines: Iterable[str]) -> Iterable[list[str]]: RE_NODE_HEADER = re.compile(r"^\*+ ") -def parse_heading_level(heading: str) -> tuple[str, int] | None: - """ - Get star-stripped heading and its level - - >>> parse_heading_level('* Heading') - ('Heading', 1) - >>> parse_heading_level('******** Heading') - ('Heading', 8) - >>> parse_heading_level('*') # None since no space after star - >>> parse_heading_level('*bold*') # None - >>> parse_heading_level('not heading') # None - - """ - m = RE_HEADING_STARS.search(heading) - if m is not None: - return (m.group(2), len(m.group(1))) - return None - - -RE_HEADING_STARS = re.compile(r'^(\*+)\s+(.*?)\s*$') - - -def parse_heading_tags(heading: str) -> tuple[str, list[str]]: - """ - Get first tags and heading without tags - - >>> parse_heading_tags('HEADING') - ('HEADING', []) - >>> parse_heading_tags('HEADING :TAG1:TAG2:') - ('HEADING', ['TAG1', 'TAG2']) - >>> parse_heading_tags('HEADING: this is still heading :TAG1:TAG2:') - ('HEADING: this is still heading', ['TAG1', 'TAG2']) - >>> parse_heading_tags('HEADING :@tag:_tag_:') - ('HEADING', ['@tag', '_tag_']) +TOrgDate = TypeVar("TOrgDate", bound=OrgDate) - Here is the spec of tags from Org Mode manual: - Tags are normal words containing letters, numbers, ``_``, and - ``@``. Tags must be preceded and followed by a single colon, - e.g., ``:work:``. - - -- (info "(org) Tags") - - """ - match = RE_HEADING_TAGS.search(heading) - if match: - heading = match.group(1) - tagstr = match.group(2) - tags = tagstr.split(':') - else: - tags = [] - return (heading, tags) - - -# Tags are normal words containing letters, numbers, '_', and '@'. https://orgmode.org/manual/Tags.html -RE_HEADING_TAGS = re.compile(r'(.*?)\s*:([\w@:]+):\s*$') - - -def parse_heading_todos(heading: str, todo_candidates: list[str]) -> tuple[str, Optional[str]]: - """ - Get TODO keyword and heading without TODO keyword. - - >>> todos = ['TODO', 'DONE'] - >>> parse_heading_todos('Normal heading', todos) - ('Normal heading', None) - >>> parse_heading_todos('TODO Heading', todos) - ('Heading', 'TODO') - - """ - for todo in todo_candidates: - if heading == todo: - return ('', todo) - if heading.startswith(todo + ' '): - return (heading[len(todo) + 1 :], todo) - return (heading, None) - - -def parse_heading_priority(heading): - """ - Get priority and heading without priority field. - - >>> parse_heading_priority('HEADING') - ('HEADING', None) - >>> parse_heading_priority('[#A] HEADING') - ('HEADING', 'A') - >>> parse_heading_priority('[#0] HEADING') - ('HEADING', '0') - >>> parse_heading_priority('[#A]') - ('', 'A') - - """ - match = RE_HEADING_PRIORITY.search(heading) - if match: - return (match.group(2), match.group(1)) - else: - return (heading, None) - - -RE_HEADING_PRIORITY = re.compile(r'^\s*\[#([A-Z0-9])\] ?(.*)$') - -PropertyValue = Union[str, int, float] - - -def parse_property(line: str) -> tuple[Optional[str], Optional[PropertyValue]]: - """ - Get property from given string. - - >>> parse_property(':Some_property: some value') - ('Some_property', 'some value') - >>> parse_property(':Effort: 1:10') - ('Effort', 70) - - """ - prop_key = None - prop_val: Optional[Union[str, int, float]] = None - match = RE_PROP.search(line) - if match: - prop_key = match.group(1) - prop_val = match.group(2) - if prop_key == 'Effort': - prop_val = parse_duration_to_minutes(prop_val) - return (prop_key, prop_val) - - -RE_PROP = re.compile(r'^\s*:(.*?):\s*(.*?)\s*$') - - -def parse_duration_to_minutes(duration: str) -> Union[float, int]: - """ - Parse duration minutes from given string. - Convert to integer if number has no decimal points - - >>> parse_duration_to_minutes('3:12') - 192 - >>> parse_duration_to_minutes('1:23:45') - 83.75 - >>> parse_duration_to_minutes('1y 3d 3h 4min') - 530464 - >>> parse_duration_to_minutes('1d3h5min') - 1625 - >>> parse_duration_to_minutes('3d 13:35') - 5135 - >>> parse_duration_to_minutes('2.35h') - 141 - >>> parse_duration_to_minutes('10') - 10 - >>> parse_duration_to_minutes('10.') - 10 - >>> parse_duration_to_minutes('1 h') - 60 - >>> parse_duration_to_minutes('') - 0 - """ - - minutes = parse_duration_to_minutes_float(duration) - return int(minutes) if minutes.is_integer() else minutes +class LogbookDrawer: + def __init__( + self, + start_line: LogbookStartLine, + end_line: LogbookEndLine, + entries: list[RepeatTaskLine], + indent: str, + *, + generated: bool, + ) -> None: + self.start_line = start_line + self.end_line = end_line + self.entries = entries + self.indent = indent + self.generated = generated -def parse_duration_to_minutes_float(duration: str) -> float: - """ - Parse duration minutes from given string. - The following code is fully compatible with the 'org-duration-to-minutes' function in org mode: - https://github.com/emacs-mirror/emacs/blob/master/lisp/org/org-duration.el - - >>> parse_duration_to_minutes_float('3:12') - 192.0 - >>> parse_duration_to_minutes_float('1:23:45') - 83.75 - >>> parse_duration_to_minutes_float('1y 3d 3h 4min') - 530464.0 - >>> parse_duration_to_minutes_float('1d3h5min') - 1625.0 - >>> parse_duration_to_minutes_float('3d 13:35') - 5135.0 - >>> parse_duration_to_minutes_float('2.35h') - 141.0 - >>> parse_duration_to_minutes_float('10') - 10.0 - >>> parse_duration_to_minutes_float('10.') - 10.0 - >>> parse_duration_to_minutes_float('1 h') - 60.0 - >>> parse_duration_to_minutes_float('') - 0.0 - """ - - match: Optional[Any] - if duration == "": - return 0.0 - if isinstance(duration, float): - return float(duration) - if RE_ORG_DURATION_H_MM.fullmatch(duration): - hours, minutes, *seconds_ = map(float, duration.split(":")) - seconds = seconds_[0] if seconds_ else 0 - return seconds / 60.0 + minutes + 60 * hours - if RE_ORG_DURATION_FULL.fullmatch(duration): - minutes = 0 - for match in RE_ORG_DURATION_UNIT.finditer(duration): - value = float(match.group(1)) - unit = match.group(2) - minutes += value * ORG_DURATION_UNITS[unit] - return float(minutes) - match = RE_ORG_DURATION_MIXED.fullmatch(duration) - if match: - units_part = match.groupdict()['A'] - hms_part = match.groupdict()['B'] - return parse_duration_to_minutes_float(units_part) + parse_duration_to_minutes_float(hms_part) - if RE_FLOAT.fullmatch(duration): - return float(duration) - raise ValueError(f"Invalid duration format {duration}") - - -# Conversion factor to minutes for a duration. -ORG_DURATION_UNITS = { - "min": 1, - "h": 60, - "d": 60 * 24, - "w": 60 * 24 * 7, - "m": 60 * 24 * 30, - "y": 60 * 24 * 365.25, -} -# Regexp matching for all units. -ORG_DURATION_UNITS_RE = r'({})'.format(r'|'.join(ORG_DURATION_UNITS.keys())) -# Regexp matching a duration expressed with H:MM or H:MM:SS format. -# Hours can use any number of digits. -ORG_DURATION_H_MM_RE = r'[ \t]*[0-9]+(?::[0-9]{2}){1,2}[ \t]*' -RE_ORG_DURATION_H_MM = re.compile(ORG_DURATION_H_MM_RE) -# Regexp matching a duration with an unit. -# Allowed units are defined in ORG_DURATION_UNITS. -# Match group 1 contains the bare number. -# Match group 2 contains the unit. -ORG_DURATION_UNIT_RE = r'([0-9]+(?:[.][0-9]*)?)[ \t]*' + ORG_DURATION_UNITS_RE -RE_ORG_DURATION_UNIT = re.compile(ORG_DURATION_UNIT_RE) -# Regexp matching a duration expressed with units. -# Allowed units are defined in ORG_DURATION_UNITS. -ORG_DURATION_FULL_RE = rf'(?:[ \t]*{ORG_DURATION_UNIT_RE})+[ \t]*' -RE_ORG_DURATION_FULL = re.compile(ORG_DURATION_FULL_RE) -# Regexp matching a duration expressed with units and H:MM or H:MM:SS format. -# Allowed units are defined in ORG_DURATION_UNITS. -# Match group A contains units part. -# Match group B contains H:MM or H:MM:SS part. -ORG_DURATION_MIXED_RE = rf'(?P([ \t]*{ORG_DURATION_UNIT_RE})+)[ \t]*(?P[0-9]+(?::[0-9][0-9]){{1,2}})[ \t]*' -RE_ORG_DURATION_MIXED = re.compile(ORG_DURATION_MIXED_RE) -# Regexp matching float numbers. -RE_FLOAT = re.compile(r'[0-9]+([.][0-9]*)?') +class PropertyDrawer: + def __init__( + self, + start_line: PropertyDrawerStartLine, + end_line: PropertyDrawerEndLine, + entries: list[PropertyEntryLine], + indent: str, + ) -> None: + self.start_line = start_line + self.end_line = end_line + self.entries = entries + self.indent = indent # -> Optional[Tuple[str, Sequence[str]]]: # todo wtf?? it says 'ABCMeta isn't subscriptable??' @@ -515,9 +304,8 @@ def __init__(self, env: OrgEnv, index: int | None = None) -> None: self.linenumber = cast(int, None) # set in parse_lines # content - self._lines: list[str] = [] - - self._properties: dict[str, PropertyValue] = {} + self._line_items: list[LineItem] = [] + self._property_drawer: PropertyDrawer | None = None self._timestamps: list[OrgDate] = [] # FIXME: use `index` argument to set index. (Currently it is @@ -757,6 +545,99 @@ def children(self): """ return list(self._find_children()) + @children.setter + def children(self, value: Iterable[OrgNode]) -> None: + new_children = list(value) + if len(set(new_children)) != len(new_children): + raise ValueError("Duplicate children are not allowed") + for child in new_children: + if not isinstance(child, OrgNode): + raise TypeError(f"Child must be OrgNode, got {type(child)}") + if child.env is not self.env: + raise ValueError("Child must belong to the same OrgEnv") + if self._node_contains(child, self): + raise ValueError("Cannot reparent an ancestor under its descendant") + for child in new_children: + for other in new_children: + if child is other: + continue + if self._node_contains(child, other): + raise ValueError("Cannot reparent a node alongside its descendant") + + subtree_map: dict[OrgNode, list[OrgBaseNode]] = {} + for child in new_children: + subtree_map[child] = self._collect_subtree_nodes(child) + + for start, end in reversed(self._direct_children_ranges()): + del self.env._nodes[start:end] + + for child in new_children: + self._remove_subtree_if_present(child) + + for child in new_children: + desired_level = self.level + 1 + delta = desired_level - child.level + if delta: + for node in subtree_map[child]: + if isinstance(node, OrgNode): + node._shift_level(delta) + + insert_at = self.env._nodes.index(self) + 1 + for child in new_children: + nodes = subtree_map[child] + self.env._nodes[insert_at:insert_at] = nodes + insert_at += len(nodes) + + for index, node in enumerate(self.env._nodes): + node._index = index + + def _subtree_end_index(self, start: int, level: int) -> int: + end = start + 1 + while end < len(self.env._nodes) and self.env._nodes[end].level > level: + end += 1 + return end + + def _collect_subtree_nodes(self, node: OrgBaseNode) -> list[OrgBaseNode]: + try: + start = self.env._nodes.index(node) + except ValueError as exc: + raise ValueError("Child must belong to the current OrgEnv node list") from exc + end = self._subtree_end_index(start, node.level) + return self.env._nodes[start:end] + + def _direct_children_ranges(self) -> list[tuple[int, int]]: + ranges: list[tuple[int, int]] = [] + index = self._index + 1 + while index < len(self.env._nodes) and self.env._nodes[index].level > self.level: + node = self.env._nodes[index] + if node.level == self.level + 1: + end = self._subtree_end_index(index, node.level) + ranges.append((index, end)) + index = end + else: + index += 1 + return ranges + + def _remove_subtree_if_present(self, node: OrgBaseNode) -> None: + try: + start = self.env._nodes.index(node) + except ValueError: + return + end = self._subtree_end_index(start, node.level) + del self.env._nodes[start:end] + + def _node_contains(self, ancestor: OrgBaseNode, node: OrgBaseNode) -> bool: + try: + start = self.env._nodes.index(ancestor) + except ValueError: + return False + end = self._subtree_end_index(start, ancestor.level) + try: + target_index = self.env._nodes.index(node) + except ValueError: + return False + return start < target_index < end + @property def root(self): """ @@ -792,7 +673,64 @@ def properties(self) -> dict[str, PropertyValue]: 'value' """ - return self._properties + drawer = self._property_drawer + if drawer is None: + return {} + props: dict[str, PropertyValue] = {} + for entry in drawer.entries: + props[entry.key] = entry.value + return props + + @properties.setter + def properties(self, value: dict[str, PropertyValue] | None) -> None: + new_props = {} if value is None else dict(value) + normalized: dict[str, PropertyValue] = {} + render_values: dict[str, str] = {} + for key, prop_value in new_props.items(): + (norm_value, render_value) = self._normalize_property_value(key, prop_value) + normalized[key] = norm_value + render_values[key] = render_value + if not normalized: + if self._property_drawer is not None: + start_index = self._line_items.index(self._property_drawer.start_line) + end_index = self._line_items.index(self._property_drawer.end_line) + for index in range(end_index, start_index - 1, -1): + self._remove_line_item(index) + self._property_drawer = None + return + + drawer = self._property_drawer + if drawer is None: + drawer = self._create_property_drawer() + + entries_by_key: dict[str, list[PropertyEntryLine]] = {} + for entry in drawer.entries: + entries_by_key.setdefault(entry.key, []).append(entry) + + keys_to_remove = {entry.key for entry in drawer.entries if entry.key not in normalized} + if keys_to_remove: + for entry in list(drawer.entries): + if entry.key in keys_to_remove: + index = self._line_items.index(entry) + self._remove_line_item(index) + drawer.entries.remove(entry) + + for key, prop_value in normalized.items(): + render_value = render_values[key] + entries = entries_by_key.get(key, []) + if entries: + entries[-1].update_value(prop_value, render_value) + else: + insert_index = self._line_items.index(drawer.end_line) + entry = PropertyEntryLine( + raw=f"{drawer.indent}:{key}: {render_value}" if render_value else f"{drawer.indent}:{key}:", + key=key, + value=prop_value, + render_value=render_value, + prefix=drawer.indent, + ) + self._insert_line_item(insert_index, entry) + drawer.entries.append(entry) def get_property(self, key, val=None) -> Optional[PropertyValue]: """ @@ -805,21 +743,21 @@ def get_property(self, key, val=None) -> Optional[PropertyValue]: Default value to return. """ - return self._properties.get(key, val) + return self.properties.get(key, val) # parser @classmethod def from_chunk(cls, env, lines): self = cls(env) - self._lines = lines + self._line_items = [TextLine(line) for line in lines] self._parse_comments() return self def _parse_comments(self): special_comments: dict[str, list[str]] = {} - for line in self._lines: - parsed = parse_comment(line) + for line_item in self._line_items: + parsed = parse_comment(line_item.render()) if parsed: (key, vals) = parsed key = key.upper() # case insensitive, so keep as uppercase @@ -830,23 +768,58 @@ def _parse_comments(self): for val in special_comments.get(todokey, []): self.env.add_todo_keys(*parse_seq_todo(val)) - def _iparse_properties(self, ilines: Iterator[str]) -> Iterator[str]: - self._properties = {} - in_property_field = False - for line in ilines: - if in_property_field: - if line.find(":END:") >= 0: - break - else: - (key, val) = parse_property(line) - if key is not None and val is not None: - self._properties.update({key: val}) - elif line.find(":PROPERTIES:") >= 0: - in_property_field = True - else: - yield line - for line in ilines: - yield line + def _normalize_property_value(self, key: str, value: PropertyValue) -> tuple[PropertyValue, str]: + if key == "Effort" and isinstance(value, str): + return (parse_duration_to_minutes(value), value) + return (value, str(value)) + + def _create_property_drawer(self) -> PropertyDrawer: + insert_at = self._property_drawer_insert_index() + indent = self._property_drawer_indent(insert_at) + start_line = PropertyDrawerStartLine(f"{indent}:PROPERTIES:") + end_line = PropertyDrawerEndLine(f"{indent}:END:") + self._insert_line_item(insert_at, start_line) + self._insert_line_item(insert_at + 1, end_line) + drawer = PropertyDrawer(start_line, end_line, [], indent) + self._property_drawer = drawer + return drawer + + def _property_drawer_insert_index(self) -> int: + return 0 + + def _property_drawer_indent(self, insert_at: int) -> str: + if insert_at < len(self._line_items): + line = self._line_items[insert_at].render() + return line[: len(line) - len(line.lstrip(" "))] + return "" + + def _sync_property_drawer_from_lines(self) -> None: + self._property_drawer = None + index = 0 + while index < len(self._line_items): + line_item = self._line_items[index] + line = line_item.render() + if isinstance(line_item, PropertyDrawerStartLine) or line.strip() == ":PROPERTIES:": + start_line = PropertyDrawerStartLine(line) + self._update_line_item(index, start_line) + indent = line[: len(line) - len(line.lstrip(" "))] + entries: list[PropertyEntryLine] = [] + end_index = index + 1 + while end_index < len(self._line_items): + end_line_item = self._line_items[end_index] + end_line = end_line_item.render() + if isinstance(end_line_item, PropertyDrawerEndLine) or end_line.strip() == ":END:": + end_line_item = PropertyDrawerEndLine(end_line) + self._update_line_item(end_index, end_line_item) + self._property_drawer = PropertyDrawer(start_line, end_line_item, entries, indent) + return + entry = PropertyEntryLine.from_line(end_line) + if entry is not None: + self._update_line_item(end_index, entry) + entries.append(entry) + end_index += 1 + return + index += 1 # misc @@ -916,18 +889,57 @@ def get_body(self, format: str = 'plain') -> str: # noqa: A002 See also: :meth:`get_heading`. """ - return self._get_text('\n'.join(self._body_lines), format) if self._lines else '' + return self._get_text("\n".join(self._body_lines), format) if self._body_lines else "" @property def body(self) -> str: """Alias of ``.get_body(format='plain')``.""" return self.get_body() + @body.setter + def body(self, value: str) -> None: + new_lines = value.splitlines() + self._replace_body_lines(new_lines) + @property def body_rich(self) -> Iterator[Rich]: r = self.get_body(format='rich') return cast(Iterator[Rich], r) # meh.. + def _replace_body_lines(self, new_lines: list[str]) -> None: + body_indices = self._body_line_indices() + if body_indices: + insert_at = body_indices[0] + for index in reversed(body_indices): + self._remove_line_item(index) + else: + insert_at = self._body_insert_index() + for offset, line in enumerate(new_lines): + self._insert_line_item(insert_at + offset, TextLine(line)) + self._body_lines = list(new_lines) + self._refresh_timestamps_after_body_change() + + def _body_line_indices(self) -> list[int]: + return [index for index, item in enumerate(self._line_items) if self._is_body_line_item(index, item)] + + def _is_body_line_item(self, index: int, item: LineItem) -> bool: # noqa: ARG002 + return not isinstance( + item, + ( + PropertyDrawerStartLine, + PropertyDrawerEndLine, + PropertyEntryLine, + ), + ) + + def _body_insert_index(self) -> int: + return len(self._line_items) + + def _refresh_timestamps_after_body_change(self) -> None: + self._timestamps = [] + for line in self._body_lines: + self._timestamps.extend(OrgDate.list_from_str(line)) + @property def heading(self) -> str: raise NotImplementedError @@ -1055,7 +1067,19 @@ def rangelist(self): return self.get_timestamps(active=True, inactive=True, range=True) def __str__(self) -> str: - return "\n".join(self._lines) + return "\n".join(self._render_lines()) + + def _render_lines(self) -> list[str]: + return [line.render() for line in self._line_items] + + def _update_line_item(self, index: int, item: LineItem) -> None: + self._line_items[index] = item + + def _insert_line_item(self, index: int, item: LineItem) -> None: + self._line_items.insert(index, item) + + def _remove_line_item(self, index: int) -> None: + del self._line_items[index] # todo hmm, not sure if it really belongs here and not to OrgRootNode? def get_file_property_list(self, property: str): # noqa: A002 @@ -1108,16 +1132,16 @@ def is_root(self) -> bool: def _parse_pre(self): """Call parsers which must be called before tree structuring""" - ilines: Iterator[str] = iter(self._lines) - ilines = self._iparse_properties(ilines) - ilines = self._iparse_timestamps(ilines) - self._body_lines = list(ilines) - - def _iparse_timestamps(self, ilines: Iterator[str]) -> Iterator[str]: + self._sync_property_drawer_from_lines() + self._body_lines = [ + item.render() for index, item in enumerate(self._line_items) if self._is_body_line_item(index, item) + ] self._timestamps = [] - for line in ilines: + for line in self._body_lines: self._timestamps.extend(OrgDate.list_from_str(line)) - yield line + + def _property_drawer_indent(self, insert_at: int) -> str: # noqa: ARG002 + return "" class OrgNode(OrgBaseNode): @@ -1131,109 +1155,262 @@ class OrgNode(OrgBaseNode): def __init__(self, *args, **kwds) -> None: super().__init__(*args, **kwds) # fixme instead of casts, should organize code in such a way that they aren't necessary - self._heading = cast(str, None) self._level: int | None = None - self._tags = cast(list[str], None) - self._todo: Optional[str] = None - self._priority = None - self._scheduled = OrgDateScheduled(None) - self._deadline = OrgDateDeadline(None) - self._closed = OrgDateClosed(None) - self._clocklist: list[OrgDateClock] = [] + self._heading_line: HeadingLine | None = None + self._sdc_line: SdcLine | None = None + self._clock_lines: list[ClockLine] = [] self._body_lines: list[str] = [] self._repeated_tasks: list[OrgDateRepeatedTask] = [] + self._logbook_drawers: list[LogbookDrawer] = [] # parser def _parse_pre(self): """Call parsers which must be called before tree structuring""" self._parse_heading() - # FIXME: make the following parsers "lazy" - ilines: Iterator[str] = iter(self._lines) - try: - next(ilines) # skip heading - except StopIteration: - return - ilines = self._iparse_sdc(ilines) - ilines = self._iparse_clock(ilines) - ilines = self._iparse_properties(ilines) - ilines = self._iparse_repeated_tasks(ilines) - ilines = self._iparse_timestamps(ilines) - self._body_lines = list(ilines) + self._sync_sdc_line_from_items() + self._sync_clock_lines_from_items() + self._sync_property_drawer_from_lines() + self._sync_logbook_drawers_from_lines() + self._sync_repeated_tasks_cache() + self._refresh_body_and_timestamps() def _parse_heading(self) -> None: - heading = self._lines[0] - heading_level = parse_heading_level(heading) - if heading_level is not None: - (heading, self._level) = heading_level - (heading, self._tags) = parse_heading_tags(heading) - (heading, self._todo) = parse_heading_todos(heading, self.env.all_todo_keys) - (heading, self._priority) = parse_heading_priority(heading) - self._heading = heading - - # The following ``_iparse_*`` methods are simple generator based - # parser. See ``_parse_pre`` for how it is used. The principle - # is simple: these methods get an iterator and returns an iterator. - # If the item returned by the input iterator must be dedicated to - # the parser, do not yield the item or yield it as-is otherwise. - - def _iparse_sdc(self, ilines: Iterator[str]) -> Iterator[str]: - """ - Parse SCHEDULED, DEADLINE and CLOSED time tamps. - - They are assumed be in the first line. - - """ - try: - line = next(ilines) - except StopIteration: + if not self._line_items: + raise ValueError("OrgNode has no lines to parse heading") + heading_line = HeadingLine.from_line(self._line_items[0].render(), self.env.all_todo_keys) + self._heading_line = heading_line + self._level = heading_line.level + self._update_line_item(0, heading_line) + + def _normalize_tags(self, tags: Iterable[str] | None) -> list[str]: + if tags is None: + return [] + if isinstance(tags, str): + return [tags] + if isinstance(tags, set): + return sorted(tags) + return list(tags) + + def _shift_level(self, delta: int) -> None: + if self._level is None: return - (self._scheduled, self._deadline, self._closed) = parse_sdc(line) + self._level += delta + if self._heading_line is not None: + self._heading_line.level = self._level + self._heading_line.mark_dirty() + self._update_line_item(0, self._heading_line) + + def _is_body_line_item(self, index: int, item: LineItem) -> bool: # noqa: ARG002 + return not isinstance( + item, + ( + HeadingLine, + SdcLine, + ClockLine, + PropertyDrawerStartLine, + PropertyDrawerEndLine, + PropertyEntryLine, + RepeatTaskLine, + ), + ) + + def _refresh_timestamps_after_body_change(self) -> None: + self._timestamps = [] + if self._heading_line is not None: + self._timestamps.extend(OrgDate.list_from_str(self._heading_line.heading)) + for line in self._body_lines: + self._timestamps.extend(OrgDate.list_from_str(line)) - if not (self._scheduled or self._deadline or self._closed): - yield line # when none of them were found + def _update_heading_line(self) -> None: + if self._heading_line is None: + return + self._heading_line.mark_dirty() + self._update_line_item(0, self._heading_line) - for line in ilines: - yield line + def _sync_sdc_line_from_items(self) -> None: + self._sdc_line = None + if len(self._line_items) < 2: + return + line_item = self._line_items[1] + sdc_line = SdcLine.from_line(line_item.render()) + if sdc_line is not None: + self._sdc_line = sdc_line + self._update_line_item(1, sdc_line) + + def _sync_clock_lines_from_items(self) -> None: + self._clock_lines = [] + for index, item in enumerate(self._line_items): + clock_line = ClockLine.from_line(item.render()) + if clock_line is None: + continue + self._clock_lines.append(clock_line) + self._update_line_item(index, clock_line) + + def _sync_repeated_tasks_cache(self) -> None: + self._repeated_tasks = [line.repeat for line in self._repeat_task_lines_in_order()] + + def _refresh_body_and_timestamps(self) -> None: + self._body_lines = [ + item.render() for index, item in enumerate(self._line_items) if self._is_body_line_item(index, item) + ] + self._timestamps = [] + if self._heading_line is not None: + self._timestamps.extend(OrgDate.list_from_str(self._heading_line.heading)) + for line in self._body_lines: + self._timestamps.extend(OrgDate.list_from_str(line)) - def _iparse_clock(self, ilines: Iterator[str]) -> Iterator[str]: - self._clocklist = [] - for line in ilines: - cl = OrgDateClock.from_str(line) - if cl: - self._clocklist.append(cl) - else: - yield line + def _coerce_sdc_date(self, value: Any, cls: type[TOrgDate]) -> TOrgDate: + if value is None: + return cls(None) + if isinstance(value, OrgDate): + return cls(value.start, value.end, active=value.is_active()) + return cls(value) - def _iparse_timestamps(self, ilines: Iterator[str]) -> Iterator[str]: - self._timestamps = [] - self._timestamps.extend(OrgDate.list_from_str(self._heading)) - for l in ilines: - self._timestamps.extend(OrgDate.list_from_str(l)) - yield l - - def _iparse_repeated_tasks(self, ilines: Iterator[str]) -> Iterator[str]: - self._repeated_tasks = [] - for line in ilines: - match = self._repeated_tasks_re.search(line) - if match: - # FIXME: move this parsing to OrgDateRepeatedTask.from_str - mdict = match.groupdict() - done_state = mdict['done'] - todo_state = mdict['todo'] - date = OrgDate.from_str(mdict['date']) - self._repeated_tasks.append(OrgDateRepeatedTask(date.start, todo_state, done_state)) - else: - yield line - - _repeated_tasks_re = re.compile( - r''' - \s*- \s+ - State \s+ "(?P [^"]+)" \s+ - from \s+ "(?P [^"]+)" \s+ - \[ (?P [^\]]+) \]''', - re.VERBOSE, - ) + def _update_sdc_entry(self, label: str, date: OrgDate) -> None: + if self._sdc_line is None: + if not date: + return + self._sdc_line = SdcLine.from_entries({label: date}) + self._insert_line_item(1, self._sdc_line) + return + self._sdc_line.update_entry(label, date) + if self._sdc_line.is_empty(): + index = self._line_items.index(self._sdc_line) + self._remove_line_item(index) + self._sdc_line = None + return + + def _format_clock_line(self, clock: OrgDateClock) -> ClockLine: + prefix = " CLOCK: " + suffix = ClockLine._compute_suffix(clock) + return ClockLine(f"{prefix}{clock}{suffix}", prefix, clock) + + def _sync_logbook_drawers_from_lines(self) -> None: + self._logbook_drawers = [] + in_logbook = False + start_line: LogbookStartLine | None = None + entries: list[RepeatTaskLine] = [] + indent = "" + index = 0 + + def parse_repeat_with_comment(start_index: int) -> tuple[RepeatTaskLine | None, int]: + line_item = self._line_items[start_index] + line = line_item.render() + match = RepeatTaskLine.match(line) + if match is None: + return (None, start_index + 1) + inline_comment = match.group("comment") + comment_marker = inline_comment is not None + comment_lines: list[str] = [] + if inline_comment is not None: + inline_comment = inline_comment.lstrip() + if inline_comment: + comment_lines.append(inline_comment) + next_index = start_index + 1 + if comment_marker: + base_indent_len = len(match.group("indent")) + while next_index < len(self._line_items): + next_line = self._line_items[next_index].render() + if next_line.strip().upper() in (":END:", ":LOGBOOK:"): + break + if RepeatTaskLine.match(next_line) is not None: + break + next_indent = re.match(r"\s*", next_line) + assert next_indent is not None + if len(next_indent.group(0)) <= base_indent_len: + break + comment_lines.append(next_line.lstrip()) + next_index += 1 + comment: str | None = None + if comment_marker: + comment = "\n".join(comment_lines) + entry = RepeatTaskLine.from_line(line, comment=comment) + return (entry, next_index if comment_marker else start_index + 1) + + while index < len(self._line_items): + line_item = self._line_items[index] + line = line_item.render() + if line.lstrip().startswith("#"): + index += 1 + continue + if line.strip().upper() == ":LOGBOOK:": + start_line = LogbookStartLine(line) + self._update_line_item(index, start_line) + in_logbook = True + entries = [] + indent = line[: len(line) - len(line.lstrip(" "))] + index += 1 + continue + if in_logbook and line.strip().upper() == ":END:": + end_line = LogbookEndLine(line) + self._update_line_item(index, end_line) + assert start_line is not None + self._logbook_drawers.append(LogbookDrawer(start_line, end_line, entries, indent, generated=False)) + in_logbook = False + index += 1 + continue + if in_logbook: + entry, next_index = parse_repeat_with_comment(index) + if entry is not None: + self._update_line_item(index, entry) + entries.append(entry) + index = next_index + continue + entry, next_index = parse_repeat_with_comment(index) + if entry is not None: + self._update_line_item(index, entry) + index = next_index + + def _repeat_task_lines_in_order(self) -> list[RepeatTaskLine]: + return [item for item in self._line_items if isinstance(item, RepeatTaskLine)] + + def _logbook_drawer_insert_index(self) -> int: + index = 1 + while index < len(self._line_items): + item = self._line_items[index] + if isinstance(item, (SdcLine, ClockLine)): + index += 1 + continue + if isinstance(item, (PropertyDrawerStartLine, PropertyDrawerEndLine, PropertyEntryLine)): + index += 1 + continue + break + return index + + def _logbook_drawer_indent(self, insert_at: int) -> str: + if insert_at > 0: + before = self._line_items[insert_at - 1].render() + indent = before[: len(before) - len(before.lstrip(" "))] + return indent or " " + return " " + + def _create_logbook_drawer(self) -> LogbookDrawer: + insert_at = self._logbook_drawer_insert_index() + indent = self._logbook_drawer_indent(insert_at) + start_line = LogbookStartLine(f"{indent}:LOGBOOK:") + end_line = LogbookEndLine(f"{indent}:END:") + self._insert_line_item(insert_at, start_line) + self._insert_line_item(insert_at + 1, end_line) + drawer = LogbookDrawer(start_line, end_line, [], indent, generated=True) + self._logbook_drawers.append(drawer) + return drawer + + def _property_drawer_insert_index(self) -> int: + index = 1 + while index < len(self._line_items): + item = self._line_items[index] + if isinstance(item, (SdcLine, ClockLine)): + index += 1 + continue + break + return index + + def _property_drawer_indent(self, insert_at: int) -> str: + if insert_at > 0: + before = self._line_items[insert_at - 1].render() + return before[: len(before) - len(before.lstrip(" "))] or " " + return " " def get_heading(self, format: str = 'plain') -> str: # noqa: A002 """ @@ -1255,13 +1432,23 @@ def get_heading(self, format: str = 'plain') -> str: # noqa: A002 '[[link][Node 1]]' """ - return self._get_text(self._heading, format) + heading = "" + if self._heading_line is not None: + heading = self._heading_line.heading + return self._get_text(heading, format) @property def heading(self) -> str: """Alias of ``.get_heading(format='plain')``.""" return self.get_heading() + @heading.setter + def heading(self, value: str) -> None: + if self._heading_line is None: + return + self._heading_line.heading = value + self._update_heading_line() + @property def level(self): """ @@ -1299,10 +1486,23 @@ def priority(self) -> str | None: True """ - return self._priority + if self._heading_line is None: + return None + return self._heading_line.priority + + @priority.setter + def priority(self, value: str | None) -> None: + if value == "": + value = None + if self._heading_line is None: + return + self._heading_line.priority = value + self._update_heading_line() def _get_tags(self, *, inher: bool = False) -> set[str]: - tags = set(self._tags) + tags = set() + if self._heading_line is not None: + tags = set(self._heading_line.tags) if inher: parent = self.get_parent() if parent: @@ -1320,7 +1520,29 @@ def todo(self) -> Optional[str]: 'TODO' """ - return self._todo + if self._heading_line is None: + return None + return self._heading_line.todo + + @todo.setter + def todo(self, value: Optional[str]) -> None: + if value == "": + value = None + if self._heading_line is None: + return + self._heading_line.todo = value + self._update_heading_line() + + @property + def tags(self) -> set[str]: + return self._get_tags(inher=True) + + @tags.setter + def tags(self, value: Iterable[str] | None) -> None: + if self._heading_line is None: + return + self._heading_line.tags = self._normalize_tags(value) + self._update_heading_line() @property def scheduled(self): @@ -1338,7 +1560,17 @@ def scheduled(self): OrgDateScheduled((2012, 2, 26)) """ - return self._scheduled + if self._sdc_line is None: + return OrgDateScheduled(None) + entry = self._sdc_line._entries.get("SCHEDULED") + if entry is None: + return OrgDateScheduled(None) + return cast(OrgDateScheduled, entry.date) + + @scheduled.setter + def scheduled(self, value: Any) -> None: + date = self._coerce_sdc_date(value, OrgDateScheduled) + self._update_sdc_entry("SCHEDULED", date) @property def deadline(self): @@ -1356,7 +1588,17 @@ def deadline(self): OrgDateDeadline((2012, 2, 26)) """ - return self._deadline + if self._sdc_line is None: + return OrgDateDeadline(None) + entry = self._sdc_line._entries.get("DEADLINE") + if entry is None: + return OrgDateDeadline(None) + return cast(OrgDateDeadline, entry.date) + + @deadline.setter + def deadline(self, value: Any) -> None: + date = self._coerce_sdc_date(value, OrgDateDeadline) + self._update_sdc_entry("DEADLINE", date) @property def closed(self): @@ -1374,7 +1616,17 @@ def closed(self): OrgDateClosed((2012, 2, 26, 21, 15, 0)) """ - return self._closed + if self._sdc_line is None: + return OrgDateClosed(None) + entry = self._sdc_line._entries.get("CLOSED") + if entry is None: + return OrgDateClosed(None) + return cast(OrgDateClosed, entry.date) + + @closed.setter + def closed(self, value: Any) -> None: + date = self._coerce_sdc_date(value, OrgDateClosed) + self._update_sdc_entry("CLOSED", date) @property def clock(self): @@ -1392,7 +1644,32 @@ def clock(self): [OrgDateClock((2012, 2, 26, 21, 10, 0), (2012, 2, 26, 21, 15, 0))] """ - return self._clocklist + return [line.date for line in self._clock_lines] + + @clock.setter + def clock(self, value: Iterable[OrgDateClock]) -> None: + new_clocks = list(value) + existing_indices = [i for i, item in enumerate(self._line_items) if isinstance(item, ClockLine)] + if existing_indices: + for i, clock in enumerate(new_clocks): + if i < len(existing_indices): + line_item = self._line_items[existing_indices[i]] + if isinstance(line_item, ClockLine): + line_item.update(clock) + else: + self._update_line_item(existing_indices[i], self._format_clock_line(clock)) + else: + insert_at = existing_indices[-1] + (i - len(existing_indices) + 1) + self._insert_line_item(insert_at, self._format_clock_line(clock)) + for i in reversed(existing_indices[len(new_clocks) :]): + self._remove_line_item(i) + else: + insert_at = 1 + if self._sdc_line is not None: + insert_at = self._line_items.index(self._sdc_line) + 1 + for i, clock in enumerate(new_clocks): + self._insert_line_item(insert_at + i, self._format_clock_line(clock)) + self._clock_lines = [item for item in self._line_items if isinstance(item, ClockLine)] def has_date(self): """ @@ -1446,6 +1723,61 @@ def repeated_tasks(self): """ return self._repeated_tasks + @repeated_tasks.setter + def repeated_tasks(self, value: Iterable[OrgDateRepeatedTask]) -> None: + new_repeats = list(value) + self._repeated_tasks = new_repeats + existing_lines = self._repeat_task_lines_in_order() + + for line, repeat in zip(existing_lines, new_repeats): + line.update_repeat(repeat) + + for line in reversed(existing_lines[len(new_repeats) :]): + index = self._line_items.index(line) + self._remove_line_item(index) + for drawer in self._logbook_drawers: + if line in drawer.entries: + drawer.entries.remove(line) + + for repeat in new_repeats[len(existing_lines) :]: + insert_drawer: LogbookDrawer | None + insert_index: int + indent: str + (insert_drawer, insert_index, indent) = self._repeat_task_insert_target(existing_lines) + entry = RepeatTaskLine.from_repeat(repeat, indent) + self._insert_line_item(insert_index, entry) + if insert_drawer is not None: + insert_drawer.entries.append(entry) + existing_lines.append(entry) + + self._remove_empty_generated_logbooks() + self._sync_repeated_tasks_cache() + + def _repeat_task_insert_target( + self, + existing_lines: list[RepeatTaskLine], + ) -> tuple[LogbookDrawer | None, int, str]: + if self._logbook_drawers: + drawer = self._logbook_drawers[-1] + insert_index = self._line_items.index(drawer.end_line) + return (drawer, insert_index, drawer.indent) + if existing_lines: + last_line = existing_lines[-1] + insert_index = self._line_items.index(last_line) + 1 + return (None, insert_index, last_line.indent) + drawer = self._create_logbook_drawer() + insert_index = self._line_items.index(drawer.end_line) + return (drawer, insert_index, drawer.indent) + + def _remove_empty_generated_logbooks(self) -> None: + for drawer in list(self._logbook_drawers): + if drawer.generated and not drawer.entries: + start_index = self._line_items.index(drawer.start_line) + end_index = self._line_items.index(drawer.end_line) + for index in range(end_index, start_index - 1, -1): + self._remove_line_item(index) + self._logbook_drawers.remove(drawer) + def parse_lines(lines: Iterable[str], filename, env=None) -> OrgNode: if not env: diff --git a/src/orgparse/tests/test_dump.py b/src/orgparse/tests/test_dump.py new file mode 100644 index 0000000..9b32aa6 --- /dev/null +++ b/src/orgparse/tests/test_dump.py @@ -0,0 +1,57 @@ +from __future__ import annotations + +from pathlib import Path + +from .. import dump, dumps, load, loads + + +def test_dump_roundtrip(tmp_path: Path) -> None: + content = """* Node + Body""" + source = tmp_path / "source.org" + target = tmp_path / "target.org" + source.write_text(content, encoding="utf8") + + root = load(source) + dump(root, target) + + assert target.read_text(encoding="utf8") == content + + +def test_dumps_iterable_nodes() -> None: + root = loads("""* A +* B""") + output = dumps(root.children) + assert ( + output + == """* A +* B""" + ) + + +def test_dump_reflects_updates() -> None: + root = loads("""* Node + Body""") + node = root.children[0] + node.heading = "Updated" + node.body = "New body" + + assert ( + dumps(root) + == """* Updated +New body""" + ) + + +def test_dump_multiple_nodes(tmp_path: Path) -> None: + root = loads("""* A +* B""") + target = tmp_path / "nodes.org" + + dump(root.children, target) + + assert ( + target.read_text(encoding="utf8") + == """* A +* B""" + ) diff --git a/src/orgparse/tests/test_misc.py b/src/orgparse/tests/test_misc.py index bb1382e..3c24204 100644 --- a/src/orgparse/tests/test_misc.py +++ b/src/orgparse/tests/test_misc.py @@ -69,6 +69,89 @@ def test_stars(): assert h2.heading == '' +def test_stars_inside_blocks() -> None: + root = loads(''' +* Node 1 +#+begin_src c + /* + * Comment + */ +#+end_src +#+begin_example + * Example line +#+end_example +* Node 2 +''') + assert len(root.children) == 2 + body = root.children[0].get_body(format='raw') + assert '* Comment' in body + assert '* Example line' in body + + +def test_stars_inside_list() -> None: + root = loads(''' +* Node 1 + - List item + + Nested List item + * Another nested list item +* Node 2 +''') + assert len(root.children) == 2 + body = root.children[0].get_body(format='raw') + assert '* Another nested list item' in body + + +def test_repeated_tasks_with_comments() -> None: + root = loads(''' +* Node 1 +** TODO Test node + SCHEDULED: <2026-01-10 Sat +1m> + :LOGBOOK: + - State "DONE" from "TODO" [2026-01-02 Fri 17:04] + - State "CANCELLED" from "TODO" [2025-12-07 nie 17:15] + - State "CANCELLED" from "TODO" [2025-12-07 nie 17:14] \\\\ + Comment. + - State "CANCELLED" from "TODO" [2025-09-21 nie 16:09] \\\\ + Another comment. + - State "CANCELLED" from "TODO" [2025-09-06 sob 14:10] \\\\ + One more. + - State "CANCELLED" from "TODO" [2025-07-28 pon 18:36] + - State "CANCELLED" from "TODO" [2025-06-28 sob 15:34] \\\\ + Should be 7 repeats + :END: +''') + node = root.children[0].children[0] + assert len(node.repeated_tasks) == 7 + assert [repeat.comment for repeat in node.repeated_tasks] == [ + None, + None, + "Comment.", + "Another comment.", + "One more.", + None, + "Should be 7 repeats", + ] + + +def test_repeated_tasks_mixed_comments() -> None: + root = loads(''' +* Node 1 +** TODO Test node + :LOGBOOK: + - State "DONE" from "TODO" [2026-01-02 Fri 17:04] \\\\ + First line + Second line + - State "DONE" from "TODO" [2026-01-03 Fri 17:04] + :END: +''') + node = root.children[0].children[0] + assert len(node.repeated_tasks) == 2 + assert [repeat.comment for repeat in node.repeated_tasks] == [ + "First line\nSecond line", + None, + ] + + def test_parse_custom_todo_keys(): todo_keys = ['TODO', 'CUSTOM1', 'ANOTHER_KEYWORD'] done_keys = ['DONE', 'A'] @@ -117,11 +200,11 @@ def test_add_custom_todo_keys(): def test_get_file_property() -> None: content = """#+TITLE: Test: title - * Node 1 - test 1 - * Node 2 - test 2 - """ +* Node 1 +test 1 +* Node 2 +test 2 +""" # after parsing, all keys are set root = loads(content) @@ -137,11 +220,11 @@ def test_get_file_property_multivalued() -> None: #+OTHER: Test title #+title: alternate title - * Node 1 - test 1 - * Node 2 - test 2 - """ +* Node 1 +test 1 +* Node 2 +test 2 +""" # after parsing, all keys are set root = loads(content) diff --git a/src/orgparse/tests/test_node_mutations.py b/src/orgparse/tests/test_node_mutations.py new file mode 100644 index 0000000..157743c --- /dev/null +++ b/src/orgparse/tests/test_node_mutations.py @@ -0,0 +1,618 @@ +import datetime + +from orgparse.date import OrgDate, OrgDateClock, OrgDateRepeatedTask + +from .. import loads + + +def test_dynamic_heading_edits() -> None: + content = """* TODO [#A] Heading :tag1:tag2: + Body line + Second line""" + root = loads(content) + node = root.children[0] + assert str(node) == content + + node.todo = "DONE" + assert node.todo == "DONE" + assert str(node).splitlines()[0] == "* DONE [#A] Heading :tag1:tag2:" + + node.priority = None + assert node.priority is None + assert str(node).splitlines()[0] == "* DONE Heading :tag1:tag2:" + + node.heading = "Updated heading" + assert node.heading == "Updated heading" + assert str(node).splitlines()[0] == "* DONE Updated heading :tag1:tag2:" + + node.tags = ["x", "y"] + assert node.shallow_tags == {"x", "y"} + assert str(node).splitlines()[0] == "* DONE Updated heading :x:y:" + + node.todo = None + node.priority = "B" + node.tags = [] + + assert ( + str(node) + == """* [#B] Updated heading + Body line + Second line""" + ) + + +def test_children_setter_reparents_root() -> None: + content = """* A +** A1 +* B +** B1""" + root = loads(content) + (a, b) = root.children + b1 = b.children[0] + + root.children = [b] + + assert root.children == [b] + assert b.parent is root + assert b1.parent is b + assert [node.heading for node in root[1:]] == ["B", "B1"] + assert a not in list(root) + + +def test_children_setter_reparents_and_adjusts_levels() -> None: + content = """* A +** A1 +*** A1a +* B +*** B1""" + root = loads(content) + a = root.children[0] + b = root.children[1] + b1 = b.children[0] + + a.children = [b1] + + assert a.children == [b1] + assert b.children == [] + assert b1.parent is a + assert b1.level == a.level + 1 + assert str(b1).splitlines()[0] == "** B1" + assert [node.heading for node in root[1:]] == ["A", "B1", "B"] + + +def test_children_setter_reparents_subtree_and_shifts_descendants() -> None: + content = """* A +** A1 +*** A1a +* B +** B1 +*** B1a""" + root = loads(content) + a1 = root.children[0].children[0] + b = root.children[1] + b1 = b.children[0] + b1a = b1.children[0] + + a1.children = [b1] + + assert a1.children == [b1] + assert b.children == [] + assert b1.parent is a1 + assert b1.level == a1.level + 1 + assert b1a.level == b1.level + 1 + assert str(b1).splitlines()[0] == "*** B1" + assert str(b1a).splitlines()[0] == "**** B1a" + assert "A1a" not in [node.heading for node in root[1:]] + + +def test_dynamic_timestamp_edits() -> None: + content = """* Node + CLOSED: [2012-02-26 Sun 21:15] SCHEDULED: <2012-02-26 Sun> + CLOCK: [2012-02-26 Sun 21:10]--[2012-02-26 Sun 21:15] => 0:05 + Body""" + root = loads(content) + node = root.children[0] + + node.deadline = datetime.date(2012, 3, 1) + sdc_line = str(node).splitlines()[1] + assert "DEADLINE: <2012-03-01 Thu>" in sdc_line + assert "SCHEDULED: <2012-02-26 Sun>" in sdc_line + assert "CLOSED: [2012-02-26 Sun 21:15]" in sdc_line + + node.closed = None + sdc_line = str(node).splitlines()[1] + assert "CLOSED:" not in sdc_line + + node.clock = [OrgDateClock((2012, 2, 26, 22, 0, 0), (2012, 2, 26, 22, 30, 0))] + clock_line = str(node).splitlines()[2] + assert "CLOCK: [2012-02-26 Sun 22:00]--[2012-02-26 Sun 22:30]" in clock_line + + +def test_add_scheduled_timestamp_line() -> None: + content = """* Node + Body""" + root = loads(content) + node = root.children[0] + + node.scheduled = datetime.date(2012, 2, 26) + assert ( + str(node) + == """* Node +SCHEDULED: <2012-02-26 Sun> + Body""" + ) + + +def test_overwrite_existing_dates() -> None: + content = """* Node + SCHEDULED: <2012-02-26 Sun> DEADLINE: <2012-02-27 Mon> CLOSED: [2012-02-25 Sat] + Body""" + node = loads(content).children[0] + + node.scheduled = datetime.date(2012, 3, 1) + node.deadline = datetime.date(2012, 3, 2) + node.closed = datetime.datetime(2012, 3, 3, 10, 30) + + sdc_line = str(node).splitlines()[1] + assert "SCHEDULED: <2012-03-01 Thu>" in sdc_line + assert "DEADLINE: <2012-03-02 Fri>" in sdc_line + assert "CLOSED: [2012-03-03 Sat 10:30]" in sdc_line + + +def test_add_dates_to_node_without_dates() -> None: + content = """* Node + Body""" + node = loads(content).children[0] + + node.scheduled = datetime.date(2012, 2, 26) + node.deadline = datetime.date(2012, 3, 1) + node.closed = datetime.datetime(2012, 2, 27, 8, 30) + + assert ( + str(node) + == """* Node +SCHEDULED: <2012-02-26 Sun> DEADLINE: <2012-03-01 Thu> CLOSED: [2012-02-27 Mon 08:30] + Body""" + ) + + +def test_add_dates_to_node_with_existing_dates() -> None: + content = """* Node + SCHEDULED: <2012-02-26 Sun> + Body""" + node = loads(content).children[0] + + node.deadline = datetime.date(2012, 3, 1) + node.closed = datetime.datetime(2012, 2, 27, 8, 30) + + sdc_line = str(node).splitlines()[1] + assert "SCHEDULED: <2012-02-26 Sun>" in sdc_line + assert "DEADLINE: <2012-03-01 Thu>" in sdc_line + assert "CLOSED: [2012-02-27 Mon 08:30]" in sdc_line + + +def test_remove_dates_from_node_without_dates() -> None: + content = """* Node + Body""" + node = loads(content).children[0] + + node.scheduled = None + node.deadline = None + node.closed = None + + assert str(node) == content + + +def test_remove_dates_from_node_with_dates() -> None: + content = """* Node + SCHEDULED: <2012-02-26 Sun> DEADLINE: <2012-03-01 Thu> + Body""" + node = loads(content).children[0] + + node.scheduled = None + node.deadline = None + + assert ( + str(node) + == """* Node + Body""" + ) + + +def test_duplicate_scheduled_dates() -> None: + content = """* Node + SCHEDULED: <2012-02-26 Sun> SCHEDULED: <2012-03-01 Thu> + Body""" + node = loads(content).children[0] + assert node.scheduled.start == datetime.date(2012, 3, 1) + + node.scheduled = datetime.date(2012, 4, 1) + sdc_line = str(node).splitlines()[1] + assert "SCHEDULED: <2012-02-26 Sun>" in sdc_line + assert "SCHEDULED: <2012-04-01 Sun>" in sdc_line + + +def test_multiple_clock_entries() -> None: + content = """* Node + CLOCK: [2012-02-26 Sun 21:10]--[2012-02-26 Sun 21:15] => 0:05 + CLOCK: [2012-02-26 Sun 22:00]--[2012-02-26 Sun 22:30] => 0:30 + Body""" + node = loads(content).children[0] + + node.clock = [ + OrgDateClock((2012, 2, 26, 23, 0, 0), (2012, 2, 26, 23, 30, 0)), + OrgDateClock((2012, 2, 27, 1, 0, 0), (2012, 2, 27, 1, 15, 0)), + ] + + assert ( + str(node) + == """* Node + CLOCK: [2012-02-26 Sun 23:00]--[2012-02-26 Sun 23:30] => 0:30 + CLOCK: [2012-02-27 Mon 01:00]--[2012-02-27 Mon 01:15] => 0:15 + Body""" + ) + + node.clock = [] + + assert ( + str(node) + == """* Node + Body""" + ) + + +def test_clock_updates_recompute_duration() -> None: + content = """* Node + CLOCK: [2012-02-26 Sun 21:10]--[2012-02-26 Sun 21:15] => 0:05 + Body""" + node = loads(content).children[0] + + node.clock = [OrgDateClock((2012, 2, 26, 21, 10, 0), (2012, 2, 26, 21, 40, 0))] + + assert ( + str(node) + == """* Node + CLOCK: [2012-02-26 Sun 21:10]--[2012-02-26 Sun 21:40] => 0:30 + Body""" + ) + + +def test_clock_updates_drop_duration_when_no_end() -> None: + content = """* Node + CLOCK: [2012-02-26 Sun 21:10]--[2012-02-26 Sun 21:15] => 0:05 + Body""" + node = loads(content).children[0] + + node.clock = [OrgDateClock((2012, 2, 26, 21, 10, 0))] + + assert ( + str(node) + == """* Node + CLOCK: [2012-02-26 Sun 21:10] + Body""" + ) + + +def test_setting_inactive_scheduled_date() -> None: + content = """* Node + Body""" + node = loads(content).children[0] + + node.scheduled = OrgDate((2012, 2, 26), active=False) + assert ( + str(node) + == """* Node +SCHEDULED: [2012-02-26 Sun] + Body""" + ) + + +def test_overwrite_properties() -> None: + content = """* Node + :PROPERTIES: + :Owner: Jane + :Effort: 1:10 + :END: + Body""" + node = loads(content).children[0] + + node.properties = {"Owner": "Alex", "Effort": "0:30"} + assert node.properties["Effort"] == 30 + assert ( + str(node) + == """* Node + :PROPERTIES: + :Owner: Alex + :Effort: 0:30 + :END: + Body""" + ) + + +def test_add_properties_without_drawer() -> None: + content = """* Node + Body""" + node = loads(content).children[0] + + node.properties = {"Owner": "Alex"} + assert ( + str(node) + == """* Node + :PROPERTIES: + :Owner: Alex + :END: + Body""" + ) + + +def test_add_properties_with_existing_drawer() -> None: + content = """* Node + :PROPERTIES: + :Owner: Jane + :END: + Body""" + node = loads(content).children[0] + + node.properties = {"Owner": "Jane", "Project": "Alpha"} + assert ( + str(node) + == """* Node + :PROPERTIES: + :Owner: Jane + :Project: Alpha + :END: + Body""" + ) + + +def test_remove_properties_without_drawer() -> None: + content = """* Node + Body""" + node = loads(content).children[0] + + node.properties = {} + assert str(node) == content + + +def test_remove_properties_with_drawer() -> None: + content = """* Node + :PROPERTIES: + :Owner: Jane + :END: + Body""" + node = loads(content).children[0] + + node.properties = {} + assert ( + str(node) + == """* Node + Body""" + ) + + +def test_duplicate_properties_update_last() -> None: + content = """* Node + :PROPERTIES: + :Owner: Jane + :Owner: Jill + :END: + Body""" + node = loads(content).children[0] + assert node.properties["Owner"] == "Jill" + + node.properties = {"Owner": "Alex"} + assert ( + str(node) + == """* Node + :PROPERTIES: + :Owner: Jane + :Owner: Alex + :END: + Body""" + ) + + +def test_properties_preserve_output_when_unchanged() -> None: + content = """* Node + :PROPERTIES: + :Owner: Jane + :END: + Body""" + node = loads(content).children[0] + assert str(node) == content + + +def test_root_node_properties() -> None: + content = """Intro + +:PROPERTIES: +:Title: Example +:END: + +* Node""" + root = loads(content) + root.properties = {"Title": "Updated"} + assert ( + str(root) + == """Intro + +:PROPERTIES: +:Title: Updated +:END: +""" + ) + + +def test_overwrite_repeated_tasks_in_logbook() -> None: + content = """* Node + :LOGBOOK: + - State "DONE" from "TODO" [2005-09-01 Thu 16:10] + - State "DONE" from "TODO" [2005-08-01 Mon 19:44] + :END: + Body""" + node = loads(content).children[0] + node.repeated_tasks = [ + OrgDateRepeatedTask((2005, 7, 1, 17, 27, 0), "TODO", "DONE"), + OrgDateRepeatedTask((2005, 6, 1, 10, 0, 0), "TODO", "DONE"), + ] + assert ( + str(node) + == """* Node + :LOGBOOK: + - State "DONE" from "TODO" [2005-07-01 Fri 17:27] + - State "DONE" from "TODO" [2005-06-01 Wed 10:00] + :END: + Body""" + ) + + +def test_add_repeated_tasks_without_logbook() -> None: + content = """* Node + Body""" + node = loads(content).children[0] + node.repeated_tasks = [ + OrgDateRepeatedTask((2005, 9, 1, 16, 10, 0), "TODO", "DONE"), + ] + assert ( + str(node) + == """* Node + :LOGBOOK: + - State "DONE" from "TODO" [2005-09-01 Thu 16:10] + :END: + Body""" + ) + + +def test_add_repeated_tasks_with_logbook() -> None: + content = """* Node + :LOGBOOK: + CLOCK: [2012-10-26 Fri 16:01] + :END: + Body""" + node = loads(content).children[0] + node.repeated_tasks = [ + OrgDateRepeatedTask((2005, 9, 1, 16, 10, 0), "TODO", "DONE"), + ] + assert ( + str(node) + == """* Node + :LOGBOOK: + CLOCK: [2012-10-26 Fri 16:01] + - State "DONE" from "TODO" [2005-09-01 Thu 16:10] + :END: + Body""" + ) + + +def test_remove_repeated_tasks_without_logbook() -> None: + content = """* Node + Body""" + node = loads(content).children[0] + node.repeated_tasks = [] + assert str(node) == content + + +def test_remove_repeated_tasks_with_logbook() -> None: + content = """* Node + :LOGBOOK: + - State "DONE" from "TODO" [2005-09-01 Thu 16:10] + :END: + Body""" + node = loads(content).children[0] + node.repeated_tasks = [] + assert ( + str(node) + == """* Node + :LOGBOOK: + :END: + Body""" + ) + + +def test_multiple_logbook_drawers_update_all() -> None: + content = """* Node + :LOGBOOK: + - State "DONE" from "TODO" [2005-09-01 Thu 16:10] + :END: + :LOGBOOK: + - State "DONE" from "TODO" [2005-08-01 Mon 19:44] + :END: + Body""" + node = loads(content).children[0] + node.repeated_tasks = [ + OrgDateRepeatedTask((2005, 7, 1, 17, 27, 0), "TODO", "DONE"), + OrgDateRepeatedTask((2005, 6, 1, 10, 0, 0), "TODO", "DONE"), + ] + rendered = str(node) + assert rendered.count("[2005-07-01 Fri 17:27]") == 1 + assert rendered.count("[2005-06-01 Wed 10:00]") == 1 + + +def test_external_repeated_tasks_update() -> None: + content = """* Node + - State "DONE" from "TODO" [2005-09-01 Thu 16:10] + Body""" + node = loads(content).children[0] + node.repeated_tasks = [OrgDateRepeatedTask((2005, 7, 1, 17, 27, 0), "TODO", "DONE")] + assert ( + str(node) + == """* Node + - State "DONE" from "TODO" [2005-07-01 Fri 17:27] + Body""" + ) + + +def test_remove_generated_logbook_when_cleared() -> None: + content = """* Node + Body""" + node = loads(content).children[0] + node.repeated_tasks = [OrgDateRepeatedTask((2005, 9, 1, 16, 10, 0), "TODO", "DONE")] + node.repeated_tasks = [] + assert str(node) == content + + +def test_body_setter_preserves_structure() -> None: + content = """* Node + SCHEDULED: <2020-01-01 Wed> + :PROPERTIES: + :Foo: bar + :END: + - State "DONE" from "TODO" [2020-01-02 Thu] + Body line 1 + Body line 2 + CLOCK: [2020-01-03 Fri 10:00]--[2020-01-03 Fri 11:00] => 1:00""" + node = loads(content).children[0] + assert str(node) == content + + node.body = "New body\nSecond line" + + expected = """* Node + SCHEDULED: <2020-01-01 Wed> + :PROPERTIES: + :Foo: bar + :END: + - State "DONE" from "TODO" [2020-01-02 Thu] +New body +Second line + CLOCK: [2020-01-03 Fri 10:00]--[2020-01-03 Fri 11:00] => 1:00""" + assert str(node) == expected + + +def test_body_setter_clears_body() -> None: + content = """* Node + Body line 1 + Body line 2""" + node = loads(content).children[0] + node.body = "" + assert node.body == "" + assert str(node) == """* Node""" + + +def test_body_setter_updates_timestamps() -> None: + content = """* Node + Body with <2020-01-01 Wed> and [2020-01-02 Thu]""" + node = loads(content).children[0] + assert [str(date) for date in node.datelist] == ["<2020-01-01 Wed>", "[2020-01-02 Thu]"] + + node.body = "New <2020-02-03 Mon>" + assert [str(date) for date in node.datelist] == ["<2020-02-03 Mon>"] diff --git a/src/orgparse/tests/test_rich.py b/src/orgparse/tests/test_rich.py index 5171bb0..761f441 100644 --- a/src/orgparse/tests/test_rich.py +++ b/src/orgparse/tests/test_rich.py @@ -39,9 +39,10 @@ def test_table() -> None: [_gap1, t1, _gap2, t2, _gap3, t3, _gap4] = root.body_rich - t1 = Table(root._lines[1:10]) - t2 = Table(root._lines[11:19]) - t3 = Table(root._lines[22:26]) + rendered = root._render_lines() + t1 = Table(rendered[1:10]) + t2 = Table(rendered[11:19]) + t3 = Table(rendered[22:26]) assert ilen(t1.blocks) == 4 assert list(t1.blocks)[2] == []