out_s3: add format=parquet,arrow with selectable page compression codecs#11885
out_s3: add format=parquet,arrow with selectable page compression codecs#11885rituparnakhaund wants to merge 4 commits into
Conversation
|
Note Reviews pausedIt looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the Use the following commands to manage reviews:
Use the checkboxes below for quick actions:
📝 WalkthroughWalkthroughAdds runtime-selectable Parquet page-level compression codecs, updates the Parquet conversion API to accept a codec, maps/configures codecs in the S3 plugin and upload flow, adjusts payload cleanup, and adds unit tests for Parquet outputs. ChangesS3 Parquet Page-Level Compression
Sequence DiagramsequenceDiagram
participant Config as S3 Config
participant S3Upload as S3 Upload Path
participant ParquetLib as Parquet Conversion (out_s3_compress_parquet)
participant ArrowWriter as Arrow/GParquet
Config->>Config: parse format + compression (resolve parquet codec)
S3Upload->>ParquetLib: out_s3_compress_parquet(json, size, &buf, &sz, codec)
ParquetLib->>ArrowWriter: create writer with GParquetWriterProperties(codec)
ArrowWriter-->>ParquetLib: parquet buffer
ParquetLib-->>S3Upload: parquet buffer returned
S3Upload->>S3Upload: upload buffer to S3
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Possibly related PRs
Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
6e4fd79 to
000f6af
Compare
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 000f6af6cc
ℹ️ About Codex in GitHub
Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".
There was a problem hiding this comment.
Actionable comments posted: 4
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@plugins/out_s3/s3.c`:
- Around line 740-761: The Parquet path currently accepts a user-specified
log_key which later breaks Parquet conversion; in the FLB_S3_FORMAT_PARQUET
branch (the block that calls enable_parquet_format(ctx)) add a check after
enable_parquet_format succeeds: if ctx->log_key != NULL then call
flb_plg_error(ctx->ins, "'log_key' is not supported when format is parquet") and
return -1; use the existing symbols FLB_S3_FORMAT_PARQUET,
enable_parquet_format, ctx->log_key and ctx->ins to locate and implement the
check.
- Around line 1316-1331: The Parquet branch (out_s3_compress_parquet) allocates
payload_buf but later cleanup only frees when ctx->compression !=
FLB_AWS_COMPRESS_NONE, leaking payload_buf whenever ctx->compression was cleared
for Parquet; ensure payload_buf is independently freed whenever it was
allocated: after successful upload in the put_object path and in every
early-return/error path (including the FLB_RETRY path where s3_store_file_unlock
and chunk->failures are adjusted). Add a conditional free of payload_buf (e.g.,
flb_free(payload_buf) guarded by whether out_s3_compress_parquet set it) in the
out_s3_compress_parquet success branch and all subsequent exit points regardless
of ctx->compression.
- Around line 114-125: map_to_parquet_codec currently maps unknown compression
types to FLB_PARQUET_COMPRESSION_NONE which silently downgrades user config;
change map_to_parquet_codec to return a sentinel (e.g., -1) for unsupported
codecs and in the caller (where ctx->parquet_compression is set) check for -1,
call flb_plg_error(ctx->ins, "unsupported compression '%s' for format=parquet",
tmp) and return -1 to fail init, and only set ctx->compression =
FLB_AWS_COMPRESS_NONE after a successful mapping; reference map_to_parquet_codec
and ctx->parquet_compression for locating changes.
In `@tests/internal/aws_compress.c`:
- Around line 347-353: The test currently calls out_s3_compress_parquet and
always performs memcmp/out_size checks even if ret != 0, risking dereferencing
invalid out_buf or using out_size after a failed conversion; update the test
around the out_s3_compress_parquet call (and the analogous blocks testing other
compressions) to guard subsequent assertions by early-failing or skipping those
checks when ret != 0 (e.g., assert/require the conversion succeeded before using
out_buf/out_size, or return/continue the test case on failure), and ensure
out_buf is only dereferenced and flb_free'd when it was actually populated by a
successful call.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 4502cf5b-2e0a-4cf0-86d5-02fb93b9e780
📒 Files selected for processing (7)
include/fluent-bit/aws/flb_aws_compress.hplugins/out_s3/s3.cplugins/out_s3/s3.hsrc/aws/compression/arrow/compress.csrc/aws/compression/arrow/compress.hsrc/aws/flb_aws_compress.ctests/internal/aws_compress.c
💤 Files with no reviewable changes (1)
- src/aws/flb_aws_compress.c
facaa57 to
8abd42f
Compare
There was a problem hiding this comment.
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@plugins/out_s3/s3.c`:
- Around line 714-715: The default Parquet codec is being set to
FLB_PARQUET_COMPRESSION_NONE (ctx->parquet_compression) which causes plain
format=parquet to produce uncompressed output; change the default initialization
to FLB_PARQUET_COMPRESSION_SNAPPY so new default codec is Snappy, and preserve
backward compatibility by keeping the deprecated compression=parquet code path
(the branch that explicitly handles compression="parquet") to force
ctx->parquet_compression = FLB_PARQUET_COMPRESSION_NONE when that deprecated
option is used; update both the initializations around where ctx->s3_format is
set and the identical block later (the region referenced around lines 879-886)
so both locations use SNAPPY as the default and still override to NONE for the
deprecated path.
- Around line 879-886: The deprecated branch that treats "compression=parquet"
as an alias still enables Parquet without rejecting the log_key option; update
the block handling strcasecmp(tmp, "parquet") so it performs the same validation
as the new format guard: call the same check that rejects/validates log_key (the
logic used by enable_parquet_format or the code that inspects log_key) before
enabling Parquet, and if log_key is present return -1 (after emitting the
existing flb_plg_warn about deprecation) so the deprecated path cannot bypass
the log_key rejection; ensure you reference the same ctx/ins structures and
preserve the warning text via flb_plg_warn and the call to
enable_parquet_format(ctx).
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 4b6ca01a-c367-4245-acb0-ff825b3e9049
📒 Files selected for processing (6)
e2e-tests.txtfluent-bit.imlplugins/out_s3/s3.cplugins/out_s3/s3.hsrc/aws/compression/arrow/compress.ctests/internal/aws_compress.c
✅ Files skipped from review due to trivial changes (1)
- fluent-bit.iml
🚧 Files skipped from review as they are similar to previous changes (3)
- plugins/out_s3/s3.h
- src/aws/compression/arrow/compress.c
- tests/internal/aws_compress.c
8abd42f to
daf983a
Compare
daf983a to
d26fe19
Compare
cosmo0920
left a comment
There was a problem hiding this comment.
Looks attractive for me. Thanks for your work. 👍
| int ret; | ||
| void *out_buf = NULL; | ||
| size_t out_size = 0; | ||
| char *json = "{\"key\":\"value\",\"num\":42}\n" |
There was a problem hiding this comment.
Is this intended to be valid JSON?
There was a problem hiding this comment.
this is just an example. I can update it to be more meaningful :D. Thanks for your comment.
|
I think we may be missing that we are not moving |
that is intentional. I wanted to roll out parquet as is. Will move |
|
@ShelbyZ is this approved from your side ? |
d26fe19 to
2f9af92
Compare
The compression option conflated byte-level compression (gzip, zstd, snappy) with columnar format conversion (arrow, parquet), making it impossible to produce Arrow or Parquet files with a chosen internal compression codec. Introduce a single columnar API, out_s3_compress_columnar(), that takes a columnar format (FLB_AWS_COMPRESS_FORMAT_ARROW or FLB_AWS_COMPRESS_FORMAT_PARQUET) and applies a generic FLB_AWS_COMPRESS_* codec on top of it: Parquet via the page-level GLib writer properties, Arrow/Feather via the Feather write properties. Parquet supports none, snappy, gzip and zstd; Arrow supports none and zstd. The arrow and parquet entries are removed from the generic compression dispatch table, and the format-specific FLB_PARQUET_COMPRESSION_* constants together with the standalone compress.h are dropped in favour of the unified declarations in flb_aws_compress.h. Unknown columnar formats now fail loudly instead of silently defaulting to Arrow. Signed-off-by: Rituparna Khaund <ritukhau@amazon.com>
Separate the format axis (json, json_lines, otlp_json, arrow, parquet) from the compression axis so users can produce columnar files with a selectable internal codec. 'format arrow' and 'format parquet' write a self-contained columnar object; the compression option then selects the codec applied inside the file (page-level for Parquet, IPC buffer compression for Arrow). The default is uncompressed, preserving existing behaviour. Columnar objects are uploaded via PutObject and never carry a byte-level Content-Encoding header, and log_key is rejected for these formats. Each codec is validated against the chosen format. The deprecated compression=arrow and compression=parquet values are kept as aliases that emit a warning and enable the corresponding format. Signed-off-by: Rituparna Khaund <ritukhau@amazon.com>
Add unit tests for the columnar API covering Arrow/Feather output: uncompressed and zstd produce valid ARROW1 buffers, and an unsupported codec (gzip) is rejected. These complement the existing Parquet page-level compression tests. Signed-off-by: Rituparna Khaund <ritukhau@amazon.com>
Add out_s3 integration scenarios for the columnar formats: arrow and parquet with zstd, parquet and arrow with compression none, and a negative case asserting that an invalid format/compression combination (arrow + gzip) is rejected at startup. Each positive case verifies the ARROW1/PAR1 magic bytes and the absence of a Content-Encoding header. Signed-off-by: Rituparna Khaund <ritukhau@amazon.com>
43cd1f4 to
940a8f1
Compare
The S3 output plugin currently overloads the compression option to handle both byte-level compression (gzip, zstd, snappy) and format conversion (parquet). This makes it impossible
for users to produce Parquet files with page-level compression, because compression can only hold one value.
When compression=parquet is set, the code path ultimately calls gparquet_arrow_file_writer_new_arrow with NULL for writer properties. This causes the Arrow GLib binding to use parquet::default_writer_properties(), which sets the page-level codec to Compression::UNCOMPRESSED.
Every column page ends up uncompressed despite the key being "compression" ?
This patch separates concerns:
format now accepts parquet (in addition to existing json_lines and otlp_json), controlling the output serialization format.
compression controls the codec applied inside the columnar file rather than wrapping it externally: page-level for Parquet (snappy, zstd, gzip, none) and IPC buffer compression
for Arrow/Feather (zstd, none). The codec is validated against the chosen format, and an unsupported combination (e.g. arrow + gzip) fails at startup.
Default page-level codec for Parquet is snappy, matching the industry standard (Spark, PyArrow, Hive all default to snappy).
Backwards compatibility is preserved: compression=parquet and compression=arrow still work but emits a deprecation warning and maps to format=parquet/arrow with no page-level compression (identical to current
behavior).
Addresses feedback from #10691 requesting codec and page size configurability.
Enter
[N/A]in the box, if an item is not applicable to your change.Testing
Before we can approve your change; please submit the following in a comment:
Verified with parquet-tools inspect:
Before (current master):
compression: UNCOMPRESSED (space_saved: 0%)
After this patch:
compression: SNAPPY
[X] Debug log output from testing the change :
Testing strategy : Ran
build/bin/fluent-bitlocally on my mac to write to my s3 bucket with different configs.Here are the e2e-tests.txt
Individually ran this for each of the tests and no corruption found. The 17 tests are taking too long, so will post here as soon as I see an update.
If this is a change to packaging of containers or native binaries then please confirm it works for all targets.
ok-package-testlabel to test for all targets (requires maintainer to do).Documentation
fluent/fluent-bit-docs#2591
Backporting
Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.
Summary by CodeRabbit
New Features
Bug Fixes
Chores
compression=parquetdeprecated with a warning; config help updated.Tests