From 2e10f4800b5c30a5db359bf59af3e4d6e8a1097d Mon Sep 17 00:00:00 2001 From: Titas-Ghosh Date: Sun, 15 Feb 2026 02:14:40 +0530 Subject: [PATCH] Allow relative subdirectory paths in node labels --- mkconcore.py | 123 ++++++++++++++++++++++++++++++---------------- tests/test_cli.py | 24 +++++++++ 2 files changed, 104 insertions(+), 43 deletions(-) diff --git a/mkconcore.py b/mkconcore.py index 14abf34d..9457d74e 100644 --- a/mkconcore.py +++ b/mkconcore.py @@ -92,6 +92,24 @@ def safe_name(value, context, allow_path=False): if re.search(pattern, value): raise ValueError(f"Unsafe {context}: '{value}' contains illegal characters.") return value + +def safe_relpath(value, context): + """ + Allow relative subpaths while blocking traversal and absolute/drive paths. + """ + if not value: + raise ValueError(f"{context} cannot be empty") + normalized = value.replace("\\", "/") + safe_name(normalized, context, allow_path=True) + if normalized.startswith("/") or normalized.startswith("~"): + raise ValueError(f"Unsafe {context}: absolute paths are not allowed.") + if re.match(r"^[A-Za-z]:", normalized): + raise ValueError(f"Unsafe {context}: drive paths are not allowed.") + if ":" in normalized: + raise ValueError(f"Unsafe {context}: ':' is not allowed in relative paths.") + if any(part in ("", "..") for part in normalized.split("/")): + raise ValueError(f"Unsafe {context}: invalid path segment.") + return normalized MKCONCORE_VER = "22-09-18" @@ -273,14 +291,15 @@ def cleanup_script_files(): node_label = re.sub(r'(\s+|\n)', ' ', node_label) #Validate node labels - if ':' in node_label: - container_part, source_part = node_label.split(':', 1) - safe_name(container_part, f"Node container name '{container_part}'") - safe_name(source_part, f"Node source file '{source_part}'") - else: - safe_name(node_label, f"Node label '{node_label}'") - # Explicitly reject incorrect format to prevent later crashes and ambiguity - raise ValueError(f"Invalid node label '{node_label}': expected format 'container:source' with a ':' separator.") + if ':' in node_label: + container_part, source_part = node_label.split(':', 1) + safe_name(container_part, f"Node container name '{container_part}'") + source_part = safe_relpath(source_part, f"Node source file '{source_part}'") + node_label = f"{container_part}:{source_part}" + else: + safe_name(node_label, f"Node label '{node_label}'") + # Explicitly reject incorrect format to prevent later crashes and ambiguity + raise ValueError(f"Invalid node label '{node_label}': expected format 'container:source' with a ':' separator.") nodes_dict[node['id']] = node_label node_id_to_label_map[node['id']] = node_label.split(':')[0] @@ -466,12 +485,15 @@ def cleanup_script_files(): if not sourcecode: continue - if "." in sourcecode: - dockername, langext = os.path.splitext(sourcecode) - else: - dockername, langext = sourcecode, "" - - script_target_path = os.path.join(outdir, "src", sourcecode) + if "." in sourcecode: + dockername, langext = os.path.splitext(sourcecode) + else: + dockername, langext = sourcecode, "" + + script_target_path = os.path.join(outdir, "src", sourcecode) + script_target_parent = os.path.dirname(script_target_path) + if script_target_parent: + os.makedirs(script_target_parent, exist_ok=True) # If the script was specialized, it's already in outdir/src. If not, copy from sourcedir. if node_id_key not in node_edge_params: @@ -661,27 +683,33 @@ def cleanup_script_files(): # 4. Write final iport/oport files logging.info("Writing .iport and .oport files...") -for node_label, ports in node_port_mappings.items(): +for node_label, ports in node_port_mappings.items(): try: containername, sourcecode = node_label.split(':', 1) - if not sourcecode or "." not in sourcecode: continue - dockername = os.path.splitext(sourcecode)[0] - with open(os.path.join(outdir, "src", f"{dockername}.iport"), "w") as fport: - fport.write(str(ports['iport']).replace("'" + prefixedgenode, "'")) - with open(os.path.join(outdir, "src", f"{dockername}.oport"), "w") as fport: - fport.write(str(ports['oport']).replace("'" + prefixedgenode, "'")) + if not sourcecode or "." not in sourcecode: continue + dockername = os.path.splitext(sourcecode)[0] + iport_path = os.path.join(outdir, "src", f"{dockername}.iport") + oport_path = os.path.join(outdir, "src", f"{dockername}.oport") + iport_parent = os.path.dirname(iport_path) + if iport_parent: + os.makedirs(iport_parent, exist_ok=True) + with open(iport_path, "w") as fport: + fport.write(str(ports['iport']).replace("'" + prefixedgenode, "'")) + with open(oport_path, "w") as fport: + fport.write(str(ports['oport']).replace("'" + prefixedgenode, "'")) except ValueError: continue #if docker, make docker-dirs, generate build, run, stop, clear scripts and quit -if (concoretype=="docker"): - for node in nodes_dict: - containername,sourcecode = nodes_dict[node].split(':') - if len(sourcecode)!=0 and sourcecode.find(".")!=-1: #3/28/21 - dockername,langext = sourcecode.split(".") - if not os.path.exists(outdir+"/src/Dockerfile."+dockername): # 3/30/21 - try: +if (concoretype=="docker"): + for node in nodes_dict: + containername,sourcecode = nodes_dict[node].split(':') + if len(sourcecode)!=0 and sourcecode.find(".")!=-1: #3/28/21 + dockername,langext = sourcecode.split(".") + dockerfile_path = os.path.join(outdir, "src", f"Dockerfile.{dockername}") + if not os.path.exists(dockerfile_path): # 3/30/21 + try: if langext=="py": src_path = CONCOREPATH+"/Dockerfile.py" logging.info("assuming .py extension for Dockerfile") @@ -699,11 +727,14 @@ def cleanup_script_files(): logging.info("assuming .m extension for Dockerfile") with open(src_path) as fsource: source_content = fsource.read() - except: - logging.error(f"{CONCOREPATH} is not correct path to concore") - quit() - with open(outdir+"/src/Dockerfile."+dockername,"w") as fcopy: - fcopy.write(source_content) + except: + logging.error(f"{CONCOREPATH} is not correct path to concore") + quit() + dockerfile_parent = os.path.dirname(dockerfile_path) + if dockerfile_parent: + os.makedirs(dockerfile_parent, exist_ok=True) + with open(dockerfile_path,"w") as fcopy: + fcopy.write(source_content) if langext=="py": fcopy.write('CMD ["python", "-i", "'+sourcecode+'"]\n') if langext=="m": @@ -947,16 +978,22 @@ def cleanup_script_files(): if concoretype=="posix": fbuild.write('#!/bin/bash' + "\n") -for node in nodes_dict: - containername,sourcecode = nodes_dict[node].split(':') - if len(sourcecode)!=0: - if sourcecode.find(".")==-1: - logging.error("cannot pull container "+sourcecode+" with control core type "+concoretype) #3/28/21 - quit() - dockername,langext = sourcecode.split(".") - fbuild.write('mkdir '+containername+"\n") - if concoretype == "windows": - fbuild.write("copy .\\src\\"+sourcecode+" .\\"+containername+"\\"+sourcecode+"\n") +for node in nodes_dict: + containername,sourcecode = nodes_dict[node].split(':') + if len(sourcecode)!=0: + if sourcecode.find(".")==-1: + logging.error("cannot pull container "+sourcecode+" with control core type "+concoretype) #3/28/21 + quit() + dockername,langext = sourcecode.split(".") + fbuild.write('mkdir '+containername+"\n") + source_subdir = os.path.dirname(sourcecode).replace("\\", "/") + if source_subdir: + if concoretype == "windows": + fbuild.write("mkdir .\\"+containername+"\\"+source_subdir.replace("/", "\\")+"\n") + else: + fbuild.write("mkdir -p ./"+containername+"/"+source_subdir+"\n") + if concoretype == "windows": + fbuild.write("copy .\\src\\"+sourcecode+" .\\"+containername+"\\"+sourcecode+"\n") if langext == "py": fbuild.write("copy .\\src\\concore.py .\\" + containername + "\\concore.py\n") elif langext == "cpp": diff --git a/tests/test_cli.py b/tests/test_cli.py index 8d5a3994..b1853b49 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -114,6 +114,30 @@ def test_run_command_default_type(self): else: self.assertTrue(Path('out/build').exists()) + def test_run_command_subdir_source(self): + with self.runner.isolated_filesystem(temp_dir=self.temp_dir): + result = self.runner.invoke(cli, ['init', 'test-project']) + self.assertEqual(result.exit_code, 0) + + subdir = Path('test-project/src/subdir') + subdir.mkdir(parents=True, exist_ok=True) + shutil.move('test-project/src/script.py', subdir / 'script.py') + + workflow_path = Path('test-project/workflow.graphml') + content = workflow_path.read_text() + content = content.replace('N1:script.py', 'N1:subdir/script.py') + workflow_path.write_text(content) + + result = self.runner.invoke(cli, [ + 'run', + 'test-project/workflow.graphml', + '--source', 'test-project/src', + '--output', 'out', + '--type', 'posix' + ]) + self.assertEqual(result.exit_code, 0) + self.assertTrue(Path('out/src/subdir/script.py').exists()) + def test_run_command_existing_output(self): with self.runner.isolated_filesystem(temp_dir=self.temp_dir): result = self.runner.invoke(cli, ['init', 'test-project'])