Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions .github/workflows/framework-golden-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,11 @@ jobs:
config: parallel_ton.toml
count: 1
timeout: 10m
- name: TestUpgrade
config: upgrade.toml
count: 1
timeout: 10m
# Disabled for now, we need to replace Pumba chaos driver
# - name: TestUpgrade
# config: upgrade.toml
# count: 1
# timeout: 10m
- name: TestPerformanceBaseline
config: performance_baseline.toml
count: 1
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -100,4 +100,4 @@ jobs:
just-version: '1.39.0'
- name: Run tests
run: |
just test ${{ matrix.test.path }} ${{ matrix.test.regex }}
just test ${{ matrix.test.path }} ${{ matrix.test.regex }}
1 change: 1 addition & 0 deletions framework/.changeset/v0.14.1.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
- Forwarder API for clients
61 changes: 44 additions & 17 deletions framework/clclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@ import (
"github.com/ethereum/go-ethereum/accounts/keystore"
"github.com/ethereum/go-ethereum/crypto"
"github.com/pkg/errors"
"github.com/rs/zerolog/log"

"github.com/smartcontractkit/chainlink-testing-framework/framework"
"github.com/smartcontractkit/chainlink-testing-framework/framework/components/clnode"

"github.com/ethereum/go-ethereum/common"
"github.com/go-resty/resty/v2"
"github.com/rs/zerolog/log"
"golang.org/x/sync/errgroup"
)

Expand All @@ -33,6 +33,14 @@ const (
ChainlinkKeyPassword string = "twochains"
// NodeURL string for logging
NodeURL string = "Node URL"
// DefaultRetries default CL node client retries
DefaultRetries = 30
// DefaultRetryInterval default CL node client retry interval
DefaultRetryInterval = 5 * time.Second
// DefaultTimeout is a default CL node client timeout
DefaultTimeout = 10 * time.Second
// DefaultAuthRetryCount is a default CL node authorization retry count
DefaultAuthRetryCount = 20
)

