Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions common/net.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,10 +126,17 @@ type portFilter struct {
to uint16
}

// wellKnownPorts is the set of ports that portFilter.ShouldBeSkipped never
// skips: a port listed here is reported as "not skipped" even when it lies
// inside the filter's range.
var wellKnownPorts = map[uint16]struct{}{
50051: {}, // gRPC
}

// ShouldBeSkipped reports whether port lies inside the filter's inclusive
// [from, to] range. A nil filter skips nothing, and a port present in
// wellKnownPorts is never skipped regardless of the range.
func (f *portFilter) ShouldBeSkipped(port uint16) bool {
	if f == nil {
		return false
	}
	_, exempt := wellKnownPorts[port]
	inRange := f.from <= port && port <= f.to
	return !exempt && inRange
}

Expand Down
123 changes: 120 additions & 3 deletions containers/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,12 @@ type PidFd struct {
Fd uint64
}

// pendingHttp2State keeps an HTTP/2 parser for a (pid, fd) pair that has
// produced HTTP/2 events while no matching entry exists in
// Container.connectionsByPidFd. feedPendingHttp2 creates and feeds it,
// onConnectionOpen hands the parser over to the new connection, and gc drops
// entries that have not been updated for longer than gcInterval.
type pendingHttp2State struct {
// parser receives every HTTP/2 payload seen for the (pid, fd) pair.
parser *l7.Http2Parser
// timestamp is the connection timestamp carried by the L7 events; zero
// means it is not known yet and matches any connection in onConnectionOpen.
timestamp uint64
// updatedAt is the wall-clock time of the last fed event, used for expiry.
updatedAt time.Time
}

