diff --git a/acceptance/bundle/deploy/record-deployment-history/jobs/databricks.delete.yml b/acceptance/bundle/deploy/record-deployment-history/jobs/databricks.delete.yml new file mode 100644 index 0000000000..73053e228b --- /dev/null +++ b/acceptance/bundle/deploy/record-deployment-history/jobs/databricks.delete.yml @@ -0,0 +1,8 @@ +bundle: + name: test-rdh-jobs + +experimental: + record_deployment_history: true + +resources: + jobs: {} diff --git a/acceptance/bundle/deploy/record-deployment-history/jobs/databricks.yml b/acceptance/bundle/deploy/record-deployment-history/jobs/databricks.yml new file mode 100644 index 0000000000..51942f9696 --- /dev/null +++ b/acceptance/bundle/deploy/record-deployment-history/jobs/databricks.yml @@ -0,0 +1,10 @@ +bundle: + name: test-rdh-jobs + +experimental: + record_deployment_history: true + +resources: + jobs: + foo: + name: foo-job diff --git a/acceptance/bundle/deploy/record-deployment-history/jobs/out.test.toml b/acceptance/bundle/deploy/record-deployment-history/jobs/out.test.toml new file mode 100644 index 0000000000..e90b6d5d1b --- /dev/null +++ b/acceptance/bundle/deploy/record-deployment-history/jobs/out.test.toml @@ -0,0 +1,3 @@ +Local = true +Cloud = false +EnvMatrix.DATABRICKS_BUNDLE_ENGINE = ["direct"] diff --git a/acceptance/bundle/deploy/record-deployment-history/jobs/output.txt b/acceptance/bundle/deploy/record-deployment-history/jobs/output.txt new file mode 100644 index 0000000000..5153537342 --- /dev/null +++ b/acceptance/bundle/deploy/record-deployment-history/jobs/output.txt @@ -0,0 +1,94 @@ + +=== create: records OPERATION_ACTION_TYPE_CREATE +>>> [CLI] bundle deploy +Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/test-rdh-jobs/default/files... +Deploying resources... +Updating deployment state... +Deployment complete! + +>>> print_requests.py //api/2.0/bundle +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/abcd/versions/0/operations", + "q": { + "resource_key": "resources.jobs.foo" + }, + "body": { + "action_type": "OPERATION_ACTION_TYPE_CREATE", + "resource_id": "[NUMID]", + "resource_key": "resources.jobs.foo", + "state": { + "deployment": { + "kind": "BUNDLE", + "metadata_file_path": "/Workspace/Users/[USERNAME]/.bundle/test-rdh-jobs/default/state/metadata.json" + }, + "edit_mode": "UI_LOCKED", + "format": "MULTI_TASK", + "max_concurrent_runs": 1, + "name": "foo-job", + "queue": { + "enabled": true + } + }, + "status": "OPERATION_STATUS_SUCCEEDED" + } +} + +=== update: records OPERATION_ACTION_TYPE_UPDATE +>>> update_file.py databricks.yml foo-job foo-job-renamed + +>>> [CLI] bundle deploy +Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/test-rdh-jobs/default/files... +Deploying resources... +Updating deployment state... +Deployment complete! + +>>> print_requests.py //api/2.0/bundle +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/abcd/versions/0/operations", + "q": { + "resource_key": "resources.jobs.foo" + }, + "body": { + "action_type": "OPERATION_ACTION_TYPE_UPDATE", + "resource_id": "[NUMID]", + "resource_key": "resources.jobs.foo", + "state": { + "deployment": { + "kind": "BUNDLE", + "metadata_file_path": "/Workspace/Users/[USERNAME]/.bundle/test-rdh-jobs/default/state/metadata.json" + }, + "edit_mode": "UI_LOCKED", + "format": "MULTI_TASK", + "max_concurrent_runs": 1, + "name": "foo-job-renamed", + "queue": { + "enabled": true + } + }, + "status": "OPERATION_STATUS_SUCCEEDED" + } +} + +=== delete: records OPERATION_ACTION_TYPE_DELETE (resource removed from config) +>>> [CLI] bundle deploy +Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/test-rdh-jobs/default/files... +Deploying resources... +Updating deployment state... +Deployment complete! + +>>> print_requests.py //api/2.0/bundle +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/abcd/versions/0/operations", + "q": { + "resource_key": "resources.jobs.foo" + }, + "body": { + "action_type": "OPERATION_ACTION_TYPE_DELETE", + "resource_id": "[NUMID]", + "resource_key": "resources.jobs.foo", + "status": "OPERATION_STATUS_SUCCEEDED" + } +} diff --git a/acceptance/bundle/deploy/record-deployment-history/jobs/script b/acceptance/bundle/deploy/record-deployment-history/jobs/script new file mode 100644 index 0000000000..0f7ab4bf37 --- /dev/null +++ b/acceptance/bundle/deploy/record-deployment-history/jobs/script @@ -0,0 +1,13 @@ +title "create: records OPERATION_ACTION_TYPE_CREATE" +trace $CLI bundle deploy +trace print_requests.py //api/2.0/bundle + +title "update: records OPERATION_ACTION_TYPE_UPDATE" +trace update_file.py databricks.yml foo-job foo-job-renamed +trace $CLI bundle deploy +trace print_requests.py //api/2.0/bundle + +title "delete: records OPERATION_ACTION_TYPE_DELETE (resource removed from config)" +cp databricks.delete.yml databricks.yml +trace $CLI bundle deploy +trace print_requests.py //api/2.0/bundle diff --git a/acceptance/bundle/deploy/record-deployment-history/jobs/test.toml b/acceptance/bundle/deploy/record-deployment-history/jobs/test.toml new file mode 100644 index 0000000000..e5d4fa1464 --- /dev/null +++ b/acceptance/bundle/deploy/record-deployment-history/jobs/test.toml @@ -0,0 +1,2 @@ +# databricks.yml is rewritten in-place by the script (update + delete steps). +Ignore = [".databricks", "databricks.yml"] diff --git a/acceptance/bundle/deploy/record-deployment-history/recreate/databricks.yml b/acceptance/bundle/deploy/record-deployment-history/recreate/databricks.yml new file mode 100644 index 0000000000..83dd3cb0f6 --- /dev/null +++ b/acceptance/bundle/deploy/record-deployment-history/recreate/databricks.yml @@ -0,0 +1,11 @@ +bundle: + name: test-rdh-recreate + +experimental: + record_deployment_history: true + +resources: + pipelines: + bar: + name: bar-pipeline + storage: /tmp/storage-a diff --git a/acceptance/bundle/deploy/record-deployment-history/recreate/out.test.toml b/acceptance/bundle/deploy/record-deployment-history/recreate/out.test.toml new file mode 100644 index 0000000000..e90b6d5d1b --- /dev/null +++ b/acceptance/bundle/deploy/record-deployment-history/recreate/out.test.toml @@ -0,0 +1,3 @@ +Local = true +Cloud = false +EnvMatrix.DATABRICKS_BUNDLE_ENGINE = ["direct"] diff --git a/acceptance/bundle/deploy/record-deployment-history/recreate/output.txt b/acceptance/bundle/deploy/record-deployment-history/recreate/output.txt new file mode 100644 index 0000000000..b461129a23 --- /dev/null +++ b/acceptance/bundle/deploy/record-deployment-history/recreate/output.txt @@ -0,0 +1,72 @@ + +=== create the pipeline +>>> [CLI] bundle deploy +Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/test-rdh-recreate/default/files... +Deploying resources... +Updating deployment state... +Deployment complete! + +>>> print_requests.py //api/2.0/bundle +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/abcd/versions/0/operations", + "q": { + "resource_key": "resources.pipelines.bar" + }, + "body": { + "action_type": "OPERATION_ACTION_TYPE_CREATE", + "resource_id": "[UUID]", + "resource_key": "resources.pipelines.bar", + "state": { + "channel": "CURRENT", + "deployment": { + "kind": "BUNDLE", + "metadata_file_path": "/Workspace/Users/[USERNAME]/.bundle/test-rdh-recreate/default/state/metadata.json" + }, + "edition": "ADVANCED", + "name": "bar-pipeline", + "storage": "/tmp/storage-a" + }, + "status": "OPERATION_STATUS_SUCCEEDED" + } +} + +=== recreate: changing immutable 'storage' records OPERATION_ACTION_TYPE_RECREATE +>>> update_file.py databricks.yml /tmp/storage-a /tmp/storage-b + +>>> [CLI] bundle deploy --auto-approve +Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/test-rdh-recreate/default/files... + +This action will result in the deletion or recreation of the following Lakeflow Spark Declarative Pipelines along with the +Streaming Tables (STs) and Materialized Views (MVs) managed by them. Recreating the pipelines will +restore the defined STs and MVs through full refresh. Note that recreation is necessary when pipeline +properties such as the 'catalog' or 'storage' are changed: + recreate resources.pipelines.bar +Deploying resources... +Updating deployment state... +Deployment complete! + +>>> print_requests.py //api/2.0/bundle +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/abcd/versions/0/operations", + "q": { + "resource_key": "resources.pipelines.bar" + }, + "body": { + "action_type": "OPERATION_ACTION_TYPE_RECREATE", + "resource_id": "[UUID]", + "resource_key": "resources.pipelines.bar", + "state": { + "channel": "CURRENT", + "deployment": { + "kind": "BUNDLE", + "metadata_file_path": "/Workspace/Users/[USERNAME]/.bundle/test-rdh-recreate/default/state/metadata.json" + }, + "edition": "ADVANCED", + "name": "bar-pipeline", + "storage": "/tmp/storage-b" + }, + "status": "OPERATION_STATUS_SUCCEEDED" + } +} diff --git a/acceptance/bundle/deploy/record-deployment-history/recreate/script b/acceptance/bundle/deploy/record-deployment-history/recreate/script new file mode 100644 index 0000000000..7e85ac49d9 --- /dev/null +++ b/acceptance/bundle/deploy/record-deployment-history/recreate/script @@ -0,0 +1,8 @@ +title "create the pipeline" +trace $CLI bundle deploy +trace print_requests.py //api/2.0/bundle + +title "recreate: changing immutable 'storage' records OPERATION_ACTION_TYPE_RECREATE" +trace update_file.py databricks.yml /tmp/storage-a /tmp/storage-b +trace $CLI bundle deploy --auto-approve +trace print_requests.py //api/2.0/bundle diff --git a/acceptance/bundle/deploy/record-deployment-history/recreate/test.toml b/acceptance/bundle/deploy/record-deployment-history/recreate/test.toml new file mode 100644 index 0000000000..5e2b28502f --- /dev/null +++ b/acceptance/bundle/deploy/record-deployment-history/recreate/test.toml @@ -0,0 +1,2 @@ +# databricks.yml is rewritten in-place by the script (storage change). +Ignore = [".databricks", "databricks.yml"] diff --git a/acceptance/bundle/deploy/record-deployment-history/resize/databricks.yml b/acceptance/bundle/deploy/record-deployment-history/resize/databricks.yml new file mode 100644 index 0000000000..33ed54901a --- /dev/null +++ b/acceptance/bundle/deploy/record-deployment-history/resize/databricks.yml @@ -0,0 +1,15 @@ +bundle: + name: test-rdh-resize + +experimental: + record_deployment_history: true + +resources: + clusters: + cl: + cluster_name: test-cluster + spark_version: 15.4.x-scala2.12 + node_type_id: i3.xlarge + num_workers: 1 + lifecycle: + started: true diff --git a/acceptance/bundle/deploy/record-deployment-history/resize/out.test.toml b/acceptance/bundle/deploy/record-deployment-history/resize/out.test.toml new file mode 100644 index 0000000000..e90b6d5d1b --- /dev/null +++ b/acceptance/bundle/deploy/record-deployment-history/resize/out.test.toml @@ -0,0 +1,3 @@ +Local = true +Cloud = false +EnvMatrix.DATABRICKS_BUNDLE_ENGINE = ["direct"] diff --git a/acceptance/bundle/deploy/record-deployment-history/resize/output.txt b/acceptance/bundle/deploy/record-deployment-history/resize/output.txt new file mode 100644 index 0000000000..31ba992a67 --- /dev/null +++ b/acceptance/bundle/deploy/record-deployment-history/resize/output.txt @@ -0,0 +1,66 @@ + +=== create the cluster (running via lifecycle.started) +>>> [CLI] bundle deploy +Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/test-rdh-resize/default/files... +Deploying resources... +Updating deployment state... +Deployment complete! + +>>> print_requests.py //api/2.0/bundle +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/abcd/versions/0/operations", + "q": { + "resource_key": "resources.clusters.cl" + }, + "body": { + "action_type": "OPERATION_ACTION_TYPE_CREATE", + "resource_id": "[UUID]", + "resource_key": "resources.clusters.cl", + "state": { + "autotermination_minutes": 60, + "cluster_name": "test-cluster", + "lifecycle": { + "started": true + }, + "node_type_id": "[NODE_TYPE_ID]", + "num_workers": 1, + "spark_version": "15.4.x-scala2.12" + }, + "status": "OPERATION_STATUS_SUCCEEDED" + } +} + +=== resize: changing num_workers on a running cluster records OPERATION_ACTION_TYPE_RESIZE +>>> update_file.py databricks.yml num_workers: 1 num_workers: 2 + +>>> [CLI] bundle deploy +Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/test-rdh-resize/default/files... +Deploying resources... +Updating deployment state... +Deployment complete! + +>>> print_requests.py //api/2.0/bundle +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/abcd/versions/0/operations", + "q": { + "resource_key": "resources.clusters.cl" + }, + "body": { + "action_type": "OPERATION_ACTION_TYPE_RESIZE", + "resource_id": "[UUID]", + "resource_key": "resources.clusters.cl", + "state": { + "autotermination_minutes": 60, + "cluster_name": "test-cluster", + "lifecycle": { + "started": true + }, + "node_type_id": "[NODE_TYPE_ID]", + "num_workers": 2, + "spark_version": "15.4.x-scala2.12" + }, + "status": "OPERATION_STATUS_SUCCEEDED" + } +} diff --git a/acceptance/bundle/deploy/record-deployment-history/resize/script b/acceptance/bundle/deploy/record-deployment-history/resize/script new file mode 100644 index 0000000000..820c350a2c --- /dev/null +++ b/acceptance/bundle/deploy/record-deployment-history/resize/script @@ -0,0 +1,8 @@ +title "create the cluster (running via lifecycle.started)" +trace $CLI bundle deploy +trace print_requests.py //api/2.0/bundle + +title "resize: changing num_workers on a running cluster records OPERATION_ACTION_TYPE_RESIZE" +trace update_file.py databricks.yml "num_workers: 1" "num_workers: 2" +trace $CLI bundle deploy +trace print_requests.py //api/2.0/bundle diff --git a/acceptance/bundle/deploy/record-deployment-history/resize/test.toml b/acceptance/bundle/deploy/record-deployment-history/resize/test.toml new file mode 100644 index 0000000000..07f25a0dd3 --- /dev/null +++ b/acceptance/bundle/deploy/record-deployment-history/resize/test.toml @@ -0,0 +1,2 @@ +# databricks.yml is rewritten in-place by the script (num_workers change). +Ignore = [".databricks", "databricks.yml"] diff --git a/acceptance/bundle/deploy/record-deployment-history/test.toml b/acceptance/bundle/deploy/record-deployment-history/test.toml new file mode 100644 index 0000000000..907ac0c4d3 --- /dev/null +++ b/acceptance/bundle/deploy/record-deployment-history/test.toml @@ -0,0 +1,8 @@ +Local = true +Cloud = false +RecordRequests = true + +# Operation recording only runs for the direct engine; the terraform engine +# would record nothing and produce divergent output. +[EnvMatrix] +DATABRICKS_BUNDLE_ENGINE = ["direct"] diff --git a/acceptance/bundle/deploy/record-deployment-history/update-id/databricks.yml b/acceptance/bundle/deploy/record-deployment-history/update-id/databricks.yml new file mode 100644 index 0000000000..3eafbc05e9 --- /dev/null +++ b/acceptance/bundle/deploy/record-deployment-history/update-id/databricks.yml @@ -0,0 +1,10 @@ +bundle: + name: test-rdh-update-id + +experimental: + record_deployment_history: true + +resources: + catalogs: + cat: + name: my_catalog_a diff --git a/acceptance/bundle/deploy/record-deployment-history/update-id/out.test.toml b/acceptance/bundle/deploy/record-deployment-history/update-id/out.test.toml new file mode 100644 index 0000000000..e90b6d5d1b --- /dev/null +++ b/acceptance/bundle/deploy/record-deployment-history/update-id/out.test.toml @@ -0,0 +1,3 @@ +Local = true +Cloud = false +EnvMatrix.DATABRICKS_BUNDLE_ENGINE = ["direct"] diff --git a/acceptance/bundle/deploy/record-deployment-history/update-id/output.txt b/acceptance/bundle/deploy/record-deployment-history/update-id/output.txt new file mode 100644 index 0000000000..72efad5bf0 --- /dev/null +++ b/acceptance/bundle/deploy/record-deployment-history/update-id/output.txt @@ -0,0 +1,52 @@ + +=== create the catalog +>>> [CLI] bundle deploy +Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/test-rdh-update-id/default/files... +Deploying resources... +Updating deployment state... +Deployment complete! + +>>> print_requests.py //api/2.0/bundle +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/abcd/versions/0/operations", + "q": { + "resource_key": "resources.catalogs.cat" + }, + "body": { + "action_type": "OPERATION_ACTION_TYPE_CREATE", + "resource_id": "my_catalog_a", + "resource_key": "resources.catalogs.cat", + "state": { + "name": "my_catalog_a" + }, + "status": "OPERATION_STATUS_SUCCEEDED" + } +} + +=== update_id: renaming the catalog changes its ID, records OPERATION_ACTION_TYPE_UPDATE_WITH_ID +>>> update_file.py databricks.yml my_catalog_a my_catalog_b + +>>> [CLI] bundle deploy +Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/test-rdh-update-id/default/files... +Deploying resources... +Updating deployment state... +Deployment complete! + +>>> print_requests.py //api/2.0/bundle +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/abcd/versions/0/operations", + "q": { + "resource_key": "resources.catalogs.cat" + }, + "body": { + "action_type": "OPERATION_ACTION_TYPE_UPDATE_WITH_ID", + "resource_id": "my_catalog_b", + "resource_key": "resources.catalogs.cat", + "state": { + "name": "my_catalog_b" + }, + "status": "OPERATION_STATUS_SUCCEEDED" + } +} diff --git a/acceptance/bundle/deploy/record-deployment-history/update-id/script b/acceptance/bundle/deploy/record-deployment-history/update-id/script new file mode 100644 index 0000000000..48c3a823a6 --- /dev/null +++ b/acceptance/bundle/deploy/record-deployment-history/update-id/script @@ -0,0 +1,8 @@ +title "create the catalog" +trace $CLI bundle deploy +trace print_requests.py //api/2.0/bundle + +title "update_id: renaming the catalog changes its ID, records OPERATION_ACTION_TYPE_UPDATE_WITH_ID" +trace update_file.py databricks.yml my_catalog_a my_catalog_b +trace $CLI bundle deploy +trace print_requests.py //api/2.0/bundle diff --git a/acceptance/bundle/deploy/record-deployment-history/update-id/test.toml b/acceptance/bundle/deploy/record-deployment-history/update-id/test.toml new file mode 100644 index 0000000000..287187a093 --- /dev/null +++ b/acceptance/bundle/deploy/record-deployment-history/update-id/test.toml @@ -0,0 +1,2 @@ +# databricks.yml is rewritten in-place by the script (catalog rename). +Ignore = [".databricks", "databricks.yml"] diff --git a/bundle/direct/bundle_apply.go b/bundle/direct/bundle_apply.go index 6bad809146..dc45edda0a 100644 --- a/bundle/direct/bundle_apply.go +++ b/bundle/direct/bundle_apply.go @@ -84,11 +84,18 @@ func (b *DeploymentBundle) Apply(ctx context.Context, client *databricks.Workspa logdiag.LogError(ctx, fmt.Errorf("%s: Unexpected delete action during migration", errorPrefix)) return false } + // Read the ID before Destroy: it removes the entry from state. + deletedID := b.StateDB.GetResourceID(resourceKey) err = d.Destroy(ctx, &b.StateDB) if err != nil { logdiag.LogError(ctx, fmt.Errorf("%s: %w", errorPrefix, err)) return false } + // state is nil: the resource no longer exists after a delete. + if err := b.recordOperation(ctx, resourceKey, deployplan.Delete, deletedID, nil); err != nil { + logdiag.LogError(ctx, fmt.Errorf("%s: %w", errorPrefix, err)) + return false + } return true } @@ -128,6 +135,15 @@ func (b *DeploymentBundle) Apply(ctx context.Context, client *databricks.Workspa logdiag.LogError(ctx, fmt.Errorf("%s: %w", errorPrefix, err)) return false } + + // Record the operation with DMS. Migration only mirrors existing + // state into the local store; there is no operation to report. + if !migrateMode { + if err := b.recordOperation(ctx, resourceKey, action, b.StateDB.GetResourceID(resourceKey), sv.Value); err != nil { + logdiag.LogError(ctx, fmt.Errorf("%s: %w", errorPrefix, err)) + return false + } + } } // TODO: Note, we only really need remote state if there are remote references. @@ -189,6 +205,16 @@ func (b *DeploymentBundle) LookupReferencePostDeploy(ctx context.Context, path * return structaccess.Get(remoteState, fieldPath) } +// recordOperation reports a completed resource operation to DMS. It is a no-op +// unless the bundle is opted into managed state (OpRec is set). state is the +// serialized config after the operation and must be nil for deletes. +func (b *DeploymentBundle) recordOperation(ctx context.Context, resourceKey string, action deployplan.ActionType, resourceID string, state any) error { + if b.OpRec == nil { + return nil + } + return b.OpRec.record(ctx, resourceKey, action, resourceID, state) +} + func jsonDump(obj any) string { bytes, err := json.MarshalIndent(obj, "", " ") if err != nil { diff --git a/bundle/direct/oprecorder.go b/bundle/direct/oprecorder.go new file mode 100644 index 0000000000..41da0269af --- /dev/null +++ b/bundle/direct/oprecorder.go @@ -0,0 +1,90 @@ +package direct + +import ( + "context" + "encoding/json" + "fmt" + + "github.com/databricks/cli/bundle/deployplan" + sdkbundle "github.com/databricks/databricks-sdk-go/service/bundle" +) + +// opRecorder records a resource operation with the deployment metadata service +// (DMS) after it has been applied to the workspace. state is the serialized +// local config after the operation and must be nil for delete operations. +type opRecorder interface { + record(ctx context.Context, resourceKey string, action deployplan.ActionType, resourceID string, state any) error +} + +// operationRecorder records operations via the DMS CreateOperation API. +type operationRecorder struct { + client sdkbundle.BundleInterface + // parent is the version the operations are recorded under, formatted as + // "deployments/{deployment_id}/versions/{version_id}". + parent string +} + +// NewOperationRecorder returns an opRecorder backed by the DMS CreateOperation +// API. deploymentID and version identify the deployment version assigned by DMS +// that the operations are recorded under. +func NewOperationRecorder(client sdkbundle.BundleInterface, deploymentID string, version int64) opRecorder { + return &operationRecorder{ + client: client, + parent: fmt.Sprintf("deployments/%s/versions/%d", deploymentID, version), + } +} + +func (r *operationRecorder) record(ctx context.Context, resourceKey string, action deployplan.ActionType, resourceID string, state any) error { + actionType, err := deployActionToSDK(action) + if err != nil { + return err + } + + op := sdkbundle.Operation{ + ActionType: actionType, + ResourceId: resourceID, + ResourceKey: resourceKey, + Status: sdkbundle.OperationStatusOperationStatusSucceeded, + } + + // The DMS Operation.State field carries the serialized config so the backend + // can serve it as resource state. It is intentionally left unset for delete, + // where the resource no longer exists. + if state != nil { + raw, err := json.Marshal(state) + if err != nil { + return fmt.Errorf("serializing state: %w", err) + } + msg := json.RawMessage(raw) + op.State = &msg + } + + _, err = r.client.CreateOperation(ctx, sdkbundle.CreateOperationRequest{ + Parent: r.parent, + ResourceKey: resourceKey, + Operation: op, + }) + return err +} + +// deployActionToSDK maps a deployplan action to its DMS operation action type. +// Only actions that mutate a resource are recordable; Skip and Undefined never +// reach a recorder and are rejected rather than silently coerced. +func deployActionToSDK(a deployplan.ActionType) (sdkbundle.OperationActionType, error) { + switch a { + case deployplan.Create: + return sdkbundle.OperationActionTypeOperationActionTypeCreate, nil + case deployplan.Update: + return sdkbundle.OperationActionTypeOperationActionTypeUpdate, nil + case deployplan.UpdateWithID: + return sdkbundle.OperationActionTypeOperationActionTypeUpdateWithId, nil + case deployplan.Recreate: + return sdkbundle.OperationActionTypeOperationActionTypeRecreate, nil + case deployplan.Resize: + return sdkbundle.OperationActionTypeOperationActionTypeResize, nil + case deployplan.Delete: + return sdkbundle.OperationActionTypeOperationActionTypeDelete, nil + default: + return "", fmt.Errorf("cannot record operation: unsupported action %q", a) + } +} diff --git a/bundle/direct/oprecorder_test.go b/bundle/direct/oprecorder_test.go new file mode 100644 index 0000000000..094a01b60b --- /dev/null +++ b/bundle/direct/oprecorder_test.go @@ -0,0 +1,104 @@ +package direct + +import ( + "context" + "encoding/json" + "errors" + "testing" + + "github.com/databricks/cli/bundle/deployplan" + sdkbundle "github.com/databricks/databricks-sdk-go/service/bundle" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +type fakeBundleClient struct { + sdkbundle.BundleInterface + requests []sdkbundle.CreateOperationRequest + err error +} + +func (c *fakeBundleClient) CreateOperation(_ context.Context, req sdkbundle.CreateOperationRequest) (*sdkbundle.Operation, error) { + c.requests = append(c.requests, req) + if c.err != nil { + return nil, c.err + } + return &sdkbundle.Operation{}, nil +} + +func TestOperationRecorderRecordsUnderVersionParent(t *testing.T) { + client := &fakeBundleClient{} + rec := NewOperationRecorder(client, "dep-1", 7) + + err := rec.record(t.Context(), "resources.jobs.foo", deployplan.Create, "job-123", map[string]string{"name": "foo"}) + require.NoError(t, err) + + require.Len(t, client.requests, 1) + req := client.requests[0] + assert.Equal(t, "deployments/dep-1/versions/7", req.Parent) + assert.Equal(t, "resources.jobs.foo", req.ResourceKey) + assert.Equal(t, "resources.jobs.foo", req.Operation.ResourceKey) + assert.Equal(t, "job-123", req.Operation.ResourceId) + assert.Equal(t, sdkbundle.OperationStatusOperationStatusSucceeded, req.Operation.Status) +} + +func TestOperationRecorderSerializesState(t *testing.T) { + client := &fakeBundleClient{} + rec := NewOperationRecorder(client, "dep", 1) + + err := rec.record(t.Context(), "resources.jobs.foo", deployplan.Update, "job-1", map[string]any{"name": "foo", "tasks": 2}) + require.NoError(t, err) + + require.Len(t, client.requests, 1) + state := client.requests[0].Operation.State + require.NotNil(t, state) + + var decoded map[string]any + require.NoError(t, json.Unmarshal(*state, &decoded)) + assert.Equal(t, "foo", decoded["name"]) +} + +func TestOperationRecorderOmitsStateForDelete(t *testing.T) { + client := &fakeBundleClient{} + rec := NewOperationRecorder(client, "dep", 1) + + err := rec.record(t.Context(), "resources.jobs.foo", deployplan.Delete, "job-1", nil) + require.NoError(t, err) + + require.Len(t, client.requests, 1) + assert.Nil(t, client.requests[0].Operation.State) + assert.Equal(t, sdkbundle.OperationActionTypeOperationActionTypeDelete, client.requests[0].Operation.ActionType) +} + +func TestOperationRecorderPropagatesAPIError(t *testing.T) { + wantErr := errors.New("boom") + client := &fakeBundleClient{err: wantErr} + rec := NewOperationRecorder(client, "dep", 1) + + err := rec.record(t.Context(), "resources.jobs.foo", deployplan.Create, "job-1", nil) + assert.ErrorIs(t, err, wantErr) +} + +func TestDeployActionToSDKMapping(t *testing.T) { + cases := map[deployplan.ActionType]sdkbundle.OperationActionType{ + deployplan.Create: sdkbundle.OperationActionTypeOperationActionTypeCreate, + deployplan.Update: sdkbundle.OperationActionTypeOperationActionTypeUpdate, + deployplan.UpdateWithID: sdkbundle.OperationActionTypeOperationActionTypeUpdateWithId, + deployplan.Recreate: sdkbundle.OperationActionTypeOperationActionTypeRecreate, + deployplan.Resize: sdkbundle.OperationActionTypeOperationActionTypeResize, + deployplan.Delete: sdkbundle.OperationActionTypeOperationActionTypeDelete, + } + + for action, want := range cases { + got, err := deployActionToSDK(action) + require.NoError(t, err, "action: %s", action) + assert.Equal(t, want, got, "action: %s", action) + } +} + +func TestDeployActionToSDKRejectsNonMutatingActions(t *testing.T) { + for _, action := range []deployplan.ActionType{deployplan.Skip, deployplan.Undefined} { + _, err := deployActionToSDK(action) + assert.Error(t, err, "action: %s", action) + } +} diff --git a/bundle/direct/pkg.go b/bundle/direct/pkg.go index 48a9c5a2ff..8b4202e440 100644 --- a/bundle/direct/pkg.go +++ b/bundle/direct/pkg.go @@ -44,6 +44,10 @@ type DeploymentBundle struct { Plan *deployplan.Plan RemoteStateCache sync.Map StateCache structvar.Cache + + // OpRec records resource operations with the deployment metadata service + // (DMS). It is nil unless the bundle is opted into managed state. + OpRec opRecorder } // SetRemoteState updates the remote state with type validation and marks as fresh. diff --git a/bundle/phases/deploy.go b/bundle/phases/deploy.go index 15546880b9..0300cda408 100644 --- a/bundle/phases/deploy.go +++ b/bundle/phases/deploy.go @@ -81,6 +81,12 @@ func deployCore(ctx context.Context, b *bundle.Bundle, plan *deployplan.Plan, ta err error ) if targetEngine.IsDirect() { + if b.Config.Experimental != nil && b.Config.Experimental.RecordDeploymentHistory { + // TODO(DMS): source the deployment ID and version from the DMS + // CreateVersion response once the deployment-version flow is wired in. + // "abcd"/0 are placeholders until then. + b.DeploymentBundle.OpRec = direct.NewOperationRecorder(b.WorkspaceClient(ctx).Bundle, "abcd", 0) + } b.DeploymentBundle.Apply(ctx, b.WorkspaceClient(ctx), plan, direct.MigrateMode(false)) state, err = b.DeploymentBundle.StateDB.Finalize(ctx) } else { diff --git a/libs/testserver/handlers.go b/libs/testserver/handlers.go index 659f6e010f..ceced7fc71 100644 --- a/libs/testserver/handlers.go +++ b/libs/testserver/handlers.go @@ -8,6 +8,7 @@ import ( "path" "strings" + "github.com/databricks/databricks-sdk-go/service/bundle" "github.com/databricks/databricks-sdk-go/service/catalog" "github.com/databricks/databricks-sdk-go/service/compute" "github.com/databricks/databricks-sdk-go/service/jobs" @@ -67,6 +68,12 @@ func AddDefaultHandlers(server *Server) { } }) + // Deployment metadata service (DMS): record resource operations. The parent + // is "deployments/{id}/versions/{n}", so the path tail is captured wholesale. + server.Handle("POST", "/api/2.0/bundle/{path...}", func(req Request) any { + return bundle.Operation{} + }) + server.Handle("GET", "/api/2.0/preview/scim/v2/Me", func(req Request) any { return Response{ Headers: map[string][]string{"X-Databricks-Org-Id": {"900800700600"}},