diff --git a/README.md b/README.md index ae80647..6385dd6 100644 --- a/README.md +++ b/README.md @@ -5,7 +5,23 @@ This repo contains a JavaScript/TypeScript SDK for use with the [Azure Durable Task Scheduler](https://github.com/Azure/Durable-Task-Scheduler). With this SDK, you can define, schedule, and manage durable orchestrations using ordinary TypeScript/JavaScript code. -> Note that this SDK is **not** currently compatible with [Azure Durable Functions](https://learn.microsoft.com/azure/azure-functions/durable/durable-functions-overview). If you are looking for a JavaScript SDK for Azure Durable Functions, please see [this repo](https://github.com/Azure/azure-functions-durable-js). +> Note that this SDK does **not** provide the [Azure Durable Functions](https://learn.microsoft.com/azure/azure-functions/durable/durable-functions-overview) programming model, decorators, or worker-indexing metadata. If you are looking for a JavaScript SDK for Azure Durable Functions, please see [this repo](https://github.com/Azure/azure-functions-durable-js). This package exposes low-level TaskHubSidecarService gRPC/protobuf helpers that host integrations can reuse; those helpers follow this package's Node.js 22+ requirement. + +## Low-level host integration APIs + +Host integrations that already own trigger metadata and transport encoding can depend on the `@microsoft/durabletask-js` package directly. `TaskHubGrpcWorker` registers orchestrators, activities, and entities, and can process raw TaskHubSidecarService protobuf payloads without starting the long-running gRPC worker loop: + +```typescript +const worker = new TaskHubGrpcWorker(); +worker.addOrchestrator(myOrchestrator); +worker.addActivity(myActivity); +worker.addEntity(myEntity); + +const orchestrationResponseBytes = await worker.processOrchestratorRequest(orchestrationRequestBytes); +const entityResponseBytes = await worker.processEntityBatchRequest(entityBatchRequestBytes); +``` + +`TaskHubGrpcClient` already exposes orchestration start/query/event/terminate/suspend/resume/purge APIs and entity signal/read/query/clean APIs through its existing `hostAddress` and `metadataGenerator` options. Host integrations that need task-hub routing metadata should provide it through `metadataGenerator`, keeping host-specific metadata policy outside the core client. Azure-managed scheduler connection strings remain in `@microsoft/durabletask-js-azuremanaged`. ## npm packages diff --git a/package-lock.json b/package-lock.json index edf9cdb..f9c04e0 100644 --- a/package-lock.json +++ b/package-lock.json @@ -182,6 +182,28 @@ "node": ">=20.0.0" } }, + "node_modules/@azure/functions": { + "version": "4.16.1", + "resolved": "https://registry.npmjs.org/@azure/functions/-/functions-4.16.1.tgz", + "integrity": "sha512-A9obwC7IBg4NAmxUfTVfYEd8Xg6Px+o85JRprS3UJZt+GYYzIOmEecnFwTe3rl+aiHDewBk/8fnIVrSjR/fNGQ==", + "license": "MIT", + "dependencies": { + "@azure/functions-extensions-base": "0.3.0", + "cookie": "^0.7.0" + }, + "engines": { + "node": ">=20.0" + } + }, + "node_modules/@azure/functions-extensions-base": { + "version": "0.3.0", + "resolved": "https://registry.npmjs.org/@azure/functions-extensions-base/-/functions-extensions-base-0.3.0.tgz", + "integrity": "sha512-Cux0hLu5ZXlC/Kb+yvJVhRLIdkfFwui2HeT5oGZL00r/GCUUkhGTzRfZUjRN4Bq729mPv3okPucz2z7SMQLStA==", + "license": "MIT", + "engines": { + "node": ">=18.0" + } + }, "node_modules/@azure/identity": { "version": "4.13.1", "resolved": "https://registry.npmjs.org/@azure/identity/-/identity-4.13.1.tgz", @@ -3182,6 +3204,15 @@ "dev": true, "license": "MIT" }, + "node_modules/cookie": { + "version": "0.7.2", + "resolved": "https://registry.npmjs.org/cookie/-/cookie-0.7.2.tgz", + "integrity": "sha512-yki5XnKuf750l50uGTllt6kKILY4nQ1eNIQatoXEByZ5dWgnKqbnqmTrBE5B4N7lrMJKQ2ytWMiTO2o0v6Ew/w==", + "license": "MIT", + "engines": { + "node": ">= 0.6" + } + }, "node_modules/create-jest": { "version": "29.7.0", "resolved": "https://registry.npmjs.org/create-jest/-/create-jest-29.7.0.tgz", @@ -3368,6 +3399,10 @@ "url": "https://dotenvx.com" } }, + "node_modules/durable-functions": { + "resolved": "packages/azure-functions-durable", + "link": true + }, "node_modules/ecdsa-sig-formatter": { "version": "1.0.11", "resolved": "https://registry.npmjs.org/ecdsa-sig-formatter/-/ecdsa-sig-formatter-1.0.11.tgz", @@ -7534,6 +7569,43 @@ "url": "https://github.com/sponsors/sindresorhus" } }, + "packages/azure-functions-durable": { + "name": "durable-functions", + "version": "4.0.0-alpha.0", + "license": "MIT", + "dependencies": { + "@azure/functions": "^4.16.1", + "@grpc/grpc-js": "^1.14.4", + "@microsoft/durabletask-js": "0.3.0" + }, + "devDependencies": { + "@types/jest": "^29.5.1", + "@types/node": "^18.16.1", + "jest": "^29.5.0", + "ts-jest": "^29.1.0", + "typescript": "^5.0.4" + }, + "engines": { + "node": ">=22.0.0" + } + }, + "packages/azure-functions-durable/node_modules/@types/node": { + "version": "18.19.130", + "resolved": "https://registry.npmjs.org/@types/node/-/node-18.19.130.tgz", + "integrity": "sha512-GRaXQx6jGfL8sKfaIDD6OupbIHBr9jv7Jnaml9tB7l4v068PAOXqfcujMMo5PhbIs6ggR1XODELqahT2R8v0fg==", + "dev": true, + "license": "MIT", + "dependencies": { + "undici-types": "~5.26.4" + } + }, + "packages/azure-functions-durable/node_modules/undici-types": { + "version": "5.26.5", + "resolved": "https://registry.npmjs.org/undici-types/-/undici-types-5.26.5.tgz", + "integrity": "sha512-JlCMO+ehdEIKqlFxk6IfVoAUVmgz7cU7zD/h9XZ0qzeosSHmUJVOzSQvvYSYWXkFXC+IfLKSIffhv0sVZup6pA==", + "dev": true, + "license": "MIT" + }, "packages/durabletask-js": { "name": "@microsoft/durabletask-js", "version": "0.3.0", diff --git a/packages/azure-functions-durable/CHANGELOG.md b/packages/azure-functions-durable/CHANGELOG.md new file mode 100644 index 0000000..bf22902 --- /dev/null +++ b/packages/azure-functions-durable/CHANGELOG.md @@ -0,0 +1,7 @@ +# Changelog + +## 4.0.0-alpha.0 + +- Added the initial gRPC-consolidated Azure Functions Durable provider package. +- Added `DurableFunctionsClient`, a direct `TaskHubGrpcClient` subclass for host-provided gRPC client bindings. +- Added Functions HTTP management payload helpers, worker byte-processing adapter, and `durableRequiresGrpc` binding metadata helper. diff --git a/packages/azure-functions-durable/LICENSE b/packages/azure-functions-durable/LICENSE new file mode 100644 index 0000000..22aed37 --- /dev/null +++ b/packages/azure-functions-durable/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) Microsoft Corporation. + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/packages/azure-functions-durable/README.md b/packages/azure-functions-durable/README.md new file mode 100644 index 0000000..007bb2a --- /dev/null +++ b/packages/azure-functions-durable/README.md @@ -0,0 +1,81 @@ +# durable-functions + +Azure Functions Durable provider for the Durable Task JavaScript SDK. + +This package is the gRPC-consolidated Durable Functions provider for JavaScript. It builds on `@microsoft/durabletask-js` and is intended to supersede the legacy `durable-functions` package at a new major version. + +## Phase 1 scope + +This preview includes the low-level host integration pieces: + +- `DurableFunctionsClient`, a direct subclass of `TaskHubGrpcClient`. +- HTTP management payload helpers for Durable HTTP starter responses. +- `DurableFunctionsWorker`, which accepts base64-encoded protobuf work-item payloads from the Functions host and delegates execution to the core worker byte processors. +- `addDurableGrpcMetadata`, which stamps `durableRequiresGrpc: true` onto durable trigger and client binding metadata. + +The full Durable Functions JavaScript authoring model is not included yet. Use `@microsoft/durabletask-js` APIs directly for orchestrator, activity, and entity implementations in this phase. + +## Client binding + +The Functions host passes a JSON client-binding payload to the app. Construct the client from that payload: + +```typescript +import { DurableFunctionsClient } from "durable-functions"; + +const client = new DurableFunctionsClient(clientBindingJson); +const instanceId = await client.scheduleNewOrchestration("hello", { name: "Durable" }); +return client.createCheckStatusResponse(request, instanceId); +``` + +`DurableFunctionsClient` extends `TaskHubGrpcClient`, so orchestration and entity management methods come from the core SDK by inheritance. The only Functions-specific public helpers are: + +- `createCheckStatusResponse(request, instanceId)` +- `createHttpManagementPayload(request, instanceId)` + +Both helpers derive the management endpoint from the incoming HTTP request origin: + +```text +{scheme}://{host}/runtime/webhooks/durabletask/instances/{instanceId} +``` + +## gRPC metadata + +The client mirrors the Python Azure Functions Durable provider interceptor by adding per-call gRPC metadata: + +- `taskhub`: the task hub name from the host client-binding payload. +- `x-user-agent`: the package user agent. gRPC reserves `user-agent`, so this package uses `x-user-agent`. + +The host-provided `requiredQueryStringParameters` value is used for HTTP management URLs. Python PR #155 passes it to the interceptor constructor but does not emit it as gRPC metadata; this package keeps the same behavior. + +## Serialization + +This package intentionally does not port Python's `DEFAULT_FUNCTIONS_DATA_CONVERTER`. The JavaScript core client and worker already serialize payloads at the protobuf string boundary with plain `JSON.stringify` and `JSON.parse`, which matches the Azure Functions JavaScript worker's plain JSON payload contract for this gRPC path. + +## Worker adapter + +`DurableFunctionsWorker` extends `TaskHubGrpcWorker` and adds base64 helpers for the Functions host's middleware-passthrough payloads: + +```typescript +const worker = new DurableFunctionsWorker(); +worker.addOrchestrator(myOrchestrator); + +const encodedResponse = await worker.handleOrchestratorRequest(encodedRequest); +``` + +## Phase 2 plan + +Phase 2 will port the full Durable Functions JavaScript authoring surface onto the core SDK. Planned work: + +- `src/app.ts`: add `DFApp` and `Blueprint` equivalents that mirror Python `decorators/durable_app.py` and register Azure Functions v4 handlers. +- `src/decorators/`: add durable trigger and durable client binding helpers that call `addDurableGrpcMetadata` and emit `durableRequiresGrpc: true`. +- `src/orchestrator.ts`: add an `Orchestrator` wrapper that converts Functions invocation payloads into `DurableFunctionsWorker.handleOrchestratorRequest` calls. +- `src/entity.ts`: add entity handler glue over `DurableFunctionsWorker.handleEntityBatchRequest`. +- `src/input.ts`: add a durable client input helper that constructs `DurableFunctionsClient` from the host binding payload. +- `test/authoring/`: add parity tests for `DFApp`, `Blueprint`, orchestration trigger registration, entity trigger registration, durable client input registration, and generated binding metadata. + +Open questions for the Functions extension team: + +- Confirm the exact JavaScript client-binding payload field set and whether `rpcBaseUrl` is always an absolute URL with scheme. +- Confirm whether `requiredQueryStringParameters` always includes any required `taskHub` and `connection` HTTP routing parameters. This Phase 1 port mirrors Python PR #155 and appends only that host-provided string to management URLs. +- Confirm whether `requiredQueryStringParameters` should ever be emitted as gRPC metadata; Python stores it on the interceptor but only sends `taskhub` and `x-user-agent`. +- Confirm whether the local gRPC sidecar remains plaintext-only for JavaScript (`useTLS: false`) in all supported Functions hosting modes. diff --git a/packages/azure-functions-durable/jest.config.js b/packages/azure-functions-durable/jest.config.js new file mode 100644 index 0000000..a830572 --- /dev/null +++ b/packages/azure-functions-durable/jest.config.js @@ -0,0 +1,12 @@ +module.exports = { + preset: "ts-jest", + testEnvironment: "node", + testMatch: ["**/test/**/*.spec.ts"], + moduleFileExtensions: ["ts", "tsx", "js", "jsx", "json", "node"], + transform: { + "^.+\\.ts$": "ts-jest", + }, + moduleNameMapper: { + "^@microsoft/durabletask-js$": "/../durabletask-js/src/index.ts", + }, +}; diff --git a/packages/azure-functions-durable/package.json b/packages/azure-functions-durable/package.json new file mode 100644 index 0000000..128ca9f --- /dev/null +++ b/packages/azure-functions-durable/package.json @@ -0,0 +1,63 @@ +{ + "name": "durable-functions", + "version": "4.0.0-alpha.0", + "description": "Azure Functions Durable provider for the Durable Task JavaScript SDK", + "main": "./dist/index.js", + "types": "./dist/index.d.ts", + "exports": { + ".": { + "types": "./dist/index.d.ts", + "require": "./dist/index.js", + "import": "./dist/index.js" + } + }, + "files": [ + "dist", + "LICENSE", + "README.md", + "CHANGELOG.md" + ], + "scripts": { + "clean": "node -e \"require('fs').rmSync('dist', {recursive:true, force:true})\"", + "prebuild": "node -p \"'// Auto-generated by prebuild\\nexport const SDK_VERSION = ' + JSON.stringify(require('./package.json').version) + ';\\nexport const SDK_PACKAGE_NAME = ' + JSON.stringify(require('./package.json').name) + ';'\" > src/version.ts", + "build:core": "npm run build -w @microsoft/durabletask-js", + "build": "npm run prebuild && npm run clean && npm run build:core && tsc -p tsconfig.build.json", + "test": "jest --runInBand --detectOpenHandles", + "test:unit": "jest test/unit --runInBand --detectOpenHandles", + "prepublishOnly": "npm run build && npm run test" + }, + "repository": { + "type": "git", + "url": "git+https://github.com/microsoft/durabletask-js.git", + "directory": "packages/azure-functions-durable" + }, + "keywords": [ + "azure-functions", + "durable-functions", + "durabletask", + "orchestration", + "workflow", + "grpc" + ], + "author": "Microsoft", + "license": "MIT", + "bugs": { + "url": "https://github.com/microsoft/durabletask-js/issues" + }, + "homepage": "https://github.com/microsoft/durabletask-js/tree/main/packages/azure-functions-durable#readme", + "engines": { + "node": ">=22.0.0" + }, + "dependencies": { + "@azure/functions": "^4.16.1", + "@grpc/grpc-js": "^1.14.4", + "@microsoft/durabletask-js": "0.3.0" + }, + "devDependencies": { + "@types/jest": "^29.5.1", + "@types/node": "^18.16.1", + "jest": "^29.5.0", + "ts-jest": "^29.1.0", + "typescript": "^5.0.4" + } +} diff --git a/packages/azure-functions-durable/src/client.ts b/packages/azure-functions-durable/src/client.ts new file mode 100644 index 0000000..1ae7120 --- /dev/null +++ b/packages/azure-functions-durable/src/client.ts @@ -0,0 +1,179 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +import { HttpRequest, HttpResponse } from "@azure/functions"; +import { TaskHubGrpcClient } from "@microsoft/durabletask-js"; +import { + HttpManagementPayload, + createHttpManagementPayload as createPayload, +} from "./http-management-payload"; +import { createAzureFunctionsMetadataGenerator } from "./metadata"; + +export interface DurableFunctionsClientConfig { + taskHubName?: string; + connectionName?: string; + creationUrls?: Record; + managementUrls?: Record; + baseUrl?: string; + requiredQueryStringParameters?: string; + rpcBaseUrl?: string; + httpBaseUrl?: string; + maxGrpcMessageSizeInBytes?: number; + grpcHttpClientTimeout?: unknown; +} + +export type DurableFunctionsClientInput = string | DurableFunctionsClientConfig; + +export class DurableFunctionsClient extends TaskHubGrpcClient { + public readonly taskHubName: string; + public readonly connectionName: string; + public readonly creationUrls: Record; + public readonly managementUrls: Record; + public readonly baseUrl: string; + public readonly requiredQueryStringParameters: string; + public readonly rpcBaseUrl: string; + public readonly httpBaseUrl: string; + public readonly maxGrpcMessageSizeInBytes: number; + public readonly grpcHttpClientTimeout: unknown; + + constructor(clientConfig: DurableFunctionsClientInput) { + const config = parseClientConfig(clientConfig); + const taskHubName = config.taskHubName ?? ""; + const requiredQueryStringParameters = config.requiredQueryStringParameters ?? ""; + const rpcBaseUrl = requireString(config.rpcBaseUrl, "rpcBaseUrl"); + + super({ + hostAddress: getGrpcHostAddress(rpcBaseUrl), + useTLS: false, + metadataGenerator: createAzureFunctionsMetadataGenerator(taskHubName), + }); + + this.taskHubName = taskHubName; + this.connectionName = config.connectionName ?? ""; + this.creationUrls = config.creationUrls ?? {}; + this.managementUrls = config.managementUrls ?? {}; + this.baseUrl = config.baseUrl ?? ""; + this.requiredQueryStringParameters = requiredQueryStringParameters; + this.rpcBaseUrl = rpcBaseUrl; + this.httpBaseUrl = config.httpBaseUrl ?? ""; + this.maxGrpcMessageSizeInBytes = config.maxGrpcMessageSizeInBytes ?? 0; + this.grpcHttpClientTimeout = config.grpcHttpClientTimeout; + } + + createCheckStatusResponse(request: HttpRequest, instanceId: string): HttpResponse { + const payload = this.createHttpManagementPayload(request, instanceId); + + return new HttpResponse({ + status: 202, + body: JSON.stringify(payload), + headers: { + "content-type": "application/json", + Location: payload.statusQueryGetUri, + }, + }); + } + + createHttpManagementPayload(request: HttpRequest, instanceId: string): HttpManagementPayload { + const instanceStatusUrl = getInstanceStatusUrl(request, instanceId); + return createPayload(instanceId, instanceStatusUrl, this.requiredQueryStringParameters); + } +} + +export function getGrpcHostAddress(rpcBaseUrl: string): string { + try { + const hostAddress = new URL(rpcBaseUrl).host; + if (!hostAddress) { + throw new Error("rpcBaseUrl must include a host."); + } + return hostAddress; + } catch (e) { + throw new Error(`Invalid Durable Functions rpcBaseUrl: ${rpcBaseUrl}`, { cause: e }); + } +} + +function getInstanceStatusUrl(request: HttpRequest, instanceId: string): string { + const requestUrl = new URL(request.url); + const encodedInstanceId = encodeURIComponent(instanceId); + return `${requestUrl.protocol}//${requestUrl.host}/runtime/webhooks/durabletask/instances/${encodedInstanceId}`; +} + +function parseClientConfig(clientConfig: DurableFunctionsClientInput): DurableFunctionsClientConfig { + const value: unknown = typeof clientConfig === "string" ? JSON.parse(clientConfig) : clientConfig; + const record = requireRecord(value, "Durable Functions client configuration"); + + return { + taskHubName: optionalString(record, "taskHubName"), + connectionName: optionalString(record, "connectionName"), + creationUrls: optionalStringRecord(record, "creationUrls"), + managementUrls: optionalStringRecord(record, "managementUrls"), + baseUrl: optionalString(record, "baseUrl"), + requiredQueryStringParameters: optionalString(record, "requiredQueryStringParameters"), + rpcBaseUrl: optionalString(record, "rpcBaseUrl"), + httpBaseUrl: optionalString(record, "httpBaseUrl"), + maxGrpcMessageSizeInBytes: optionalNumber(record, "maxGrpcMessageSizeInBytes"), + grpcHttpClientTimeout: record.grpcHttpClientTimeout, + }; +} + +function requireString(value: string | undefined, name: string): string { + if (!value) { + throw new TypeError(`Durable Functions client configuration is missing ${name}.`); + } + + return value; +} + +function requireRecord(value: unknown, name: string): Record { + if (value === null || typeof value !== "object" || Array.isArray(value)) { + throw new TypeError(`${name} must be a JSON object.`); + } + + return value as Record; +} + +function optionalString(record: Record, name: string): string | undefined { + const value = record[name]; + if (value === undefined || value === null) { + return undefined; + } + if (typeof value !== "string") { + throw new TypeError(`Durable Functions client configuration field ${name} must be a string.`); + } + + return value; +} + +function optionalNumber(record: Record, name: string): number | undefined { + const value = record[name]; + if (value === undefined || value === null) { + return undefined; + } + if (typeof value !== "number") { + throw new TypeError(`Durable Functions client configuration field ${name} must be a number.`); + } + + return value; +} + +function optionalStringRecord( + record: Record, + name: string, +): Record | undefined { + const value = record[name]; + if (value === undefined || value === null) { + return undefined; + } + + const valueRecord = requireRecord(value, `Durable Functions client configuration field ${name}`); + const result: Record = {}; + for (const [key, entry] of Object.entries(valueRecord)) { + if (typeof entry !== "string") { + throw new TypeError( + `Durable Functions client configuration field ${name}.${key} must be a string.`, + ); + } + result[key] = entry; + } + + return result; +} diff --git a/packages/azure-functions-durable/src/durable-grpc.ts b/packages/azure-functions-durable/src/durable-grpc.ts new file mode 100644 index 0000000..47b5058 --- /dev/null +++ b/packages/azure-functions-durable/src/durable-grpc.ts @@ -0,0 +1,13 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +export type DurableBindingMetadata = Record; + +export function addDurableGrpcMetadata( + binding: TBinding, +): TBinding & { durableRequiresGrpc: true } { + return { + ...binding, + durableRequiresGrpc: true, + }; +} diff --git a/packages/azure-functions-durable/src/http-management-payload.ts b/packages/azure-functions-durable/src/http-management-payload.ts new file mode 100644 index 0000000..640d7ad --- /dev/null +++ b/packages/azure-functions-durable/src/http-management-payload.ts @@ -0,0 +1,42 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +export interface HttpManagementPayload { + id: string; + purgeHistoryDeleteUri: string; + restartPostUri: string; + rewindPostUri: string; + sendEventPostUri: string; + statusQueryGetUri: string; + terminatePostUri: string; + resumePostUri: string; + suspendPostUri: string; +} + +export function createHttpManagementPayload( + instanceId: string, + instanceStatusUrl: string, + requiredQueryStringParameters: string, +): HttpManagementPayload { + const queryString = normalizeQueryString(requiredQueryStringParameters); + const querySuffix = queryString ? `?${queryString}` : ""; + const reasonQuerySuffix = queryString ? `?reason={text}&${queryString}` : "?reason={text}"; + + return { + id: instanceId, + purgeHistoryDeleteUri: `${instanceStatusUrl}${querySuffix}`, + restartPostUri: `${instanceStatusUrl}/restart${querySuffix}`, + rewindPostUri: `${instanceStatusUrl}/rewind${reasonQuerySuffix}`, + sendEventPostUri: `${instanceStatusUrl}/raiseEvent/{eventName}${querySuffix}`, + statusQueryGetUri: `${instanceStatusUrl}${querySuffix}`, + terminatePostUri: `${instanceStatusUrl}/terminate${reasonQuerySuffix}`, + resumePostUri: `${instanceStatusUrl}/resume${reasonQuerySuffix}`, + suspendPostUri: `${instanceStatusUrl}/suspend${reasonQuerySuffix}`, + }; +} + +function normalizeQueryString(requiredQueryStringParameters: string): string { + return requiredQueryStringParameters.startsWith("?") + ? requiredQueryStringParameters.slice(1) + : requiredQueryStringParameters; +} diff --git a/packages/azure-functions-durable/src/index.ts b/packages/azure-functions-durable/src/index.ts new file mode 100644 index 0000000..d71d89a --- /dev/null +++ b/packages/azure-functions-durable/src/index.ts @@ -0,0 +1,13 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +export { + DurableFunctionsClient, + DurableFunctionsClientConfig, + DurableFunctionsClientInput, + getGrpcHostAddress, +} from "./client"; +export { HttpManagementPayload } from "./http-management-payload"; +export { createAzureFunctionsMetadataGenerator } from "./metadata"; +export { DurableFunctionsWorker } from "./worker"; +export { DurableBindingMetadata, addDurableGrpcMetadata } from "./durable-grpc"; diff --git a/packages/azure-functions-durable/src/metadata.ts b/packages/azure-functions-durable/src/metadata.ts new file mode 100644 index 0000000..6f84a8b --- /dev/null +++ b/packages/azure-functions-durable/src/metadata.ts @@ -0,0 +1,19 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +import * as grpc from "@grpc/grpc-js"; +import { MetadataGenerator } from "@microsoft/durabletask-js"; +import { getUserAgent } from "./user-agent"; + +export function createAzureFunctionsMetadataGenerator( + taskHubName: string, +): MetadataGenerator { + const userAgent = getUserAgent(); + + return async (): Promise => { + const metadata = new grpc.Metadata(); + metadata.set("taskhub", taskHubName); + metadata.set("x-user-agent", userAgent); + return metadata; + }; +} diff --git a/packages/azure-functions-durable/src/user-agent.ts b/packages/azure-functions-durable/src/user-agent.ts new file mode 100644 index 0000000..fe69113 --- /dev/null +++ b/packages/azure-functions-durable/src/user-agent.ts @@ -0,0 +1,23 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +const SDK_NAME = "durable-functions"; + +let packageVersion = "unknown"; + +function getPackageVersion(): string { + if (packageVersion === "unknown") { + try { + const pkg = require("../package.json"); + packageVersion = pkg.version ?? "unknown"; + } catch { + // Keep the fallback when package metadata is unavailable. + } + } + + return packageVersion; +} + +export function getUserAgent(): string { + return `${SDK_NAME}/${getPackageVersion()}`; +} diff --git a/packages/azure-functions-durable/src/worker.ts b/packages/azure-functions-durable/src/worker.ts new file mode 100644 index 0000000..4eb2bc8 --- /dev/null +++ b/packages/azure-functions-durable/src/worker.ts @@ -0,0 +1,30 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +import { TaskHubGrpcWorker, TaskHubGrpcWorkerOptions } from "@microsoft/durabletask-js"; + +export class DurableFunctionsWorker extends TaskHubGrpcWorker { + constructor(options: TaskHubGrpcWorkerOptions = {}) { + super(options); + } + + async handleOrchestratorRequest(encodedRequest: string): Promise { + const request = decodeBase64Request(encodedRequest, "orchestrator"); + const response = await this.processOrchestratorRequest(request); + return Buffer.from(response).toString("base64"); + } + + async handleEntityBatchRequest(encodedRequest: string): Promise { + const request = decodeBase64Request(encodedRequest, "entity batch"); + const response = await this.processEntityBatchRequest(request); + return Buffer.from(response).toString("base64"); + } +} + +function decodeBase64Request(encodedRequest: string, requestType: string): Buffer { + if (!encodedRequest) { + throw new TypeError(`${requestType} request must be a non-empty base64 string.`); + } + + return Buffer.from(encodedRequest, "base64"); +} diff --git a/packages/azure-functions-durable/test/unit/client.spec.ts b/packages/azure-functions-durable/test/unit/client.spec.ts new file mode 100644 index 0000000..d595bc7 --- /dev/null +++ b/packages/azure-functions-durable/test/unit/client.spec.ts @@ -0,0 +1,111 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +import { HttpRequest } from "@azure/functions"; +import { TaskHubGrpcClient } from "@microsoft/durabletask-js"; +import { DurableFunctionsClient, getGrpcHostAddress } from "../../src/client"; +import { createAzureFunctionsMetadataGenerator } from "../../src/metadata"; + +const CLIENT_CONFIG = { + taskHubName: "functions-taskhub", + rpcBaseUrl: "http://127.0.0.1:4711/rpc", + requiredQueryStringParameters: "code=secret&taskHub=functions-taskhub", + httpBaseUrl: "https://ignored.example/runtime/webhooks/durabletask", +}; + +describe("DurableFunctionsClient", () => { + it("derives the gRPC host address from rpcBaseUrl", () => { + expect(getGrpcHostAddress("http://127.0.0.1:4711/rpc")).toBe("127.0.0.1:4711"); + expect(getGrpcHostAddress("https://localhost:9443")).toBe("localhost:9443"); + }); + + it("extends TaskHubGrpcClient and does not redeclare management methods", async () => { + const client = new DurableFunctionsClient(JSON.stringify(CLIENT_CONFIG)); + + try { + expect(client).toBeInstanceOf(TaskHubGrpcClient); + expect(typeof client.scheduleNewOrchestration).toBe("function"); + expect(typeof client.getOrchestrationState).toBe("function"); + expect(typeof client.raiseOrchestrationEvent).toBe("function"); + expect(typeof client.terminateOrchestration).toBe("function"); + expect(typeof client.suspendOrchestration).toBe("function"); + expect(typeof client.resumeOrchestration).toBe("function"); + expect(typeof client.purgeOrchestration).toBe("function"); + expect(typeof client.signalEntity).toBe("function"); + expect(typeof client.getEntity).toBe("function"); + expect(Object.getOwnPropertyNames(DurableFunctionsClient.prototype).sort()).toEqual([ + "constructor", + "createCheckStatusResponse", + "createHttpManagementPayload", + ]); + } finally { + await client.stop(); + } + }); + + it("creates HTTP management payload URLs from the incoming request", async () => { + const client = new DurableFunctionsClient(CLIENT_CONFIG); + const request = new HttpRequest({ + method: "POST", + url: "https://public.example/api/start?ignored=true", + }); + + try { + const payload = client.createHttpManagementPayload(request, "instance 1"); + + expect(payload).toEqual({ + id: "instance 1", + purgeHistoryDeleteUri: + "https://public.example/runtime/webhooks/durabletask/instances/instance%201?code=secret&taskHub=functions-taskhub", + restartPostUri: + "https://public.example/runtime/webhooks/durabletask/instances/instance%201/restart?code=secret&taskHub=functions-taskhub", + rewindPostUri: + "https://public.example/runtime/webhooks/durabletask/instances/instance%201/rewind?reason={text}&code=secret&taskHub=functions-taskhub", + sendEventPostUri: + "https://public.example/runtime/webhooks/durabletask/instances/instance%201/raiseEvent/{eventName}?code=secret&taskHub=functions-taskhub", + statusQueryGetUri: + "https://public.example/runtime/webhooks/durabletask/instances/instance%201?code=secret&taskHub=functions-taskhub", + terminatePostUri: + "https://public.example/runtime/webhooks/durabletask/instances/instance%201/terminate?reason={text}&code=secret&taskHub=functions-taskhub", + resumePostUri: + "https://public.example/runtime/webhooks/durabletask/instances/instance%201/resume?reason={text}&code=secret&taskHub=functions-taskhub", + suspendPostUri: + "https://public.example/runtime/webhooks/durabletask/instances/instance%201/suspend?reason={text}&code=secret&taskHub=functions-taskhub", + }); + } finally { + await client.stop(); + } + }); + + it("creates 202 check status responses with Location and JSON body", async () => { + const client = new DurableFunctionsClient(CLIENT_CONFIG); + const request = new HttpRequest({ + method: "POST", + url: "http://localhost:7071/api/orchestrators/hello", + }); + + try { + const response = client.createCheckStatusResponse(request, "abc"); + + expect(response.status).toBe(202); + expect(response.headers.get("content-type")).toBe("application/json"); + expect(response.headers.get("Location")).toBe( + "http://localhost:7071/runtime/webhooks/durabletask/instances/abc?code=secret&taskHub=functions-taskhub", + ); + const body = JSON.parse(await response.text()); + expect(body.statusQueryGetUri).toBe( + "http://localhost:7071/runtime/webhooks/durabletask/instances/abc?code=secret&taskHub=functions-taskhub", + ); + } finally { + await client.stop(); + } + }); + + it("mirrors the Azure Functions gRPC metadata interceptor", async () => { + const metadata = await createAzureFunctionsMetadataGenerator("functions-taskhub")(); + + expect(metadata.get("taskhub")).toEqual(["functions-taskhub"]); + expect(metadata.get("x-user-agent")[0]).toMatch(/^durable-functions\//); + expect(metadata.get("requiredQueryStringParameters")).toEqual([]); + }); +}); diff --git a/packages/azure-functions-durable/test/unit/durable-grpc.spec.ts b/packages/azure-functions-durable/test/unit/durable-grpc.spec.ts new file mode 100644 index 0000000..89d01ac --- /dev/null +++ b/packages/azure-functions-durable/test/unit/durable-grpc.spec.ts @@ -0,0 +1,19 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +import { addDurableGrpcMetadata } from "../../src/durable-grpc"; + +describe("addDurableGrpcMetadata", () => { + it("adds durableRequiresGrpc without mutating the original binding", () => { + const binding = { type: "orchestrationTrigger", name: "context" }; + + const actual = addDurableGrpcMetadata(binding); + + expect(actual).toEqual({ + type: "orchestrationTrigger", + name: "context", + durableRequiresGrpc: true, + }); + expect(binding).toEqual({ type: "orchestrationTrigger", name: "context" }); + }); +}); diff --git a/packages/azure-functions-durable/test/unit/worker.spec.ts b/packages/azure-functions-durable/test/unit/worker.spec.ts new file mode 100644 index 0000000..7e32302 --- /dev/null +++ b/packages/azure-functions-durable/test/unit/worker.spec.ts @@ -0,0 +1,48 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +import { DurableFunctionsWorker } from "../../src/worker"; + +describe("DurableFunctionsWorker", () => { + it("decodes base64, delegates to processOrchestratorRequest, and re-encodes the response", async () => { + const worker = new DurableFunctionsWorker(); + const responseBytes = Buffer.from("orchestrator response"); + const processOrchestratorRequest = jest + .spyOn(worker, "processOrchestratorRequest") + .mockResolvedValue(responseBytes); + + const actual = await worker.handleOrchestratorRequest( + Buffer.from("orchestrator request").toString("base64"), + ); + + expect(actual).toBe(responseBytes.toString("base64")); + expect(processOrchestratorRequest).toHaveBeenCalledTimes(1); + expect(Buffer.from(processOrchestratorRequest.mock.calls[0][0]).toString()).toBe( + "orchestrator request", + ); + }); + + it("decodes base64, delegates to processEntityBatchRequest, and re-encodes the response", async () => { + const worker = new DurableFunctionsWorker(); + const responseBytes = Buffer.from("entity batch response"); + const processEntityBatchRequest = jest + .spyOn(worker, "processEntityBatchRequest") + .mockResolvedValue(responseBytes); + + const actual = await worker.handleEntityBatchRequest( + Buffer.from("entity batch request").toString("base64"), + ); + + expect(actual).toBe(responseBytes.toString("base64")); + expect(processEntityBatchRequest).toHaveBeenCalledTimes(1); + expect(Buffer.from(processEntityBatchRequest.mock.calls[0][0]).toString()).toBe( + "entity batch request", + ); + }); + + it("rejects empty base64 requests", async () => { + const worker = new DurableFunctionsWorker(); + + await expect(worker.handleOrchestratorRequest("")).rejects.toThrow(TypeError); + }); +}); diff --git a/packages/azure-functions-durable/tsconfig.build.json b/packages/azure-functions-durable/tsconfig.build.json new file mode 100644 index 0000000..b309342 --- /dev/null +++ b/packages/azure-functions-durable/tsconfig.build.json @@ -0,0 +1,13 @@ +{ + "extends": "./tsconfig.json", + "compilerOptions": { + "rootDir": "./src", + "outDir": "./dist", + "baseUrl": ".", + "paths": { + "@microsoft/durabletask-js": ["../durabletask-js/dist/index"] + } + }, + "include": ["src"], + "exclude": ["node_modules", "dist", "test"] +} diff --git a/packages/azure-functions-durable/tsconfig.json b/packages/azure-functions-durable/tsconfig.json new file mode 100644 index 0000000..0180107 --- /dev/null +++ b/packages/azure-functions-durable/tsconfig.json @@ -0,0 +1,9 @@ +{ + "extends": "../../tsconfig.base.json", + "compilerOptions": { + "rootDir": "./src", + "outDir": "./dist" + }, + "include": ["src"], + "exclude": ["node_modules", "dist", "test"] +} diff --git a/packages/durabletask-js/src/worker/task-hub-grpc-worker.ts b/packages/durabletask-js/src/worker/task-hub-grpc-worker.ts index 9fec6f3..1ce8cf3 100644 --- a/packages/durabletask-js/src/worker/task-hub-grpc-worker.ts +++ b/packages/durabletask-js/src/worker/task-hub-grpc-worker.ts @@ -313,6 +313,58 @@ export class TaskHubGrpcWorker { return name.toLowerCase(); } + /** + * Processes a single serialized TaskHubSidecarService OrchestratorRequest and + * returns the serialized OrchestratorResponse. + * + * @param request - The protobuf-encoded OrchestratorRequest bytes. + * @returns The protobuf-encoded OrchestratorResponse bytes. + * + * @remarks + * This is intended for host integrations, such as Azure Functions, that drive a + * single orchestration work item per invocation instead of running the + * long-lived gRPC worker loop. It reuses the same execution path as the worker + * loop, capturing the response in-process rather than completing it over gRPC. + * Host integrations own any transport-specific encoding (for example base64). + */ + async processOrchestratorRequest(request: Uint8Array): Promise { + const req = pb.OrchestratorRequest.deserializeBinary(request); + const stub = new CapturingSidecarStub(); + await this._executeOrchestratorInternal(req, "", stub as unknown as stubs.TaskHubSidecarServiceClient); + + if (!stub.orchestratorResponse) { + throw new Error("Orchestrator execution did not produce a response."); + } + + return stub.orchestratorResponse.serializeBinary(); + } + + /** + * Processes a single serialized TaskHubSidecarService EntityBatchRequest and + * returns the serialized EntityBatchResult. + * + * @param request - The protobuf-encoded EntityBatchRequest bytes. + * @returns The protobuf-encoded EntityBatchResult bytes. + * + * @remarks + * This is intended for host integrations, such as Azure Functions, that drive a + * single entity batch work item per invocation instead of running the + * long-lived gRPC worker loop. It reuses the same execution path as the worker + * loop, capturing the result in-process rather than completing it over gRPC. + * Host integrations own any transport-specific encoding (for example base64). + */ + async processEntityBatchRequest(request: Uint8Array): Promise { + const req = pb.EntityBatchRequest.deserializeBinary(request); + const stub = new CapturingSidecarStub(); + await this._executeEntityInternal(req, "", stub as unknown as stubs.TaskHubSidecarServiceClient); + + if (!stub.entityResult) { + throw new Error("Entity batch execution did not produce a result."); + } + + return stub.entityResult.serializeBinary(); + } + /** * In node.js we don't require a new thread as we have a main event loop * Therefore, we open the stream and simply listen through the eventemitter behind the scenes @@ -1157,3 +1209,46 @@ export class TaskHubGrpcWorker { } } } + +/** + * A minimal in-process stand-in for the TaskHubSidecarService client that captures the + * completion payload instead of sending it over gRPC. + * + * @remarks + * This lets host integrations reuse the worker's existing execution path for a single work + * item (see {@link TaskHubGrpcWorker.processOrchestratorRequest} and + * {@link TaskHubGrpcWorker.processEntityBatchRequest}) without opening a gRPC channel. Only the + * completion/abandon methods used by those execution paths are implemented. + */ +class CapturingSidecarStub { + orchestratorResponse?: pb.OrchestratorResponse; + entityResult?: pb.EntityBatchResult; + abandonRequest?: pb.AbandonOrchestrationTaskRequest; + + completeOrchestratorTask( + request: pb.OrchestratorResponse, + _metadata: grpc.Metadata, + callback: (error: grpc.ServiceError | null, response: Empty) => void, + ): void { + this.orchestratorResponse = request; + callback(null, new Empty()); + } + + completeEntityTask( + request: pb.EntityBatchResult, + _metadata: grpc.Metadata, + callback: (error: grpc.ServiceError | null, response: Empty) => void, + ): void { + this.entityResult = request; + callback(null, new Empty()); + } + + abandonTaskOrchestratorWorkItem( + request: pb.AbandonOrchestrationTaskRequest, + _metadata: grpc.Metadata, + callback: (error: grpc.ServiceError | null, response: Empty) => void, + ): void { + this.abandonRequest = request; + callback(null, new Empty()); + } +} diff --git a/packages/durabletask-js/test/functions-grpc-support.spec.ts b/packages/durabletask-js/test/functions-grpc-support.spec.ts new file mode 100644 index 0000000..21f6d75 --- /dev/null +++ b/packages/durabletask-js/test/functions-grpc-support.spec.ts @@ -0,0 +1,68 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +import { OrchestrationContext, TaskEntity, TaskHubGrpcWorker, TOrchestrator } from "../src"; +import * as pb from "../src/proto/orchestrator_service_pb"; +import { newExecutionStartedEvent, newOrchestratorStartedEvent } from "../src/utils/pb-helper.util"; +import { NoOpLogger } from "../src/types/logger.type"; + +const TEST_INSTANCE_ID = "functions-grpc-instance"; + +class CounterEntity extends TaskEntity { + increment(): number { + this.state++; + return this.state; + } + + protected initializeState(): number { + return 0; + } +} + +describe("Functions gRPC support surface", () => { + it("processes a single serialized orchestration request without the gRPC worker loop", async () => { + const worker = new TaskHubGrpcWorker({ logger: new NoOpLogger() }); + const orchestrator: TOrchestrator = async (_ctx: OrchestrationContext) => "done"; + const name = worker.addOrchestrator(orchestrator); + + const request = new pb.OrchestratorRequest(); + request.setInstanceid(TEST_INSTANCE_ID); + request.setNeweventsList([ + newOrchestratorStartedEvent(new Date("2026-01-01T00:00:00.000Z")), + newExecutionStartedEvent(name, TEST_INSTANCE_ID, undefined), + ]); + + const responseBytes = await worker.processOrchestratorRequest(request.serializeBinary()); + const response = pb.OrchestratorResponse.deserializeBinary(responseBytes); + + expect(response.getInstanceid()).toBe(TEST_INSTANCE_ID); + const completed = response.getActionsList()[0].getCompleteorchestration(); + expect(completed?.getOrchestrationstatus()).toBe(pb.OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED); + expect(completed?.getResult()?.getValue()).toBe('"done"'); + }); + + it("processes a single serialized entity batch request without the gRPC worker loop", async () => { + const worker = new TaskHubGrpcWorker({ logger: new NoOpLogger() }); + worker.addNamedEntity("counter", () => new CounterEntity()); + + const request = createEntityBatchRequest("counter", "key1"); + const responseBytes = await worker.processEntityBatchRequest(request.serializeBinary()); + const response = pb.EntityBatchResult.deserializeBinary(responseBytes); + + expect(response.getResultsList()).toHaveLength(1); + expect(response.getResultsList()[0].getSuccess()?.getResult()?.getValue()).toBe("1"); + expect(response.getEntitystate()?.getValue()).toBe("1"); + }); +}); + +function createEntityBatchRequest(entityName: string, entityKey: string): pb.EntityBatchRequest { + const request = new pb.EntityBatchRequest(); + request.setInstanceid(`@${entityName}@${entityKey}`); + + const operation = new pb.OperationRequest(); + operation.setOperation("increment"); + operation.setRequestid("req-1"); + request.setOperationsList([operation]); + + return request; +}