Add example DAGs for AIP-103 task state and asset state #67376
Conversation
ashb
left a comment
Questions/comments about the interfaces
I'm thinking that I'll add a few more example DAGs that cover Asset watching as a fast-follow on this PR: #66595
@jroachgolf84 before adding "just more" example DAGs, can you please consider that these should belong to some "story line" and a real-world example? We actually had plans to reduce the examples in the scope of https://cwiki.apache.org/confluence/display/AIRFLOW/Examples+Refurbish, but this is still WIP. With the velocity I see in real life compared to new features being added continuously, I feel like example DAGs are growing faster than the intent to consolidate is gaining traction.
@jscheffl - definitely, I think that makes sense. I probably used "a few more" a bit haphazardly. I'll make sure to tag you once that PR lands, but it will be intentional and aim to not add cruft.
@jscheffl @jroachgolf84 interface change PR here: #67418
Alright, I rebased it, and the example DAGs are now cleaner after resolving the interface via #67418. I tested the DAGs and they still work fine. @jscheffl @jroachgolf84 I'd appreciate re-reviews :)
jscheffl
left a comment
Looks good to me, except that going forward we should aim to consolidate examples (not add more) and merge them into the storyline of a real-world example.
Ack, not been closely following that one but let me check
Thanks for the review folks, merging this one.
closes #67324
What
AIP-103 introduced task state and asset state but there are no example DAGs. New users have no concrete reference for how to use these features correctly, and the docs PR has no working code to link to.
I am adding two self-contained example DAGs to airflow-core/src/airflow/example_dags/:

- example_task_state.py: demonstrates the external job resumption pattern. The task submits a job, stores the job ID in task state with NEVER_EXPIRE, then deliberately fails on the first attempt. The retry reads the job ID from task state and reattaches to the already-running job instead of submitting a duplicate.
- example_asset_state.py: demonstrates incremental watermarking with a producer + consumer pair. The producer reads the last watermark from asset state, fetches only new records, then advances the watermark. The consumer is triggered by the asset event and reads asset state to inspect what was loaded.

Both DAGs are self-contained with no external dependencies and work out of the box.
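For anyone who wants the shape of the two patterns without opening the DAG files, here is a minimal plain-Python sketch. It deliberately does not use the AIP-103 interfaces: plain dicts stand in for task state and asset state, and `FakeJobService`, `run_attempt`, `produce`, and `consume` are hypothetical names made up for illustration, not anything from Airflow or from this PR.

```python
"""Plain-Python sketch of the two patterns; dicts stand in for the real state stores."""


class FakeJobService:
    """Hypothetical external system that records every submission."""

    def __init__(self):
        self.submitted = []

    def submit(self):
        job_id = f"job-{len(self.submitted) + 1}"
        self.submitted.append(job_id)
        return job_id


def run_attempt(task_state, service, attempt):
    """Submit once, persist the job ID, and reattach on retry."""
    job_id = task_state.get("job_id")
    if job_id is None:
        job_id = service.submit()
        # The example DAG stores this with NEVER_EXPIRE; a dict has no expiry.
        task_state["job_id"] = job_id
    if attempt == 1:
        # Deliberate failure so the retry path is exercised.
        raise RuntimeError("simulated failure after submission")
    return job_id


def produce(asset_state, source):
    """Fetch only records newer than the stored watermark, then advance it."""
    watermark = asset_state.get("watermark", 0)
    new_records = [r for r in source if r["id"] > watermark]
    if new_records:
        asset_state["watermark"] = max(r["id"] for r in new_records)
    return new_records


def consume(asset_state):
    """Read the watermark to inspect how far the producer has loaded."""
    return asset_state.get("watermark", 0)
```

Calling `run_attempt` with `attempt=1` and then `attempt=2` against the same dict returns the original job ID and leaves a single entry in `service.submitted`. Calling `produce` twice on an unchanged source returns an empty list the second time, because the watermark has already advanced past every record.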
Example run
Task state dag
Asset State dag