[FLINK-38189][core][python] Add RowFieldExtractorSchema for Row field serialization #27353
base: master
Conversation
…lization

This commit introduces RowFieldExtractorSchema, a new SerializationSchema that extracts and serializes a specific field from a Row object. This is particularly useful for Kafka scenarios where keys and values need separate serialization.

Changes:
- Add RowFieldExtractorSchema.java with field extraction logic
- Add comprehensive unit tests for the Java implementation
- Add Python bindings in pyflink.common.serialization
- Add Python unit tests and Kafka integration tests
- Add documentation and examples

This closes apache/flink#38189
return new byte[0];
}

return field.toString().getBytes(CHARSET);
It assumes the input to be of type string. What about requiring the input to be of type byte array and letting the user convert the data to a byte array themselves?
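To make the suggestion concrete, a minimal sketch of such a byte-array-based variant might look like the following. The class name RowBytesFieldExtractorSchema and the positional fieldIndex constructor are illustrative assumptions, not code from this PR:

```java
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.types.Row;

/**
 * Hypothetical variant of the reviewer's suggestion: the extracted field must
 * already be a byte[], so users convert their data to bytes themselves.
 */
public class RowBytesFieldExtractorSchema implements SerializationSchema<Row> {

    private final int fieldIndex; // assumption: the field is addressed by position

    public RowBytesFieldExtractorSchema(int fieldIndex) {
        this.fieldIndex = fieldIndex;
    }

    @Override
    public byte[] serialize(Row row) {
        Object field = row.getField(fieldIndex);
        if (field == null) {
            // mirror the null handling shown in the diff excerpt above
            return new byte[0];
        }
        if (!(field instanceof byte[])) {
            throw new IllegalArgumentException(
                    "Field " + fieldIndex + " must be a byte[], but was "
                            + field.getClass().getName());
        }
        return (byte[]) field;
    }
}
```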
 * }</pre>
 */
@PublicEvolving
public class RowFieldExtractorSchema implements SerializationSchema<Row> {
What about moving this class to the flink-python module? This class should be more useful for Python users.
@Nflrijal Thanks a lot for the PR. I think it will be very useful. Besides the comments above, could you also add an example under …
What is the purpose of the change
This pull request introduces RowFieldExtractorSchema, a new SerializationSchema implementation that extracts and serializes a specific field from a Row object. This addresses the common use case where users need to serialize different fields of a Row separately, particularly for Kafka producers where keys and values require independent serialization.

Previously, users had to implement custom serialization schemas or use workarounds to extract individual Row fields. This change provides a reusable, type-safe solution that simplifies the common pattern of using one Row field as a Kafka key and another as the value.
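For context, a usage sketch of the key/value pattern described above with the Kafka sink could look like this. The RowFieldExtractorSchema constructor taking a field position is an assumption (the excerpt does not show its exact signature), its import is omitted because the package is not shown, and the topic and bootstrap settings are placeholders:

```java
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.types.Row;

public class RowKeyValueKafkaSinkExample {

    public static KafkaSink<Row> buildSink() {
        // Assumed constructor: extract field 0 as the Kafka key, field 1 as the value.
        SerializationSchema<Row> keySchema = new RowFieldExtractorSchema(0);
        SerializationSchema<Row> valueSchema = new RowFieldExtractorSchema(1);

        return KafkaSink.<Row>builder()
                .setBootstrapServers("localhost:9092")
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.<Row>builder()
                                .setTopic("orders")
                                .setKeySerializationSchema(keySchema)
                                .setValueSerializationSchema(valueSchema)
                                .build())
                .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .build();
    }
}
```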
Brief change log
- RowFieldExtractorSchema class in flink-core that implements SerializationSchema<Row>
- RowFieldExtractorSchemaTest with comprehensive unit tests
- pyflink.common.serialization.RowFieldExtractorSchema Python binding
- Python tests in test_serialization_schemas.py

Verifying this change
This change added tests and can be verified as follows:
Java tests:
- RowFieldExtractorSchemaTest with 6 test cases covering normal operation, edge cases, and error handling
- mvn test -pl flink-core -Dtest=RowFieldExtractorSchemaTest

Python tests:
- test_serialization_schemas.py::SerializationSchemasTests::test_row_field_extractor_*
- kafka_test.py demonstrating Kafka key/value serialization
- pytest pyflink/common/tests/test_serialization_schemas.py -k RowFieldExtractor -v

Manual verification:
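As a rough sketch of what a manual check could look like (not part of the PR; the field-position constructor and the toString()/charset behavior are inferred from the diff excerpt above):

```java
import java.nio.charset.StandardCharsets;
import org.apache.flink.types.Row;

public class RowFieldExtractorManualCheck {

    public static void main(String[] args) {
        // Assumed constructor: serialize field 1 of each Row.
        RowFieldExtractorSchema schema = new RowFieldExtractorSchema(1);

        Row row = Row.of("user-42", "payload");
        byte[] bytes = schema.serialize(row);

        // The diff excerpt suggests non-null fields go through toString().getBytes(CHARSET),
        // so this should print "payload" for a UTF-8-compatible charset.
        System.out.println(new String(bytes, StandardCharsets.UTF_8));
    }
}
```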
Does this pull request potentially affect one of the following parts:
- @Public(Evolving): yes (new @Public(Evolving) class added)

Documentation