From c4be5d492664e805fd27d5398435efaf1c73855f Mon Sep 17 00:00:00 2001 From: Amogh Desai Date: Sat, 23 May 2026 19:03:01 +0530 Subject: [PATCH 1/5] Add example DAGs for AIP-103 task state and asset state --- .../example_dags/example_asset_state.py | 100 ++++++++++++++++++ .../example_dags/example_task_state.py | 90 ++++++++++++++++ 2 files changed, 190 insertions(+) create mode 100644 airflow-core/src/airflow/example_dags/example_asset_state.py create mode 100644 airflow-core/src/airflow/example_dags/example_task_state.py diff --git a/airflow-core/src/airflow/example_dags/example_asset_state.py b/airflow-core/src/airflow/example_dags/example_asset_state.py new file mode 100644 index 0000000000000..d1b1855ec2cf5 --- /dev/null +++ b/airflow-core/src/airflow/example_dags/example_asset_state.py @@ -0,0 +1,100 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +""" +Example Dag that demonstrates using AIP-103 asset state to track a watermark across DAG runs. +The producer reads the last watermark, processes only new records, then +advances the watermark. The consumer is triggered by the asset event and +reads asset state to understand what the producer just loaded. + +Asset state persists on the asset across runs — unlike task state which is +scoped to a single task instance. This replaces the common pattern of +storing watermarks in Airflow Variables, which have no asset-level scoping. +""" + +from __future__ import annotations + +import json +import random +from datetime import datetime, timedelta, timezone + +from airflow.sdk import DAG, Asset, task + +ORDERS = Asset(name="orders/daily", uri="s3://warehouse/orders/daily") + + +def _fetch_records(since: str) -> list[dict]: + """Simulate fetching records newer than `since`.""" + return [{"id": i} for i in range(random.randint(100, 5_000))] + + +with DAG( + dag_id="example_asset_state_producer", + schedule=timedelta(hours=1), + start_date=datetime(2026, 1, 1), + catchup=False, + tags=["example", "asset-state"], + doc_md=__doc__, +): + + @task(inlets=[ORDERS], outlets=[ORDERS]) + def load(**context): + state = context["asset_state"][ORDERS] + + # First run: watermark is None — fall back to epoch start. + watermark = state.get("watermark") or "2026-01-01T00:00:00+00:00" + records = _fetch_records(since=watermark) + row_count = len(records) + + now = datetime.now(tz=timezone.utc).isoformat() + state.set("watermark", now) + state.set("total_runs", str(int(state.get("total_runs") or 0) + 1)) + state.set( + "last_run_summary", + json.dumps( + { + "rows_loaded": row_count, + "prev_watermark": watermark, + "completed_at": now, + } + ), + ) + + print(f"Loaded {row_count} records. Watermark advanced to {now}.") + return row_count + + load() + + +with DAG( + dag_id="example_asset_state_consumer", + schedule=[ORDERS], + start_date=datetime(2026, 1, 1), + catchup=False, + tags=["example", "asset-state"], +): + + @task(inlets=[ORDERS]) + def consume(**context): + state = context["asset_state"][ORDERS] + summary = json.loads(state.get("last_run_summary") or "{}") + print( + f"Processing {summary.get('rows_loaded', '?')} rows " + f"up to watermark {state.get('watermark')}. " + f"Total runs so far: {state.get('total_runs')}." + ) + + consume() diff --git a/airflow-core/src/airflow/example_dags/example_task_state.py b/airflow-core/src/airflow/example_dags/example_task_state.py new file mode 100644 index 0000000000000..247af5dd9bd65 --- /dev/null +++ b/airflow-core/src/airflow/example_dags/example_task_state.py @@ -0,0 +1,90 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +""" +Example Dag that demonstrates the canonical AIP-103 task state pattern: a task submits a +long-running external job, stores the job handle in task state, and polls +until completion. + +The first attempt always fails after submitting the job (simulating a +worker crash / connection to external system being lost). The retry reads +the job ID from task state and reattaches to the already-running job instead +of submitting a duplicate. +""" + +from __future__ import annotations + +import json +import random +import string +import time +from datetime import datetime, timezone + +from airflow.sdk import DAG, task +from airflow.sdk.execution_time.context import NEVER_EXPIRE + + +def _submit_job() -> str: + """Simulate submitting an external job. Returns a job ID.""" + time.sleep(1) + return "job-" + "".join(random.choices(string.ascii_lowercase + string.digits, k=8)) + + +def _poll_job(job_id: str) -> dict: + """Simulate polling an external job until complete.""" + time.sleep(1) + return {"job_id": job_id, "status": "succeeded", "rows_written": random.randint(100, 10_000)} + + +with DAG( + dag_id="example_task_state", + schedule=None, + start_date=datetime(2026, 1, 1), + catchup=False, + tags=["example", "task-state"], + doc_md=__doc__, +): + + @task(retries=2) + def run_job(**context): + ts = context["task_state"] + try_number = context["ti"].try_number + + job_id = ts.get("job_id") + if job_id: + print(f"Try {try_number}: reattaching to existing job: {job_id}") + else: + job_id = _submit_job() + # Store with NEVER_EXPIRE so the job ID survives across all retries. + ts.set("job_id", job_id, retention=NEVER_EXPIRE) + ts.set("submitted_at", datetime.now(tz=timezone.utc).isoformat()) + print(f"Try {try_number}: submitted job: {job_id}") + + # Simulate a crash after submission on the first attempt. + # The retry will reattach to the same job instead of submitting a duplicate. + raise RuntimeError( + f"Simulated failure after submitting {job_id}. The next retry will reattach to this job." + ) + + ts.set("status", "running") + result = _poll_job(job_id) + ts.set("status", "complete") + ts.set("result", json.dumps(result)) + + print(f"Try {try_number}: job complete — {result['rows_written']} rows written") + return result["rows_written"] + + run_job() From 41a9056cbf0df6ad271d7458bbc46d14a6d600ad Mon Sep 17 00:00:00 2001 From: Amogh Desai Date: Sat, 23 May 2026 19:08:15 +0530 Subject: [PATCH 2/5] Add example DAGs for AIP-103 task state and asset state --- airflow-core/src/airflow/example_dags/example_asset_state.py | 4 ++-- airflow-core/src/airflow/example_dags/example_task_state.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/airflow-core/src/airflow/example_dags/example_asset_state.py b/airflow-core/src/airflow/example_dags/example_asset_state.py index d1b1855ec2cf5..062ac23d84e07 100644 --- a/airflow-core/src/airflow/example_dags/example_asset_state.py +++ b/airflow-core/src/airflow/example_dags/example_asset_state.py @@ -29,7 +29,7 @@ import json import random -from datetime import datetime, timedelta, timezone +from datetime import datetime, timezone from airflow.sdk import DAG, Asset, task @@ -43,7 +43,7 @@ def _fetch_records(since: str) -> list[dict]: with DAG( dag_id="example_asset_state_producer", - schedule=timedelta(hours=1), + schedule=None, start_date=datetime(2026, 1, 1), catchup=False, tags=["example", "asset-state"], diff --git a/airflow-core/src/airflow/example_dags/example_task_state.py b/airflow-core/src/airflow/example_dags/example_task_state.py index 247af5dd9bd65..da37cdf028e1a 100644 --- a/airflow-core/src/airflow/example_dags/example_task_state.py +++ b/airflow-core/src/airflow/example_dags/example_task_state.py @@ -31,7 +31,7 @@ import random import string import time -from datetime import datetime, timezone +from datetime import datetime, timedelta, timezone from airflow.sdk import DAG, task from airflow.sdk.execution_time.context import NEVER_EXPIRE @@ -58,7 +58,7 @@ def _poll_job(job_id: str) -> dict: doc_md=__doc__, ): - @task(retries=2) + @task(retries=2, retry_delay=timedelta(seconds=5)) def run_job(**context): ts = context["task_state"] try_number = context["ti"].try_number From 142136bbf8a02f37a377c643b47db47cd9153632 Mon Sep 17 00:00:00 2001 From: Amogh Desai Date: Sun, 24 May 2026 18:01:57 +0530 Subject: [PATCH 3/5] comments from ash --- .../src/airflow/example_dags/example_asset_state.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/airflow-core/src/airflow/example_dags/example_asset_state.py b/airflow-core/src/airflow/example_dags/example_asset_state.py index 062ac23d84e07..b2dd5d19f5c04 100644 --- a/airflow-core/src/airflow/example_dags/example_asset_state.py +++ b/airflow-core/src/airflow/example_dags/example_asset_state.py @@ -51,8 +51,8 @@ def _fetch_records(since: str) -> list[dict]: ): @task(inlets=[ORDERS], outlets=[ORDERS]) - def load(**context): - state = context["asset_state"][ORDERS] + def load(asset_state=None): + state = asset_state[ORDERS] # First run: watermark is None — fall back to epoch start. watermark = state.get("watermark") or "2026-01-01T00:00:00+00:00" @@ -88,8 +88,8 @@ def load(**context): ): @task(inlets=[ORDERS]) - def consume(**context): - state = context["asset_state"][ORDERS] + def consume(asset_state=None): + state = asset_state[ORDERS] summary = json.loads(state.get("last_run_summary") or "{}") print( f"Processing {summary.get('rows_loaded', '?')} rows " From d05d893751533eb1557c41ccdef81377f9a91fad Mon Sep 17 00:00:00 2001 From: Amogh Desai Date: Sun, 24 May 2026 18:05:07 +0530 Subject: [PATCH 4/5] comments from jake --- .../src/airflow/example_dags/example_task_state.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/airflow-core/src/airflow/example_dags/example_task_state.py b/airflow-core/src/airflow/example_dags/example_task_state.py index da37cdf028e1a..2689bfeea240d 100644 --- a/airflow-core/src/airflow/example_dags/example_task_state.py +++ b/airflow-core/src/airflow/example_dags/example_task_state.py @@ -60,17 +60,17 @@ def _poll_job(job_id: str) -> dict: @task(retries=2, retry_delay=timedelta(seconds=5)) def run_job(**context): - ts = context["task_state"] + task_state = context["task_state"] try_number = context["ti"].try_number - job_id = ts.get("job_id") + job_id = task_state.get("job_id") if job_id: print(f"Try {try_number}: reattaching to existing job: {job_id}") else: job_id = _submit_job() # Store with NEVER_EXPIRE so the job ID survives across all retries. - ts.set("job_id", job_id, retention=NEVER_EXPIRE) - ts.set("submitted_at", datetime.now(tz=timezone.utc).isoformat()) + task_state.set("job_id", job_id, retention=NEVER_EXPIRE) + task_state.set("submitted_at", datetime.now(tz=timezone.utc).isoformat()) print(f"Try {try_number}: submitted job: {job_id}") # Simulate a crash after submission on the first attempt. @@ -79,10 +79,10 @@ def run_job(**context): f"Simulated failure after submitting {job_id}. The next retry will reattach to this job." ) - ts.set("status", "running") + task_state.set("status", "running") result = _poll_job(job_id) - ts.set("status", "complete") - ts.set("result", json.dumps(result)) + task_state.set("status", "complete") + task_state.set("result", json.dumps(result)) print(f"Try {try_number}: job complete — {result['rows_written']} rows written") return result["rows_written"] From 5803bd99d8a367b10403ffdc086f9b0880d5f8c3 Mon Sep 17 00:00:00 2001 From: Amogh Desai Date: Mon, 25 May 2026 19:30:43 +0530 Subject: [PATCH 5/5] fixing example dags --- .../airflow/example_dags/example_asset_state.py | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/airflow-core/src/airflow/example_dags/example_asset_state.py b/airflow-core/src/airflow/example_dags/example_asset_state.py index b2dd5d19f5c04..3d18b9afaa609 100644 --- a/airflow-core/src/airflow/example_dags/example_asset_state.py +++ b/airflow-core/src/airflow/example_dags/example_asset_state.py @@ -61,16 +61,14 @@ def load(asset_state=None): now = datetime.now(tz=timezone.utc).isoformat() state.set("watermark", now) - state.set("total_runs", str(int(state.get("total_runs") or 0) + 1)) + state.set("total_runs", (state.get("total_runs") or 0) + 1) state.set( "last_run_summary", - json.dumps( - { - "rows_loaded": row_count, - "prev_watermark": watermark, - "completed_at": now, - } - ), + { + "rows_loaded": row_count, + "prev_watermark": watermark, + "completed_at": now, + }, ) print(f"Loaded {row_count} records. Watermark advanced to {now}.")