Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion concore_cli/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,19 +72,26 @@ concore run workflow.graphml --source ./src --output ./build --auto-build

Validates a GraphML workflow file before running.

**Options:**
- `-s, --source <dir>` - Source directory to verify file references exist

Checks:
- Valid XML structure
- GraphML format compliance
- Node and edge definitions
- File references and naming conventions
- ZMQ vs file-based communication
- Source file existence (when --source provided)
- ZMQ port conflicts and reserved ports
- Circular dependencies (warns for control loops)
- Edge connectivity

**Options:**
- `-s, --source <dir>` - Source directory (default: src)

**Example:**
```bash
concore validate workflow.graphml
concore validate workflow.graphml --source ./src
```

### `concore status`
Expand Down
83 changes: 79 additions & 4 deletions concore_cli/commands/validate.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ def validate_workflow(workflow_file, source_dir, console):
def finalize():
show_results(console, errors, warnings, info)
return len(errors) == 0

try:
with open(workflow_path, 'r') as f:
content = f.read()
Expand Down Expand Up @@ -69,7 +69,7 @@ def finalize():
warnings.append("No edges found in workflow")
else:
info.append(f"Found {len(edges)} edge(s)")

if not source_root.exists():
warnings.append(f"Source directory not found: {source_root}")

Expand All @@ -96,7 +96,7 @@ def finalize():
if re.search(r'[;&|`$\'"()\\]', label):
errors.append(f"Node '{label}' contains unsafe shell characters")
continue

if ':' not in label:
warnings.append(f"Node '{label}' missing format 'ID:filename'")
else:
Expand Down Expand Up @@ -155,15 +155,90 @@ def finalize():
if file_edges > 0:
info.append(f"File-based edges: {file_edges}")

_check_cycles(soup, errors, warnings)
_check_zmq_ports(soup, errors, warnings)

return finalize()

except FileNotFoundError:
console.print(f"[red]Error:[/red] File not found: {workflow_path}")
return False
except Exception as e:
console.print(f"[red]Validation failed:[/red] {str(e)}")
return False

def _check_cycles(soup, errors, warnings):
nodes = soup.find_all('node')
edges = soup.find_all('edge')

node_ids = [node.get('id') for node in nodes if node.get('id')]
if not node_ids:
return

graph = {nid: [] for nid in node_ids}
for edge in edges:
source = edge.get('source')
target = edge.get('target')
if source and target and source in graph:
graph[source].append(target)

def has_cycle_from(start, visited, rec_stack):
visited.add(start)
rec_stack.add(start)

for neighbor in graph.get(start, []):
if neighbor not in visited:
if has_cycle_from(neighbor, visited, rec_stack):
return True
elif neighbor in rec_stack:
return True

rec_stack.remove(start)
return False

visited = set()
for node_id in node_ids:
if node_id not in visited:
if has_cycle_from(node_id, visited, set()):
warnings.append("Workflow contains cycles (expected for control loops)")
return

def _check_zmq_ports(soup, errors, warnings):
edges = soup.find_all('edge')
port_pattern = re.compile(r"0x([a-fA-F0-9]+)_(\S+)")

ports_used = {}

for edge in edges:
label_tag = edge.find('y:EdgeLabel') or edge.find('EdgeLabel')
if not label_tag or not label_tag.text:
continue

match = port_pattern.match(label_tag.text.strip())
if not match:
continue

port_hex = match.group(1)
port_name = match.group(2)
port_num = int(port_hex, 16)

if port_num < 1:
errors.append(f"Invalid port number: {port_num} (0x{port_hex}) must be at least 1")
continue
elif port_num > 65535:
errors.append(f"Invalid port number: {port_num} (0x{port_hex}) exceeds maximum (65535)")
continue

if port_num in ports_used:
existing_name = ports_used[port_num]
if existing_name != port_name:
errors.append(f"Port conflict: 0x{port_hex} used for both '{existing_name}' and '{port_name}'")
else:
ports_used[port_num] = port_name

if port_num < 1024:
warnings.append(f"Port {port_num} (0x{port_hex}) is in reserved range (< 1024)")

def show_results(console, errors, warnings, info):
if errors:
console.print("[red]✗ Validation failed[/red]\n")
Expand Down
159 changes: 159 additions & 0 deletions tests/test_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,165 @@ def test_validate_valid_graph(self):

self.assertIn('Validation passed', result.output)
self.assertIn('Workflow is valid', result.output)

def test_validate_missing_source_file(self):
content = '''
<graphml xmlns:y="http://www.yworks.com/xml/graphml">
<graph id="G" edgedefault="directed">
<node id="n0">
<data key="d0"><y:NodeLabel>n0:missing.py</y:NodeLabel></data>
</node>
</graph>
</graphml>
'''
filepath = self.create_graph_file('workflow.graphml', content)
source_dir = Path(self.temp_dir) / 'src'
source_dir.mkdir()

result = self.runner.invoke(cli, ['validate', filepath, '--source', str(source_dir)])

self.assertIn('Validation failed', result.output)
self.assertIn('Missing source file', result.output)

