diff --git a/concore_cli/README.md b/concore_cli/README.md
index b0da505..55546c0 100644
--- a/concore_cli/README.md
+++ b/concore_cli/README.md
@@ -72,12 +72,18 @@ concore run workflow.graphml --source ./src --output ./build --auto-build
Validates a GraphML workflow file before running.
+**Options:**
+- `-s, --source
` - Source directory to verify file references exist
+
Checks:
- Valid XML structure
- GraphML format compliance
- Node and edge definitions
- File references and naming conventions
-- ZMQ vs file-based communication
+- Source file existence (when --source provided)
+- ZMQ port conflicts and reserved ports
+- Circular dependencies (warns for control loops)
+- Edge connectivity
**Options:**
- `-s, --source ` - Source directory (default: src)
@@ -85,6 +91,7 @@ Checks:
**Example:**
```bash
concore validate workflow.graphml
+concore validate workflow.graphml --source ./src
```
### `concore status`
diff --git a/concore_cli/commands/validate.py b/concore_cli/commands/validate.py
index 6f51ce6..7cacab9 100644
--- a/concore_cli/commands/validate.py
+++ b/concore_cli/commands/validate.py
@@ -19,7 +19,7 @@ def validate_workflow(workflow_file, source_dir, console):
def finalize():
show_results(console, errors, warnings, info)
return len(errors) == 0
-
+
try:
with open(workflow_path, 'r') as f:
content = f.read()
@@ -69,7 +69,7 @@ def finalize():
warnings.append("No edges found in workflow")
else:
info.append(f"Found {len(edges)} edge(s)")
-
+
if not source_root.exists():
warnings.append(f"Source directory not found: {source_root}")
@@ -96,7 +96,7 @@ def finalize():
if re.search(r'[;&|`$\'"()\\]', label):
errors.append(f"Node '{label}' contains unsafe shell characters")
continue
-
+
if ':' not in label:
warnings.append(f"Node '{label}' missing format 'ID:filename'")
else:
@@ -155,8 +155,11 @@ def finalize():
if file_edges > 0:
info.append(f"File-based edges: {file_edges}")
+ _check_cycles(soup, errors, warnings)
+ _check_zmq_ports(soup, errors, warnings)
+
return finalize()
-
+
except FileNotFoundError:
console.print(f"[red]Error:[/red] File not found: {workflow_path}")
return False
@@ -164,6 +167,78 @@ def finalize():
console.print(f"[red]Validation failed:[/red] {str(e)}")
return False
+def _check_cycles(soup, errors, warnings):
+ nodes = soup.find_all('node')
+ edges = soup.find_all('edge')
+
+ node_ids = [node.get('id') for node in nodes if node.get('id')]
+ if not node_ids:
+ return
+
+ graph = {nid: [] for nid in node_ids}
+ for edge in edges:
+ source = edge.get('source')
+ target = edge.get('target')
+ if source and target and source in graph:
+ graph[source].append(target)
+
+ def has_cycle_from(start, visited, rec_stack):
+ visited.add(start)
+ rec_stack.add(start)
+
+ for neighbor in graph.get(start, []):
+ if neighbor not in visited:
+ if has_cycle_from(neighbor, visited, rec_stack):
+ return True
+ elif neighbor in rec_stack:
+ return True
+
+ rec_stack.remove(start)
+ return False
+
+ visited = set()
+ for node_id in node_ids:
+ if node_id not in visited:
+ if has_cycle_from(node_id, visited, set()):
+ warnings.append("Workflow contains cycles (expected for control loops)")
+ return
+
+def _check_zmq_ports(soup, errors, warnings):
+ edges = soup.find_all('edge')
+ port_pattern = re.compile(r"0x([a-fA-F0-9]+)_(\S+)")
+
+ ports_used = {}
+
+ for edge in edges:
+ label_tag = edge.find('y:EdgeLabel') or edge.find('EdgeLabel')
+ if not label_tag or not label_tag.text:
+ continue
+
+ match = port_pattern.match(label_tag.text.strip())
+ if not match:
+ continue
+
+ port_hex = match.group(1)
+ port_name = match.group(2)
+ port_num = int(port_hex, 16)
+
+ if port_num < 1:
+ errors.append(f"Invalid port number: {port_num} (0x{port_hex}) must be at least 1")
+ continue
+ elif port_num > 65535:
+ errors.append(f"Invalid port number: {port_num} (0x{port_hex}) exceeds maximum (65535)")
+ continue
+
+ if port_num in ports_used:
+ existing_name = ports_used[port_num]
+ if existing_name != port_name:
+ errors.append(f"Port conflict: 0x{port_hex} used for both '{existing_name}' and '{port_name}'")
+ else:
+ ports_used[port_num] = port_name
+
+ if port_num < 1024:
+ warnings.append(f"Port {port_num} (0x{port_hex}) is in reserved range (< 1024)")
+
def show_results(console, errors, warnings, info):
if errors:
console.print("[red]✗ Validation failed[/red]\n")
diff --git a/tests/test_graph.py b/tests/test_graph.py
index f6e2825..6aef0d3 100644
--- a/tests/test_graph.py
+++ b/tests/test_graph.py
@@ -145,6 +145,165 @@ def test_validate_valid_graph(self):
self.assertIn('Validation passed', result.output)
self.assertIn('Workflow is valid', result.output)
+
+ def test_validate_missing_source_file(self):
+ content = '''
+
+
+
+ n0:missing.py
+
+
+
+ '''
+ filepath = self.create_graph_file('workflow.graphml', content)
+ source_dir = Path(self.temp_dir) / 'src'
+ source_dir.mkdir()
+
+ result = self.runner.invoke(cli, ['validate', filepath, '--source', str(source_dir)])
+
+ self.assertIn('Validation failed', result.output)
+ self.assertIn('Missing source file', result.output)
+
+ def test_validate_with_existing_source_file(self):
+ content = '''
+
+
+
+ n0:exists.py
+
+
+
+ '''
+ filepath = self.create_graph_file('workflow.graphml', content)
+ source_dir = Path(self.temp_dir) / 'src'
+ source_dir.mkdir()
+ (source_dir / 'exists.py').write_text('print("hello")')
+
+ result = self.runner.invoke(cli, ['validate', filepath, '--source', str(source_dir)])
+
+ self.assertIn('Validation passed', result.output)
+
+ def test_validate_zmq_port_conflict(self):
+ content = '''
+
+
+
+ n0:script1.py
+
+
+ n1:script2.py
+
+
+ 0x1234_portA
+
+
+ 0x1234_portB
+
+
+
+ '''
+ filepath = self.create_graph_file('conflict.graphml', content)
+
+ result = self.runner.invoke(cli, ['validate', filepath])
+
+ self.assertIn('Validation failed', result.output)
+ self.assertIn('Port conflict', result.output)
+
+ def test_validate_reserved_port(self):
+ content = '''
+
+
+
+ n0:script1.py
+
+
+ n1:script2.py
+
+
+ 0x50_data
+
+
+
+ '''
+ filepath = self.create_graph_file('reserved.graphml', content)
+
+ result = self.runner.invoke(cli, ['validate', filepath])
+
+ self.assertIn('Port 80', result.output)
+ self.assertIn('reserved range', result.output)
+
+ def test_validate_cycle_detection(self):
+ content = '''
+
+
+
+ n0:controller.py
+
+
+ n1:plant.py
+
+
+ control_signal
+
+
+ sensor_data
+
+
+
+ '''
+ filepath = self.create_graph_file('cycle.graphml', content)
+
+ result = self.runner.invoke(cli, ['validate', filepath])
+
+ self.assertIn('cycles', result.output)
+ self.assertIn('control loops', result.output)
+
+ def test_validate_port_zero(self):
+ content = '''
+
+
+
+ n0:script1.py
+
+
+ n1:script2.py
+
+
+ 0x0_invalid
+
+
+
+ '''
+ filepath = self.create_graph_file('port_zero.graphml', content)
+
+ result = self.runner.invoke(cli, ['validate', filepath])
+
+ self.assertIn('Validation failed', result.output)
+ self.assertIn('must be at least 1', result.output)
+
+ def test_validate_port_exceeds_maximum(self):
+ content = '''
+
+
+
+ n0:script1.py
+
+
+ n1:script2.py
+
+
+ 0x10000_toobig
+
+
+
+ '''
+ filepath = self.create_graph_file('port_max.graphml', content)
+
+ result = self.runner.invoke(cli, ['validate', filepath])
+
+ self.assertIn('Validation failed', result.output)
+ self.assertIn('exceeds maximum (65535)', result.output)
if __name__ == '__main__':
unittest.main()
\ No newline at end of file