Skip to content

Add dynamic DAG bundle configuration from file path#63928

Open
tardunge wants to merge 3 commits intoapache:mainfrom
tardunge:feature/dynamic-dag-bundle-config
Open

Add dynamic DAG bundle configuration from file path#63928
tardunge wants to merge 3 commits intoapache:mainfrom
tardunge:feature/dynamic-dag-bundle-config

Conversation

@tardunge
Copy link
Copy Markdown

Add a new dag_bundle_config_path setting that allows loading DAG bundle configurations from JSON files in a directory, enabling hot-reloading without restarting the DAG processor.

Motivation: In Kubernetes environments, bundle configurations need to change frequently (e.g., via ConfigMaps). The current dag_bundle_config_list requires static config in airflow.cfg and a processor restart. This feature allows mounting a directory of JSON files and having the DAG processor automatically detect and apply changes.

Key changes:

  • DagBundlesManager: new _parse_config_from_path(), check_config_path_changes(), and get_bundle_path_safe() methods
  • DagFileProcessorManager: reuses a single DagBundlesManager instance; detects config path changes in the refresh loop, cleaning up removed bundles (processors, file stats, DAGs)
  • ParseImportError.full_file_path(): graceful handling when bundles are removed
  • config.yml: new dag_bundle_config_path option under [dag_processor]

JSON config format:

{
  "name": "my-git-repo",
  "classpath": "airflow.providers.git.bundles.git.GitDagBundle",
  "kwargs": {
    "tracking_ref": "main",
    "git_conn_id": "my_git_conn"
  }
}

Behavior:

  • When dag_bundle_config_path is set, it takes precedence over dag_bundle_config_list
  • The DAG processor monitors the directory for JSON file additions, removals, and modifications via mtime tracking
  • On change: reloads config, syncs to DB, cleans up removed bundle processors/DAGs, force-refreshes active bundles
  • Invalid JSON files are logged and skipped; duplicate bundle names are warned and skipped

Was generative AI tooling used to co-author this PR?
  • Yes — Claude Code (Opus 4.6)

Generated-by: Claude Code (Opus 4.6) following the guidelines

@boring-cyborg
Copy link
Copy Markdown

boring-cyborg bot commented Mar 19, 2026

Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about any anything please check our Contributors' Guide (https://github.com/apache/airflow/blob/main/contributing-docs/README.rst)
Here are some useful points:

  • Pay attention to the quality of your code (ruff, mypy and type annotations). Our prek-hooks will help you with that.
  • In case of a new feature add useful documentation (in docstrings or in docs/ directory). Adding a new operator? Check this short guide Consider adding an example DAG that shows how users should use it.
  • Consider using Breeze environment for testing locally, it's a heavy docker but it ships with a working Airflow and a lot of integrations.
  • Be patient and persistent. It might take some time to get a review or get the final approval from Committers.
  • Please follow ASF Code of Conduct for all communication including (but not limited to) comments on Pull Requests, Mailing list and Slack.
  • Be sure to read the Airflow Coding style.
  • Always keep your Pull Requests rebased, otherwise your build might fail due to changes not related to your commits.
    Apache Airflow is a community-driven project and together we are making it better 🚀.
    In case of doubts contact the developers at:
    Mailing List: dev@airflow.apache.org
    Slack: https://s.apache.org/airflow-slack

@tardunge tardunge force-pushed the feature/dynamic-dag-bundle-config branch 2 times, most recently from ff9f785 to fbdca7e Compare March 19, 2026 16:13
TARDUNGE added 3 commits March 20, 2026 10:41
Adds a new `dag_bundle_config_path` setting that allows loading DAG bundle
configurations from JSON files in a directory. This enables hot-reloading
of bundle configurations without restarting the DAG processor, which is
particularly useful in Kubernetes environments where ConfigMaps can be
mounted as files.

When `dag_bundle_config_path` is set, it takes precedence over the
existing `dag_bundle_config_list` setting. The DAG processor monitors
the directory for file additions, removals, and modifications, and
automatically reloads bundles when changes are detected.

Key changes:
- DagBundlesManager: new _parse_config_from_path(), check_config_path_changes(),
  and get_bundle_path_safe() methods
