feat: Add resource quota awareness to KubernetesPodOperator #63946
mrrsm wants to merge 1 commit into apache:main
if self.check_resource_quotas and self.on_quota_exceeded != "ignore":
    try:
        check_pod_quota_compliance(self.client, pod_request_obj, pod_request_obj.metadata.namespace)
    except PodResourceQuotaExceededException as e:
        if self.on_quota_exceeded == "queue":
            self.log.warning(
                "Pod creation would exceed resource quota. Task will be rescheduled to retry later. %s",
                str(e),
            )
            reschedule_time = datetime.datetime.now(datetime.timezone.utc) + timedelta(
                seconds=self.quota_check_interval
            )
            raise AirflowRescheduleException(reschedule_time)
        if self.on_quota_exceeded == "fail":
            self.log.error(
                "Pod creation blocked due to resource quota violation. "
                "Set on_quota_exceeded='queue' to retry or 'ignore' to skip this check."
            )
            raise
The quota issue is not such a trivial problem, and I think this somewhat oversimplifies it.
I also find this somewhat confusing, as rescheduling is something reserved for the scheduler, and this is not about that.
There is also the question of how the K8s executor should act on quota issues.
SameerMesiah97
left a comment
I have to agree with @eladkal here but from a slightly different angle. This is arguably well-motivated as an ersatz "Kueue-like" layer but there are several issues with this PR:
- The biggest one: this check is inherently non-atomic. Since quota enforcement is handled by Kubernetes, there is no guarantee that the result of this check still holds when the pod creation request is actually sent. In a concurrent scenario (multiple tasks doing this at roughly the same time), multiple tasks could pass the check and then race to create pods, but only some will actually be admitted. So this ends up being best-effort at best, and in some cases kind of redundant.
- The "queue" behavior sounds nice in isolation, but here it's basically introducing scheduling/retry semantics inside the operator. Airflow already has task-level retries and rescheduling, so this feels like duplicating that logic in a slightly different form. I'm also not sure KPO is the right place for this; it's usually just a thin wrapper around pod submission, not something that should be making scheduling decisions.
- Now, I will caveat this by stating upfront that I am not too familiar with the internals of Kubernetes Admission Control (KAC), but in resource_quota.py it looks like you are trying to predict whether a request will be accepted based on your own rudimentary implementation of KAC internals. First, this seems unnecessary: why predict something you can easily discover by just sending the request? Second, how confident are you that this fully replicates KAC internals? And third, even if you can replicate them faithfully, are we willing to accept the risk of this new implementation diverging from KAC over time?
More broadly this feels like it’s drifting from “quota awareness” into “quota-aware scheduling”, which is probably better handled either at the Kubernetes level (e.g. Kueue) or by leaning on existing Airflow primitives.
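The race described in the first bullet is the classic check-then-act problem; since admission control is atomic on the API-server side, the usual alternative is to attempt the creation and handle the rejection (EAFP). A minimal, self-contained sketch of the idea — `FakeApiServer` and `QuotaExceeded` are illustrative stand-ins, not names from the PR or the kubernetes client, where the rejection would surface as an `ApiException` with status 403:

```python
class QuotaExceeded(Exception):
    """Stand-in for kubernetes.client.rest.ApiException (status 403)."""


class FakeApiServer:
    """Simulates the API server: the quota check and the pod creation
    happen together on the server, so no client-side race is possible."""

    def __init__(self, capacity: int):
        self.capacity = capacity
        self.pods = []

    def create_pod(self, name: str) -> str:
        if len(self.pods) >= self.capacity:
            raise QuotaExceeded(f'pods "{name}" is forbidden: exceeded quota')
        self.pods.append(name)
        return name


def submit_pod(server: FakeApiServer, name: str) -> bool:
    """EAFP: try to create the pod and report whether it was admitted,
    instead of pre-checking quota state that may change underneath us."""
    try:
        server.create_pod(name)
        return True
    except QuotaExceeded:
        return False


server = FakeApiServer(capacity=2)
results = [submit_pod(server, f"pod-{i}") for i in range(4)]
# Only the first two submissions fit the quota; the rest are rejected
# atomically by the server, with no window for a stale pre-check.
```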
Nataneljpwd
left a comment
Looks good, a few comments and questions
try:
    from airflow.sdk.exceptions import AirflowRescheduleException
except ImportError:
    from airflow.exceptions import AirflowRescheduleException  # type: ignore[no-redef]
Why not use the common compat provider here? It does this for you.
:param on_quota_exceeded: action to take when pod would exceed resource quota. Options:
    "queue" (default) - reschedule the task to try again later,
    "fail" - fail the task immediately with an exception,
    "ignore" - proceed with pod creation anyway (same as check_resource_quotas=False).
Is this option needed if we already have an option with the same behavior? Removing it could simplify the code and probably drop a few tests.
If both the comment above and this one hold, the code can be vastly simplified, as you can remove everything that is not the queue-mode behavior.
    Default to False.
:param on_quota_exceeded: action to take when pod would exceed resource quota. Options:
    "queue" (default) - reschedule the task to try again later,
    "fail" - fail the task immediately with an exception,
As of now it will happen either way, no? What is done differently here? To me it looks like the exact same behavior.
reschedule_time = datetime.datetime.now(datetime.timezone.utc) + timedelta(
    seconds=self.quota_check_interval
)
raise AirflowRescheduleException(reschedule_time)
This is usually reserved for the scheduler; maybe it is better to wait and retry later instead?
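The "wait and retry" alternative suggested here could look roughly like the following sketch: poll the quota check inside the operator and proceed only once it passes, instead of raising AirflowRescheduleException. The function name and signature are hypothetical, not from the PR:

```python
import time


def wait_for_quota(check, interval: float, max_attempts: int, sleep=time.sleep) -> bool:
    """Hypothetical alternative to raising AirflowRescheduleException.

    `check` is a zero-argument callable returning True once the pod would
    fit the quota. Returns True when the check eventually passes, False
    after `max_attempts` polls. `sleep` is injectable for testing.
    """
    for _attempt in range(max_attempts):
        if check():
            return True
        sleep(interval)
    return False
```

The trade-off versus rescheduling: the worker slot stays occupied while waiting, but no scheduler-level semantics are borrowed by the operator.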
if not quantity:
    return 0.0

quantity = str(quantity).strip()
if not quantity:
    return 0.0
Why is it checked twice? Why not combine them into the same if?
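Folding the two guards into one could look like this sketch (not the PR's code): normalize first, so a single emptiness check covers None, "", and whitespace-only input alike.

```python
def parse_or_zero(quantity) -> float:
    """Sketch of merging the duplicated emptiness guards.

    Normalizing to a stripped string up front means one check handles
    None, empty, and whitespace-only inputs.
    """
    text = str(quantity).strip() if quantity is not None else ""
    if not text:
        return 0.0
    return float(text)  # the real helper would go on to handle unit suffixes
```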
binary_suffixes = {
    "Ki": 1024,
    "Mi": 1024**2,
    "Gi": 1024**3,
    "Ti": 1024**4,
    "Pi": 1024**5,
    "Ei": 1024**6,
}

# Decimal suffixes (base 1000)
decimal_suffixes = {
    "k": 1000,
    "M": 1000**2,
    "G": 1000**3,
    "T": 1000**4,
    "P": 1000**5,
    "E": 1000**6,
}
The kubernetes Python client has this functionality built in via format_quantity and parse_quantity.
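As the comment notes, the kubernetes Python client ships a quantity parser (parse_quantity in kubernetes.utils.quantity, returning a Decimal), so the hand-rolled suffix tables shouldn't be needed. A stdlib-only sketch of roughly what it does, for illustration only — the real helper also validates the format and handles more cases:

```python
from decimal import Decimal

# Illustrative approximation of kubernetes.utils.quantity.parse_quantity.
# Binary suffixes are listed before their decimal prefixes ("Mi" before "M")
# so that endswith() matches the longer suffix first.
_MULTIPLIERS = {
    "Ki": 1024, "Mi": 1024**2, "Gi": 1024**3,
    "Ti": 1024**4, "Pi": 1024**5, "Ei": 1024**6,
    "k": 1000, "M": 1000**2, "G": 1000**3,
    "T": 1000**4, "P": 1000**5, "E": 1000**6,
    "m": Decimal("0.001"),  # milli, used for CPU quantities like "500m"
}


def parse_quantity_sketch(quantity: str) -> Decimal:
    """Parse a Kubernetes quantity string ("1Ki", "500m", "2G", "3")."""
    for suffix, mult in _MULTIPLIERS.items():
        if quantity.endswith(suffix):
            return Decimal(quantity[: -len(suffix)]) * Decimal(mult)
    return Decimal(quantity)
```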
try:
    return float(quantity[:-1]) / 1000
except ValueError:
    return 0.0

# Check for binary suffixes
for suffix, multiplier in binary_suffixes.items():
    if quantity.endswith(suffix):
        try:
            return float(quantity[: -len(suffix)]) * multiplier
        except ValueError:
            return 0.0

# Check for decimal suffixes
for suffix, multiplier in decimal_suffixes.items():
    if quantity.endswith(suffix):
        try:
            return float(quantity[: -len(suffix)]) * multiplier
        except ValueError:
            return 0.0
This check is redundant and already implemented
if e.status == 403:
    logger.warning(
        "Insufficient permissions to check resource quotas in namespace %s. "
        "Skipping quota validation.",
        namespace,
    )
Maybe it is better to fail here, wdyt?
    )
    return None
if e.status == 404:
    logger.debug("Namespace %s not found for quota check", namespace)
# No quotas defined or couldn't check, allow pod creation
return

used_resources, hard_limits = quota_info
There was a problem hiding this comment.
This will still fail if pods are created between fetching the quota and the check here; maybe a different approach is needed.
@mrrsm This PR has been converted to draft because it does not yet meet our Pull Request quality criteria. Issues found:
What to do next:
Converting a PR to draft is not a rejection — it is an invitation to bring the PR up to the project's standards so that maintainer review time is spent productively. There is no rush — take your time and work at your own pace. We appreciate your contribution and are happy to wait for updates. If you have questions, feel free to ask on the Airflow Slack.
This adds, in a non-breaking way, additional parameters to the KubernetesPodOperator that look up the resource quota in the target namespace to ensure sufficient resources are available before deploying the pod. The user can choose what to do if the quota would be exceeded: queue (default), fail, or ignore and try anyway. The default retry interval is 60 seconds but is configurable via another param.
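A hypothetical usage sketch of the parameters this PR adds (names taken from the PR; the operator import and its usual kwargs are elided, and the final parameter names may still change per the review):

```python
# Keyword arguments this PR would add to a KubernetesPodOperator(...) call.
quota_kwargs = dict(
    check_resource_quotas=True,   # opt in to the pre-flight quota check
    on_quota_exceeded="queue",    # or "fail" / "ignore"
    quota_check_interval=60,      # seconds before the queued retry (PR default)
)
```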
I was able to successfully test this in a breeze setup connected to a kind cluster.
I will open the PR to update the decorator after any changes to the params are finalized in this PR.
closes: #63944
Was generative AI tooling used to co-author this PR?
Generated-by: [Claude Sonnet 4.5] following the guidelines