Skip to content
4 changes: 2 additions & 2 deletions components/polylith/check/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
from polylith.check import collect, grouping, report
from polylith.check import collect, report

__all__ = ["collect", "grouping", "report"]
__all__ = ["collect", "report"]
4 changes: 2 additions & 2 deletions components/polylith/check/collect.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
from pathlib import Path
from typing import Set

from polylith import check, imports, workspace
from polylith import imports, workspace


def extract_bricks(paths: Set[Path], ns: str) -> dict:
all_imports = imports.fetch_all_imports(paths)

return check.grouping.extract_brick_imports(all_imports, ns)
return imports.extract_brick_imports(all_imports, ns)


def with_unknown_components(root: Path, ns: str, brick_imports: dict) -> dict:
Expand Down
6 changes: 3 additions & 3 deletions components/polylith/check/report.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from typing import Set

from polylith import imports, libs, workspace
from polylith.check import collect, grouping
from polylith.check import collect
from polylith.reporting import theme
from rich.console import Console

Expand Down Expand Up @@ -78,8 +78,8 @@ def extract_collected_imports(
ns: str, imports_in_bases: dict, imports_in_components: dict
) -> dict:
brick_imports = {
"bases": grouping.extract_brick_imports(imports_in_bases, ns),
"components": grouping.extract_brick_imports(imports_in_components, ns),
"bases": imports.grouping.extract_brick_imports(imports_in_bases, ns),
"components": imports.grouping.extract_brick_imports(imports_in_components, ns),
}