- DagFileProcessorManager: reuses a single DagBundlesManager instance and
  detects config path changes in the refresh loop, cleaning up removed bundles
- ParseImportError: graceful handling when bundles are removed
- config.yml: new dag_bundle_config_path option
Rename loop variable to avoid shadowing Path-typed file_path with
str-typed iteration variable from dict.items().
…nges

The check_config_path_changes() call in _refresh_dag_bundles() must use
`is True` identity check rather than truthiness, because MagicMock
returns a truthy MagicMock object by default. Also restore sync_bundles()
and get_all_bundles() to use throwaway DagBundlesManager instances
(matching upstream pattern) to avoid mock interference.
@tardunge tardunge force-pushed the feature/dynamic-dag-bundle-config branch from fbdca7e to 7cfe224 Compare March 20, 2026 02:41
@uranusjr
Copy link
Copy Markdown
Member

It looks to me this relies on the directory content following certain conventions that did not previously exist. This is a problem on its own, but it means descriptions must be added to the documentation in relevant sections.

@potiuk potiuk added the ready for maintainer review Set after triaging when all criteria pass. label Mar 20, 2026
@kaxil kaxil requested a review from Copilot April 2, 2026 00:44
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Adds support for dynamically loading DAG bundle configuration from a directory of JSON files (with change detection), enabling hot-reload behavior in the DAG processor without requiring restarts.

Changes:

  • Add dag_bundle_config_path config option and JSON-directory parsing logic to DagBundlesManager
  • Add config-path change detection + bundle cleanup/reload flow in DagFileProcessorManager
  • Make ParseImportError.full_file_path() resilient when a referenced bundle has been removed

Reviewed changes

Copilot reviewed 5 out of 5 changed files in this pull request and generated 9 comments.

Show a summary per file
File Description
airflow-core/tests/unit/dag_processing/bundles/test_dag_bundle_manager.py Adds unit tests for config-path parsing, precedence, change detection, and safe bundle accessors
airflow-core/src/airflow/models/errors.py Avoids raising when a bundle referenced by an import error has been removed
airflow-core/src/airflow/dag_processing/manager.py Reuses a bundles manager and triggers reload/cleanup when config-path changes
airflow-core/src/airflow/dag_processing/bundles/manager.py Implements config-path parsing, mtime tracking, and “safe” bundle access methods
airflow-core/src/airflow/config_templates/config.yml Documents new dag_bundle_config_path setting

Comment on lines +424 to +455
# Clear old mtime entries before repopulating
self._config_path_mtime.clear()

config_list = []
seen_bundle_names: set[str] = set()

for file_path in path.glob("*.json"):
try:
config = json.loads(file_path.read_text())
if not isinstance(config, dict):
self.log.error("Invalid config in %s: Expected dict but got %s", file_path, type(config))
continue

# Check for duplicate bundle names
bundle_name = config.get("name")
if bundle_name and bundle_name in seen_bundle_names:
self.log.warning(
"Duplicate bundle name '%s' found in %s, skipping this file",
bundle_name,
file_path,
)
continue
if bundle_name:
seen_bundle_names.add(bundle_name)

config_list.append(config)
# Track file modification time for change detection
self._config_path_mtime[str(file_path)] = file_path.stat().st_mtime
except json.JSONDecodeError as e:
self.log.error("Failed to parse JSON from %s: %s", file_path, e)
except Exception as e:
self.log.error("Error reading config file %s: %s", file_path, e)
Copy link

Copilot AI Apr 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

check_config_path_changes() compares the directory's *.json files to self._config_path_mtime, but _config_path_mtime is currently populated only for files that successfully parse and are not skipped as duplicates. If the directory contains any invalid JSON, schema-invalid config, or skipped duplicate-name file, the key sets will never match and the manager will report changes on every loop, causing repeated reload churn. Consider tracking mtimes for all *.json files (even ones skipped for invalid content/duplicates), and keep parsing/validation concerns separate from change-detection bookkeeping.

Copilot uses AI. Check for mistakes.
Comment on lines +501 to +512
# Check for added or removed files
if set(current_files.keys()) != set(self._config_path_mtime.keys()):
self.log.info("DAG bundle config files added or removed")
return True

