Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions ebpftracer/ebpf.go

Large diffs are not rendered by default.

32 changes: 20 additions & 12 deletions ebpftracer/ebpf/l7/l7.c
Original file line number Diff line number Diff line change
Expand Up @@ -358,15 +358,18 @@ int handle_request(void *ctx, struct connection_id cid, struct connection *conn,
req->protocol = PROTOCOL_MYSQL;
} else if (is_mongo_query(payload, size)) {
req->protocol = PROTOCOL_MONGO;
} else if (!conn->is_inbound && is_rabbitmq_produce(payload, size)) {
struct l7_event *e = bpf_map_lookup_elem(&l7_event_heap, &zero);
if (!e) {
return 0;
} else if (is_rabbitmq_method_frame(payload, size)) {
if (!conn->is_inbound && rabbitmq_method_matches(payload, RABBITMQ_METHOD_PUBLISH)) {
struct l7_event *e = bpf_map_lookup_elem(&l7_event_heap, &zero);
if (!e) {
return 0;
}
e->protocol = PROTOCOL_RABBITMQ;
e->method = METHOD_PRODUCE;
e->status = STATUS_OK;
e->is_inbound = 0;
send_event(ctx, e, cid, conn);
}
e->protocol = PROTOCOL_RABBITMQ;
e->method = METHOD_PRODUCE;
e->is_inbound = 0;
send_event(ctx, e, cid, conn);
return 0;
} else if (!conn->is_inbound && nats_method(payload, size) == METHOD_PRODUCE) {
struct l7_event *e = bpf_map_lookup_elem(&l7_event_heap, &zero);
Expand All @@ -375,6 +378,7 @@ int handle_request(void *ctx, struct connection_id cid, struct connection *conn,
}
e->protocol = PROTOCOL_NATS;
e->method = METHOD_PRODUCE;
e->status = STATUS_OK;
e->is_inbound = 0;
send_event(ctx, e, cid, conn);
return 0;
Expand Down Expand Up @@ -532,15 +536,19 @@ int handle_response(void *ctx, struct connection_id cid, struct connection *conn
e->is_inbound = conn->is_inbound;

if (!conn->is_inbound) {
if (is_rabbitmq_consume(payload, ret)) {
e->protocol = PROTOCOL_RABBITMQ;
e->method = METHOD_CONSUME;
send_event(ctx, e, cid, conn);
if (is_rabbitmq_method_frame(payload, ret)) {
if (rabbitmq_method_matches(payload, RABBITMQ_METHOD_DELIVER)) {
e->protocol = PROTOCOL_RABBITMQ;
e->method = METHOD_CONSUME;
e->status = STATUS_OK;
send_event(ctx, e, cid, conn);
}
return 0;
}
if (nats_method(payload, ret) == METHOD_CONSUME) {
e->protocol = PROTOCOL_NATS;
e->method = METHOD_CONSUME;
e->status = STATUS_OK;
send_event(ctx, e, cid, conn);
return 0;
}
Expand Down
23 changes: 7 additions & 16 deletions ebpftracer/ebpf/l7/rabbitmq.c
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
#define RABBITMQ_METHOD_DELIVER 60

static __always_inline
int rabbitmq_method_is(char *buf, __u64 buf_size, __u16 expected_method) {
int is_rabbitmq_method_frame(char *buf, __u64 buf_size) {
if (buf_size < 12) {
return 0;
}
Expand All @@ -32,6 +32,11 @@ int rabbitmq_method_is(char *buf, __u64 buf_size, __u16 expected_method) {
return 0;
}

return 1;
}

static __always_inline
int rabbitmq_method_matches(char *buf, __u16 expected_method) {
__u16 class = 0;
bpf_read(buf+7, class);
if (bpf_htons(class) != RABBITMQ_CLASS_BASIC) {
Expand All @@ -40,19 +45,5 @@ int rabbitmq_method_is(char *buf, __u64 buf_size, __u16 expected_method) {

__u16 method = 0;
bpf_read(buf+9, method);
if (bpf_htons(method) != expected_method) {
return 0;
}

return 1;
}

static __always_inline
int is_rabbitmq_produce(char *buf, __u64 buf_size) {
return rabbitmq_method_is(buf, buf_size, RABBITMQ_METHOD_PUBLISH);
}

static __always_inline
int is_rabbitmq_consume(char *buf, __u64 buf_size) {
return rabbitmq_method_is(buf, buf_size, RABBITMQ_METHOD_DELIVER);
return bpf_htons(method) == expected_method;
}
23 changes: 16 additions & 7 deletions ebpftracer/l7/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,17 @@ package l7

import (
"bytes"
"unicode/utf8"

"github.com/ClickHouse/ch-go/proto"
)

func ParseClickhouse(payload []byte) string {
func ParseClickhouse(payload []byte) (query string) {
defer func() {
if recover() != nil {
query = ""
}
}()
r := proto.NewReader(bytes.NewReader(payload))
var err error
if _, err = r.Byte(); err != nil {
Expand Down Expand Up @@ -53,14 +59,17 @@ func ParseClickhouse(payload []byte) string {
if err != nil {
return ""
}
query := make([]byte, min(l, 1024))
n, _ := r.Read(query)
query = bytes.TrimSpace(query[:n])
if len(query) == 0 {
buf := make([]byte, min(l, 1024))
n, _ := r.Read(buf)
buf = bytes.TrimSpace(buf[:n])
if len(buf) == 0 {
return ""
}
if !utf8.Valid(buf) { // not a real query: misclassified or corrupted payload
return ""
}
if n < l {
query = append(query[:len(query)-1], []byte("...<TRUNCATED>")...)
buf = append(buf[:len(buf)-1], []byte("...<TRUNCATED>")...)
}
return string(query)
return string(buf)
}
12 changes: 12 additions & 0 deletions ebpftracer/l7/l7_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,18 @@ func TestParseClickHouse(t *testing.T) {
ParseClickhouse(payload),
)

payload = []byte{ // malformed: huge string length must not panic the parser
0x01,
0x00,
0x01,
0x80, 0x80, 0x80, 0x80, 0x80, 0x80, 0x80, 0x80, 0x01, // string length = 2^56
}

assert.Equal(t,
``,
ParseClickhouse(payload),
)

}

func TestParseZookeeper(t *testing.T) {
Expand Down
Loading