From 237fd6a7d5d521dbd5faecf3edc627810c5b9629 Mon Sep 17 00:00:00 2001 From: Namik Mesic Date: Fri, 28 Mar 2025 15:11:41 +0100 Subject: [PATCH 1/5] Initial demo --- go.mod | 16 +++- go.sum | 37 +++++--- pkg/nats/client.go | 194 ++++++++++++++++++++++++++++++++++++++++ pkg/nats/config.go | 40 +++++++++ pkg/nats/transmitter.go | 138 ++++++++++++++++++++++++++++ rpc/mtls/mtls.go | 71 ++++++++++++++- 6 files changed, 478 insertions(+), 18 deletions(-) create mode 100644 pkg/nats/client.go create mode 100644 pkg/nats/config.go create mode 100644 pkg/nats/transmitter.go diff --git a/go.mod b/go.mod index 2837489..7bdd5b0 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,8 @@ require ( github.com/ethereum/go-ethereum v1.15.3 github.com/hashicorp/go-plugin v1.6.3 github.com/leanovate/gopter v0.2.11 + github.com/nats-io/nats-server/v2 v2.11.0 + github.com/nats-io/nats.go v1.40.1 github.com/shopspring/decimal v1.4.0 github.com/smartcontractkit/chainlink-common v0.4.2-0.20250130202959-6f1f48342e36 github.com/smartcontractkit/libocr v0.0.0-20250220133800-f3b940c4f298 @@ -52,6 +54,7 @@ require ( github.com/gogo/protobuf v1.3.3 // indirect github.com/golang/protobuf v1.5.4 // indirect github.com/google/go-cmp v0.6.0 // indirect + github.com/google/go-tpm v0.9.3 // indirect github.com/google/uuid v1.6.0 // indirect github.com/gorilla/websocket v1.5.3 // indirect github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.0.1 // indirect @@ -78,9 +81,13 @@ require ( github.com/mattn/go-colorable v0.1.14 // indirect github.com/mattn/go-isatty v0.0.20 // indirect github.com/mattn/go-runewidth v0.0.14 // indirect + github.com/minio/highwayhash v1.0.3 // indirect github.com/mmcloughlin/addchain v0.4.0 // indirect github.com/mr-tron/base58 v1.2.0 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect + github.com/nats-io/jwt/v2 v2.7.3 // indirect + github.com/nats-io/nkeys v0.4.10 // indirect + github.com/nats-io/nuid v1.0.1 // indirect github.com/oklog/run v1.1.0 // indirect github.com/pelletier/go-toml/v2 v2.2.3 // indirect github.com/pkg/errors v0.9.1 // indirect @@ -124,11 +131,12 @@ require ( go.opentelemetry.io/proto/otlp v1.5.0 // indirect go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.27.0 // indirect - golang.org/x/crypto v0.33.0 // indirect + golang.org/x/crypto v0.36.0 // indirect golang.org/x/net v0.35.0 // indirect - golang.org/x/sync v0.11.0 // indirect - golang.org/x/sys v0.30.0 // indirect - golang.org/x/text v0.22.0 // indirect + golang.org/x/sync v0.12.0 // indirect + golang.org/x/sys v0.31.0 // indirect + golang.org/x/text v0.23.0 // indirect + golang.org/x/time v0.11.0 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20250219182151-9fdb1cabc7b2 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20250219182151-9fdb1cabc7b2 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect diff --git a/go.sum b/go.sum index 3aa376e..e6ac029 100644 --- a/go.sum +++ b/go.sum @@ -53,6 +53,8 @@ github.com/VictoriaMetrics/fastcache v1.12.2/go.mod h1:AmC+Nzz1+3G2eCPapF6UcsnkT github.com/XSAM/otelsql v0.29.0 h1:pEw9YXXs8ZrGRYfDc0cmArIz9lci5b42gmP5+tA1Huc= github.com/XSAM/otelsql v0.29.0/go.mod h1:d3/0xGIGC5RVEE+Ld7KotwaLy6zDeaF3fLJHOPpdN2w= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= +github.com/antithesishq/antithesis-sdk-go v0.4.3-default-no-op h1:+OSa/t11TFhqfrX0EOSqQBDJ0YlpmK0rDSiB19dg9M0= +github.com/antithesishq/antithesis-sdk-go v0.4.3-default-no-op/go.mod h1:IUpT2DPAKh6i/YhSbt6Gl3v2yvUZjmKncl7U91fup7E= github.com/apache/arrow-go/v18 v18.0.0 h1:1dBDaSbH3LtulTyOVYaBCHO3yVRwjV+TZaqn3g6V7ZM= github.com/apache/arrow-go/v18 v18.0.0/go.mod h1:t6+cWRSmKgdQ6HsxisQjok+jBpKGhRDiqcf3p0p/F+A= github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o= @@ -246,6 +248,8 @@ github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/go-tpm v0.9.3 h1:+yx0/anQuGzi+ssRqeD6WpXjW2L/V0dItUayO0i9sRc= +github.com/google/go-tpm v0.9.3/go.mod h1:h9jEsEECg7gtLis0upRBQU+GhYVH6jMjrFxI8u6bVUY= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0= github.com/google/gofuzz v1.2.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= @@ -441,6 +445,8 @@ github.com/mattn/go-runewidth v0.0.14/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh github.com/mattn/go-sqlite3 v1.14.22 h1:2gZY6PC6kBnID23Tichd1K+Z0oS6nE/XwU+Vz/5o4kU= github.com/mattn/go-sqlite3 v1.14.22/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y= github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg= +github.com/minio/highwayhash v1.0.3 h1:kbnuUMoHYyVl7szWjSxJnxw11k2U709jqFPPmIUyD6Q= +github.com/minio/highwayhash v1.0.3/go.mod h1:GGYsuwP/fPD6Y9hMiXuapVvlIUEhFhMTh0rxU3ik1LQ= github.com/mitchellh/cli v1.0.0/go.mod h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceTlRvqc= github.com/mitchellh/go-homedir v1.0.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= github.com/mitchellh/go-testing-interface v1.0.0/go.mod h1:kRemZodwjscx+RGhAo8eIhFbs2+BFgRtFPeD/KE+zxI= @@ -463,6 +469,16 @@ github.com/mr-tron/base58 v1.2.0 h1:T/HDJBh4ZCPbU39/+c3rRvE0uKBQlU27+QI8LJ4t64o= github.com/mr-tron/base58 v1.2.0/go.mod h1:BinMc/sQntlIE1frQmRFPUoPA1Zkr8VRgBdjWI2mNwc= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= +github.com/nats-io/jwt/v2 v2.7.3 h1:6bNPK+FXgBeAqdj4cYQ0F8ViHRbi7woQLq4W29nUAzE= +github.com/nats-io/jwt/v2 v2.7.3/go.mod h1:GvkcbHhKquj3pkioy5put1wvPxs78UlZ7D/pY+BgZk4= +github.com/nats-io/nats-server/v2 v2.11.0 h1:fdwAT1d6DZW/4LUz5rkvQUe5leGEwjjOQYntzVRKvjE= +github.com/nats-io/nats-server/v2 v2.11.0/go.mod h1:leXySghbdtXSUmWem8K9McnJ6xbJOb0t9+NQ5HTRZjI= +github.com/nats-io/nats.go v1.40.1 h1:MLjDkdsbGUeCMKFyCFoLnNn/HDTqcgVa3EQm+pMNDPk= +github.com/nats-io/nats.go v1.40.1/go.mod h1:wV73x0FSI/orHPSYoyMeJB+KajMDoWyXmFaRrrYaaTo= +github.com/nats-io/nkeys v0.4.10 h1:glmRrpCmYLHByYcePvnTBEAwawwapjCPMjy2huw20wc= +github.com/nats-io/nkeys v0.4.10/go.mod h1:OjRrnIKnWBFl+s4YK5ChQfvHP2fxqZexrKJoVVyWB3U= +github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= +github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/neelance/astrewrite v0.0.0-20160511093645-99348263ae86/go.mod h1:kHJEU3ofeGjhHklVoIGuVj85JJwZ6kWPaJwCIxgnFmo= github.com/neelance/sourcemap v0.0.0-20200213170602-2833bce08e4c/go.mod h1:Qr6/a/Q4r9LP1IltGz7tA7iOK1WonHEYhu1HRBA7ZiM= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= @@ -695,8 +711,8 @@ golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97/go.mod h1:GvvjBRRGRdwPK5y golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU= golang.org/x/crypto v0.20.0/go.mod h1:Xwo95rrVNIoSMx9wa1JroENMToLWn3RNVrTBpLHgZPQ= -golang.org/x/crypto v0.33.0 h1:IOBPskki6Lysi0lo9qQvbxiQ+FvsCC/YWOecCHAixus= -golang.org/x/crypto v0.33.0/go.mod h1:bVdXmD7IV/4GdElGPozy6U7lWdRXA4qyRVGJV57uQ5M= +golang.org/x/crypto v0.36.0 h1:AnAEvhDddvBdpY+uR+MyHmuZzzNqXSe/GvuDeob5L34= +golang.org/x/crypto v0.36.0/go.mod h1:Y4J0ReaxCR1IMaabaSMugxJES1EpwhBHhv2bDHklZvc= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8= @@ -814,8 +830,8 @@ golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.11.0 h1:GGz8+XQP4FvTTrjZPzNKTMFtSXH80RAzG+5ghFPgK9w= -golang.org/x/sync v0.11.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.12.0 h1:MHc5BpPuC30uJk597Ri8TV3CNZcTLu6B6z4lJy+g6Jw= +golang.org/x/sync v0.12.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -885,8 +901,9 @@ golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/sys v0.30.0 h1:QjkSwP/36a20jFYWkSue1YwXzLmsV5Gfq7Eiy72C1uc= -golang.org/x/sys v0.30.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.31.0 h1:ioabZlmFYtWhL+TRYpcnNlLwhyxaM9kWTDEmfnprqik= +golang.org/x/sys v0.31.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= @@ -907,13 +924,13 @@ golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= -golang.org/x/text v0.22.0 h1:bofq7m3/HAFvbF51jz3Q9wLg3jkvSPuiZu/pD1XwgtM= -golang.org/x/text v0.22.0/go.mod h1:YRoo4H8PVmsu+E3Ou7cqLVH8oXWIHVoX0jqUWALQhfY= +golang.org/x/text v0.23.0 h1:D71I7dUrlY+VX0gQShAThNGHFxZ13dGLBHQLVl1mJlY= +golang.org/x/text v0.23.0/go.mod h1:/BLNzu4aZCJ1+kcD0DNRotWKage4q2rGVAg4o22unh4= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= -golang.org/x/time v0.9.0 h1:EsRrnYcQiGH+5FfbgvV4AP7qEZstoyrHB0DzarOQ4ZY= -golang.org/x/time v0.9.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= +golang.org/x/time v0.11.0 h1:/bpjEDfN9tkoN/ryeYHnv5hcMlc8ncjMcM4XBk5NWV0= +golang.org/x/time v0.11.0/go.mod h1:CDIdPxbZBQxdj6cxyCIdrNogrJKMJ7pr37NYpMcMDSg= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= diff --git a/pkg/nats/client.go b/pkg/nats/client.go new file mode 100644 index 0000000..9bab553 --- /dev/null +++ b/pkg/nats/client.go @@ -0,0 +1,194 @@ +package nats + +import ( + "context" + "crypto" + "crypto/ed25519" + "encoding/hex" + "errors" + "fmt" + "strings" + "time" + + "github.com/nats-io/nats.go" + "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-common/pkg/services" + "github.com/smartcontractkit/chainlink-data-streams/rpc" + "github.com/smartcontractkit/chainlink-data-streams/rpc/mtls" +) + +type Client interface { + services.Service + Transmit(ctx context.Context, payload []byte, dedupKey string, donID uint32) error +} + +var _ Client = (*client)(nil) + +type client struct { + services.Service + + lggr logger.Logger + clientSigner crypto.Signer + serverPubKey ed25519.PublicKey + serverURLs []string + clientPubKeyHex string + + conn *nats.Conn + js nats.JetStreamContext +} + +func NewClient(opts ClientOpts) (Client, error) { + if err := opts.verifyConfig(); err != nil { + return nil, fmt.Errorf("invalid configuration: %w", err) + } + + c := &client{ + lggr: opts.Logger, + clientSigner: opts.ClientSigner, + serverPubKey: opts.ServerPubKey, + serverURLs: opts.ServerURLs, + } + + svc, _ := services.Config{ + Name: "NATSClient", + Start: c.start, + Close: c.close, + }.NewServiceEngine(opts.Logger) + c.Service = svc + + return c, nil +} + +// connect creates a new NATS connection with the given configuration +func (c *client) connect() (*nats.Conn, error) { + cMtls, err := mtls.NewTLSTransportSigner(c.clientSigner, []ed25519.PublicKey{c.serverPubKey}) + if err != nil { + return nil, fmt.Errorf("failed to create client mTLS credentials: %w", err) + } + options := []nats.Option{ + // Connection settings + nats.ReconnectWait(1 * time.Second), + nats.RetryOnFailedConnect(true), + nats.MaxReconnects(-1), + nats.ReconnectBufSize(256 * 1024 * 1024), // 256MB + // Timeouts and keepalive + nats.PingInterval(1 * time.Second), + nats.Timeout(5 * time.Second), + nats.TLSHandshakeFirst(), + nats.Secure(cMtls), + nats.Name(c.getClientPubKeyHex()), + // Connection handlers for various NATS events + nats.ConnectHandler(func(nc *nats.Conn) { + c.lggr.Info("NATS client connection established", "server_id", nc.ConnectedServerId(), "server_url", nc.ConnectedUrl()) + }), + nats.ReconnectHandler(func(nc *nats.Conn) { + c.lggr.Info("NATS client reconnected", nc.ConnectedServerId(), "server_url", nc.ConnectedUrl(), "total_reconnects", nc.Reconnects) + }), + nats.ReconnectErrHandler(func(nc *nats.Conn, err error) { + c.lggr.Error("NATS client reconnected with error", nc.ConnectedServerId(), "server_url", nc.ConnectedUrl(), "error", err) + }), + nats.DisconnectErrHandler(func(nc *nats.Conn, err error) { + c.lggr.Error("NATS client disconnected with error", nc.ConnectedServerId(), "server_url", nc.ConnectedUrl(), "total_reconnects", nc.Reconnects, "error", err) + }), + nats.ClosedHandler(func(nc *nats.Conn) { + c.lggr.Warn("NATS client closed", "server_id", nc.ConnectedServerId(), "server_url", nc.ConnectedUrl()) + }), + // Error handler for subscriptions + nats.ErrorHandler(func(nc *nats.Conn, sub *nats.Subscription, err error) { + c.lggr.Error("NATS client subscription error", "server_id", nc.ConnectedServerId(), "server_url", nc.ConnectedUrl(), "error", err, "subject", sub.Subject, "queue", sub.Queue) + }), + } + + nc, err := nats.Connect(strings.Join(c.serverURLs, ","), options...) + if err != nil { + return nil, fmt.Errorf("failed to create NATS connection: %w", err) + } + + jsOptions := []nats.JSOpt{ + nats.PublishAsyncMaxPending(256 * 1024), // Allow large number of async publishes + nats.PublishAsyncTimeout(100 * time.Millisecond), + nats.MaxWait(100 * time.Millisecond), + } + + // Create the JetStream context + c.js, err = nc.JetStream(jsOptions...) + if err != nil { + return nil, fmt.Errorf("failed to create JetStream context: %w", err) + } + + return nc, nil +} + +func (c *client) start(context.Context) error { + nc, err := c.connect() + // if there is only one server URL, use it + if err != nil { + return err + } + c.conn = nc + return nil +} + +func (c *client) close() error { + if c.conn != nil { + return c.conn.Drain() + } + return nil +} + +func (c *client) Transmit(ctx context.Context, payload []byte, dedupKey string, donID uint32) error { + subject := fmt.Sprintf("%s.%s.%s", donID, c.getClientPubKeyHex()) + + pubOpts := []nats.PubOpt{ + nats.MsgId(dedupKey), + nats.StallWait(200 * time.Millisecond), + nats.MsgTTL(24 * time.Hour), + } + ack, err := c.js.PublishAsync(subject, payload, pubOpts...) + if err != nil { + return fmt.Errorf("failed to publish to fast lane: %w", err) + } + select { + case <-ack.Ok(): + return nil + case <-time.After(1 * time.Second): + return fmt.Errorf("fast lane publish timed out") + } +} + +func (c *client) LatestReport(ctx context.Context, req *rpc.LatestReportRequest) (resp *rpc.LatestReportResponse, err error) { + return nil, errors.New("LatestReport is not supported in nats mode") +} + +func (c *client) Name() string { + if c.lggr == nil { + return "NATSClient" + } + return c.lggr.Name() +} + +func (c *client) Healthy() error { + switch { + case c.conn == nil: + return fmt.Errorf("NATS connection is nil") + case !c.conn.IsConnected(): + return fmt.Errorf("NATS connection is %s", c.conn.Status()) + default: + return nil + } +} + +func (c *client) Ready() error { + if c.conn == nil || !c.conn.IsConnected() { + return errors.New("NATS connection is not ready") + } + return nil +} + +func (c *client) HealthReport() map[string]error { + return map[string]error{c.Name(): c.Healthy()} +} + +func (c *client) getClientPubKeyHex() string { + return hex.EncodeToString(c.clientSigner.Public().(ed25519.PublicKey)) +} diff --git a/pkg/nats/config.go b/pkg/nats/config.go new file mode 100644 index 0000000..64aac36 --- /dev/null +++ b/pkg/nats/config.go @@ -0,0 +1,40 @@ +package nats + +import ( + "crypto" + "crypto/ed25519" + "fmt" + + "github.com/smartcontractkit/chainlink-common/pkg/logger" +) + +type ClientOpts struct { + Logger logger.Logger + ClientSigner crypto.Signer + ServerPubKey ed25519.PublicKey + ServerURLs []string +} + +// verifyConfig validates all required fields are properly set +func (c *ClientOpts) verifyConfig() error { + var errs []error + + if c.Logger == nil { + errs = append(errs, fmt.Errorf("logger is required for NATS client")) + } + if c.ClientSigner == nil { + errs = append(errs, fmt.Errorf("client signer is required for NATS client")) + } + if len(c.ServerPubKey) == 0 { + errs = append(errs, fmt.Errorf("server public key is required for NATS client")) + } + if len(c.ServerURLs) == 0 { + errs = append(errs, fmt.Errorf("at least one server URL is required for NATS client")) + } + + if len(errs) > 0 { + return fmt.Errorf("invalid NATS client configuration: %v", errs) + } + + return nil +} diff --git a/pkg/nats/transmitter.go b/pkg/nats/transmitter.go new file mode 100644 index 0000000..423a782 --- /dev/null +++ b/pkg/nats/transmitter.go @@ -0,0 +1,138 @@ +package nats + +import ( + "context" + "crypto" + "crypto/ed25519" + "encoding/hex" + "fmt" + "sync" + + "github.com/cespare/xxhash/v2" + "github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types" + ocr2types "github.com/smartcontractkit/libocr/offchainreporting2plus/types" + + "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-common/pkg/services" + llotypes "github.com/smartcontractkit/chainlink-common/pkg/types/llo" +) + +type TransmitterOpts struct { + Logger logger.Logger + FromAccount string + DonID uint32 + ServerURLs []string + ClientSigner crypto.Signer + ServerPubKey ed25519.PublicKey +} + +type transmitter struct { + services.StateMachine + services.Service + + lggr logger.Logger + fromAccount string + donID uint32 + serverURL []string + client Client + + // Reusable hash instance with mutex for thread safety + hashPool sync.Pool +} + +func NewTransmitter(opts TransmitterOpts) (llotypes.Transmitter, error) { + t := &transmitter{ + lggr: opts.Logger, + fromAccount: opts.FromAccount, + donID: opts.DonID, + serverURL: opts.ServerURLs, + } + + // Initialize the hash pool + t.hashPool.New = func() interface{} { + return xxhash.New() + } + + // Create NATS client + clientOpts := ClientOpts{ + Logger: opts.Logger, + ClientSigner: opts.ClientSigner, + ServerPubKey: opts.ServerPubKey, + ServerURLs: opts.ServerURLs, + } + client, err := NewClient(clientOpts) + if err != nil { + return nil, fmt.Errorf("failed to create NATS client: %w", err) + } + t.client = client + + // Initialize service with subservices + svc, _ := services.Config{ + Name: "NATSTransmitter", + Start: func(ctx context.Context) error { return nil }, + Close: func() error { return nil }, + NewSubServices: func(lggr logger.Logger) []services.Service { + return []services.Service{client} + }, + }.NewServiceEngine(opts.Logger) + t.Service = svc + + return t, nil +} + +func (t *transmitter) Transmit( + ctx context.Context, + digest ocr2types.ConfigDigest, + seqNr uint64, + report ocr3types.ReportWithInfo[llotypes.ReportInfo], + sigs []ocr2types.AttributedOnchainSignature, +) error { + if !t.IfStarted(func() {}) { + return fmt.Errorf("transmitter is not started") + } + + // Get a hash instance from the pool + h := t.hashPool.Get().(*xxhash.Digest) + defer t.hashPool.Put(h) + + // Reset the hash instance before use + h.Reset() + h.Write(report.Report) + dedupeKey := hex.EncodeToString(h.Sum(nil)) + + err := t.client.Transmit(ctx, report.Report, dedupeKey, t.donID) + if err != nil { + t.lggr.Errorw("Failed to transmit report", + "error", err, + "digest", digest, + "seqNr", seqNr, + "reportFormat", report.Info.ReportFormat, + ) + return err + } + + t.lggr.Debugw("Successfully transmitted report", + "digest", digest, + "seqNr", seqNr, + "reportFormat", report.Info.ReportFormat, + ) + return nil +} + +func (t *transmitter) FromAccount(ctx context.Context) (ocr2types.Account, error) { + return ocr2types.Account(t.fromAccount), nil +} + +func (t *transmitter) Ready() error { + return t.Healthy() +} + +func (t *transmitter) HealthReport() map[string]error { + report := map[string]error{t.Name(): t.Healthy()} + services.CopyHealth(report, t.client.HealthReport()) + return report +} + +func (t *transmitter) Name() string { + return t.lggr.Name() +} diff --git a/rpc/mtls/mtls.go b/rpc/mtls/mtls.go index ccab601..3299698 100644 --- a/rpc/mtls/mtls.go +++ b/rpc/mtls/mtls.go @@ -7,6 +7,8 @@ import ( "crypto/subtle" "crypto/tls" "crypto/x509" + "crypto/x509/pkix" + "encoding/hex" "errors" "fmt" "math/big" @@ -55,8 +57,32 @@ func NewTLSConfig(privKey ed25519.PrivateKey, pubKeys []ed25519.PublicKey) (*tls if err != nil { return nil, err } + c, err := newMutualTLSConfig(priv.key, pubs) - return newMutualTLSConfig(priv.key, pubs) + if err != nil { + return nil, err + } + c.InsecureSkipVerify = true + c.ClientAuth = tls.RequireAnyClientCert + + return c, nil +} + +func NewTLSTransportSigner(signer crypto.Signer, pubKeys []ed25519.PublicKey) (*tls.Config, error) { + pubs, err := ValidPublicKeysFromEd25519(pubKeys...) + if err != nil { + return nil, err + } + + c, err := newMutualTLSConfig(signer, pubs) + c.ClientAuth = tls.RequireAnyClientCert + c.InsecureSkipVerify = true + + if err != nil { + return nil, err + } + + return c, nil } // newMutualTLSConfig uses the private key and public keys to construct a mutual @@ -93,21 +119,58 @@ func newMutualTLSConfig(signer crypto.Signer, pubs *PublicKeys) (*tls.Config, er // Generates a minimal certificate (that wouldn't be considered valid outside of // this networking protocol) from an Ed25519 private key. +// This also sets the organization and organizational unit for the certificate used for User Mapping func newMinimalX509Cert(signer crypto.Signer) (tls.Certificate, error) { + pubKey, ok := signer.Public().(ed25519.PublicKey) + if !ok { + return tls.Certificate{}, fmt.Errorf("invalid public key type") + } + + pubKeyHex := hex.EncodeToString(pubKey) template := x509.Certificate{ SerialNumber: big.NewInt(0), // serial number must be set, so we set it to 0 + Subject: pkix.Name{ + CommonName: pubKeyHex, + Organization: []string{pubKeyHex}, + }, + EmailAddresses: []string{pubKeyHex}, } encodedCert, err := x509.CreateCertificate(rand.Reader, &template, &template, signer.Public(), signer) if err != nil { return tls.Certificate{}, err } - - return tls.Certificate{ + cert := tls.Certificate{ Certificate: [][]byte{encodedCert}, PrivateKey: signer, SupportedSignatureAlgorithms: []tls.SignatureScheme{tls.Ed25519}, - }, nil + } + // err = printCertificateDetails(cert) + // if err != nil { + // return tls.Certificate{}, err + // } + + return cert, nil +} + +func printCertificateDetails(cert tls.Certificate) error { + // Parse the certificate from its DER bytes. + parsedCert, err := x509.ParseCertificate(cert.Certificate[0]) + if err != nil { + return fmt.Errorf("failed to parse certificate: %w", err) + } + + fmt.Println("Certificate:") + fmt.Println(" Data:") + // Print the subject line similar to the OpenSSL output. + fmt.Printf(" Subject: %s\n", parsedCert.Subject.String()) + + // Optionally, print additional fields. + fmt.Printf(" Issuer: %s\n", parsedCert.Issuer.String()) + fmt.Printf(" Validity:\n") + fmt.Printf(" Not Before: %s\n", parsedCert.NotBefore) + fmt.Printf(" Not After : %s\n", parsedCert.NotAfter) + return nil } type PrivateKey struct { From bf82a4ef9f9b1166a04cdfa24dad712b65b73dc7 Mon Sep 17 00:00:00 2001 From: Namik Mesic Date: Fri, 28 Mar 2025 15:12:01 +0100 Subject: [PATCH 2/5] Add example --- pkg/nats/example/keys/client.priv | 1 + pkg/nats/example/keys/client.pub | 1 + pkg/nats/example/keys/client2.priv | 1 + pkg/nats/example/keys/client2.pub | 1 + pkg/nats/example/keys/insecureClient.priv | 1 + pkg/nats/example/keys/insecureClient.pub | 1 + pkg/nats/example/keys/server.priv | 1 + pkg/nats/example/keys/server.pub | 1 + pkg/nats/example/main.go | 239 ++++++++++++++++++++++ 9 files changed, 247 insertions(+) create mode 100644 pkg/nats/example/keys/client.priv create mode 100644 pkg/nats/example/keys/client.pub create mode 100644 pkg/nats/example/keys/client2.priv create mode 100644 pkg/nats/example/keys/client2.pub create mode 100644 pkg/nats/example/keys/insecureClient.priv create mode 100644 pkg/nats/example/keys/insecureClient.pub create mode 100644 pkg/nats/example/keys/server.priv create mode 100644 pkg/nats/example/keys/server.pub create mode 100644 pkg/nats/example/main.go diff --git a/pkg/nats/example/keys/client.priv b/pkg/nats/example/keys/client.priv new file mode 100644 index 0000000..8f02520 --- /dev/null +++ b/pkg/nats/example/keys/client.priv @@ -0,0 +1 @@ +d55fc3eff8a4bc2f3fc23e6fa3e3a2460bc542b184b6e775f645c62dd30c9d185c87bb05698793b0d78b5c2ed93b205166b97f418ed6bee3981ce6554b3dae03 \ No newline at end of file diff --git a/pkg/nats/example/keys/client.pub b/pkg/nats/example/keys/client.pub new file mode 100644 index 0000000..bc0fe2f --- /dev/null +++ b/pkg/nats/example/keys/client.pub @@ -0,0 +1 @@ +5c87bb05698793b0d78b5c2ed93b205166b97f418ed6bee3981ce6554b3dae03 \ No newline at end of file diff --git a/pkg/nats/example/keys/client2.priv b/pkg/nats/example/keys/client2.priv new file mode 100644 index 0000000..47b7215 --- /dev/null +++ b/pkg/nats/example/keys/client2.priv @@ -0,0 +1 @@ +91b84a9c541fbeb1d90f5dee298c0651718fe23dec90b7c3a40b76cbb8a7bb5d19d5560c9db831858fcb43b2306103046a44e189d1eebd1bdeb862b00a3884c3 \ No newline at end of file diff --git a/pkg/nats/example/keys/client2.pub b/pkg/nats/example/keys/client2.pub new file mode 100644 index 0000000..d7f5d91 --- /dev/null +++ b/pkg/nats/example/keys/client2.pub @@ -0,0 +1 @@ +19d5560c9db831858fcb43b2306103046a44e189d1eebd1bdeb862b00a3884c3 \ No newline at end of file diff --git a/pkg/nats/example/keys/insecureClient.priv b/pkg/nats/example/keys/insecureClient.priv new file mode 100644 index 0000000..ad6acc0 --- /dev/null +++ b/pkg/nats/example/keys/insecureClient.priv @@ -0,0 +1 @@ +c696afa354a9745c0889c37f0524fe517f4042ab57695cf280b55c6f6b5d8e407926c9e1f357cb6da70fed3e6c43ff198a8d0facc5c6e5753d5a81bad1b9f93e \ No newline at end of file diff --git a/pkg/nats/example/keys/insecureClient.pub b/pkg/nats/example/keys/insecureClient.pub new file mode 100644 index 0000000..cd18d74 --- /dev/null +++ b/pkg/nats/example/keys/insecureClient.pub @@ -0,0 +1 @@ +7926c9e1f357cb6da70fed3e6c43ff198a8d0facc5c6e5753d5a81bad1b9f93e \ No newline at end of file diff --git a/pkg/nats/example/keys/server.priv b/pkg/nats/example/keys/server.priv new file mode 100644 index 0000000..d908eb0 --- /dev/null +++ b/pkg/nats/example/keys/server.priv @@ -0,0 +1 @@ +e53a730b64a70806f74a2028efd51b53af9aa063c1bc0018f660ddcdc0a0e3d791afcaa6f5f0ba385eda2416056610fee0a420dd9214babaf3973728e68c7c05 \ No newline at end of file diff --git a/pkg/nats/example/keys/server.pub b/pkg/nats/example/keys/server.pub new file mode 100644 index 0000000..58ea0c3 --- /dev/null +++ b/pkg/nats/example/keys/server.pub @@ -0,0 +1 @@ +91afcaa6f5f0ba385eda2416056610fee0a420dd9214babaf3973728e68c7c05 \ No newline at end of file diff --git a/pkg/nats/example/main.go b/pkg/nats/example/main.go new file mode 100644 index 0000000..9cedf21 --- /dev/null +++ b/pkg/nats/example/main.go @@ -0,0 +1,239 @@ +package main + +import ( + "crypto" + "crypto/ed25519" + "crypto/rand" + "encoding/hex" + "fmt" + "log" + "os" + "os/signal" + "path/filepath" + "syscall" + "time" + + "github.com/nats-io/nats-server/v2/server" + "github.com/nats-io/nats.go" + "github.com/smartcontractkit/chainlink-data-streams/rpc/mtls" + // Your custom TLS package +) + +func loadOrGenerateKeys(keysDir, prefix string) (ed25519.PublicKey, ed25519.PrivateKey) { + // Generate or load server keys + serverPubPath := filepath.Join(keysDir, prefix+".pub") + serverPrivPath := filepath.Join(keysDir, prefix+".priv") + + var serverPub ed25519.PublicKey + var serverPriv ed25519.PrivateKey + + // Try to load existing keys + pubBytes, err := os.ReadFile(serverPubPath) + if err == nil { + privBytes, err := os.ReadFile(serverPrivPath) + if err == nil { + // Decode existing keys + pubBytes, err = hex.DecodeString(string(pubBytes)) + if err != nil { + log.Fatalf("Error decoding server public key: %v", err) + } + privBytes, err = hex.DecodeString(string(privBytes)) + if err != nil { + log.Fatalf("Error decoding server private key: %v", err) + } + serverPub = ed25519.PublicKey(pubBytes) + serverPriv = ed25519.PrivateKey(privBytes) + } + } + + // Generate new keys if they don't exist + if serverPub == nil || serverPriv == nil { + var err error + serverPub, serverPriv, err = ed25519.GenerateKey(rand.Reader) + if err != nil { + log.Fatalf("Error generating server keys: %v", err) + } + + // Save new keys + err = os.WriteFile(serverPubPath, []byte(hex.EncodeToString(serverPub)), 0644) + if err != nil { + log.Fatalf("Error saving server public key: %v", err) + } + err = os.WriteFile(serverPrivPath, []byte(hex.EncodeToString(serverPriv)), 0600) + if err != nil { + log.Fatalf("Error saving server private key: %v", err) + } + } + + return serverPub, serverPriv +} + +func startServer(opts *server.Options) (*server.Server, error) { + // Create a new NATS server + ns, err := server.NewServer(opts) + if err != nil { + return nil, fmt.Errorf("error creating server: %w", err) + } + + // Start the server in a goroutine + go func() { + ns.Start() + }() + + // Wait for the server to be ready + if !ns.ReadyForConnections(4 * time.Second) { + return nil, fmt.Errorf("NATS server failed to start") + } + + log.Printf("NATS server is running on %s", ns.ClientURL()) + return ns, nil +} + +func startClientandSendHello(clientName string, clientPriv ed25519.PrivateKey, serverPubKeys []ed25519.PublicKey, serverURL string) { + // Set up client TLS config + clientTLSSigner := crypto.Signer(clientPriv) + clientTLSConfig, err := mtls.NewTLSTransportSigner(clientTLSSigner, serverPubKeys) + if err != nil { + log.Fatalf("Client %s: Error creating TLS config: %v", clientName, err) + } + + natsOpts := []nats.Option{ + nats.Secure(clientTLSConfig), + nats.ReconnectWait(500 * time.Millisecond), + nats.Compression(true), + nats.MaxReconnects(-1), + nats.TLSHandshakeFirst(), + nats.FlusherTimeout(1 * time.Second), + nats.PingInterval(1 * time.Second), + nats.Timeout(2 * time.Second), + } + nc, err := nats.Connect(serverURL, natsOpts...) + if err != nil { + log.Fatalf("Client %s: Error connecting to NATS: %v", clientName, err) + } + defer nc.Close() + + // Subscribe to the test subject + sub, err := nc.Subscribe("test", func(msg *nats.Msg) { + log.Printf("Client %s received: %s", clientName, string(msg.Data)) + }) + if err != nil { + log.Fatalf("Client %s: Error subscribing: %v", clientName, err) + } + defer sub.Unsubscribe() + + // Send a message + err = nc.Publish("test", []byte(fmt.Sprintf("Hello world from %s", clientName))) + if err != nil { + log.Fatalf("Client %s: Error publishing message: %v", clientName, err) + } + + log.Printf("Client %s: Message published successfully", clientName) + + // Keep the connection alive + defer nc.Close() +} + +func main() { + serverPub, serverPriv := loadOrGenerateKeys("keys", "server") + log.Printf("Server public key: %x", serverPub) + log.Printf("Server private key: %x", serverPriv) + + client1Pub, client1Priv := loadOrGenerateKeys("keys", "client") + log.Printf("Client public key: %x", client1Pub) + log.Printf("Client private key: %x", client1Priv) + + client2Pub, client2Priv := loadOrGenerateKeys("keys", "client2") + log.Printf("Client2 public key: %x", client2Pub) + log.Printf("Client2 private key: %x", client2Priv) + + insecureClientPub, insecureClientPriv := loadOrGenerateKeys("keys", "insecureClient") + log.Printf("Insecure client public key: %x", insecureClientPub) + log.Printf("Insecure client private key: %x", insecureClientPriv) + + fmt.Printf("Client1 username (CN): %s\n", hex.EncodeToString(client1Pub)) + fmt.Printf("Client2 username (CN): %s\n", hex.EncodeToString(client2Pub)) + + clientPubKeys := []ed25519.PublicKey{client1Pub, client2Pub} + + serverTLSConfig, err := mtls.NewTLSConfig(serverPriv, clientPubKeys) + for _, key := range clientPubKeys { + log.Printf("Client public key: %x", key) + } + + if err != nil { + log.Fatalf("Error creating TLS config: %v", err) + } + + // Create a new NATS server options + opts := &server.Options{ + Host: "0.0.0.0", // Listen on all interfaces since it's public + Port: 4222, + NoLog: false, // Keep logs for monitoring + NoSigs: true, // Disable signal handling since we handle it ourselves + Logtime: true, // Include timestamps in logs + Debug: false, // Disable debug mode in production + Trace: false, // Disable trace mode in production + TLSConfig: serverTLSConfig, + TLSHandshakeFirst: true, // Require TLS handshake before any other communication + AllowNonTLS: false, // Only allow TLS connections + TLSMap: true, // Enable TLS certificate mapping + // Connection limits and DDoS protection + MaxConn: 1000, // Limit total connections + MaxSubs: 100, // Limit subscriptions per connection + MaxControlLine: 4096, // Limit control line size (4KB) + MaxPayload: 512 * 1024, // 512KB max payload for reports + MaxPending: 1024 * 1024 * 2, // 2MB total pending messages + MaxClosedClients: 1000, // Keep track of closed clients + // Rate limiting + WriteDeadline: 1 * time.Second, // Timeout for write operations + // Connection timeouts + AuthTimeout: 2.0, // 2 seconds for auth + TLSTimeout: 2.0, // 2 seconds for TLS handshake + PingInterval: 2 * time.Second, + MaxPingsOut: 3, // Disconnect after 3 missed pings + // Security hardening + NoHeaderSupport: true, // Disable header support for simpler protocol + NoFastProducerStall: true, // Prevent fast producer stall + // Graceful shutdown + LameDuckDuration: 30 * time.Second, + LameDuckGracePeriod: 10 * time.Second, + // Users with permissions + Users: []*server.User{ + { + Username: fmt.Sprintf("OU=%s,O=%s", hex.EncodeToString(client1Pub), hex.EncodeToString(client1Pub)), + Permissions: &server.Permissions{ + Publish: &server.SubjectPermission{Allow: []string{"test.*"}}, + Subscribe: &server.SubjectPermission{Allow: []string{"test.*"}}, + }, + }, + { + Username: fmt.Sprintf("OU=%s,O=%s", hex.EncodeToString(client2Pub), hex.EncodeToString(client2Pub)), + Permissions: &server.Permissions{ + Publish: &server.SubjectPermission{Allow: []string{"else.*"}}, + Subscribe: &server.SubjectPermission{Allow: []string{"else.*"}}, + }, + }, + }, + } + // Start the server + ns, err := startServer(opts) + if err != nil { + log.Fatalf("Failed to start server: %v", err) + } + + serverPubKeys := []ed25519.PublicKey{serverPub} + + // Start client in a goroutine + startClientandSendHello("client1", client1Priv, serverPubKeys, ns.ClientURL()) + startClientandSendHello("client2", client2Priv, serverPubKeys, ns.ClientURL()) + startClientandSendHello("insecureClient", insecureClientPriv, serverPubKeys, ns.ClientURL()) + + // Handle graceful shutdown + sigCh := make(chan os.Signal, 1) + signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) + + <-sigCh + log.Println("\nShutting down...") + ns.Shutdown() +} From d8e16a9e8b0effb2a9bf952282efcc21e279444b Mon Sep 17 00:00:00 2001 From: Namik Mesic Date: Fri, 28 Mar 2025 19:48:52 +0100 Subject: [PATCH 3/5] Writing tests --- go.mod | 2 +- pkg/nats/client.go | 2 +- pkg/nats/client_test.go | 200 ++++++++++++++++++++++ pkg/nats/config.go | 26 +++ pkg/nats/example/keys/client.priv | 1 - pkg/nats/example/keys/client.pub | 1 - pkg/nats/example/keys/client2.priv | 1 - pkg/nats/example/keys/client2.pub | 1 - pkg/nats/example/keys/insecureClient.priv | 1 - pkg/nats/example/keys/insecureClient.pub | 1 - pkg/nats/example/keys/server.priv | 1 - pkg/nats/example/keys/server.pub | 1 - pkg/nats/example/main.go | 128 ++++++-------- rpc/mtls/mtls.go | 48 ++---- 14 files changed, 298 insertions(+), 116 deletions(-) create mode 100644 pkg/nats/client_test.go delete mode 100644 pkg/nats/example/keys/client.priv delete mode 100644 pkg/nats/example/keys/client.pub delete mode 100644 pkg/nats/example/keys/client2.priv delete mode 100644 pkg/nats/example/keys/client2.pub delete mode 100644 pkg/nats/example/keys/insecureClient.priv delete mode 100644 pkg/nats/example/keys/insecureClient.pub delete mode 100644 pkg/nats/example/keys/server.priv delete mode 100644 pkg/nats/example/keys/server.pub diff --git a/go.mod b/go.mod index 7bdd5b0..d4bc73e 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.24 toolchain go1.24.0 require ( + github.com/cespare/xxhash/v2 v2.3.0 github.com/ethereum/go-ethereum v1.15.3 github.com/hashicorp/go-plugin v1.6.3 github.com/leanovate/gopter v0.2.11 @@ -28,7 +29,6 @@ require ( github.com/bits-and-blooms/bitset v1.17.0 // indirect github.com/buger/jsonparser v1.1.1 // indirect github.com/cenkalti/backoff/v4 v4.3.0 // indirect - github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/consensys/bavard v0.1.22 // indirect github.com/consensys/gnark-crypto v0.14.0 // indirect github.com/crate-crypto/go-ipa v0.0.0-20240724233137-53bbb0ceb27a // indirect diff --git a/pkg/nats/client.go b/pkg/nats/client.go index 9bab553..b77a8e3 100644 --- a/pkg/nats/client.go +++ b/pkg/nats/client.go @@ -137,7 +137,7 @@ func (c *client) close() error { } func (c *client) Transmit(ctx context.Context, payload []byte, dedupKey string, donID uint32) error { - subject := fmt.Sprintf("%s.%s.%s", donID, c.getClientPubKeyHex()) + subject := fmt.Sprintf("%d.%s", donID, c.getClientPubKeyHex()) pubOpts := []nats.PubOpt{ nats.MsgId(dedupKey), diff --git a/pkg/nats/client_test.go b/pkg/nats/client_test.go new file mode 100644 index 0000000..0769e11 --- /dev/null +++ b/pkg/nats/client_test.go @@ -0,0 +1,200 @@ +package nats + +import ( + "crypto/ed25519" + "crypto/rand" + "encoding/hex" + "fmt" + "testing" + "time" + + "github.com/nats-io/nats-server/v2/server" + "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-common/pkg/services/servicetest" + "github.com/smartcontractkit/chainlink-data-streams/rpc/mtls" + "github.com/stretchr/testify/require" +) + +func TestClient_Connect(t *testing.T) { + // Generate server and client keypairs + serverPub, serverPriv, _ := ed25519.GenerateKey(rand.Reader) + clientPub, clientPriv, _ := ed25519.GenerateKey(nil) + clientPubHex := hex.EncodeToString(clientPub) + + serverTLSConfig, err := mtls.NewTLSConfig(serverPriv, []ed25519.PublicKey{clientPub}) + require.NoError(t, err) + + opts := &server.Options{ + Host: "0.0.0.0", // Listen on all interfaces since it's public + Port: 4222, + NoAuthUser: "", + NoLog: false, // Keep logs for monitoring + NoSigs: true, // Disable signal handling since we handle it ourselves + Logtime: true, // Include timestamps in logs + Debug: false, // Disable debug mode in production + Trace: false, // Disable trace mode in production + TLSConfig: serverTLSConfig, + TLSHandshakeFirst: true, // Require TLS handshake before any other communication + AllowNonTLS: false, // Only allow TLS connections + TLSMap: true, // Enable TLS certificate mapping + // Connection limits and DDoS protection + MaxConn: 1000, // Limit total connections + MaxSubs: 100, // Limit subscriptions per connection + MaxControlLine: 4096, // Limit control line size (4KB) + MaxPayload: 512 * 1024, // 512KB max payload for reports + MaxPending: 1024 * 1024 * 2, // 2MB total pending messages + MaxClosedClients: 1000, // Keep track of closed clients + // Rate limiting + WriteDeadline: 1 * time.Second, // Timeout for write operations + // Connection timeouts + AuthTimeout: 2.0, // 2 seconds for auth + TLSTimeout: 2.0, // 2 seconds for TLS handshake + PingInterval: 2 * time.Second, + MaxPingsOut: 3, // Disconnect after 3 missed pings + // Security hardening + NoHeaderSupport: false, // Disable header support for simpler protocol + NoFastProducerStall: true, // Prevent fast producer stall + // Graceful shutdown + LameDuckDuration: 30 * time.Second, + LameDuckGracePeriod: 10 * time.Second, + // Define users with specific permissions + Users: []*server.User{ + // User 1 with restricted permissions + { + Username: fmt.Sprintf("CN=%s,OU=%s,O=Chainlink Data Streams", clientPubHex[:32], clientPubHex), + Permissions: &server.Permissions{ + Publish: &server.SubjectPermission{ + Allow: []string{"test.*"}, // Allows test.anything + }, + Subscribe: &server.SubjectPermission{ + Allow: []string{"test.*"}, // Allows test.anything + }, + }, + }, + // Insecure client with no permissions + }, + } + + // Start the server + ns, err := server.NewServer(opts) + + require.NoError(t, err) + ns.Start() + defer ns.Shutdown() + + // Wait for server to be ready + for i := 0; i < 10; i++ { + if ns.ReadyForConnections(1 * time.Second) { + break + } + if i == 9 { + t.Fatal("NATS server did not start in time") + } + time.Sleep(100 * time.Millisecond) + } + + serverURL := fmt.Sprintf("tls://localhost:4222") + + testCases := []struct { + name string + clientSigner ed25519.PrivateKey + serverPubKey ed25519.PublicKey + serverURLs []string + expectSuccess bool + errorContains string + }{ + { + name: "successful connection", + clientSigner: clientPriv, + serverPubKey: serverPub, + serverURLs: []string{serverURL}, + expectSuccess: true, + }, + { + name: "invalid server URL", + clientSigner: clientPriv, + serverPubKey: serverPub, + serverURLs: []string{"tls://invalid:9999"}, + expectSuccess: false, + errorContains: "failed to create NATS connection", + }, + { + name: "wrong server public key", + clientSigner: clientPriv, + serverPubKey: make([]byte, ed25519.PublicKeySize), // Invalid key + serverURLs: []string{serverURL}, + expectSuccess: false, + errorContains: "failed to create client mTLS credentials", + }, + } + + clientOptions := ClientOpts{ + Logger: logger.Test(t), + ClientSigner: clientPriv, + ServerPubKey: serverPub, + ServerURLs: []string{serverURL}, + } + + err = clientOptions.verifyConfig() + require.NoError(t, err) + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + + c, err := NewClient(clientOptions) + + servicetest.Run(t, c) + require.NoError(t, err) + + if tc.expectSuccess { + require.NoError(t, err) + + // Clean up connection + err = c.Close() + require.NoError(t, err) + } + }) + } +} + +// // Helper to create a test NATS server with mTLS support +// func createTestServer(t *testing.T, allowedClientCerts []string) (*server.Server, string) { +// opts := &server.Options{ +// Host: "127.0.0.1", +// Port: -1, // Use a random port +// NoLog: true, +// NoSigs: true, +// TLS: true, +// TLSVerify: true, +// TLSMap: true, +// TLSTimeout: 5, +// TLSPinnedCerts: makePinnedCertSet(allowedClientCerts), +// } + +// s, err := server.NewServer(opts) +// require.NoError(t, err) +// go s.Start() + +// // Wait for server to be ready +// for i := 0; i < 10; i++ { +// if s.ReadyForConnections(1 * time.Second) { +// break +// } +// if i == 9 { +// t.Fatal("NATS server did not start in time") +// } +// time.Sleep(100 * time.Millisecond) +// } + +// serverURL := fmt.Sprintf("tls://%s:%d", opts.Host, s.ClusterAddr().Port) +// return s, serverURL +// } + +// // Helper function to convert string slice to PinnedCertSet +// func makePinnedCertSet(certs []string) server.PinnedCertSet { +// set := server.PinnedCertSet{} +// for _, cert := range certs { +// set[cert] = struct{}{} +// } +// return set +// } diff --git a/pkg/nats/config.go b/pkg/nats/config.go index 64aac36..535f571 100644 --- a/pkg/nats/config.go +++ b/pkg/nats/config.go @@ -38,3 +38,29 @@ func (c *ClientOpts) verifyConfig() error { return nil } + +type ServerOpts struct { + Logger logger.Logger + ServerSigner crypto.Signer + clientPubKeys []ed25519.PublicKey +} + +func (s *ServerOpts) verifyConfig() error { + var errs []error + + if s.Logger == nil { + errs = append(errs, fmt.Errorf("logger is required for NATS server")) + } + if s.ServerSigner == nil { + errs = append(errs, fmt.Errorf("server signer is required for NATS server")) + } + if len(s.clientPubKeys) == 0 { + errs = append(errs, fmt.Errorf("at least one client public key is required for NATS server")) + } + + if len(errs) > 0 { + return fmt.Errorf("invalid NATS server configuration: %v", errs) + } + + return nil +} diff --git a/pkg/nats/example/keys/client.priv b/pkg/nats/example/keys/client.priv deleted file mode 100644 index 8f02520..0000000 --- a/pkg/nats/example/keys/client.priv +++ /dev/null @@ -1 +0,0 @@ -d55fc3eff8a4bc2f3fc23e6fa3e3a2460bc542b184b6e775f645c62dd30c9d185c87bb05698793b0d78b5c2ed93b205166b97f418ed6bee3981ce6554b3dae03 \ No newline at end of file diff --git a/pkg/nats/example/keys/client.pub b/pkg/nats/example/keys/client.pub deleted file mode 100644 index bc0fe2f..0000000 --- a/pkg/nats/example/keys/client.pub +++ /dev/null @@ -1 +0,0 @@ -5c87bb05698793b0d78b5c2ed93b205166b97f418ed6bee3981ce6554b3dae03 \ No newline at end of file diff --git a/pkg/nats/example/keys/client2.priv b/pkg/nats/example/keys/client2.priv deleted file mode 100644 index 47b7215..0000000 --- a/pkg/nats/example/keys/client2.priv +++ /dev/null @@ -1 +0,0 @@ -91b84a9c541fbeb1d90f5dee298c0651718fe23dec90b7c3a40b76cbb8a7bb5d19d5560c9db831858fcb43b2306103046a44e189d1eebd1bdeb862b00a3884c3 \ No newline at end of file diff --git a/pkg/nats/example/keys/client2.pub b/pkg/nats/example/keys/client2.pub deleted file mode 100644 index d7f5d91..0000000 --- a/pkg/nats/example/keys/client2.pub +++ /dev/null @@ -1 +0,0 @@ -19d5560c9db831858fcb43b2306103046a44e189d1eebd1bdeb862b00a3884c3 \ No newline at end of file diff --git a/pkg/nats/example/keys/insecureClient.priv b/pkg/nats/example/keys/insecureClient.priv deleted file mode 100644 index ad6acc0..0000000 --- a/pkg/nats/example/keys/insecureClient.priv +++ /dev/null @@ -1 +0,0 @@ -c696afa354a9745c0889c37f0524fe517f4042ab57695cf280b55c6f6b5d8e407926c9e1f357cb6da70fed3e6c43ff198a8d0facc5c6e5753d5a81bad1b9f93e \ No newline at end of file diff --git a/pkg/nats/example/keys/insecureClient.pub b/pkg/nats/example/keys/insecureClient.pub deleted file mode 100644 index cd18d74..0000000 --- a/pkg/nats/example/keys/insecureClient.pub +++ /dev/null @@ -1 +0,0 @@ -7926c9e1f357cb6da70fed3e6c43ff198a8d0facc5c6e5753d5a81bad1b9f93e \ No newline at end of file diff --git a/pkg/nats/example/keys/server.priv b/pkg/nats/example/keys/server.priv deleted file mode 100644 index d908eb0..0000000 --- a/pkg/nats/example/keys/server.priv +++ /dev/null @@ -1 +0,0 @@ -e53a730b64a70806f74a2028efd51b53af9aa063c1bc0018f660ddcdc0a0e3d791afcaa6f5f0ba385eda2416056610fee0a420dd9214babaf3973728e68c7c05 \ No newline at end of file diff --git a/pkg/nats/example/keys/server.pub b/pkg/nats/example/keys/server.pub deleted file mode 100644 index 58ea0c3..0000000 --- a/pkg/nats/example/keys/server.pub +++ /dev/null @@ -1 +0,0 @@ -91afcaa6f5f0ba385eda2416056610fee0a420dd9214babaf3973728e68c7c05 \ No newline at end of file diff --git a/pkg/nats/example/main.go b/pkg/nats/example/main.go index 9cedf21..d923646 100644 --- a/pkg/nats/example/main.go +++ b/pkg/nats/example/main.go @@ -9,7 +9,6 @@ import ( "log" "os" "os/signal" - "path/filepath" "syscall" "time" @@ -19,55 +18,6 @@ import ( // Your custom TLS package ) -func loadOrGenerateKeys(keysDir, prefix string) (ed25519.PublicKey, ed25519.PrivateKey) { - // Generate or load server keys - serverPubPath := filepath.Join(keysDir, prefix+".pub") - serverPrivPath := filepath.Join(keysDir, prefix+".priv") - - var serverPub ed25519.PublicKey - var serverPriv ed25519.PrivateKey - - // Try to load existing keys - pubBytes, err := os.ReadFile(serverPubPath) - if err == nil { - privBytes, err := os.ReadFile(serverPrivPath) - if err == nil { - // Decode existing keys - pubBytes, err = hex.DecodeString(string(pubBytes)) - if err != nil { - log.Fatalf("Error decoding server public key: %v", err) - } - privBytes, err = hex.DecodeString(string(privBytes)) - if err != nil { - log.Fatalf("Error decoding server private key: %v", err) - } - serverPub = ed25519.PublicKey(pubBytes) - serverPriv = ed25519.PrivateKey(privBytes) - } - } - - // Generate new keys if they don't exist - if serverPub == nil || serverPriv == nil { - var err error - serverPub, serverPriv, err = ed25519.GenerateKey(rand.Reader) - if err != nil { - log.Fatalf("Error generating server keys: %v", err) - } - - // Save new keys - err = os.WriteFile(serverPubPath, []byte(hex.EncodeToString(serverPub)), 0644) - if err != nil { - log.Fatalf("Error saving server public key: %v", err) - } - err = os.WriteFile(serverPrivPath, []byte(hex.EncodeToString(serverPriv)), 0600) - if err != nil { - log.Fatalf("Error saving server private key: %v", err) - } - } - - return serverPub, serverPriv -} - func startServer(opts *server.Options) (*server.Server, error) { // Create a new NATS server ns, err := server.NewServer(opts) @@ -106,6 +56,10 @@ func startClientandSendHello(clientName string, clientPriv ed25519.PrivateKey, s nats.FlusherTimeout(1 * time.Second), nats.PingInterval(1 * time.Second), nats.Timeout(2 * time.Second), + nats.ErrorHandler(func(nc *nats.Conn, sub *nats.Subscription, err error) { + log.Printf("Client %s: Error: %v", clientName, err) + }), + nats.Name(clientName), } nc, err := nats.Connect(serverURL, natsOpts...) if err != nil { @@ -114,7 +68,7 @@ func startClientandSendHello(clientName string, clientPriv ed25519.PrivateKey, s defer nc.Close() // Subscribe to the test subject - sub, err := nc.Subscribe("test", func(msg *nats.Msg) { + sub, err := nc.Subscribe("test.*", func(msg *nats.Msg) { log.Printf("Client %s received: %s", clientName, string(msg.Data)) }) if err != nil { @@ -122,37 +76,33 @@ func startClientandSendHello(clientName string, clientPriv ed25519.PrivateKey, s } defer sub.Unsubscribe() - // Send a message - err = nc.Publish("test", []byte(fmt.Sprintf("Hello world from %s", clientName))) + // Publish a message + err = nc.Publish("test.*", []byte(fmt.Sprintf("Hello world from %s", clientName))) if err != nil { log.Fatalf("Client %s: Error publishing message: %v", clientName, err) } - log.Printf("Client %s: Message published successfully", clientName) + // Ensure the publish message is sent to the server. + if err := nc.Flush(); err != nil { + log.Fatalf("Client %s: Error flushing connection: %v", clientName, err) + } - // Keep the connection alive - defer nc.Close() + // Wait to ensure the message is received before closing. + time.Sleep(2 * time.Second) } func main() { - serverPub, serverPriv := loadOrGenerateKeys("keys", "server") + serverPub, serverPriv, _ := ed25519.GenerateKey(rand.Reader) log.Printf("Server public key: %x", serverPub) - log.Printf("Server private key: %x", serverPriv) - client1Pub, client1Priv := loadOrGenerateKeys("keys", "client") - log.Printf("Client public key: %x", client1Pub) - log.Printf("Client private key: %x", client1Priv) + client1Pub, client1Priv, _ := ed25519.GenerateKey(rand.Reader) + log.Printf("Client1 public key: %x", client1Pub) - client2Pub, client2Priv := loadOrGenerateKeys("keys", "client2") + client2Pub, client2Priv, _ := ed25519.GenerateKey(rand.Reader) log.Printf("Client2 public key: %x", client2Pub) - log.Printf("Client2 private key: %x", client2Priv) - insecureClientPub, insecureClientPriv := loadOrGenerateKeys("keys", "insecureClient") + insecureClientPub, insecureClientPriv, _ := ed25519.GenerateKey(rand.Reader) log.Printf("Insecure client public key: %x", insecureClientPub) - log.Printf("Insecure client private key: %x", insecureClientPriv) - - fmt.Printf("Client1 username (CN): %s\n", hex.EncodeToString(client1Pub)) - fmt.Printf("Client2 username (CN): %s\n", hex.EncodeToString(client2Pub)) clientPubKeys := []ed25519.PublicKey{client1Pub, client2Pub} @@ -165,10 +115,18 @@ func main() { log.Fatalf("Error creating TLS config: %v", err) } + // Options block for nats-server. + // NOTE: This structure is no longer used for monitoring endpoints + // and json tags are deprecated and may be removed in the future. + // Create an embedded NATS server with least privilege permissions + + client1PubHex := hex.EncodeToString(client1Pub) + client2PubHex := hex.EncodeToString(client2Pub) // Create a new NATS server options opts := &server.Options{ Host: "0.0.0.0", // Listen on all interfaces since it's public Port: 4222, + NoAuthUser: "", NoLog: false, // Keep logs for monitoring NoSigs: true, // Disable signal handling since we handle it ourselves Logtime: true, // Include timestamps in logs @@ -193,29 +151,45 @@ func main() { PingInterval: 2 * time.Second, MaxPingsOut: 3, // Disconnect after 3 missed pings // Security hardening - NoHeaderSupport: true, // Disable header support for simpler protocol - NoFastProducerStall: true, // Prevent fast producer stall + NoHeaderSupport: false, // Disable header support for simpler protocol + NoFastProducerStall: true, // Prevent fast producer stall // Graceful shutdown LameDuckDuration: 30 * time.Second, LameDuckGracePeriod: 10 * time.Second, - // Users with permissions + // Define users with specific permissions Users: []*server.User{ + // User 1 with restricted permissions { - Username: fmt.Sprintf("OU=%s,O=%s", hex.EncodeToString(client1Pub), hex.EncodeToString(client1Pub)), + Username: fmt.Sprintf("CN=%s,OU=%s,O=Chainlink Data Streams", client1PubHex[:32], client1PubHex), Permissions: &server.Permissions{ - Publish: &server.SubjectPermission{Allow: []string{"test.*"}}, - Subscribe: &server.SubjectPermission{Allow: []string{"test.*"}}, + Publish: &server.SubjectPermission{ + Allow: []string{"test.*"}, // Allows test.anything + }, + Subscribe: &server.SubjectPermission{ + Allow: []string{"test.*"}, // Allows test.anything + }, }, }, + + // User 2 with different permissions { - Username: fmt.Sprintf("OU=%s,O=%s", hex.EncodeToString(client2Pub), hex.EncodeToString(client2Pub)), + Username: fmt.Sprintf("CN=%s,OU=%s,O=Chainlink Data Streams", client2PubHex[:32], client2PubHex), Permissions: &server.Permissions{ - Publish: &server.SubjectPermission{Allow: []string{"else.*"}}, - Subscribe: &server.SubjectPermission{Allow: []string{"else.*"}}, + Publish: &server.SubjectPermission{ + Allow: []string{"other"}, + Deny: []string{"service.admin.>", "service.internal.>"}, + }, + Subscribe: &server.SubjectPermission{ + Allow: []string{"other", "other"}, + Deny: []string{"service.admin.>", "service.internal.>"}, + }, }, }, + + // Insecure client with no permissions }, } + // Start the server ns, err := startServer(opts) if err != nil { diff --git a/rpc/mtls/mtls.go b/rpc/mtls/mtls.go index 3299698..af30b78 100644 --- a/rpc/mtls/mtls.go +++ b/rpc/mtls/mtls.go @@ -13,6 +13,7 @@ import ( "fmt" "math/big" "sync" + "time" "google.golang.org/grpc/credentials" ) @@ -127,13 +128,27 @@ func newMinimalX509Cert(signer crypto.Signer) (tls.Certificate, error) { } pubKeyHex := hex.EncodeToString(pubKey) + + // Generate a random serial number. + serialNumberLimit := new(big.Int).Lsh(big.NewInt(1), 128) + serialNumber, err := rand.Int(rand.Reader, serialNumberLimit) + if err != nil { + return tls.Certificate{}, fmt.Errorf("failed to generate serial number: %v", err) + } + + now := time.Now() + // Set certificate validity (e.g., valid for 24 hours) template := x509.Certificate{ - SerialNumber: big.NewInt(0), // serial number must be set, so we set it to 0 + SerialNumber: serialNumber, Subject: pkix.Name{ - CommonName: pubKeyHex, - Organization: []string{pubKeyHex}, + CommonName: pubKeyHex[:32], + Organization: []string{"Chainlink Data Streams"}, + OrganizationalUnit: []string{pubKeyHex}, }, - EmailAddresses: []string{pubKeyHex}, + NotBefore: now, + KeyUsage: x509.KeyUsageDigitalSignature, + ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageClientAuth}, + BasicConstraintsValid: true, } encodedCert, err := x509.CreateCertificate(rand.Reader, &template, &template, signer.Public(), signer) @@ -145,34 +160,9 @@ func newMinimalX509Cert(signer crypto.Signer) (tls.Certificate, error) { PrivateKey: signer, SupportedSignatureAlgorithms: []tls.SignatureScheme{tls.Ed25519}, } - // err = printCertificateDetails(cert) - // if err != nil { - // return tls.Certificate{}, err - // } - return cert, nil } -func printCertificateDetails(cert tls.Certificate) error { - // Parse the certificate from its DER bytes. - parsedCert, err := x509.ParseCertificate(cert.Certificate[0]) - if err != nil { - return fmt.Errorf("failed to parse certificate: %w", err) - } - - fmt.Println("Certificate:") - fmt.Println(" Data:") - // Print the subject line similar to the OpenSSL output. - fmt.Printf(" Subject: %s\n", parsedCert.Subject.String()) - - // Optionally, print additional fields. - fmt.Printf(" Issuer: %s\n", parsedCert.Issuer.String()) - fmt.Printf(" Validity:\n") - fmt.Printf(" Not Before: %s\n", parsedCert.NotBefore) - fmt.Printf(" Not After : %s\n", parsedCert.NotAfter) - return nil -} - type PrivateKey struct { key ed25519.PrivateKey } From d6f7821c031d8ee1eaa860d4224242fffa62e9dd Mon Sep 17 00:00:00 2001 From: Namik Mesic Date: Fri, 28 Mar 2025 21:40:19 +0100 Subject: [PATCH 4/5] Passing tests --- pkg/nats/client_test.go | 4 ---- pkg/nats/config.go | 26 -------------------------- 2 files changed, 30 deletions(-) diff --git a/pkg/nats/client_test.go b/pkg/nats/client_test.go index 0769e11..f33c30a 100644 --- a/pkg/nats/client_test.go +++ b/pkg/nats/client_test.go @@ -148,10 +148,6 @@ func TestClient_Connect(t *testing.T) { if tc.expectSuccess { require.NoError(t, err) - - // Clean up connection - err = c.Close() - require.NoError(t, err) } }) } diff --git a/pkg/nats/config.go b/pkg/nats/config.go index 535f571..64aac36 100644 --- a/pkg/nats/config.go +++ b/pkg/nats/config.go @@ -38,29 +38,3 @@ func (c *ClientOpts) verifyConfig() error { return nil } - -type ServerOpts struct { - Logger logger.Logger - ServerSigner crypto.Signer - clientPubKeys []ed25519.PublicKey -} - -func (s *ServerOpts) verifyConfig() error { - var errs []error - - if s.Logger == nil { - errs = append(errs, fmt.Errorf("logger is required for NATS server")) - } - if s.ServerSigner == nil { - errs = append(errs, fmt.Errorf("server signer is required for NATS server")) - } - if len(s.clientPubKeys) == 0 { - errs = append(errs, fmt.Errorf("at least one client public key is required for NATS server")) - } - - if len(errs) > 0 { - return fmt.Errorf("invalid NATS server configuration: %v", errs) - } - - return nil -} From 53dbe723c89af4da13d9209d05b9fbfadc65e1b1 Mon Sep 17 00:00:00 2001 From: Namik Mesic Date: Fri, 28 Mar 2025 21:40:32 +0100 Subject: [PATCH 5/5] Server implementation --- pkg/nats/server.go | 267 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 267 insertions(+) create mode 100644 pkg/nats/server.go diff --git a/pkg/nats/server.go b/pkg/nats/server.go new file mode 100644 index 0000000..aa27469 --- /dev/null +++ b/pkg/nats/server.go @@ -0,0 +1,267 @@ +package nats + +import ( + "context" + "crypto" + "crypto/ed25519" + "fmt" + "time" + + natssrv "github.com/nats-io/nats-server/v2/server" + "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-common/pkg/services" + "github.com/smartcontractkit/chainlink-data-streams/rpc/mtls" +) + +// Server is the interface for a NATS server service, mirroring the pattern used by the client. +type Server interface { + services.Service + + // URL returns the server connect URL(s). Useful if you want to pass the + // string to clients so they know how to connect. + URL() []string +} + +// Make sure *serverImpl implements Server. +var _ Server = (*serverImpl)(nil) + +// serverImpl is the concrete implementation that holds the server instance and configuration. +type serverImpl struct { + services.Service + + lggr logger.Logger + opts ServerOpts + srv *natssrv.Server + urls []string +} + +// ServerOpts is the set of options required to stand up a NATS Server with mTLS. +type ServerOpts struct { + // Logging + Logger logger.Logger + + // The server’s private key. Must be an ed25519 key, used to prove identity to clients. + ServerPrivKey ed25519.PrivateKey + + // Which client public keys are allowed to connect via mTLS. Typically includes + // the set of ed25519.PublicKey(s) for the clients you'd like to authorize. + AllowedClientPubKeys []ed25519.PublicKey + + // Where the server should listen (e.g. "0.0.0.0" or "127.0.0.1") + Host string + + // The TCP port on which NATS server will listen (e.g. 4222). + Port int + + // (Optional) Custom NATS permissions. If empty, this example code + // defaults to some basic subject permissions. + AllowedPublish []string + AllowedSubscribe []string +} + +// NewServer constructs a new NATS Server service but does not start it. It mirrors +// the client pattern: we build a *serverImpl, then wrap it in a ServiceEngine so +// we get Start/Close/Health checks. +func NewServer(opts ServerOpts) (Server, error) { + if err := verifyServerOpts(opts); err != nil { + return nil, fmt.Errorf("invalid server configuration: %w", err) + } + + s := &serverImpl{ + lggr: opts.Logger, + opts: opts, + } + + // Create a service lifecycle engine, using the same pattern as client.go. + svc, err := services.Config{ + Name: "NATSServer", + Start: s.start, + Close: s.close, + }.NewServiceEngine(s.lggr) + if err != nil { + return nil, fmt.Errorf("failed to create NATSServer service engine: %w", err) + } + s.Service = svc + + return s, nil +} + +// verifyServerOpts does basic checks on your server configuration to +// avoid returning an incorrectly configured server. +func verifyServerOpts(opts ServerOpts) error { + if opts.Logger == nil { + return fmt.Errorf("logger must not be nil") + } + if opts.ServerPrivKey == nil { + return fmt.Errorf("server private key is required") + } + if len(opts.AllowedClientPubKeys) == 0 { + return fmt.Errorf("at least one allowed client public key is required") + } + if opts.Host == "" { + return fmt.Errorf("host must not be empty") + } + if opts.Port <= 0 { + return fmt.Errorf("port must be > 0") + } + return nil +} + +// start spins up the embedded NATS server with an mTLS config and begins listening. +func (s *serverImpl) start(ctx context.Context) error { + // Build the server TLSConfig from the server’s private key & allowed client pub keys. + serverTLSConfig, err := mtls.NewTLSConfig(s.opts.ServerPrivKey, s.opts.AllowedClientPubKeys) + if err != nil { + return fmt.Errorf("failed to create server TLS config: %w", err) + } + + // Derive a default set of allowed subjects for testing or production usage. + // If you'd like more advanced mapping, you can dynamically build natssrv.Users + // with specific Permissions, etc. + pubAllows := s.opts.AllowedPublish + if len(pubAllows) == 0 { + pubAllows = []string{"test.*"} + } + subAllows := s.opts.AllowedSubscribe + if len(subAllows) == 0 { + subAllows = []string{"test.*"} + } + + // For each allowed client pubkey, build a user stanza with the same pattern + // from your test code. This approach uses NATS TLSMap to tie the subject to + // the client's certificate identity, so the client’s CN is typically: + // "CN=,OU=,O=Chainlink Data Streams" + // This is an example pattern. You can adapt it to your environment. + var users []*natssrv.User + for _, clientPub := range s.opts.AllowedClientPubKeys { + clientHex := prettyKeyHex(clientPub) + user := &natssrv.User{ + Username: fmt.Sprintf("CN=%s,OU=%s,O=Chainlink Data Streams", + clientHex[:32], // maybe first 32 chars for brevity + clientHex), + Permissions: &natssrv.Permissions{ + Publish: &natssrv.SubjectPermission{ + Allow: pubAllows, + }, + Subscribe: &natssrv.SubjectPermission{ + Allow: subAllows, + }, + }, + } + users = append(users, user) + } + + // Configure the embedded server. + natsOpts := &natssrv.Options{ + Host: s.opts.Host, + Port: s.opts.Port, + NoLog: false, + NoSigs: true, + Logtime: true, + Debug: false, + Trace: false, + TLSConfig: serverTLSConfig, + TLSHandshakeFirst: true, + AllowNonTLS: false, + TLSMap: true, + // Connection limits & server resource constraints + MaxConn: 1000, + MaxSubs: 100, + MaxPayload: 512 * 1024, // 512KB + MaxPending: 2 * 1024 * 1024, + // Timeouts + WriteDeadline: 1 * time.Second, + AuthTimeout: 2.0, + TLSTimeout: 2.0, + // Ping intervals + PingInterval: 2 * time.Second, + MaxPingsOut: 3, + // Lame-duck + LameDuckDuration: 30 * time.Second, + LameDuckGracePeriod: 10 * time.Second, + // User / permissions + Users: users, + } + + // Spin up the NATS server instance (this *does not* block). + ns, err := natssrv.NewServer(natsOpts) + if err != nil { + return fmt.Errorf("failed to create embedded NATS server: %w", err) + } + + // Start listening for connections in a goroutine. Because NATSServer doesn’t block on Start(), + // you can proceed to do readiness checks to see if the server is ready for connections. + go ns.Start() + + s.srv = ns + // Prepare our final "URL()" that clients can connect to + // e.g. "tls://HOST:PORT" + addr := fmt.Sprintf("tls://%s:%d", s.opts.Host, s.opts.Port) + s.urls = []string{addr} + + s.lggr.Info("NATS server is starting", "host", s.opts.Host, "port", s.opts.Port) + return nil +} + +// close gracefully shuts down the NATS server. +func (s *serverImpl) close() error { + if s.srv == nil { + return nil + } + s.lggr.Info("Shutting down NATS server", "host", s.opts.Host, "port", s.opts.Port) + s.srv.Shutdown() + return nil +} + +// Healthy implements the Service interface and indicates if the NATS server +// is logically healthy (i.e. we have a non-nil server pointer). +func (s *serverImpl) Healthy() error { + if s.srv == nil { + return fmt.Errorf("NATS server is nil") + } + // Optionally, check internal server status or track metrics. + return nil +} + +// Ready implements the Service interface and indicates if the server is +// ready to accept connections. We can use the server's built-in readiness check. +func (s *serverImpl) Ready() error { + if s.srv == nil { + return fmt.Errorf("NATS server is nil") + } + // If we fail readiness (e.g. server not started or ephemeral failure), return an error. + // 0-second wait means immediate check. + if !s.srv.ReadyForConnections(0 * time.Second) { + return fmt.Errorf("NATS server is not ready for connections") + } + return nil +} + +// HealthReport aggregates the server's health in a map with the key as the +// service name. +func (s *serverImpl) HealthReport() map[string]error { + return map[string]error{s.Name(): s.Healthy()} +} + +// Name returns this service's name. If the logger has a name, use it. Otherwise "NATSServer". +func (s *serverImpl) Name() string { + if s.lggr == nil { + return "NATSServer" + } + return s.lggr.Name() +} + +// URL returns the array of server URLs that clients can connect to, e.g. ["tls://localhost:4222"]. +func (s *serverImpl) URL() []string { + return s.urls +} + +// prettyKeyHex helper function to turn ed25519.PublicKey or generic crypto.PublicKey into hex. +func prettyKeyHex(pub crypto.PublicKey) string { + switch k := pub.(type) { + case ed25519.PublicKey: + return fmt.Sprintf("%x", k) + default: + return "unknown_public_key" + } +}