From e432b388bca9b6d59fcdd4b8a5eecebb7d53aec0 Mon Sep 17 00:00:00 2001 From: vsoch Date: Tue, 16 Jun 2026 22:18:40 -0700 Subject: [PATCH 1/2] feat: jgf is general to support virtual/non-virtual nodes we need to model everything as a node so contraints (filters) will be honored in fluxion. Signed-off-by: vsoch --- pkg/jgf/jgf.go | 19 ++++++++ pkg/jgf/jgf_test.go | 109 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 128 insertions(+) create mode 100644 pkg/jgf/jgf_test.go diff --git a/pkg/jgf/jgf.go b/pkg/jgf/jgf.go index ca9bd98..acbb75d 100644 --- a/pkg/jgf/jgf.go +++ b/pkg/jgf/jgf.go @@ -29,6 +29,7 @@ type Vertex struct { unit string exclusive bool props map[string]any + nodeProps map[string]string } // Name is the vertex name (basename + per-type index, or an explicit name). @@ -61,6 +62,13 @@ type Options struct { // Properties are extra metadata fields merged into the vertex (e.g. // num_qubits, vendor). They are emitted alongside the standard fields. Properties map[string]any + // NodeProperties are RFC 31 resource properties, emitted as a nested + // "properties" object in the vertex metadata. Unlike Properties (which + // flattens descriptive fields into the top level), these populate + // resource_t.properties in Fluxion and are what the matcher's constraint + // pruning (by_constraint) and queries match against. Keys/values are + // strings; a bare tag is value "" (matching is by key presence). + NodeProperties map[string]string } // AddRoot creates a top-level vertex (typically the cluster). @@ -98,6 +106,7 @@ func (b *Builder) add(parent *Vertex, typ, basename string, opts Options) *Verte unit: opts.Unit, exclusive: opts.Exclusive, props: opts.Properties, + nodeProps: opts.NodeProperties, } if parent == nil { v.path = "/" + name @@ -140,6 +149,16 @@ func (m metadata) MarshalJSON() ([]byte, error) { out[k] = val } } + // RFC 31 resource properties: a nested object Fluxion loads into + // resource_t.properties (what constraint pruning and queries match on). + // Emitted only when present so vertices without properties are unchanged. + if len(m.v.nodeProps) > 0 { + props := make(map[string]string, len(m.v.nodeProps)) + for k, val := range m.v.nodeProps { + props[k] = val + } + out["properties"] = props + } // Stable key order for deterministic output. keys := make([]string, 0, len(out)) for k := range out { diff --git a/pkg/jgf/jgf_test.go b/pkg/jgf/jgf_test.go new file mode 100644 index 0000000..89bf4b2 --- /dev/null +++ b/pkg/jgf/jgf_test.go @@ -0,0 +1,109 @@ +package jgf + +import ( + "encoding/json" + "testing" +) + +// decodeNodes parses the builder's JSON into a slice of vertex metadata maps, +// keyed for easy assertions. +func decodeNodes(t *testing.T, b *Builder) []map[string]any { + t.Helper() + raw, err := b.JSON() + if err != nil { + t.Fatalf("JSON: %v", err) + } + var doc struct { + Graph struct { + Nodes []struct { + Metadata map[string]any `json:"metadata"` + } `json:"nodes"` + } `json:"graph"` + } + if err := json.Unmarshal(raw, &doc); err != nil { + t.Fatalf("unmarshal: %v\n%s", err, raw) + } + out := make([]map[string]any, 0, len(doc.Graph.Nodes)) + for _, n := range doc.Graph.Nodes { + out = append(out, n.Metadata) + } + return out +} + +func findByName(nodes []map[string]any, name string) map[string]any { + for _, m := range nodes { + if m["name"] == name { + return m + } + } + return nil +} + +// A vertex given NodeProperties emits a nested "properties" object with those +// keys/values — the RFC 31 form Fluxion matches constraints against. +func TestNodePropertiesEmittedAsNestedObject(t *testing.T) { + b := NewBuilder() + cluster := b.AddRoot("cluster", "cluster", Options{Name: "cluster"}) + b.AddChild(cluster, "node", "node", Options{ + Name: "qdevice0", + NodeProperties: map[string]string{ + "virtual": "true", + "fluxion.flux-framework.org/region": "us-east-1", + }, + }) + + meta := findByName(decodeNodes(t, b), "qdevice0") + if meta == nil { + t.Fatal("qdevice0 vertex not found") + } + props, ok := meta["properties"].(map[string]any) + if !ok { + t.Fatalf("properties is not a nested object: %#v", meta["properties"]) + } + if props["virtual"] != "true" { + t.Errorf("properties[virtual] = %v, want true", props["virtual"]) + } + if props["fluxion.flux-framework.org/region"] != "us-east-1" { + t.Errorf("properties[region] = %v, want us-east-1", props["fluxion.flux-framework.org/region"]) + } +} + +// A vertex without NodeProperties emits no "properties" key at all, so existing +// graphs are byte-for-byte unchanged. +func TestNoNodePropertiesMeansNoPropertiesKey(t *testing.T) { + b := NewBuilder() + cluster := b.AddRoot("cluster", "cluster", Options{Name: "cluster"}) + b.AddChild(cluster, "node", "node", Options{Name: "node0"}) + + meta := findByName(decodeNodes(t, b), "node0") + if meta == nil { + t.Fatal("node0 vertex not found") + } + if _, present := meta["properties"]; present { + t.Errorf("expected no properties key, got %#v", meta["properties"]) + } +} + +// NodeProperties is distinct from Properties: the latter still flattens into the +// top level, the former nests under "properties". +func TestPropertiesVsNodeProperties(t *testing.T) { + b := NewBuilder() + cluster := b.AddRoot("cluster", "cluster", Options{Name: "cluster"}) + b.AddChild(cluster, "qpu", "qpu", Options{ + Name: "sv1", + Properties: map[string]any{"vendor": "amazon"}, // flattened, descriptive + NodeProperties: map[string]string{"region": "us-east-1"}, // nested, matchable + }) + + meta := findByName(decodeNodes(t, b), "sv1") + if meta == nil { + t.Fatal("sv1 vertex not found") + } + if meta["vendor"] != "amazon" { + t.Errorf("expected flattened vendor=amazon at top level, got %v", meta["vendor"]) + } + props, ok := meta["properties"].(map[string]any) + if !ok || props["region"] != "us-east-1" { + t.Errorf("expected nested properties.region=us-east-1, got %#v", meta["properties"]) + } +} From 75037daba98e4196073631b58d66a013cfeebd41 Mon Sep 17 00:00:00 2001 From: vsoch Date: Wed, 17 Jun 2026 10:19:06 -0700 Subject: [PATCH 2/2] feat: generalize to work with custom devices This is a large refactor that models any custom device in the graph, and can handle custom constraint queries to look for specific attributes. Constraints can only be applied to node types, and further, must be done across the graph, so we need to do two queries to separate quantum (or similar) from traditional compute. Akin to before, we inject backend, etc. into the pod spec. From this I will be able to submit work to different braket backends and then test latency, and from that will determine the need to make a probe pod that will submit work and then ungate pods on the cluster. Signed-off-by: vsoch --- .github/workflows/e2e-tests.yaml | 35 +- Makefile | 4 +- README.md | 417 ++++++++++++++-------- cmd/deviceplugin/main.go | 4 +- cmd/webhook/main.go | 22 +- deploy/fluence-resources-test.yaml | 35 +- deploy/fluence-resources.yaml | 31 +- deploy/fluence-test.yaml | 8 +- examples/test/e2e/quantum-pod-mock-2.yaml | 2 +- examples/test/e2e/quantum-pod-mock.yaml | 2 +- pkg/cluster/cluster.go | 149 +++----- pkg/cluster/cluster_test.go | 40 +++ pkg/cluster/resources.go | 242 +++++++++++++ pkg/cluster/resources_graph.go | 197 ++++++++++ pkg/cluster/resources_graph_test.go | 221 ++++++++++++ pkg/cluster/resources_test.go | 214 +++++++++++ pkg/fluence/fluence.go | 266 +++++++++++--- pkg/fluence/fluence_test.go | 131 ++++++- pkg/graph/allocation.go | 95 +++-- pkg/graph/allocation_test.go | 36 ++ pkg/jgf/jgf.go | 31 +- pkg/placement/placement.go | 301 ++++++++++++++-- pkg/placement/placement_test.go | 190 +++++++--- pkg/webhook/webhook.go | 85 ++++- pkg/webhook/webhook_test.go | 89 ++++- 25 files changed, 2328 insertions(+), 519 deletions(-) create mode 100644 pkg/cluster/cluster_test.go create mode 100644 pkg/cluster/resources.go create mode 100644 pkg/cluster/resources_graph.go create mode 100644 pkg/cluster/resources_graph_test.go create mode 100644 pkg/cluster/resources_test.go diff --git a/.github/workflows/e2e-tests.yaml b/.github/workflows/e2e-tests.yaml index f3e8c45..fc60604 100644 --- a/.github/workflows/e2e-tests.yaml +++ b/.github/workflows/e2e-tests.yaml @@ -10,7 +10,7 @@ concurrency: env: KIND_VERSION: v0.32.0 - IMAGE: ghcr.io/converged-computing/fluence:latest + IMAGE: vanessa/fluence:test jobs: e2e: @@ -19,19 +19,20 @@ jobs: - name: Checkout uses: actions/checkout@v4 - #- name: Set up Docker Buildx - # uses: docker/setup-buildx-action@v3 + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v3 - # - name: Build fluence image - # uses: docker/build-push-action@v6 - # with: - # context: . - # file: ./Dockerfile - # push: false - # load: true - # tags: ${{ env.IMAGE }} - # cache-from: type=gha - # cache-to: type=gha,mode=max + - name: Build fluence image + uses: docker/build-push-action@v6 + with: + context: . + file: ./Dockerfile + push: false + load: true + tags: ${{ env.IMAGE }} + cache-from: type=gha + cache-to: type=gha,mode=max + - name: Create k8s Kind Cluster uses: helm/kind-action@v1.10.0 with: @@ -55,6 +56,12 @@ jobs: echo "=== Disk space after cleanup ===" df -h + - name: Load docker images + run: | + kind get clusters + cluster=$(kind get clusters) + kind load --name $cluster docker-image vanessa/fluence:test + - name: Deploy fluence (base) run: | kubectl apply -f deploy/fluence-test.yaml @@ -62,6 +69,7 @@ jobs: POD=$(kubectl -n kube-system get pods -l app=fluence -o name | head -1) kubectl -n kube-system exec "${POD#pod/}" -- ls /tmp/ kubectl -n kube-system logs "${POD#pod/}" + sleep 2 kubectl -n kube-system exec "${POD#pod/}" -- /bin/bash -c "cat /tmp/fluence-graph-*.json" kubectl get nodes -o jsonpath='{range .items[*]}{.metadata.name}{": cpu="}{.status.allocatable.cpu}{" mem="}{.status.allocatable.memory}{"\n"}{end}' @@ -80,6 +88,7 @@ jobs: sleep 1 done POD=$(kubectl -n kube-system get pods -l app=fluence -o name | head -1) + sleep 2 kubectl -n kube-system exec "${POD#pod/}" -- /bin/bash -c "cat /tmp/fluence-graph-*.json" - name: Wait for webhook diff --git a/Makefile b/Makefile index ed62c28..df5519f 100644 --- a/Makefile +++ b/Makefile @@ -7,7 +7,7 @@ FLUX_SCHED_ROOT ?= /opt/flux-sched IMG ?= ghcr.io/converged-computing/fluence:latest -TEST_IMG ?= ghcr.io/converged-computing/fluence:test +TEST_IMG ?= vanessa/fluence:test # cgo flags for the scheduler binary: flux-sched only. CGO_CFLAGS = -I$(FLUX_SCHED_ROOT) @@ -41,7 +41,7 @@ image: ## Build the scheduler container image .PHONY: test-image test-image: ## Build the scheduler container image - docker build -t $(TEST_IMG)-test . + docker build -t $(TEST_IMG) . docker push $(TEST_IMG) .PHONY: test-image-deploy diff --git a/README.md b/README.md index 50849dc..0068322 100644 --- a/README.md +++ b/README.md @@ -2,123 +2,270 @@ ![img/fluence.png](img/fluence.png) -🚧 **UNDER DEVELOPMENT** 🚧 not ready for production use! I rolled back features since the recorded demo, and am going to add them back with proper testing. I have not finished this yet, but anticipate later in the week of 6/16/2026. Thank you for your patience! -@vsoch - -A Kubernetes scheduler plugin that places **pod groups** (and individual pods) -by matching them against a [Fluxion](https://github.com/flux-framework/flux-sched) -(flux-sched) resource graph built from the live cluster. - -This is an update from [flux-k8s](https://github.com/flux-framework/flux-k8s) -that uses the native PodGroup and optionally allows for scheduling -against arbitrary resources such as **quantum resources** modeled in the same graph. -I am also improving the design by not requiring a sidecar for fluence, and not -requiring the `kubernetes-sigs/scheduler-plugins` dependency. We use native Gang -scheduling provided by Kubernetes. - -For quantum resource modeling, we start from the prototype proven out in +🚧 **UNDER DEVELOPMENT** 🚧 Thank you for your patience! -@vsoch + +A Kubernetes scheduler plugin that places **pod groups** (and individual pods) by +matching them against a [Fluxion](https://github.com/flux-framework/flux-sched) +resource graph built from the live cluster. Beyond ordinary compute, it can model +**arbitrary virtual resources** — quantum backends today, anything with a count +and some attributes tomorrow — as first-class graph vertices that can be filtered +on and whose attributes are injected into the workload's environment. Nothing in +the design is quantum-specific: a resource `type` is an opaque string throughout. + +This updates [flux-k8s](https://github.com/flux-framework/flux-k8s) to use the +native Kubernetes PodGroup (Gang) API — no sidecar, and no +`kubernetes-sigs/scheduler-plugins` dependency. The quantum resource model is +proven out first in [fluxion-quantum](https://github.com/converged-computing/fluxion-quantum). -## How it works - -### Gang Scheduling - -Gang semantics (all-or-nothing) come from the native `PodGroup` API. Fluence is -responsible only for **placement**: - -1. **Discover** — on startup fluence lists cluster nodes and turns their - cpu/memory/gpu capacity into a Fluxion JGF resource graph - (`pkg/cluster` + `pkg/jgf`). If a resources config is provided (via - `FLUENCE_RESOURCES`), its entries (e.g. quantum backends) are injected as - `qpu`/`qubit` vertices. With no config the graph is classical-only. -2. **Match** — when the first pod of a group hits `PreFilter`, fluence builds a - Fluxion jobspec for the whole gang (`pkg/placement.JobspecForGroup`), asks the - matcher to allocate (`pkg/graph.FluxionGraph.MatchAllocateSpec`), and parses - the allocation into node and backend names (`PlacementFromAllocation`). -3. **Place** — `Filter` permits each pod only on its allocated node. (A - quantum-only pod allocates a `qpu` but no node — the backend is a remote API - any node can reach — so fluence imposes no node constraint in that case.) -4. **Hand off** — for a quantum pod, `PreBind` records the allocated backend on - the pod as the `fluence.flux-framework.org/backend` annotation. The mutating - webhook (installed with the base) injects a downward-API env so the container - reads it as `QRMI_BACKEND` with no boilerplate in the manifest. - -### Design Choices - -While Quantum resources are this first target, notably we should be able to support -any arbitrary resource in the graph. I decided that a pod can request a graph resource generically -e.g., `fluxion.flux-framework.org/` (like `.../qpu: "1"`) and that becomes a jobspec count -of ``. To support this, we deploy a **device plugin** that can advertise these virtual -types on every node. We need to do this because of the in-tree `NodeResourcesFit` endpoint. -If we do not have the device plugin, this call will not be satisfied. Note that -this device plugin will return True for any resources it sees added to the Fluxion resource graph, -but is not actually involved with scheduling. Fluxion does the real matching. +## How the pieces fit together ```console -nodes (kubectl get nodes) ──┐ - ├─► JGF resource graph ─► Fluxion match ─► node + backend placement -fluence-resources ConfigMap ┘ + resources.yaml --+ + +--------------+-------------------------------+ + v v v +scheduler device plugin webhook +(pkg/fluence) (advertises types) (injects env) + | + | build graph (pkg/cluster -> pkg/jgf) + | generate jobspecs (pkg/placement -> pkg/jobspec) + | match + cancel via Fluxion (pkg/graph, cgo) + | parse allocation (pkg/graph, pkg/placement) + v + pod bound to a node + backend/attrs stamped as annotations + | + v + webhook-injected env reads those annotations +``` + +A single `resources.yaml` is the source of truth, read by three independent +processes: the **scheduler** builds the graph and validates requests, the **device +plugin** advertises the requestable types as Kubernetes extended resources (so +pods requesting them pass admission), and the **webhook** computes the +environment-variable contract it injects into workloads. Note that the device +plugin can be removed from the design, assuming we do not need a `NodeResourcesFit` check. +I have kept it for now anticipating future cases where we have node-specific resources. +For now, the counts can be viewed as API quotas. + +For one pod group the scheduler generates one or more jobspecs, match-allocates +each against the graph (all-or-nothing), combines them into a placement, binds the +pod, and stamps the matched backend + attributes as annotations. The webhook has +already injected downward-API env vars that read those annotations, so the +workload sees which backend and attributes it got. + +## The resource model + +**Every allocatable thing in the graph is a `node`-typed vertex** -- physical +compute nodes and every level of every virtual resource alike. This is deliberate: +Fluxion's RFC 31 property constraints (the only attribute-based pruning the matcher +offers) apply **only** to `node` and `storage_node` vertices, so modeling a virtual +resource as a node is what makes it filterable. + +- **Physical nodes** carry `virtual=false`. +- **Virtual resources** carry `virtual=true`, a `class=` for the resource's + own type *and each descendant type in its subtree*, and their attributes. + +Property values are encoded into the property **key** (`virtual=true`, +`class=qpu`, `region=us-east-1`) because RFC 31 matching is key-presence only -- it +never compares the value half. A resource is selected by a `class=` +constraint, so any level is independently requestable (`class=qdevice`, +`class=qpu`, `class=qubit`). Descendant classes are propagated **up** so a global +constraint selecting a nested type isn't pruned at an ancestor node on the way +down; attributes are inherited **down** (a child gets its parent's attributes +unless it overrides or clears a key) so an attribute filter combined with a nested +class still reaches the target. + +Because a jobspec carries one global constraint, compute (`virtual=false`) and a +virtual resource (`virtual=true`) cannot be co-selected in one match -- one would +prune the other. A pod needing both produces **two** match-allocate calls, held +together all-or-nothing. + +--- + +## Components + +### `pkg/jgf` — JGF graph builder + +Builds the JSON Graph Format document Fluxion consumes. `AddRoot`/`AddChild` +create vertices with `Options` (size, unit, exclusivity, rank, properties). +`Options.NodeProperties` are RFC 31 resource properties (a nested `properties` +object the matcher prunes against); `Options.Properties` are descriptive metadata +only. `Options.Rank` sets a real execution rank — every `node` vertex needs one. + +### `pkg/cluster` — config schema and graph construction + +Parses the resource config and turns it (plus the live cluster nodes) into a graph. + +```yaml +resources: + - type: qdevice # opaque type; the resource's class + name: rigetti_cepheus # vertex name / backend identity + parent: cluster # where it attaches (default "cluster") + attributes: aws-east # inline map OR a reference into the registry below + with: # recursive children, same schema + - type: qpu + count: 1 + with: + - type: qubit + count: 80 + +attributes: # named attribute-set registry (reuse across backends) + aws-east: + region: us-east-1 + connectivity: all-to-all ``` -I am also choosing to keep credentials and qrmi interactions on the level of the application. -I am not comfortable with the design of an operator holding any kind of credential or being -responsible for managing calls with qrmi in a multi-tenant environment. Finally, since -there are (and will continue to be) a lot of environment variables that I do not want -to place on the user to define, we have a webhook to handle this. We can combine an annotation -added with the webhook with a PreBind call to define the annotation to orchestrate that. +`LoadResourcesConfig` parses YAML/JSON, resolves attributes (inline or by +reference) with downward inheritance, and errors on a missing `type`, an unknown +reference, or a non-empty config that defines no `resources:` (a schema mismatch, +so it fails loudly instead of building an empty graph). `BuildGraph` emits each +cluster node as a `virtual=false` node, then appends the resource trees: every +resource at every depth becomes a `virtual=true` node carrying its class set and +inherited attributes, on a real rank from a counter shared with the physical +nodes. `FluxionResourceNames` returns every requestable type (what the device +plugin advertises); `AttributeKeys` returns the union of attribute keys (the +webhook's env contract). + +### `pkg/jobspec` — jobspec types + +The Fluxion jobspec representation (`Jobspec`, `Resource`, `Task`) with YAML/JSON +round-tripping: a `resources` tree (a `slot` holding the request), an `attributes` +block (constraint + duration), and `tasks`. + +### `pkg/placement` — jobspec generation and allocation parsing + +`JobspecsForGroup` turns a pod group into the jobspecs to match: + +- Always one **compute** jobspec — a slot per pod holding core/memory/gpu, + constrained to `virtual=false`. A classical group produces only this (one match). +- One **device** jobspec per requested virtual type, requesting a `node` + constrained to `virtual=true` + `class=`. The class selects which virtual + node; the count comes from the request. +- Every pod gets at least one core (a device-only pod still needs a host). +- Every jobspec sets `duration: 0` — held until explicitly cancelled. +- A request for an unmodeled type is a hard error. + +Jobspecs are submitted as **JSON** (not YAML): the constraint parser requires +quoted property scalars, and JSON always quotes. + +`PlacementFromAllocation` classifies an allocation's node vertices by the +`virtual` marker — `virtual=false`/unmarked are compute bind targets, a +`virtual=true` node is the backend identity, and its `fluxion.flux-framework.org/` +properties are decomposed into attributes for env injection. This package also +owns the shared names: the `fluxion.flux-framework.org/` request prefix, the +`fluence.flux-framework.org/{backend,jobid,attr-*}` annotations, and the +`FLUXION_` env prefix. + +### `pkg/graph` — Fluxion binding (cgo) + +Wraps the cgo matcher (`FluxionGraph`: `Init`, `MatchAllocateSpec`, `Cancel`, +`Satisfy`) and parses allocations. Fluence uses the **jgf** match format: it emits +every allocated vertex with its properties regardless of type, so a virtual +allocation that bottoms out in nodes (no cores) still serializes — which rv1 +cannot. `NodesFromAllocation` returns node vertices with their properties for the +marker-based classification. This is the only cgo-dependent package, so it gates +local builds of everything importing it. + +### `pkg/fluence` — the scheduler plugin + +`New` lists the cluster, loads the config, builds and logs the graph, and inits the +matcher. + +- **PreFilter** runs per group: generate jobspecs, then `matchGroup` runs each as + an independently held allocation, **all-or-nothing** — any failure cancels the + successes, so the group never holds a partial set. +- **Filter** permits only the nodes Fluxion assigned. +- **PreBind** records durable state: the jobids on the owning object (PodGroup for + a gang, else the pod) for cancellation, and the matched backend + attributes as + annotations the webhook reads. +- **Cancellation** is informer-driven (no framework delete hook): deleting a + PodGroup or ungrouped pod frees its held allocations. The jobid annotation is the + durable source of truth; the graph (and allocations) are rebuilt on restart from + the same annotations. + +Gang semantics are delegated to the native PodGroup API; fluence only places. + +### `pkg/deviceplugin` — extended-resource advertisement + +Advertises each requestable type (`fluxion.flux-framework.org/`) as a +counted extended resource, so a pod requesting one passes `NodeResourcesFit` +admission. The real gating is Fluxion (and the backend's own limits); since a +virtual backend is reachable from any node, each type is advertised at a large +ceiling. Types come from the same config as the graph, so they can't drift. + +### `pkg/webhook` — environment injection + +A mutating webhook that surfaces scheduler-chosen values to a workload. Container +env is fixed at creation but the match happens after admission, so it injects +**downward-API** env vars whose values populate later from the annotations PreBind +writes. The injected set is a **config-derived contract**: `FLUXION_BACKEND` plus +one `FLUXION_` per attribute key across all backends — add an attribute and +its env var appears, no code change. The webhook self-manages TLS (generates a CA +and patches its own `caBundle`), so no cert-manager is needed. A vendor-agnostic +workload reads these normalized names regardless of which backend it matched. + +## Commands + +- `cmd/fluence` — the scheduler binary (stock kube-scheduler + the plugin). +- `cmd/deviceplugin` — the extended-resource DaemonSet. +- `cmd/webhook` — the env-injection webhook. +- `cmd/recovery-probe` — verifies allocation replay survives a graph rebuild + (what a restart does); see `make test-restore`. Note this was implemented but removed because the code in fluxion is only part of a PR branch, and I feel nervous about depending on it. + +## Configuration + +These are environment variables for fluence. + +| Env var | Read by | Meaning | +|---|---|---| +| `FLUENCE_RESOURCES` | scheduler, device plugin, webhook | path to `resources.yaml`; absent = classical-only | +| `FLUENCE_MATCH_POLICY` | scheduler | Fluxion match policy (default `first`) | +| `FLUENCE_RESOURCE_CAPACITY` | device plugin | per-node ceiling per type (default 1000) | + +## Observability + +The scheduler logs (prefix `[fluence]`) the full graph and known devices at +startup and, per match, the submitted jobspec, the raw Fluxion allocation, and the +parsed placement. The webhook logs (`[fluence-webhook]`) the env contract at +startup. Because live behavior (cgo matcher, real Kubernetes) can't be fully +unit-tested, these logs are the primary debugging surface. ## Build -The scheduler binary links flux-sched (the matcher). It does **not** link QRMI — -quantum job submission lives in a separate workload container +The scheduler links flux-sched (the matcher). It does **not** link QRMI or any +quantum backend — quantum job submission lives in a separate workload container ([qrmi-sampler](https://github.com/converged-computing/qrmi-sampler)), not here. ```bash # Inside the .devcontainer (flux-sched at /opt/flux-sched): -# builds bin/fluence (cgo+flux) + bin/fluence-deviceplugin + bin/fluence-webhook -make build +make build # bin/fluence (cgo+flux) + bin/fluence-deviceplugin + bin/fluence-webhook make test - -# Or build the container image (all three binaries): -make image +make image # or build the container image with all three binaries ``` ## Deploy -Create a development cluster on a Kubernetes release that supports native gang -scheduling, with the feature gates enabled: +Create a cluster on a Kubernetes release with native gang scheduling and the +feature gates the kind config enables (`GangScheduling`, `GenericWorkload`, the +`scheduling.k8s.io/v1alpha2` API group): ```bash kind create cluster --image kindest/node:v1.36.1 --config deploy/kind-config.yaml -``` - -(See [installing kind](https://kind.sigs.k8s.io/docs/user/quick-start#installing-from-release-binaries).) -The kind config turns on the `GangScheduling` and `GenericWorkload` feature gates -and the `scheduling.k8s.io/v1alpha2` API group on the apiserver and scheduler. In -the future these will likely be enabled by default. - -Load the image (built above) into the cluster: - -```bash kind load docker-image ghcr.io/converged-computing/fluence:latest ``` -### 1. Gang Scheduling - -Install the **base** scheduler (this is all you need for classical scheduling — -no device plugin, no quantum): +### 1. Gang scheduling (classical — all you need for non-quantum) ```bash -kubectl apply -f deploy/fluence.yaml +kubectl apply -f deploy/fluence.yaml # scheduler, RBAC, and the webhook ``` -This installs the scheduler, its RBAC, and the mutating webhook. Pods opt in with -`schedulerName: fluence`; a multi-pod gang adds a `scheduling.k8s.io/pod-group` -label (a single pod is treated as a group of one and needs no label). Test with a pod group: +Pods opt in with `schedulerName: fluence`; a multi-pod gang adds a +`scheduling.k8s.io/pod-group` label (a single pod is a group of one, no label +needed). ```bash kubectl apply -f examples/podgroup.yaml -kubectl get pods -o wide -kubectl get events --field-selector reason=Scheduled kubectl get podgroups.scheduling.k8s.io ``` ```console @@ -126,119 +273,81 @@ NAME POLICY WORKLOAD STATUS AGE training Gang Scheduled 15s ``` -And a quick cleanup. +Cleanup: ```bash kubectl patch podgroup training -n default --type=merge -p '{"metadata":{"finalizers":null}}' kubectl delete -f examples/podgroup.yaml ``` -### 2. Quantum +### 2. Quantum (the resources add-on) -Quantum needs the resources add-on, which supplies the `fluence-resources` -ConfigMap (the single source of truth for which backends exist) **and** the -device plugin that advertises them: +This supplies the `fluence-resources` ConfigMap (the source of truth for which +backends exist) and the device plugin that advertises them: ```bash kubectl apply -f deploy/fluence-resources.yaml kubectl apply -f deploy/device-plugin.yaml -# The scheduler reads its resources config at startup, so restart it to pick up -# the quantum vertices: +# The scheduler and webhook read the config at startup — restart to pick it up: kubectl rollout restart deployment/fluence -n kube-system +kubectl rollout restart deployment/fluence-webhook -n kube-system ``` -Confirm the device plugin advertised the resources on the nodes: +Confirm the resources are advertised: ```bash kubectl get nodes -o jsonpath='{range .items[*]}{.metadata.name}{"\t"}{.status.allocatable}{"\n"}{end}' \ | grep fluxion.flux-framework.org ``` -```console -kind-control-plane {"cpu":"16","ephemeral-storage":"982292956Ki","fluxion.flux-framework.org/qpu":"1k","fluxion.flux-framework.org/qubit":"1k","hugepages-1Gi":"0","hugepages-2Mi":"0","memory":"61400748Ki","pods":"110"} -kind-worker {"cpu":"16","ephemeral-storage":"982292956Ki","fluxion.flux-framework.org/qpu":"1k","fluxion.flux-framework.org/qubit":"1k","hugepages-1Gi":"0","hugepages-2Mi":"0","memory":"61400748Ki","pods":"110"} -kind-worker2 {"cpu":"16","ephemeral-storage":"982292956Ki","fluxion.flux-framework.org/qpu":"1k","fluxion.flux-framework.org/qubit":"1k","hugepages-1Gi":"0","hugepages-2Mi":"0","memory":"61400748Ki","pods":"110"} -``` -Create the IBM credentials the **workload** uses to submit (in the namespace -where the workload runs — the scheduler itself never needs them): +Create the IBM credentials the **workload** uses to submit (the scheduler never +needs them): ```bash -# If you don't have this yet -curl -fsSL https://clis.cloud.ibm.com/install/linux | sudo sh ibmcloud login --apikey -# 12 for us-east -``` -```bash export IBM_CLOUD_TOKEN= -export IBM_CLOUD_CRN=$(ibmcloud resource service-instances --service-name quantum-computing --output json | jq -r '.[] | {name: .name, crn: .crn}' | jq -r .crn) +export IBM_CLOUD_CRN=$(ibmcloud resource service-instances --service-name quantum-computing --output json | jq -r '.[0].crn') +kubectl create secret generic ibm-quantum -n default \ + --from-literal=token="$IBM_CLOUD_TOKEN" --from-literal=crn="$IBM_CLOUD_CRN" ``` -```bash -kubectl create secret generic ibm-quantum -n default --from-literal=token="$IBM_CLOUD_TOKEN" --from-literal=crn="$IBM_CLOUD_CRN" -``` - -Run a single quantum pod. It just requests `fluxion.flux-framework.org/qpu` — no -group, and no hard-coded backend (the webhook + PreBind supply `QRMI_BACKEND`): +Run a single quantum pod — it just requests `fluxion.flux-framework.org/qpu`, with +no hard-coded backend (the webhook + PreBind supply `FLUXION_BACKEND`): ```bash kubectl apply -f examples/quantum-pod.yaml -kubectl get pod sampler -o wide -# fluence's chosen backend, injected as an environment variable: +# fluence's chosen backend, also injected as $FLUXION_BACKEND in the container: kubectl get pod sampler -o jsonpath='{.metadata.annotations.fluence\.flux-framework\.org/backend}{"\n"}' kubectl logs sampler ``` ```console -kubectl logs sampler -f 2026/06/06 19:04:38 submitting sampler job to ibm_marrakesh -{"results": [{"data": {"c": {"samples": ["0x0", "0x1", "0x0", "0x0", "0x1", "0x1", "0x1", "0x1", "0x1", "0x0", "0x0", "0x0", "0x0", "0x1", "0x1", "0x1", "0x1", "0x1", "0x1", "0x1", "0x0", "0x1", "0x0", "0x0", "0x0", "0x1", "0x0", "0x1", "0x0", "0x1", "0x1", "0x0", "0x0", "0x0", "0x0", "0x1", "0x1", "0x1", "0x1", "0x1", "0x1", "0x1", "0x0", "0x0", "0x0", "0x1", "0x0", "0x1", "0x0", "0x0", "0x1", "0x1", "0x1", "0x0", "0x0", "0x0", "0x1", "0x1", "0x1", "0x0", "0x1", "0x1", "0x0", "0x1", "0x0", "0x1", "0x1", "0x1", "0x1", "0x0", "0x0", "0x1", "0x1", "0x1", "0x1", "0x1", "0x1", "0x1", "0x0", "0x0", "0x1", "0x0", "0x1", "0x1", "0x0", "0x1", "0x0", "0x1", "0x1", "0x1", "0x0", "0x0", "0x0", "0x1", "0x1", "0x1", "0x1", "0x0", "0x0", "0x1", "0x0", "0x0", "0x0", "0x0", "0x0", "0x0", "0x1", "0x1", "0x0", "0x1", "0x0", "0x0", "0x1", "0x1", "0x1", "0x1", "0x1", "0x0", "0x1", "0x0", "0x0", "0x0", "0x1", "0x1", "0x1", "0x0", "0x1", "0x1", "0x0", "0x0", "0x1", "0x0", "0x1", "0x0", "0x0", "0x1", "0x0", "0x0", "0x1", "0x1", "0x1", "0x0", "0x1", "0x1", "0x0", "0x0", "0x0", "0x1", "0x1", "0x1", "0x0", "0x1", "0x1", "0x0", "0x0", "0x1", "0x0", "0x1", "0x0", "0x0", "0x0", "0x0", "0x1", "0x1", "0x1", "0x1", "0x1", "0x1", "0x1", "0x0", "0x1", "0x1", "0x1", "0x0", "0x1", "0x1", "0x0", "0x0", "0x0", "0x0", "0x1", "0x0", "0x0", "0x0", "0x0", "0x1", "0x0", "0x0", "0x1", "0x1", "0x1", "0x0", "0x1", "0x0", "0x1", "0x0", "0x1", "0x1", "0x1", "0x1", "0x0", "0x0", "0x1", "0x1", "0x0", "0x1", "0x0", "0x0", "0x0", "0x1", "0x1", "0x1", "0x0", "0x0", "0x0", "0x1", "0x0", "0x0", "0x0", "0x1", "0x0", "0x1", "0x0", "0x0", "0x0", "0x1", "0x1", "0x0", "0x1", "0x0", "0x0", "0x0", "0x1", "0x0", "0x0", "0x1", "0x1", "0x0", "0x0", "0x0", "0x0", "0x0", "0x1", "0x1", "0x1", "0x0", "0x1", "0x1", "0x1", "0x1", "0x1", "0x1", "0x0", "0x0", "0x0", "0x0"], "num_bits": 1}}, "metadata": {"circuit_metadata": {}}}], "metadata": {"execution": {"execution_spans": [[{"date": "2026-06-06T19:04:43.221657"}, {"date": "2026-06-06T19:04:44.372421"}, {"0": [[256], [0, 1], [0, 256]]}]]}, "version": 2}} +{"results": [ ... samples ... ]} 2026/06/06 19:04:50 done: 2070 bytes from ibm_marrakesh ``` -Boum! You will see in the fluence logs that when the pod completes, the fluxion job is cancelled, freeing the resources. - -```bash -kubectl logs -n kube-system fluence-75d6848778-g4lh6 -... -I0610 18:33:05.843325 1 eventhandlers.go:443] "Delete event for scheduled pod" pod="default/sampler" - 🌀 Cancel jobid: 1 -(env) (base) vanessa@vanessa-ThinkPad-P14s-Gen-4:~/Desktop/Code/fluence$ kubectl get pods -NAME READY STATUS RESTARTS AGE -sampler 0/1 Completed 0 24s -``` - -### A note on deletion +When the pod completes, fluence cancels the Fluxion allocation, freeing the +resources (visible in the scheduler logs as `Cancel jobid: N`). Note that I lost access to +my IBM account so this has not been tested live since mid June. -When developing/debugging, a PodGroup (or its pods) can hang on delete because of -finalizers (the workload controller may not be running). Our plugin is designed to handle this, -and normally it just takes a little time to finish. If you are impatient, you can do: +Submission is **not** done by the scheduler — the workload container holds the +user's credentials and submits via qrmi-go. Fluence only schedules and hands off +the backend. (When we control local quantum devices this will change.) -```bash -kubectl patch podgroup training -n default --type=merge -p '{"metadata":{"finalizers":null}}' -``` - -### Restoring State - -The plugin is designed to be able to come up and restore state, meaning we can read in existing groups -and repopulate the graph. We do that by way of annotating each group with the resources "R" that fluxion returns on a match, -and then in the case of a restart, we re-populate the graph using this metadata. Here is how to test that. - -```bash -make test-restore -``` +### Notes -Importantly, submission is **not** done by the scheduler — the workload container holds the -user's credentials and submits via qrmi-go (job mode on the IBM open plan; see -fluxion-quantum for that story). Fluence only schedules and hands off the backend. -When we actually have control of local quantum devices this will be different. +- **Deletion hangs.** A PodGroup can hang on delete via finalizers if the workload + controller isn't running; clear them with the `kubectl patch` above. +- **State restore.** On restart the plugin repopulates the graph from each group's + jobid annotations and re-holds the allocations: `make test-restore`. ## License -HPCIC DevTools is distributed under the terms of the MIT license. -All new contributions must be made under this license. +Distributed under the MIT license; all contributions must be made under it. -See [LICENSE](LICENSE), [COPYRIGHT](COPYRIGHT), and [NOTICE](NOTICE) for details. +See [LICENSE](LICENSE), [COPYRIGHT](COPYRIGHT), and [NOTICE](NOTICE). SPDX-License-Identifier: MIT -LLNL-CODE-842614 +LLNL-CODE-842614 \ No newline at end of file diff --git a/cmd/deviceplugin/main.go b/cmd/deviceplugin/main.go index 8e88e6b..29cb55f 100644 --- a/cmd/deviceplugin/main.go +++ b/cmd/deviceplugin/main.go @@ -40,11 +40,11 @@ func main() { var names []string if data, err := os.ReadFile(cfgPath); err == nil { - qc, perr := cluster.LoadQuantumConfig(data) + rc, perr := cluster.LoadResourcesConfig(data) if perr != nil { log.Fatalf("parse resources config %s: %v", cfgPath, perr) } - names = cluster.FluxionResourceNames(qc.Backends) + names = cluster.FluxionResourceNames(rc.Resources) log.Printf("derived %d resource(s) from %s: %v", len(names), cfgPath, names) } else { log.Printf("no resources config at %s (%v); advertising nothing", cfgPath, err) diff --git a/cmd/webhook/main.go b/cmd/webhook/main.go index bc5f816..20fac0d 100644 --- a/cmd/webhook/main.go +++ b/cmd/webhook/main.go @@ -17,6 +17,7 @@ import ( "os" "time" + "github.com/converged-computing/fluence/pkg/cluster" "github.com/converged-computing/fluence/pkg/webhook" "k8s.io/client-go/kubernetes" @@ -66,8 +67,27 @@ func main() { cancel() log.Printf("patched caBundle on MutatingWebhookConfiguration %q", cfgName) + // The env contract is the union of attribute keys across the configured + // backends (plus FLUXION_BACKEND), so the set of injected env vars tracks the + // config automatically. Loaded from the same FLUENCE_RESOURCES the scheduler + // and device plugin use; absent/unset means just FLUXION_BACKEND. + var attrKeys []string + if path := os.Getenv("FLUENCE_RESOURCES"); path != "" { + if data, rerr := os.ReadFile(path); rerr == nil { + rc, perr := cluster.LoadResourcesConfig(data) + if perr != nil { + log.Fatalf("parse resources config %s: %v", path, perr) + } + attrKeys = cluster.AttributeKeys(rc.Resources) + } else { + log.Printf("no resources config at %s (%v); injecting FLUXION_BACKEND only", path, rerr) + } + } + mutator := &webhook.Mutator{AttributeKeys: attrKeys} + log.Printf("[fluence-webhook] env contract injected into fluxion pods: %v", mutator.EnvVarNames()) + mux := http.NewServeMux() - mux.HandleFunc("/mutate", webhook.Handler) + mux.HandleFunc("/mutate", mutator.Handler) mux.HandleFunc("/healthz", func(w http.ResponseWriter, _ *http.Request) { w.WriteHeader(http.StatusOK) }) srv := &http.Server{ diff --git a/deploy/fluence-resources-test.yaml b/deploy/fluence-resources-test.yaml index d4fed97..b584bfb 100644 --- a/deploy/fluence-resources-test.yaml +++ b/deploy/fluence-resources-test.yaml @@ -22,13 +22,30 @@ metadata: namespace: kube-system data: resources.yaml: | - backends: - - name: ibm_fez - num_qubits: 156 - vendor: ibm - qrmi_type: qiskit-runtime-service - - name: ibm_marrakesh - num_qubits: 156 + # New generic schema: each backend is a resource tree attached under the + # cluster. Modeled as a qdevice carrying a requestable qpu child; attributes + # become filterable graph properties AND injected env (FLUXION_). + resources: + - type: qdevice + name: ibm_fez + attributes: ibm + with: + - type: qpu + count: 1 + with: + - type: qubit + count: 156 + - type: qdevice + name: ibm_marrakesh + attributes: ibm + with: + - type: qpu + count: 1 + with: + - type: qubit + count: 156 + attributes: + ibm: vendor: ibm qrmi_type: qiskit-runtime-service --- @@ -53,7 +70,7 @@ spec: - operator: Exists # run on every node, including tainted/control-plane containers: - name: deviceplugin - image: ghcr.io/converged-computing/fluence:test + image: vanessa/fluence:test command: ["/bin/fluence-deviceplugin"] env: - name: FLUENCE_RESOURCES @@ -73,4 +90,4 @@ spec: path: /var/lib/kubelet/device-plugins - name: resources configMap: - name: fluence-resources + name: fluence-resources \ No newline at end of file diff --git a/deploy/fluence-resources.yaml b/deploy/fluence-resources.yaml index 973d586..c11fea3 100644 --- a/deploy/fluence-resources.yaml +++ b/deploy/fluence-resources.yaml @@ -22,12 +22,29 @@ metadata: namespace: kube-system data: resources.yaml: | - backends: - - name: ibm_fez - num_qubits: 156 - vendor: ibm - qrmi_type: qiskit-runtime-service - - name: ibm_marrakesh - num_qubits: 156 + # New generic schema: each backend is a resource tree attached under the + # cluster. Modeled as a qdevice carrying a requestable qpu child; attributes + # become filterable graph properties AND injected env (FLUXION_). + resources: + - type: qdevice + name: ibm_fez + attributes: ibm + with: + - type: qpu + count: 1 + with: + - type: qubit + count: 156 + - type: qdevice + name: ibm_marrakesh + attributes: ibm + with: + - type: qpu + count: 1 + with: + - type: qubit + count: 156 + attributes: + ibm: vendor: ibm qrmi_type: qiskit-runtime-service diff --git a/deploy/fluence-test.yaml b/deploy/fluence-test.yaml index 525713e..0eb6f8a 100644 --- a/deploy/fluence-test.yaml +++ b/deploy/fluence-test.yaml @@ -124,9 +124,9 @@ spec: serviceAccountName: fluence containers: - name: fluence - image: ghcr.io/converged-computing/fluence:test + image: vanessa/fluence:test # Allows for kind load - # imagePullPolicy: Never + imagePullPolicy: Never command: - /bin/fluence - --config=/etc/fluence/scheduler-config.yaml @@ -173,9 +173,9 @@ spec: serviceAccountName: fluence containers: - name: webhook - image: ghcr.io/converged-computing/fluence:test + image: vanessa/fluence:test # Allows for kind load - # imagePullPolicy: Never + imagePullPolicy: Never command: ["/bin/fluence-webhook"] ports: - containerPort: 8443 diff --git a/examples/test/e2e/quantum-pod-mock-2.yaml b/examples/test/e2e/quantum-pod-mock-2.yaml index 5286fe0..e8c443b 100644 --- a/examples/test/e2e/quantum-pod-mock-2.yaml +++ b/examples/test/e2e/quantum-pod-mock-2.yaml @@ -11,7 +11,7 @@ spec: containers: - name: sampler image: busybox - command: ["sh", "-c", "echo BACKEND=$QRMI_BACKEND; sleep 3600"] + command: ["sh", "-c", "echo BACKEND=$FLUXION_BACKEND; sleep 3600"] resources: requests: fluxion.flux-framework.org/qpu: "1" diff --git a/examples/test/e2e/quantum-pod-mock.yaml b/examples/test/e2e/quantum-pod-mock.yaml index 92849dd..3290611 100644 --- a/examples/test/e2e/quantum-pod-mock.yaml +++ b/examples/test/e2e/quantum-pod-mock.yaml @@ -13,7 +13,7 @@ spec: - name: sampler image: busybox # Print the injected backend, then idle so we can assert on it. - command: ["sh", "-c", "echo BACKEND=$QRMI_BACKEND; sleep 3600"] + command: ["sh", "-c", "echo BACKEND=$FLUXION_BACKEND; sleep 3600"] resources: requests: fluxion.flux-framework.org/qpu: "1" diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index 6bb572a..56a845b 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -1,45 +1,25 @@ // Package cluster builds a Fluxion resource graph from the live Kubernetes -// cluster. Traditional compute (cpu/memory/gpu) is discovered from node -// capacity; virtual quantum resources are injected from configuration so the -// same graph can carry both classical and quantum vertices. +// cluster. Physical compute (cpu/memory/gpu) is discovered from node capacity; +// virtual resources (e.g. quantum backends, but nothing here is quantum-specific) +// are injected from a generic resource-tree configuration so the same graph can +// carry both physical and virtual vertices. +// +// Model: every allocatable thing is a "node" vertex. Physical nodes carry the +// property virtual=false; configured virtual resources carry virtual=true plus +// their configured attributes as RFC 31 properties. A pod's jobspec constrains +// virtual=false for the compute it lands on and virtual=true for any virtual +// device it requests, so the two are matched from disjoint vertex sets out of a +// single shared rank space. package cluster import ( - "fmt" - "sort" - "github.com/converged-computing/fluence/pkg/jgf" - "github.com/converged-computing/fluence/pkg/placement" corev1 "k8s.io/api/core/v1" - "sigs.k8s.io/yaml" ) // DefaultGPUResource is the resource name GPUs are advertised under. const DefaultGPUResource = "nvidia.com/gpu" -// QuantumBackend describes a virtual quantum resource to model in the graph. -// The Name becomes the qpu vertex name (and the QRMI backend the job runs on). -type QuantumBackend struct { - Name string `json:"name"` - NumQubits int `json:"num_qubits,omitempty"` - Vendor string `json:"vendor,omitempty"` - QRMIType string `json:"qrmi_type,omitempty"` -} - -// QuantumConfig is the on-disk config that adds quantum resources to the graph. -type QuantumConfig struct { - Backends []QuantumBackend `json:"backends"` -} - -// LoadQuantumConfig reads a YAML or JSON list of quantum backends. -func LoadQuantumConfig(data []byte) (*QuantumConfig, error) { - var c QuantumConfig - if err := yaml.Unmarshal(data, &c); err != nil { - return nil, fmt.Errorf("parse quantum config: %w", err) - } - return &c, nil -} - // Options configures graph construction. type Options struct { // ClusterName is the root vertex name (default "cluster"). @@ -47,14 +27,20 @@ type Options struct { // GPUResource is the resource name GPUs are advertised under // (default DefaultGPUResource). GPUResource corev1.ResourceName - // Quantum backends to inject under a qgateway. - Quantum []QuantumBackend + // Resources are the configured virtual resource trees to inject. + Resources []Resource // IncludeUnschedulable includes cordoned nodes (default false). IncludeUnschedulable bool } -// BuildGraph turns cluster nodes (plus any configured quantum backends) into a +// BuildGraph turns cluster nodes (plus any configured virtual resources) into a // Fluxion JGF resource graph, returned as JSON ready for FluxionGraph.Init. +// +// Physical nodes are built first, then the configured resource trees are +// appended under their parent vertices. A single contiguous rank counter spans +// physical and virtual node vertices: every node-typed vertex gets a real rank +// (the rv1 writer requires it), and the virtual=true/false property keeps the +// two sets apart at match time. func BuildGraph(nodes []corev1.Node, opts Options) ([]byte, error) { b := jgf.NewBuilder() @@ -69,15 +55,35 @@ func BuildGraph(nodes []corev1.Node, opts Options) ([]byte, error) { cluster := b.AddRoot("cluster", "cluster", jgf.Options{Name: clusterName}) + // rank is the shared, contiguous execution-rank counter across all node + // vertices (physical first, then virtual). + var rank int64 + + // parents maps a vertex name to its builder handle so a configured resource + // tree can attach under a named parent (default the cluster root). + parents := map[string]*jgf.Vertex{clusterName: cluster} + for i := range nodes { n := &nodes[i] if n.Spec.Unschedulable && !opts.IncludeUnschedulable { continue } + // Control-plane nodes are typically tainted (NoSchedule) rather than + // cordoned, so the Unschedulable check above does not catch them. Skip by + // taint, which is name- and type-independent (unlike matching the node + // name), unless the caller opts to include unschedulable nodes. + if isControlPlane(n) && !opts.IncludeUnschedulable { + continue + } + r := rank + rank++ nodeV := b.AddChild(cluster, "node", "node", jgf.Options{ - Name: n.Name, - Properties: map[string]any{"hostname": n.Name}, + Name: n.Name, + Rank: &r, + Properties: map[string]any{"hostname": n.Name}, + NodeProperties: map[string]string{ComposeProperty(VirtualProperty, "false"): ""}, }) + parents[n.Name] = nodeV if cpu := count(n, corev1.ResourceCPU); cpu > 0 { b.AddChild(nodeV, "core", "core", jgf.Options{Size: cpu}) @@ -90,64 +96,28 @@ func BuildGraph(nodes []corev1.Node, opts Options) ([]byte, error) { } } - if len(opts.Quantum) > 0 { - AddQuantum(b, cluster, opts.Quantum) + if err := appendResources(b, parents, clusterName, opts.Resources, &rank); err != nil { + return nil, err } return b.JSON() } -// FluxionResourceNames returns the distinct extended-resource names a device -// plugin should advertise for a set of quantum backends. It uses the SAME -// type-derivation rule as AddQuantum — each backend is a `qpu`, and a backend -// with num_qubits > 0 contributes `qubit` — so the device plugin's advertised -// resources and the graph's resource types are derived from one config and -// cannot drift. Names are prefixed with placement.FluxionResourcePrefix so they -// match what the scheduler strips off a pod request. -func FluxionResourceNames(backends []QuantumBackend) []string { - types := map[string]bool{} - if len(backends) > 0 { - types["qpu"] = true - } - for _, b := range backends { - if b.NumQubits > 0 { - types["qubit"] = true - } - } - names := make([]string, 0, len(types)) - for t := range types { - names = append(names, placement.FluxionResourcePrefix+t) - } - sort.Strings(names) - return names +// controlPlaneTaints are the well-known taint keys Kubernetes places on +// control-plane nodes. They are tainted NoSchedule rather than cordoned, so a +// taint check (not an Unschedulable check) is what excludes them. +var controlPlaneTaints = map[string]bool{ + "node-role.kubernetes.io/control-plane": true, + "node-role.kubernetes.io/master": true, } -// AddQuantum injects a qgateway under the cluster with one qpu vertex per -// backend. Exposed so a graph built elsewhere can be augmented the same way. -func AddQuantum(b *jgf.Builder, cluster *jgf.Vertex, backends []QuantumBackend) { - gw := b.AddChild(cluster, "qgateway", "qgateway", jgf.Options{ - Properties: map[string]any{"vendor": "ibm"}, - }) - for _, be := range backends { - props := map[string]any{"qrmi_type": orDefault(be.QRMIType, "qiskit-runtime-service")} - if be.NumQubits > 0 { - props["num_qubits"] = be.NumQubits - } - if be.Vendor != "" { - props["vendor"] = be.Vendor - } - qpu := b.AddChild(gw, "qpu", "qpu", jgf.Options{ - Name: be.Name, - Exclusive: true, - Properties: props, - }) - // Model qubits as a counted child so a request for N qubits matches a - // backend with at least that many (Fluxion count matching is >=). This - // is how the numeric "at least N qubits" ask is expressed without a - // numeric constraint (RFC 31 properties are boolean tags, not >=). - if be.NumQubits > 0 { - b.AddChild(qpu, "qubit", "qubit", jgf.Options{Size: int64(be.NumQubits)}) +// isControlPlane reports whether a node carries a control-plane taint. +func isControlPlane(n *corev1.Node) bool { + for i := range n.Spec.Taints { + if controlPlaneTaints[n.Spec.Taints[i].Key] { + return true } } + return false } // count reads an integer resource count, preferring allocatable over capacity. @@ -172,10 +142,3 @@ func memoryMB(n *corev1.Node) int64 { } return q.Value() / (1024 * 1024) } - -func orDefault(s, def string) string { - if s == "" { - return def - } - return s -} \ No newline at end of file diff --git a/pkg/cluster/cluster_test.go b/pkg/cluster/cluster_test.go new file mode 100644 index 0000000..26a8a99 --- /dev/null +++ b/pkg/cluster/cluster_test.go @@ -0,0 +1,40 @@ +package cluster + +import ( + "testing" + + corev1 "k8s.io/api/core/v1" +) + +// isControlPlane detects control-plane nodes by taint, independent of node name +// or type, and only for the well-known control-plane/master taint keys. +func TestIsControlPlane(t *testing.T) { + cp := &corev1.Node{} + cp.Spec.Taints = []corev1.Taint{ + {Key: "node-role.kubernetes.io/control-plane", Effect: corev1.TaintEffectNoSchedule}, + } + if !isControlPlane(cp) { + t.Error("expected control-plane node (control-plane taint) to be detected") + } + + master := &corev1.Node{} + master.Spec.Taints = []corev1.Taint{ + {Key: "node-role.kubernetes.io/master", Effect: corev1.TaintEffectNoSchedule}, + } + if !isControlPlane(master) { + t.Error("expected control-plane node (master taint) to be detected") + } + + worker := &corev1.Node{} + worker.Spec.Taints = []corev1.Taint{ + {Key: "some/other-taint", Effect: corev1.TaintEffectNoSchedule}, + } + if isControlPlane(worker) { + t.Error("worker with an unrelated taint should not be treated as control-plane") + } + + plain := &corev1.Node{} + if isControlPlane(plain) { + t.Error("node with no taints should not be control-plane") + } +} diff --git a/pkg/cluster/resources.go b/pkg/cluster/resources.go new file mode 100644 index 0000000..9338a2a --- /dev/null +++ b/pkg/cluster/resources.go @@ -0,0 +1,242 @@ +package cluster + +import ( + "encoding/json" + "fmt" + "strings" + + "sigs.k8s.io/yaml" +) + +// This file defines the generic resource-tree configuration that fluence reads +// to inject virtual resources into the Fluxion graph. It is deliberately free +// of any quantum-specific assumptions: a Resource has an arbitrary string type +// and may nest children, so any shape (a qdevice over qpus over qubits, a +// license server, an FPGA pool, ...) is expressible without per-type code. +// +// Two top-level keys: +// +// resources: a forest of resource trees to add to the graph. Each tree is +// attached under a named parent vertex (default "cluster"), so the +// virtual subtree sits alongside the physical "rack" tree without +// being linked to any specific physical node (today). A future +// design could set parent to a node name to link them. +// attributes: a registry of named attribute sets. A Resource's Attributes may +// be given inline (a map) or by reference (a string naming an entry +// here), so a shared set (e.g. a region/connectivity profile) is +// defined once and reused. +// +// Every attribute becomes both a queryable/prunable RFC 31 graph property and an +// injected environment value, so what a user can constrain on, they can also +// read back. (Graph/jobspec wiring lives in later pieces; this file only parses +// and validates.) + +// DefaultParent is the vertex a resource tree attaches under when none is given. +const DefaultParent = "cluster" + +// Resource is one vertex in a configured resource tree. Type is the Fluxion +// resource type (any string); Count is the size at this vertex; With are child +// resources (recursive). Attributes are resolved to a concrete map after load. +type Resource struct { + // Type is the Fluxion vertex type (e.g. "qdevice", "qpu", "qubit"). Required. + Type string `json:"type"` + // Name is an explicit vertex name (e.g. a backend id like "rigetti_cepheus"). + // Optional; the builder derives one from type+index when empty. + Name string `json:"name,omitempty"` + // Count is the resource quantity at this vertex (maps to the graph vertex + // size). Defaults to 1 when zero. + Count int64 `json:"count,omitempty"` + // Parent is the vertex this tree attaches under. Only meaningful on a + // top-level resource (children attach under their parent in With). Defaults + // to DefaultParent ("cluster"). + Parent string `json:"parent,omitempty"` + // Attributes is either an inline map (object) or a reference (string) into + // the top-level attributes registry. Resolved into ResolvedAttributes at + // load time; do not read this field directly afterward. + Attributes AttributeSpec `json:"attributes,omitempty"` + // With are child resources contained by this one (recursive). + With []Resource `json:"with,omitempty"` + + // ResolvedAttributes is the concrete attribute map after reference + // resolution. Populated by LoadResourcesConfig; nil if none. + ResolvedAttributes map[string]string `json:"-"` +} + +// AttributeSpec is a polymorphic attributes field: either a reference (a string +// naming a registry entry) or an inline map. Exactly one form is used. +type AttributeSpec struct { + // Ref is the registry key when attributes were given as a string. + Ref string + // Inline is the attribute map when attributes were given as an object. + Inline map[string]string +} + +// UnmarshalJSON accepts either a JSON string (reference) or object (inline map). +// (sigs.k8s.io/yaml converts YAML to JSON first, so this covers YAML too.) +func (a *AttributeSpec) UnmarshalJSON(data []byte) error { + if len(data) == 0 || string(data) == "null" { + return nil + } + switch data[0] { + case '"': + var s string + if err := json.Unmarshal(data, &s); err != nil { + return err + } + a.Ref = s + return nil + case '{': + m := map[string]string{} + if err := json.Unmarshal(data, &m); err != nil { + return err + } + a.Inline = m + return nil + default: + return fmt.Errorf("attributes must be a string reference or a map, got: %s", data) + } +} + +// empty reports whether no attributes were specified. +func (a AttributeSpec) empty() bool { + return a.Ref == "" && len(a.Inline) == 0 +} + +// ResourcesConfig is the on-disk configuration: a forest of resource trees plus +// a registry of named attribute sets they may reference. +type ResourcesConfig struct { + Resources []Resource `json:"resources"` + Attributes map[string]map[string]string `json:"attributes,omitempty"` +} + +// LoadResourcesConfig parses a YAML or JSON resources configuration and resolves +// every resource's attributes (inline or by reference) into a concrete map. A +// reference to an unknown attribute set is a hard error, as is a resource with +// no type. +func LoadResourcesConfig(data []byte) (*ResourcesConfig, error) { + var c ResourcesConfig + if err := yaml.Unmarshal(data, &c); err != nil { + return nil, fmt.Errorf("parse resources config: %w", err) + } + // Guard against a silent empty graph: if the file has real content (not just + // whitespace/comments) but produced zero resources, the schema almost + // certainly doesn't match (e.g. a top-level key other than "resources:", like + // an older "backends:" layout). Building a classical-only graph silently in + // that case hides the misconfiguration — the configured devices simply never + // appear in the graph. A genuinely empty file (an optional configmap not yet + // populated) is fine and stays classical-only. + if len(c.Resources) == 0 && hasContent(data) { + return nil, fmt.Errorf( + "resources config has content but defines no resources: " + + "expected a top-level 'resources:' list (check the schema)") + } + for i := range c.Resources { + if err := resolveResource(&c.Resources[i], c.Attributes, true); err != nil { + return nil, err + } + } + return &c, nil +} + +// hasContent reports whether data has any non-comment, non-whitespace content, +// so a truly empty config (classical-only) is distinguished from a malformed or +// schema-mismatched one. +func hasContent(data []byte) bool { + for _, line := range strings.Split(string(data), "\n") { + t := strings.TrimSpace(line) + if t == "" || strings.HasPrefix(t, "#") { + continue + } + return true + } + return false +} + +// resolveResource validates a resource and resolves its attributes, recursing +// into children. topLevel resources get a default parent. +func resolveResource(r *Resource, registry map[string]map[string]string, topLevel bool) error { + return resolveResourceInherited(r, registry, topLevel, nil) +} + +// resolveResourceInherited resolves a resource's attributes and threads them +// down to children. A child inherits every attribute of its parent, then applies +// its own: a key the child also sets overrides the inherited value, and a key the +// child sets to the empty string clears it (removed from the resolved set). This +// makes a constraint that selects a nested node by class AND filters on an +// attribute reachable — every node on the path (parent down to the target) +// carries the attribute, so none is pruned mid-descent. +func resolveResourceInherited( + r *Resource, + registry map[string]map[string]string, + topLevel bool, + inherited map[string]string, +) error { + if r.Type == "" { + return fmt.Errorf("resource is missing required field 'type'") + } + if topLevel && r.Parent == "" { + r.Parent = DefaultParent + } + + // own holds this resource's explicitly-set attributes (before inheritance). + var own map[string]string + if !r.Attributes.empty() { + resolved, err := resolveAttributes(r.Attributes, registry, r.Type) + if err != nil { + return err + } + own = resolved + } + + // Merge: start from inherited, apply own (override), honor explicit-clear. + merged := make(map[string]string, len(inherited)+len(own)) + for k, v := range inherited { + merged[k] = v + } + for k, v := range own { + if v == "" { + delete(merged, k) // explicit clear + continue + } + merged[k] = v + } + if len(merged) > 0 { + r.ResolvedAttributes = merged + } else { + r.ResolvedAttributes = nil + } + + for i := range r.With { + if err := resolveResourceInherited(&r.With[i], registry, false, merged); err != nil { + return err + } + } + return nil +} + +// resolveAttributes turns an AttributeSpec into a concrete map: inline is used +// directly; a reference is looked up in the registry (error if absent). +func resolveAttributes( + spec AttributeSpec, + registry map[string]map[string]string, + resourceType string, +) (map[string]string, error) { + if spec.Ref != "" { + set, ok := registry[spec.Ref] + if !ok { + return nil, fmt.Errorf( + "resource %q references unknown attribute set %q", resourceType, spec.Ref) + } + // Copy so callers can't mutate the shared registry entry. + out := make(map[string]string, len(set)) + for k, v := range set { + out[k] = v + } + return out, nil + } + out := make(map[string]string, len(spec.Inline)) + for k, v := range spec.Inline { + out[k] = v + } + return out, nil +} diff --git a/pkg/cluster/resources_graph.go b/pkg/cluster/resources_graph.go new file mode 100644 index 0000000..d5a5e25 --- /dev/null +++ b/pkg/cluster/resources_graph.go @@ -0,0 +1,197 @@ +package cluster + +import ( + "fmt" + "sort" + + "github.com/converged-computing/fluence/pkg/jgf" + "github.com/converged-computing/fluence/pkg/placement" +) + +// Property keys fluence stamps on graph vertices. VirtualProperty marks whether +// a node vertex is a configured virtual resource (true) or a physical compute +// node (false); it is the discriminator the jobspec constrains on and that +// placement uses to tell a bind target from a backend. ClassProperty preserves +// the configured resource type when a virtual resource is modeled as a node, so +// the original type (e.g. "qdevice") is not lost. +const ( + VirtualProperty = "virtual" + ClassProperty = "class" +) + +// ComposeProperty encodes a key/value as the single "key=value" string that +// Fluxion stores as a property key. RFC 31 PropertyConstraint matching is +// key-presence only — it never compares the value half of a vertex property — +// so to filter on a value the value must be part of the key. This is also +// exactly how the rv1 match writer emits properties (prop + "=" + value), so the +// encoding round-trips symmetrically between what we put in and what we read +// back from an allocation. A bare tag (empty value) composes to just "key". +func ComposeProperty(key, value string) string { + if value == "" { + return key + } + return key + "=" + value +} + +// virtualProperties builds the property SET (composed key=value strings, stored +// as JGF property keys with empty values) for a configured virtual resource: the +// virtual marker, the class of this resource AND of every descendant in its +// subtree, and each user attribute namespaced under the Fluxion resource prefix. +// Every entry is filterable by an identical composed string in a jobspec +// constraint. +// +// Descendant classes are included because a jobspec constraint is GLOBAL and is +// evaluated against every node vertex the matcher descends through (RFC 31 +// constraints prune a node, and its subtree, on mismatch). To select a nested +// node by class (e.g. class=qpu where qpu sits under a qdevice node), every +// ancestor node on the path must also satisfy class=qpu — so each node carries +// the classes of everything beneath it. The trees are shallow, so these sets +// stay small. A class= constraint then reaches any node of that type +// because every ancestor down to it advertises that class. +func virtualProperties(res *Resource) map[string]string { + props := map[string]string{ + ComposeProperty(VirtualProperty, "true"): "", + } + for t := range subtreeClasses(res) { + props[ComposeProperty(ClassProperty, t)] = "" + } + for k, v := range res.ResolvedAttributes { + props[ComposeProperty(placement.FluxionResourcePrefix+k, v)] = "" + } + return props +} + +// subtreeClasses returns the set of resource types in res's subtree, including +// res itself — the classes a node must advertise so a constraint selecting any +// type within the subtree is not pruned at this node on the way down. +func subtreeClasses(res *Resource) map[string]bool { + types := map[string]bool{} + var walk func(r *Resource) + walk = func(r *Resource) { + types[r.Type] = true + for i := range r.With { + walk(&r.With[i]) + } + } + walk(res) + return types +} + +// appendResources attaches each configured resource tree under its parent in +// the builder, continuing the shared node-rank counter. parents maps vertex +// names (physical nodes + the cluster root) to their builder handles. +func appendResources( + b *jgf.Builder, + parents map[string]*jgf.Vertex, + clusterName string, + resources []Resource, + rank *int64, +) error { + for i := range resources { + res := &resources[i] + parentName := res.Parent + if parentName == "" { + parentName = clusterName + } + parent, ok := parents[parentName] + if !ok { + return fmt.Errorf( + "resource %q references unknown parent vertex %q", res.Type, parentName) + } + if err := addResource(b, parent, res, rank); err != nil { + return err + } + } + return nil +} + +// addResource adds one configured resource, at ANY depth, as a node vertex +// carrying virtual=true, its configured type as the class property, and its +// attributes as RFC 31 properties — then recurses on its children the same way. +// +// Every level is a node so that every level is independently selectable by a +// class= constraint (RFC 31 property constraints match only node and +// storage_node vertices, so a non-node vertex could never be filtered). The +// vertex basename stays the configured type, so jgf auto-names it (qpu0, qubit0) +// and class= is meaningful; only the graph TYPE is "node". +// +// Every node consumes a real, unique rank from the shared counter. Ranks must be +// real (not -1) for any node vertex; the virtual sub-resources therefore appear +// in the rv1 nodelist, which is fine here because the allocation is a hold whose +// response we read directly — we never dispatch work to those ranks (no +// flux-core execution). +func addResource(b *jgf.Builder, parent *jgf.Vertex, res *Resource, rank *int64) error { + r := *rank + *rank++ + size := res.Count + if size == 0 { + size = 1 + } + v := b.AddChild(parent, "node", res.Type, jgf.Options{ + Name: res.Name, // empty -> jgf auto-names as + Size: size, + Rank: &r, + NodeProperties: virtualProperties(res), + }) + + for i := range res.With { + if err := addResource(b, v, &res.With[i], rank); err != nil { + return err + } + } + return nil +} + +// FluxionResourceNames returns the distinct extended-resource names a device +// plugin should advertise for the configured resources: every counted resource +// TYPE under a top-level resource (the top-level virtual node is matched via the +// FluxionResourceNames returns the distinct extended-resource names a device +// plugin should advertise: every resource TYPE at every level of the configured +// trees. Every virtual resource is a node selectable by class=, so every +// type — top-level (qdevice) and nested (qpu, qubit) alike — is requestable. +// Prefixed so they match what the scheduler strips off a pod request. +func FluxionResourceNames(resources []Resource) []string { + types := map[string]bool{} + for i := range resources { + collectTypes(&resources[i], types) + } + names := make([]string, 0, len(types)) + for t := range types { + names = append(names, placement.FluxionResourcePrefix+t) + } + sort.Strings(names) + return names +} + +func collectTypes(res *Resource, types map[string]bool) { + types[res.Type] = true + for i := range res.With { + collectTypes(&res.With[i], types) + } +} + +// AttributeKeys returns the sorted union of every user attribute key across all +// configured resources (and their children). This is the set of attributes that +// may be injected into a workload's environment; computing it from the config +// (rather than hardcoding) means the env contract automatically tracks whatever +// attributes the backends declare. The backend identity itself is separate +// (FLUXION_BACKEND) and not included here. +func AttributeKeys(resources []Resource) []string { + keys := map[string]bool{} + var walk func(rs []Resource) + walk = func(rs []Resource) { + for i := range rs { + for k := range rs[i].ResolvedAttributes { + keys[k] = true + } + walk(rs[i].With) + } + } + walk(resources) + out := make([]string, 0, len(keys)) + for k := range keys { + out = append(out, k) + } + sort.Strings(out) + return out +} diff --git a/pkg/cluster/resources_graph_test.go b/pkg/cluster/resources_graph_test.go new file mode 100644 index 0000000..9c7339a --- /dev/null +++ b/pkg/cluster/resources_graph_test.go @@ -0,0 +1,221 @@ +package cluster + +import ( + "encoding/json" + "testing" + + "github.com/converged-computing/fluence/pkg/jgf" +) + +// buildResourceNodes appends the given config's resource trees to a fresh graph +// (simulating physical nodes already having consumed ranks 0..startRank-1) and +// returns the emitted vertex metadata for assertions. +func buildResourceNodes(t *testing.T, cfg string, startRank int64) []map[string]any { + t.Helper() + c, err := LoadResourcesConfig([]byte(cfg)) + if err != nil { + t.Fatalf("load: %v", err) + } + b := jgf.NewBuilder() + cluster := b.AddRoot("cluster", "cluster", jgf.Options{Name: "cluster"}) + parents := map[string]*jgf.Vertex{"cluster": cluster} + rank := startRank + if err := appendResources(b, parents, "cluster", c.Resources, &rank); err != nil { + t.Fatalf("append: %v", err) + } + raw, _ := b.JSON() + var doc struct { + Graph struct { + Nodes []struct { + Metadata map[string]any `json:"metadata"` + } `json:"nodes"` + } `json:"graph"` + } + if err := json.Unmarshal(raw, &doc); err != nil { + t.Fatalf("unmarshal: %v\n%s", err, raw) + } + out := []map[string]any{} + for _, n := range doc.Graph.Nodes { + out = append(out, n.Metadata) + } + return out +} + +func metaByName(nodes []map[string]any, name string) map[string]any { + for _, m := range nodes { + if m["name"] == name { + return m + } + } + return nil +} + +// A configured virtual resource is modeled as a node vertex carrying +// virtual=true, the class of itself AND its descendants, its (inherited) +// attributes as namespaced RFC 31 properties, and a real rank. EVERY level +// (qdevice, qpu, qubit) is a node so each is independently selectable by a +// class= constraint. +func TestVirtualResourceModeledAsNode(t *testing.T) { + cfg := ` +resources: + - type: qdevice + name: rigetti_cepheus + count: 1 + attributes: + region: us-east-1 + with: + - type: qpu + count: 1 + with: + - type: qubit + count: 80 +` + nodes := buildResourceNodes(t, cfg, 3) // ranks 0-2 taken by physical nodes + + dev := metaByName(nodes, "rigetti_cepheus") + if dev == nil { + t.Fatal("rigetti_cepheus vertex not found") + } + if dev["type"] != "node" { + t.Errorf("virtual device type = %v, want node", dev["type"]) + } + if dev["rank"] != float64(3) { + t.Errorf("virtual device rank = %v, want 3 (continues physical ranks)", dev["rank"]) + } + props, ok := dev["properties"].(map[string]any) + if !ok { + t.Fatalf("no properties object: %#v", dev["properties"]) + } + // Properties are composed key=value strings (RFC 31 match is key-presence + // only, so the value must be in the key), stored with empty values. + if _, ok := props["virtual=true"]; !ok { + t.Errorf("missing virtual=true property; got %#v", props) + } + // The qdevice node carries its own class AND its descendants' classes, so a + // class=qpu / class=qubit constraint is not pruned at this node on the way + // down to a nested target. + for _, want := range []string{"class=qdevice", "class=qpu", "class=qubit"} { + if _, ok := props[want]; !ok { + t.Errorf("missing %q property on qdevice node; got %#v", want, props) + } + } + if _, ok := props["fluxion.flux-framework.org/region=us-east-1"]; !ok { + t.Errorf("missing composed region property; got %#v", props) + } + + // The qpu is now ALSO a node, with class=qpu (and its descendant class=qubit), + // virtual=true, and the inherited region attribute, on its own real rank. + qpu := metaByName(nodes, "qpu0") + if qpu == nil { + t.Fatal("qpu child (qpu0) not found") + } + if qpu["type"] != "node" { + t.Errorf("qpu type = %v, want node (every level is a node)", qpu["type"]) + } + if qpu["rank"] == float64(-1) { + t.Errorf("qpu rank = -1, want a real rank (it is a node)") + } + qprops, _ := qpu["properties"].(map[string]any) + for _, want := range []string{"class=qpu", "class=qubit", "virtual=true", "fluxion.flux-framework.org/region=us-east-1"} { + if _, ok := qprops[want]; !ok { + t.Errorf("qpu node missing %q (class self+descendant, virtual, inherited attr); got %#v", want, qprops) + } + } + // The qpu must NOT advertise class=qdevice (that is an ancestor, not in its + // subtree) — otherwise class=qdevice would wrongly select the qpu too. + if _, ok := qprops["class=qdevice"]; ok { + t.Errorf("qpu node should not carry class=qdevice (ancestor class); got %#v", qprops) + } +} + +// FluxionResourceNames includes every type at every level (qdevice, qpu, qubit), +// because every virtual resource is a node selectable by class=. +func TestFluxionResourceNamesIncludesAllLevels(t *testing.T) { + cfg := ` +resources: + - type: qdevice + name: d + with: + - type: qpu + count: 1 + with: + - type: qubit + count: 4 +` + c, err := LoadResourcesConfig([]byte(cfg)) + if err != nil { + t.Fatal(err) + } + names := FluxionResourceNames(c.Resources) + want := map[string]bool{ + "fluxion.flux-framework.org/qdevice": true, + "fluxion.flux-framework.org/qpu": true, + "fluxion.flux-framework.org/qubit": true, + } + if len(names) != len(want) { + t.Fatalf("names = %v, want all of %v", names, want) + } + for _, n := range names { + if !want[n] { + t.Errorf("unexpected name %q", n) + } + } +} + +// An unknown parent reference is a hard error. +func TestUnknownParentErrors(t *testing.T) { + cfg := ` +resources: + - type: qpu + name: orphan + parent: nonexistent-node +` + c, err := LoadResourcesConfig([]byte(cfg)) + if err != nil { + t.Fatal(err) + } + b := jgf.NewBuilder() + cluster := b.AddRoot("cluster", "cluster", jgf.Options{Name: "cluster"}) + parents := map[string]*jgf.Vertex{"cluster": cluster} + var rank int64 + if err := appendResources(b, parents, "cluster", c.Resources, &rank); err == nil { + t.Fatal("expected an error for an unknown parent") + } +} + +// AttributeKeys returns the sorted union of attribute keys across all resources +// and children, deduplicated — this is the env-injection contract. +func TestAttributeKeysUnion(t *testing.T) { + cfg := ` +resources: + - type: qdevice + name: a + attributes: + region: us-east-1 + connectivity: all-to-all + with: + - type: qpu + attributes: + qubits: "80" + - type: qdevice + name: b + attributes: + region: us-west-2 + vendor: ibm +` + c, err := LoadResourcesConfig([]byte(cfg)) + if err != nil { + t.Fatal(err) + } + keys := AttributeKeys(c.Resources) + // union: connectivity, qubits, region, vendor (region deduped) + want := []string{"connectivity", "qubits", "region", "vendor"} + if len(keys) != len(want) { + t.Fatalf("keys = %v, want %v", keys, want) + } + for i := range want { + if keys[i] != want[i] { + t.Fatalf("keys = %v, want %v", keys, want) + } + } +} diff --git a/pkg/cluster/resources_test.go b/pkg/cluster/resources_test.go new file mode 100644 index 0000000..b90370f --- /dev/null +++ b/pkg/cluster/resources_test.go @@ -0,0 +1,214 @@ +package cluster + +import ( + "testing" +) + +// A realistic config: a virtual qdevice (under the default parent) holding a qpu +// child, with attributes given by reference, plus a second device with inline +// attributes. Exercises nesting, count, the parent default, and both attribute +// forms. +const sampleConfig = ` +resources: + - type: qdevice + name: rigetti_cepheus + attributes: aws-east + with: + - type: qpu + count: 1 + attributes: aws-east + with: + - type: qubit + count: 80 + - type: qdevice + name: ibm_marrakesh + parent: cluster + attributes: + region: us-west-2 + vendor: ibm + +attributes: + aws-east: + region: us-east-1 + connectivity: all-to-all +` + +func TestLoadResourcesConfig(t *testing.T) { + c, err := LoadResourcesConfig([]byte(sampleConfig)) + if err != nil { + t.Fatalf("load: %v", err) + } + if len(c.Resources) != 2 { + t.Fatalf("got %d top-level resources, want 2", len(c.Resources)) + } + + rigetti := c.Resources[0] + if rigetti.Type != "qdevice" || rigetti.Name != "rigetti_cepheus" { + t.Errorf("first resource = %+v", rigetti) + } + // Parent defaults to cluster when unset. + if rigetti.Parent != DefaultParent { + t.Errorf("rigetti parent = %q, want %q", rigetti.Parent, DefaultParent) + } + // Referenced attributes resolved. + if rigetti.ResolvedAttributes["region"] != "us-east-1" || + rigetti.ResolvedAttributes["connectivity"] != "all-to-all" { + t.Errorf("rigetti attributes = %v", rigetti.ResolvedAttributes) + } + + // Nested child + count. + if len(rigetti.With) != 1 || rigetti.With[0].Type != "qpu" { + t.Fatalf("rigetti child = %+v", rigetti.With) + } + qpu := rigetti.With[0] + if qpu.Count != 1 { + t.Errorf("qpu count = %d, want 1", qpu.Count) + } + if qpu.ResolvedAttributes["region"] != "us-east-1" { + t.Errorf("qpu resolved attributes = %v", qpu.ResolvedAttributes) + } + if len(qpu.With) != 1 || qpu.With[0].Type != "qubit" || qpu.With[0].Count != 80 { + t.Errorf("qubit child = %+v", qpu.With) + } + + // Inline attributes on the second device. + ibm := c.Resources[1] + if ibm.ResolvedAttributes["region"] != "us-west-2" || ibm.ResolvedAttributes["vendor"] != "ibm" { + t.Errorf("ibm attributes = %v", ibm.ResolvedAttributes) + } +} + +// A referenced attribute set that doesn't exist is a hard error, not a silent +// empty map. +func TestUnknownAttributeReferenceErrors(t *testing.T) { + cfg := ` +resources: + - type: qdevice + name: x + attributes: does-not-exist +` + _, err := LoadResourcesConfig([]byte(cfg)) + if err == nil { + t.Fatal("expected an error for an unknown attribute reference") + } +} + +// A resource missing 'type' is a hard error. +func TestMissingTypeErrors(t *testing.T) { + cfg := ` +resources: + - name: x +` + _, err := LoadResourcesConfig([]byte(cfg)) + if err == nil { + t.Fatal("expected an error for a resource with no type") + } +} + +// Resolved attribute maps are copies, so mutating one resource's map does not +// bleed into the shared registry or another resource referencing the same set. +func TestReferencedAttributesAreCopied(t *testing.T) { + cfg := ` +resources: + - type: a + attributes: shared + - type: b + attributes: shared +attributes: + shared: + k: v +` + c, err := LoadResourcesConfig([]byte(cfg)) + if err != nil { + t.Fatalf("load: %v", err) + } + c.Resources[0].ResolvedAttributes["k"] = "mutated" + if c.Resources[1].ResolvedAttributes["k"] != "v" { + t.Errorf("mutation bled across resources: %v", c.Resources[1].ResolvedAttributes) + } +} + +// A resource with no attributes leaves ResolvedAttributes nil (not an empty map +// we'd then emit as an empty properties object). +func TestNoAttributesIsNil(t *testing.T) { + cfg := ` +resources: + - type: node + name: plain +` + c, err := LoadResourcesConfig([]byte(cfg)) + if err != nil { + t.Fatalf("load: %v", err) + } + if c.Resources[0].ResolvedAttributes != nil { + t.Errorf("expected nil attributes, got %v", c.Resources[0].ResolvedAttributes) + } +} + +// A config with content but no resources: key (e.g. an older "backends:" layout) +// must error loudly rather than silently yielding an empty graph. +func TestLoadResourcesConfigRejectsSchemaMismatch(t *testing.T) { + _, err := LoadResourcesConfig([]byte("backends:\n - name: ibm_fez\n num_qubits: 156\n")) + if err == nil { + t.Fatal("a non-empty config defining no resources should error (schema mismatch)") + } +} + +// A genuinely empty or comment-only config stays classical-only (no error). +func TestLoadResourcesConfigEmptyIsClassical(t *testing.T) { + for _, in := range []string{"", " \n\n", "# only a comment\n"} { + c, err := LoadResourcesConfig([]byte(in)) + if err != nil { + t.Fatalf("empty/comment config %q should be OK, got: %v", in, err) + } + if len(c.Resources) != 0 { + t.Fatalf("empty config %q should yield 0 resources", in) + } + } +} + +// Attributes inherit downward: a child gets the parent's attributes, can override +// a key, and can clear one by setting it to empty. This makes a nested class + +// attribute constraint reachable (every node on the path carries the attribute). +func TestAttributeInheritance(t *testing.T) { + cfg := ` +resources: + - type: qdevice + name: d + attributes: + region: us-east-1 + vendor: ibm + with: + - type: qpu + attributes: + vendor: rigetti + region: "" + with: + - type: qubit + count: 4 +` + c, err := LoadResourcesConfig([]byte(cfg)) + if err != nil { + t.Fatal(err) + } + dev := c.Resources[0] + if dev.ResolvedAttributes["region"] != "us-east-1" || dev.ResolvedAttributes["vendor"] != "ibm" { + t.Fatalf("qdevice attrs = %v", dev.ResolvedAttributes) + } + qpu := dev.With[0] + // vendor overridden, region cleared (explicit ""), nothing else + if qpu.ResolvedAttributes["vendor"] != "rigetti" { + t.Errorf("qpu vendor = %q, want rigetti (override)", qpu.ResolvedAttributes["vendor"]) + } + if _, ok := qpu.ResolvedAttributes["region"]; ok { + t.Errorf("qpu region should be cleared (explicit empty); got %v", qpu.ResolvedAttributes) + } + // qubit inherits qpu's resolved attrs (vendor=rigetti, no region) + qubit := qpu.With[0] + if qubit.ResolvedAttributes["vendor"] != "rigetti" { + t.Errorf("qubit should inherit vendor=rigetti; got %v", qubit.ResolvedAttributes) + } + if _, ok := qubit.ResolvedAttributes["region"]; ok { + t.Errorf("qubit should not have region (cleared at qpu); got %v", qubit.ResolvedAttributes) + } +} diff --git a/pkg/fluence/fluence.go b/pkg/fluence/fluence.go index 5edbbf4..439e0d1 100644 --- a/pkg/fluence/fluence.go +++ b/pkg/fluence/fluence.go @@ -5,11 +5,14 @@ import ( "fmt" "log" "os" + "sort" "strconv" + "strings" "sync" "github.com/converged-computing/fluence/pkg/cluster" "github.com/converged-computing/fluence/pkg/graph" + "github.com/converged-computing/fluence/pkg/jobspec" "github.com/converged-computing/fluence/pkg/placement" corev1 "k8s.io/api/core/v1" @@ -46,24 +49,32 @@ type matcher interface { // the graph itself is rebuilt fresh on restart. type groupAlloc struct { place placement.Placement - jobid uint64 + // jobids are the Fluxion allocations backing this group — one per match + // (compute, plus one per requested virtual device). All are held (duration 0) + // and cancelled together; the group is all-or-nothing across them. + jobids []uint64 } // Fluence is a scheduler-framework plugin that places whole pod groups by // matching them against a flux-sched resource graph built from the live cluster -// (plus any configured quantum resources). Gang/all-or-nothing semantics are +// (plus any configured virtual resources). Gang/all-or-nothing semantics are // delegated to the native PodGroup API; Fluence only decides placement. type Fluence struct { handle fwk.Handle matcher matcher + // knownDevices is the set of virtual resource types the graph models (suffix + // only, e.g. "qpu"), used to reject a pod requesting a device that does not + // exist before issuing a match. Empty when no resources are configured. + knownDevices map[string]bool + // matcherMu serializes all access to the cgo Fluxion client, which is not // thread-safe. Match runs on the (sequential) scheduling path; Cancel runs in // informer handler goroutines, so the two can race without this. matcherMu sync.Mutex mu sync.Mutex - // placement maps a group key to its allocation (nodes, backend, jobid). + // placement maps a group key to its allocation (nodes, backend, jobids). placement map[string]groupAlloc } @@ -94,15 +105,22 @@ func New(ctx context.Context, _ runtime.Object, h fwk.Handle) (fwk.Plugin, error // the resources add-on is applied, so a missing file is classical-only, not // fatal. opts := cluster.Options{} + knownDevices := map[string]bool{} if path := os.Getenv("FLUENCE_RESOURCES"); path != "" { raw, err := os.ReadFile(path) switch { case err == nil: - qc, err := cluster.LoadQuantumConfig(raw) + rc, err := cluster.LoadResourcesConfig(raw) if err != nil { return nil, err } - opts.Quantum = qc.Backends + opts.Resources = rc.Resources + // The requestable device types are the FluxionResourceNames, minus + // the prefix — the suffixes a pod requests as + // fluxion.flux-framework.org/. + for _, name := range cluster.FluxionResourceNames(rc.Resources) { + knownDevices[strings.TrimPrefix(name, placement.FluxionResourcePrefix)] = true + } case os.IsNotExist(err): // No resources config mounted -> classical-only graph. default: @@ -114,7 +132,9 @@ func New(ctx context.Context, _ runtime.Object, h fwk.Handle) (fwk.Plugin, error if err != nil { return nil, fmt.Errorf("build resource graph: %w", err) } - fmt.Printf("Fluence resource graph:\n%s", jgfBytes) + fmt.Println("[fluence] === RESOURCE GRAPH (knownDevices=" + + fmt.Sprintf("%v", keysOf(knownDevices)) + ") ===") + fmt.Println(string(jgfBytes)) // FluxionGraph.Init reads from a file path, so stage the generated graph. tmp, err := os.CreateTemp("", "fluence-graph-*.json") @@ -130,13 +150,20 @@ func New(ctx context.Context, _ runtime.Object, h fwk.Handle) (fwk.Plugin, error // scheduling key is the same JGF vertex subgraph we parse for placement, and // it carries the execution view flux uses to replay an allocation on restart. // This is the format we persist and feed back to UpdateAllocate for recovery. - fluxion := &graph.FluxionGraph{MatchFormat: "rv1"} + // jgf match format: emits every allocated vertex (with properties) as a graph, + // regardless of type. rv1 cannot represent our allocations — its R_lite is + // built from core/gpu "reducer" children under a node, so a virtual allocation + // that bottoms out in nodes (no cores) serializes to an empty R. jgf has no + // such assumption and is exactly what PlacementFromAllocation parses (node + // vertices + their composed marker/attribute properties). + fluxion := &graph.FluxionGraph{MatchFormat: "jgf"} fluxion.Init(tmp.Name(), os.Getenv("FLUENCE_MATCH_POLICY"), "") f := &Fluence{ - handle: h, - matcher: fluxion, - placement: map[string]groupAlloc{}, + handle: h, + matcher: fluxion, + knownDevices: knownDevices, + placement: map[string]groupAlloc{}, } f.registerCancelHandlers() return f, nil @@ -168,38 +195,22 @@ func (f *Fluence) PreFilter( return nil, fwk.AsStatus(err) } - js, err := placement.JobspecForGroup(group, pods) - if err != nil { - return nil, fwk.AsStatus(err) - } - specYAML, err := js.YAML() + specs, err := placement.JobspecsForGroup(group, pods, f.knownDevices) if err != nil { return nil, fwk.AsStatus(err) } - fmt.Printf("Attempting to match:\n%s\n", specYAML) - f.matcherMu.Lock() - req, err := f.matcher.MatchAllocateSpec(specYAML) - f.matcherMu.Unlock() - if err != nil { - fmt.Printf("FAIL Match failed: %s\n", err) - return nil, fwk.NewStatus(fwk.Unschedulable, fmt.Sprintf("fluxion match failed: %v", err)) + // Run every jobspec as an independent held allocation (duration 0). The group + // is all-or-nothing: if any match fails, cancel the ones that already + // succeeded so we never hold a partial allocation (e.g. compute without its + // device, or vice versa). + place, jobids, status := f.matchGroup(specs) + if !status.IsSuccess() { + return nil, status } - place, err := placement.PlacementFromAllocation(req.Allocation) - if err != nil { - fmt.Printf("FAIL Placement failed: %s\n", err) - return nil, fwk.AsStatus(err) - } - if len(place.Nodes) == 0 && place.Backend == "" { - fmt.Println("FAIL No nodes") - return nil, fwk.NewStatus(fwk.Unschedulable, "fluxion returned no allocation") - } - // A quantum-only allocation has a Backend but no Nodes (a qpu vertex lives - // under the qgateway, not under a compute node). That is valid — the backend - // is reachable from any node — so Filter imposes no node constraint then. f.mu.Lock() - f.placement[group] = groupAlloc{place: place, jobid: req.Number} + f.placement[group] = groupAlloc{place: place, jobids: jobids} f.mu.Unlock() // The jobid (for cancel) and any backend (for the webhook env) are written @@ -207,6 +218,91 @@ func (f *Fluence) PreFilter( return nil, fwk.NewStatus(fwk.Success) } +// matchGroup runs each jobspec as an independent held Fluxion allocation and +// combines them into one placement. It is all-or-nothing: on the first failure +// it cancels every allocation already made and returns an Unschedulable status, +// so the group never holds a partial set (compute without its device, etc.). +// +// The combined placement unions the per-match results: the compute match +// supplies the bind nodes, a device match supplies the backend identity. (The +// per-match split of nodes vs backend is PlacementFromAllocation's job; here we +// merge.) +func (f *Fluence) matchGroup(specs []*jobspec.Jobspec) (placement.Placement, []uint64, *fwk.Status) { + var combined placement.Placement + var jobids []uint64 + + for i, js := range specs { + // Render the jobspec as JSON, not YAML. flux-sched's RFC 31 constraint + // parser requires each property to be a QUOTED scalar (it checks the YAML + // tag == "!"); sigs.k8s.io/yaml emits property strings unquoted + // (e.g. "- virtual=false"), which the parser rejects with "non-string + // property specified" -> the whole match fails with -1. JSON always quotes + // strings, and JSON is valid YAML input to the matcher, so this is the + // reliable encoding for jobspecs that carry constraints. + spec, err := js.JSON() + if err != nil { + f.cancelJobids(jobids) + return placement.Placement{}, nil, fwk.AsStatus(err) + } + + fmt.Println(fmt.Sprintf("[fluence] === MATCH %d/%d: submitting jobspec to fluxion ===", i+1, len(specs))) + fmt.Println(spec) + + f.matcherMu.Lock() + req, err := f.matcher.MatchAllocateSpec(spec) + f.matcherMu.Unlock() + if err != nil { + log.Printf("[fluence] MATCH %d/%d FAILED: %v — rolling back jobids %v", + i+1, len(specs), err, jobids) + f.cancelJobids(jobids) + return placement.Placement{}, nil, fwk.NewStatus( + fwk.Unschedulable, fmt.Sprintf("fluxion match failed: %v", err)) + } + + fmt.Println(fmt.Sprintf("[fluence] MATCH %d/%d allocated jobid %d; fluxion R:", i+1, len(specs), req.Number)) + fmt.Println(req.Allocation) + + place, err := placement.PlacementFromAllocation(req.Allocation) + if err != nil { + log.Printf("[fluence] MATCH %d/%d placement-parse FAILED: %v", i+1, len(specs), err) + f.cancelJobids(append(jobids, req.Number)) + return placement.Placement{}, nil, fwk.AsStatus(err) + } + fmt.Println(fmt.Sprintf("[fluence] MATCH %d/%d parsed: nodes=%v backend=%q attrs=%v", + i+1, len(specs), place.Nodes, place.Backend, place.BackendAttributes)) + + jobids = append(jobids, req.Number) + combined.Nodes = append(combined.Nodes, place.Nodes...) + if place.Backend != "" { + combined.Backend = place.Backend + combined.BackendAttributes = place.BackendAttributes + } + } + + if len(combined.Nodes) == 0 && combined.Backend == "" { + log.Printf("[fluence] match produced no nodes and no backend — unschedulable") + f.cancelJobids(jobids) + return placement.Placement{}, nil, fwk.NewStatus( + fwk.Unschedulable, "fluxion returned no allocation") + } + fmt.Println(fmt.Sprintf("[fluence] GROUP MATCHED: nodes=%v backend=%q attrs=%v jobids=%v", + combined.Nodes, combined.Backend, combined.BackendAttributes, jobids)) + return combined, jobids, fwk.NewStatus(fwk.Success) +} + +// cancelJobids frees a set of held allocations, used to unwind a partial group +// match. Cancel is idempotent and best-effort; errors are logged, not returned. +func (f *Fluence) cancelJobids(jobids []uint64) { + for _, id := range jobids { + f.matcherMu.Lock() + err := f.matcher.Cancel(id) + f.matcherMu.Unlock() + if err != nil { + log.Printf("fluence: rollback cancel of jobid %d failed: %v", id, err) + } + } +} + // PreFilterExtensions: no add/remove pod handling for now. func (f *Fluence) PreFilterExtensions() fwk.PreFilterExtensions { return nil } @@ -275,21 +371,32 @@ func (f *Fluence) PreBind( return fwk.NewStatus(fwk.Success) // not ours; nothing to record } - if err := f.recordJobID(ctx, pod, alloc.jobid); err != nil { - return fwk.AsStatus(fmt.Errorf("record jobid: %w", err)) + if err := f.recordJobIDs(ctx, pod, alloc.jobids); err != nil { + return fwk.AsStatus(fmt.Errorf("record jobids: %w", err)) } if alloc.place.Backend != "" { - if err := f.patchPodAnnotation(ctx, pod.Namespace, pod.Name, placement.BackendAnnotation, alloc.place.Backend); err != nil { - return fwk.AsStatus(fmt.Errorf("stamp backend annotation: %w", err)) + // Stamp the backend name and all matched attributes in one patch. The + // webhook injects a normalized env per annotation so the workload reads + // exactly what it matched (backend + region/qubits/...). + ann := map[string]string{placement.BackendAnnotation: alloc.place.Backend} + for k, v := range alloc.place.BackendAttributes { + ann[placement.AttributeAnnotationPrefix+k] = v + } + log.Printf("[fluence] group %s -> backend %q attrs %v (nodes %v, jobids %v)", + groupKey(pod), alloc.place.Backend, alloc.place.BackendAttributes, + alloc.place.Nodes, alloc.jobids) + if err := f.patchPodAnnotations(ctx, pod.Namespace, pod.Name, ann); err != nil { + return fwk.AsStatus(fmt.Errorf("stamp backend annotations: %w", err)) } } return fwk.NewStatus(fwk.Success) } -// recordJobID writes the jobid annotation onto the allocation's owning object: a -// grouped pod's allocation belongs to the PodGroup; an ungrouped pod owns its own. -func (f *Fluence) recordJobID(ctx context.Context, pod *corev1.Pod, jobid uint64) error { - val := strconv.FormatUint(jobid, 10) +// recordJobIDs writes the jobid annotation (a comma-separated list of all the +// group's held allocations) onto the allocation's owning object: a grouped pod's +// allocation belongs to the PodGroup; an ungrouped pod owns its own. +func (f *Fluence) recordJobIDs(ctx context.Context, pod *corev1.Pod, jobids []uint64) error { + val := formatJobIDs(jobids) if group := placement.PodGroupName(pod); group != "" { patch := fmt.Sprintf(`{"metadata":{"annotations":{%q:%q}}}`, placement.JobIDAnnotation, val) _, err := f.handle.ClientSet().SchedulingV1alpha2().PodGroups(pod.Namespace).Patch( @@ -299,8 +406,36 @@ func (f *Fluence) recordJobID(ctx context.Context, pod *corev1.Pod, jobid uint64 return f.patchPodAnnotation(ctx, pod.Namespace, pod.Name, placement.JobIDAnnotation, val) } +// keysOf returns the keys of a set, for logging. +func keysOf(m map[string]bool) []string { + out := make([]string, 0, len(m)) + for k := range m { + out = append(out, k) + } + sort.Strings(out) + return out +} + +// formatJobIDs renders jobids as a comma-separated string for the annotation. +func formatJobIDs(jobids []uint64) string { + parts := make([]string, len(jobids)) + for i, id := range jobids { + parts[i] = strconv.FormatUint(id, 10) + } + return strings.Join(parts, ",") +} + func (f *Fluence) patchPodAnnotation(ctx context.Context, ns, name, key, val string) error { - patch := fmt.Sprintf(`{"metadata":{"annotations":{%q:%q}}}`, key, val) + return f.patchPodAnnotations(ctx, ns, name, map[string]string{key: val}) +} + +// patchPodAnnotations merges a set of annotations onto a pod in one patch. +func (f *Fluence) patchPodAnnotations(ctx context.Context, ns, name string, ann map[string]string) error { + parts := make([]string, 0, len(ann)) + for k, v := range ann { + parts = append(parts, fmt.Sprintf("%q:%q", k, v)) + } + patch := fmt.Sprintf(`{"metadata":{"annotations":{%s}}}`, strings.Join(parts, ",")) _, err := f.handle.ClientSet().CoreV1().Pods(ns).Patch( ctx, name, types.MergePatchType, []byte(patch), metav1.PatchOptions{}) return err @@ -355,12 +490,12 @@ func (f *Fluence) onPodDeleted(obj interface{}) { f.cancelGroup(pod.Namespace+"/"+pod.Name, pod.Annotations) } -// cancelGroup frees the allocation for a deleted owning object. The jobid comes +// cancelGroup frees all allocations for a deleted owning object. The jobids come // from the object's annotation (the durable source of truth); if it is missing // (e.g. deleted between PreFilter and PreBind, before the annotation was // written) it falls back to the in-memory memo by key. Cancel is idempotent. func (f *Fluence) cancelGroup(key string, ann map[string]string) { - jobid, ok := parseJobID(ann) + jobids, ok := parseJobIDs(ann) if !ok { f.mu.Lock() alloc, found := f.placement[key] @@ -368,14 +503,16 @@ func (f *Fluence) cancelGroup(key string, ann map[string]string) { if !found { return // never scheduled by us, or already cancelled } - jobid = alloc.jobid + jobids = alloc.jobids } - f.matcherMu.Lock() - err := f.matcher.Cancel(jobid) - f.matcherMu.Unlock() - if err != nil { - log.Printf("fluence: cancel jobid %d for %s failed: %v", jobid, key, err) + for _, jobid := range jobids { + f.matcherMu.Lock() + err := f.matcher.Cancel(jobid) + f.matcherMu.Unlock() + if err != nil { + log.Printf("fluence: cancel jobid %d for %s failed: %v", jobid, key, err) + } } f.mu.Lock() @@ -383,16 +520,29 @@ func (f *Fluence) cancelGroup(key string, ann map[string]string) { f.mu.Unlock() } -func parseJobID(ann map[string]string) (uint64, bool) { +// parseJobIDs reads the comma-separated jobid annotation into a slice. Returns +// false when the annotation is absent or empty. +func parseJobIDs(ann map[string]string) ([]uint64, bool) { raw := ann[placement.JobIDAnnotation] if raw == "" { - return 0, false + return nil, false } - jobid, err := strconv.ParseUint(raw, 10, 64) - if err != nil { - return 0, false + var jobids []uint64 + for _, part := range strings.Split(raw, ",") { + part = strings.TrimSpace(part) + if part == "" { + continue + } + id, err := strconv.ParseUint(part, 10, 64) + if err != nil { + return nil, false + } + jobids = append(jobids, id) + } + if len(jobids) == 0 { + return nil, false } - return jobid, true + return jobids, true } // groupPods returns the pods belonging to the same native PodGroup as pod diff --git a/pkg/fluence/fluence_test.go b/pkg/fluence/fluence_test.go index 87e5a3e..136bf1b 100644 --- a/pkg/fluence/fluence_test.go +++ b/pkg/fluence/fluence_test.go @@ -5,6 +5,7 @@ import ( "testing" "github.com/converged-computing/fluence/pkg/graph" + "github.com/converged-computing/fluence/pkg/jobspec" "github.com/converged-computing/fluence/pkg/placement" corev1 "k8s.io/api/core/v1" @@ -15,13 +16,28 @@ import ( // fakeMatcher records Cancel calls so cancel behavior can be asserted without // the real cgo/flux matcher. It satisfies the package-internal matcher interface. +// matchResults/matchErrs are consumed in order, one per MatchAllocateSpec call, +// to script multi-match (compute + device) scenarios. type fakeMatcher struct { - cancelled []uint64 - cancelErr error + cancelled []uint64 + cancelErr error + matchN int + matchResults []graph.MatchAllocateRequest + matchErrs []error } func (m *fakeMatcher) MatchAllocateSpec(string) (graph.MatchAllocateRequest, error) { - return graph.MatchAllocateRequest{}, nil + i := m.matchN + m.matchN++ + var res graph.MatchAllocateRequest + if i < len(m.matchResults) { + res = m.matchResults[i] + } + var err error + if i < len(m.matchErrs) { + err = m.matchErrs[i] + } + return res, err } func (m *fakeMatcher) Cancel(jobid uint64) error { @@ -56,25 +72,37 @@ func ungroupedPod(ns, name string, annotations map[string]string) *corev1.Pod { } } -func TestParseJobID(t *testing.T) { +func TestParseJobIDs(t *testing.T) { cases := []struct { name string ann map[string]string - want uint64 + want []uint64 wantOK bool }{ - {"present", ann("42"), 42, true}, - {"absent", map[string]string{}, 0, false}, - {"nil map", nil, 0, false}, - {"empty value", ann(""), 0, false}, - {"garbage", ann("not-a-number"), 0, false}, - {"zero", ann("0"), 0, true}, + {"single", ann("42"), []uint64{42}, true}, + {"multiple", ann("42,7"), []uint64{42, 7}, true}, + {"spaces", ann("42, 7 ,9"), []uint64{42, 7, 9}, true}, + {"absent", map[string]string{}, nil, false}, + {"nil map", nil, nil, false}, + {"empty value", ann(""), nil, false}, + {"garbage", ann("not-a-number"), nil, false}, + {"zero", ann("0"), []uint64{0}, true}, } for _, c := range cases { t.Run(c.name, func(t *testing.T) { - got, ok := parseJobID(c.ann) - if got != c.want || ok != c.wantOK { - t.Fatalf("parseJobID(%v) = (%d,%t), want (%d,%t)", c.ann, got, ok, c.want, c.wantOK) + got, ok := parseJobIDs(c.ann) + if ok != c.wantOK { + t.Fatalf("parseJobIDs(%v) ok = %t, want %t", c.ann, ok, c.wantOK) + } + if ok { + if len(got) != len(c.want) { + t.Fatalf("parseJobIDs(%v) = %v, want %v", c.ann, got, c.want) + } + for i := range got { + if got[i] != c.want[i] { + t.Fatalf("parseJobIDs(%v) = %v, want %v", c.ann, got, c.want) + } + } } }) } @@ -94,7 +122,7 @@ func TestGroupKey(t *testing.T) { func TestCancelGroupPrefersAnnotation(t *testing.T) { m := &fakeMatcher{} f := newTestFluence(m) - f.placement["default/training"] = groupAlloc{jobid: 42} + f.placement["default/training"] = groupAlloc{jobids: []uint64{42}} f.cancelGroup("default/training", ann("99")) @@ -111,7 +139,7 @@ func TestCancelGroupPrefersAnnotation(t *testing.T) { func TestCancelGroupMemoFallback(t *testing.T) { m := &fakeMatcher{} f := newTestFluence(m) - f.placement["default/solo"] = groupAlloc{jobid: 7} + f.placement["default/solo"] = groupAlloc{jobids: []uint64{7}} f.cancelGroup("default/solo", nil) @@ -137,7 +165,7 @@ func TestCancelGroupUnknownNoop(t *testing.T) { func TestCancelGroupIdempotent(t *testing.T) { m := &fakeMatcher{} f := newTestFluence(m) - f.placement["default/solo"] = groupAlloc{jobid: 7} + f.placement["default/solo"] = groupAlloc{jobids: []uint64{7}} f.cancelGroup("default/solo", nil) // frees, deletes memo f.cancelGroup("default/solo", nil) // memo gone, no annotation -> no-op @@ -151,7 +179,7 @@ func TestCancelGroupIdempotent(t *testing.T) { func TestCancelGroupMatcherErrorStillDeletes(t *testing.T) { m := &fakeMatcher{cancelErr: errors.New("flux boom")} f := newTestFluence(m) - f.placement["default/solo"] = groupAlloc{jobid: 7} + f.placement["default/solo"] = groupAlloc{jobids: []uint64{7}} f.cancelGroup("default/solo", ann("7")) @@ -244,3 +272,70 @@ func TestOnPodGroupDeletedTombstone(t *testing.T) { t.Fatalf("cancelled = %v, want [4] from PodGroup tombstone", m.cancelled) } } + +// matchGroup combines a compute allocation (nodes) and a device allocation +// (backend) into one placement and records both jobids. +func TestMatchGroupCombinesAllocations(t *testing.T) { + m := &fakeMatcher{ + matchResults: []graph.MatchAllocateRequest{ + {Number: 10, Allocation: `{"graph":{"nodes":[{"metadata":{"type":"node","name":"node-a","properties":{"virtual=false":""}}}]}}`}, + {Number: 11, Allocation: `{"graph":{"nodes":[{"metadata":{"type":"node","name":"rigetti","properties":{"virtual=true":""}}}]}}`}, + }, + } + f := newTestFluence(m) + + place, jobids, status := f.matchGroup(twoSpecs()) + if !status.IsSuccess() { + t.Fatalf("status = %v, want success", status) + } + if len(jobids) != 2 || jobids[0] != 10 || jobids[1] != 11 { + t.Fatalf("jobids = %v, want [10 11]", jobids) + } + // Both allocations contributed (exact node/backend split is + // PlacementFromAllocation's job, refined in the placement rewrite). + if len(place.Nodes) == 0 { + t.Errorf("expected at least one bind node, got %v", place.Nodes) + } +} + +// If a later match fails, every already-successful allocation in the group is +// cancelled (all-or-nothing) and the group is Unschedulable. +func TestMatchGroupAllOrNothing(t *testing.T) { + m := &fakeMatcher{ + matchResults: []graph.MatchAllocateRequest{ + {Number: 20, Allocation: `{"graph":{"nodes":[{"metadata":{"type":"node","name":"node-a","properties":{"virtual=false":""}}}]}}`}, + {}, + }, + matchErrs: []error{nil, errors.New("no qpu available")}, + } + f := newTestFluence(m) + + _, _, status := f.matchGroup(twoSpecs()) + if status.IsSuccess() { + t.Fatal("expected Unschedulable when the device match fails") + } + if len(m.cancelled) != 1 || m.cancelled[0] != 20 { + t.Fatalf("cancelled = %v, want [20] (roll back the successful compute match)", m.cancelled) + } +} + +// A multi-jobid group records a comma-separated annotation and cancels every id. +func TestCancelGroupMultipleJobids(t *testing.T) { + m := &fakeMatcher{} + f := newTestFluence(m) + + f.cancelGroup("default/training", ann("10,11")) + + if len(m.cancelled) != 2 || m.cancelled[0] != 10 || m.cancelled[1] != 11 { + t.Fatalf("cancelled = %v, want [10 11]", m.cancelled) + } +} + +// twoSpecs returns two minimal jobspecs (compute + device) to drive matchGroup +// tests; their content doesn't matter since the fake matcher scripts results. +func twoSpecs() []*jobspec.Jobspec { + return []*jobspec.Jobspec{ + {Version: 9999}, + {Version: 9999}, + } +} diff --git a/pkg/graph/allocation.go b/pkg/graph/allocation.go index b3ff161..2d77784 100644 --- a/pkg/graph/allocation.go +++ b/pkg/graph/allocation.go @@ -5,15 +5,22 @@ import ( "fmt" ) -// graphVertices is the subset of a JGF graph we need: the type and name of each +// vertexMeta is the subset of a JGF vertex metadata fluence needs from an +// allocation: type, name, and the RFC 31 properties object. Properties are +// stored as composed "key=value" keys (matching how the graph is built and how +// the rv1 writer emits them); the value half of the JSON object is unused. +type vertexMeta struct { + Type string `json:"type"` + Name string `json:"name"` + Properties map[string]string `json:"properties"` +} + +// graphVertices is the subset of a JGF graph we need: the metadata of each // vertex. It appears at the top level of a "jgf" allocation, and under the // "scheduling" key of an "rv1" allocation. type graphVertices struct { Nodes []struct { - Metadata struct { - Type string `json:"type"` - Name string `json:"name"` - } `json:"metadata"` + Metadata vertexMeta `json:"metadata"` } `json:"nodes"` } @@ -35,44 +42,78 @@ type allocation struct { // vertices returns the allocated vertices regardless of match format, preferring // the rv1 scheduling key and falling back to a top-level jgf graph. -func (a allocation) vertices() []struct { - Metadata struct { - Type string `json:"type"` - Name string `json:"name"` - } `json:"metadata"` -} { +func (a allocation) vertices() []vertexMeta { + src := a.Graph.Nodes if len(a.Scheduling.Graph.Nodes) > 0 { - return a.Scheduling.Graph.Nodes + src = a.Scheduling.Graph.Nodes + } + out := make([]vertexMeta, 0, len(src)) + for _, n := range src { + out = append(out, n.Metadata) } - return a.Graph.Nodes + return out } -// BackendFromAllocation returns the name of the first vertex of vertexType -// (e.g. "qpu" or "node") in a Fluxion allocation. -func BackendFromAllocation(alloc string, vertexType string) (string, error) { - names, err := NamesFromAllocation(alloc, vertexType) - if err != nil { - return "", err +// AllocatedNode is a node-typed vertex from an allocation, with the properties +// fluence classifies it by (virtual=true/false, class=..., user attributes). +type AllocatedNode struct { + Name string + Properties map[string]string +} + +// HasProperty reports whether the node carries the given composed property key +// (e.g. "virtual=true"). +func (n AllocatedNode) HasProperty(key string) bool { + _, ok := n.Properties[key] + return ok +} + +// NodesFromAllocation returns every node-typed vertex in an allocation together +// with its properties, so callers can classify them (physical vs virtual) by the +// composed marker keys rather than by vertex type. Under the virtual-resource +// model every allocatable thing is type "node"; the virtual marker, not the +// type, distinguishes a compute node from a virtual backend. +func NodesFromAllocation(alloc string) ([]AllocatedNode, error) { + var a allocation + if err := json.Unmarshal([]byte(alloc), &a); err != nil { + return nil, fmt.Errorf("parse allocation: %w", err) } - if len(names) == 0 { - return "", fmt.Errorf("no %q vertex found in allocation", vertexType) + var nodes []AllocatedNode + for _, v := range a.vertices() { + if v.Type == "node" { + nodes = append(nodes, AllocatedNode{Name: v.Name, Properties: v.Properties}) + } } - return names[0], nil + return nodes, nil } // NamesFromAllocation returns the names of every vertex of vertexType in a -// Fluxion allocation (jgf or rv1). Used to map an allocation onto cluster node -// names (vertexType "node") for pod placement, or onto quantum backends ("qpu"). +// Fluxion allocation (jgf or rv1). Retained for callers that key on a concrete +// vertex type (e.g. counting "qpu" children); placement now classifies node +// vertices via NodesFromAllocation and the virtual marker instead. func NamesFromAllocation(alloc string, vertexType string) ([]string, error) { var a allocation if err := json.Unmarshal([]byte(alloc), &a); err != nil { return nil, fmt.Errorf("parse allocation: %w", err) } var names []string - for _, n := range a.vertices() { - if n.Metadata.Type == vertexType { - names = append(names, n.Metadata.Name) + for _, v := range a.vertices() { + if v.Type == vertexType { + names = append(names, v.Name) } } return names, nil } + +// BackendFromAllocation returns the name of the first vertex of vertexType +// (e.g. "qpu" or "node") in a Fluxion allocation. +func BackendFromAllocation(alloc string, vertexType string) (string, error) { + names, err := NamesFromAllocation(alloc, vertexType) + if err != nil { + return "", err + } + if len(names) == 0 { + return "", fmt.Errorf("no %q vertex found in allocation", vertexType) + } + return names[0], nil +} diff --git a/pkg/graph/allocation_test.go b/pkg/graph/allocation_test.go index dc67118..1ddbfca 100644 --- a/pkg/graph/allocation_test.go +++ b/pkg/graph/allocation_test.go @@ -37,3 +37,39 @@ func TestNamesFromAllocationRV1(t *testing.T) { t.Fatalf("rv1 qpu parse: %q %v", be, err) } } + +// markedAlloc has node vertices carrying composed virtual markers plus a qpu +// child — the shape PlacementFromAllocation classifies. +const markedAlloc = `{"graph":{"nodes":[ + {"metadata":{"type":"node","name":"kind-worker","properties":{"virtual=false":""}}}, + {"metadata":{"type":"node","name":"rigetti","properties":{"virtual=true":"","class=qdevice":""}}}, + {"metadata":{"type":"qpu","name":"qpu0"}}]}}` + +// NodesFromAllocation returns only node-typed vertices, each with its composed +// property keys, so callers classify by the virtual marker rather than the type. +func TestNodesFromAllocation(t *testing.T) { + nodes, err := NodesFromAllocation(markedAlloc) + if err != nil { + t.Fatal(err) + } + if len(nodes) != 2 { + t.Fatalf("got %d node vertices, want 2 (qpu is not a node)", len(nodes)) + } + + byName := map[string]AllocatedNode{} + for _, n := range nodes { + byName[n.Name] = n + } + if !byName["kind-worker"].HasProperty("virtual=false") { + t.Errorf("kind-worker missing virtual=false: %v", byName["kind-worker"].Properties) + } + if !byName["rigetti"].HasProperty("virtual=true") { + t.Errorf("rigetti missing virtual=true: %v", byName["rigetti"].Properties) + } + if !byName["rigetti"].HasProperty("class=qdevice") { + t.Errorf("rigetti missing class=qdevice: %v", byName["rigetti"].Properties) + } + if byName["kind-worker"].HasProperty("virtual=true") { + t.Error("kind-worker should not carry virtual=true") + } +} diff --git a/pkg/jgf/jgf.go b/pkg/jgf/jgf.go index acbb75d..05b0507 100644 --- a/pkg/jgf/jgf.go +++ b/pkg/jgf/jgf.go @@ -10,7 +10,6 @@ import ( "encoding/json" "fmt" "sort" - "strings" ) const containment = "containment" @@ -30,6 +29,7 @@ type Vertex struct { exclusive bool props map[string]any nodeProps map[string]string + rank int64 } // Name is the vertex name (basename + per-type index, or an explicit name). @@ -69,6 +69,13 @@ type Options struct { // pruning (by_constraint) and queries match against. Keys/values are // strings; a bare tag is value "" (matching is by key presence). NodeProperties map[string]string + // Rank is the flux execution rank for a node-typed vertex. The rv1 match + // writer builds its R_lite/nodelist from node vertices keyed by rank, and + // cannot emit a vertex whose rank is -1, so every node (physical or virtual) + // needs a real, distinct, non-negative rank. Nil leaves the default (-1), + // which is correct for non-node vertices (cluster, core, memory, ...) that + // never appear in the nodelist. + Rank *int64 } // AddRoot creates a top-level vertex (typically the cluster). @@ -107,6 +114,10 @@ func (b *Builder) add(parent *Vertex, typ, basename string, opts Options) *Verte exclusive: opts.Exclusive, props: opts.Properties, nodeProps: opts.NodeProperties, + rank: -1, + } + if opts.Rank != nil { + v.rank = *opts.Rank } if parent == nil { v.path = "/" + name @@ -137,7 +148,7 @@ func (m metadata) MarshalJSON() ([]byte, error) { "name": m.v.name, "id": m.v.id, "uniq_id": m.v.uniqID, - "rank": -1, + "rank": m.v.rank, "exclusive": m.v.exclusive, "unit": m.v.unit, "size": m.v.size, @@ -199,19 +210,15 @@ type graph struct { Edges []edge `json:"edges"` } -// Doc is a complete JGF document. -type Doc struct { +// Document is a complete JGF document. +type Document struct { Graph graph `json:"graph"` } -// Doc assembles the accumulated vertices and edges into a JGF document. -func (b *Builder) Doc() Doc { - d := Doc{} +// Document assembles the accumulated vertices and edges into a JGF document. +func (b *Builder) Document() Document { + d := Document{} for _, v := range b.vertices { - // Skip the control plane - if strings.Contains(v.name, "control-plane") { - continue - } d.Graph.Nodes = append(d.Graph.Nodes, node{ID: v.key, Metadata: metadata{v: v}}) } for _, e := range b.edges { @@ -226,5 +233,5 @@ func (b *Builder) Doc() Doc { // JSON renders the document as indented JGF. func (b *Builder) JSON() ([]byte, error) { - return json.MarshalIndent(b.Doc(), "", " ") + return json.MarshalIndent(b.Document(), "", " ") } diff --git a/pkg/placement/placement.go b/pkg/placement/placement.go index 397b0a9..9648cda 100644 --- a/pkg/placement/placement.go +++ b/pkg/placement/placement.go @@ -27,8 +27,41 @@ const ( // the pod itself for an ungrouped pod — so the allocation can be cancelled // when that object is deleted, and replayed on scheduler restart. JobIDAnnotation = "fluence.flux-framework.org/jobid" + + // AttributeAnnotationPrefix namespaces the matched backend's attributes when + // the scheduler stamps them onto the pod (e.g. + // fluence.flux-framework.org/attr-region=us-east-1). The webhook injects one + // downward-API env per such annotation so the workload reads exactly what it + // matched. Kept distinct from FluxionResourcePrefix (which marks resource + // *requests*) so request and result annotations never collide. + AttributeAnnotationPrefix = "fluence.flux-framework.org/attr-" + + // EnvVarPrefix is the normalized environment-variable namespace injected into + // the workload. The backend name becomes BACKEND; each attribute + // becomes (uppercased). A vendor-agnostic container reads + // these common names regardless of which backend it matched. + EnvVarPrefix = "FLUXION_" ) +// EnvVarName maps an attribute key to its normalized environment-variable name: +// uppercased, non-alphanumeric runes to underscores, under EnvVarPrefix. E.g. +// "region" -> "FLUXION_REGION", "price/min" -> "FLUXION_PRICE_MIN". +func EnvVarName(attrKey string) string { + var b strings.Builder + b.WriteString(EnvVarPrefix) + for _, r := range attrKey { + switch { + case r >= 'a' && r <= 'z': + b.WriteRune(r - 'a' + 'A') + case (r >= 'A' && r <= 'Z') || (r >= '0' && r <= '9'): + b.WriteRune(r) + default: + b.WriteByte('_') + } + } + return b.String() +} + // PodGroupName returns the native (Kubernetes 1.36) scheduling-group name a pod // belongs to, from spec.schedulingGroup.podGroupName, or "" if the pod is not // part of a group. This is the first-class field that links a Pod to its @@ -40,14 +73,37 @@ func PodGroupName(pod *corev1.Pod) string { return "" } +// ComputeProperty / VirtualProperty mirror the markers cluster stamps on graph +// node vertices (composed key=value form). A compute jobspec constrains to +// virtual=false nodes; a device jobspec constrains to virtual=true nodes. Kept +// here (not imported from cluster) to avoid an import cycle: cluster imports +// placement. +const ( + // VirtualPropertyTrue selects configured virtual resource nodes. + VirtualPropertyTrue = "virtual=true" + // VirtualPropertyFalse selects physical compute nodes. + VirtualPropertyFalse = "virtual=false" + // ClassPropertyPrefix composes a class= constraint selecting a virtual node by + // its configured resource type (e.g. class=qpu). Mirrors how the graph builder + // stamps the class property; kept here (not imported from cluster) to avoid an + // import cycle (cluster imports placement). + ClassPropertyPrefix = "class=" +) + +// ComposeClassProperty builds the class= constraint string for a resource type. +func ComposeClassProperty(resourceType string) string { + return ClassPropertyPrefix + resourceType +} + +// computeTypes are the Fluxion graph types that live under a physical +// (virtual=false) compute node. Everything requested via the +// fluxion.flux-framework.org/ prefix is a virtual device type instead. +var computeTypes = map[string]bool{"core": true, "memory": true, "gpu": true} + // podResources distills a pod's container requests into Fluxion resource counts -// keyed by Fluxion graph type (e.g. "core", "gpu", "qpu", "qubit"). -// -// Kubernetes names its native resources (cpu, memory, nvidia.com/gpu), so those -// get a small fixed mapping to graph types. Every resource named -// fluxion.flux-framework.org/ is passed through generically as , -// with no knowledge of what the type means — if the graph has it as a count, -// Fluxion will verify it. +// keyed by Fluxion graph type. Native Kubernetes resources (cpu, memory, +// nvidia.com/gpu) map to compute types; every fluxion.flux-framework.org/ +// request passes through generically as (a virtual device type). func podResources(p *corev1.Pod) map[string]int { counts := map[string]int{} for i := range p.Spec.Containers { @@ -66,74 +122,243 @@ func podResources(p *corev1.Pod) map[string]int { } } - // Every pod runs on a node, so always request at least one core, even when the - // pod also asks for an exotic resource (qpu/qubit). Without this a qpu-only pod - // produces a slot with no compute and Fluxion allocates a bare backend with no - // node to land the pod on. + // Every pod runs on a node, so always request at least one core. Without this + // a device-only pod produces a compute slot with no resources and Fluxion has + // no node to land the probing pod on. if counts["core"] == 0 { counts["core"] = 1 } return counts } -// JobspecForGroup builds a Fluxion jobspec for a whole pod group: a slot per pod -// (count = group size), each holding the per-pod resources as `with` entries — -// one per requested Fluxion type. A hybrid pod (e.g. cores + a qpu) produces a -// slot with both, so classical and quantum are requested together. The group is -// assumed homogeneous (same shape per pod); heterogeneous groups are a TODO. -func JobspecForGroup(groupName string, pods []corev1.Pod) (*jobspec.Jobspec, error) { - if len(pods) == 0 { - return nil, fmt.Errorf("pod group %q has no pods", groupName) +// splitResources separates a pod's resource counts into the compute resources +// (core/memory/gpu, satisfied by a virtual=false node) and the virtual device +// resources (everything requested via the fluxion prefix, satisfied by a +// virtual=true node). Returns counts keyed by graph type for each group. +func splitResources(counts map[string]int) (compute, devices map[string]int) { + compute = map[string]int{} + devices = map[string]int{} + for t, c := range counts { + if c <= 0 { + continue + } + if computeTypes[t] { + compute[t] = c + } else { + devices[t] = c + } } - counts := podResources(&pods[0]) + return compute, devices +} - // Deterministic order for stable jobspecs/tests. +// withEntries renders a count map into sorted jobspec `with` entries (stable +// output for tests and reproducible jobspecs). +func withEntries(counts map[string]int) []jobspec.Resource { types := make([]string, 0, len(counts)) for t := range counts { types = append(types, t) } sort.Strings(types) - var with []jobspec.Resource for _, t := range types { - if counts[t] > 0 { - with = append(with, jobspec.Resource{Type: t, Count: counts[t]}) - } + with = append(with, jobspec.Resource{Type: t, Count: counts[t]}) } + return with +} + +// systemAttributes builds the attributes.system block: a hold-until-cancel +// allocation (duration 0 runs to graph end) plus an RFC 31 property constraint +// selecting the eligible node set. properties is the AND-set of composed +// key=value property strings a matched node must carry. +func systemAttributes(properties []string) map[string]interface{} { + return map[string]interface{}{ + "system": map[string]interface{}{ + // duration 0 => hold the allocation until we explicitly Cancel. + "duration": 0, + "constraints": map[string]interface{}{ + "properties": properties, + }, + }, + } +} +// computeJobspec builds the physical-compute jobspec for a group: one slot per +// pod holding the compute resources, constrained to virtual=false nodes. This is +// the only jobspec for a group that requests no virtual devices. +func computeJobspec(groupName string, slots int, compute map[string]int) *jobspec.Jobspec { return &jobspec.Jobspec{ Version: 9999, Resources: []jobspec.Resource{{ Type: "slot", - Count: len(pods), + Count: slots, Label: "default", - With: with, + With: withEntries(compute), }}, + Attributes: systemAttributes([]string{VirtualPropertyFalse}), Tasks: []jobspec.Task{{ Command: []string{groupName}, Slot: "default", Count: map[string]int{"per_slot": 1}, }}, - }, nil + } } -// Placement is the result of matching a pod group: the cluster nodes to bind to -// (one per slot) and, for quantum groups, the allocated backend. +// deviceJobspec builds a jobspec selecting a single virtual device type. Every +// configured virtual resource is a node carrying class= (and the classes +// of its descendants, so a nested type is reachable), so a device is selected by +// constraining to a virtual node of the requested class. count is the requested +// quantity. +// +// The constraint is virtual=true (scope to virtual backends, not physical nodes) +// AND class= (the requested resource type). The slot requests `node` +// because every virtual resource is a node; the class constraint — not the `with` +// type — picks which one. A nested type (e.g. qpu under a qdevice node) is +// reachable because every ancestor node also advertises class= for the +// types beneath it, so the global constraint does not prune the path. +func deviceJobspec(groupName, deviceType string, count int) *jobspec.Jobspec { + return &jobspec.Jobspec{ + Version: 9999, + Resources: []jobspec.Resource{{ + Type: "slot", + Count: 1, + Label: "device", + With: []jobspec.Resource{{Type: "node", Count: count}}, + }}, + Attributes: systemAttributes([]string{ + VirtualPropertyTrue, + ComposeClassProperty(deviceType), + }), + Tasks: []jobspec.Task{{ + Command: []string{groupName}, + Slot: "device", + Count: map[string]int{"per_slot": 1}, + }}, + } +} + +// JobspecsForGroup builds the set of Fluxion jobspecs to match for a pod group, +// each held independently (duration 0, released by Cancel) and combined all-or- +// nothing by the caller: +// +// - exactly one compute jobspec (slot per pod, virtual=false) — always present, +// so a plain pod or group with no virtual resources yields a single match; +// - one device jobspec per distinct requested virtual resource type +// (constraint virtual=true; the requested type+count rides the slot's `with`). +// +// knownDevices is the set of device types the graph actually models (the +// FluxionResourceNames the device plugin advertises, suffixes only). A request +// for a type not in the graph is a hard error, caught here rather than as an +// opaque match failure. A nil/empty knownDevices with no device requests is +// fine (classical-only). +func JobspecsForGroup( + groupName string, + pods []corev1.Pod, + knownDevices map[string]bool, +) ([]*jobspec.Jobspec, error) { + if len(pods) == 0 { + return nil, fmt.Errorf("pod group %q has no pods", groupName) + } + counts := podResources(&pods[0]) + compute, devices := splitResources(counts) + + specs := []*jobspec.Jobspec{computeJobspec(groupName, len(pods), compute)} + + // Deterministic device order for stable output. + deviceTypes := make([]string, 0, len(devices)) + for t := range devices { + deviceTypes = append(deviceTypes, t) + } + sort.Strings(deviceTypes) + + for _, t := range deviceTypes { + if len(knownDevices) > 0 && !knownDevices[t] { + return nil, fmt.Errorf( + "pod group %q requests virtual resource %q which is not modeled in the resources graph", + groupName, t) + } + if knownDevices == nil { + return nil, fmt.Errorf( + "pod group %q requests virtual resource %q but no resources graph is configured", + groupName, t) + } + specs = append(specs, deviceJobspec(groupName, t, devices[t])) + } + return specs, nil +} + +// Placement is the result of matching one of a group's jobspecs. Nodes are the +// physical (virtual=false) compute nodes a pod binds to; Backend is the virtual +// (virtual=true) resource's identity, surfaced to the pod as env. A single +// allocation yields one or the other: the compute match yields nodes, a device +// match yields a backend. type Placement struct { - Nodes []string // allocated cluster node names - Backend string // allocated qpu/backend name (quantum groups only) + Nodes []string // physical compute node names (virtual=false) + Backend string // virtual backend identity (virtual=true), if any + // BackendAttributes are the matched virtual resource's user attributes + // (region, qubits, ...), decomposed from the backend node's namespaced + // properties. These are injected into the pod as env so the workload sees + // exactly what it matched/queried — the same set that is filterable is also + // readable back. + BackendAttributes map[string]string +} + +// decomposeProperty reverses ComposeProperty: "key=value" -> (key, value, true); +// a bare "key" -> (key, "", true). Used to recover attributes from a backend +// node's composed property keys. +func decomposeProperty(prop string) (key, value string) { + if i := strings.IndexByte(prop, '='); i >= 0 { + return prop[:i], prop[i+1:] + } + return prop, "" } -// PlacementFromAllocation parses a JGF allocation into node and backend names. +// PlacementFromAllocation parses one Fluxion allocation and classifies its +// node-typed vertices by the virtual marker property: virtual=false nodes are +// physical compute (bind targets), virtual=true nodes are virtual backends +// (their name is the backend identity injected into the pod). Everything is +// type "node" now, so the marker — not the vertex type — does the split. +// +// A node carrying neither marker is treated as a physical compute node, so a +// plain graph built without markers still binds correctly. For the chosen +// backend, its user attributes (the fluxion.flux-framework.org/= +// properties) are decomposed into BackendAttributes for env injection. func PlacementFromAllocation(alloc string) (Placement, error) { - nodes, err := graph.NamesFromAllocation(alloc, "node") + nodes, err := graph.NodesFromAllocation(alloc) if err != nil { return Placement{}, err } - backends, _ := graph.NamesFromAllocation(alloc, "qpu") - p := Placement{Nodes: nodes} - if len(backends) > 0 { - p.Backend = backends[0] + var p Placement + for _, n := range nodes { + if n.HasProperty(VirtualPropertyTrue) { + // First virtual node is the backend identity (one backend per group). + if p.Backend != "" { + continue + } + p.Backend = n.Name + p.BackendAttributes = backendAttributes(n.Properties) + continue + } + // virtual=false, or unmarked: a physical compute node to bind. + p.Nodes = append(p.Nodes, n.Name) } return p, nil } + +// backendAttributes extracts the user attributes from a backend node's composed +// property keys: every property of the form +// "fluxion.flux-framework.org/=" becomes -> . Reserved +// markers (virtual=..., class=...) are skipped. +func backendAttributes(props map[string]string) map[string]string { + var attrs map[string]string + for prop := range props { + if !strings.HasPrefix(prop, FluxionResourcePrefix) { + continue + } + key, value := decomposeProperty(strings.TrimPrefix(prop, FluxionResourcePrefix)) + if attrs == nil { + attrs = map[string]string{} + } + attrs[key] = value + } + return attrs +} diff --git a/pkg/placement/placement_test.go b/pkg/placement/placement_test.go index d5e8571..36e3a65 100644 --- a/pkg/placement/placement_test.go +++ b/pkg/placement/placement_test.go @@ -18,7 +18,8 @@ func podWith(name string, req corev1.ResourceList) corev1.Pod { func qty(n int64) resource.Quantity { return *resource.NewQuantity(n, resource.DecimalSI) } -// withType returns the count for a given Fluxion type in the slot's `with`. +// withType returns the count for a given Fluxion type in a jobspec slot's +// `with`. func withType(js *jobspec.Jobspec, t string) (int, bool) { for _, w := range js.Resources[0].With { if w.Type == t { @@ -28,17 +29,51 @@ func withType(js *jobspec.Jobspec, t string) (int, bool) { return 0, false } -func TestClassical(t *testing.T) { +// constraintProps returns the constraint property list from a jobspec's +// attributes.system.constraints.properties. +func constraintProps(t *testing.T, js *jobspec.Jobspec) []string { + t.Helper() + sys, ok := js.Attributes["system"].(map[string]interface{}) + if !ok { + t.Fatalf("no attributes.system: %#v", js.Attributes) + } + cons, ok := sys["constraints"].(map[string]interface{}) + if !ok { + t.Fatalf("no constraints: %#v", sys) + } + props, ok := cons["properties"].([]string) + if !ok { + t.Fatalf("properties not []string: %#v", cons["properties"]) + } + return props +} + +func hasProp(props []string, want string) bool { + for _, p := range props { + if p == want { + return true + } + } + return false +} + +// A classical group (no virtual resources) yields exactly one jobspec: the +// compute slot, constrained to virtual=false. +func TestClassicalSingleMatch(t *testing.T) { pods := []corev1.Pod{ podWith("p0", corev1.ResourceList{corev1.ResourceCPU: qty(4), "nvidia.com/gpu": qty(1)}), podWith("p1", corev1.ResourceList{corev1.ResourceCPU: qty(4), "nvidia.com/gpu": qty(1)}), } - js, err := JobspecForGroup("grp", pods) + specs, err := JobspecsForGroup("grp", pods, nil) if err != nil { t.Fatal(err) } + if len(specs) != 1 { + t.Fatalf("classical group should yield 1 jobspec, got %d", len(specs)) + } + js := specs[0] if js.Resources[0].Count != 2 { - t.Fatalf("slot count = %d, want 2", js.Resources[0].Count) + t.Errorf("slot count = %d, want 2", js.Resources[0].Count) } if c, _ := withType(js, "core"); c != 4 { t.Errorf("core = %d, want 4", c) @@ -46,91 +81,152 @@ func TestClassical(t *testing.T) { if c, _ := withType(js, "gpu"); c != 1 { t.Errorf("gpu = %d, want 1", c) } - if _, ok := withType(js, "qpu"); ok { - t.Error("classical pod should not request qpu") + if !hasProp(constraintProps(t, js), VirtualPropertyFalse) { + t.Errorf("compute jobspec must constrain virtual=false; got %v", constraintProps(t, js)) } } -func TestGenericQuantumCount(t *testing.T) { - // fluxion.flux-framework.org/qpu: 1 -> a qpu count, with no per-type code. - p := podWith("q", corev1.ResourceList{FluxionResourcePrefix + "qpu": qty(1)}) - js, err := JobspecForGroup("qgrp", []corev1.Pod{p}) +// A device request yields a second jobspec constrained to virtual=true (the +// requested type rides the slot's `with`, not a class= constraint), while the +// compute jobspec stays virtual=false and the device type does NOT leak into the +// compute slot. +func TestDeviceProducesSecondMatch(t *testing.T) { + p := podWith("q", corev1.ResourceList{ + corev1.ResourceCPU: qty(1), + FluxionResourcePrefix + "qpu": qty(1), + }) + known := map[string]bool{"qpu": true} + specs, err := JobspecsForGroup("qgrp", []corev1.Pod{p}, known) if err != nil { t.Fatal(err) } - if c, ok := withType(js, "qpu"); !ok || c != 1 { - t.Fatalf("qpu = %d (ok=%v), want 1", c, ok) + if len(specs) != 2 { + t.Fatalf("expected 2 jobspecs (compute + device), got %d", len(specs)) + } + + compute := specs[0] + if !hasProp(constraintProps(t, compute), VirtualPropertyFalse) { + t.Errorf("compute constraint = %v, want virtual=false", constraintProps(t, compute)) + } + if _, ok := withType(compute, "qpu"); ok { + t.Error("qpu must not appear in the compute jobspec") + } + if c, _ := withType(compute, "core"); c != 1 { + t.Errorf("compute core = %d, want 1", c) + } + + device := specs[1] + props := constraintProps(t, device) + if !hasProp(props, VirtualPropertyTrue) { + t.Errorf("device constraint must include virtual=true; got %v", props) + } + // The device is selected by class=: every virtual resource is + // a node carrying its class, so the constraint picks the right one. The slot + // requests a node (every virtual resource is a node), count from the request. + if !hasProp(props, "class=qpu") { + t.Errorf("device constraint must include class=qpu; got %v", props) } - // no classical core forced on an exotic-only request - if _, ok := withType(js, "core"); !ok { - t.Error("quantum-only pod is currently required to request a core") + if len(props) != 2 { + t.Errorf("device constraint should be [virtual=true class=qpu]; got %v", props) + } + if c, ok := withType(device, "node"); !ok || c != 1 { + t.Errorf("device should request node count 1; got %d (ok=%v)", c, ok) } } -func TestGenericQubitCount(t *testing.T) { - // "at least 156 qubits" expressed as a count (Fluxion count match is >=). - p := podWith("q", corev1.ResourceList{FluxionResourcePrefix + "qubit": qty(156)}) - js, err := JobspecForGroup("qubits", []corev1.Pod{p}) +// A device-only request still forces a compute jobspec (the probing pod needs a +// node), so there are two matches: compute (core=1, virtual=false) and device. +func TestDeviceOnlyStillForcesCompute(t *testing.T) { + p := podWith("q", corev1.ResourceList{FluxionResourcePrefix + "qpu": qty(1)}) + specs, err := JobspecsForGroup("qonly", []corev1.Pod{p}, map[string]bool{"qpu": true}) if err != nil { t.Fatal(err) } - if c, ok := withType(js, "qubit"); !ok || c != 156 { - t.Fatalf("qubit = %d (ok=%v), want 156", c, ok) + if len(specs) != 2 { + t.Fatalf("expected 2 jobspecs, got %d", len(specs)) + } + if c, _ := withType(specs[0], "core"); c != 1 { + t.Errorf("forced compute core = %d, want 1", c) } } -func TestHybrid(t *testing.T) { - // cores AND a qpu in the same pod -> both appear in the slot. - p := podWith("h", corev1.ResourceList{ - corev1.ResourceCPU: qty(2), +// Requesting a device type the graph does not model is a hard error. +func TestUnknownDeviceErrors(t *testing.T) { + p := podWith("q", corev1.ResourceList{FluxionResourcePrefix + "fpga": qty(1)}) + _, err := JobspecsForGroup("grp", []corev1.Pod{p}, map[string]bool{"qpu": true}) + if err == nil { + t.Fatal("expected an error for an unmodeled device type") + } +} + +// duration is 0 (hold until cancel) on every generated jobspec. +func TestHoldDurationZero(t *testing.T) { + p := podWith("q", corev1.ResourceList{ + corev1.ResourceCPU: qty(1), FluxionResourcePrefix + "qpu": qty(1), }) - js, err := JobspecForGroup("hyb", []corev1.Pod{p}) + specs, err := JobspecsForGroup("g", []corev1.Pod{p}, map[string]bool{"qpu": true}) if err != nil { t.Fatal(err) } - if c, _ := withType(js, "core"); c != 2 { - t.Errorf("core = %d, want 2", c) - } - if c, _ := withType(js, "qpu"); c != 1 { - t.Errorf("qpu = %d, want 1", c) + for i, js := range specs { + sys := js.Attributes["system"].(map[string]interface{}) + if sys["duration"] != 0 { + t.Errorf("jobspec %d duration = %v, want 0", i, sys["duration"]) + } } } -func TestPlacementFromAllocation(t *testing.T) { +// A compute allocation: node vertices carrying virtual=false are bind targets. +func TestPlacementComputeNodes(t *testing.T) { alloc := `{"graph":{"nodes":[ - {"metadata":{"type":"node","name":"node-a"}}, + {"metadata":{"type":"node","name":"node-a","properties":{"virtual=false":""}}}, {"metadata":{"type":"core","name":"core0"}}, - {"metadata":{"type":"node","name":"node-b"}}, - {"metadata":{"type":"qpu","name":"ibm_fez"}}]}}` + {"metadata":{"type":"node","name":"node-b","properties":{"virtual=false":""}}}]}}` p, err := PlacementFromAllocation(alloc) if err != nil { t.Fatal(err) } if len(p.Nodes) != 2 || p.Nodes[0] != "node-a" || p.Nodes[1] != "node-b" { - t.Fatalf("nodes = %v", p.Nodes) + t.Fatalf("nodes = %v, want [node-a node-b]", p.Nodes) } - if p.Backend != "ibm_fez" { - t.Fatalf("backend = %q", p.Backend) + if p.Backend != "" { + t.Fatalf("compute allocation should have no backend, got %q", p.Backend) } } -func TestPlacementQuantumOnly(t *testing.T) { - // A pure-quantum allocation has a qpu (under qgateway) but NO node vertex. - // Nodes must be empty and Backend set — fluence then imposes no node constraint. +// A device allocation: the virtual=true node is the backend identity, not a bind +// target. Its qpu/qubit children do not become the backend name. +func TestPlacementVirtualBackend(t *testing.T) { alloc := `{"graph":{"nodes":[ - {"metadata":{"type":"cluster","name":"kind"}}, - {"metadata":{"type":"qgateway","name":"qgateway0"}}, - {"metadata":{"type":"qpu","name":"ibm_marrakesh"}}, + {"metadata":{"type":"node","name":"rigetti_cepheus","properties":{"virtual=true":"","class=qdevice":""}}}, + {"metadata":{"type":"qpu","name":"qpu0"}}, {"metadata":{"type":"qubit","name":"qubit0"}}]}}` p, err := PlacementFromAllocation(alloc) if err != nil { t.Fatal(err) } if len(p.Nodes) != 0 { - t.Fatalf("quantum-only allocation should have no nodes, got %v", p.Nodes) + t.Fatalf("device allocation should bind no compute node, got %v", p.Nodes) + } + if p.Backend != "rigetti_cepheus" { + t.Fatalf("backend = %q, want rigetti_cepheus (the virtual=true node)", p.Backend) + } +} + +// An unmarked node (graph built without markers) is treated as a bind target, so +// a plain classical graph still places correctly. +func TestPlacementUnmarkedNodeIsCompute(t *testing.T) { + alloc := `{"graph":{"nodes":[ + {"metadata":{"type":"node","name":"plain-node"}}]}}` + p, err := PlacementFromAllocation(alloc) + if err != nil { + t.Fatal(err) + } + if len(p.Nodes) != 1 || p.Nodes[0] != "plain-node" { + t.Fatalf("nodes = %v, want [plain-node]", p.Nodes) } - if p.Backend != "ibm_marrakesh" { - t.Fatalf("backend = %q, want ibm_marrakesh", p.Backend) + if p.Backend != "" { + t.Fatalf("unmarked node should not be a backend, got %q", p.Backend) } } diff --git a/pkg/webhook/webhook.go b/pkg/webhook/webhook.go index 08e8364..17a7b48 100644 --- a/pkg/webhook/webhook.go +++ b/pkg/webhook/webhook.go @@ -25,6 +25,7 @@ import ( "encoding/pem" "fmt" "io" + "log" "math/big" "net/http" "strings" @@ -49,42 +50,88 @@ type jsonPatchOp struct { Value any `json:"value,omitempty"` } -// backendEnv is the downward-API env injected into quantum containers. To the -// app it is an ordinary env var; its value comes from the fluence backend -// annotation, which the scheduler sets in PreBind. -func backendEnv() corev1.EnvVar { +// Mutator injects fluence's scheduler-chosen values into a pod's containers. It +// carries the env contract — the union of attribute keys across the configured +// backends — so it injects a stable, predictable set of environment variables +// regardless of which backend a given pod ends up matching. Values flow via the +// downward API from annotations the scheduler writes in PreBind, so the env var +// NAMES are fixed at pod-creation time (here) while their VALUES populate later. +type Mutator struct { + // AttributeKeys is the union of user attribute keys across all backends. Each + // becomes a FLUXION_ env var sourced from its attr- annotation. + AttributeKeys []string +} + +// injectedEnv returns the full normalized env set this mutator injects into a +// fluxion-requesting container: FLUXION_BACKEND plus one FLUXION_ per +// configured attribute key. Each reads its annotation via the downward API; an +// annotation the scheduler did not set resolves to empty, which is harmless. +func (m *Mutator) injectedEnv() []corev1.EnvVar { + envs := []corev1.EnvVar{annotationEnv( + placement.EnvVarPrefix+"BACKEND", placement.BackendAnnotation)} + for _, key := range m.AttributeKeys { + envs = append(envs, annotationEnv( + placement.EnvVarName(key), placement.AttributeAnnotationPrefix+key)) + } + return envs +} + +// EnvVarNames returns the names of every env var this mutator injects, for +// startup logging so the developer sees the exact contract their container can +// rely on. +func (m *Mutator) EnvVarNames() []string { + names := make([]string, 0, len(m.AttributeKeys)+1) + for _, e := range m.injectedEnv() { + names = append(names, e.Name) + } + return names +} + +// annotationEnv builds a downward-API env var that reads a pod annotation. +func annotationEnv(envName, annotationKey string) corev1.EnvVar { return corev1.EnvVar{ - Name: "QRMI_BACKEND", + Name: envName, ValueFrom: &corev1.EnvVarSource{ FieldRef: &corev1.ObjectFieldSelector{ - FieldPath: fmt.Sprintf("metadata.annotations['%s']", placement.BackendAnnotation), + FieldPath: fmt.Sprintf("metadata.annotations['%s']", annotationKey), }, }, } } // Mutate returns the JSON Patch operations for a pod, or nil if nothing applies. -func Mutate(pod *corev1.Pod) []jsonPatchOp { +// For each container that requests a fluxion.flux-framework.org/* resource, it +// appends every contract env var the container does not already define. +func (m *Mutator) Mutate(pod *corev1.Pod) []jsonPatchOp { if pod.Spec.SchedulerName != SchedulerName { return nil } + contract := m.injectedEnv() var ops []jsonPatchOp for i, c := range pod.Spec.Containers { - if !requestsFluxionResource(c) || hasEnv(c, "QRMI_BACKEND") { + if !requestsFluxionResource(c) { continue } - if len(c.Env) == 0 { - ops = append(ops, jsonPatchOp{ - Op: "add", - Path: fmt.Sprintf("/spec/containers/%d/env", i), - Value: []corev1.EnvVar{backendEnv()}, - }) - } else { + for _, e := range contract { + if hasEnv(c, e.Name) { + continue + } + if len(c.Env) == 0 { + ops = append(ops, jsonPatchOp{ + Op: "add", + Path: fmt.Sprintf("/spec/containers/%d/env", i), + Value: []corev1.EnvVar{e}, + }) + // Subsequent vars append to the now-existing slice. + c.Env = []corev1.EnvVar{e} + continue + } ops = append(ops, jsonPatchOp{ Op: "add", Path: fmt.Sprintf("/spec/containers/%d/env/-", i), - Value: backendEnv(), + Value: e, }) + c.Env = append(c.Env, e) } } return ops @@ -110,7 +157,7 @@ func hasEnv(c corev1.Container, name string) bool { // Handler is the /mutate endpoint. It always admits the pod (failure to mutate // must not block creation); it only adds a patch when Mutate returns one. -func Handler(w http.ResponseWriter, r *http.Request) { +func (m *Mutator) Handler(w http.ResponseWriter, r *http.Request) { body, err := io.ReadAll(r.Body) if err != nil { http.Error(w, err.Error(), http.StatusBadRequest) @@ -125,11 +172,13 @@ func Handler(w http.ResponseWriter, r *http.Request) { resp := &admissionv1.AdmissionResponse{UID: review.Request.UID, Allowed: true} var pod corev1.Pod if err := json.Unmarshal(review.Request.Object.Raw, &pod); err == nil { - if ops := Mutate(&pod); len(ops) > 0 { + if ops := m.Mutate(&pod); len(ops) > 0 { if patch, err := json.Marshal(ops); err == nil { pt := admissionv1.PatchTypeJSONPatch resp.Patch = patch resp.PatchType = &pt + log.Printf("[fluence-webhook] injected %d env op(s) into pod %s/%s", + len(ops), pod.Namespace, pod.Name) } } } diff --git a/pkg/webhook/webhook_test.go b/pkg/webhook/webhook_test.go index c5a164c..6d97e40 100644 --- a/pkg/webhook/webhook_test.go +++ b/pkg/webhook/webhook_test.go @@ -8,7 +8,7 @@ import ( "k8s.io/apimachinery/pkg/api/resource" ) -func qpuPod(scheduler string, withEnv bool) *corev1.Pod { +func qpuPod(scheduler string, presetEnv string) *corev1.Pod { c := corev1.Container{ Name: "app", Resources: corev1.ResourceRequirements{ @@ -17,40 +17,101 @@ func qpuPod(scheduler string, withEnv bool) *corev1.Pod { }, }, } - if withEnv { - c.Env = []corev1.EnvVar{{Name: "QRMI_BACKEND", Value: "preset"}} + if presetEnv != "" { + c.Env = []corev1.EnvVar{{Name: presetEnv, Value: "preset"}} } return &corev1.Pod{Spec: corev1.PodSpec{SchedulerName: scheduler, Containers: []corev1.Container{c}}} } -func TestMutateInjectsBackendEnv(t *testing.T) { - ops := Mutate(qpuPod("fluence", false)) - if len(ops) != 1 { - t.Fatalf("want 1 op, got %d", len(ops)) +// envNames returns the env var names referenced by a list of add-ops. +func opEnvNames(ops []jsonPatchOp) []string { + var names []string + for _, op := range ops { + switch v := op.Value.(type) { + case corev1.EnvVar: + names = append(names, v.Name) + case []corev1.EnvVar: + for _, e := range v { + names = append(names, e.Name) + } + } } - if ops[0].Path != "/spec/containers/0/env" { - t.Errorf("path = %q", ops[0].Path) + return names +} + +func contains(names []string, want string) bool { + for _, n := range names { + if n == want { + return true + } + } + return false +} + +// With a config-derived contract (region, qubits), a fluxion pod gets +// FLUXION_BACKEND plus one FLUXION_ per attribute key. +func TestMutateInjectsContract(t *testing.T) { + m := &Mutator{AttributeKeys: []string{"region", "qubits"}} + ops := m.Mutate(qpuPod("fluence", "")) + names := opEnvNames(ops) + + for _, want := range []string{"FLUXION_BACKEND", "FLUXION_REGION", "FLUXION_QUBITS"} { + if !contains(names, want) { + t.Errorf("missing injected env %q; got %v", want, names) + } + } + if len(names) != 3 { + t.Errorf("expected exactly 3 env vars, got %v", names) } } +// With no configured attributes, only FLUXION_BACKEND is injected. +func TestMutateBackendOnly(t *testing.T) { + m := &Mutator{} + names := opEnvNames(m.Mutate(qpuPod("fluence", ""))) + if len(names) != 1 || names[0] != "FLUXION_BACKEND" { + t.Fatalf("want [FLUXION_BACKEND], got %v", names) + } +} + +// Non-fluence pods are never mutated. func TestMutateSkipsOtherScheduler(t *testing.T) { - if ops := Mutate(qpuPod("default-scheduler", false)); ops != nil { + m := &Mutator{AttributeKeys: []string{"region"}} + if ops := m.Mutate(qpuPod("default-scheduler", "")); ops != nil { t.Fatalf("non-fluence pod should not be mutated, got %v", ops) } } +// An env var the container already defines is not re-injected (idempotent / no +// override), while the others still are. func TestMutateRespectsExistingEnv(t *testing.T) { - if ops := Mutate(qpuPod("fluence", true)); ops != nil { - t.Fatalf("should not override an existing QRMI_BACKEND, got %v", ops) + m := &Mutator{AttributeKeys: []string{"region"}} + names := opEnvNames(m.Mutate(qpuPod("fluence", "FLUXION_BACKEND"))) + if contains(names, "FLUXION_BACKEND") { + t.Errorf("should not re-inject existing FLUXION_BACKEND; got %v", names) + } + if !contains(names, "FLUXION_REGION") { + t.Errorf("should still inject FLUXION_REGION; got %v", names) } } -func TestMutateSkipsNonQuantum(t *testing.T) { +// Classical pods (no fluxion resource request) are not mutated. +func TestMutateSkipsNonFluxion(t *testing.T) { + m := &Mutator{AttributeKeys: []string{"region"}} p := &corev1.Pod{Spec: corev1.PodSpec{ SchedulerName: "fluence", Containers: []corev1.Container{{Name: "c", Resources: corev1.ResourceRequirements{Requests: corev1.ResourceList{corev1.ResourceCPU: *resource.NewQuantity(1, resource.DecimalSI)}}}}, }} - if ops := Mutate(p); ops != nil { + if ops := m.Mutate(p); ops != nil { t.Fatalf("classical pod should not be mutated, got %v", ops) } } + +// EnvVarNames reports the full contract for startup logging. +func TestEnvVarNames(t *testing.T) { + m := &Mutator{AttributeKeys: []string{"region", "connectivity"}} + names := m.EnvVarNames() + if len(names) != 3 || names[0] != "FLUXION_BACKEND" { + t.Fatalf("EnvVarNames = %v, want FLUXION_BACKEND first then attrs", names) + } +}