Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 28 additions & 10 deletions ingress/icmp_darwin.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ func (ip *icmpProxy) Serve(ctx context.Context) error {
if err != nil {
return err
}
reply, err := parseReply(from, buf[:n])
reply, err := parseReply(from, buf[:n], receivedTTL{})
if err != nil {
ip.logger.Debug().Err(err).Str("dst", from.String()).Msg("Failed to parse ICMP reply, continue to parse as full packet")
// In unit test, we found out when the listener listens on 0.0.0.0, the socket reads the full packet after
Expand All @@ -231,24 +231,36 @@ func (ip *icmpProxy) Serve(ctx context.Context) error {
}
}

func enableReceiveTTL(conn *icmp.PacketConn, listenIP netip.Addr) error {
return nil
}

func (ip *icmpProxy) handleFullPacket(ctx context.Context, decoder *packet.ICMPDecoder, rawPacket []byte) error {
icmpPacket, err := decoder.Decode(packet.RawPacket{Data: rawPacket})
reply, err := parseFullPacketReply(decoder, rawPacket)
if err != nil {
return err
}
if err := ip.sendReply(ctx, reply); err != nil {
return err
}
return nil
}

func parseFullPacketReply(decoder *packet.ICMPDecoder, rawPacket []byte) (*echoReply, error) {
icmpPacket, err := decoder.Decode(packet.RawPacket{Data: rawPacket})
if err != nil {
return nil, err
}
echo, err := getICMPEcho(icmpPacket.Message)
if err != nil {
return err
return nil, err
}
reply := echoReply{
return &echoReply{
from: icmpPacket.Src,
msg: icmpPacket.Message,
echo: echo,
}
if ip.sendReply(ctx, &reply); err != nil {
return err
}
return nil
ttl: receivedTTLFromIPHeader(icmpPacket.TTL),
}, nil
}

func (ip *icmpProxy) sendReply(ctx context.Context, reply *echoReply) error {
Expand All @@ -265,10 +277,16 @@ func (ip *icmpProxy) sendReply(ctx context.Context, reply *echoReply) error {
_, span := icmpFlow.responder.ReplySpan(ctx, ip.logger)
defer icmpFlow.responder.ExportSpan()

if err := icmpFlow.returnToSrc(reply); err != nil {
sent, err := icmpFlow.returnToSrc(reply)
if err != nil {
tracing.EndWithErrorStatus(span, err)
return err
}
if !sent {
ip.logger.Debug().Str("dst", reply.from.String()).Msg("Drop ICMP echo reply because TTL expired")
tracing.End(span)
return nil
}
observeICMPReply(ip.logger, span, reply.from.String(), reply.echo.ID, reply.echo.Seq)
span.SetAttributes(attribute.Int("originalEchoID", icmpFlow.originalEchoID))
tracing.End(span)
Expand Down
32 changes: 32 additions & 0 deletions ingress/icmp_darwin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,43 @@ import (
"net/netip"
"testing"

"github.com/google/gopacket/layers"
"github.com/stretchr/testify/require"
"golang.org/x/net/icmp"
"golang.org/x/net/ipv4"

"github.com/cloudflare/cloudflared/packet"
)

func TestParseFullPacketReplyUsesIPTTL(t *testing.T) {
t.Parallel()

pk := &packet.ICMP{
IP: &packet.IP{
Src: localhostIP,
Dst: localhostIP,
Protocol: layers.IPProtocolICMPv4,
TTL: 37,
},
Message: &icmp.Message{
Type: ipv4.ICMPTypeEchoReply,
Code: 0,
Body: &icmp.Echo{
ID: 12345,
Seq: 6789,
Data: []byte(t.Name()),
},
},
}
rawPacket, err := packet.NewEncoder().Encode(pk)
require.NoError(t, err)

reply, err := parseFullPacketReply(packet.NewICMPDecoder(), rawPacket.Data)
require.NoError(t, err)
require.True(t, reply.ttl.ok)
require.Equal(t, uint8(37), reply.ttl.value)
}

func TestSingleEchoIDTracker(t *testing.T) {
tracker := newEchoIDTracker()
key := flow3Tuple{
Expand Down
64 changes: 61 additions & 3 deletions ingress/icmp_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ import (
"github.com/pkg/errors"
"github.com/rs/zerolog"
"go.opentelemetry.io/otel/attribute"
"golang.org/x/net/icmp"
"golang.org/x/net/ipv4"
"golang.org/x/net/ipv6"

"github.com/cloudflare/cloudflared/packet"
"github.com/cloudflare/cloudflared/tracing"
Expand Down Expand Up @@ -177,6 +180,55 @@ func (ip *icmpProxy) listenResponse(ctx context.Context, flow *icmpEchoFlow) {
}
}

func enableReceiveTTL(conn *icmp.PacketConn, listenIP netip.Addr) error {
if listenIP.Is4() {
ipv4Conn := conn.IPv4PacketConn()
if ipv4Conn == nil {
return nil
}
if err := ipv4Conn.SetControlMessage(ipv4.FlagTTL, true); err != nil {
return fmt.Errorf("failed to enable IPv4 TTL control message: %w", err)
}
return nil
}

ipv6Conn := conn.IPv6PacketConn()
if ipv6Conn == nil {
return nil
}
if err := ipv6Conn.SetControlMessage(ipv6.FlagHopLimit, true); err != nil {
return fmt.Errorf("failed to enable IPv6 hop limit control message: %w", err)
}
return nil
}

func readICMPReply(conn *icmp.PacketConn, buf []byte) (int, net.Addr, receivedTTL, error) {
if ipv4Conn := conn.IPv4PacketConn(); ipv4Conn != nil {
n, cm, from, err := ipv4Conn.ReadFrom(buf)
if err != nil {
return 0, nil, receivedTTL{}, err
}
if cm == nil {
return n, from, receivedTTL{}, nil
}
return n, from, receivedTTLFromControlMessage(cm.TTL), nil
}

if ipv6Conn := conn.IPv6PacketConn(); ipv6Conn != nil {
n, cm, from, err := ipv6Conn.ReadFrom(buf)
if err != nil {
return 0, nil, receivedTTL{}, err
}
if cm == nil {
return n, from, receivedTTL{}, nil
}
return n, from, receivedTTLFromControlMessage(cm.HopLimit), nil
}

n, from, err := conn.ReadFrom(buf)
return n, from, receivedTTL{}, err
}

// Listens for ICMP response and handles error logging
func (ip *icmpProxy) handleResponse(ctx context.Context, flow *icmpEchoFlow, buf []byte) (done bool) {
_, span := flow.responder.ReplySpan(ctx, ip.logger)
Expand All @@ -186,7 +238,7 @@ func (ip *icmpProxy) handleResponse(ctx context.Context, flow *icmpEchoFlow, buf
attribute.Int("originalEchoID", flow.originalEchoID),
)

n, from, err := flow.originConn.ReadFrom(buf)
n, from, ttl, err := readICMPReply(flow.originConn, buf)
if err != nil {
if flow.IsClosed() {
tracing.EndWithErrorStatus(span, fmt.Errorf("flow was closed"))
Expand All @@ -196,7 +248,7 @@ func (ip *icmpProxy) handleResponse(ctx context.Context, flow *icmpEchoFlow, buf
tracing.EndWithErrorStatus(span, err)
return true
}
reply, err := parseReply(from, buf[:n])
reply, err := parseReply(from, buf[:n], ttl)
if err != nil {
ip.logger.Error().Err(err).Str("dst", from.String()).Msg("Failed to parse ICMP reply")
tracing.EndWithErrorStatus(span, err)
Expand All @@ -209,11 +261,17 @@ func (ip *icmpProxy) handleResponse(ctx context.Context, flow *icmpEchoFlow, buf
return false
}

if err := flow.returnToSrc(reply); err != nil {
sent, err := flow.returnToSrc(reply)
if err != nil {
ip.logger.Error().Err(err).Str("dst", from.String()).Msg("Failed to send ICMP reply")
tracing.EndWithErrorStatus(span, err)
return false
}
if !sent {
ip.logger.Debug().Str("dst", from.String()).Msg("Drop ICMP echo reply because TTL expired")
tracing.End(span)
return false
}

observeICMPReply(ip.logger, span, from.String(), reply.echo.ID, reply.echo.Seq)
tracing.End(span)
Expand Down
35 changes: 29 additions & 6 deletions ingress/icmp_posix.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,27 @@ import (

// Opens a non-privileged ICMP socket on Linux and Darwin
func newICMPConn(listenIP netip.Addr) (*icmp.PacketConn, error) {
var (
network string
err error
)
if listenIP.Is4() {
return icmp.ListenPacket("udp4", listenIP.String())
network = "udp4"
} else {
network = "udp6"
}
return icmp.ListenPacket("udp6", listenIP.String())

conn, err := icmp.ListenPacket(network, listenIP.String())
if err != nil {
return nil, err
}
if err := enableReceiveTTL(conn, listenIP); err != nil {
if closeErr := conn.Close(); closeErr != nil {
return nil, fmt.Errorf("%w; failed to close ICMP socket after error: %v", err, closeErr)
}
return nil, err
}
return conn, nil
}

func netipAddr(addr net.Addr) (netip.Addr, bool) {
Expand Down Expand Up @@ -120,29 +137,34 @@ func (ief *icmpEchoFlow) sendToDst(dst netip.Addr, msg *icmp.Message) error {
}

// returnToSrc rewrites the echo ID to the original echo ID from the eyeball
func (ief *icmpEchoFlow) returnToSrc(reply *echoReply) error {
func (ief *icmpEchoFlow) returnToSrc(reply *echoReply) (bool, error) {
ief.UpdateLastActive()
ttl, shouldForward := reply.ttl.forwardedTTL()
if !shouldForward {
return false, nil
}
reply.echo.ID = ief.originalEchoID
reply.msg.Body = reply.echo
pk := packet.ICMP{
IP: &packet.IP{
Src: reply.from,
Dst: ief.src,
Protocol: layers.IPProtocol(reply.msg.Type.Protocol()),
TTL: packet.DefaultTTL,
TTL: ttl,
},
Message: reply.msg,
}
return ief.responder.ReturnPacket(&pk)
return true, ief.responder.ReturnPacket(&pk)
}

type echoReply struct {
from netip.Addr
msg *icmp.Message
echo *icmp.Echo
ttl receivedTTL
}

func parseReply(from net.Addr, rawMsg []byte) (*echoReply, error) {
func parseReply(from net.Addr, rawMsg []byte, ttl receivedTTL) (*echoReply, error) {
fromAddr, ok := netipAddr(from)
if !ok {
return nil, fmt.Errorf("cannot convert %s to netip.Addr", from)
Expand All @@ -163,6 +185,7 @@ func parseReply(from net.Addr, rawMsg []byte) (*echoReply, error) {
from: fromAddr,
msg: msg,
echo: echo,
ttl: ttl,
}, nil
}

Expand Down
69 changes: 69 additions & 0 deletions ingress/icmp_posix_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,75 @@ import (
"github.com/cloudflare/cloudflared/packet"
)

func TestReturnToSrcUsesReplyTTL(t *testing.T) {
t.Parallel()

const originalEchoID = 42573
muxer := newMockMuxer(1)
responder := newPacketResponder(muxer, 0, packet.NewEncoder())
flow := newICMPEchoFlow(localhostIP, func() error { return nil }, nil, responder, 0, originalEchoID)

sent, err := flow.returnToSrc(&echoReply{
from: localhostIP,
msg: &icmp.Message{
Type: ipv4.ICMPTypeEchoReply,
Code: 0,
},
echo: &icmp.Echo{
ID: 12345,
Seq: 6789,
Data: []byte(t.Name()),
},
ttl: receivedTTLFromIPHeader(42),
})
require.NoError(t, err)
require.True(t, sent)

resp := <-muxer.cfdToEdge
decoder := packet.NewICMPDecoder()
decoded, err := decoder.Decode(packet.RawPacket{Data: resp.Payload()})
require.NoError(t, err)
require.Equal(t, uint8(41), decoded.TTL)
require.Equal(t, localhostIP, decoded.Src)
require.Equal(t, localhostIP, decoded.Dst)
require.Equal(t, ipv4.ICMPTypeEchoReply, decoded.Type)
require.Equal(t, &icmp.Echo{
ID: originalEchoID,
Seq: 6789,
Data: []byte(t.Name()),
}, decoded.Body)
}

func TestReturnToSrcDropsExpiredReplyTTL(t *testing.T) {
t.Parallel()

muxer := newMockMuxer(1)
responder := newPacketResponder(muxer, 0, packet.NewEncoder())
flow := newICMPEchoFlow(localhostIP, func() error { return nil }, nil, responder, 0, 42573)

sent, err := flow.returnToSrc(&echoReply{
from: localhostIP,
msg: &icmp.Message{
Type: ipv4.ICMPTypeEchoReply,
Code: 0,
},
echo: &icmp.Echo{
ID: 12345,
Seq: 6789,
Data: []byte(t.Name()),
},
ttl: receivedTTLFromIPHeader(1),
})
require.NoError(t, err)
require.False(t, sent)

select {
case pk := <-muxer.cfdToEdge:
t.Fatalf("received unexpected ICMP reply: %+v", pk)
default:
}
}

func TestFunnelIdleTimeout(t *testing.T) {
defer leaktest.Check(t)()

Expand Down
Loading