Add dynamic DAG bundle configuration from file path #63928

tardunge wants to merge 3 commits into apache:main from
Conversation
Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contributors' Guide (https://github.com/apache/airflow/blob/main/contributing-docs/README.rst)
Force-pushed ff9f785 to fbdca7e
Adds a new `dag_bundle_config_path` setting that allows loading DAG bundle configurations from JSON files in a directory. This enables hot-reloading of bundle configurations without restarting the DAG processor, which is particularly useful in Kubernetes environments where ConfigMaps can be mounted as files.

When `dag_bundle_config_path` is set, it takes precedence over the existing `dag_bundle_config_list` setting. The DAG processor monitors the directory for file additions, removals, and modifications, and automatically reloads bundles when changes are detected.

Key changes:
- `DagBundlesManager`: new `_parse_config_from_path()`, `check_config_path_changes()`, and `get_bundle_path_safe()` methods
- `DagFileProcessorManager`: reuses a single `DagBundlesManager` instance and detects config path changes in the refresh loop, cleaning up removed bundles
- `ParseImportError`: graceful handling when bundles are removed
- `config.yml`: new `dag_bundle_config_path` option
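The monitoring behavior described above (detecting additions, removals, and modifications of files in a directory) can be sketched as a polling snapshot comparison. This is a minimal, hedged illustration; the function and variable names here are illustrative, not the PR's actual API.

```python
import tempfile
from pathlib import Path


def snapshot_dir(path: Path) -> dict[str, int]:
    """Record the mtime of every JSON file currently in the directory."""
    return {str(p): p.stat().st_mtime_ns for p in path.glob("*.json")}


def has_changes(previous: dict[str, int], current: dict[str, int]) -> bool:
    # Additions/removals change the key set; in-place edits change a recorded mtime.
    return previous != current


with tempfile.TemporaryDirectory() as tmp:
    d = Path(tmp)
    before = snapshot_dir(d)  # empty directory
    (d / "bundle.json").write_text('{"name": "repo"}')
    after = snapshot_dir(d)
    print(has_changes(before, after))  # True
```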
Rename loop variable to avoid shadowing the `Path`-typed `file_path` with the `str`-typed iteration variable from `dict.items()`.
…nges

The `check_config_path_changes()` call in `_refresh_dag_bundles()` must use an `is True` identity check rather than truthiness, because MagicMock returns a truthy MagicMock object by default. Also restore `sync_bundles()` and `get_all_bundles()` to use throwaway `DagBundlesManager` instances (matching the upstream pattern) to avoid mock interference.
Force-pushed fbdca7e to 7cfe224
It looks to me like this relies on the directory content following certain conventions that did not previously exist. This is a problem on its own, but it also means descriptions must be added to the documentation in the relevant sections.
Pull request overview
Adds support for dynamically loading DAG bundle configuration from a directory of JSON files (with change detection), enabling hot-reload behavior in the DAG processor without requiring restarts.
Changes:
- Add `dag_bundle_config_path` config option and JSON-directory parsing logic to `DagBundlesManager`
- Add config-path change detection plus bundle cleanup/reload flow in `DagFileProcessorManager`
- Make `ParseImportError.full_file_path()` resilient when a referenced bundle has been removed
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 9 comments.
Show a summary per file
| File | Description |
|---|---|
| airflow-core/tests/unit/dag_processing/bundles/test_dag_bundle_manager.py | Adds unit tests for config-path parsing, precedence, change detection, and safe bundle accessors |
| airflow-core/src/airflow/models/errors.py | Avoids raising when a bundle referenced by an import error has been removed |
| airflow-core/src/airflow/dag_processing/manager.py | Reuses a bundles manager and triggers reload/cleanup when config-path changes |
| airflow-core/src/airflow/dag_processing/bundles/manager.py | Implements config-path parsing, mtime tracking, and “safe” bundle access methods |
| airflow-core/src/airflow/config_templates/config.yml | Documents new dag_bundle_config_path setting |
```python
# Clear old mtime entries before repopulating
self._config_path_mtime.clear()

config_list = []
seen_bundle_names: set[str] = set()

for file_path in path.glob("*.json"):
    try:
        config = json.loads(file_path.read_text())
        if not isinstance(config, dict):
            self.log.error("Invalid config in %s: Expected dict but got %s", file_path, type(config))
            continue

        # Check for duplicate bundle names
        bundle_name = config.get("name")
        if bundle_name and bundle_name in seen_bundle_names:
            self.log.warning(
                "Duplicate bundle name '%s' found in %s, skipping this file",
                bundle_name,
                file_path,
            )
            continue
        if bundle_name:
            seen_bundle_names.add(bundle_name)

        config_list.append(config)
        # Track file modification time for change detection
        self._config_path_mtime[str(file_path)] = file_path.stat().st_mtime
    except json.JSONDecodeError as e:
        self.log.error("Failed to parse JSON from %s: %s", file_path, e)
    except Exception as e:
        self.log.error("Error reading config file %s: %s", file_path, e)
```
check_config_path_changes() compares the directory's *.json files to self._config_path_mtime, but _config_path_mtime is currently populated only for files that successfully parse and are not skipped as duplicates. If the directory contains any invalid JSON, schema-invalid config, or skipped duplicate-name file, the key sets will never match and the manager will report changes on every loop, causing repeated reload churn. Consider tracking mtimes for all *.json files (even ones skipped for invalid content/duplicates), and keep parsing/validation concerns separate from change-detection bookkeeping.
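The separation the reviewer suggests can be sketched as follows: record the mtime of every `*.json` file before attempting to parse it, so change-detection bookkeeping is independent of parse success. This is a hedged sketch with hypothetical names (`scan_config_dir`), not the PR's actual implementation.

```python
import json
import tempfile
from pathlib import Path


def scan_config_dir(path: Path) -> tuple[list[dict], dict[str, int]]:
    """Parse valid configs, but record mtimes for *all* JSON files."""
    configs: list[dict] = []
    snapshot: dict[str, int] = {}
    for file_path in sorted(path.glob("*.json")):
        # Bookkeeping first: even unparseable files count for change detection.
        snapshot[str(file_path)] = file_path.stat().st_mtime_ns
        try:
            config = json.loads(file_path.read_text())
        except json.JSONDecodeError:
            continue  # invalid content is a parsing concern, not a change-detection one
        if isinstance(config, dict):
            configs.append(config)
    return configs, snapshot


with tempfile.TemporaryDirectory() as tmp:
    d = Path(tmp)
    (d / "good.json").write_text('{"name": "a"}')
    (d / "bad.json").write_text("not json")
    configs, snapshot = scan_config_dir(d)
    # The broken file still appears in the snapshot, so the key sets stay
    # stable across loops and no spurious reload is triggered.
    print(len(configs), len(snapshot))  # 1 2
```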
```python
# Check for added or removed files
if set(current_files.keys()) != set(self._config_path_mtime.keys()):
    self.log.info("DAG bundle config files added or removed")
    return True

# Check for modified files
for fpath, mtime in current_files.items():
    if fpath in self._config_path_mtime and self._config_path_mtime[fpath] != mtime:
        self.log.info("DAG bundle config file modified: %s", fpath)
        return True

return False
```
Suggested change:

```python
# Use a dedicated snapshot of all observed JSON files for change detection.
# This is intentionally separate from self._config_path_mtime, which only
# tracks successfully parsed and non-duplicate configs.
previous_files: dict[str, float] = getattr(self, "_config_path_all_mtime", {})
files_changed = False
# Check for added or removed files
if set(current_files.keys()) != set(previous_files.keys()):
    self.log.info("DAG bundle config files added or removed")
    files_changed = True
else:
    # Check for modified files
    for fpath, mtime in current_files.items():
        if previous_files.get(fpath) != mtime:
            self.log.info("DAG bundle config file modified: %s", fpath)
            files_changed = True
            break
# Update the snapshot to the current state so that subsequent checks
# compare against the latest observed directory contents.
self._config_path_all_mtime = current_files
return files_changed
```
```python
if bundle_name and bundle_name in seen_bundle_names:
    self.log.warning(
        "Duplicate bundle name '%s' found in %s, skipping this file",
        bundle_name,
        file_path,
    )
    continue
if bundle_name:
    seen_bundle_names.add(bundle_name)
```
`bundle_name` is taken directly from JSON and added to a set. If `name` is present but not a string (e.g., a JSON array/object), `seen_bundle_names.add(bundle_name)` will raise `TypeError: unhashable type`, breaking config loading. Validate that `name` is a non-empty `str` before using it for duplicate detection; otherwise log and skip that file.
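A minimal reproduction of this failure mode: a JSON `"name"` value that deserializes to a list is unhashable, so adding it to a set raises `TypeError`, while a simple type check avoids the crash entirely.

```python
import json

seen: set = set()
config = json.loads('{"name": ["not", "a", "string"]}')
bundle_name = config.get("name")

try:
    seen.add(bundle_name)  # a list is unhashable
except TypeError as exc:
    print(type(exc).__name__)  # TypeError

# Validating the type first sidesteps the crash:
if isinstance(bundle_name, str) and bundle_name:
    seen.add(bundle_name)
print(len(seen))  # 0
```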
Suggested change (replacing the block above):

```python
if bundle_name is None:
    self.log.error("Invalid config in %s: Missing required 'name' field", file_path)
    continue
if not isinstance(bundle_name, str):
    self.log.error(
        "Invalid config in %s: 'name' field must be a string, got %r",
        file_path,
        bundle_name,
    )
    continue
if not bundle_name:
    self.log.error("Invalid config in %s: 'name' field cannot be empty", file_path)
    continue
if bundle_name in seen_bundle_names:
    self.log.warning(
        "Duplicate bundle name '%s' found in %s, skipping this file",
        bundle_name,
        file_path,
    )
    continue
seen_bundle_names.add(bundle_name)
```
```python
path = Path(config_path)
if not path.exists():
    self.log.warning("DAG bundle config path does not exist: %s", config_path)
    return
```
In path mode, _parse_config_from_path() returns early when the directory is missing or when no valid configs are found, without clearing self._bundle_config. This can leave stale bundles configured even after all JSON files are removed (or the directory disappears), which undermines the PR’s “removals” behavior and prevents cleanup from ever triggering. A robust approach is to treat 'missing/empty/no-valid-config' as 'no bundles configured' (clear _bundle_config accordingly), and have check_config_path_changes() report a change when the directory disappears (at least when it previously existed / had tracked files).
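The suggested cleanup behavior can be sketched as follows: treat a missing directory as "no bundles configured" instead of returning early. This is a hedged, stand-alone sketch; `BundleConfigHolder` and its attribute names mirror the PR's shape but are assumptions introduced here for illustration.

```python
import tempfile
from pathlib import Path


class BundleConfigHolder:
    """Stand-in for the manager's config state; names are illustrative."""

    def __init__(self) -> None:
        # Pretend a bundle was configured on a previous pass.
        self._bundle_config: dict[str, dict] = {"old-bundle": {"name": "old-bundle"}}

    def parse_config_from_path(self, config_path: str) -> None:
        path = Path(config_path)
        if not path.is_dir():
            # Treat a missing directory as "no bundles configured" so that
            # downstream cleanup can remove the now-stale bundles.
            self._bundle_config = {}
            return
        # ... normal parsing would repopulate self._bundle_config here ...


holder = BundleConfigHolder()
with tempfile.TemporaryDirectory() as tmp:
    missing = Path(tmp) / "gone"  # never created, so is_dir() is False
    holder.parse_config_from_path(str(missing))
print(holder._bundle_config)  # {}
```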
```python
if not config_list:
    self.log.warning("No valid DAG bundle configs found in %s", config_path)
    return
```
```python
    return False

path = Path(config_path)
if not path.exists() or not path.is_dir():
```
Suggested change:

```python
if not path.exists() or not path.is_dir():
    # If we previously tracked files from this path, treat disappearance as a change
    if getattr(self, "_config_path_mtime", None):
        self.log.info("DAG bundle config path '%s' no longer exists or is not a directory", path)
        return True
```
|
|
```python
config_list.append(config)
# Track file modification time for change detection
self._config_path_mtime[str(file_path)] = file_path.stat().st_mtime
```
Using st_mtime (float, often with coarse resolution depending on filesystem/platform) can miss rapid successive edits, meaning config changes may not be detected reliably. To make reload detection more robust, consider using st_mtime_ns (integer nanoseconds) or combining mtime with file size/content hashing for change detection.
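Beyond `st_mtime_ns`, the content-hashing alternative mentioned here can be sketched as a size-plus-SHA-256 fingerprint, which detects an edit even when the filesystem's timestamp resolution is too coarse to register it. A minimal sketch; the `fingerprint` helper is hypothetical.

```python
import hashlib
import tempfile
from pathlib import Path


def fingerprint(file_path: Path) -> tuple[int, str]:
    """Fingerprint a file by its size plus the SHA-256 of its content."""
    data = file_path.read_bytes()
    return len(data), hashlib.sha256(data).hexdigest()


with tempfile.TemporaryDirectory() as tmp:
    f = Path(tmp) / "bundle.json"
    f.write_text('{"name": "a", "kwargs": {"refresh_interval": 10}}')
    before = fingerprint(f)
    # An immediate rewrite may leave st_mtime unchanged on coarse-resolution
    # filesystems, but the content hash always reflects the edit.
    f.write_text('{"name": "a", "kwargs": {"refresh_interval": 99}}')
    after = fingerprint(f)
    print(before != after)  # True
```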
Suggested change:

```python
self._config_path_mtime[str(file_path)] = file_path.stat().st_mtime_ns
```
```python
current_files: dict[str, float] = {}
for file_path in path.glob("*.json"):
    current_files[str(file_path)] = file_path.stat().st_mtime
```
Suggested change:

```python
current_files: dict[str, int] = {}
for file_path in path.glob("*.json"):
    current_files[str(file_path)] = file_path.stat().st_mtime_ns
```
```python
# Modify the file
time.sleep(0.01)  # Ensure mtime changes
bundle_config["kwargs"]["refresh_interval"] = 99
bundle_file.write_text(json.dumps(bundle_config))
```
This test relies on time.sleep(0.01) to force an mtime change, which can be flaky on filesystems with coarse timestamp granularity. Prefer explicitly bumping the mtime (e.g., via os.utime) or using st_mtime_ns in the implementation and adjusting the test accordingly, so the test doesn’t depend on timing.
Suggested change:

```python
# Modify the file and ensure its mtime changes deterministically
bundle_config["kwargs"]["refresh_interval"] = 99
bundle_file.write_text(json.dumps(bundle_config))
stat_result = bundle_file.stat()
os.utime(bundle_file, (stat_result.st_atime, stat_result.st_mtime + 1))
```
Add a new `dag_bundle_config_path` setting that allows loading DAG bundle configurations from JSON files in a directory, enabling hot-reloading without restarting the DAG processor.

Motivation: In Kubernetes environments, bundle configurations need to change frequently (e.g., via ConfigMaps). The current `dag_bundle_config_list` requires static config in `airflow.cfg` and a processor restart. This feature allows mounting a directory of JSON files and having the DAG processor automatically detect and apply changes.

Key changes:
- `DagBundlesManager`: new `_parse_config_from_path()`, `check_config_path_changes()`, and `get_bundle_path_safe()` methods
- `DagFileProcessorManager`: reuses a single `DagBundlesManager` instance; detects config path changes in the refresh loop, cleaning up removed bundles (processors, file stats, DAGs)
- `ParseImportError.full_file_path()`: graceful handling when bundles are removed
- `config.yml`: new `dag_bundle_config_path` option under `[dag_processor]`

JSON config format:
```json
{
  "name": "my-git-repo",
  "classpath": "airflow.providers.git.bundles.git.GitDagBundle",
  "kwargs": {
    "tracking_ref": "main",
    "git_conn_id": "my_git_conn"
  }
}
```
Behavior: when `dag_bundle_config_path` is set, it takes precedence over `dag_bundle_config_list`.

Was generative AI tooling used to co-author this PR?
Generated-by: Claude Code (Opus 4.6) following the guidelines