feature/issue-67200: Adding AssetState Task SDK mechanism#67248
feature/issue-67200: Adding AssetState Task SDK mechanism#67248jroachgolf84 wants to merge 5 commits into
feature/issue-67200: Adding AssetState Task SDK mechanism#67248Conversation
|
From AIP-103 I understood this new task state management could be used across multiple different use cases such as intra-task progress checkpointing . If that's the case, I am not sure about the name |
AIP-103 addresses both Task and Asset state. Here are some of the PR's that have added Asset State.
cc: @amoghrajesh |
|
@amoghrajesh - when you get a chance, can you look at this? I'm going to work on getting these checks green. |
The APIs look good to me, I am only questioning the way to access the state. In your example you do |
|
I just looked at #67376 and I meant exactly this same mechanism but for asset. Cannot we plumb through automatically the state of an asset to its related triggers? That would avoid forcing Dag author to manually get the asset state with |
@vincbeck - I think that makes sense. The only caveat is that a # Contains the asset states
self.asset_states = ...
asset_a_state = self.asset_states.get("asset_a")
asset_b_state = self.asset_states.get("asset_b")Thoughts? I'm kinda caught in the middle on this one. |
amoghrajesh
left a comment
There was a problem hiding this comment.
Hey @jroachgolf84, I think the plumbing fix here will be correct.
AssetState(name=self.asset_name) works but it is inconsistent with how tasks access asset state as Vincent also mentioned, and nothing prevents a trigger from doing AssetState(name="some_other_asset") even if it's not associated with that asset.
Looking at BaseTrigger, the triggerer already injects self.task_instance before calling run(). We could use the same pattern here and the triggerer could populate self.asset_states on BaseEventTrigger before run(), keyed by asset name, using the assets associated with the TI. The trigger author can then do:
async def run(self):
watermark = self.asset_states["orders"].get("watermark")
self.asset_states["orders"].set("watermark", new_watermark)
yield TriggerEvent(...)This will also provide some benefits like:
- Make the triggers scoped automatically (only the trigger's associated assets are present)
- Should also work cleanly for multi-asset triggers(just use the asset name while accessing)
- The framework handles scoping, not the author
The triggerer already knows which assets are associated with the TI at the point it sets trigger.task_instance = ti, so populating asset_states there is probably straightforward.
I really like that! |
Description
This PR adds the
AssetStatemechanism to use the foundations put in place in AIP-103 within a Trigger or Task.AssetStatecan be used like this:Note: Documentation has not been created for this functionality. That will be done as part of #65782.
closes: #67200
related: #65782
Testing
These changes were unit-tested, as well as tested E2E. See below for more information.
Unit Tests
Existing unit tests were updated and new tests were added to validate the changes that were made in this branch. Note, there are additional unit-tests to be added.
E2E Testing
To test this E2E, the DAG below was used. This implemented
AssetStatein both a Task and a Trigger (checkout the Trigger below). Note that I was able to retrieve and set state in theGenericEventTrigger, as well as thedownstream_task.This
GenericEventTriggerwas used for E2E testing. Note thatAssetStateis used in therunmethod of the Trigger. This code properly stores and retrieves the generated number, and logs the output accordingly.Outstanding Items
The following items still need to be completed:
AssetStateworks when called within a Task.Was generative AI tooling used to co-author this PR?
No, generative AI was not used to generate this PR.