Simplifying authoring of task and asset states by allowing JSON types #67418

Merged
amoghrajesh merged 4 commits into apache:main from astronomer:aip-103-better-user-interface-ts
May 25, 2026

Conversation

@amoghrajesh
Contributor


Was generative AI tooling used to co-author this PR?
  • Yes (please specify the tool below)

Discovered during: #67376

What

Task and asset state is designed for storing lightweight metadata such as job handles, counters, timestamps, and small status dictionaries. However, the set()/get() API forced every value through str, which made common patterns awkward:

# Before
task_state.set("retry_count", str(3))
task_state.set("poll_result", json.dumps({"status": "succeeded", "rows": 1234}))

count = int(task_state.get("retry_count"))
result = json.loads(task_state.get("poll_result"))

Proposed change
set() and get() now accept and return any JSON-native type, so no manual serialization is needed:

# After
task_state.set("retry_count", 3)
task_state.set("poll_result", {"status": "succeeded", "rows": 1234})
task_state.set("job_id", "spark-abc") 

count = task_state.get("retry_count")       # → 3 (int)
result = task_state.get("poll_result")      # → dict

This uses plain json.dumps/json.loads at the execution API boundary rather than a heavier serialization library like serde. Task state is not intended to hold arbitrary Python objects, and JSON covers every real use case with no additional dependencies.
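
As a rough illustration of what a plain json.dumps/json.loads round trip does to values (a standalone sketch, not the Airflow implementation; `round_trip` is a hypothetical helper, and how the task SDK validates unsupported values is not stated in this PR), JSON-native values come back unchanged, while a few Python-specific shapes are normalized or rejected:

```python
import json
from datetime import datetime


def round_trip(value):
    """Encode a value to a JSON string and decode it back (hypothetical helper)."""
    return json.loads(json.dumps(value))


# JSON-native values survive unchanged, including their Python type.
assert round_trip(3) == 3 and isinstance(round_trip(3), int)
assert round_trip({"status": "succeeded", "rows": 1234}) == {"status": "succeeded", "rows": 1234}
assert round_trip(None) is None

# Tuples have no JSON equivalent, so they come back as lists.
assert round_trip((1, 2)) == [1, 2]

# Non-string dict keys are coerced to strings.
assert round_trip({1: "a"}) == {"1": "a"}

# Arbitrary Python objects such as datetime are rejected by json.dumps.
try:
    round_trip(datetime(2026, 1, 1))
except TypeError:
    rejected = True
else:
    rejected = False
assert rejected
```

If a timestamp needs to be stored, passing a string such as the output of datetime.isoformat() keeps the value JSON-native.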

The DB column remains TEXT with JSON-encoded strings; the encoding/decoding is transparent to callers.
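
The idea of a TEXT column holding JSON-encoded strings can be sketched with an in-memory SQLite table. This is only an illustration under stated assumptions: `TextBackedState`, its `raw()` method, and the table layout are hypothetical and do not reflect Airflow's actual schema or classes.

```python
import json
import sqlite3


class TextBackedState:
    """Hypothetical stand-in for a state store whose value column is TEXT."""

    def __init__(self):
        self._conn = sqlite3.connect(":memory:")
        self._conn.execute(
            "CREATE TABLE state (key TEXT PRIMARY KEY, value TEXT NOT NULL)"
        )

    def set(self, key, value):
        # Encode once at the boundary; the column only ever sees a JSON string.
        self._conn.execute(
            "INSERT OR REPLACE INTO state (key, value) VALUES (?, ?)",
            (key, json.dumps(value)),
        )

    def get(self, key):
        row = self._conn.execute(
            "SELECT value FROM state WHERE key = ?", (key,)
        ).fetchone()
        # A missing key yields None, matching the behaviour asserted under Testing.
        return None if row is None else json.loads(row[0])

    def raw(self, key):
        """Return the stored TEXT exactly as it sits in the column."""
        row = self._conn.execute(
            "SELECT value FROM state WHERE key = ?", (key,)
        ).fetchone()
        return None if row is None else row[0]


state = TextBackedState()
state.set("retry_count", 3)
state.set("job_id", "spark-abc")
state.set("poll_result", {"status": "succeeded", "rows": 1234})

assert state.get("retry_count") == 3
assert state.raw("retry_count") == "3"
assert state.raw("job_id") == '"spark-abc"'  # strings are stored with JSON quotes
assert state.get("poll_result")["rows"] == 1234
assert state.get("never_set") is None
```

Callers only see decoded values from get(); the JSON text is visible only when reading the column directly, as raw() does here.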

Testing

from __future__ import annotations

from datetime import datetime

from airflow.sdk import DAG, task
from airflow.sdk.execution_time.context import NEVER_EXPIRE

with DAG(
    dag_id="test_task_state_types",
    schedule=None,
    start_date=datetime(2026, 1, 1),
    catchup=False,
    tags=["test", "aip-103"],
):

    @task(retries=0)
    def test_types(**context):
        ts = context["task_state"]

        # primitive types
        ts.set("a_str", "hello")
        ts.set("an_int", 42)
        ts.set("a_float", 3.14)
        ts.set("a_bool_true", True)
        ts.set("a_bool_false", False)

        assert ts.get("a_str") == "hello"
        assert ts.get("an_int") == 42
        assert ts.get("a_float") == 3.14
        assert ts.get("a_bool_true") is True
        assert ts.get("a_bool_false") is False

        # collection
        ts.set("a_list", [1, "two", 3.0, True])
        ts.set("a_dict", {"status": "ok", "count": 7})
        ts.set("nested", {"rows": [1, 2, 3], "meta": {"done": True}})

        assert ts.get("a_list") == [1, "two", 3.0, True]
        assert ts.get("a_dict") == {"status": "ok", "count": 7}
        assert ts.get("nested") == {"rows": [1, 2, 3], "meta": {"done": True}}

        # missing key returns None
        assert ts.get("never_set") is None

        print("All type round-trips passed")

    test_types()
  • Read the Pull Request Guidelines for more information. Note: commit author/co-author name and email in commits become permanently public when merged.
  • For fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
  • When adding dependency, check compliance with the ASF 3rd Party License Policy.
  • For significant user-facing changes create newsfragment: {pr_number}.significant.rst, in airflow-core/newsfragments. You can add this file in a follow-up commit after the PR is created so you know the PR number.

Comment thread task-sdk/src/airflow/sdk/execution_time/comms.py
Comment thread task-sdk/src/airflow/sdk/execution_time/context.py Outdated
@amoghrajesh amoghrajesh requested a review from kaxil May 24, 2026 16:32
Comment thread shared/state/src/airflow_shared/state/__init__.py
Collaborator

@jroachgolf84 jroachgolf84 left a comment

LGTM. I pulled this down and tested on my side, and things looked good :)

@amoghrajesh amoghrajesh moved this from Backlog to In review in AIP-103: Task State Management May 25, 2026
@amoghrajesh
Contributor Author

Failing CI fixed by: #67467

@amoghrajesh amoghrajesh merged commit b59126d into apache:main May 25, 2026
139 of 141 checks passed
@github-project-automation github-project-automation Bot moved this from In review to Done in AIP-103: Task State Management May 25, 2026
@amoghrajesh amoghrajesh deleted the aip-103-better-user-interface-ts branch May 25, 2026 13:46

Labels

area:API (Airflow's REST/HTTP API), area:task-sdk

3 participants