var (
Expand Down Expand Up @@ -82,36 +90,56 @@ func New(outs []*clnode.Output) ([]*ChainlinkClient, error) {
return clients, nil
}

func initRestyClient(url string, email string, password string, headers map[string]string, timeout *time.Duration) (*resty.Client, error) {
func initRestyClient(url string, email string, password string, headers map[string]string, _ *time.Duration) (*resty.Client, error) {
isDebug := os.Getenv("RESTY_DEBUG") == "true"
// G402 - TODO: certificates
//nolint
rc := resty.New().SetBaseURL(url).SetHeaders(headers).SetTLSClientConfig(&tls.Config{InsecureSkipVerify: true}).SetDebug(isDebug)
if timeout != nil {
rc.SetTimeout(*timeout)
}
session := &Session{Email: email, Password: password}
// Retry the connection on boot up, sometimes pods can still be starting up and not ready to accept connections
var resp *resty.Response
s := &Session{Email: email, Password: password}
rc := resty.New().
SetBaseURL(url).
SetHeaders(headers).
SetTLSClientConfig(&tls.Config{InsecureSkipVerify: true}). //nolint:gosec
SetDebug(isDebug).
SetRetryWaitTime(DefaultRetryInterval).
SetRetryCount(DefaultRetries).
SetRetryAfter(func(c *resty.Client, r *resty.Response) (time.Duration, error) {
return DefaultRetryInterval, nil
}).
SetTimeout(DefaultTimeout)

rc.AddRetryCondition(func(r *resty.Response, err error) bool {
return r.StatusCode() == http.StatusNotFound
})

// Retry the connection on boot up, retrying authorization every time slows down AddRetryCondition too much
var err error
retryCount := 20
for i := 0; i < retryCount; i++ {
resp, err = rc.R().SetBody(session).Post("/sessions")
for i := 0; i < DefaultAuthRetryCount; i++ {
err = Authorize(rc, s)
if err != nil {
log.Warn().Err(err).Str("URL", url).Interface("Session Details", session).Msg("Error connecting to Chainlink node, retrying")
time.Sleep(5 * time.Second)
log.Warn().Err(err).Str("URL", url).Interface("Session Details", s).Msg("Error connecting to Chainlink node, retrying")
time.Sleep(DefaultRetryInterval)
} else {
break
}
}
if err != nil {
return nil, fmt.Errorf("error connecting to chainlink node after %d attempts: %w", retryCount, err)
return nil, fmt.Errorf("error connecting to chainlink node after %d attempts: %w", DefaultAuthRetryCount, err)
}
rc.SetCookies(resp.Cookies())
framework.L.Debug().Str("URL", url).Msg("Connected to Chainlink node")
return rc, nil
}

func Authorize(rc *resty.Client, session *Session) error {
resp, err := rc.R().
SetBody(session).
Post("/sessions")
if err != nil {
return fmt.Errorf("error authorizing in CL node: %w", err)
}
rc.SetCookies(resp.Cookies())
return nil
}

// URL Chainlink instance http url
func (c *ChainlinkClient) URL() string {
return c.Config.URL
Expand Down Expand Up @@ -192,7 +220,6 @@ func (c *ChainlinkClient) WaitHealthy(pattern, status string, attempts uint) err
Msg("Retrying health check")
}),
)

if err != nil {
return fmt.Errorf("health check failed after %d attempts: %w", attempts, err)
}
Expand Down
7 changes: 4 additions & 3 deletions framework/components/blockchain/anvil.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func newAnvil(ctx context.Context, in *Input) (*Output, error) {
framework.L.Info().Any("Cmd", strings.Join(entryPoint, " ")).Msg("Creating anvil with command")

if pods.K8sEnabled() {
_, err := pods.Run(ctx, &pods.Config{
_, svc, err := pods.Run(ctx, &pods.Config{
Pods: []*pods.PodConfig{
{
Name: pods.Ptr(in.ContainerName),
Expand All @@ -86,8 +86,9 @@ func newAnvil(ctx context.Context, in *Input) (*Output, error) {
ContainerName: in.ContainerName,
Nodes: []*Node{
{
ExternalWSUrl: fmt.Sprintf("ws://%s:%s", fmt.Sprintf("%s-svc", in.ContainerName), in.Port),
ExternalHTTPUrl: fmt.Sprintf("http://%s:%s", fmt.Sprintf("%s-svc", in.ContainerName), in.Port),
K8sService: svc,
ExternalWSUrl: fmt.Sprintf("ws://%s:%s", "localhost", in.Port),
ExternalHTTPUrl: fmt.Sprintf("http://%s:%s", "localhost", in.Port),
InternalWSUrl: fmt.Sprintf("ws://%s:%s", fmt.Sprintf("%s-svc", in.ContainerName), in.Port),
InternalHTTPUrl: fmt.Sprintf("http://%s:%s", fmt.Sprintf("%s-svc", in.ContainerName), in.Port),
},
Expand Down
4 changes: 4 additions & 0 deletions framework/components/blockchain/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"fmt"

v1 "k8s.io/api/core/v1"

"github.com/testcontainers/testcontainers-go"

"github.com/smartcontractkit/chainlink-testing-framework/framework"
Expand Down Expand Up @@ -101,6 +103,8 @@ type Node struct {
ExternalHTTPUrl string `toml:"http_url" comment:"External blockchain node HTTP URL"`
InternalWSUrl string `toml:"internal_ws_url" comment:"Internal blockchain node WebSocket URL"`
InternalHTTPUrl string `toml:"internal_http_url" comment:"Internal blockchain node HTTP URL"`
// K8sService is a Kubernetes service spec used to connect locally
K8sService *v1.Service `toml:"k8s_service" comment:"Kubernetes service spec used to connect locally"`
}

func NewBlockchainNetwork(in *Input) (*Output, error) {
Expand Down
22 changes: 19 additions & 3 deletions framework/components/clnode/clnode.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@ type NodeOut struct {
InternalP2PUrl string `toml:"p2p_internal_url" comment:"Node internal P2P URL"`
// InternalIP node internal IP
InternalIP string `toml:"internal_ip" comment:"Node internal IP"`
// K8sService is a Kubernetes service spec used to connect locally
K8sService *v1.Service `toml:"k8s_service" comment:"Kubernetes service spec used to connect locally"`
}

// NewNodeWithDB create a new Chainlink node with some image:tag and one or several configs
Expand Down Expand Up @@ -198,14 +200,21 @@ func natPortsToK8sFormat(nat nat.PortMap) []string {
// exposes custom_ports in format "host:docker" or map 1-to-1 if only "host" port is provided
func generatePortBindings(in *Input) ([]string, nat.PortMap, error) {
httpPort := fmt.Sprintf("%s/tcp", DefaultHTTPPort)
exposedPorts := []string{httpPort}
p2pPort := fmt.Sprintf("%s/udp", DefaultP2PPort)
exposedPorts := []string{httpPort, p2pPort}
portBindings := nat.PortMap{
nat.Port(httpPort): []nat.PortBinding{
{
HostIP: "0.0.0.0",
HostPort: strconv.Itoa(in.Node.HTTPPort),
},
},
nat.Port(p2pPort): []nat.PortBinding{
{
HostIP: "0.0.0.0",
HostPort: strconv.Itoa(in.Node.P2PPort),
},
},
}
if os.Getenv("CTF_CLNODE_DLV") == "true" {
innerDebuggerPort := fmt.Sprintf("%d/tcp", DefaultDebuggerPort)
Expand Down Expand Up @@ -304,9 +313,14 @@ func newNode(ctx context.Context, in *Input, pgOut *postgres.Output) (*NodeOut,
return nil, err
}

defaultHTTPPortInt, err := strconv.Atoi(DefaultHTTPPort)
if err != nil {
return nil, err
}

// k8s deployment
if pods.K8sEnabled() {
_, err := pods.Run(ctx, &pods.Config{
_, svc, err := pods.Run(ctx, &pods.Config{
Pods: []*pods.PodConfig{
{
Name: pods.Ptr(containerName),
Expand All @@ -321,6 +335,7 @@ func newNode(ctx context.Context, in *Input, pgOut *postgres.Output) (*NodeOut,
RunAsUser: pods.Ptr[int64](14933),
RunAsGroup: pods.Ptr[int64](999),
},
ReadinessProbe: pods.TCPReadyProbe(defaultHTTPPortInt),
ConfigMap: map[string]string{
"config.toml": cfg,
"overrides.toml": in.Node.TestConfigOverrides,
Expand Down Expand Up @@ -357,9 +372,10 @@ func newNode(ctx context.Context, in *Input, pgOut *postgres.Output) (*NodeOut,
APIAuthUser: DefaultAPIUser,
APIAuthPassword: DefaultAPIPassword,
ContainerName: containerName,
ExternalURL: fmt.Sprintf("http://%s:%d", fmt.Sprintf("%s-svc", containerName), in.Node.HTTPPort),
ExternalURL: fmt.Sprintf("http://%s:%d", "localhost", in.Node.HTTPPort),
InternalURL: fmt.Sprintf("http://%s:%s", containerName, DefaultHTTPPort),
InternalP2PUrl: fmt.Sprintf("http://%s:%s", containerName, DefaultP2PPort),
K8sService: svc,
}, nil
}
// local deployment
Expand Down
11 changes: 6 additions & 5 deletions framework/components/fake/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ type Output struct {
UseCache bool `toml:"use_cache" comment:"Whether to respect caching or not, if cache = true component won't be deployed again"`
BaseURLHost string `toml:"base_url_host" comment:"Base URL which can be used when running locally"`
BaseURLDocker string `toml:"base_url_docker" comment:"Base URL to reach fakes service from other Docker containers"`
// K8sService is a Kubernetes service spec used to connect locally
K8sService *v1.Service `toml:"k8s_service" comment:"Kubernetes service spec used to connect locally"`
}

// NewDockerFakeDataProvider creates new fake data provider in Docker using testcontainers-go
Expand All @@ -40,7 +42,7 @@ func NewWithContext(ctx context.Context, in *Input) (*Output, error) {
bindPort := fmt.Sprintf("%d/tcp", in.Port)
containerName := framework.DefaultTCName("fake")
if pods.K8sEnabled() {
_, err := pods.Run(ctx, &pods.Config{
_, svc, err := pods.Run(ctx, &pods.Config{
Pods: []*pods.PodConfig{
{
Name: pods.Ptr(containerName),
Expand All @@ -59,13 +61,12 @@ func NewWithContext(ctx context.Context, in *Input) (*Output, error) {
return nil, err
}
in.Out = &Output{
BaseURLHost: fmt.Sprintf("http://%s:%d", fmt.Sprintf("%s-svc", containerName), in.Port),
BaseURLDocker: fmt.Sprintf("http://%s:%d", containerName, in.Port),
K8sService: svc,
BaseURLHost: fmt.Sprintf("http://%s:%d", "localhost", in.Port),
BaseURLDocker: fmt.Sprintf("http://%s:%d", fmt.Sprintf("%s-svc", containerName), in.Port),
}
return in.Out, nil
}
// if pods.K8sEnabled() {
// }
req := tc.ContainerRequest{
Name: containerName,
Image: in.Image,
Expand Down
13 changes: 8 additions & 5 deletions framework/components/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ type Output struct {
JDUrl string `toml:"jd_url" comment:"PostgreSQL internal connection URL to JobDistributor database"`
// JDInternalURL PostgreSQL internal connection URL to JobDistributor database
JDInternalURL string `toml:"jd_internal_url" comment:"PostgreSQL internal connection URL to JobDistributor database"`
// K8sService is a Kubernetes service spec used to connect locally
K8sService *v1.Service `toml:"k8s_service" comment:"Kubernetes service spec used to connect locally"`
}

func NewPostgreSQL(in *Input) (*Output, error) {
Expand Down Expand Up @@ -127,7 +129,7 @@ func NewWithContext(ctx context.Context, in *Input) (*Output, error) {

// k8s deployment
if pods.K8sEnabled() {
_, err := pods.Run(ctx, &pods.Config{
_, svc, err := pods.Run(ctx, &pods.Config{
Pods: []*pods.PodConfig{
{
Name: pods.Ptr(in.Name),
Expand All @@ -147,8 +149,8 @@ func NewWithContext(ctx context.Context, in *Input) (*Output, error) {
Value: Database,
},
},
Requests: pods.ResourcesLarge(),
Limits: pods.ResourcesLarge(),
Requests: pods.ResourcesMedium(),
Limits: pods.ResourcesMedium(),
// container and pod security settings are specific to
// 'postgres' Docker image
ContainerSecurityContext: &v1.SecurityContext{
Expand All @@ -172,6 +174,7 @@ func NewWithContext(ctx context.Context, in *Input) (*Output, error) {
return nil, err
}
o = &Output{
K8sService: svc,
ContainerName: containerName,
InternalURL: fmt.Sprintf(
"postgresql://%s:%s@%s:%d/%s?sslmode=disable",
Expand All @@ -186,7 +189,7 @@ func NewWithContext(ctx context.Context, in *Input) (*Output, error) {
"postgresql://%s:%s@%s:%d/%s?sslmode=disable",
User,
Password,
fmt.Sprintf("%s-svc", in.Name),
"localhost",
portToExpose,
Database,
),
Expand All @@ -204,7 +207,7 @@ func NewWithContext(ctx context.Context, in *Input) (*Output, error) {
"postgresql://%s:%s@%s:%d/%s?sslmode=disable",
User,
Password,
fmt.Sprintf("%s-svc", in.Name),
"localhost",
portToExpose,
JDDatabase,
)
Expand Down
17 changes: 17 additions & 0 deletions framework/pods/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,25 @@ import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
)

// TCPReadyProbe is a default TCP port probe
func TCPReadyProbe(port int) *v1.Probe {
return &v1.Probe{
ProbeHandler: v1.ProbeHandler{
TCPSocket: &v1.TCPSocketAction{
Port: intstr.FromInt(port),
},
},
InitialDelaySeconds: 5,
PeriodSeconds: 10,
FailureThreshold: 3,
SuccessThreshold: 1,
TimeoutSeconds: 5,
}
}

func Resources(cpu, mem string) map[string]string {
return map[string]string{
"cpu": cpu,
Expand Down
Loading
Loading