
feat: add KubeflowExecutor for Kubeflow Training Operator (TrainJob CRD) #462

Open

ko3n1g wants to merge 16 commits into main from feat/pytorchjob-executor

Conversation

@ko3n1g (Contributor) commented Mar 12, 2026

Summary

  • Adds `KubeflowExecutor`, which submits distributed training jobs to any Kubernetes cluster running the Kubeflow Training Operator
  • Supports both PyTorchJob (Training Operator v1) and TrainJob (Training Operator v2) via a `job_kind` toggle
  • Pairs with a TorchX scheduler so jobs integrate with `run.run()` and `run.Experiment`
  • Kubernetes config is loaded automatically (local kubeconfig first, then in-cluster fallback)

PyTorchJob vs TrainJob

|            | PyTorchJob                   | TrainJob                        |
|------------|------------------------------|---------------------------------|
| API        | `kubeflow.org/v1`            | `trainer.kubeflow.org/v1alpha1` |
| Pod config | directly in replica pod spec | `podTemplateOverrides[].spec`   |
| `nproc`    | `spec.nprocPerNode`          | `spec.trainer.numProcPerNode`   |
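
To make the mapping concrete, here is a sketch of how the same settings land in each CRD's manifest, as plain dicts. The field paths come from the table above; all values (replica counts, container lists) are illustrative only, not the exact manifests this executor emits.

```python
# Illustrative manifest fragments for the two CRDs (values are examples only).

# Training Operator v1: pod config sits directly in each replica's pod template.
pytorchjob = {
    "apiVersion": "kubeflow.org/v1",
    "kind": "PyTorchJob",
    "spec": {
        "nprocPerNode": "8",
        "pytorchReplicaSpecs": {
            "Worker": {
                "replicas": 2,
                "template": {"spec": {"containers": []}},  # pod config goes here
            },
        },
    },
}

# Training Operator v2: pod config is applied via podTemplateOverrides instead,
# and the per-node process count moves under spec.trainer.
trainjob = {
    "apiVersion": "trainer.kubeflow.org/v1alpha1",
    "kind": "TrainJob",
    "spec": {
        "trainer": {"numNodes": 2, "numProcPerNode": "8"},
        "podTemplateOverrides": [{"spec": {"containers": []}}],
    },
}
```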

Notable fields

  • `tolerations`, `affinity` — go into pod spec / `podTemplateOverrides` automatically
  • `env_list` — full env var dicts supporting `valueFrom` / `secretKeyRef`
  • `pod_spec_overrides` — arbitrary extra pod spec fields (e.g. `resourceClaims` for IMEX channels)
  • `launch(wait=True)` — polls until `RUNNING` / `SUCCEEDED` / `FAILED`
  • `cancel(wait=True)` — polls until the custom resource is gone and all pods have terminated
  • `UNKNOWN`/`None` status → `AppState.PENDING` (avoids false failures on transient API errors)
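
As a sketch of the value shapes these fields take: `env_list` entries are full Kubernetes EnvVar dicts, and `pod_spec_overrides` is passed through as extra pod spec fields. The secret/claim names below are hypothetical; the dict shapes follow standard Kubernetes conventions.

```python
# env_list takes full EnvVar dicts, so valueFrom / secretKeyRef work as usual.
env_list = [
    {
        "name": "WANDB_API_KEY",
        "valueFrom": {"secretKeyRef": {"name": "wandb-secret", "key": "api-key"}},
    },
    {"name": "NCCL_DEBUG", "value": "INFO"},
]

# pod_spec_overrides forwards arbitrary extra pod spec fields verbatim,
# e.g. resourceClaims for IMEX channels (names here are illustrative).
pod_spec_overrides = {
    "resourceClaims": [
        {
            "name": "imex-channel",
            "resourceClaimTemplateName": "imex-channel-template",
        }
    ],
}
```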

Minimal E2E example

```python
import nemo_run as run
from nemo_run.core.execution.kubeflow import KubeflowExecutor

executor = KubeflowExecutor(
    namespace="my-namespace",
    image="pytorch/pytorch:2.3.0-cuda12.1-cudnn8-runtime",
    num_nodes=2,
    gpus_per_node=8,
    launcher=run.Torchrun(),  # torchrun args injected automatically
    volumes=[{"name": "data", "persistentVolumeClaim": {"claimName": "my-pvc"}}],
    volume_mounts=[{"name": "data", "mountPath": "/data"}],
)

script = run.Script("train.py")

run.run(script, executor=executor, name="my-training-job")
```

Test plan

  • 63 unit tests passing (`pytest test/core/execution/test_kubeflow.py test/run/torchx_backend/schedulers/test_kubeflow.py`)
  • PyTorchJob e2e verified against AWS EKS (`local/example.py`): launch → RUNNING → log sentinel → cancel(wait=True)
  • TrainJob e2e pending GKE cluster readiness (`local/example_trainjob.py`)

🤖 Generated with Claude Code