type ConnectionStats struct {
Count uint64
TotalTime time.Duration
Expand Down Expand Up @@ -129,10 +135,12 @@ type Container struct {
lastConnectionAttempts map[common.HostPort]time.Time
activeConnections map[ConnectionKey]*ActiveConnection
connectionsByPidFd map[PidFd]*ActiveConnection
pendingHttp2Parsers map[PidFd]*pendingHttp2State

l7Stats L7Stats
dnsStats *L7Metrics
seenFQDNs map[string]struct{}
l7Stats L7Stats
l7InboundStats L7InboundStats
dnsStats *L7Metrics
seenFQDNs map[string]struct{}

gpuStats map[string]*GpuUsage

Expand Down Expand Up @@ -186,7 +194,9 @@ func NewContainer(id ContainerID, cg *cgroup.Cgroup, md *ContainerMetadata, pid
lastConnectionAttempts: map[common.HostPort]time.Time{},
activeConnections: map[ConnectionKey]*ActiveConnection{},
connectionsByPidFd: map[PidFd]*ActiveConnection{},
pendingHttp2Parsers: map[PidFd]*pendingHttp2State{},
l7Stats: L7Stats{},
l7InboundStats: L7InboundStats{},
dnsStats: &L7Metrics{},
seenFQDNs: map[string]struct{}{},

Expand Down Expand Up @@ -454,6 +464,7 @@ func (c *Container) Collect(ch chan<- prometheus.Metric) {
c.dnsStats.Latency.Collect(ch)
}
c.l7Stats.collect(ch)
c.l7InboundStats.collect(ch)

if !*flags.DisablePinger {
for ip, rtt := range c.ping() {
Expand Down Expand Up @@ -651,11 +662,39 @@ func (c *Container) onConnectionOpen(pid uint32, fd uint64, src, dst, actualDst
prev.Closed = time.Now()
}
c.connectionsByPidFd[k] = connection
if pending, ok := c.pendingHttp2Parsers[k]; ok {
if pending.timestamp == 0 || pending.timestamp == timestamp {
connection.http2Parser = pending.parser
}
delete(c.pendingHttp2Parsers, k)
}
}
c.lastConnectionAttempts[key.Destination()] = time.Now()
}

// feedPendingHttp2 pushes an HTTP/2 event into the pending parser kept for
// (pid, fd) in c.pendingHttp2Parsers.
//
// A fresh parser is started when none exists yet, or when both the stored and
// the incoming timestamps are non-zero and differ. A stored zero timestamp is
// replaced by the incoming one. The entry's updatedAt is refreshed on every
// call.
//
// The method does not lock c.lock; its caller onL7Request holds it.
func (c *Container) feedPendingHttp2(pid uint32, fd uint64, timestamp uint64, r *l7.RequestData) {
	key := PidFd{Pid: pid, Fd: fd}
	pending := c.pendingHttp2Parsers[key]

	switch {
	case pending == nil,
		timestamp != 0 && pending.timestamp != 0 && pending.timestamp != timestamp:
		pending = &pendingHttp2State{parser: l7.NewHttp2Parser(), timestamp: timestamp}
		c.pendingHttp2Parsers[key] = pending
	case pending.timestamp == 0:
		pending.timestamp = timestamp
	}

	pending.updatedAt = time.Now()
	// The parsed requests are dropped on purpose: nothing is recorded here, the
	// parser is only kept so onConnectionOpen can attach it to the connection.
	_ = pending.parser.Parse(r.Method, r.Payload, uint64(r.Duration))
}

func (c *Container) onConnectionClose(e ebpftracer.Event) {
if e.IsInbound {
c.lock.Lock()
defer c.lock.Unlock()
if p := c.processes[e.Pid]; p != nil {
delete(p.inboundHttp2Parsers, e.Fd)
}
return
}
c.lock.Lock()
conn := c.connectionsByPidFd[PidFd{Pid: e.Pid, Fd: e.Fd}]
c.lock.Unlock()
Expand Down Expand Up @@ -763,12 +802,20 @@ func (c *Container) onL7Request(pid uint32, fd uint64, timestamp uint64, r *l7.R
c.lock.Lock()
defer c.lock.Unlock()

if r.IsInbound {
c.observeInboundL7(pid, fd, timestamp, r)
return nil
}

if r.Protocol == l7.ProtocolDNS {
return c.onDNSRequest(r)
}

conn := c.connectionsByPidFd[PidFd{Pid: pid, Fd: fd}]
if conn == nil {
if r.Protocol == l7.ProtocolHTTP2 {
c.feedPendingHttp2(pid, fd, timestamp, r)
}
return nil
}
if timestamp != 0 && conn.Timestamp != timestamp {
Expand Down Expand Up @@ -859,6 +906,55 @@ func (c *Container) onL7Request(pid uint32, fd uint64, timestamp uint64, r *l7.R
return nil
}

// observeInboundL7 records one inbound L7 event in c.l7InboundStats.
//
// HTTP/2 events are run through a parser stored per process and per fd in
// Process.inboundHttp2Parsers. The parser is recreated whenever the event's
// timestamp differs from the one the state was created with. Every request
// the parser returns is counted under the HTTP metrics, labelled with the
// gRPC status when GrpcStatus is non-negative and with the HTTP status
// otherwise. Requests whose path common.HttpFilter rejects are not counted.
// Events from a pid that is not in c.processes are dropped.
//
// Events of any other protocol are observed directly; protocols that have no
// entry in L7InboundRequests are ignored.
//
// The method does not lock c.lock; its caller onL7Request holds it.
//
// NOTE(review): unlike feedPendingHttp2, a zero timestamp is not treated as
// "unknown" here, so a zero/non-zero mismatch resets the parser — confirm
// that this is intended.
func (c *Container) observeInboundL7(pid uint32, fd uint64, timestamp uint64, r *l7.RequestData) {
	proto := r.Protocol

	// Everything except HTTP/2 carries a complete request in the event itself.
	if proto != l7.ProtocolHTTP2 {
		if _, known := L7InboundRequests[proto]; !known {
			return
		}
		metrics := c.l7InboundStats.get(proto)
		switch proto {
		case l7.ProtocolHTTP:
			metrics.observe(r.Status.Http(), "", r.Duration)
		case l7.ProtocolZookeeper:
			metrics.observe(r.Status.Zookeeper(), "", r.Duration)
		default:
			metrics.observe(r.Status.String(), "", r.Duration)
		}
		return
	}

	owner := c.processes[pid]
	if owner == nil {
		return
	}
	if owner.inboundHttp2Parsers == nil {
		owner.inboundHttp2Parsers = map[uint64]*inboundHttp2State{}
	}
	st := owner.inboundHttp2Parsers[fd]
	if st == nil || st.connTimestamp != timestamp {
		st = &inboundHttp2State{parser: l7.NewHttp2Parser(), connTimestamp: timestamp}
		owner.inboundHttp2Parsers[fd] = st
	}

	parsed := st.parser.Parse(r.Method, r.Payload, uint64(r.Duration))
	if len(parsed) == 0 {
		return
	}

	metrics := c.l7InboundStats.get(l7.ProtocolHTTP)
	for _, req := range parsed {
		if common.HttpFilter.ShouldBeSkipped(req.Path) {
			continue
		}
		code := req.Status.Http()
		if req.GrpcStatus >= 0 {
			code = req.GrpcStatus.GRPC()
		}
		metrics.observe(code, "", req.Duration)
	}
}

func (c *Container) onRetransmission(src netaddr.IPPort, dst netaddr.IPPort) bool {
c.lock.Lock()
defer c.lock.Unlock()
Expand Down Expand Up @@ -1184,6 +1280,22 @@ func (c *Container) gc(now time.Time) {
listens := map[netaddr.IPPort]string{}
seenNamespaces := map[string]bool{}
for _, p := range c.processes {
if len(p.inboundHttp2Parsers) > 0 {
fds, err := proc.ReadFds(p.Pid)
if err == nil {
openFds := map[uint64]struct{}{}
for _, fd := range fds {
if fd.SocketInode != "" {
openFds[fd.Fd] = struct{}{}
}
}
for fd := range p.inboundHttp2Parsers {
if _, ok := openFds[fd]; !ok {
delete(p.inboundHttp2Parsers, fd)
}
}
}
}
if seenNamespaces[p.NetNsId()] {
continue
}
Expand Down Expand Up @@ -1222,6 +1334,11 @@ func (c *Container) gc(now time.Time) {
}
}
}
for k, st := range c.pendingHttp2Parsers {
if now.Sub(st.updatedAt) > gcInterval {
delete(c.pendingHttp2Parsers, k)
}
}
for dst, at := range c.lastConnectionAttempts {
_, active := establishedDst[dst]
if !active && !at.IsZero() && now.Sub(at) > gcInterval {
Expand Down
36 changes: 36 additions & 0 deletions containers/l7.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,3 +90,39 @@ func (s L7Stats) delete(dst common.HostPort) {
}
}
}

// L7InboundStats holds the inbound request metrics of a container, keyed by
// L7 protocol. Entries are created lazily by get, which stores HTTP/2 under
// the HTTP key.
type L7InboundStats map[l7.Protocol]*L7Metrics

// get returns the inbound metrics for protocol, creating and storing them on
// first use. HTTP/2 shares the HTTP entry.
//
// A new entry always gets a request counter labelled by "status". RabbitMQ
// and NATS additionally get a "method" label and no latency histogram; every
// other protocol gets a latency histogram. Metric names and help texts are
// taken from L7InboundRequests and L7InboundLatency.
func (s L7InboundStats) get(protocol l7.Protocol) *L7Metrics {
	if protocol == l7.ProtocolHTTP2 {
		protocol = l7.ProtocolHTTP
	}
	if existing := s[protocol]; existing != nil {
		return existing
	}

	created := &L7Metrics{}
	s[protocol] = created

	labelNames := []string{"status"}
	if protocol == l7.ProtocolRabbitmq || protocol == l7.ProtocolNats {
		labelNames = append(labelNames, "method")
	} else {
		latency := L7InboundLatency[protocol]
		created.Latency = prometheus.NewHistogram(prometheus.HistogramOpts{
			Name: latency.Name,
			Help: latency.Help,
		})
	}

	requests := L7InboundRequests[protocol]
	created.Requests = prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: requests.Name,
		Help: requests.Help,
	}, labelNames)
	return created
}

// collect sends the request counter and the latency histogram of every stored
// protocol to ch, skipping whichever of the two is nil.
func (s L7InboundStats) collect(ch chan<- prometheus.Metric) {
	for _, metrics := range s {
		if counter := metrics.Requests; counter != nil {
			counter.Collect(ch)
		}
		if histogram := metrics.Latency; histogram != nil {
			histogram.Collect(ch)
		}
	}
}
30 changes: 30 additions & 0 deletions containers/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,36 @@ var (
l7.ProtocolZookeeper: {Name: "container_zookeeper_requests_duration_seconds_total", Help: "Histogram of the execution time for each outbound Zookeeper request"},
l7.ProtocolFoundationDB: {Name: "container_foundationdb_requests_duration_seconds_total", Help: "Histogram of the execution time for each outbound FoundationDB request"},
}
// L7InboundRequests gives the name and help text of the inbound request
// counter for each supported L7 protocol. L7InboundStats.get reads it when
// creating a counter, and observeInboundL7 ignores events whose protocol has
// no entry here.
L7InboundRequests = map[l7.Protocol]prometheus.CounterOpts{
l7.ProtocolHTTP: {Name: "container_http_inbound_requests_total", Help: "Total number of inbound HTTP requests"},
l7.ProtocolPostgres: {Name: "container_postgres_inbound_queries_total", Help: "Total number of inbound Postgres queries"},
l7.ProtocolRedis: {Name: "container_redis_inbound_queries_total", Help: "Total number of inbound Redis queries"},
l7.ProtocolMemcached: {Name: "container_memcached_inbound_queries_total", Help: "Total number of inbound Memcached queries"},
l7.ProtocolMysql: {Name: "container_mysql_inbound_queries_total", Help: "Total number of inbound Mysql queries"},
l7.ProtocolMongo: {Name: "container_mongo_inbound_queries_total", Help: "Total number of inbound Mongo queries"},
l7.ProtocolKafka: {Name: "container_kafka_inbound_requests_total", Help: "Total number of inbound Kafka requests"},
l7.ProtocolCassandra: {Name: "container_cassandra_inbound_queries_total", Help: "Total number of inbound Cassandra requests"},
l7.ProtocolDubbo2: {Name: "container_dubbo_inbound_requests_total", Help: "Total number of inbound DUBBO requests"},
l7.ProtocolDNS: {Name: "container_dns_inbound_requests_total", Help: "Total number of inbound DNS requests"},
l7.ProtocolClickhouse: {Name: "container_clickhouse_inbound_queries_total", Help: "Total number of inbound ClickHouse queries"},
l7.ProtocolZookeeper: {Name: "container_zookeeper_inbound_requests_total", Help: "Total number of inbound Zookeeper requests"},
l7.ProtocolFoundationDB: {Name: "container_foundationdb_inbound_requests_total", Help: "Total number of inbound FoundationDB requests"},
}
// L7InboundLatency gives the name and help text of the inbound latency
// histogram for each supported L7 protocol. L7InboundStats.get copies only
// Name and Help from an entry when it creates the histogram.
L7InboundLatency = map[l7.Protocol]prometheus.HistogramOpts{
l7.ProtocolHTTP: {Name: "container_http_inbound_requests_duration_seconds_total", Help: "Histogram of the response time for each inbound HTTP request"},
l7.ProtocolPostgres: {Name: "container_postgres_inbound_queries_duration_seconds_total", Help: "Histogram of the execution time for each inbound Postgres query"},
l7.ProtocolRedis: {Name: "container_redis_inbound_queries_duration_seconds_total", Help: "Histogram of the execution time for each inbound Redis query"},
l7.ProtocolMemcached: {Name: "container_memcached_inbound_queries_duration_seconds_total", Help: "Histogram of the execution time for each inbound Memcached query"},
l7.ProtocolMysql: {Name: "container_mysql_inbound_queries_duration_seconds_total", Help: "Histogram of the execution time for each inbound Mysql query"},
l7.ProtocolMongo: {Name: "container_mongo_inbound_queries_duration_seconds_total", Help: "Histogram of the execution time for each inbound Mongo query"},
l7.ProtocolKafka: {Name: "container_kafka_inbound_requests_duration_seconds_total", Help: "Histogram of the execution time for each inbound Kafka request"},
l7.ProtocolCassandra: {Name: "container_cassandra_inbound_queries_duration_seconds_total", Help: "Histogram of the execution time for each inbound Cassandra request"},
l7.ProtocolDubbo2: {Name: "container_dubbo_inbound_requests_duration_seconds_total", Help: "Histogram of the response time for each inbound DUBBO request"},
l7.ProtocolDNS: {Name: "container_dns_inbound_requests_duration_seconds_total", Help: "Histogram of the response time for each inbound DNS request"},
l7.ProtocolClickhouse: {Name: "container_clickhouse_inbound_queries_duration_seconds_total", Help: "Histogram of the execution time for each inbound ClickHouse query"},
l7.ProtocolZookeeper: {Name: "container_zookeeper_inbound_requests_duration_seconds_total", Help: "Histogram of the execution time for each inbound Zookeeper request"},
l7.ProtocolFoundationDB: {Name: "container_foundationdb_inbound_requests_duration_seconds_total", Help: "Histogram of the execution time for each inbound FoundationDB request"},
}
)

func metric(name, help string, labels ...string) *prometheus.Desc {
Expand Down
8 changes: 8 additions & 0 deletions containers/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"github.com/coroot/coroot-node-agent/ebpftracer"
"github.com/coroot/coroot-node-agent/ebpftracer/l7"
"github.com/coroot/coroot-node-agent/flags"
"github.com/coroot/coroot-node-agent/gpu"
"github.com/coroot/coroot-node-agent/proc"
Expand Down Expand Up @@ -55,6 +56,13 @@ type Process struct {
pythonPrevStats *ebpftracer.PythonStats

gpuUsageSamples []gpu.ProcessUsageSample

inboundHttp2Parsers map[uint64]*inboundHttp2State
}

// inboundHttp2State is the HTTP/2 parsing state kept for one inbound file
// descriptor of a process (see Process.inboundHttp2Parsers).
type inboundHttp2State struct {
// parser receives the HTTP/2 payloads observed on the descriptor.
parser *l7.Http2Parser
// connTimestamp is the event timestamp the state was created with; an event
// carrying a different timestamp makes observeInboundL7 replace the state.
connTimestamp uint64
}

func NewProcess(pid uint32, stats *taskstats.Stats, tracer *ebpftracer.Tracer) *Process {
Expand Down
24 changes: 24 additions & 0 deletions containers/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package containers

import (
"bytes"
"errors"
"fmt"
"os"
"regexp"
Expand All @@ -10,6 +11,7 @@ import (
"sync"
"time"

"github.com/cilium/ebpf"
"github.com/coroot/coroot-node-agent/cgroup"
"github.com/coroot/coroot-node-agent/common"
"github.com/coroot/coroot-node-agent/ebpftracer"
Expand Down Expand Up @@ -210,6 +212,7 @@ func (r *Registry) handleEvents(ch <-chan ebpftracer.Event) {
}
}
r.ip2fqdnLock.Unlock()
r.gcActiveConnections()
case u := <-r.trafficStatsUpdateCh:
if u == nil {
continue
Expand Down Expand Up @@ -283,6 +286,7 @@ func (r *Registry) handleEvents(ch <-chan ebpftracer.Event) {
case ebpftracer.EventTypeListenOpen:
if c := r.getOrCreateContainer(e.Pid); c != nil {
c.onListenOpen(e.Pid, e.SrcAddr, false)
c.attachTlsUprobes(r.tracer, e.Pid)
} else {
klog.Infoln("TCP listen open from unknown container", e)
}
Expand Down Expand Up @@ -438,6 +442,26 @@ func (r *Registry) updateStatsFromEbpfMapsIfNecessary() {
r.ebpfStatsLastUpdated = time.Now()
}

// gcActiveConnections removes entries from the tracer's active-connections
// map whose file descriptor can no longer be stat'ed at the path built by
// proc.Path(pid, "fd", fd) (presumably /proc/<pid>/fd/<fd>). Any Stat error
// marks the entry as stale.
//
// Stale keys are collected first and deleted only after the iteration has
// finished, presumably so the map is not modified while it is being walked.
// Iteration and deletion errors are logged as warnings; a key that has
// already disappeared (ebpf.ErrKeyNotExist) is not reported.
func (r *Registry) gcActiveConnections() {
	var (
		id      ebpftracer.ConnectionId
		conn    ebpftracer.Connection
		expired []ebpftracer.ConnectionId
	)

	it := r.tracer.ActiveConnectionsIterator()
	for it.Next(&id, &conn) {
		fdPath := proc.Path(id.PID, "fd", strconv.FormatUint(id.FD, 10))
		if _, statErr := os.Stat(fdPath); statErr != nil {
			expired = append(expired, id)
		}
	}
	if iterErr := it.Err(); iterErr != nil {
		klog.Warningln(iterErr)
	}

	for _, staleID := range expired {
		delErr := r.tracer.DeleteActiveConnection(staleID)
		if delErr == nil || errors.Is(delErr, ebpf.ErrKeyNotExist) {
			continue
		}
		klog.Warningln(delErr)
	}
}

func (r *Registry) updateTrafficStats() {
iter := r.tracer.ActiveConnectionsIterator()
cid := ebpftracer.ConnectionId{}
Expand Down
20 changes: 10 additions & 10 deletions ebpftracer/ebpf.go

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions ebpftracer/ebpf/l7/gotls.c
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ int go_crypto_tls_write_enter(struct pt_regs *ctx) {
}
char *buf_ptr = (char*)GO_PARAM2(ctx);
__u64 buf_size = GO_PARAM3(ctx);
return trace_enter_write(ctx, fd, 1, buf_ptr, buf_size, 0);
return trace_enter_write(ctx, fd, 1, 1, buf_ptr, buf_size, 0);
}

SEC("uprobe/go_crypto_tls_read_enter")
Expand All @@ -56,7 +56,7 @@ int go_crypto_tls_read_enter(struct pt_regs *ctx) {
__u64 goroutine_id = GOROUTINE(ctx);
__u64 pid = pid_tgid >> 32;
__u64 id = pid << 32 | goroutine_id | IS_TLS_READ_ID;
return trace_enter_read(id, pid, fd, buf_ptr, 0, 0);
return trace_enter_read(id, pid, fd, 1, buf_ptr, 0, 0);
}

SEC("uprobe/go_crypto_tls_read_exit")
Expand Down
Loading
Loading