Conversation
Migrated from FalkorDB/code-graph-backend PR #95. Original issue: FalkorDB/code-graph-backend#93 Resolves #531 Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
📝 Walkthrough
This PR adds Kotlin language support to the code analyzer while refactoring the symbol resolution architecture.
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
if any(path.rglob('*.cs')):
    config = MultilspyConfig.from_dict({"code_language": "csharp"})
    lsps[".cs"] = SyncLanguageServer.create(config, logger, str(path))
if any(path.rglob('*.kt')) or any(path.rglob('*.kts')):
Check failure
Code scanning / CodeQL: Uncontrolled data used in path expression (High)
Copilot Autofix
AI about 22 hours ago
In general, to fix uncontrolled path usage, you should constrain user-provided paths to lie under a known safe root and validate them after normalization. That usually means: (1) decide on an allowed base directory (for example, a projects root), (2) convert user input into a `Path` relative to that base or reject absolute paths, (3) call `.resolve()` (or `os.path.realpath`) and then confirm that the resolved path is within the base directory using a prefix or ancestor check (`is_relative_to` or try/except with `relative_to`), and (4) only then use that path for filesystem traversal or as a workspace root.
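The four steps above can be sketched as a small helper. This is a minimal sketch, assuming a configurable base directory; the function name `confine_path` is hypothetical, not part of the codebase under review.

```python
from pathlib import Path

def confine_path(user_input: str, base_dir: str) -> Path:
    """Hypothetical helper: confine a user-supplied path under base_dir."""
    base = Path(base_dir).resolve()
    candidate = Path(user_input)
    # Step 2: interpret relative input against the allowed base directory.
    if not candidate.is_absolute():
        candidate = base / candidate
    # Step 3: normalize away ".." segments and symbolic links.
    resolved = candidate.resolve()
    # Step 3 (cont.): ancestor check; relative_to raises ValueError on escape.
    resolved.relative_to(base)
    # Step 4: only now is the path safe to hand to filesystem traversal.
    return resolved
```

A caller would catch the `ValueError` and reject the request rather than proceeding with the unvalidated path.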
For this codebase, the best non-breaking fix is to add validation inside `SourceAnalyzer.analyze_local_folder`, since that is where the untrusted `path: str` first becomes a `Path` and before it is used by `analyze_sources` and `second_pass`. We can introduce a constant safe root (for example, the directory where repositories live) and then:
- If the incoming `path` is absolute, interpret it as relative to the safe root only if it does not escape that root; otherwise reject it.
- Normalize with `Path(...).resolve()` to eliminate `..` segments and symbolic links, then verify it is under the safe root via `resolved_path.is_relative_to(base_dir)` (Python 3.9+) or a `try: resolved_path.relative_to(base_dir)` check.
- If validation fails, log an error and raise a clear exception. The HTTP layer already catches exceptions around repository operations in other endpoints and can be adapted if necessary; within `tests/index.py` the test route still just calls `analyze_local_folder`, so raising will surface as an internal error, which is acceptable for a security fix.
Concretely, in `api/analyzers/source_analyzer.py` around `analyze_local_folder`, we will:
- Add an import for `os` at the top (a standard library module) because we may want to use `os.path` constants if needed, though we can largely use `Path`.
- Replace the direct use of `Path(path)` with a small normalization/validation sequence:
  - Define a base directory, for example using an environment variable `CODE_GRAPH_PROJECTS_ROOT` with a sensible default (such as the current working directory or a `repos` directory under it).
  - Build `base_path = Path(base_root).resolve()`.
  - Build `user_path = (base_path / path).resolve()` if `path` is relative, or `Path(path).resolve()` if absolute, then ensure `user_path` is under `base_path` using `.relative_to`. If not, raise `ValueError("Path is outside of allowed root")`.
- Pass this validated `user_path` into `analyze_sources` instead of the raw, possibly unsafe `Path(path)`.
This keeps all existing analyzer functionality, but prevents a client from using HTTP to point the analyzer at arbitrary directories outside the intended root. It also addresses all the variants, since every call path goes through analyze_local_folder.
@@ -20,6 +20,8 @@
# Configure logging
logging.basicConfig(level=logging.DEBUG, format='%(filename)s - %(asctime)s - %(levelname)s - %(message)s')

import os

# List of available analyzers
analyzers: dict[str, AbstractAnalyzer] = {
    # '.c': CAnalyzer(),
@@ -191,9 +193,27 @@
        logging.info(f"Analyzing local folder {path}")

        # Analyze source files
        self.analyze_sources(Path(path), ignore, g)
        # Determine the allowed base directory for analysis.
        # Default to the current working directory if not explicitly configured.
        base_root = os.environ.get("CODE_GRAPH_PROJECTS_ROOT", os.getcwd())
        base_path = Path(base_root).resolve()

        # Build and normalize the requested analysis path.
        requested_path = Path(path)
        if not requested_path.is_absolute():
            requested_path = (base_path / requested_path)
        resolved_path = requested_path.resolve()

        # Ensure the resolved path is within the allowed base directory.
        try:
            resolved_path.relative_to(base_path)
        except ValueError:
            logging.error("Requested path '%s' is outside of allowed root '%s'", resolved_path, base_path)
            raise ValueError("Requested path is outside of allowed root directory")

        # Analyze source files using the validated path
        self.analyze_sources(resolved_path, ignore, g)

        logging.info("Done analyzing path")

    def analyze_local_repository(self, path: str, ignore: Optional[list[str]] = None) -> Graph:
if any(path.rglob('*.cs')):
    config = MultilspyConfig.from_dict({"code_language": "csharp"})
    lsps[".cs"] = SyncLanguageServer.create(config, logger, str(path))
if any(path.rglob('*.kt')) or any(path.rglob('*.kts')):
Check failure
Code scanning / CodeQL: Uncontrolled data used in path expression (High)
Copilot Autofix
AI about 22 hours ago
General approach: normalize user-provided paths and constrain them to a configured safe root directory. Convert the incoming string to an absolute, normalized Path (using resolve()), then ensure it is a subpath of a “projects root” directory that the server is allowed to analyze. Reject paths outside that root before calling analyze_local_folder. This addresses all flows where tainted path is turned into a Path and used with rglob or as a base for other paths.
Best concrete fix with minimal behavior change:
- In `tests/index.py`, before using `path`, resolve it to an absolute path and ensure it is under a configured root (e.g., from an environment variable like `CODE_GRAPH_PROJECTS_ROOT`, defaulting to the current working directory for backward compatibility). If `path` escapes that root, reject the request with a 400 error.
- In `SourceAnalyzer.analyze_local_folder`, immediately convert the incoming string to a normalized `Path` via `Path(path).resolve()` and then pass that normalized `Path` to `analyze_sources`. This limits propagation of any oddities like `..` segments and ensures downstream code uses a canonical path. Since `analyze_local_folder` is the main entry point for string paths, tightening it is low-impact and addresses all future call sites as well.
- The `second_pass` and `analyze_sources` methods already operate on `Path` objects; with the above normalization in place, no further change is required there.
Concretely:
- In `tests/index.py`, in the handler that calls `analyze_local_folder(path, g, ignore)` (lines 340–369 region), normalize and confine `path` before validation and use:
  - `from pathlib import Path` is already present, and `os` is imported, so no new imports are needed.
  - Introduce something like:

        root = Path(os.environ.get("CODE_GRAPH_PROJECTS_ROOT", os.getcwd())).resolve()
        requested_path = Path(path).resolve()
        if root not in requested_path.parents and requested_path != root:
            logging.error("Path '%s' is outside of the allowed root '%s'", requested_path, root)
            return jsonify({"status": "Invalid path: outside of allowed root directory"}), 400
        path = str(requested_path)

  - Then keep the existing `os.path.isdir(path)` check.
- In `api/analyzers/source_analyzer.py`, in `analyze_local_folder`, convert `path` once to a resolved `Path` and pass that into `analyze_sources`:

      resolved_path = Path(path).resolve()
      logging.info(f"Analyzing local folder {resolved_path}")
      self.analyze_sources(resolved_path, ignore, g)
This preserves existing functionality (still analyzes the same directory, only now canonicalized) and stops arbitrary “weird” paths from flowing unprocessed, while tests/index.py enforces that untrusted HTTP clients can only request analysis within the configured root directory.
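The containment test used above (`root not in requested_path.parents and requested_path != root`) reads as "reject unless the root is the path itself or one of its ancestors". A minimal standalone sketch of that predicate (the function name is hypothetical):

```python
from pathlib import Path

def is_within_root(requested: Path, root: Path) -> bool:
    """True when requested equals root or lies somewhere below it."""
    requested = requested.resolve()
    root = root.resolve()
    # Path.parents lists every ancestor, so membership is an ancestor check.
    return requested == root or root in requested.parents
```

Because both sides are resolved first, `..` segments and symlinks cannot be used to sneak past the comparison.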
@@ -189,10 +189,11 @@
        ignore (List(str)): List of paths to skip
        """

        logging.info(f"Analyzing local folder {path}")
        resolved_path = Path(path).resolve()
        logging.info(f"Analyzing local folder {resolved_path}")

        # Analyze source files
        self.analyze_sources(Path(path), ignore, g)
        self.analyze_sources(resolved_path, ignore, g)

        logging.info("Done analyzing path")

@@ -349,6 +349,14 @@
        logging.error("'path' is missing from the request.")
        return jsonify({"status": "'path' is required."}), 400

    # Normalize and confine path to an allowed root directory
    projects_root = Path(os.environ.get("CODE_GRAPH_PROJECTS_ROOT", os.getcwd())).resolve()
    requested_path = Path(path).resolve()
    if projects_root not in requested_path.parents and requested_path != projects_root:
        logging.error("Path '%s' is outside of the allowed root '%s'", requested_path, projects_root)
        return jsonify({"status": "Invalid path: outside of allowed root directory"}), 400
    path = str(requested_path)

    # Validate path exists and is a directory
    if not os.path.isdir(path):
        logging.error(f"Path '{path}' does not exist or is not a directory")
def analyze_sources(self, path: Path, ignore: list[str], graph: Graph) -> None:
    path = path.resolve()
    files = list(path.rglob("*.java")) + list(path.rglob("*.py")) + list(path.rglob("*.cs"))
    files = list(path.rglob("*.java")) + list(path.rglob("*.py")) + list(path.rglob("*.kt")) + list(path.rglob("*.kts"))
Check failure
Code scanning / CodeQL: Uncontrolled data used in path expression (High)
Copilot Autofix
AI about 22 hours ago
In general, to fix this kind of issue you should restrict user-supplied paths so they cannot escape a designated safe root. For server-side code, that typically means: define a base directory under which analysis is allowed; combine the user input with that base; normalize the result (e.g., via Path.resolve() or os.path.realpath()); then verify that the normalized path is still within the base before using it to walk the filesystem.
For this codebase, the single best fix with minimal functional change is:
- In the route handler in `tests/index.py`, normalize and constrain the user-provided `path` to a configured base directory before passing it into `SourceAnalyzer.analyze_local_folder`. That prevents analysis of arbitrary locations while keeping the rest of the analyzer logic unchanged.
- Additionally, harden `SourceAnalyzer.analyze_local_folder` in `api/analyzers/source_analyzer.py` by normalizing its `path` argument with `Path(path).resolve()` and then using that resolved `Path` consistently. This ensures that even other call sites (such as `analyze_local_repository`) benefit from normalization, and it ensures that downstream uses (like `analyze_sources`) see a canonical path.
Concretely:
- In `tests/index.py`, after validating that `path` is a directory, convert it to an absolute, normalized `Path` object, derive a `safe_base` (for example from an environment variable like `CODE_GRAPH_BASE_DIR` or defaulting to the current working directory or another project root), and then check that `resolved_path` is within `safe_base` (using `.resolve()` and a `.relative_to()` try/except). If the check fails, return HTTP 400. Pass the normalized string/path into `analyzer.analyze_local_folder`.
- In `api/analyzers/source_analyzer.py`, in `analyze_local_folder`, convert `path` to `base_path = Path(path).resolve()` once and use `base_path` for logging and for `analyze_sources`. This prevents relative path oddities and guarantees `analyze_sources` always works with a normalized `Path`.
No new imports are needed beyond `pathlib.Path` and `os`, which are already used in these files.
@@ -189,10 +189,11 @@
        ignore (List(str)): List of paths to skip
        """

        logging.info(f"Analyzing local folder {path}")
        base_path = Path(path).resolve()
        logging.info(f"Analyzing local folder {base_path}")

        # Analyze source files
        self.analyze_sources(Path(path), ignore, g)
        self.analyze_sources(base_path, ignore, g)

        logging.info("Done analyzing path")

@@ -354,19 +354,30 @@
        logging.error(f"Path '{path}' does not exist or is not a directory")
        return jsonify({"status": "Invalid path: must be an existing directory"}), 400

    # Normalize and restrict the analysis path to a safe base directory
    user_path = Path(path).resolve()
    # Define the base directory under which analysis is allowed; defaults to current working directory
    base_dir = Path(os.environ.get("CODE_GRAPH_BASE_DIR", os.getcwd())).resolve()
    try:
        # This will raise ValueError if user_path is not within base_dir
        user_path.relative_to(base_dir)
    except ValueError:
        logging.error(f"Path '{user_path}' is outside the allowed base directory '{base_dir}'")
        return jsonify({"status": "Invalid path: must be within the allowed base directory"}), 400

    # Validate ignore list contains valid paths
    if not isinstance(ignore, list):
        logging.error("'ignore' must be a list of paths")
        return jsonify({"status": "'ignore' must be a list of paths"}), 400

    proj_name = Path(path).name
    proj_name = user_path.name

    # Initialize the graph with the provided project name
    g = Graph(proj_name)

    # Analyze source code within given folder
    analyzer = SourceAnalyzer()
    analyzer.analyze_local_folder(path, g, ignore)
    analyzer.analyze_local_folder(str(user_path), g, ignore)

    # Return response
    response = {
def analyze_sources(self, path: Path, ignore: list[str], graph: Graph) -> None:
    path = path.resolve()
    files = list(path.rglob("*.java")) + list(path.rglob("*.py")) + list(path.rglob("*.cs"))
    files = list(path.rglob("*.java")) + list(path.rglob("*.py")) + list(path.rglob("*.kt")) + list(path.rglob("*.kts"))
Check failure
Code scanning / CodeQL: Uncontrolled data used in path expression (High)
Copilot Autofix
AI about 22 hours ago
In general, to fix this issue you should not let arbitrary user-supplied paths control which part of the filesystem is traversed. Instead, define a safe root directory (or an allow-list of roots), normalize the user input, and ensure the resolved path is contained within that root before using it for any filesystem operations like `rglob`.
For this codebase, the minimal change that preserves behavior is to (a) introduce a helper that validates a user-provided path against a configured base directory, and (b) call it in `SourceAnalyzer.analyze_local_folder` before converting to `Path` and traversing. A reasonable base directory is the repository root or a subdirectory configured via environment variable; to avoid assumptions about project layout, we can resolve the provided path to an absolute path and ensure it is under the current working directory (or another obvious base such as an environment variable, if present). The validation should: use `Path(path).resolve()` to normalize any `..` segments or symlinks, compute the allowed base as `Path(os.getcwd()).resolve()` (or similar), and ensure `resolved_path.is_dir()` and `allowed_base in resolved_path.parents or resolved_path == allowed_base`. If the check fails, log and raise an exception rather than proceeding.
Concretely, in `api/analyzers/source_analyzer.py` you will: (1) import `os` for environment access if needed, (2) add a small private method `_validate_root_path` or a local check in `analyze_local_folder` that resolves and validates the incoming path string, returning a safe `Path` object, and (3) replace the direct `Path(path)` call in `analyze_local_folder` with the validated path returned by this helper. No changes are needed in `tests/index.py` or `api/index.py` because the taint will be neutralized within the analyzer. This fixes all variants of the alert because every entry point (`analyze_local_folder`, `analyze_local_repository`) funnels through this validation before `rglob` or repository operations.
@@ -180,6 +180,32 @@
        # Second pass analysis of the source code
        self.second_pass(graph, files, path)

    def _validate_root_path(self, path: str) -> Path:
        """
        Validate and normalize the root path to ensure it is safe to traverse.
        Currently, this confines analysis to directories under the current
        working directory.
        """
        base_path = Path.cwd().resolve()
        try:
            resolved_path = Path(path).resolve()
        except Exception as e:
            logging.error("Failed to resolve path '%s': %s", path, e)
            raise

        if not resolved_path.is_dir():
            logging.error("Provided path '%s' is not a directory after resolution", resolved_path)
            raise ValueError(f"Invalid path: '{path}' is not a directory")

        try:
            # Ensure resolved_path is within base_path
            resolved_path.relative_to(base_path)
        except ValueError:
            logging.error("Provided path '%s' is outside of allowed base '%s'", resolved_path, base_path)
            raise ValueError("Invalid path: outside of allowed root")

        return resolved_path

    def analyze_local_folder(self, path: str, g: Graph, ignore: Optional[list[str]] = []) -> None:
        """
        Analyze path.
@@ -191,8 +217,10 @@

        logging.info(f"Analyzing local folder {path}")

        safe_root = self._validate_root_path(path)

        # Analyze source files
        self.analyze_sources(Path(path), ignore, g)
        self.analyze_sources(safe_root, ignore, g)

        logging.info("Done analyzing path")
def analyze_sources(self, path: Path, ignore: list[str], graph: Graph) -> None:
    path = path.resolve()
    files = list(path.rglob("*.java")) + list(path.rglob("*.py")) + list(path.rglob("*.cs"))
    files = list(path.rglob("*.java")) + list(path.rglob("*.py")) + list(path.rglob("*.kt")) + list(path.rglob("*.kts"))
Check failure
Code scanning / CodeQL: Uncontrolled data used in path expression (High)
Copilot Autofix
AI about 22 hours ago
General approach: normalize any user-supplied path before using it for filesystem traversal and ensure it lies under a configured safe root directory. This protects operations like path.rglob(...) from being used to scan arbitrary locations. The safest place to do this is at the boundary where untrusted str becomes a Path (inside SourceAnalyzer.analyze_local_folder and analyze_local_repository), so all downstream methods can assume they receive safe Path objects.
Concrete fix in api/analyzers/source_analyzer.py:
- Decide on a safe root. A natural choice is the current working directory (server root) or a directory pointed to by an environment variable. Since we must not add new imports beyond "well-known" ones and can't see the rest of the project, we'll conservatively restrict analysis to be under the server's current working directory (`Path.cwd()`), which is at least stable and predictable. If your app already has a notion of a repos directory, you can later replace `safe_root` with that.
- In `analyze_local_folder(self, path: str, g: Graph, ignore: Optional[list[str]] = [])`, instead of passing `Path(path)` directly to `analyze_sources`, build a `Path` from the string, resolve it to an absolute canonical path (`.resolve()`), and verify it is within `safe_root`. If not, log and raise a clear exception. Example logic:
  - `base_path = Path.cwd().resolve()`
  - `target = Path(path).expanduser().resolve()`
  - If `base_path not in target.parents and target != base_path:` raise `ValueError(...)`
  - Then call `self.analyze_sources(target, ignore, g)`

  This uses `Path.resolve()` to normalize `..` and symbolic links, which is the `pathlib` equivalent of `os.path.realpath`. Using `is_relative_to` would be ideal in Python 3.9+, but to maintain compatibility, we can check `base_path in target.parents or target == base_path`.
- Apply the same validation in `analyze_local_repository(self, path: str, ignore: Optional[list[str]] = None)`. Here `path` again comes from untrusted sources (e.g., via API), and we should ensure the repository lies under the same safe root. Normalize and validate the path before:
  - Constructing `proj_name = Path(path).name`
  - Instantiating `Repository(path)`
  - Calling `self.analyze_local_folder(path, graph, ignore)`

  After validation, we should use the normalized/safe `str` (e.g., `safe_path_str = str(target)`) consistently for `Repository` and for forwarding to `analyze_local_folder`.
- No changes are needed to `analyze_sources`, `analyze_files`, `first_pass`, or `second_pass`, because they already operate on `Path` objects and lists of files built from that base. By ensuring the base `Path` itself is safe, those calls become safe.
- We can reuse existing imports: `Path` from `pathlib` is already imported. We will add no new external dependencies.
This fix does not change the core functionality for “normal” use (analyzing directories under the server’s working directory), but prevents clients from making the server analyze or access arbitrary directories outside that root.
@@ -189,10 +189,17 @@
        ignore (List(str)): List of paths to skip
        """

        logging.info(f"Analyzing local folder {path}")
        # Normalize and validate the provided path against a safe root
        base_path = Path.cwd().resolve()
        target_path = Path(path).expanduser().resolve()
        if target_path != base_path and base_path not in target_path.parents:
            logging.error("Refusing to analyze folder outside of safe root. Safe root: %s, requested: %s", base_path, target_path)
            raise ValueError(f"Path '{path}' is outside of the allowed root directory")

        logging.info(f"Analyzing local folder {target_path}")

        # Analyze source files
        self.analyze_sources(Path(path), ignore, g)
        self.analyze_sources(target_path, ignore, g)

        logging.info("Done analyzing path")

@@ -207,14 +212,21 @@
        if ignore is None:
            ignore = []

        # Normalize and validate the repository path against a safe root
        base_path = Path.cwd().resolve()
        repo_path = Path(path).expanduser().resolve()
        if repo_path != base_path and base_path not in repo_path.parents:
            logging.error("Refusing to analyze repository outside of safe root. Safe root: %s, requested: %s", base_path, repo_path)
            raise ValueError(f"Repository path '{path}' is outside of the allowed root directory")

        from pygit2.repository import Repository

        proj_name = Path(path).name
        proj_name = repo_path.name
        graph = Graph(proj_name)
        self.analyze_local_folder(path, graph, ignore)
        self.analyze_local_folder(str(repo_path), graph, ignore)

        # Save processed commit hash to the DB
        repo = Repository(path)
        repo = Repository(str(repo_path))
        current_commit = repo.walk(repo.head.target).__next__()
        graph.set_graph_commit(current_commit.short_id)
def analyze_sources(self, path: Path, ignore: list[str], graph: Graph) -> None:
    path = path.resolve()
    files = list(path.rglob("*.java")) + list(path.rglob("*.py")) + list(path.rglob("*.cs"))
    files = list(path.rglob("*.java")) + list(path.rglob("*.py")) + list(path.rglob("*.kt")) + list(path.rglob("*.kts"))
Check failure
Code scanning / CodeQL: Uncontrolled data used in path expression (High)
Copilot Autofix
AI about 22 hours ago
In general, to fix this kind of problem you must constrain user-controlled paths before using them in filesystem operations. A common approach is to define a trusted “root” directory under which all operations must occur, resolve the user-provided value to an absolute/normalized path, and then verify that this resolved path is contained within the root (for example by checking that resolved_path.is_relative_to(root) in modern Python, or by comparing string prefixes after resolve() / normpath()).
For this codebase, the minimal change that addresses CodeQL’s concern without altering external behavior too much is to validate the path argument inside SourceAnalyzer.analyze_local_folder before it is converted to a Path and passed to analyze_sources. We can:
- Choose a safe root directory. A reasonable default in this project—without changing configuration mechanisms—is the current working directory (or potentially the directory where the process is running). This prevents clients from requesting analysis of arbitrary absolute directories elsewhere on the system.
- Convert the incoming `path` argument and the root to `Path` objects, call `.resolve()` on both to normalize and remove `..` segments, and then ensure the user path is under the root. In Python 3.9+, `Path.is_relative_to(root)` is the cleanest way; otherwise we fall back to checking that `str(resolved_path).startswith(str(root) + os.sep)` or similar.
- If the check fails, log and raise an exception or otherwise stop the analysis; this prevents traversal outside the root.
Concretely, in api/analyzers/source_analyzer.py we will:
- Import `os` since we need `os.getcwd()` and, for compatibility, possibly `os.path.commonpath` if we avoid `is_relative_to`.
- Add normalization and containment checks in `analyze_local_folder` before calling `self.analyze_sources(Path(path), ...)`.
- Keep the external API (`analyze_local_folder(path: str, ...)`) and its call sites unchanged, so the rest of the application logic does not need modification.
No changes are required in tests/index.py or api/index.py for this particular issue; they will continue to pass the untrusted path string to analyze_local_folder, which now enforces safe usage.
@@ -1,6 +1,7 @@
from contextlib import nullcontext
from pathlib import Path
from typing import Optional
import os

from api.entities.entity import Entity
from api.entities.file import File
@@ -191,8 +192,27 @@

        logging.info(f"Analyzing local folder {path}")

        # Normalize and validate the path against a safe root (current working directory)
        root_path = Path(os.getcwd()).resolve()
        try:
            target_path = Path(path).resolve()
        except OSError as e:
            logging.error(f"Failed to resolve path '{path}': {e}")
            raise

        # Ensure the requested path is within the allowed root directory
        try:
            is_subpath = target_path.is_relative_to(root_path)  # Python 3.9+
        except AttributeError:
            # Fallback for Python versions without Path.is_relative_to
            is_subpath = os.path.commonpath([str(root_path), str(target_path)]) == str(root_path)

        if not is_subpath:
            logging.error(f"Path '{target_path}' is outside of the allowed root '{root_path}'")
            raise ValueError("Invalid path: outside of allowed root directory")

        # Analyze source files
        self.analyze_sources(Path(path), ignore, g)
        self.analyze_sources(target_path, ignore, g)

        logging.info("Done analyzing path")
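The `is_relative_to` compatibility shim in the patch above can be exercised on its own. A minimal sketch of the same containment check (the function name is hypothetical):

```python
import os
from pathlib import Path

def is_subpath(target: Path, root: Path) -> bool:
    """Containment check with a fallback for Python < 3.9."""
    try:
        return target.is_relative_to(root)  # Python 3.9+
    except AttributeError:
        # Older Pythons: compare the common path prefix instead.
        return os.path.commonpath([str(root), str(target)]) == str(root)
```

Note that `commonpath` compares whole path components, so `/srv/repos-evil` is correctly rejected against root `/srv/repos`, which a naive `startswith` check would not do.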
Actionable comments posted: 4
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
api/analyzers/analyzer.py (1)
136-147: ⚠️ Potential issue | 🟠 Major: Don't narrow `resolve_symbol()` until the caller model changes too.
`api/entities/entity.py` still iterates over the resolver result, and the concrete analyzers still build `list[Entity]` via `resolve_type()`/`resolve_method()`. With this abstract signature set to `Entity`, the contract is internally inconsistent and a literal implementation will raise as soon as the caller tries to iterate it.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@api/analyzers/analyzer.py` around lines 136 - 147, The abstract method resolve_symbol currently returns Entity but callers (api/entities/entity.py) iterate over its result and concrete analyzers implement resolve_type/resolve_method to return list[Entity], so change the resolve_symbol signature to return an iterable of entities (e.g., Iterable[Entity] or list[Entity]) to match callers; update the abstract method in analyzer.py (resolve_symbol) and any related abstract signatures (resolve_type/resolve_method) to the chosen iterable type so implementations and callers stay consistent until the caller model is intentionally changed.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@api/analyzers/kotlin/analyzer.py`:
- Around line 68-91: The code currently adds the same captured user_type nodes
as both "base_class" and "implement_interface" for class_declaration; instead,
take the first captured user_type as the superclass (base_class) and treat any
remaining captured user_type nodes as interfaces (implement_interface). In
practice, change the logic around the delegation_specifier/type_identifier
captures in the class_declaration branch (the queries that populate
superclass_captures and interface_captures) to collect a single ordered list of
captured user_type/type_identifier nodes, add the first element via
entity.add_symbol("base_class", ...) and add any subsequent elements via
entity.add_symbol("implement_interface", ...); leave the object_declaration
branch emitting only implement_interface as-is.
- Around line 70-112: Several Query objects (superclass_query, interface_query,
query, param_query, return_type_query) are calling .captures() directly which
Tree-sitter 0.25.x doesn't support; replace each call like
superclass_query.captures(entity.node) with the analyzer helper
self._captures(<pattern>, entity.node) (use the original query pattern string
passed to self.language.query for the first argument) so all six query usages
use self._captures(pattern, node) consistently (update the two interface_query
usages, the superclass_query, the call-expression query, the parameter query,
and the return_type_query).
In `@api/analyzers/source_analyzer.py`:
- Around line 142-149: The current conditional always assigns
NullLanguageServer() to lsps[".kt"] and lsps[".kts"], so Kotlin symbol
resolution remains disabled; update the branch so that when
any(path.rglob('*.kt')) or any(path.rglob('*.kts')) is true you instantiate and
assign the actual Kotlin language server (e.g., KotlinLanguageServer or the
project's kotlin-language-server wrapper) to lsps[".kt"] and lsps[".kts"]
instead of NullLanguageServer(), ensure those keys are started in the context
manager call (the with lsps[".java"].start_server(), ... block), and keep
NullLanguageServer() only in the else branch so AbstractAnalyzer.resolve() and
request_definition() can resolve Kotlin lookups correctly.
- Around line 156-169: The loop is using raw tree-sitter Node IDs (symbol.id)
when calling graph.connect_entities, but connect_entities expects the graph
database entity IDs produced by graph.add_entity; this miswires relations.
Replace uses of symbol.id with the resolved graph entity ID for that symbol
(lookup the entity ID created earlier when the symbol node was added—e.g., use
whatever mapping or helper you maintain from AST Node to graph entity ID, such
as a node_to_entity_id/map or a graph.get_entity_id_for_node(node) function) so
that
graph.connect_entities("EXTENDS"/"IMPLEMENTS"/"CALLS"/"RETURNS"/"PARAMETERS",
entity.id, resolved_symbol_entity_id) is called instead of passing symbol.id.
Ensure you handle cases where a symbol node (like parameter or return-type) may
not have been added as an entity by checking the lookup and skipping or creating
the entity before connecting.
---
Outside diff comments:
In `@api/analyzers/analyzer.py`:
- Around line 136-147: The abstract method resolve_symbol currently returns
Entity but callers (api/entities/entity.py) iterate over its result and concrete
analyzers implement resolve_type/resolve_method to return list[Entity], so
change the resolve_symbol signature to return an iterable of entities (e.g.,
Iterable[Entity] or list[Entity]) to match callers; update the abstract method
in analyzer.py (resolve_symbol) and any related abstract signatures
(resolve_type/resolve_method) to the chosen iterable type so implementations and
callers stay consistent until the caller model is intentionally changed.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 9dd188fc-f654-43b9-aafe-5fbc4b4919ae
📒 Files selected for processing (8)
- api/analyzers/analyzer.py
- api/analyzers/java/analyzer.py
- api/analyzers/kotlin/__init__.py
- api/analyzers/kotlin/analyzer.py
- api/analyzers/python/analyzer.py
- api/analyzers/source_analyzer.py
- api/entities/entity.py
- pyproject.toml
```python
if entity.node.type == 'class_declaration':
    # Find superclass (extends)
    superclass_query = self.language.query("(delegation_specifier (user_type (type_identifier) @superclass))")
    superclass_captures = superclass_query.captures(entity.node)
    if 'superclass' in superclass_captures:
        for superclass in superclass_captures['superclass']:
            entity.add_symbol("base_class", superclass)

    # Find interfaces (implements)
    # In Kotlin, both inheritance and interface implementation use the same syntax
    # We'll treat all as interfaces for now since Kotlin can only extend one class
    interface_query = self.language.query("(delegation_specifier (user_type (type_identifier) @interface))")
    interface_captures = interface_query.captures(entity.node)
    if 'interface' in interface_captures:
        for interface in interface_captures['interface']:
            entity.add_symbol("implement_interface", interface)

elif entity.node.type == 'object_declaration':
    # Objects can also have delegation specifiers
    interface_query = self.language.query("(delegation_specifier (user_type (type_identifier) @interface))")
    interface_captures = interface_query.captures(entity.node)
    if 'interface' in interface_captures:
        for interface in interface_captures['interface']:
            entity.add_symbol("implement_interface", interface)
```
Separate Kotlin superclass and interface symbols before storing them.
The class branch records the same captured user_type nodes as both base_class and implement_interface, while the object branch can only emit implement_interface. That guarantees incorrect inheritance metadata for Kotlin declarations.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@api/analyzers/kotlin/analyzer.py` around lines 68 - 91, The code currently
adds the same captured user_type nodes as both "base_class" and
"implement_interface" for class_declaration; instead, take the first captured
user_type as the superclass (base_class) and treat any remaining captured
user_type nodes as interfaces (implement_interface). In practice, change the
logic around the delegation_specifier/type_identifier captures in the
class_declaration branch (the queries that populate superclass_captures and
interface_captures) to collect a single ordered list of captured
user_type/type_identifier nodes, add the first element via
entity.add_symbol("base_class", ...) and add any subsequent elements via
entity.add_symbol("implement_interface", ...); leave the object_declaration
branch emitting only implement_interface as-is.
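The split the prompt describes can be sketched as a small, library-free helper (names are illustrative; the real analyzer would operate on captured tree-sitter nodes rather than strings):

```python
def split_delegation_specifiers(type_names):
    """Split ordered Kotlin delegation-specifier names into (base_class, interfaces).

    Heuristic sketch: Kotlin allows at most one superclass, so the first
    captured type is treated as the base class and the rest as interfaces.
    Without resolving the types we cannot tell whether the first specifier
    is actually a class or an interface, so interface-only declarations
    (e.g. `class Foo : Walkable, Swimmable`) can still be mislabeled.
    """
    if not type_names:
        return None, []
    return type_names[0], list(type_names[1:])
```

Keeping the capture order from a single query (rather than two identical queries) is what makes the first/rest split possible.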
```python
    superclass_query = self.language.query("(delegation_specifier (user_type (type_identifier) @superclass))")
    superclass_captures = superclass_query.captures(entity.node)
    if 'superclass' in superclass_captures:
        for superclass in superclass_captures['superclass']:
            entity.add_symbol("base_class", superclass)

    # Find interfaces (implements)
    # In Kotlin, both inheritance and interface implementation use the same syntax
    # We'll treat all as interfaces for now since Kotlin can only extend one class
    interface_query = self.language.query("(delegation_specifier (user_type (type_identifier) @interface))")
    interface_captures = interface_query.captures(entity.node)
    if 'interface' in interface_captures:
        for interface in interface_captures['interface']:
            entity.add_symbol("implement_interface", interface)

elif entity.node.type == 'object_declaration':
    # Objects can also have delegation specifiers
    interface_query = self.language.query("(delegation_specifier (user_type (type_identifier) @interface))")
    interface_captures = interface_query.captures(entity.node)
    if 'interface' in interface_captures:
        for interface in interface_captures['interface']:
            entity.add_symbol("implement_interface", interface)

elif entity.node.type == 'function_declaration':
    # Find function calls
    query = self.language.query("(call_expression) @reference.call")
    captures = query.captures(entity.node)
    if 'reference.call' in captures:
        for caller in captures['reference.call']:
            entity.add_symbol("call", caller)

    # Find parameters with types
    param_query = self.language.query("(parameter type: (user_type (type_identifier) @parameter))")
    param_captures = param_query.captures(entity.node)
    if 'parameter' in param_captures:
        for parameter in param_captures['parameter']:
            entity.add_symbol("parameters", parameter)

    # Find return type
    return_type_query = self.language.query("(function_declaration type: (user_type (type_identifier) @return_type))")
    return_type_captures = return_type_query.captures(entity.node)
    if 'return_type' in return_type_captures:
        for return_type in return_type_captures['return_type']:
```
🧩 Analysis chain
🌐 Web query:
In the Python `tree-sitter` package version 0.25.x, does `Query` expose a `captures(node)` method directly, or must captures be executed via `QueryCursor(query).captures(node)`? Please use the official tree-sitter Python API documentation.
💡 Result:
In tree-sitter (py-tree-sitter) 0.25.x, Query does not expose a captures(node) method directly—its documented methods are things like capture_name, disable_capture, etc., but no captures. [1]
To execute captures, you must use QueryCursor.captures(node), i.e. create a cursor for the query and run captures through it (conceptually: QueryCursor(query).captures(node)). [2]
Sources (official docs): [1], [2]
🏁 Script executed:
```shell
cat -n api/analyzers/kotlin/analyzer.py | sed -n '70,112p'
```

Repository: FalkorDB/code-graph
Length of output: 3029
🏁 Script executed:
```shell
# Check if similar issues exist in other analyzer files
rg "\.captures\(" api/analyzers/ -n
```

Repository: FalkorDB/code-graph
Length of output: 1247
🏁 Script executed:
```shell
# Check how other analyzers use the query API
rg "_captures\(" api/analyzers/ -A 2 -B 2 | head -50
```

Repository: FalkorDB/code-graph
Length of output: 4566
Use self._captures() instead of calling .captures() directly on query objects.
The Query object in tree-sitter 0.25.x does not expose a .captures() method. All six query calls at lines 71, 80, 88, 96, 103, and 110 must use self._captures(pattern, node) instead, which properly wraps the QueryCursor. Every other analyzer in the codebase follows this pattern; the Kotlin analyzer is the only one using the incorrect direct call syntax.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@api/analyzers/kotlin/analyzer.py` around lines 70 - 112, Several Query
objects (superclass_query, interface_query, query, param_query,
return_type_query) are calling .captures() directly which Tree-sitter 0.25.x
doesn't support; replace each call like superclass_query.captures(entity.node)
with the analyzer helper self._captures(<pattern>, entity.node) (use the
original query pattern string passed to self.language.query for the first
argument) so all six query usages use self._captures(pattern, node) consistently
(update the two interface_query usages, the superclass_query, the
call-expression query, the parameter query, and the return_type_query).
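For reference, a version-tolerant wrapper around the two query APIs can look like the sketch below. The `query_cursor_cls` parameter is an illustrative injection point for testing, not part of the project's `_captures` helper:

```python
def run_captures(query, node, query_cursor_cls=None):
    """Run a tree-sitter query against a node across py-tree-sitter versions.

    Older releases exposed Query.captures(node) directly; in 0.25.x captures
    must go through QueryCursor(query).captures(node). When query_cursor_cls
    is not supplied, it is resolved lazily from the tree_sitter package so
    this function stays importable without the library installed.
    """
    if hasattr(query, "captures"):
        # Pre-0.25 API: the query object runs captures itself.
        return query.captures(node)
    if query_cursor_cls is None:
        from tree_sitter import QueryCursor  # 0.25.x API
        query_cursor_cls = QueryCursor
    return query_cursor_cls(query).captures(node)
```

In the analyzer itself the cleaner fix is the one the prompt names: route every pattern through `self._captures(pattern, node)` so the version handling lives in one place.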
```diff
 if any(path.rglob('*.kt')) or any(path.rglob('*.kts')):
     # For now, use NullLanguageServer for Kotlin as we need to set up kotlin-language-server
     lsps[".kt"] = NullLanguageServer()
     lsps[".kts"] = NullLanguageServer()
 else:
-    lsps[".cs"] = NullLanguageServer()
-with lsps[".java"].start_server(), lsps[".py"].start_server(), lsps[".cs"].start_server():
+    lsps[".kt"] = NullLanguageServer()
+    lsps[".kts"] = NullLanguageServer()
+with lsps[".java"].start_server(), lsps[".py"].start_server(), lsps[".kt"].start_server(), lsps[".kts"].start_server():
```
Kotlin relation resolution is still disabled here.
Both branches assign NullLanguageServer() for .kt and .kts. Since AbstractAnalyzer.resolve() depends on request_definition(), once the edge wiring below is corrected every Kotlin lookup will still resolve to [].
🧰 Tools
🪛 GitHub Check: CodeQL
[failure] 142-142: Uncontrolled data used in path expression
This path depends on a user-provided value.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@api/analyzers/source_analyzer.py` around lines 142 - 149, The current
conditional always assigns NullLanguageServer() to lsps[".kt"] and lsps[".kts"],
so Kotlin symbol resolution remains disabled; update the branch so that when
any(path.rglob('*.kt')) or any(path.rglob('*.kts')) is true you instantiate and
assign the actual Kotlin language server (e.g., KotlinLanguageServer or the
project's kotlin-language-server wrapper) to lsps[".kt"] and lsps[".kts"]
instead of NullLanguageServer(), ensure those keys are started in the context
manager call (the with lsps[".java"].start_server(), ... block), and keep
NullLanguageServer() only in the else branch so AbstractAnalyzer.resolve() and
request_definition() can resolve Kotlin lookups correctly.
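One way to wire this defensively is a guarded factory that attempts the real server and falls back to the no-op one, so a missing kotlin-language-server install does not break analysis. This is a hedged sketch with injected callables (whether multilspy accepts a Kotlin configuration here is an assumption; `make_language_server` is not an existing project function):

```python
def make_language_server(create_server, fallback_factory, log=print):
    """Build a language server, degrading to a no-op server on failure.

    create_server: zero-arg callable constructing the real Kotlin server
    (e.g. a multilspy SyncLanguageServer, if that backend supports Kotlin).
    fallback_factory: zero-arg callable building the NullLanguageServer-style
    stand-in currently used for .kt/.kts files.
    """
    try:
        return create_server()
    except Exception as exc:
        # Degrade gracefully: symbol resolution stays disabled but parsing works.
        log(f"falling back to null language server: {exc}")
        return fallback_factory()
```

The `else` branch would then keep `NullLanguageServer()` only for repositories with no Kotlin sources at all.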
```diff
 for key, symbols in entity.symbols.items():
     for symbol in symbols:
-        if len(symbol.resolved_symbol) == 0:
-            continue
-        resolved_symbol = next(iter(symbol.resolved_symbol))
         if key == "base_class":
-            graph.connect_entities("EXTENDS", entity.id, resolved_symbol.id)
+            graph.connect_entities("EXTENDS", entity.id, symbol.id)
         elif key == "implement_interface":
-            graph.connect_entities("IMPLEMENTS", entity.id, resolved_symbol.id)
+            graph.connect_entities("IMPLEMENTS", entity.id, symbol.id)
         elif key == "extend_interface":
-            graph.connect_entities("EXTENDS", entity.id, resolved_symbol.id)
+            graph.connect_entities("EXTENDS", entity.id, symbol.id)
         elif key == "call":
-            graph.connect_entities("CALLS", entity.id, resolved_symbol.id, {"line": symbol.symbol.start_point.row, "text": symbol.symbol.text.decode("utf-8")})
+            graph.connect_entities("CALLS", entity.id, symbol.id)
         elif key == "return_type":
-            graph.connect_entities("RETURNS", entity.id, resolved_symbol.id)
+            graph.connect_entities("RETURNS", entity.id, symbol.id)
         elif key == "parameters":
-            graph.connect_entities("PARAMETERS", entity.id, resolved_symbol.id)
+            graph.connect_entities("PARAMETERS", entity.id, symbol.id)
```
Use resolved entity IDs here, not Node.id.
entity.symbols now holds raw tree-sitter Node objects. symbol.id is a parser node identifier, but Graph.connect_entities() expects the database ID returned by graph.add_entity(), so these relations now miss or miswire edges across every language. This is especially broken for parameter and return-type nodes, which are never graph entities on their own.
Suggested fix
```diff
-        for key, symbols in entity.symbols.items():
-            for symbol in symbols:
+        for key, resolved_entities in entity.resolved_symbols.items():
+            for resolved_entity in resolved_entities:
                 if key == "base_class":
-                    graph.connect_entities("EXTENDS", entity.id, symbol.id)
+                    graph.connect_entities("EXTENDS", entity.id, resolved_entity.id)
                 elif key == "implement_interface":
-                    graph.connect_entities("IMPLEMENTS", entity.id, symbol.id)
+                    graph.connect_entities("IMPLEMENTS", entity.id, resolved_entity.id)
                 elif key == "extend_interface":
-                    graph.connect_entities("EXTENDS", entity.id, symbol.id)
+                    graph.connect_entities("EXTENDS", entity.id, resolved_entity.id)
                 elif key == "call":
-                    graph.connect_entities("CALLS", entity.id, symbol.id)
+                    graph.connect_entities("CALLS", entity.id, resolved_entity.id)
                 elif key == "return_type":
-                    graph.connect_entities("RETURNS", entity.id, symbol.id)
+                    graph.connect_entities("RETURNS", entity.id, resolved_entity.id)
                 elif key == "parameters":
-                    graph.connect_entities("PARAMETERS", entity.id, symbol.id)
+                    graph.connect_entities("PARAMETERS", entity.id, resolved_entity.id)
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@api/analyzers/source_analyzer.py` around lines 156 - 169, The loop is using
raw tree-sitter Node IDs (symbol.id) when calling graph.connect_entities, but
connect_entities expects the graph database entity IDs produced by
graph.add_entity; this miswires relations. Replace uses of symbol.id with the
resolved graph entity ID for that symbol (lookup the entity ID created earlier
when the symbol node was added—e.g., use whatever mapping or helper you maintain
from AST Node to graph entity ID, such as a node_to_entity_id/map or a
graph.get_entity_id_for_node(node) function) so that
graph.connect_entities("EXTENDS"/"IMPLEMENTS"/"CALLS"/"RETURNS"/"PARAMETERS",
entity.id, resolved_symbol_entity_id) is called instead of passing symbol.id.
Ensure you handle cases where a symbol node (like parameter or return-type) may
not have been added as an entity by checking the lookup and skipping or creating
the entity before connecting.
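The node-to-entity mapping the prompt asks for can be sketched as below. All names here are hypothetical illustrations, not the project's actual API; the point is only that edges must be wired with the database ids returned by `graph.add_entity`, never with the parser's `Node.id`:

```python
class NodeEntityIndex:
    """Hypothetical mapping from tree-sitter node ids to graph entity ids."""

    def __init__(self):
        self._entity_id_by_node = {}

    def record(self, node_id, entity_id):
        # Called right after graph.add_entity returns a database id.
        self._entity_id_by_node[node_id] = entity_id

    def resolve(self, node_id):
        # Returns None when the node was never promoted to a graph entity
        # (e.g. bare parameter or return-type nodes), letting callers skip
        # the connection or create the entity first.
        return self._entity_id_by_node.get(node_id)
```

With such an index, the relation loop would call `graph.connect_entities("EXTENDS", entity.id, index.resolve(symbol.id))` only when the lookup succeeds.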
Migrated from falkordb/code-graph-backend#95
Summary
Implements Kotlin code analysis for graph representation alongside existing Python, Java, and C# support.
Changes:
- `KotlinAnalyzer` implementing `AbstractAnalyzer`
- `tree-sitter-kotlin` dependency
- `.kt` and `.kts` extensions

Resolves #531
Originally authored by @Copilot in falkordb/code-graph-backend#95
Summary by CodeRabbit
New Features
Changes
Refactor