Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 66 additions & 14 deletions plugins/in_tcp/tcp_conn.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,12 @@
#include "tcp.h"
#include "tcp_conn.h"

static inline void consume_bytes(char *buf, int bytes, int length)
static inline void consume_bytes(char *buf, size_t bytes, size_t length)
{
if (bytes == 0 || bytes > length) {
return;
}

memmove(buf, buf + bytes, length - bytes);
}

Expand Down Expand Up @@ -132,9 +136,11 @@ static inline int process_pack(struct tcp_conn *conn,
msgpack_unpacked_destroy(&result);

if (ret == FLB_EVENT_ENCODER_SUCCESS) {
flb_input_log_append(conn->ins, NULL, 0,
ctx->log_encoder->output_buffer,
ctx->log_encoder->output_length);
if (ctx->log_encoder->output_length > 0) {
flb_input_log_append(conn->ins, NULL, 0,
ctx->log_encoder->output_buffer,
ctx->log_encoder->output_length);
}
ret = 0;
}
else {
Expand All @@ -152,6 +158,7 @@ static ssize_t parse_payload_json(struct tcp_conn *conn)
int ret;
int out_size;
char *pack;
ssize_t processed;

ret = flb_pack_json_state(conn->buf_data, conn->buf_len,
&pack, &out_size, &conn->pack_state);
Expand All @@ -170,10 +177,23 @@ static ssize_t parse_payload_json(struct tcp_conn *conn)
}

/* Process the packaged JSON and return the last byte used */
process_pack(conn, pack, out_size);
if (out_size < 0) {
flb_free(pack);
return -1;
}

ret = process_pack(conn, pack, (size_t) out_size);
flb_free(pack);
if (ret < 0) {
return -1;
}

processed = conn->pack_state.last_byte;
if (processed < 0 || processed > conn->buf_len) {
return -1;
}

return conn->pack_state.last_byte;
return processed;
}

/*
Expand All @@ -183,8 +203,8 @@ static ssize_t parse_payload_json(struct tcp_conn *conn)
static ssize_t parse_payload_none(struct tcp_conn *conn)
{
int ret;
int len;
int sep_len;
size_t len;
size_t sep_len;
size_t consumed = 0;
char *buf;
char *s;
Expand All @@ -202,7 +222,7 @@ static ssize_t parse_payload_none(struct tcp_conn *conn)
flb_log_event_encoder_reset(ctx->log_encoder);

while ((s = strstr(buf, separator))) {
len = (s - buf);
len = (size_t) (s - buf);
if (len == 0) {
break;
}
Expand Down Expand Up @@ -237,9 +257,11 @@ static ssize_t parse_payload_none(struct tcp_conn *conn)
}

if (ret == FLB_EVENT_ENCODER_SUCCESS) {
flb_input_log_append(conn->ins, NULL, 0,
ctx->log_encoder->output_buffer,
ctx->log_encoder->output_length);
if (ctx->log_encoder->output_length > 0) {
flb_input_log_append(conn->ins, NULL, 0,
ctx->log_encoder->output_buffer,
ctx->log_encoder->output_length);
}
}
else {
flb_plg_error(ctx->ins, "log event encoding error : %d", ret);
Expand Down Expand Up @@ -336,7 +358,14 @@ int tcp_conn_event(void *data)
}
else if (ret_payload == -1) {
flb_pack_state_reset(&conn->pack_state);
flb_pack_state_init(&conn->pack_state);
if (flb_pack_state_init(&conn->pack_state) == -1) {
flb_plg_error(ctx->ins,
"fd=%i failed to reinitialize JSON parser state",
event->fd);
conn->pending_close = FLB_TRUE;
ret = -1;
goto cleanup;
}
conn->pack_state.multiple = FLB_TRUE;
ret = -1;
goto cleanup;
Expand All @@ -356,7 +385,30 @@ int tcp_conn_event(void *data)
}


consume_bytes(conn->buf_data, ret_payload, conn->buf_len);
if (ret_payload < 0 || ret_payload > conn->buf_len) {
flb_plg_warn(ctx->ins,
"fd=%i invalid payload consume length=%zd buf_len=%i",
event->fd, ret_payload, conn->buf_len);

if (ctx->format == FLB_TCP_FMT_JSON) {
flb_pack_state_reset(&conn->pack_state);
if (flb_pack_state_init(&conn->pack_state) == -1) {
flb_plg_error(ctx->ins,
"fd=%i failed to reinitialize JSON parser state",
event->fd);
conn->pending_close = FLB_TRUE;
ret = -1;
goto cleanup;
}
conn->pack_state.multiple = FLB_TRUE;
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.

conn->buf_len = 0;
ret = -1;
goto cleanup;
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.

consume_bytes(conn->buf_data, (size_t) ret_payload, (size_t) conn->buf_len);
conn->buf_len -= ret_payload;
conn->buf_data[conn->buf_len] = '\0';

Expand Down
Loading
Loading