third_party_imports = {
Expand Down
10 changes: 10 additions & 0 deletions components/polylith/imports/__init__.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,23 @@
from polylith.imports.grouping import (
extract_brick_imports,
extract_brick_imports_with_namespaces,
)
from polylith.imports.parser import (
extract_top_ns,
fetch_all_imports,
fetch_api,
fetch_brick_import_usages,
fetch_excluded_imports,
list_imports,
)

__all__ = [
"extract_brick_imports",
"extract_brick_imports_with_namespaces",
"extract_top_ns",
"fetch_all_imports",
"fetch_api",
"fetch_brick_import_usages",
"fetch_excluded_imports",
"list_imports",
]
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,9 @@ def extract_brick_imports(all_imports: dict, top_ns) -> dict:
with_only_brick_names = only_brick_names(with_only_bricks)

return exclude_empty(with_only_brick_names)


def extract_brick_imports_with_namespaces(all_imports: dict, top_ns) -> dict:
with_only_bricks = only_bricks(all_imports, top_ns)

return exclude_empty(with_only_bricks)
73 changes: 72 additions & 1 deletion components/polylith/imports/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from collections.abc import Iterable
from functools import lru_cache
from pathlib import Path
from typing import List, Set, Union
from typing import FrozenSet, List, Set, Union

typing_ns = "typing"
type_checking = "TYPE_CHECKING"
Expand Down Expand Up @@ -68,6 +68,48 @@ def parse_node(node: ast.AST) -> Union[dict, None]:
return None


def find_imported(node_id: str, imported: FrozenSet[str]) -> Union[str, None]:
return next((i for i in imported if str.endswith(i, f".{node_id}")), None)


def extract_api_part(path: str) -> str:
*_parts, api = str.split(path, ".")

return api


def find_matching_node(expr: ast.expr, imported: FrozenSet[str]) -> Union[str, None]:
api = {extract_api_part(i) for i in imported}

if isinstance(expr, ast.Name) and expr.id in api:
return find_imported(expr.id, imported)

return None


def parse_import_usage(node: ast.AST, imported: FrozenSet[str]) -> Union[str, None]:
found = None
child = None

wrapper_nodes = (ast.Await, ast.Expr, ast.NamedExpr, ast.Starred, ast.Subscript)

if isinstance(node, ast.Attribute):
found = find_matching_node(node.value, imported)
child = node.value
elif isinstance(node, wrapper_nodes):
child = node.value
elif isinstance(node, ast.Call):
found = find_matching_node(node.func, imported)
child = node.func
elif isinstance(node, ast.UnaryOp):
child = node.operand

if found:
return found

return parse_import_usage(child, imported) if child is not None else None


def parse_module(path: Path) -> ast.AST:
with open(path.as_posix(), "r", encoding="utf-8", errors="ignore") as f:
tree = ast.parse(f.read(), path.name)
Expand Down Expand Up @@ -113,6 +155,35 @@ def fetch_all_imports(paths: Set[Path]) -> dict:
return {k: v for row in rows for k, v in row.items()}


def fetch_import_usages_in_module(path: Path, imported: FrozenSet[str]) -> Set[str]:
tree = parse_module(path)

nodes = (parse_import_usage(n, imported) for n in ast.walk(tree))

return {n for n in nodes if n is not None}


@lru_cache(maxsize=None)
def fetch_brick_import_usages(path: Path, imported: FrozenSet[str]) -> Set[str]:
py_modules = find_files(path)

res = (fetch_import_usages_in_module(p, imported) for p in py_modules)

return {i for n in res if n for i in n}


def extract_api(paths: Set[str]) -> Set[str]:
return {extract_api_part(p) for p in paths}


def fetch_api(paths: Set[Path]) -> dict:
interfaces = [Path(p / "__init__.py") for p in paths]

rows = [{i.parent.name: extract_api(list_imports(i))} for i in interfaces]

return {k: v for row in rows for k, v in row.items()}


def should_exclude(path: Path, excludes: Set[str]):
return any(path.match(pattern) for pattern in excludes)

Expand Down
3 changes: 2 additions & 1 deletion components/polylith/interface/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from polylith.interface import report
from polylith.interface.interfaces import create_interface

__all__ = ["create_interface"]
__all__ = ["create_interface", "report"]
56 changes: 56 additions & 0 deletions components/polylith/interface/report.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
from pathlib import Path

from polylith import imports, workspace
from polylith.reporting import theme
from rich import box
from rich.console import Console
from rich.table import Table


def get_brick_data(root: Path, ns: str, brick: str, brick_type: str) -> dict:
paths = {brick}

if brick_type == "base":
brick_path = workspace.paths.collect_bases_paths(root, ns, paths)
else:
brick_path = workspace.paths.collect_components_paths(root, ns, paths)

brick_api = imports.fetch_api(brick_path)
exposes = brick_api.get(brick) or set()

return {
"name": brick,
"type": brick_type,
"exposes": sorted(exposes),
}


def get_brick_imports(root: Path, ns: str, bases: set, components: set) -> dict:
bases_paths = workspace.paths.collect_bases_paths(root, ns, bases)
components_paths = workspace.paths.collect_components_paths(root, ns, components)

in_bases = imports.fetch_all_imports(bases_paths)
in_components = imports.fetch_all_imports(components_paths)

return {
"bases": imports.extract_brick_imports_with_namespaces(in_bases, ns),
"components": imports.extract_brick_imports_with_namespaces(in_components, ns),
}


def print_brick_interface(root: Path, ns: str, brick: str, bricks: dict) -> None:
bases = bricks["bases"]
tag = "base" if brick in bases else "comp"

brick_data = get_brick_data(root, ns, brick, tag)
exposes = brick_data["exposes"]

console = Console(theme=theme.poly_theme)

table = Table(box=box.SIMPLE_HEAD)
table.add_column(f"[{tag}]{brick}[/] [data]brick interface[/]")

for e in exposes:
table.add_row(f"[data]{e}[/]")

console.print(table, overflow="ellipsis")
4 changes: 2 additions & 2 deletions components/polylith/test/core.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from pathlib import Path
from typing import List, Union

from polylith import check, diff, imports
from polylith import diff, imports


def is_test(root: Path, ns: str, path: Path, theme: str) -> bool:
Expand Down Expand Up @@ -34,4 +34,4 @@ def get_brick_imports_in_tests(

all_imports = {k: v for k, v in enumerate(listed_imports)}

return check.grouping.extract_brick_imports(all_imports, ns)
return imports.extract_brick_imports(all_imports, ns)