def test_validate_with_existing_source_file(self):
content = '''
<graphml xmlns:y="http://www.yworks.com/xml/graphml">
<graph id="G" edgedefault="directed">
<node id="n0">
<data key="d0"><y:NodeLabel>n0:exists.py</y:NodeLabel></data>
</node>
</graph>
</graphml>
'''
filepath = self.create_graph_file('workflow.graphml', content)
source_dir = Path(self.temp_dir) / 'src'
source_dir.mkdir()
(source_dir / 'exists.py').write_text('print("hello")')

result = self.runner.invoke(cli, ['validate', filepath, '--source', str(source_dir)])

self.assertIn('Validation passed', result.output)

def test_validate_zmq_port_conflict(self):
content = '''
<graphml xmlns:y="http://www.yworks.com/xml/graphml">
<graph id="G" edgedefault="directed">
<node id="n0">
<data key="d0"><y:NodeLabel>n0:script1.py</y:NodeLabel></data>
</node>
<node id="n1">
<data key="d0"><y:NodeLabel>n1:script2.py</y:NodeLabel></data>
</node>
<edge source="n0" target="n1">
<data key="d1"><y:EdgeLabel>0x1234_portA</y:EdgeLabel></data>
</edge>
<edge source="n1" target="n0">
<data key="d1"><y:EdgeLabel>0x1234_portB</y:EdgeLabel></data>
</edge>
</graph>
</graphml>
'''
filepath = self.create_graph_file('conflict.graphml', content)

result = self.runner.invoke(cli, ['validate', filepath])

self.assertIn('Validation failed', result.output)
self.assertIn('Port conflict', result.output)

def test_validate_reserved_port(self):
content = '''
<graphml xmlns:y="http://www.yworks.com/xml/graphml">
<graph id="G" edgedefault="directed">
<node id="n0">
<data key="d0"><y:NodeLabel>n0:script1.py</y:NodeLabel></data>
</node>
<node id="n1">
<data key="d0"><y:NodeLabel>n1:script2.py</y:NodeLabel></data>
</node>
<edge source="n0" target="n1">
<data key="d1"><y:EdgeLabel>0x50_data</y:EdgeLabel></data>
</edge>
</graph>
</graphml>
'''
filepath = self.create_graph_file('reserved.graphml', content)

result = self.runner.invoke(cli, ['validate', filepath])

self.assertIn('Port 80', result.output)
self.assertIn('reserved range', result.output)

Comment on lines +187 to +235
Copy link

Copilot AI Feb 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Port validation tests cover reserved-port warnings and same-port conflicts, but there’s no test for the stated port-range requirement (1–65535). After adding the <1 check (and optionally asserting the existing >65535 behavior), add tests for port 0 (should fail) and a port > 65535 (should fail) to lock in the intended behavior.

Copilot uses AI. Check for mistakes.
def test_validate_cycle_detection(self):
content = '''
<graphml xmlns:y="http://www.yworks.com/xml/graphml">
<graph id="G" edgedefault="directed">
<node id="n0">
<data key="d0"><y:NodeLabel>n0:controller.py</y:NodeLabel></data>
</node>
<node id="n1">
<data key="d0"><y:NodeLabel>n1:plant.py</y:NodeLabel></data>
</node>
<edge source="n0" target="n1">
<data key="d1"><y:EdgeLabel>control_signal</y:EdgeLabel></data>
</edge>
<edge source="n1" target="n0">
<data key="d1"><y:EdgeLabel>sensor_data</y:EdgeLabel></data>
</edge>
</graph>
</graphml>
'''
filepath = self.create_graph_file('cycle.graphml', content)

result = self.runner.invoke(cli, ['validate', filepath])

self.assertIn('cycles', result.output)
self.assertIn('control loops', result.output)

def test_validate_port_zero(self):
content = '''
<graphml xmlns:y="http://www.yworks.com/xml/graphml">
<graph id="G" edgedefault="directed">
<node id="n0">
<data key="d0"><y:NodeLabel>n0:script1.py</y:NodeLabel></data>
</node>
<node id="n1">
<data key="d0"><y:NodeLabel>n1:script2.py</y:NodeLabel></data>
</node>
<edge source="n0" target="n1">
<data key="d1"><y:EdgeLabel>0x0_invalid</y:EdgeLabel></data>
</edge>
</graph>
</graphml>
'''
filepath = self.create_graph_file('port_zero.graphml', content)

result = self.runner.invoke(cli, ['validate', filepath])

self.assertIn('Validation failed', result.output)
self.assertIn('must be at least 1', result.output)

def test_validate_port_exceeds_maximum(self):
content = '''
<graphml xmlns:y="http://www.yworks.com/xml/graphml">
<graph id="G" edgedefault="directed">
<node id="n0">
<data key="d0"><y:NodeLabel>n0:script1.py</y:NodeLabel></data>
</node>
<node id="n1">
<data key="d0"><y:NodeLabel>n1:script2.py</y:NodeLabel></data>
</node>
<edge source="n0" target="n1">
<data key="d1"><y:EdgeLabel>0x10000_toobig</y:EdgeLabel></data>
</edge>
</graph>
</graphml>
'''
filepath = self.create_graph_file('port_max.graphml', content)

result = self.runner.invoke(cli, ['validate', filepath])

self.assertIn('Validation failed', result.output)
self.assertIn('exceeds maximum (65535)', result.output)

if __name__ == '__main__':
unittest.main()