# Check for modified files
for fpath, mtime in current_files.items():
if fpath in self._config_path_mtime and self._config_path_mtime[fpath] != mtime:
self.log.info("DAG bundle config file modified: %s", fpath)
return True

return False
Copy link

Copilot AI Apr 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

check_config_path_changes() compares the directory's *.json files to self._config_path_mtime, but _config_path_mtime is currently populated only for files that successfully parse and are not skipped as duplicates. If the directory contains any invalid JSON, schema-invalid config, or skipped duplicate-name file, the key sets will never match and the manager will report changes on every loop, causing repeated reload churn. Consider tracking mtimes for all *.json files (even ones skipped for invalid content/duplicates), and keep parsing/validation concerns separate from change-detection bookkeeping.

Suggested change
# Check for added or removed files
if set(current_files.keys()) != set(self._config_path_mtime.keys()):
self.log.info("DAG bundle config files added or removed")
return True
# Check for modified files
for fpath, mtime in current_files.items():
if fpath in self._config_path_mtime and self._config_path_mtime[fpath] != mtime:
self.log.info("DAG bundle config file modified: %s", fpath)
return True
return False
# Use a dedicated snapshot of all observed JSON files for change detection.
# This is intentionally separate from self._config_path_mtime, which only
# tracks successfully parsed and non-duplicate configs.
previous_files: dict[str, float] = getattr(self, "_config_path_all_mtime", {})
files_changed = False
# Check for added or removed files
if set(current_files.keys()) != set(previous_files.keys()):
self.log.info("DAG bundle config files added or removed")
files_changed = True
else:
# Check for modified files
for fpath, mtime in current_files.items():
if previous_files.get(fpath) != mtime:
self.log.info("DAG bundle config file modified: %s", fpath)
files_changed = True
break
# Update the snapshot to the current state so that subsequent checks
# compare against the latest observed directory contents.
self._config_path_all_mtime = current_files
return files_changed

Copilot uses AI. Check for mistakes.
Comment on lines +439 to +447
if bundle_name and bundle_name in seen_bundle_names:
self.log.warning(
"Duplicate bundle name '%s' found in %s, skipping this file",
bundle_name,
file_path,
)
continue
if bundle_name:
seen_bundle_names.add(bundle_name)
Copy link

Copilot AI Apr 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

bundle_name is taken directly from JSON and added to a set. If name is present but not a string (e.g., a JSON array/object), seen_bundle_names.add(bundle_name) will raise TypeError: unhashable type, breaking config loading. Validate name is a str (and non-empty) before using it for duplicate detection; otherwise log and skip that file.

Suggested change
if bundle_name and bundle_name in seen_bundle_names:
self.log.warning(
"Duplicate bundle name '%s' found in %s, skipping this file",
bundle_name,
file_path,
)
continue
if bundle_name:
seen_bundle_names.add(bundle_name)
if bundle_name is None:
self.log.error("Invalid config in %s: Missing required 'name' field", file_path)
continue
if not isinstance(bundle_name, str):
self.log.error(
"Invalid config in %s: 'name' field must be a string, got %r",
file_path,
bundle_name,
)
continue
if not bundle_name:
self.log.error("Invalid config in %s: 'name' field cannot be empty", file_path)
continue
if bundle_name in seen_bundle_names:
self.log.warning(
"Duplicate bundle name '%s' found in %s, skipping this file",
bundle_name,
file_path,
)
continue
seen_bundle_names.add(bundle_name)

Copilot uses AI. Check for mistakes.
Comment on lines +416 to +419
path = Path(config_path)
if not path.exists():
self.log.warning("DAG bundle config path does not exist: %s", config_path)
return
Copy link

Copilot AI Apr 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In path mode, _parse_config_from_path() returns early when the directory is missing or when no valid configs are found, without clearing self._bundle_config. This can leave stale bundles configured even after all JSON files are removed (or the directory disappears), which undermines the PR’s “removals” behavior and prevents cleanup from ever triggering. A robust approach is to treat 'missing/empty/no-valid-config' as 'no bundles configured' (clear _bundle_config accordingly), and have check_config_path_changes() report a change when the directory disappears (at least when it previously existed / had tracked files).

