Skip to content

[FLINK-38825][python] Add Python DataStream API integration for AsyncBatchFunction#27361

Open
featzhang wants to merge 8 commits intoapache:masterfrom
featzhang:FLINK-38825-python
Open

[FLINK-38825][python] Add Python DataStream API integration for AsyncBatchFunction#27361
featzhang wants to merge 8 commits intoapache:masterfrom
featzhang:FLINK-38825-python

Conversation

@featzhang
Copy link
Copy Markdown
Member

@featzhang featzhang commented Dec 22, 2025

What is the purpose of the change

This PR adds Python DataStream API integration for the existing Java AsyncBatchWaitOperator runtime capability, enabling Python-based AI/ML inference and external service calls to use batch-oriented async execution.

This is a pure integration PR - all batching, scheduling, and async execution logic is reused from the Java side.

Brief change log

New Python Classes

File Description
AsyncBatchFunction Python async batch function interface
AsyncBatchFunctionDescriptor Descriptor for serialization and configuration
AsyncBatchOperation Runtime operation for batch async execution
BatchResultDistributor Distributes batch results to individual elements

Modified Files

File Changes
async_data_stream.py Added unordered_wait_batch() and ordered_wait_batch() methods
functions.py Added AsyncBatchFunction and AsyncBatchFunctionDescriptor classes
__init__.py Exported AsyncBatchFunction
flink-fn-execution.proto Added ASYNC_BATCH function type

Test Files

File Description
test_async_batch_function.py Comprehensive tests for batch async functionality

API Design

AsyncBatchFunction

class AsyncBatchFunction(Function, Generic[IN, OUT]):
    """
    A function to trigger Async I/O operation with batch processing support.
    Designed for AI/ML inference scenarios where batching improves throughput.
    """

    @abstractmethod
    async def async_invoke_batch(self, inputs: List[IN]) -> List[OUT]:
        """
        Trigger async operation for a batch of stream inputs.
        Returns a list of results, one for each input element.
        """
        pass

    def timeout_batch(self, inputs: List[IN]) -> List[OUT]:
        """
        Called when async_invoke_batch times out.
        Override to provide custom timeout handling.
        """
        raise TimeoutError("Async batch function call has timed out")

AsyncDataStream Methods

# Unordered batch execution
AsyncDataStream.unordered_wait_batch(
    data_stream,
    async_batch_function,
    timeout,           # Overall timeout
    batch_size,        # Max elements per batch
    batch_timeout=None,# Optional batch flush timeout
    capacity=100,      # Max in-flight operations
    output_type=None   # Output type info
)

# Ordered batch execution (preserves input order)
AsyncDataStream.ordered_wait_batch(
    data_stream,
    async_batch_function,
    timeout,
    batch_size,
    batch_timeout=None,
    capacity=100,
    output_type=None
)

Example Usage

from pyflink.datastream import AsyncDataStream, AsyncBatchFunction
from pyflink.common import Time, Types, Row
from typing import List

class MLInferenceFunction(AsyncBatchFunction):
    """Batch ML model inference function."""
    
    async def async_invoke_batch(self, inputs: List[Row]) -> List[float]:
        # Batch inference call to ML model
        features = [self.extract_features(row) for row in inputs]
        predictions = await self.model.predict_batch(features)
        return predictions.tolist()

# Apply to data stream
result = AsyncDataStream.unordered_wait_batch(
    ds,
    MLInferenceFunction(),
    timeout=Time.seconds(30),
    batch_size=32,
    batch_timeout=Time.milliseconds(100),
    output_type=Types.FLOAT()
)

Testing

The PR includes comprehensive tests covering:

  1. Basic batch execution - Verify batching and results
  2. Batch size triggering - Verify batches trigger at configured size
  3. Batch timeout triggering - Verify partial batches flush on timeout
  4. Exception propagation - Verify errors fail the job
  5. Timeout handling - Verify timeout_batch is called
  6. Ordered execution - Verify output order matches input
  7. End-of-input flush - Verify remaining elements are flushed
  8. Validation errors - Verify parameter validation

Design Principles

  1. Reuse Java Runtime - All batching logic is in AsyncBatchWaitOperator
  2. Follow Existing Patterns - API mirrors AsyncFunction integration
  3. Explicit, Readable Code - No complex abstractions
  4. Backward Compatible - Existing APIs unchanged

Verifying this change

This change added tests and can be verified as follows:

cd flink-python
python -m pytest pyflink/datastream/tests/test_async_batch_function.py -v

Does this pull request potentially affect one of the following parts

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): yes
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? yes
  • If yes, how is the feature documented? Docstrings

PR Series for FLINK-38825

JIRA: FLINK-38825 - Introduce an AI-friendly Async Batch Operator for high-latency inference workloads

This feature is implemented incrementally through the following PR series:

# PR Title Description Module
1 #27355 Introduce AsyncBatchFunction and AsyncBatchWaitOperator Core API and runtime operator with unordered semantics and size-based batch triggering flink-streaming-java
2 #27356 Add time-based batch triggering Timeout-based flush to trigger incomplete batches after a configurable duration flink-streaming-java
3 #27357 Add ordered async batch support Ordered output semantics via OrderedAsyncBatchWaitOperator with sequence numbers flink-streaming-java
4 #27358 Add inference-oriented metrics Batch size/latency histograms, async call duration, inflight batches, failure counters flink-streaming-java
5 #27359 Implement retry and timeout strategies Fixed-delay/exponential-backoff retry, fail-on-timeout/allow-partial timeout policies flink-streaming-java
6 #27360 Add SQL/Table API integration AsyncBatchLookupFunction, AsyncBatchLookupFunctionProvider, lookup join runner flink-table
7 #27361 Add Python DataStream API integration Python AsyncBatchFunction, async_invoke_batch() API, AsyncDataStream batch methods flink-python

Note: Each PR builds incrementally on the previous ones. PRs should be reviewed and merged in order.

@flinkbot
Copy link
Copy Markdown
Collaborator

flinkbot commented Dec 22, 2025

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

averyzhang added 7 commits March 4, 2026 07:54
…erence

This commit introduces SQL/Table API support for batch async lookup joins,
enabling AI/ML inference scenarios where batching lookups improves throughput.

Key additions:
- AsyncBatchLookupFunction: Batch-oriented async lookup interface
- AsyncBatchLookupFunctionProvider: Provider with batch configuration
- AsyncBatchLookupJoinRunner: Runtime lookup join runner
- AsyncBatchLookupJoinFunctionAdapter: Adapter to streaming AsyncBatchFunction
- LookupJoinUtil: Batch async lookup detection and options extraction
- FunctionKind.ASYNC_BATCH_TABLE: New function kind enum

The implementation bridges the Table API layer to the existing
AsyncBatchWaitOperator runtime, ensuring consistent behavior with
size-based, time-based batching, retry, and timeout strategies.
…BatchFunction

This commit introduces Python support for batch-oriented async operations,
enabling AI/ML inference scenarios to use batch processing for improved
throughput.

Key additions:
- AsyncBatchFunction class for batch async operations
- AsyncDataStream.unordered_wait_batch() method
- AsyncDataStream.ordered_wait_batch() method
- AsyncBatchOperation runtime implementation
- Comprehensive unit tests

The implementation reuses the Java AsyncBatchWaitOperator for all batching
and scheduling logic, following existing PyFlink async function patterns.
@featzhang featzhang force-pushed the FLINK-38825-python branch from 090d2ee to 0275e66 Compare March 3, 2026 23:54
@featzhang featzhang closed this Mar 16, 2026
@featzhang featzhang reopened this Mar 20, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants