feat: Add resource quota awareness to KubernetesPodOperator #63946

Draft
mrrsm wants to merge 1 commit into apache:main from mrrsm:feat/resource-aware-pod-operator

Conversation


@mrrsm mrrsm commented Mar 19, 2026

This adds, in a non-breaking way, additional parameters for the KubernetesPodOperator which allow looking up a resource quota in the target namespace to ensure sufficient resources are available before the pod is deployed. The user can choose what to do if the quota would be exceeded: queue (default), fail, or ignore and try anyway. The default retry interval is 60 seconds and is configurable via a separate parameter.
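The decision described above can be sketched as a small pure function (an illustrative sketch only, not the PR's code; `on_quota_exceeded` and `quota_check_interval` are the parameter names proposed in this PR):

```python
from datetime import datetime, timedelta, timezone

def next_action(quota_ok: bool, on_quota_exceeded: str = "queue",
                quota_check_interval: int = 60):
    """Return ("create", None), ("reschedule", when) or ("fail", None)."""
    # "ignore" behaves as if the check never ran.
    if quota_ok or on_quota_exceeded == "ignore":
        return ("create", None)
    if on_quota_exceeded == "queue":
        # Try again after the configurable interval (60s by default).
        when = datetime.now(timezone.utc) + timedelta(seconds=quota_check_interval)
        return ("reschedule", when)
    return ("fail", None)
```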

I was able to successfully test this in a breeze setup connected to a kind cluster.

I will open the PR to update the decorator after any changes to the params are finalized in this PR.

closes: #63944

Was generative AI tooling used to co-author this PR?
  • Yes (please specify the tool below)

Generated-by: [Claude Sonnet 4.5] following the guidelines

@boring-cyborg boring-cyborg Bot added area:providers provider:cncf-kubernetes Kubernetes (k8s) provider related issues labels Mar 19, 2026
@mrrsm mrrsm force-pushed the feat/resource-aware-pod-operator branch from 06085e2 to c2c6976 on March 19, 2026 18:39
Comment on lines +659 to +677
```python
if self.check_resource_quotas and self.on_quota_exceeded != "ignore":
    try:
        check_pod_quota_compliance(self.client, pod_request_obj, pod_request_obj.metadata.namespace)
    except PodResourceQuotaExceededException as e:
        if self.on_quota_exceeded == "queue":
            self.log.warning(
                "Pod creation would exceed resource quota. Task will be rescheduled to retry later. %s",
                str(e),
            )
            reschedule_time = datetime.datetime.now(datetime.timezone.utc) + timedelta(
                seconds=self.quota_check_interval
            )
            raise AirflowRescheduleException(reschedule_time)
        if self.on_quota_exceeded == "fail":
            self.log.error(
                "Pod creation blocked due to resource quota violation. "
                "Set on_quota_exceeded='queue' to retry or 'ignore' to skip this check."
            )
        raise
```

The quota issue is not such a trivial problem, and I think this somewhat oversimplifies it.
I also find this somewhat confusing, as reschedule is something reserved for the scheduler, and this is not about that.

There is also the question of how the K8s executor should act on quota issues.


@SameerMesiah97 SameerMesiah97 left a comment


I have to agree with @eladkal here but from a slightly different angle. This is arguably well-motivated as an ersatz "Kueue-like" layer but there are several issues with this PR:

  1. The biggest one: this check is inherently non-atomic. Since quota enforcement is handled by Kubernetes, there is no guarantee that the result of this check still holds when the pod creation request is actually sent. In a concurrent scenario (multiple tasks doing this at roughly the same time), multiple tasks could pass the check and then race to create pods, but only some will actually be admitted. So this ends up being best-effort at best, and in some cases kind of redundant.

  2. The "queue" behavior sounds nice in isolation, but here it’s basically introducing scheduling/retry semantics inside the operator. Airflow already has task-level retries and rescheduling, so this feels like duplicating that logic but in a slightly different form. Also not sure KPO is the right place for this; it’s usually just a thin wrapper around pod submission, not something that should be doing scheduling decisions.

  3. Now, I will caveat this by stating upfront that I am not too familiar with the internals of Kubernetes Admission Control (KAC), but in resource_quota.py it looks like you are trying to predict whether a request will be accepted based on your own rudimentary implementation of KAC internals. First, this seems unnecessary: why predict something that you can easily discover by trying to send the request? Second, how confident are you that this fully replicates KAC internals? And third, even if you can replicate KAC internals faithfully, are we willing to accept the risk of this new implementation diverging from it?

More broadly this feels like it’s drifting from “quota awareness” into “quota-aware scheduling”, which is probably better handled either at the Kubernetes level (e.g. Kueue) or by leaning on existing Airflow primitives.
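The "just try the request" alternative hinted at in point 3 could look roughly like this (a sketch only; `QuotaRejected` is a stand-in exception type, whereas the real Kubernetes client surfaces quota rejection as an ApiException with a 403 status):

```python
import time

class QuotaRejected(Exception):
    """Stand-in for the API server's 'exceeded quota' rejection."""

def create_pod_with_backoff(create_fn, attempts=5, interval_s=60, sleep=time.sleep):
    # Instead of predicting admission, submit the pod and react to the
    # server's authoritative answer, retrying only on quota rejection.
    for attempt in range(attempts):
        try:
            return create_fn()
        except QuotaRejected:
            if attempt == attempts - 1:
                raise  # out of attempts, propagate the rejection
            sleep(interval_s)
```

This sidesteps the non-atomicity problem entirely: the API server's admission decision is the check.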


@Nataneljpwd Nataneljpwd left a comment


Looks good, a few comments and questions

Comment on lines +91 to +94
```python
try:
    from airflow.sdk.exceptions import AirflowRescheduleException
except ImportError:
    from airflow.exceptions import AirflowRescheduleException  # type: ignore[no-redef]
```

Why not use the common compat provider here?
It does this for you

```
:param on_quota_exceeded: action to take when pod would exceed resource quota. Options:
    "queue" (default) - reschedule the task to try again later,
    "fail" - fail the task immediately with an exception,
    "ignore" - proceed with pod creation anyway (same as check_resource_quotas=False).
```

Is this option needed if we already have an option with the same behavior?
Removing it could simplify the code and probably even remove a few tests.


If both the above comment and this one are true, the code can be vastly simplified, as you can remove everything except the queue-mode behavior.

```
Default to False.
:param on_quota_exceeded: action to take when pod would exceed resource quota. Options:
    "queue" (default) - reschedule the task to try again later,
    "fail" - fail the task immediately with an exception,
```

As of now it will happen either way, no?
What is done differently here? To me it seems like the exact same behavior.

```python
reschedule_time = datetime.datetime.now(datetime.timezone.utc) + timedelta(
    seconds=self.quota_check_interval
)
raise AirflowRescheduleException(reschedule_time)
```

This is usually reserved for the scheduler; maybe it is better to wait and retry later instead?

Comment on lines +49 to +54
```python
if not quantity:
    return 0.0

quantity = str(quantity).strip()
if not quantity:
    return 0.0
```

Why is it checked twice? Why not the same if?

Comment on lines +57 to +74
```python
binary_suffixes = {
    "Ki": 1024,
    "Mi": 1024**2,
    "Gi": 1024**3,
    "Ti": 1024**4,
    "Pi": 1024**5,
    "Ei": 1024**6,
}

# Decimal suffixes (base 1000)
decimal_suffixes = {
    "k": 1000,
    "M": 1000**2,
    "G": 1000**3,
    "T": 1000**4,
    "P": 1000**5,
    "E": 1000**6,
}
```

The Kubernetes Python client has this functionality built in via format_quantity and parse_quantity.
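For reference, the suffix arithmetic that parse_quantity already implements looks roughly like this (a simplified, self-contained sketch; the real helper in kubernetes.utils also handles exponent notation and validation):

```python
from decimal import Decimal

# Binary suffixes are base-1024, decimal suffixes base-1000,
# and a trailing "m" means millis (e.g. CPU millicores).
_SUFFIXES = {
    "Ki": 1024, "Mi": 1024**2, "Gi": 1024**3,
    "Ti": 1024**4, "Pi": 1024**5, "Ei": 1024**6,
    "k": 1000, "M": 1000**2, "G": 1000**3,
    "T": 1000**4, "P": 1000**5, "E": 1000**6,
}

def parse_quantity_sketch(quantity: str) -> Decimal:
    q = str(quantity).strip()
    if q.endswith("m"):
        return Decimal(q[:-1]) / 1000
    for suffix, factor in _SUFFIXES.items():  # "Ki" is checked before "k"
        if q.endswith(suffix):
            return Decimal(q[: -len(suffix)]) * factor
    return Decimal(q)
```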

Comment on lines +78 to +97
```python
    try:
        return float(quantity[:-1]) / 1000
    except ValueError:
        return 0.0

# Check for binary suffixes
for suffix, multiplier in binary_suffixes.items():
    if quantity.endswith(suffix):
        try:
            return float(quantity[: -len(suffix)]) * multiplier
        except ValueError:
            return 0.0

# Check for decimal suffixes
for suffix, multiplier in decimal_suffixes.items():
    if quantity.endswith(suffix):
        try:
            return float(quantity[: -len(suffix)]) * multiplier
        except ValueError:
            return 0.0
```

This check is redundant and already implemented

Comment on lines +157 to +162
```python
if e.status == 403:
    logger.warning(
        "Insufficient permissions to check resource quotas in namespace %s. "
        "Skipping quota validation.",
        namespace,
    )
```

Maybe it is better to fail here, wdyt?

```python
    )
    return None
if e.status == 404:
    logger.debug("Namespace %s not found for quota check", namespace)
```

Same here

```python
# No quotas defined or couldn't check, allow pod creation
return

used_resources, hard_limits = quota_info
```

This will still fail if pods are created between getting the resource and the check here; maybe a different approach is needed.

@potiuk potiuk marked this pull request as draft March 20, 2026 07:59

potiuk commented Mar 20, 2026

@mrrsm This PR has been converted to draft because it does not yet meet our Pull Request quality criteria.

Issues found:

  • mypy (type checking): Failing: CI image checks / MyPy checks (mypy-providers). Run prek --stage manual mypy-providers --all-files locally to reproduce. You need breeze ci-image build --python 3.10 for Docker-based mypy. See mypy (type checking) docs.
  • Provider tests: Failing: provider distributions tests / Compat 3.0.6:P3.10:. Run provider tests with breeze run pytest <provider-test-path> -xvs. See Provider tests docs.
  • Other failing CI checks: Failing: CI image checks / Build documentation (--spellcheck-only). Run prek run --from-ref main locally to reproduce. See static checks docs.
  • ⚠️ Unresolved review comments: This PR has 1 unresolved review thread from maintainers: @eladkal (MEMBER): 1 unresolved thread. Please review and resolve all inline review comments before requesting another review. You can resolve a conversation by clicking 'Resolve conversation' on each thread after addressing the feedback. See pull request guidelines.

What to do next:

  • The comment informs you what you need to do.
  • Fix each issue, then mark the PR as "Ready for review" in the GitHub UI - but only after making sure that all the issues are fixed.
  • There is no rush — take your time and work at your own pace. We appreciate your contribution and are happy to wait for updates.
  • Maintainers will then proceed with a normal review.

Converting a PR to draft is not a rejection — it is an invitation to bring the PR up to the project's standards so that maintainer review time is spent productively. If you have questions, feel free to ask on the Airflow Slack.

@kaxil kaxil requested a review from Copilot April 2, 2026 00:44
Copilot AI left a comment


Copilot encountered an error and was unable to review this pull request. You can try again by re-requesting a review.


Development

Successfully merging this pull request may close these issues.

Resource quota aware kubernetes pod operator

6 participants