Copilot uses AI. Check for mistakes.
Comment on lines +457 to +459
if not config_list:
self.log.warning("No valid DAG bundle configs found in %s", config_path)
return
Copy link

Copilot AI Apr 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In path mode, _parse_config_from_path() returns early when the directory is missing or when no valid configs are found, without clearing self._bundle_config. This can leave stale bundles configured even after all JSON files are removed (or the directory disappears), which undermines the PR’s “removals” behavior and prevents cleanup from ever triggering. A robust approach is to treat 'missing/empty/no-valid-config' as 'no bundles configured' (clear _bundle_config accordingly), and have check_config_path_changes() report a change when the directory disappears (at least when it previously existed / had tracked files).

Copilot uses AI. Check for mistakes.
return False

path = Path(config_path)
if not path.exists() or not path.is_dir():
Copy link

Copilot AI Apr 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In path mode, _parse_config_from_path() returns early when the directory is missing or when no valid configs are found, without clearing self._bundle_config. This can leave stale bundles configured even after all JSON files are removed (or the directory disappears), which undermines the PR’s “removals” behavior and prevents cleanup from ever triggering. A robust approach is to treat 'missing/empty/no-valid-config' as 'no bundles configured' (clear _bundle_config accordingly), and have check_config_path_changes() report a change when the directory disappears (at least when it previously existed / had tracked files).

Suggested change
if not path.exists() or not path.is_dir():
if not path.exists() or not path.is_dir():
# If we previously tracked files from this path, treat disappearance as a change
if getattr(self, "_config_path_mtime", None):
self.log.info("DAG bundle config path '%s' no longer exists or is not a directory", path)
return True

Copilot uses AI. Check for mistakes.

config_list.append(config)
# Track file modification time for change detection
self._config_path_mtime[str(file_path)] = file_path.stat().st_mtime
Copy link

Copilot AI Apr 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using st_mtime (float, often with coarse resolution depending on filesystem/platform) can miss rapid successive edits, meaning config changes may not be detected reliably. To make reload detection more robust, consider using st_mtime_ns (integer nanoseconds) or combining mtime with file size/content hashing for change detection.

Suggested change
self._config_path_mtime[str(file_path)] = file_path.stat().st_mtime
self._config_path_mtime[str(file_path)] = file_path.stat().st_mtime_ns

Copilot uses AI. Check for mistakes.
Comment on lines +497 to +499
current_files: dict[str, float] = {}
for file_path in path.glob("*.json"):
current_files[str(file_path)] = file_path.stat().st_mtime
Copy link

Copilot AI Apr 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using st_mtime (float, often with coarse resolution depending on filesystem/platform) can miss rapid successive edits, meaning config changes may not be detected reliably. To make reload detection more robust, consider using st_mtime_ns (integer nanoseconds) or combining mtime with file size/content hashing for change detection.

Suggested change
current_files: dict[str, float] = {}
for file_path in path.glob("*.json"):
current_files[str(file_path)] = file_path.stat().st_mtime
current_files: dict[str, int] = {}
for file_path in path.glob("*.json"):
current_files[str(file_path)] = file_path.stat().st_mtime_ns

Copilot uses AI. Check for mistakes.
Comment on lines +557 to +560
# Modify the file
time.sleep(0.01) # Ensure mtime changes
bundle_config["kwargs"]["refresh_interval"] = 99
bundle_file.write_text(json.dumps(bundle_config))
Copy link

Copilot AI Apr 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test relies on time.sleep(0.01) to force an mtime change, which can be flaky on filesystems with coarse timestamp granularity. Prefer explicitly bumping the mtime (e.g., via os.utime) or using st_mtime_ns in the implementation and adjusting the test accordingly, so the test doesn’t depend on timing.

Suggested change
# Modify the file
time.sleep(0.01) # Ensure mtime changes
bundle_config["kwargs"]["refresh_interval"] = 99
bundle_file.write_text(json.dumps(bundle_config))
# Modify the file and ensure its mtime changes deterministically
bundle_config["kwargs"]["refresh_interval"] = 99
bundle_file.write_text(json.dumps(bundle_config))
stat_result = bundle_file.stat()
os.utime(bundle_file, (stat_result.st_atime, stat_result.st_mtime + 1))

Copilot uses AI. Check for mistakes.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants