From ac5de852e80f604970955fad9a78fc0b27396631 Mon Sep 17 00:00:00 2001 From: Giles Odigwe Date: Thu, 11 Jun 2026 14:22:52 -0700 Subject: [PATCH 1/2] Python: [Breaking] Refactor FileSkillsSource for depth-based discovery and predicate filters Refactors FileSkillsSource to make script and resource discovery more flexible. ## Changes - **Drops** resource_directories / script_directories options (preconfigured directory whitelists). - **Adds** search_depth option (>= 1, default 2): controls how deep the recursive scan goes within each skill directory. - **Adds** script_filter / resource_filter predicate options that receive a FileSkillFilterContext (skill_name + relative_file_path), allowing whitelist/blacklist filtering by file path. - **Adds** FileSkillFilterContext class exported from agent_framework. ## Notes - The Skills API is marked @experimental -- the option removals are intentional breaking changes within the experimental surface. - Security checks (path containment, symlink detection) are preserved and continue to use the skill root directory as the trusted boundary. - Ports the same refactoring from .NET PR #6109 while following Python conventions (instance methods, Callable type hints, __slots__). Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../packages/core/agent_framework/__init__.py | 4 +- .../packages/core/agent_framework/_skills.py | 605 ++++++++++-------- .../packages/core/agent_framework/_tools.py | 4 +- .../packages/core/tests/core/test_skills.py | 450 +++++++------ 4 files changed, 606 insertions(+), 457 deletions(-) diff --git a/python/packages/core/agent_framework/__init__.py b/python/packages/core/agent_framework/__init__.py index 478adb2dc7d..db911b498cb 100644 --- a/python/packages/core/agent_framework/__init__.py +++ b/python/packages/core/agent_framework/__init__.py @@ -124,7 +124,6 @@ TodoSessionStore, TodoStore, ) -from ._mcp import MCPStdioTool, MCPStreamableHTTPTool, MCPTaskOptions, MCPWebsocketTool, SamplingApprovalCallback from ._harness._tool_approval import ( DEFAULT_TOOL_APPROVAL_SOURCE_ID, ToolApprovalMiddleware, @@ -134,6 +133,7 @@ create_always_approve_tool_response, create_always_approve_tool_with_arguments_response, ) +from ._mcp import MCPStdioTool, MCPStreamableHTTPTool, MCPTaskOptions, MCPWebsocketTool, SamplingApprovalCallback from ._middleware import ( AgentContext, AgentMiddleware, @@ -170,6 +170,7 @@ DeduplicatingSkillsSource, DelegatingSkillsSource, FileSkill, + FileSkillFilterContext, FileSkillScript, FileSkillsSource, FilteringSkillsSource, @@ -426,6 +427,7 @@ "FileSearchMatch", "FileSearchResult", "FileSkill", + "FileSkillFilterContext", "FileSkillScript", "FileSkillsSource", "FileSystemAgentFileStore", diff --git a/python/packages/core/agent_framework/_skills.py b/python/packages/core/agent_framework/_skills.py index 91bdb619143..0ce756e9340 100644 --- a/python/packages/core/agent_framework/_skills.py +++ b/python/packages/core/agent_framework/_skills.py @@ -1546,13 +1546,60 @@ def __call__( ".txt", ) DEFAULT_SCRIPT_EXTENSIONS: Final[tuple[str, ...]] = (".py",) +DEFAULT_SEARCH_DEPTH: Final[int] = 2 -# "." means the skill directory root itself (files directly in the skill folder). -ROOT_DIRECTORY_INDICATOR: Final[str] = "." -# Standard subdirectory names per https://agentskills.io/specification#directory-structure -DEFAULT_RESOURCE_DIRECTORIES: Final[tuple[str, ...]] = ("references", "assets") -DEFAULT_SCRIPT_DIRECTORIES: Final[tuple[str, ...]] = ("scripts",) +@experimental(feature_id=ExperimentalFeature.SKILLS) +class FileSkillFilterContext: + """Context provided to script/resource filter predicates during file-based skill discovery. + + When a ``script_filter`` or ``resource_filter`` predicate is configured on + :class:`FileSkillsSource`, each discovered file is wrapped in this context + before being passed to the predicate. The predicate returns ``True`` to + include the file or ``False`` to exclude it. + + Attributes: + skill_name: The name of the skill (from SKILL.md frontmatter). + relative_file_path: The path to the file relative to the skill + directory, using forward slashes. For root-level files this is + just the filename; for nested files it includes subdirectories. + + Examples: + A filter that excludes test scripts: + + .. code-block:: python + + def no_tests(ctx: FileSkillFilterContext) -> bool: + return "test" not in ctx.relative_file_path + + + source = FileSkillsSource( + skill_paths="./skills", + script_filter=no_tests, + ) + """ + + __slots__ = ("relative_file_path", "skill_name") + + def __init__(self, skill_name: str, relative_file_path: str) -> None: + """Initialize a FileSkillFilterContext. + + Args: + skill_name: The name of the skill as declared in SKILL.md frontmatter. + relative_file_path: Path to the file relative to the skill directory + (forward-slash-separated). + + Raises: + ValueError: If either argument is empty or whitespace-only. + """ + if not skill_name or not skill_name.strip(): + raise ValueError("skill_name cannot be empty.") + if not relative_file_path or not relative_file_path.strip(): + raise ValueError("relative_file_path cannot be empty.") + + self.skill_name = skill_name + self.relative_file_path = relative_file_path + # region Patterns and prompt template @@ -1865,8 +1912,9 @@ def from_paths( script_runner: SkillScriptRunner | None = None, resource_extensions: tuple[str, ...] | None = None, script_extensions: tuple[str, ...] | None = None, - resource_directories: Sequence[str] | None = None, - script_directories: Sequence[str] | None = None, + search_depth: int | None = None, + script_filter: Callable[[FileSkillFilterContext], bool] | None = None, + resource_filter: Callable[[FileSkillFilterContext], bool] | None = None, instruction_template: str | None = None, require_script_approval: bool = False, disable_caching: bool = False, @@ -1889,15 +1937,19 @@ def from_paths( ``(".md", ".json", ".yaml", ".yml", ".csv", ".xml", ".txt")``. script_extensions: File extensions recognized as discoverable scripts. Defaults to ``(".py",)``. - resource_directories: Relative directory paths to scan for - resource files within each skill directory. Use ``"."`` - to include files at the skill root level. Defaults to - ``("references", "assets")`` per the agentskills.io - specification. - script_directories: Relative directory paths to scan for - script files within each skill directory. Use ``"."`` - to include files at the skill root level. Defaults to - ``("scripts",)`` per the agentskills.io specification. + search_depth: Maximum depth to search for script and resource + files within each skill directory. A value of ``1`` searches + only the skill root; ``2`` (the default) searches the root + plus one level of subdirectories. Must be >= 1. + script_filter: Optional predicate that filters discovered script + files. Receives a :class:`FileSkillFilterContext` and returns + ``True`` to include or ``False`` to exclude. When ``None``, + all scripts matching allowed extensions are included. + resource_filter: Optional predicate that filters discovered + resource files. Receives a :class:`FileSkillFilterContext` + and returns ``True`` to include or ``False`` to exclude. + When ``None``, all resources matching allowed extensions are + included. instruction_template: Custom system-prompt template for advertising skills. Must contain a ``{skills}`` placeholder. Uses a built-in template when ``None``. @@ -1927,8 +1979,9 @@ def from_paths( script_runner=script_runner, resource_extensions=resource_extensions, script_extensions=script_extensions, - resource_directories=resource_directories, - script_directories=script_directories, + search_depth=search_depth, + script_filter=script_filter, + resource_filter=resource_filter, ) ) return cls( @@ -2383,15 +2436,12 @@ class FileSkillsSource(SkillsSource): Recursively scans the configured *skill_paths* directories for ``SKILL.md`` files (up to 2 levels deep), parses their YAML frontmatter, - and discovers associated resource and script files from spec-defined - subdirectories. + and discovers associated resource and script files by recursively scanning + each skill directory up to the configured *search_depth*. - By default, resources are discovered from ``references/`` and ``assets/`` - subdirectories, and scripts from ``scripts/``, per the - `agentskills.io specification - `_. Use *resource_directories* - and *script_directories* to customize which subdirectories are scanned. - Pass ``"."`` to include files at the skill root level. + By default, the scan depth is 2 (root + one level of subdirectories). + Use *script_filter* and *resource_filter* predicates to control which + discovered files are included. Security: file-based metadata is XML-escaped before prompt injection, and resource reads are guarded against path traversal and symlink escape. @@ -2405,15 +2455,15 @@ class FileSkillsSource(SkillsSource): source = FileSkillsSource(skill_paths="./skills") skills = await source.get_skills() - With a script runner and custom directories: + With a script runner and filter predicates: .. code-block:: python source = FileSkillsSource( skill_paths=["./skills", "./more-skills"], script_runner=my_runner, - resource_directories=[".", "references", "assets"], - script_directories=["scripts"], + search_depth=3, + script_filter=lambda ctx: not ctx.relative_file_path.startswith("tests/"), ) """ @@ -2424,8 +2474,9 @@ def __init__( script_runner: SkillScriptRunner | None = None, resource_extensions: tuple[str, ...] | None = None, script_extensions: tuple[str, ...] | None = None, - resource_directories: Sequence[str] | None = None, - script_directories: Sequence[str] | None = None, + search_depth: int | None = None, + script_filter: Callable[[FileSkillFilterContext], bool] | None = None, + resource_filter: Callable[[FileSkillFilterContext], bool] | None = None, ) -> None: """Initialize a FileSkillsSource. @@ -2445,18 +2496,22 @@ def __init__( ``(".md", ".json", ".yaml", ".yml", ".csv", ".xml", ".txt")``. script_extensions: File extensions recognized as discoverable scripts. Defaults to ``(".py",)``. - resource_directories: Relative directory paths to scan for - resource files within each skill directory. Use ``"."`` - to include files at the skill root level. Defaults to - ``("references", "assets")`` per the - `agentskills.io specification - `_. - script_directories: Relative directory paths to scan for - script files within each skill directory. Use ``"."`` - to include files at the skill root level. Defaults to - ``("scripts",)`` per the - `agentskills.io specification - `_. + search_depth: Maximum depth to search for script and resource + files within each skill directory. A value of ``1`` searches + only the skill root; ``2`` (the default) searches the root + plus one level of subdirectories. Must be >= 1. + script_filter: Optional predicate that filters discovered script + files. Receives a :class:`FileSkillFilterContext` and returns + ``True`` to include or ``False`` to exclude. When ``None``, + all scripts matching allowed extensions are included. + resource_filter: Optional predicate that filters discovered + resource files. Receives a :class:`FileSkillFilterContext` + and returns ``True`` to include or ``False`` to exclude. + When ``None``, all resources matching allowed extensions are + included. + + Raises: + ValueError: If *search_depth* is less than 1. """ if isinstance(skill_paths, (str, Path)): self._skill_paths: list[str] = [str(skill_paths)] @@ -2467,16 +2522,12 @@ def __init__( self._resource_extensions = resource_extensions or DEFAULT_RESOURCE_EXTENSIONS self._script_extensions = script_extensions or DEFAULT_SCRIPT_EXTENSIONS - self._resource_directories: tuple[str, ...] = ( - tuple(FileSkillsSource._validate_and_normalize_directory_names(resource_directories)) - if resource_directories is not None - else DEFAULT_RESOURCE_DIRECTORIES - ) - self._script_directories: tuple[str, ...] = ( - tuple(FileSkillsSource._validate_and_normalize_directory_names(script_directories)) - if script_directories is not None - else DEFAULT_SCRIPT_DIRECTORIES - ) + depth = search_depth if search_depth is not None else DEFAULT_SEARCH_DEPTH + if depth < 1: + raise ValueError(f"search_depth must be >= 1, got {depth}") + self._search_depth: int = depth + self._script_filter = script_filter + self._resource_filter = resource_filter async def get_skills(self) -> list[Skill]: """Discover and return all file-based skills from configured paths. @@ -2510,17 +2561,13 @@ async def get_skills(self) -> list[Skill]: # Discover file-based resources resources: list[SkillResource] = [] - for rn in FileSkillsSource._discover_resource_files( - skill_path, self._resource_extensions, self._resource_directories - ): + for rn in self._discover_resource_files(skill_path, frontmatter.name): resource_full_path = FileSkillsSource._get_validated_resource_path(skill_path, rn) resources.append(_FileSkillResource(name=rn, full_path=resource_full_path)) # Discover file-based scripts scripts: list[SkillScript] = [] - for sn in FileSkillsSource._discover_script_files( - skill_path, self._script_extensions, self._script_directories - ): + for sn in self._discover_script_files(skill_path, frontmatter.name): script_full_path = os.path.normpath(os.path.join(skill_path, sn)) # noqa: ASYNC240 scripts.append(FileSkillScript(name=sn, full_path=script_full_path, runner=self._script_runner)) @@ -2606,271 +2653,307 @@ def _has_symlink_in_path(path: str, directory: str) -> bool: return True return False - @staticmethod - def _validate_and_normalize_directory_names( - directories: Sequence[str], + def _discover_resource_files( + self, + skill_dir_path: str, + skill_name: str, ) -> list[str]: - """Validate and normalize relative directory names. + """Recursively scan a skill directory for resource files matching configured extensions. - Ensures each entry is a safe relative path. The ``"."`` root indicator - is passed through unchanged. Entries containing ``..`` segments or - representing absolute paths are rejected with a warning and skipped. - Empty or whitespace-only entries raise :class:`ValueError`. + Scans the skill directory up to :attr:`_search_depth` levels deep for + files whose extension is in :attr:`_resource_extensions`, excluding + ``SKILL.md`` itself. Each candidate is validated against path-traversal + and symlink-escape checks; unsafe files are skipped with a warning. + If a :attr:`_resource_filter` predicate is configured, files that do + not satisfy it are excluded. Args: - directories: Sequence of relative directory names to validate. + skill_dir_path: Absolute path to the skill directory to scan. + skill_name: The skill name (from frontmatter) for filter context. Returns: - A list of validated, normalized directory names. - - Raises: - ValueError: If any entry is empty or whitespace-only. + Sorted relative resource paths (forward-slash-separated) for every + discovered file that passes security and filter checks. """ - result: list[str] = [] - for directory in directories: - if not directory or not directory.strip(): - raise ValueError("Directory names must not be empty or whitespace.") + skill_dir = Path(skill_dir_path).absolute() + root_directory_path = str(skill_dir) + resources: list[str] = [] + normalized_extensions = {e.lower() for e in self._resource_extensions} + + self._scan_directory_for_resources( + target_dir=skill_dir, + skill_dir=skill_dir, + root_directory_path=root_directory_path, + skill_name=skill_name, + normalized_extensions=normalized_extensions, + resources=resources, + current_depth=1, + ) - # Normalize separators: backslash → forward slash, strip leading ./ and trailing / - normalized = PurePosixPath(directory.replace("\\", "/")).as_posix() + resources.sort() + return resources - # "." and "./" both normalize to "." — treat as root indicator - if normalized == ROOT_DIRECTORY_INDICATOR: - result.append(ROOT_DIRECTORY_INDICATOR) - continue + def _scan_directory_for_resources( + self, + target_dir: Path, + skill_dir: Path, + root_directory_path: str, + skill_name: str, + normalized_extensions: set[str], + resources: list[str], + current_depth: int, + ) -> None: + """Recursively scan a directory for resource files. - # Reject absolute paths (check both POSIX and Windows-style roots - # so validation is consistent regardless of the host OS) - if os.path.isabs(directory) or normalized.startswith("/") or re.match(r"^[A-Za-z]:[/\\]", directory): + Args: + target_dir: The directory to scan at this level. + skill_dir: The skill root directory (for relative path computation). + root_directory_path: String form of the skill root (for security checks). + skill_name: Skill name for filter predicate context. + normalized_extensions: Lowercased allowed extensions. + resources: Accumulator list for discovered relative paths. + current_depth: Current recursion depth (starts at 1). + """ + if current_depth > self._search_depth: + return + + is_root = target_dir == skill_dir + + # Directory-level symlink check for non-root directories + if not is_root: + resolved_target = str(Path(os.path.normpath(target_dir)).absolute()) + if not FileSkillsSource._is_path_within_directory(resolved_target, root_directory_path): logger.warning( - "Skipping directory '%s': absolute paths are not allowed.", - directory, + "Skipping resource directory '%s': resolves outside skill directory '%s'", + target_dir, + root_directory_path, ) - continue + return - # Reject paths containing ".." segments - if any(segment == ".." for segment in normalized.split("/")): + if FileSkillsSource._has_symlink_in_path(resolved_target, root_directory_path): logger.warning( - "Skipping directory '%s': parent traversal ('..') is not allowed.", - directory, + "Skipping resource directory '%s': symlink detected in path under skill directory '%s'", + target_dir, + root_directory_path, ) - continue - - result.append(normalized) - return result - - @staticmethod - def _discover_resource_files( - skill_dir_path: str, - extensions: tuple[str, ...] = DEFAULT_RESOURCE_EXTENSIONS, - directories: tuple[str, ...] = DEFAULT_RESOURCE_DIRECTORIES, - ) -> list[str]: - """Scan configured subdirectories for resource files matching *extensions*. + return - Scans each directory in *directories* within *skill_dir_path* for files - whose extension is in *extensions*, excluding ``SKILL.md`` itself. - Use ``"."`` in *directories* to include files at the skill root level. - Each candidate is validated against path-traversal and symlink-escape - checks; unsafe files are skipped with a warning. + try: + entries = list(target_dir.iterdir()) + except OSError: + logger.warning( + "Failed to list resource directory '%s' in skill directory '%s'; skipping.", + target_dir, + root_directory_path, + ) + return - Args: - skill_dir_path: Absolute path to the skill directory to scan. - extensions: Tuple of allowed file extensions (e.g. ``(".md", ".json")``). - directories: Relative subdirectory paths to scan for resources. + subdirectories: list[Path] = [] - Returns: - Sorted relative resource paths (forward-slash-separated) for every - discovered file that passes security checks. - """ - skill_dir = Path(skill_dir_path).absolute() - root_directory_path = str(skill_dir) - resources: list[str] = [] - normalized_extensions = {e.lower() for e in extensions} - seen_directories: set[str] = set() - - for directory in directories: - is_root = directory == ROOT_DIRECTORY_INDICATOR - target_dir = skill_dir if is_root else (skill_dir / directory) + for entry in entries: + if entry.is_dir(): + subdirectories.append(entry) + continue - # Deduplicate after resolving to avoid scanning the same directory twice. - # Use normcase for case-insensitive dedup on case-insensitive filesystems. - resolved_target = str(Path(os.path.normpath(target_dir)).absolute()) - dedup_key = os.path.normcase(resolved_target) - if dedup_key in seen_directories: + if not entry.is_file(): continue - seen_directories.add(dedup_key) - if not target_dir.is_dir(): + if entry.name.upper() == SKILL_FILE_NAME.upper(): continue - # Directory-level containment and symlink checks for non-root directories - if not is_root: - if not FileSkillsSource._is_path_within_directory(resolved_target, root_directory_path): - logger.warning( - "Skipping resource directory '%s': resolves outside skill directory '%s'", - directory, - skill_dir_path, - ) - continue + if entry.suffix.lower() not in normalized_extensions: + continue - if FileSkillsSource._has_symlink_in_path(resolved_target, root_directory_path): - logger.warning( - "Skipping resource directory '%s': symlink detected in path under skill directory '%s'", - directory, - skill_dir_path, - ) - continue + resource_full_path = str(Path(os.path.normpath(entry)).absolute()) - # Scan top-level files only (non-recursive) within this directory - try: - entries = list(target_dir.iterdir()) - except OSError: + # Containment check: file must resolve within the skill directory + if not FileSkillsSource._is_path_within_directory(resource_full_path, root_directory_path): logger.warning( - "Failed to list resource directory '%s' in skill directory '%s'; skipping.", - directory, - skill_dir_path, + "Skipping resource '%s': resolves outside skill directory '%s'", + entry, + root_directory_path, ) continue - for resource_file in entries: - if not resource_file.is_file(): - continue - - if resource_file.name.upper() == SKILL_FILE_NAME.upper(): - continue - - if resource_file.suffix.lower() not in normalized_extensions: - continue - - resource_full_path = str(Path(os.path.normpath(resource_file)).absolute()) - - # Containment check: file must resolve within the target directory - if not FileSkillsSource._is_path_within_directory(resource_full_path, resolved_target): - logger.warning( - "Skipping resource '%s': resolves outside target directory '%s'", - resource_file, - directory, - ) - continue + if FileSkillsSource._has_symlink_in_path(resource_full_path, root_directory_path): + logger.warning( + "Skipping resource '%s': symlink detected in path under skill directory '%s'", + entry, + root_directory_path, + ) + continue - if FileSkillsSource._has_symlink_in_path(resource_full_path, root_directory_path): - logger.warning( - "Skipping resource '%s': symlink detected in path under skill directory '%s'", - resource_file, - skill_dir_path, - ) - continue + rel_path = FileSkillsSource._normalize_resource_path(str(entry.relative_to(skill_dir))) - rel_path = resource_file.relative_to(skill_dir) - resources.append(FileSkillsSource._normalize_resource_path(str(rel_path))) + # Apply user-provided filter predicate + if self._resource_filter is not None and not self._resource_filter( + FileSkillFilterContext(skill_name, rel_path) + ): + continue - resources.sort() - return resources + resources.append(rel_path) + + # Recurse into subdirectories if within depth limit + if current_depth < self._search_depth: + for subdir in subdirectories: + self._scan_directory_for_resources( + target_dir=subdir, + skill_dir=skill_dir, + root_directory_path=root_directory_path, + skill_name=skill_name, + normalized_extensions=normalized_extensions, + resources=resources, + current_depth=current_depth + 1, + ) - @staticmethod def _discover_script_files( + self, skill_dir_path: str, - extensions: tuple[str, ...] = DEFAULT_SCRIPT_EXTENSIONS, - directories: tuple[str, ...] = DEFAULT_SCRIPT_DIRECTORIES, + skill_name: str, ) -> list[str]: - """Scan configured subdirectories for script files matching *extensions*. + """Recursively scan a skill directory for script files matching configured extensions. - Scans each directory in *directories* within *skill_dir_path* for files - whose extension is in *extensions*. Use ``"."`` in *directories* to - include files at the skill root level. Each candidate is validated - against path-traversal and symlink-escape checks; unsafe files are - skipped with a warning. + Scans the skill directory up to :attr:`_search_depth` levels deep for + files whose extension is in :attr:`_script_extensions`. Each candidate + is validated against path-traversal and symlink-escape checks; unsafe + files are skipped with a warning. If a :attr:`_script_filter` predicate + is configured, files that do not satisfy it are excluded. Args: skill_dir_path: Absolute path to the skill directory to scan. - extensions: Tuple of allowed script extensions (e.g. ``(".py",)``). - directories: Relative subdirectory paths to scan for scripts. + skill_name: The skill name (from frontmatter) for filter context. Returns: Sorted relative script paths (forward-slash-separated) for every - discovered file that passes security checks. + discovered file that passes security and filter checks. """ skill_dir = Path(skill_dir_path).absolute() root_directory_path = str(skill_dir) scripts: list[str] = [] - normalized_extensions = {e.lower() for e in extensions} - seen_directories: set[str] = set() + normalized_extensions = {e.lower() for e in self._script_extensions} + + self._scan_directory_for_scripts( + target_dir=skill_dir, + skill_dir=skill_dir, + root_directory_path=root_directory_path, + skill_name=skill_name, + normalized_extensions=normalized_extensions, + scripts=scripts, + current_depth=1, + ) - for directory in directories: - is_root = directory == ROOT_DIRECTORY_INDICATOR - target_dir = skill_dir if is_root else (skill_dir / directory) + scripts.sort() + return scripts - # Deduplicate after resolving to avoid scanning the same directory twice. - # Use normcase for case-insensitive dedup on case-insensitive filesystems. - resolved_target = str(Path(os.path.normpath(target_dir)).absolute()) - dedup_key = os.path.normcase(resolved_target) - if dedup_key in seen_directories: - continue - seen_directories.add(dedup_key) + def _scan_directory_for_scripts( + self, + target_dir: Path, + skill_dir: Path, + root_directory_path: str, + skill_name: str, + normalized_extensions: set[str], + scripts: list[str], + current_depth: int, + ) -> None: + """Recursively scan a directory for script files. - if not target_dir.is_dir(): - continue + Args: + target_dir: The directory to scan at this level. + skill_dir: The skill root directory (for relative path computation). + root_directory_path: String form of the skill root (for security checks). + skill_name: Skill name for filter predicate context. + normalized_extensions: Lowercased allowed extensions. + scripts: Accumulator list for discovered relative paths. + current_depth: Current recursion depth (starts at 1). + """ + if current_depth > self._search_depth: + return - # Directory-level containment and symlink checks for non-root directories - if not is_root: - if not FileSkillsSource._is_path_within_directory(resolved_target, root_directory_path): - logger.warning( - "Skipping script directory '%s': resolves outside skill directory '%s'", - directory, - skill_dir_path, - ) - continue + is_root = target_dir == skill_dir - if FileSkillsSource._has_symlink_in_path(resolved_target, root_directory_path): - logger.warning( - "Skipping script directory '%s': symlink detected in path under skill directory '%s'", - directory, - skill_dir_path, - ) - continue + # Directory-level symlink check for non-root directories + if not is_root: + resolved_target = str(Path(os.path.normpath(target_dir)).absolute()) + if not FileSkillsSource._is_path_within_directory(resolved_target, root_directory_path): + logger.warning( + "Skipping script directory '%s': resolves outside skill directory '%s'", + target_dir, + root_directory_path, + ) + return - # Scan top-level files only (non-recursive) within this directory - try: - entries = list(target_dir.iterdir()) - except OSError: + if FileSkillsSource._has_symlink_in_path(resolved_target, root_directory_path): logger.warning( - "Failed to list script directory '%s' in skill directory '%s'; skipping.", - directory, - skill_dir_path, + "Skipping script directory '%s': symlink detected in path under skill directory '%s'", + target_dir, + root_directory_path, ) + return + + try: + entries = list(target_dir.iterdir()) + except OSError: + logger.warning( + "Failed to list script directory '%s' in skill directory '%s'; skipping.", + target_dir, + root_directory_path, + ) + return + + subdirectories: list[Path] = [] + + for entry in entries: + if entry.is_dir(): + subdirectories.append(entry) continue - for script_file in entries: - if not script_file.is_file(): - continue + if not entry.is_file(): + continue - if script_file.suffix.lower() not in normalized_extensions: - continue + if entry.suffix.lower() not in normalized_extensions: + continue - script_full_path = str(Path(os.path.normpath(script_file)).absolute()) + script_full_path = str(Path(os.path.normpath(entry)).absolute()) - # Containment check: file must resolve within the target directory - if not FileSkillsSource._is_path_within_directory(script_full_path, resolved_target): - logger.warning( - "Skipping script '%s': resolves outside target directory '%s'", - script_file, - directory, - ) - continue + # Containment check: file must resolve within the skill directory + if not FileSkillsSource._is_path_within_directory(script_full_path, root_directory_path): + logger.warning( + "Skipping script '%s': resolves outside skill directory '%s'", + entry, + root_directory_path, + ) + continue - if FileSkillsSource._has_symlink_in_path(script_full_path, root_directory_path): - logger.warning( - "Skipping script '%s': symlink detected in path under skill directory '%s'", - script_file, - skill_dir_path, - ) - continue + if FileSkillsSource._has_symlink_in_path(script_full_path, root_directory_path): + logger.warning( + "Skipping script '%s': symlink detected in path under skill directory '%s'", + entry, + root_directory_path, + ) + continue - rel_path = script_file.relative_to(skill_dir) - scripts.append(FileSkillsSource._normalize_resource_path(str(rel_path))) + rel_path = FileSkillsSource._normalize_resource_path(str(entry.relative_to(skill_dir))) - scripts.sort() - return scripts + # Apply user-provided filter predicate + if self._script_filter is not None and not self._script_filter( + FileSkillFilterContext(skill_name, rel_path) + ): + continue + + scripts.append(rel_path) + + # Recurse into subdirectories if within depth limit + if current_depth < self._search_depth: + for subdir in subdirectories: + self._scan_directory_for_scripts( + target_dir=subdir, + skill_dir=skill_dir, + root_directory_path=root_directory_path, + skill_name=skill_name, + normalized_extensions=normalized_extensions, + scripts=scripts, + current_depth=current_depth + 1, + ) @staticmethod def _get_validated_resource_path(skill_dir: str, resource_name: str) -> str: diff --git a/python/packages/core/agent_framework/_tools.py b/python/packages/core/agent_framework/_tools.py index ad232ffeb44..065324289f3 100644 --- a/python/packages/core/agent_framework/_tools.py +++ b/python/packages/core/agent_framework/_tools.py @@ -1989,9 +1989,7 @@ def _store_already_approved_approval_requests( return existing_groups = state.get(_ALREADY_APPROVED_APPROVAL_REQUEST_GROUPS_KEY) - pending_groups: list[Any] = ( - list(cast(Iterable[Any], existing_groups)) if isinstance(existing_groups, list) else [] - ) + pending_groups: list[Any] = list(cast(Iterable[Any], existing_groups)) if isinstance(existing_groups, list) else [] pending_groups.append({ "approval_request_ids": visible_ids, "approval_requests": [request.to_dict() for request in already_approved_requests], diff --git a/python/packages/core/tests/core/test_skills.py b/python/packages/core/tests/core/test_skills.py index 31f679c367f..c2c8a740a11 100644 --- a/python/packages/core/tests/core/test_skills.py +++ b/python/packages/core/tests/core/test_skills.py @@ -18,6 +18,7 @@ ClassSkill, DeduplicatingSkillsSource, FileSkill, + FileSkillFilterContext, FileSkillScript, FileSkillsSource, InlineSkill, @@ -31,11 +32,9 @@ SkillsProvider, ) from agent_framework._skills import ( - DEFAULT_RESOURCE_DIRECTORIES, DEFAULT_RESOURCE_EXTENSIONS, - DEFAULT_SCRIPT_DIRECTORIES, DEFAULT_SCRIPT_EXTENSIONS, - ROOT_DIRECTORY_INDICATOR, + DEFAULT_SEARCH_DEPTH, InlineSkillResource, InlineSkillScript, _create_resource_element, @@ -146,8 +145,9 @@ async def _discover_file_skills_for_test( *, resource_extensions: tuple[str, ...] | None = None, script_extensions: tuple[str, ...] | None = None, - resource_directories: Sequence[str] | None = None, - script_directories: Sequence[str] | None = None, + search_depth: int | None = None, + script_filter: Any = None, + resource_filter: Any = None, script_runner: Any = None, ) -> dict[str, FileSkill]: """Test helper: discover file skills and return as a dict keyed by name. @@ -160,10 +160,12 @@ async def _discover_file_skills_for_test( kwargs["resource_extensions"] = resource_extensions if script_extensions is not None: kwargs["script_extensions"] = script_extensions - if resource_directories is not None: - kwargs["resource_directories"] = resource_directories - if script_directories is not None: - kwargs["script_directories"] = script_directories + if search_depth is not None: + kwargs["search_depth"] = search_depth + if script_filter is not None: + kwargs["script_filter"] = script_filter + if resource_filter is not None: + kwargs["resource_filter"] = resource_filter if script_runner is not None: kwargs["script_runner"] = script_runner @@ -197,33 +199,69 @@ def test_no_change_for_clean_path(self) -> None: assert FileSkillsSource._normalize_resource_path("refs/doc.md") == "refs/doc.md" +def _discover_resources( + skill_dir: str, + skill_name: str = "test-skill", + extensions: tuple[str, ...] | None = None, + search_depth: int | None = None, + resource_filter: Any = None, +) -> list[str]: + """Helper to call the instance-method _discover_resource_files for tests.""" + kwargs: dict[str, Any] = {} + if extensions is not None: + kwargs["resource_extensions"] = extensions + if search_depth is not None: + kwargs["search_depth"] = search_depth + if resource_filter is not None: + kwargs["resource_filter"] = resource_filter + source = FileSkillsSource(skill_dir, **kwargs) + return source._discover_resource_files(skill_dir, skill_name) + + +def _discover_scripts( + skill_dir: str, + skill_name: str = "test-skill", + extensions: tuple[str, ...] | None = None, + search_depth: int | None = None, + script_filter: Any = None, +) -> list[str]: + """Helper to call the instance-method _discover_script_files for tests.""" + kwargs: dict[str, Any] = {} + if extensions is not None: + kwargs["script_extensions"] = extensions + if search_depth is not None: + kwargs["search_depth"] = search_depth + if script_filter is not None: + kwargs["script_filter"] = script_filter + source = FileSkillsSource(skill_dir, **kwargs) + return source._discover_script_files(skill_dir, skill_name) + + class TestDiscoverResourceFiles: - """Tests for _discover_resource_files (filesystem-based resource discovery).""" + """Tests for _discover_resource_files (depth-based resource discovery).""" - def test_discovers_md_files_in_references(self, tmp_path: Path) -> None: + def test_discovers_md_files_in_subdirectory(self, tmp_path: Path) -> None: skill_dir = tmp_path / "my-skill" skill_dir.mkdir() (skill_dir / "SKILL.md").write_text("---\nname: s\ndescription: d\n---\n", encoding="utf-8") refs = skill_dir / "references" refs.mkdir() (refs / "FAQ.md").write_text("FAQ content", encoding="utf-8") - resources = FileSkillsSource._discover_resource_files(str(skill_dir)) + resources = _discover_resources(str(skill_dir)) assert "references/FAQ.md" in resources - def test_discovers_md_files_in_assets(self, tmp_path: Path) -> None: + def test_discovers_files_at_root(self, tmp_path: Path) -> None: skill_dir = tmp_path / "my-skill" skill_dir.mkdir() - assets = skill_dir / "assets" - assets.mkdir() - (assets / "guide.md").write_text("guide", encoding="utf-8") - resources = FileSkillsSource._discover_resource_files(str(skill_dir)) - assert "assets/guide.md" in resources + (skill_dir / "data.json").write_text("{}", encoding="utf-8") + resources = _discover_resources(str(skill_dir)) + assert "data.json" in resources def test_excludes_skill_md_at_root(self, tmp_path: Path) -> None: skill_dir = tmp_path / "my-skill" skill_dir.mkdir() (skill_dir / "SKILL.md").write_text("content", encoding="utf-8") - resources = FileSkillsSource._discover_resource_files(str(skill_dir), directories=(".",)) + resources = _discover_resources(str(skill_dir)) assert len(resources) == 0 def test_discovers_multiple_extensions(self, tmp_path: Path) -> None: @@ -233,7 +271,7 @@ def test_discovers_multiple_extensions(self, tmp_path: Path) -> None: (refs / "data.json").write_text("{}", encoding="utf-8") (refs / "config.yaml").write_text("key: val", encoding="utf-8") (refs / "notes.txt").write_text("notes", encoding="utf-8") - resources = FileSkillsSource._discover_resource_files(str(skill_dir)) + resources = _discover_resources(str(skill_dir)) assert len(resources) == 3 names = set(resources) assert "references/data.json" in names @@ -246,7 +284,7 @@ def test_ignores_unsupported_extensions(self, tmp_path: Path) -> None: refs.mkdir(parents=True) (refs / "image.png").write_bytes(b"\x89PNG") (refs / "binary.exe").write_bytes(b"\x00") - resources = FileSkillsSource._discover_resource_files(str(skill_dir)) + resources = _discover_resources(str(skill_dir)) assert len(resources) == 0 def test_custom_extensions(self, tmp_path: Path) -> None: @@ -255,53 +293,50 @@ def test_custom_extensions(self, tmp_path: Path) -> None: refs.mkdir(parents=True) (refs / "data.json").write_text("{}", encoding="utf-8") (refs / "notes.txt").write_text("notes", encoding="utf-8") - resources = FileSkillsSource._discover_resource_files(str(skill_dir), extensions=(".json",)) + resources = _discover_resources(str(skill_dir), extensions=(".json",)) assert resources == ["references/data.json"] - def test_does_not_discover_nested_files(self, tmp_path: Path) -> None: - """Non-recursive: files inside subdirectories of configured dirs are not discovered.""" - skill_dir = tmp_path / "my-skill" - sub = skill_dir / "references" / "deep" - sub.mkdir(parents=True) - (sub / "doc.md").write_text("deep doc", encoding="utf-8") - resources = FileSkillsSource._discover_resource_files(str(skill_dir)) - assert len(resources) == 0 - - def test_root_directory_discovers_root_files(self, tmp_path: Path) -> None: - """The '.' root indicator discovers files at the skill root level.""" + def test_depth_1_only_discovers_root_files(self, tmp_path: Path) -> None: + """search_depth=1 only finds files directly in the skill root.""" skill_dir = tmp_path / "my-skill" skill_dir.mkdir() - (skill_dir / "data.json").write_text("{}", encoding="utf-8") - resources = FileSkillsSource._discover_resource_files(str(skill_dir), directories=(".",)) - assert "data.json" in resources - - def test_root_does_not_discover_by_default(self, tmp_path: Path) -> None: - """Files at skill root are not discovered with default directories.""" - skill_dir = tmp_path / "my-skill" - skill_dir.mkdir() - (skill_dir / "data.json").write_text("{}", encoding="utf-8") - resources = FileSkillsSource._discover_resource_files(str(skill_dir)) - assert len(resources) == 0 + (skill_dir / "root.md").write_text("root", encoding="utf-8") + sub = skill_dir / "references" + sub.mkdir() + (sub / "nested.md").write_text("nested", encoding="utf-8") + resources = _discover_resources(str(skill_dir), search_depth=1) + assert "root.md" in resources + assert "references/nested.md" not in resources - def test_custom_directories(self, tmp_path: Path) -> None: - """Custom directory names override defaults.""" + def test_depth_2_discovers_one_level_deep(self, tmp_path: Path) -> None: + """Default depth=2 finds root and one level of subdirectories.""" skill_dir = tmp_path / "my-skill" - custom = skill_dir / "docs" - custom.mkdir(parents=True) - (custom / "readme.md").write_text("readme", encoding="utf-8") - resources = FileSkillsSource._discover_resource_files(str(skill_dir), directories=("docs",)) - assert "docs/readme.md" in resources - - def test_nonexistent_directory_silently_skipped(self, tmp_path: Path) -> None: + sub = skill_dir / "references" + sub.mkdir(parents=True) + deep = sub / "deep" + deep.mkdir() + (skill_dir / "root.md").write_text("root", encoding="utf-8") + (sub / "ref.md").write_text("ref", encoding="utf-8") + (deep / "hidden.md").write_text("hidden", encoding="utf-8") + resources = _discover_resources(str(skill_dir), search_depth=2) + assert "root.md" in resources + assert "references/ref.md" in resources + assert "references/deep/hidden.md" not in resources + + def test_depth_3_discovers_two_levels_deep(self, tmp_path: Path) -> None: + """search_depth=3 discovers files at depth 3.""" skill_dir = tmp_path / "my-skill" - skill_dir.mkdir() - resources = FileSkillsSource._discover_resource_files(str(skill_dir), directories=("nonexistent",)) - assert resources == [] + sub = skill_dir / "references" + deep = sub / "deep" + deep.mkdir(parents=True) + (deep / "hidden.md").write_text("hidden", encoding="utf-8") + resources = _discover_resources(str(skill_dir), search_depth=3) + assert "references/deep/hidden.md" in resources def test_empty_directory(self, tmp_path: Path) -> None: skill_dir = tmp_path / "my-skill" skill_dir.mkdir() - resources = FileSkillsSource._discover_resource_files(str(skill_dir)) + resources = _discover_resources(str(skill_dir)) assert resources == [] def test_default_extensions_match_constant(self) -> None: @@ -313,24 +348,44 @@ def test_default_extensions_match_constant(self) -> None: assert ".xml" in DEFAULT_RESOURCE_EXTENSIONS assert ".txt" in DEFAULT_RESOURCE_EXTENSIONS - def test_duplicate_directories_deduplicated(self, tmp_path: Path) -> None: - """Duplicate directory entries should not produce duplicate resources.""" - skill_dir = tmp_path / "my-skill" - refs = skill_dir / "references" - refs.mkdir(parents=True) - (refs / "doc.md").write_text("content", encoding="utf-8") - resources = FileSkillsSource._discover_resource_files(str(skill_dir), directories=("references", "references")) - assert resources == ["references/doc.md"] - def test_results_are_sorted(self, tmp_path: Path) -> None: """Results should be sorted for stable ordering.""" skill_dir = tmp_path / "my-skill" + skill_dir.mkdir() + (skill_dir / "zebra.md").write_text("z", encoding="utf-8") + (skill_dir / "alpha.md").write_text("a", encoding="utf-8") + resources = _discover_resources(str(skill_dir)) + assert resources == ["alpha.md", "zebra.md"] + + def test_resource_filter_excludes_files(self, tmp_path: Path) -> None: + """resource_filter predicate can exclude specific files.""" + skill_dir = tmp_path / "my-skill" refs = skill_dir / "references" refs.mkdir(parents=True) - (refs / "zebra.md").write_text("z", encoding="utf-8") - (refs / "alpha.md").write_text("a", encoding="utf-8") - resources = FileSkillsSource._discover_resource_files(str(skill_dir)) - assert resources == ["references/alpha.md", "references/zebra.md"] + (refs / "keep.md").write_text("keep", encoding="utf-8") + (refs / "exclude.md").write_text("exclude", encoding="utf-8") + resources = _discover_resources( + str(skill_dir), + resource_filter=lambda ctx: "exclude" not in ctx.relative_file_path, + ) + assert "references/keep.md" in resources + assert "references/exclude.md" not in resources + + def test_resource_filter_receives_correct_context(self, tmp_path: Path) -> None: + """resource_filter receives correct skill_name and relative_file_path.""" + skill_dir = tmp_path / "my-skill" + skill_dir.mkdir() + (skill_dir / "data.json").write_text("{}", encoding="utf-8") + received_contexts: list[FileSkillFilterContext] = [] + + def capture_filter(ctx: FileSkillFilterContext) -> bool: + received_contexts.append(ctx) + return True + + _discover_resources(str(skill_dir), skill_name="my-skill", resource_filter=capture_filter) + assert len(received_contexts) == 1 + assert received_contexts[0].skill_name == "my-skill" + assert received_contexts[0].relative_file_path == "data.json" class TestTryParseSkillDocument: @@ -558,7 +613,7 @@ def test_path_traversal_blocked_at_discovery(self, tmp_path: Path) -> None: skill_dir = tmp_path / "skill" skill_dir.mkdir() (tmp_path / "secret.md").write_text("secret", encoding="utf-8") - resources = FileSkillsSource._discover_resource_files(str(skill_dir), directories=(".",)) + resources = _discover_resources(str(skill_dir)) assert not any("secret" in r for r in resources) @@ -887,7 +942,7 @@ def test_discover_resource_files_rejects_symlinked_resource(self, tmp_path: Path refs_dir.mkdir() (refs_dir / "leak.md").symlink_to(outside_file) - resources = FileSkillsSource._discover_resource_files(str(skill_dir)) + resources = _discover_resources(str(skill_dir)) assert "references/leak.md" not in resources def test_discover_skips_symlinked_script(self, tmp_path: Path) -> None: @@ -906,10 +961,9 @@ def test_discover_skips_symlinked_script(self, tmp_path: Path) -> None: (scripts_dir / "safe.py").write_text("print('safe')", encoding="utf-8") (scripts_dir / "leak.py").symlink_to(outside_script) - discovered = FileSkillsSource._discover_script_files(str(skill_dir)) - discovered_names = [p for p in discovered] - assert "scripts/safe.py" in discovered_names - assert "scripts/leak.py" not in discovered_names + discovered = _discover_scripts(str(skill_dir)) + assert "scripts/safe.py" in discovered + assert "scripts/leak.py" not in discovered # --------------------------------------------------------------------------- @@ -1546,7 +1600,7 @@ def test_excludes_skill_md_case_insensitive(self, tmp_path: Path) -> None: skill_dir.mkdir() (skill_dir / "skill.md").write_text("lowercase name", encoding="utf-8") (skill_dir / "other.md").write_text("keep me", encoding="utf-8") - resources = FileSkillsSource._discover_resource_files(str(skill_dir), directories=(".",)) + resources = _discover_resources(str(skill_dir)) names = [r.lower() for r in resources] assert "skill.md" not in names assert "other.md" in resources @@ -1554,19 +1608,17 @@ def test_excludes_skill_md_case_insensitive(self, tmp_path: Path) -> None: def test_skips_directories(self, tmp_path: Path) -> None: """Directories are not included as resources even if their name matches an extension.""" skill_dir = tmp_path / "my-skill" - refs = skill_dir / "references" - refs.mkdir(parents=True) - subdir = refs / "data.json" + skill_dir.mkdir() + subdir = skill_dir / "data.json" subdir.mkdir() - resources = FileSkillsSource._discover_resource_files(str(skill_dir)) + resources = _discover_resources(str(skill_dir), search_depth=1) assert resources == [] def test_extension_matching_is_case_insensitive(self, tmp_path: Path) -> None: skill_dir = tmp_path / "my-skill" - refs = skill_dir / "references" - refs.mkdir(parents=True) - (refs / "NOTES.TXT").write_text("caps", encoding="utf-8") - resources = FileSkillsSource._discover_resource_files(str(skill_dir)) + skill_dir.mkdir() + (skill_dir / "NOTES.TXT").write_text("caps", encoding="utf-8") + resources = _discover_resources(str(skill_dir)) assert len(resources) == 1 @@ -1576,21 +1628,20 @@ class TestDiscoverFilesOSErrorWarning: def test_resource_discovery_warns_on_oserror(self, tmp_path: Path, caplog: pytest.LogCaptureFixture) -> None: """_discover_resource_files logs a warning when iterdir() raises OSError.""" skill_dir = tmp_path / "my-skill" - refs = skill_dir / "references" - refs.mkdir(parents=True) - (refs / "guide.md").write_text("content", encoding="utf-8") + skill_dir.mkdir() + (skill_dir / "guide.md").write_text("content", encoding="utf-8") original_iterdir = Path.iterdir def _patched_iterdir(self: Path) -> Any: - if self.name == "references": + if self == skill_dir: raise PermissionError("access denied") return original_iterdir(self) import unittest.mock with unittest.mock.patch.object(Path, "iterdir", _patched_iterdir): - resources = FileSkillsSource._discover_resource_files(str(skill_dir)) + resources = _discover_resources(str(skill_dir)) assert resources == [] assert any("Failed to list resource directory" in r.message for r in caplog.records) @@ -1598,173 +1649,187 @@ def _patched_iterdir(self: Path) -> Any: def test_script_discovery_warns_on_oserror(self, tmp_path: Path, caplog: pytest.LogCaptureFixture) -> None: """_discover_script_files logs a warning when iterdir() raises OSError.""" skill_dir = tmp_path / "my-skill" - scripts_dir = skill_dir / "scripts" - scripts_dir.mkdir(parents=True) - (scripts_dir / "run.py").write_text("print('hi')", encoding="utf-8") + skill_dir.mkdir() + (skill_dir / "run.py").write_text("print('hi')", encoding="utf-8") original_iterdir = Path.iterdir def _patched_iterdir(self: Path) -> Any: - if self.name == "scripts": + if self == skill_dir: raise PermissionError("access denied") return original_iterdir(self) import unittest.mock with unittest.mock.patch.object(Path, "iterdir", _patched_iterdir): - scripts = FileSkillsSource._discover_script_files(str(skill_dir)) + scripts = _discover_scripts(str(skill_dir)) assert scripts == [] assert any("Failed to list script directory" in r.message for r in caplog.records) -class TestValidateAndNormalizeDirectoryNames: - """Tests for _validate_and_normalize_directory_names.""" +class TestSearchDepthValidation: + """Tests for search_depth parameter validation.""" - def test_simple_directory_name(self) -> None: - result = FileSkillsSource._validate_and_normalize_directory_names(["references"]) - assert result == ["references"] + def test_default_search_depth(self) -> None: + assert DEFAULT_SEARCH_DEPTH == 2 - def test_root_indicator(self) -> None: - result = FileSkillsSource._validate_and_normalize_directory_names(["."]) - assert result == ["."] + def test_search_depth_zero_raises(self) -> None: + with pytest.raises(ValueError, match="search_depth must be >= 1"): + FileSkillsSource(".", search_depth=0) - def test_dot_slash_normalizes_to_root(self) -> None: - result = FileSkillsSource._validate_and_normalize_directory_names(["./"]) - assert result == ["."] + def test_search_depth_negative_raises(self) -> None: + with pytest.raises(ValueError, match="search_depth must be >= 1"): + FileSkillsSource(".", search_depth=-1) - def test_backslash_dot_normalizes_to_root(self) -> None: - result = FileSkillsSource._validate_and_normalize_directory_names([".\\"]) - assert result == ["."] + def test_search_depth_one_accepted(self) -> None: + source = FileSkillsSource(".", search_depth=1) + assert source._search_depth == 1 - def test_backslashes_normalized(self) -> None: - result = FileSkillsSource._validate_and_normalize_directory_names(["sub\\scripts"]) - assert result == ["sub/scripts"] - def test_trailing_slash_stripped(self) -> None: - result = FileSkillsSource._validate_and_normalize_directory_names(["scripts/"]) - assert result == ["scripts"] +class TestFileSkillFilterContext: + """Tests for FileSkillFilterContext.""" - def test_leading_dot_slash_stripped(self) -> None: - result = FileSkillsSource._validate_and_normalize_directory_names(["./references"]) - assert result == ["references"] + def test_creates_with_valid_args(self) -> None: + ctx = FileSkillFilterContext("my-skill", "scripts/run.py") + assert ctx.skill_name == "my-skill" + assert ctx.relative_file_path == "scripts/run.py" - def test_rejects_parent_traversal(self) -> None: - result = FileSkillsSource._validate_and_normalize_directory_names(["../secrets"]) - assert result == [] + def test_empty_skill_name_raises(self) -> None: + with pytest.raises(ValueError, match="skill_name cannot be empty"): + FileSkillFilterContext("", "file.py") - def test_rejects_embedded_parent_traversal(self) -> None: - result = FileSkillsSource._validate_and_normalize_directory_names(["sub/../secrets"]) - assert result == [] + def test_whitespace_skill_name_raises(self) -> None: + with pytest.raises(ValueError, match="skill_name cannot be empty"): + FileSkillFilterContext(" ", "file.py") - def test_rejects_absolute_path(self) -> None: - result = FileSkillsSource._validate_and_normalize_directory_names(["/etc/passwd"]) - assert result == [] + def test_empty_relative_path_raises(self) -> None: + with pytest.raises(ValueError, match="relative_file_path cannot be empty"): + FileSkillFilterContext("skill", "") - def test_rejects_windows_absolute_path(self) -> None: - result = FileSkillsSource._validate_and_normalize_directory_names(["C:\\Windows"]) - assert result == [] + def test_whitespace_relative_path_raises(self) -> None: + with pytest.raises(ValueError, match="relative_file_path cannot be empty"): + FileSkillFilterContext("skill", " ") - def test_empty_string_raises(self) -> None: - with pytest.raises(ValueError, match="empty or whitespace"): - FileSkillsSource._validate_and_normalize_directory_names([""]) - def test_whitespace_only_raises(self) -> None: - with pytest.raises(ValueError, match="empty or whitespace"): - FileSkillsSource._validate_and_normalize_directory_names([" "]) +class TestFileSkillsSourceSearchDepthAndFilters: + """Tests for search_depth, script_filter, and resource_filter parameters.""" - def test_multiple_directories(self) -> None: - result = FileSkillsSource._validate_and_normalize_directory_names([".", "references", "assets", "scripts"]) - assert result == [".", "references", "assets", "scripts"] - - def test_default_resource_directories(self) -> None: - assert DEFAULT_RESOURCE_DIRECTORIES == ("references", "assets") - - def test_default_script_directories(self) -> None: - assert DEFAULT_SCRIPT_DIRECTORIES == ("scripts",) - - def test_root_directory_indicator_is_dot(self) -> None: - assert ROOT_DIRECTORY_INDICATOR == "." - - -class TestFileSkillsSourceDirectories: - """Tests for resource_directories and script_directories parameters.""" - - async def test_custom_resource_directories(self, tmp_path: Path) -> None: - """Custom resource_directories controls which dirs are scanned.""" + async def test_search_depth_controls_resource_discovery(self, tmp_path: Path) -> None: + """search_depth limits how deep resource files are discovered.""" skill_dir = tmp_path / "my-skill" - skill_dir.mkdir() + deep = skill_dir / "a" / "b" + deep.mkdir(parents=True) (skill_dir / "SKILL.md").write_text( "---\nname: my-skill\ndescription: test\n---\nBody", encoding="utf-8", ) - # Put resource in a custom directory - docs = skill_dir / "docs" - docs.mkdir() - (docs / "guide.md").write_text("guide", encoding="utf-8") - # Also put one in default references/ — should not be found + (skill_dir / "root.md").write_text("root", encoding="utf-8") + (skill_dir / "a" / "level1.md").write_text("l1", encoding="utf-8") + (deep / "level2.md").write_text("l2", encoding="utf-8") + + # depth=1: only root + source1 = FileSkillsSource(str(tmp_path), search_depth=1) + skills1 = await source1.get_skills() + names1 = [r.name for r in skills1[0]._resources] + assert "root.md" in names1 + assert "a/level1.md" not in names1 + + # depth=2 (default): root + one level + source2 = FileSkillsSource(str(tmp_path)) + skills2 = await source2.get_skills() + names2 = [r.name for r in skills2[0]._resources] + assert "root.md" in names2 + assert "a/level1.md" in names2 + assert "a/b/level2.md" not in names2 + + # depth=3: finds all + source3 = FileSkillsSource(str(tmp_path), search_depth=3) + skills3 = await source3.get_skills() + names3 = [r.name for r in skills3[0]._resources] + assert "a/b/level2.md" in names3 + + async def test_resource_filter_excludes_files(self, tmp_path: Path) -> None: + """resource_filter predicate controls which resources are included.""" + skill_dir = tmp_path / "my-skill" refs = skill_dir / "references" - refs.mkdir() - (refs / "ref.md").write_text("ref", encoding="utf-8") + refs.mkdir(parents=True) + (skill_dir / "SKILL.md").write_text( + "---\nname: my-skill\ndescription: test\n---\nBody", + encoding="utf-8", + ) + (refs / "keep.md").write_text("keep", encoding="utf-8") + (refs / "secret.md").write_text("secret", encoding="utf-8") - source = FileSkillsSource(str(tmp_path), resource_directories=["docs"]) + source = FileSkillsSource( + str(tmp_path), + resource_filter=lambda ctx: "secret" not in ctx.relative_file_path, + ) skills = await source.get_skills() resource_names = [r.name for r in skills[0]._resources] - assert "docs/guide.md" in resource_names - assert "references/ref.md" not in resource_names + assert "references/keep.md" in resource_names + assert "references/secret.md" not in resource_names - async def test_custom_script_directories(self, tmp_path: Path) -> None: - """Custom script_directories controls which dirs are scanned.""" + async def test_script_filter_excludes_files(self, tmp_path: Path) -> None: + """script_filter predicate controls which scripts are included.""" skill_dir = tmp_path / "my-skill" skill_dir.mkdir() (skill_dir / "SKILL.md").write_text( "---\nname: my-skill\ndescription: test\n---\nBody", encoding="utf-8", ) - # Put script in a custom directory - tools = skill_dir / "tools" - tools.mkdir() - (tools / "run.py").write_text("print('run')", encoding="utf-8") + (skill_dir / "run.py").write_text("print('run')", encoding="utf-8") + (skill_dir / "test_run.py").write_text("print('test')", encoding="utf-8") - source = FileSkillsSource(str(tmp_path), script_directories=["tools"]) + source = FileSkillsSource( + str(tmp_path), + script_filter=lambda ctx: not ctx.relative_file_path.startswith("test_"), + ) skills = await source.get_skills() script_names = [s.name for s in skills[0]._scripts] - assert "tools/run.py" in script_names + assert "run.py" in script_names + assert "test_run.py" not in script_names - async def test_root_indicator_discovers_root_files(self, tmp_path: Path) -> None: - """The '.' root indicator discovers files at the skill root.""" + async def test_from_paths_passes_search_depth(self, tmp_path: Path) -> None: + """from_paths passes search_depth through to FileSkillsSource.""" skill_dir = tmp_path / "my-skill" - skill_dir.mkdir() + deep = skill_dir / "sub" + deep.mkdir(parents=True) (skill_dir / "SKILL.md").write_text( "---\nname: my-skill\ndescription: test\n---\nBody", encoding="utf-8", ) - (skill_dir / "data.json").write_text("{}", encoding="utf-8") + (skill_dir / "root.md").write_text("root", encoding="utf-8") + (deep / "nested.md").write_text("nested", encoding="utf-8") - source = FileSkillsSource(str(tmp_path), resource_directories=[".", "references"]) - skills = await source.get_skills() - resource_names = [r.name for r in skills[0]._resources] - assert "data.json" in resource_names + # depth=1 should only find root + provider = SkillsProvider.from_paths(str(tmp_path), search_depth=1) + await _init_provider(provider) + skill = _ctx(provider)[0]["my-skill"] + resource_names = [r.name for r in skill._resources] + assert "root.md" in resource_names + assert "sub/nested.md" not in resource_names - async def test_from_paths_passes_directories(self, tmp_path: Path) -> None: - """from_paths passes resource_directories and script_directories through.""" + async def test_from_paths_passes_resource_filter(self, tmp_path: Path) -> None: + """from_paths passes resource_filter through.""" skill_dir = tmp_path / "my-skill" - docs = skill_dir / "docs" - docs.mkdir(parents=True) + skill_dir.mkdir() (skill_dir / "SKILL.md").write_text( "---\nname: my-skill\ndescription: test\n---\nBody", encoding="utf-8", ) - (docs / "guide.md").write_text("guide", encoding="utf-8") + (skill_dir / "keep.md").write_text("keep", encoding="utf-8") + (skill_dir / "drop.md").write_text("drop", encoding="utf-8") provider = SkillsProvider.from_paths( str(tmp_path), - resource_directories=["docs"], + resource_filter=lambda ctx: ctx.relative_file_path == "keep.md", ) await _init_provider(provider) skill = _ctx(provider)[0]["my-skill"] resource_names = [r.name for r in skill._resources] - assert "docs/guide.md" in resource_names + assert "keep.md" in resource_names + assert "drop.md" not in resource_names # --------------------------------------------------------------------------- @@ -3784,8 +3849,8 @@ async def test_discovers_py_files_in_scripts_dir(self, tmp_path: Path) -> None: assert len(skills["my-skill"]._scripts) == 1 assert skills["my-skill"]._scripts[0].name == "scripts/analyze.py" - async def test_root_py_files_not_discovered_by_default(self, tmp_path: Path) -> None: - """Scripts at the skill root are NOT discovered with default directories.""" + async def test_root_py_files_discovered_by_default(self, tmp_path: Path) -> None: + """Scripts at the skill root ARE discovered with default depth=2.""" skill_dir = tmp_path / "my-skill" skill_dir.mkdir() (skill_dir / "SKILL.md").write_text( @@ -3796,7 +3861,8 @@ async def test_root_py_files_not_discovered_by_default(self, tmp_path: Path) -> skills = await _discover_file_skills_for_test(str(tmp_path)) assert "my-skill" in skills - assert len(skills["my-skill"]._scripts) == 0 + assert len(skills["my-skill"]._scripts) == 1 + assert skills["my-skill"]._scripts[0].name == "analyze.py" async def test_discovered_script_has_absolute_full_path(self, tmp_path: Path) -> None: skill_dir = tmp_path / "my-skill" From 641c8fd056dfee6f5bcdfe6ebabed87bd72b61c4 Mon Sep 17 00:00:00 2001 From: Giles Odigwe Date: Thu, 11 Jun 2026 22:36:18 -0700 Subject: [PATCH 2/2] Address PR feedback: clarify depth constants and skip nested skill directories - Add clarifying comments distinguishing MAX_SEARCH_DEPTH (SKILL.md discovery) from DEFAULT_SEARCH_DEPTH (per-skill resource/script scanning). - Stop recursing into subdirectories that contain their own SKILL.md, preventing child skill files from being attached to the parent skill. - Add test verifying nested skill boundary is respected. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../packages/core/agent_framework/_skills.py | 12 ++++++ .../packages/core/tests/core/test_skills.py | 38 +++++++++++++++++++ 2 files changed, 50 insertions(+) diff --git a/python/packages/core/agent_framework/_skills.py b/python/packages/core/agent_framework/_skills.py index 0ce756e9340..ad70d8125e0 100644 --- a/python/packages/core/agent_framework/_skills.py +++ b/python/packages/core/agent_framework/_skills.py @@ -1532,6 +1532,8 @@ def __call__( # endregion SKILL_FILE_NAME: Final[str] = "SKILL.md" +# How deep to search for SKILL.md files within the top-level skill_paths directories. +# This is separate from DEFAULT_SEARCH_DEPTH which controls per-skill resource/script scanning. MAX_SEARCH_DEPTH: Final[int] = 2 MAX_NAME_LENGTH: Final[int] = 64 MAX_DESCRIPTION_LENGTH: Final[int] = 1024 @@ -1546,6 +1548,8 @@ def __call__( ".txt", ) DEFAULT_SCRIPT_EXTENSIONS: Final[tuple[str, ...]] = (".py",) +# How deep to scan for resource/script files within each individual skill directory. +# This is separate from MAX_SEARCH_DEPTH which controls SKILL.md discovery. DEFAULT_SEARCH_DEPTH: Final[int] = 2 @@ -2796,6 +2800,10 @@ def _scan_directory_for_resources( # Recurse into subdirectories if within depth limit if current_depth < self._search_depth: for subdir in subdirectories: + # Skip subdirectories that contain their own SKILL.md — they are + # separate skills and their files should not be attached to this one. + if (subdir / SKILL_FILE_NAME).is_file(): + continue self._scan_directory_for_resources( target_dir=subdir, skill_dir=skill_dir, @@ -2945,6 +2953,10 @@ def _scan_directory_for_scripts( # Recurse into subdirectories if within depth limit if current_depth < self._search_depth: for subdir in subdirectories: + # Skip subdirectories that contain their own SKILL.md — they are + # separate skills and their files should not be attached to this one. + if (subdir / SKILL_FILE_NAME).is_file(): + continue self._scan_directory_for_scripts( target_dir=subdir, skill_dir=skill_dir, diff --git a/python/packages/core/tests/core/test_skills.py b/python/packages/core/tests/core/test_skills.py index c2c8a740a11..12e89a9ed43 100644 --- a/python/packages/core/tests/core/test_skills.py +++ b/python/packages/core/tests/core/test_skills.py @@ -1831,6 +1831,44 @@ async def test_from_paths_passes_resource_filter(self, tmp_path: Path) -> None: assert "keep.md" in resource_names assert "drop.md" not in resource_names + async def test_nested_skill_directory_not_crossed(self, tmp_path: Path) -> None: + """Files in a nested skill directory are NOT attached to the parent skill.""" + parent_dir = tmp_path / "parent-skill" + child_dir = parent_dir / "child-skill" + child_dir.mkdir(parents=True) + (parent_dir / "SKILL.md").write_text( + "---\nname: parent-skill\ndescription: parent\n---\nParent body", + encoding="utf-8", + ) + (parent_dir / "parent-resource.md").write_text("parent", encoding="utf-8") + (child_dir / "SKILL.md").write_text( + "---\nname: child-skill\ndescription: child\n---\nChild body", + encoding="utf-8", + ) + (child_dir / "child-resource.md").write_text("child", encoding="utf-8") + (child_dir / "child-script.py").write_text("print('child')", encoding="utf-8") + + source = FileSkillsSource(str(tmp_path), search_depth=3) + skills = await source.get_skills() + skills_dict = {s.frontmatter.name: s for s in skills} + + # Both skills are discovered + assert "parent-skill" in skills_dict + assert "child-skill" in skills_dict + + # Parent does NOT pick up child's files + parent_resources = [r.name for r in skills_dict["parent-skill"]._resources] + parent_scripts = [s.name for s in skills_dict["parent-skill"]._scripts] + assert "parent-resource.md" in parent_resources + assert "child-skill/child-resource.md" not in parent_resources + assert "child-skill/child-script.py" not in parent_scripts + + # Child has its own files + child_resources = [r.name for r in skills_dict["child-skill"]._resources] + child_scripts = [s.name for s in skills_dict["child-skill"]._scripts] + assert "child-resource.md" in child_resources + assert "child-script.py" in child_scripts + # --------------------------------------------------------------------------- # Tests: _is_path_within_directory