Priority Level: High (Major functionality broken)
Describe the bug
The async engine (`DATA_DESIGNER_ASYNC_ENGINE=1`) has two issues with side-effect columns from `@custom_column_generator`:
- Side-effect values not propagated to buffer: `AsyncTaskScheduler._run_cell()` writes results back via `_instance_to_columns`, which contains only primary column names. Side-effect values (e.g., `_tag_notation`, `_merged_tagged_text`) are in the result dict but get filtered out, so downstream prompts that reference them via Jinja2 fail with "columns are missing".
- Side-effect collision silently misresolves: `ExecutionGraph.set_side_effect()` uses last-writer-wins. When two generators declare the same side-effect column at different pipeline stages, the second registration overwrites the first, creating incorrect dependency edges and a false `DAGCircularDependencyError`.
This blocks Anonymizer from using the async engine (NVIDIA-NeMo/Anonymizer#95).
Steps/Code to reproduce bug
```python
# Two custom columns producing the same side-effect column at different stages.
# (The custom_column_generator import is omitted in the original report.)
@custom_column_generator(
    required_columns=["seed_col"],
    side_effect_columns=["shared_side_effect"],
)
def stage_1(row):
    row["stage_1_col"] = "value"
    row["shared_side_effect"] = "produced_by_stage_1"
    return row


@custom_column_generator(
    required_columns=["seed_col", "stage_2_col"],
    side_effect_columns=["shared_side_effect"],
)
def stage_3(row):
    row["stage_3_col"] = "value"
    row["shared_side_effect"] = "produced_by_stage_3"
    return row

# An LLM column between them (stage_2) references {{ shared_side_effect }} in its prompt.
# The graph resolves shared_side_effect -> stage_3 (last writer), not stage_1,
# which creates a false cycle: stage_3 -> stage_2 -> stage_3.
```
With `DATA_DESIGNER_ASYNC_ENGINE=1`, Anonymizer's detection pipeline (10 generated columns, multiple stages with side-effect reuse) hits:
```
DAGCircularDependencyError: The execution graph contains cyclic dependencies. Resolved 6/12 columns.
```
After manually breaking the cycle by removing the later side-effect declarations, a second error surfaces:
```
The following ['_tag_notation', '_merged_tagged_text'] columns are missing!
```
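The false cycle from the reproducer can be modeled without Data Designer, using Python's standard-library `graphlib`. This is a hypothetical sketch: the column names come from the reproducer, but `REQUIRES`, `SIDE_EFFECTS`, `resolve()`, `build_graph()` and `has_cycle()` are illustrative assumptions, not `ExecutionGraph`'s actual logic. It rewrites each side-effect reference into an edge on its producing column, then checks for a cycle under both resolution policies.

```python
from graphlib import CycleError, TopologicalSorter

# Hypothetical model of the reproducer: each column -> the columns it reads.
# stage_2_col stands in for the LLM column whose prompt uses {{ shared_side_effect }}.
REQUIRES = {
    "stage_1_col": ["seed_col"],
    "stage_2_col": ["seed_col", "shared_side_effect"],
    "stage_3_col": ["seed_col", "stage_2_col"],
}
# Side-effect declarations in pipeline order: (side-effect column, producing column).
SIDE_EFFECTS = [
    ("shared_side_effect", "stage_1_col"),
    ("shared_side_effect", "stage_3_col"),
]


def resolve(first_writer_wins: bool) -> dict[str, str]:
    """Map each side-effect column to the primary column that produces it."""
    producers: dict[str, str] = {}
    for side_effect, producer in SIDE_EFFECTS:
        if first_writer_wins and side_effect in producers:
            continue  # keep the earlier registration
        producers[side_effect] = producer  # otherwise the later one overwrites
    return producers


def build_graph(producers: dict[str, str]) -> dict[str, set[str]]:
    """Replace side-effect references with edges on their producing column."""
    return {col: {producers.get(d, d) for d in deps} for col, deps in REQUIRES.items()}


def has_cycle(first_writer_wins: bool) -> bool:
    graph = build_graph(resolve(first_writer_wins))
    try:
        list(TopologicalSorter(graph).static_order())
    except CycleError:
        return True
    return False


print(has_cycle(first_writer_wins=False))  # True: stage_3_col <-> stage_2_col
print(has_cycle(first_writer_wins=True))   # False
```

Under last-writer-wins, `stage_2_col` depends on `stage_3_col`, which already depends on `stage_2_col`, so no topological order exists; keeping the first registration removes the spurious edge.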
Expected behavior
- `ExecutionGraph.set_side_effect()` should detect collisions and either keep the first registration (matching sync-engine semantics, where earlier consumers see earlier producers) or raise.
- Side-effect column values from cell task results should be written to the `RowGroupBufferManager` so downstream tasks can read them via prompt templates.
Agent Diagnostic / Prior Investigation
Issue 1 root cause: `async_scheduler.py` lines 128-131 build `_instance_to_columns` from `generators.items()` (primary columns only). At line 785, the write-back loop `for col in output_cols` skips side-effect columns even though they are present in the result dict. Fix: include `config.side_effect_columns` when building `_instance_to_columns`.
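A minimal sketch of the Issue 1 fix, assuming a simplified model of the scheduler: `GeneratorConfig`, `build_instance_to_columns()` and `write_back()` are hypothetical stand-ins, keyed by column name rather than by generator instance. The point it shows is that once side-effect columns are part of the output list, the existing write-back loop copies them into the buffer row.

```python
from dataclasses import dataclass, field


@dataclass
class GeneratorConfig:
    """Hypothetical stand-in for a column config (only the fields used here)."""
    name: str
    side_effect_columns: list[str] = field(default_factory=list)


def build_instance_to_columns(configs: list[GeneratorConfig]) -> dict[str, list[str]]:
    """Map each generator to every column it writes: primary plus side effects."""
    return {cfg.name: [cfg.name, *cfg.side_effect_columns] for cfg in configs}


def write_back(result: dict, output_cols: list[str], buffer_row: dict) -> None:
    """Copy only the declared output columns from a cell result into the buffer row."""
    for col in output_cols:
        if col in result:
            buffer_row[col] = result[col]


configs = [GeneratorConfig("stage_1_col", side_effect_columns=["shared_side_effect"])]
instance_to_columns = build_instance_to_columns(configs)
result = {"seed_col": "seed", "stage_1_col": "value", "shared_side_effect": "produced_by_stage_1"}
buffer_row: dict = {}
write_back(result, instance_to_columns["stage_1_col"], buffer_row)
print(sorted(buffer_row))  # ['shared_side_effect', 'stage_1_col']
```

Input columns such as `seed_col` are still filtered out, so the change widens the write-back only to columns the generator declares.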
Issue 2 root cause: in `execution_graph.py` line 108, `set_side_effect` is a blind dict assignment. In Anonymizer's detection pipeline, `prepare_validation_inputs` and `merge_and_build_candidates` both produce `_merged_tagged_text`. The second registration overwrites the first, so the `{{ _merged_tagged_text }}` reference in `_validation_decisions`'s prompt resolves to `_merged_entities` instead of `_validation_candidates`, creating the cycle: `_merged_entities -> _validation_decisions -> _validated_entities -> _seed_entities_json -> _augmented_entities -> _merged_entities`.
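A sketch of collision-aware registration for Issue 2, assuming a simplified registry keyed by column name. `SideEffectRegistry`, `SideEffectCollisionError` and the `strict` flag are hypothetical names; only the "keep the first registration, or raise" behavior comes from the expected behavior in this issue. The registration order in the usage lines mirrors the order described above (first `_validation_candidates`, then `_merged_entities`).

```python
class SideEffectCollisionError(ValueError):
    """Hypothetical error raised when two generators declare the same side effect."""


class SideEffectRegistry:
    """Hypothetical sketch: side-effect column -> producing primary column."""

    def __init__(self, strict: bool = False) -> None:
        self.strict = strict  # raise on collision instead of keeping the first producer
        self._producers: dict[str, str] = {}

    def set_side_effect(self, side_effect_col: str, producer_col: str) -> None:
        existing = self._producers.get(side_effect_col)
        if existing is None:
            self._producers[side_effect_col] = producer_col
            return
        if existing != producer_col and self.strict:
            raise SideEffectCollisionError(
                f"{side_effect_col!r} is produced by both {existing!r} and {producer_col!r}"
            )
        # Non-strict: keep the first registration (earlier consumers see earlier producers).

    def resolve(self, col: str) -> str:
        """Return the producer of a side-effect column, or the column itself."""
        return self._producers.get(col, col)


registry = SideEffectRegistry()
registry.set_side_effect("_merged_tagged_text", "_validation_candidates")
registry.set_side_effect("_merged_tagged_text", "_merged_entities")
print(registry.resolve("_merged_tagged_text"))  # _validation_candidates
```

Re-registering the same producer is treated as a no-op rather than a collision, so idempotent graph construction does not trip the strict mode.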
Additional context
The sync engine handles both cases correctly because columns execute sequentially on a shared mutable row dict. Both fixes are small and localized (~5 lines each). Workaround: `DATA_DESIGNER_ASYNC_ENGINE=0` (the default).
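A simplified illustration of why sequential execution on a shared row dict avoids both problems, using undecorated versions of the reproducer functions. `stage_2` (a stand-in for the LLM column) and `run_sync` are hypothetical; this is not the sync engine's code, only a model of the "shared mutable row dict" behavior described above.

```python
def stage_1(row: dict) -> dict:
    row["stage_1_col"] = "value"
    row["shared_side_effect"] = "produced_by_stage_1"
    return row


def stage_2(row: dict) -> dict:
    # Stand-in for the LLM column: records the side-effect value its prompt would see.
    row["stage_2_col"] = f"prompt saw {row['shared_side_effect']}"
    return row


def stage_3(row: dict) -> dict:
    row["stage_3_col"] = "value"
    row["shared_side_effect"] = "produced_by_stage_3"
    return row


def run_sync(row: dict) -> dict:
    """Run stages in pipeline order on one shared dict; side effects persist naturally."""
    for stage in (stage_1, stage_2, stage_3):
        row = stage(row)
    return row


row = run_sync({"seed_col": "seed"})
print(row["stage_2_col"])         # prompt saw produced_by_stage_1
print(row["shared_side_effect"])  # produced_by_stage_3
```

Each consumer reads whatever the most recent earlier producer wrote, and every side-effect key stays in the row, so neither a dependency graph nor a column filter is involved.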