From 71a41d92150860f0d0022d8f18b3250dd54d2032 Mon Sep 17 00:00:00 2001 From: Shreyas Goenka Date: Mon, 1 Jun 2026 01:44:36 +0200 Subject: [PATCH] bundle/statemgmt: add StateReader abstraction for file and DMS state Introduce a StateReader interface with two implementations that populate the direct engine's resource-state DB: - file-based, reading the local resources.json (delegates to DeploymentState.Open, preserving WAL recovery + migration) - DMS-based, reading from the deployment metadata service via the SDK Bundle.ListResourcesAll This is a self-contained, drop-in abstraction with unit tests; it is not yet wired into the deploy path. Integration (selecting the reader by managed-state and sourcing the deployment ID) follows once the DMS lock and op-reporting PRs land. Co-authored-by: Isaac --- bundle/statemgmt/statereader.go | 99 ++++++++++++++++++++ bundle/statemgmt/statereader_test.go | 132 +++++++++++++++++++++++++++ 2 files changed, 231 insertions(+) create mode 100644 bundle/statemgmt/statereader.go create mode 100644 bundle/statemgmt/statereader_test.go diff --git a/bundle/statemgmt/statereader.go b/bundle/statemgmt/statereader.go new file mode 100644 index 0000000000..2c670386c3 --- /dev/null +++ b/bundle/statemgmt/statereader.go @@ -0,0 +1,99 @@ +package statemgmt + +import ( + "context" + "encoding/json" + "fmt" + + "github.com/databricks/cli/bundle/direct/dstate" + sdkbundle "github.com/databricks/databricks-sdk-go/service/bundle" +) + +// StateReader populates the deployment resource-state DB used by the direct +// engine's plan/apply path. It abstracts where the state comes from: the local +// resources.json file (the historical source) or the deployment metadata +// service (DMS), which owns the state server-side when the bundle is opted into +// managed state. +// +// Both implementations leave the DB open in read mode. The plan path may later +// upgrade it to write mode; see dstate.DeploymentState.UpgradeToWrite. +type StateReader interface { + // Load opens db and populates it with the deployment's resource state. + Load(ctx context.Context, db *dstate.DeploymentState) error +} + +// fileStateReader reads resource state from the local resources.json file. +type fileStateReader struct { + path string +} + +// NewFileStateReader returns a StateReader backed by the local resources.json +// file at path. This is the historical (non-DMS) source of resource state. +func NewFileStateReader(path string) StateReader { + return &fileStateReader{path: path} +} + +func (r *fileStateReader) Load(ctx context.Context, db *dstate.DeploymentState) error { + // Recovery replays any leftover write-ahead log from a crashed deploy; the + // file reader owns the on-disk state, so recovery applies here. + return db.Open(ctx, r.path, dstate.WithRecovery(true), dstate.WithWrite(false)) +} + +// dmsStateReader reads resource state from the deployment metadata service. +type dmsStateReader struct { + client sdkbundle.BundleInterface + deploymentID string + + // path is the local resources.json path. OpenWithData records it as the + // eventual write target if the plan path later upgrades the DB to write + // mode; the DMS reader itself never reads from or writes to it. + path string +} + +// NewDMSStateReader returns a StateReader backed by the deployment metadata +// service for the deployment identified by deploymentID, which must be +// non-empty. path is the local resources.json path (see dmsStateReader.path). +func NewDMSStateReader(client sdkbundle.BundleInterface, deploymentID, path string) StateReader { + return &dmsStateReader{client: client, deploymentID: deploymentID, path: path} +} + +func (r *dmsStateReader) Load(ctx context.Context, db *dstate.DeploymentState) error { + data, err := fetchDeploymentState(ctx, r.client, r.deploymentID) + if err != nil { + return err + } + db.OpenWithData(r.path, data) + return nil +} + +// fetchDeploymentState lists every resource recorded for the deployment and +// assembles them into a state Database. +func fetchDeploymentState(ctx context.Context, client sdkbundle.BundleInterface, deploymentID string) (dstate.Database, error) { + resources, err := client.ListResourcesAll(ctx, sdkbundle.ListResourcesRequest{ + Parent: "deployments/" + deploymentID, + }) + if err != nil { + return dstate.Database{}, fmt.Errorf("listing resources from deployment metadata service: %w", err) + } + + // Lineage and serial are file-state concepts for detecting concurrent + // local edits; under DMS the server owns versioning, so they stay empty. + data := dstate.NewDatabase("", 0) + for _, res := range resources { + // DMS reports resource keys without the "resources." prefix (e.g. + // "jobs.foo"), but the local state DB keys are fully qualified + // ("resources.jobs.foo"), so prepend it here. + key := "resources." + res.ResourceKey + + var state json.RawMessage + if res.State != nil { + state = *res.State + } + + data.State[key] = dstate.ResourceEntry{ + ID: res.ResourceId, + State: state, + } + } + return data, nil +} diff --git a/bundle/statemgmt/statereader_test.go b/bundle/statemgmt/statereader_test.go new file mode 100644 index 0000000000..bf4ed23d87 --- /dev/null +++ b/bundle/statemgmt/statereader_test.go @@ -0,0 +1,132 @@ +package statemgmt + +import ( + "context" + "encoding/json" + "errors" + "os" + "path/filepath" + "testing" + + "github.com/databricks/cli/bundle/direct/dstate" + sdkbundle "github.com/databricks/databricks-sdk-go/service/bundle" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +type fakeBundleClient struct { + sdkbundle.BundleInterface + resources []sdkbundle.Resource + requests []sdkbundle.ListResourcesRequest + err error +} + +func (c *fakeBundleClient) ListResourcesAll(_ context.Context, req sdkbundle.ListResourcesRequest) ([]sdkbundle.Resource, error) { + c.requests = append(c.requests, req) + if c.err != nil { + return nil, c.err + } + return c.resources, nil +} + +func rawState(t *testing.T, s string) *json.RawMessage { + t.Helper() + msg := json.RawMessage(s) + return &msg +} + +func TestDMSStateReaderPopulatesStateAndPrefixesKeys(t *testing.T) { + client := &fakeBundleClient{ + resources: []sdkbundle.Resource{ + {ResourceKey: "jobs.foo", ResourceId: "job-1", State: rawState(t, `{"name":"foo"}`)}, + {ResourceKey: "pipelines.bar", ResourceId: "pipe-2", State: rawState(t, `{"name":"bar"}`)}, + }, + } + + var db dstate.DeploymentState + reader := NewDMSStateReader(client, "dep-1", filepath.Join(t.TempDir(), "resources.json")) + require.NoError(t, reader.Load(t.Context(), &db)) + + require.Len(t, client.requests, 1) + assert.Equal(t, "deployments/dep-1", client.requests[0].Parent) + + entry, ok := db.GetResourceEntry("resources.jobs.foo") + require.True(t, ok) + assert.Equal(t, "job-1", entry.ID) + assert.JSONEq(t, `{"name":"foo"}`, string(entry.State)) + assert.Equal(t, "job-1", db.GetResourceID("resources.jobs.foo")) + + entry, ok = db.GetResourceEntry("resources.pipelines.bar") + require.True(t, ok) + assert.Equal(t, "pipe-2", entry.ID) +} + +func TestDMSStateReaderEmptyList(t *testing.T) { + client := &fakeBundleClient{} + + var db dstate.DeploymentState + reader := NewDMSStateReader(client, "dep-1", filepath.Join(t.TempDir(), "resources.json")) + require.NoError(t, reader.Load(t.Context(), &db)) + + _, ok := db.GetResourceEntry("resources.jobs.foo") + assert.False(t, ok) +} + +func TestDMSStateReaderNilStateBecomesEmpty(t *testing.T) { + client := &fakeBundleClient{ + resources: []sdkbundle.Resource{ + {ResourceKey: "jobs.foo", ResourceId: "job-1", State: nil}, + }, + } + + var db dstate.DeploymentState + reader := NewDMSStateReader(client, "dep-1", filepath.Join(t.TempDir(), "resources.json")) + require.NoError(t, reader.Load(t.Context(), &db)) + + entry, ok := db.GetResourceEntry("resources.jobs.foo") + require.True(t, ok) + assert.Equal(t, "job-1", entry.ID) + assert.Nil(t, entry.State) +} + +func TestDMSStateReaderPropagatesListError(t *testing.T) { + wantErr := errors.New("boom") + client := &fakeBundleClient{err: wantErr} + + var db dstate.DeploymentState + reader := NewDMSStateReader(client, "dep-1", filepath.Join(t.TempDir(), "resources.json")) + err := reader.Load(t.Context(), &db) + assert.ErrorIs(t, err, wantErr) +} + +func TestFileStateReaderReadsLocalState(t *testing.T) { + path := filepath.Join(t.TempDir(), "resources.json") + + seed := dstate.NewDatabase("lineage-1", 3) + seed.State["resources.jobs.foo"] = dstate.ResourceEntry{ID: "job-1", State: json.RawMessage(`{"name":"foo"}`)} + content, err := json.Marshal(seed) + require.NoError(t, err) + require.NoError(t, os.WriteFile(path, content, 0o600)) + + var db dstate.DeploymentState + reader := NewFileStateReader(path) + require.NoError(t, reader.Load(t.Context(), &db)) + + assert.Equal(t, "lineage-1", db.Data.Lineage) + assert.Equal(t, 3, db.Data.Serial) + + entry, ok := db.GetResourceEntry("resources.jobs.foo") + require.True(t, ok) + assert.Equal(t, "job-1", entry.ID) +} + +func TestFileStateReaderMissingFileIsEmptyState(t *testing.T) { + path := filepath.Join(t.TempDir(), "does-not-exist.json") + + var db dstate.DeploymentState + reader := NewFileStateReader(path) + require.NoError(t, reader.Load(t.Context(), &db)) + + _, ok := db.GetResourceEntry("resources.jobs.foo") + assert.False(t, ok) +}