diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor.py b/sdks/python/apache_beam/runners/worker/bundle_processor.py index faa756d7c5c5..b0051162fee2 100644 --- a/sdks/python/apache_beam/runners/worker/bundle_processor.py +++ b/sdks/python/apache_beam/runners/worker/bundle_processor.py @@ -1245,6 +1245,9 @@ def process_bundle( expected_input_ops.append(op) try: + # Indicate that we are busy (setup or processing) so that runners can + # distinguish setup/processing from being idle and caught up. + self.consuming_received_data = True execution_context = ExecutionContext(instruction_id=instruction_id) self.current_instruction_id = instruction_id self.state_sampler.start() @@ -1283,7 +1286,7 @@ def process_bundle( self.ops[transform_id].add_timer_info(timer_family_id, timer_info) # Process data and timer inputs - # We are currently not consuming received data. + # Setup is complete; signal that we are caught up until data arrives. self.consuming_received_data = False for data_channel, expected_inputs in data_channels.items(): for element in data_channel.input_elements(instruction_id,