opentelemetry: add OTLP JSON and OTLP Proto helpers (#11624)
Conversation
Signed-off-by: Eduardo Silva <eduardo@chronosphere.io>
Note: Reviews paused. It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior in the settings, or manage reviews with CodeRabbit's commands and the checkboxes below.
📝 Walkthrough

Made yyjson optional via CMake; added a public flb_json API and implementation (immutable + mutable); added OTLP JSON/Proto encoders and integrated them into outputs (Kafka, Stdout); gated yyjson-backed packer APIs; added metric unit roundtrip handling and tests; added metrics chunk synchronization.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Producer as Event Producer
    participant OutKafka as out_kafka
    participant OTLPConv as OTLP Converter
    participant Broker as Kafka Broker
    Producer->>OutKafka: push event_chunk (msgpack)
    activate OutKafka
    OutKafka->>OutKafka: inspect ctx->format
    alt OTLP JSON
        OutKafka->>OTLPConv: msgpack -> OTLP JSON
        activate OTLPConv
        OTLPConv->>OTLPConv: decode msgpack, build OTLP JSON
        OTLPConv-->>OutKafka: JSON bytes
        deactivate OTLPConv
    else OTLP Proto
        OutKafka->>OTLPConv: msgpack -> OTLP Proto
        activate OTLPConv
        OTLPConv->>OTLPConv: decode to cmt/ctr, build protobuf
        OTLPConv-->>OutKafka: binary proto bytes
        deactivate OTLPConv
    end
    OutKafka->>OutKafka: handle queue-full retries (local)
    OutKafka->>Broker: produce_raw_payload(bytes)
    OutKafka-->>Producer: return status
    deactivate OutKafka
```
```mermaid
sequenceDiagram
    participant App as Consumer/Library user
    participant flb_json as flb_json API
    participant flb_pack as flb_pack (msgpack)
    participant yyjson as yyjson (optional)
    App->>flb_json: flb_json_read(json, len)
    activate flb_json
    flb_json->>flb_pack: flb_pack_json(...)
    flb_pack-->>flb_json: msgpack buffer
    flb_json->>flb_json: wrap msgpack into flb_json_doc
    flb_json-->>App: flb_json_doc*
    deactivate flb_json
    App->>flb_json: flb_json_write_pretty(doc)
    activate flb_json
    alt FLB_HAVE_YYJSON
        flb_json->>yyjson: convert -> yyjson_mut, yyjson_write(pretty)
        yyjson-->>flb_json: pretty JSON
    else
        flb_json->>flb_json: render msgpack -> pretty JSON
    end
    flb_json-->>App: JSON string
    deactivate flb_json
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~75 minutes
🚥 Pre-merge checks: ✅ 2 passed | ❌ 1 failed (1 warning)
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: a1ae9ea8fc
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you:
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
Actionable comments posted: 3
♻️ Duplicate comments (1)
src/opentelemetry/flb_opentelemetry_otlp_proto.c (1)
124-137: ⚠️ Potential issue | 🟠 Major

Still missing an `INT64_MAX` guard before narrowing positive msgpack integers. Line 136 and line 508 cast `via.u64` straight into `int64_t`. Values above `INT64_MAX` wrap into negative `resource_id`/`scope_id` or `AnyValue.int_value`, which corrupts grouping and exported payload data.

Suggested range checks:

```diff
 if (value->type == MSGPACK_OBJECT_POSITIVE_INTEGER) {
+    if (value->via.u64 > INT64_MAX) {
+        return -1;
+    }
     *result = (int64_t) value->via.u64;
     return 0;
 }
```

```diff
 if (object->type == MSGPACK_OBJECT_POSITIVE_INTEGER) {
+    if (object->via.u64 > INT64_MAX) {
+        otlp_any_value_destroy(value);
+        return NULL;
+    }
     value->int_value = (int64_t) object->via.u64;
 }
```

Also applies to: 503-509
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/opentelemetry/flb_opentelemetry_otlp_proto.c` around lines 124 - 137, The positive-integer branch in msgpack_map_get_int64 (and the other site that casts value->via.u64 into int64_t for resource_id/scope_id/AnyValue.int_value) lacks a check for overflow: add a guard that compares value->via.u64 against INT64_MAX before casting and treat values > INT64_MAX as an error (return non-zero) to avoid wrapping into negative int64_t; apply the same check at the other cast sites where via.u64 is narrowed to int64_t so large unsigned msgpack integers are rejected rather than silently wrapped.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/flb_input_metric.c`:
- Around line 102-116: The eviction block that removes entries from
ht_metric_chunks is skipped when tag == NULL; to fix, ensure tag normalization
happens before that eviction logic in flb_input_metrics_append:
resolve/normalize the incoming tag (using the same tag normalization logic
currently at lines ~42–50) unconditionally so tag is non-NULL when you reach the
mutex/eviction code that references ins->ht_metric_chunks and
metrics_chunk_lock, or alternatively move the eviction code to the location
where flb_input_chunk_append_raw() finalizes the tag; update
flb_input_metrics_append() (and any helper that performs tag normalization) so
the eviction always runs against the resolved tag reference.
In `@src/opentelemetry/flb_opentelemetry_otlp_proto.c`:
- Around line 1621-1649: The code allocates a LogRecord into
current_scope->scope_log->log_records[...] before calling log_record_to_proto
but only increments n_log_records on success, so if log_record_to_proto fails
the allocated/partially-populated record is leaked; fix by freeing/destroying
that staged LogRecord (reverse of
opentelemetry__proto__logs__v1__log_record__init — i.e., free any nested fields
and the LogRecord struct) immediately before each early return in the error
paths that follow log_record_to_proto, so that flb_log_event_decoder_destroy,
destroy_export_logs, destroy_logs_resource_states and set_error can run without
leaking memory; ensure you update the same error branches that currently return
NULL after log_record_to_proto fails.
- Around line 911-915: The cleanup paths free protobuf-c string fields
unconditionally (e.g., resource_log->schema_url and the scope-related schema_url
fields) which can free static storage set by protobuf_c__*_init; update each
early-error cleanup to check against protobuf_c_empty_string before calling
flb_free (use the same guard pattern as destroy_export_logs()), specifically
guard resource_log->schema_url and the scope schema_url fields in the error
branches around the realloc/alloc failures so you only flb_free() if the pointer
!= protobuf_c_empty_string.
---
Duplicate comments:
In `@src/opentelemetry/flb_opentelemetry_otlp_proto.c`:
- Around line 124-137: The positive-integer branch in msgpack_map_get_int64 (and
the other site that casts value->via.u64 into int64_t for
resource_id/scope_id/AnyValue.int_value) lacks a check for overflow: add a guard
that compares value->via.u64 against INT64_MAX before casting and treat values >
INT64_MAX as an error (return non-zero) to avoid wrapping into negative int64_t;
apply the same check at the other cast sites where via.u64 is narrowed to
int64_t so large unsigned msgpack integers are rejected rather than silently
wrapped.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 0b03b000-c255-41bd-a23f-f04c5fa02a5c
📒 Files selected for processing (6)
- plugins/out_kafka/kafka.c
- src/flb_input.c
- src/flb_input_metric.c
- src/flb_json.c
- src/opentelemetry/flb_opentelemetry_otlp_proto.c
- tests/internal/opentelemetry.c
🚧 Files skipped from review as they are similar to previous changes (4)
- src/flb_input.c
- tests/internal/opentelemetry.c
- plugins/out_kafka/kafka.c
- src/flb_json.c
♻️ Duplicate comments (1)
src/opentelemetry/flb_opentelemetry_otlp_proto.c (1)
978-985: ⚠️ Potential issue | 🔴 Critical

Missing `protobuf_c_empty_string` guards when freeing `scope->name` and `scope->version`. After `opentelemetry__proto__common__v1__instrumentation_scope__init(scope)`, the `name` and `version` fields may be set to `protobuf_c_empty_string` (static storage). If `add_msgpack_scope_fields` partially succeeds or the corresponding msgpack fields don't exist, these fields remain at their init values. Calling `flb_free()` on static memory is undefined behavior.

The same issue exists at lines 993-994 and 1011-1012. The `destroy_export_logs` function correctly guards these fields at lines 1296-1302.

Proposed fix for all three cleanup paths:

```diff
 if (add_msgpack_scope_fields(scope, scope_object) != 0) {
-    flb_free(scope->name);
-    flb_free(scope->version);
+    if (scope->name != NULL && scope->name != protobuf_c_empty_string) {
+        flb_free(scope->name);
+    }
+    if (scope->version != NULL && scope->version != protobuf_c_empty_string) {
+        flb_free(scope->version);
+    }
     otlp_kvarray_destroy(scope->attributes, scope->n_attributes);
     flb_free(scope);
     flb_free(scope_log);
```

Apply the same guard pattern at lines 993-994 and 1011-1012.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/opentelemetry/flb_opentelemetry_otlp_proto.c` around lines 978 - 985, The cleanup code frees scope->name and scope->version unconditionally which can be the static protobuf_c_empty_string; change each flb_free(scope->name) and flb_free(scope->version) call (the ones in the failure path after add_msgpack_scope_fields and the two other scope-cleanup paths) to only call flb_free if the pointer is not equal to protobuf_c_empty_string (e.g., if (scope->name && scope->name != protobuf_c_empty_string) flb_free(scope->name); same for scope->version); apply the same guard pattern to all three cleanup locations that free scope and scope_log to avoid freeing static storage.
🧹 Nitpick comments (1)
src/opentelemetry/flb_opentelemetry_otlp_proto.c (1)
1403-1413: Dead code: the `decoded_count == 0` check is unreachable.

The validation at lines 1394-1401 requires `decoded_count > 0` for the `CMT_DECODE_MSGPACK_INSUFFICIENT_DATA` case (the only reachable branch after the while loop). If we reach line 1403, `decoded_count` is guaranteed to be > 0, making this check always false. This is defensive code that does no harm, but could be removed for clarity.

Suggested simplification:

```diff
 if (!((ret == CMT_DECODE_MSGPACK_INSUFFICIENT_DATA && offset == size && decoded_count > 0) ||
       (ret == CMT_DECODE_MSGPACK_SUCCESS && offset == size))) {
     cmt_destroy(merged_context);
     set_error(result, FLB_OPENTELEMETRY_OTLP_PROTO_INVALID_ARGUMENT, EINVAL);
     return NULL;
 }

-if (decoded_count == 0) {
-    cmt_destroy(merged_context);
-    output = cfl_sds_create_size(0);
-    if (output == NULL) {
-        set_error(result, FLB_OPENTELEMETRY_OTLP_PROTO_NOT_SUPPORTED, ENOMEM);
-        return NULL;
-    }
-
-    set_result(result, FLB_OPENTELEMETRY_OTLP_PROTO_SUCCESS);
-    return (flb_sds_t) output;
-}
-
 encoded = cmt_encode_opentelemetry_create(merged_context);
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/opentelemetry/flb_opentelemetry_otlp_proto.c` around lines 1403 - 1413, The if-block that checks decoded_count == 0 (which destroys merged_context, creates output, sets set_error or set_result and returns) is unreachable given the prior validation that requires decoded_count > 0; remove this entire conditional branch to simplify control flow and avoid dead code, leaving the normal path that continues processing merged_context and uses set_result/set_error later (references: decoded_count, merged_context, output, set_result, set_error).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Duplicate comments:
In `@src/opentelemetry/flb_opentelemetry_otlp_proto.c`:
- Around line 978-985: The cleanup code frees scope->name and scope->version
unconditionally which can be the static protobuf_c_empty_string; change each
flb_free(scope->name) and flb_free(scope->version) call (the ones in the failure
path after add_msgpack_scope_fields and the two other scope-cleanup paths) to
only call flb_free if the pointer is not equal to protobuf_c_empty_string (e.g.,
if (scope->name && scope->name != protobuf_c_empty_string)
flb_free(scope->name); same for scope->version); apply the same guard pattern to
all three cleanup locations that free scope and scope_log to avoid freeing
static storage.
---
Nitpick comments:
In `@src/opentelemetry/flb_opentelemetry_otlp_proto.c`:
- Around line 1403-1413: The if-block that checks decoded_count == 0 (which
destroys merged_context, creates output, sets set_error or set_result and
returns) is unreachable given the prior validation that requires decoded_count >
0; remove this entire conditional branch to simplify control flow and avoid dead
code, leaving the normal path that continues processing merged_context and uses
set_result/set_error later (references: decoded_count, merged_context, output,
set_result, set_error).
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: c432c824-a832-4f1f-9002-aea251db9907
📒 Files selected for processing (6)
- plugins/out_null/null.c
- plugins/out_tcp/tcp_conf.c
- plugins/out_udp/udp_conf.c
- plugins/out_websocket/websocket_conf.c
- src/flb_input_metric.c
- src/opentelemetry/flb_opentelemetry_otlp_proto.c
Actionable comments posted: 3
🧹 Nitpick comments (1)
include/fluent-bit/flb_opentelemetry.h (1)
114-120: Rename the shared OTLP log options type before this API freezes.

`flb_opentelemetry_logs_to_otlp_proto()` taking `struct flb_opentelemetry_otlp_json_options *` bakes a JSON-specific type name into the new proto API. Since this is newly public surface, a neutral name like `flb_opentelemetry_otlp_logs_options` will age better if the JSON and proto settings ever diverge.

Also applies to: 249-252
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@include/fluent-bit/flb_opentelemetry.h` around lines 114 - 120, Rename the JSON-specific struct flb_opentelemetry_otlp_json_options to a neutral name flb_opentelemetry_otlp_logs_options and update all references to it (notably the parameter type in flb_opentelemetry_logs_to_otlp_proto and the other occurrences mentioned) so the public API no longer embeds “json”; keep the same fields and semantics but change the type name everywhere it is declared and used.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@include/fluent-bit/flb_opentelemetry.h`:
- Around line 239-252: The current *_to_otlp_proto functions
(flb_opentelemetry_traces_to_otlp_proto,
flb_opentelemetry_metrics_to_otlp_proto,
flb_opentelemetry_metrics_msgpack_to_otlp_proto,
flb_opentelemetry_logs_to_otlp_proto) return flb_sds_t but do not document a
safe destroy contract, causing callers to use the wrong free routine; add
explicit destroy helpers or an opaque proto object to normalize ownership:
either (A) introduce and export per-type destroy functions such as
flb_opentelemetry_traces_proto_destroy, flb_opentelemetry_metrics_proto_destroy,
flb_opentelemetry_logs_proto_destroy and update callers to use them, or (B)
change the API to return a single opaque handle type (e.g., struct
flb_otlp_proto *) and provide one flb_otlp_proto_destroy(handle) plus accessor
to get flb_sds_t if needed; update the header declarations accordingly and
ensure plugins/out_kafka and other callers use the new destroy API instead of
flb_sds_destroy/cmt/ctr specific destructors.
In `@plugins/out_stdout/stdout.c`:
- Around line 346-353: The code currently checks ctx->out_format against
FLB_PACK_JSON_FORMAT_OTLP/OTLP_PRETTY after an unconditional return for
FLB_EVENT_TYPE_PROFILES (and thus print_profiles_text() runs even when OTLP JSON
is requested); update the logic to either (A) detect the invalid combination
early and return an error when event_type == FLB_EVENT_TYPE_PROFILES and
ctx->out_format is FLB_PACK_JSON_FORMAT_OTLP or
FLB_PACK_JSON_FORMAT_OTLP_PRETTY, or (B) move handling of
FLB_EVENT_TYPE_PROFILES into the same format dispatch so print_otlp_json(ctx,
event_chunk) is invoked for profiles once a profile OTLP encoder exists;
reference ctx->out_format, FLB_PACK_JSON_FORMAT_OTLP,
FLB_PACK_JSON_FORMAT_OTLP_PRETTY, FLB_EVENT_TYPE_PROFILES,
print_profiles_text(), and print_otlp_json() when making the change.
- Around line 220-222: The fallback else branch currently returns -1 without
logging, so add a flb_plg_error() call before returning to record the
unsupported event chunk type (use event_chunk->type) and a short message like
"unsupported OTLP event chunk type" to stdout plugin's error stream; update the
else branch in stdout.c (the block that now just returns -1) to call
flb_plg_error(ctx->ins, "unsupported OTLP event chunk type: %d",
event_chunk->type) (or the existing logger/context variable used in this file)
and then return -1.
---
Nitpick comments:
In `@include/fluent-bit/flb_opentelemetry.h`:
- Around line 114-120: Rename the JSON-specific struct
flb_opentelemetry_otlp_json_options to a neutral name
flb_opentelemetry_otlp_logs_options and update all references to it (notably the
parameter type in flb_opentelemetry_logs_to_otlp_proto and the other occurrences
mentioned) so the public API no longer embeds “json”; keep the same fields and
semantics but change the type name everywhere it is declared and used.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 5cf9052a-4327-43a1-8a32-41da28fd169b
📒 Files selected for processing (3)
- include/fluent-bit/flb_opentelemetry.h
- plugins/out_stdout/stdout.c
- src/opentelemetry/flb_opentelemetry_otlp_json.c
It also extends the stdout and kafka output plugins to format payloads in OTLP.
Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.