diff --git a/.github/workflows/framework-golden-tests.yml b/.github/workflows/framework-golden-tests.yml
index 52964ec48..280ddade7 100644
--- a/.github/workflows/framework-golden-tests.yml
+++ b/.github/workflows/framework-golden-tests.yml
@@ -55,10 +55,11 @@ jobs:
           config: parallel_ton.toml
           count: 1
           timeout: 10m
-        - name: TestUpgrade
-          config: upgrade.toml
-          count: 1
-          timeout: 10m
+        # Disabled for now; we need to replace the Pumba chaos driver
+        # - name: TestUpgrade
+        #   config: upgrade.toml
+        #   count: 1
+        #   timeout: 10m
         - name: TestPerformanceBaseline
           config: performance_baseline.toml
           count: 1
diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml
index 560cf0940..ba6fefd84 100644
--- a/.github/workflows/test.yaml
+++ b/.github/workflows/test.yaml
@@ -100,4 +100,4 @@ jobs:
           just-version: '1.39.0'
       - name: Run tests
         run: |
-          just test ${{ matrix.test.path }} ${{ matrix.test.regex }}
\ No newline at end of file
+          just test ${{ matrix.test.path }} ${{ matrix.test.regex }}
diff --git a/framework/.changeset/v0.14.1.md b/framework/.changeset/v0.14.1.md
new file mode 100644
index 000000000..de85af59d
--- /dev/null
+++ b/framework/.changeset/v0.14.1.md
@@ -0,0 +1 @@
+- Forwarder API for clients
\ No newline at end of file
diff --git a/framework/clclient/client.go b/framework/clclient/client.go
index 6f2093687..ec4ab0cf0 100644
--- a/framework/clclient/client.go
+++ b/framework/clclient/client.go
@@ -18,13 +18,13 @@ import (
 	"github.com/ethereum/go-ethereum/accounts/keystore"
 	"github.com/ethereum/go-ethereum/crypto"
 	"github.com/pkg/errors"
+	"github.com/rs/zerolog/log"
 
 	"github.com/smartcontractkit/chainlink-testing-framework/framework"
 	"github.com/smartcontractkit/chainlink-testing-framework/framework/components/clnode"
 
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/go-resty/resty/v2"
-	"github.com/rs/zerolog/log"
 	"golang.org/x/sync/errgroup"
 )
 
@@ -33,6 +33,14 @@ const (
 	ChainlinkKeyPassword string = "twochains"
 	// NodeURL string for logging
 	NodeURL string = "Node URL"
+	// DefaultRetries is the default number of CL node client retries
+	DefaultRetries = 30
+	// DefaultRetryInterval is the default CL node client retry interval
+	DefaultRetryInterval = 5 * time.Second
+	// DefaultTimeout is the default CL node client timeout
+	DefaultTimeout = 10 * time.Second
+	// DefaultAuthRetryCount is the default CL node authorization retry count
+	DefaultAuthRetryCount = 20
 )
 
 var (
@@ -82,36 +90,56 @@ func New(outs []*clnode.Output) ([]*ChainlinkClient, error) {
 	return clients, nil
 }
 
-func initRestyClient(url string, email string, password string, headers map[string]string, timeout *time.Duration) (*resty.Client, error) {
+func initRestyClient(url string, email string, password string, headers map[string]string, _ *time.Duration) (*resty.Client, error) {
 	isDebug := os.Getenv("RESTY_DEBUG") == "true"
 	// G402 - TODO: certificates
 	//nolint
-	rc := resty.New().SetBaseURL(url).SetHeaders(headers).SetTLSClientConfig(&tls.Config{InsecureSkipVerify: true}).SetDebug(isDebug)
-	if timeout != nil {
-		rc.SetTimeout(*timeout)
-	}
-	session := &Session{Email: email, Password: password}
-	// Retry the connection on boot up, sometimes pods can still be starting up and not ready to accept connections
-	var resp *resty.Response
+	s := &Session{Email: email, Password: password}
+	rc := resty.New().
+		SetBaseURL(url).
+		SetHeaders(headers).
+		SetTLSClientConfig(&tls.Config{InsecureSkipVerify: true}). //nolint:gosec
+		SetDebug(isDebug).
+		SetRetryWaitTime(DefaultRetryInterval).
+		SetRetryCount(DefaultRetries).
+		SetRetryAfter(func(c *resty.Client, r *resty.Response) (time.Duration, error) {
+			return DefaultRetryInterval, nil
+		}).
+		SetTimeout(DefaultTimeout)
+
+	rc.AddRetryCondition(func(r *resty.Response, err error) bool {
+		return r.StatusCode() == http.StatusNotFound
+	})
+
+	// Retry the connection on boot-up; re-authorizing on every retry would slow AddRetryCondition down too much
 	var err error
-	retryCount := 20
-	for i := 0; i < retryCount; i++ {
-		resp, err = rc.R().SetBody(session).Post("/sessions")
+	for i := 0; i < DefaultAuthRetryCount; i++ {
+		err = Authorize(rc, s)
 		if err != nil {
-			log.Warn().Err(err).Str("URL", url).Interface("Session Details", session).Msg("Error connecting to Chainlink node, retrying")
-			time.Sleep(5 * time.Second)
+			log.Warn().Err(err).Str("URL", url).Interface("Session Details", s).Msg("Error connecting to Chainlink node, retrying")
+			time.Sleep(DefaultRetryInterval)
 		} else {
 			break
 		}
 	}
 	if err != nil {
-		return nil, fmt.Errorf("error connecting to chainlink node after %d attempts: %w", retryCount, err)
+		return nil, fmt.Errorf("error connecting to chainlink node after %d attempts: %w", DefaultAuthRetryCount, err)
 	}
-	rc.SetCookies(resp.Cookies())
 	framework.L.Debug().Str("URL", url).Msg("Connected to Chainlink node")
 	return rc, nil
 }
 
+// Authorize authenticates against the CL node and stores the session cookies on the client
+func Authorize(rc *resty.Client, session *Session) error {
+	resp, err := rc.R().
+		SetBody(session).
+		Post("/sessions")
+	if err != nil {
+		return fmt.Errorf("error authorizing in CL node: %w", err)
+	}
+	rc.SetCookies(resp.Cookies())
+	return nil
+}
+
 // URL Chainlink instance http url
 func (c *ChainlinkClient) URL() string {
 	return c.Config.URL
@@ -192,7 +220,6 @@ func (c *ChainlinkClient) WaitHealthy(pattern, status string, attempts uint) err
 			Msg("Retrying health check")
 		}),
 	)
-
 	if err != nil {
 		return fmt.Errorf("health check failed after %d attempts: %w", attempts, err)
 	}
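Authorization is now split out of `initRestyClient` into the exported `Authorize` helper, so client code can re-authenticate an expired session on an existing resty client. A minimal sketch of calling it directly; the URL and credentials are hypothetical and would normally come from a `clnode.Output`:

```go
package main

import (
	"fmt"

	"github.com/go-resty/resty/v2"

	"github.com/smartcontractkit/chainlink-testing-framework/framework/clclient"
)

func main() {
	// hypothetical node URL and credentials
	rc := resty.New().SetBaseURL("http://localhost:6688")
	session := &clclient.Session{Email: "admin@example.com", Password: "example-password"}
	// POSTs to /sessions and stores the returned session cookies on the client
	if err := clclient.Authorize(rc, session); err != nil {
		panic(err)
	}
	fmt.Println("authorized; subsequent requests reuse the session cookies")
}
```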
diff --git a/framework/components/blockchain/anvil.go b/framework/components/blockchain/anvil.go
index 581576d10..f8fac1a7e 100644
--- a/framework/components/blockchain/anvil.go
+++ b/framework/components/blockchain/anvil.go
@@ -59,7 +59,7 @@ func newAnvil(ctx context.Context, in *Input) (*Output, error) {
 	framework.L.Info().Any("Cmd", strings.Join(entryPoint, " ")).Msg("Creating anvil with command")
 
 	if pods.K8sEnabled() {
-		_, err := pods.Run(ctx, &pods.Config{
+		_, svc, err := pods.Run(ctx, &pods.Config{
 			Pods: []*pods.PodConfig{
 				{
 					Name: pods.Ptr(in.ContainerName),
@@ -86,8 +86,9 @@ func newAnvil(ctx context.Context, in *Input) (*Output, error) {
 			ContainerName: in.ContainerName,
 			Nodes: []*Node{
 				{
-					ExternalWSUrl:   fmt.Sprintf("ws://%s:%s", fmt.Sprintf("%s-svc", in.ContainerName), in.Port),
-					ExternalHTTPUrl: fmt.Sprintf("http://%s:%s", fmt.Sprintf("%s-svc", in.ContainerName), in.Port),
+					K8sService:      svc,
+					ExternalWSUrl:   fmt.Sprintf("ws://%s:%s", "localhost", in.Port),
+					ExternalHTTPUrl: fmt.Sprintf("http://%s:%s", "localhost", in.Port),
 					InternalWSUrl:   fmt.Sprintf("ws://%s:%s", fmt.Sprintf("%s-svc", in.ContainerName), in.Port),
 					InternalHTTPUrl: fmt.Sprintf("http://%s:%s", fmt.Sprintf("%s-svc", in.ContainerName), in.Port),
 				},
diff --git a/framework/components/blockchain/blockchain.go b/framework/components/blockchain/blockchain.go
index 710f31b92..05a4d30c6 100644
--- a/framework/components/blockchain/blockchain.go
+++ b/framework/components/blockchain/blockchain.go
@@ -4,6 +4,8 @@ import (
 	"context"
 	"fmt"
 
+	v1 "k8s.io/api/core/v1"
+
 	"github.com/testcontainers/testcontainers-go"
 
 	"github.com/smartcontractkit/chainlink-testing-framework/framework"
@@ -101,6 +103,8 @@ type Node struct {
 	ExternalHTTPUrl string `toml:"http_url" comment:"External blockchain node HTTP URL"`
 	InternalWSUrl   string `toml:"internal_ws_url" comment:"Internal blockchain node WebSocket URL"`
 	InternalHTTPUrl string `toml:"internal_http_url" comment:"Internal blockchain node HTTP URL"`
+	// K8sService is a Kubernetes service spec used to connect locally
+	K8sService *v1.Service `toml:"k8s_service" comment:"Kubernetes service spec used to connect locally"`
 }
 
 func NewBlockchainNetwork(in *Input) (*Output, error) {
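Because the output now carries the generated `Service` spec, a test process that did not deploy the environment (for example one consuming a cached output) can re-create the local port-forward itself via the `Connect` helper added in `framework/pods/pods.go` below. A sketch, assuming the framework's K8s namespace env var is set and `out` comes from a cached deployment:

```go
package example

import (
	"github.com/smartcontractkit/chainlink-testing-framework/framework/components/blockchain"
	"github.com/smartcontractkit/chainlink-testing-framework/framework/pods"
)

// reconnect re-forwards a deployed node's service ports to localhost
// and returns the RPC URL
func reconnect(out *blockchain.Output) (string, error) {
	node := out.Nodes[0]
	if node.K8sService != nil {
		// block until the first port-forward connection is established
		if err := pods.Connect(node.K8sService, true); err != nil {
			return "", err
		}
	}
	// ExternalHTTPUrl now points at localhost through the forwarded port
	return node.ExternalHTTPUrl, nil
}
```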
diff --git a/framework/components/clnode/clnode.go b/framework/components/clnode/clnode.go
index 2fe51a8a1..3e1323eb3 100644
--- a/framework/components/clnode/clnode.go
+++ b/framework/components/clnode/clnode.go
@@ -117,6 +117,8 @@ type NodeOut struct {
 	InternalP2PUrl string `toml:"p2p_internal_url" comment:"Node internal P2P URL"`
 	// InternalIP node internal IP
 	InternalIP string `toml:"internal_ip" comment:"Node internal IP"`
+	// K8sService is a Kubernetes service spec used to connect locally
+	K8sService *v1.Service `toml:"k8s_service" comment:"Kubernetes service spec used to connect locally"`
 }
 
 // NewNodeWithDB create a new Chainlink node with some image:tag and one or several configs
@@ -198,7 +200,8 @@ func natPortsToK8sFormat(nat nat.PortMap) []string {
 // exposes custom_ports in format "host:docker" or map 1-to-1 if only "host" port is provided
 func generatePortBindings(in *Input) ([]string, nat.PortMap, error) {
 	httpPort := fmt.Sprintf("%s/tcp", DefaultHTTPPort)
-	exposedPorts := []string{httpPort}
+	p2pPort := fmt.Sprintf("%s/udp", DefaultP2PPort)
+	exposedPorts := []string{httpPort, p2pPort}
 	portBindings := nat.PortMap{
 		nat.Port(httpPort): []nat.PortBinding{
 			{
@@ -206,6 +209,12 @@ func generatePortBindings(in *Input) ([]string, nat.PortMap, error) {
 				HostPort: strconv.Itoa(in.Node.HTTPPort),
 			},
 		},
+		nat.Port(p2pPort): []nat.PortBinding{
+			{
+				HostIP:   "0.0.0.0",
+				HostPort: strconv.Itoa(in.Node.P2PPort),
+			},
+		},
 	}
 	if os.Getenv("CTF_CLNODE_DLV") == "true" {
 		innerDebuggerPort := fmt.Sprintf("%d/tcp", DefaultDebuggerPort)
@@ -304,9 +313,14 @@ func newNode(ctx context.Context, in *Input, pgOut *postgres.Output) (*NodeOut,
 		return nil, err
 	}
 
+	defaultHTTPPortInt, err := strconv.Atoi(DefaultHTTPPort)
+	if err != nil {
+		return nil, err
+	}
+
 	// k8s deployment
 	if pods.K8sEnabled() {
-		_, err := pods.Run(ctx, &pods.Config{
+		_, svc, err := pods.Run(ctx, &pods.Config{
 			Pods: []*pods.PodConfig{
 				{
 					Name: pods.Ptr(containerName),
@@ -321,6 +335,7 @@ func newNode(ctx context.Context, in *Input, pgOut *postgres.Output) (*NodeOut,
 						RunAsUser:  pods.Ptr[int64](14933),
 						RunAsGroup: pods.Ptr[int64](999),
 					},
+					ReadinessProbe: pods.TCPReadyProbe(defaultHTTPPortInt),
 					ConfigMap: map[string]string{
 						"config.toml":    cfg,
 						"overrides.toml": in.Node.TestConfigOverrides,
@@ -357,9 +372,10 @@ func newNode(ctx context.Context, in *Input, pgOut *postgres.Output) (*NodeOut,
 			APIAuthUser:     DefaultAPIUser,
 			APIAuthPassword: DefaultAPIPassword,
 			ContainerName:   containerName,
-			ExternalURL:     fmt.Sprintf("http://%s:%d", fmt.Sprintf("%s-svc", containerName), in.Node.HTTPPort),
+			ExternalURL:     fmt.Sprintf("http://%s:%d", "localhost", in.Node.HTTPPort),
 			InternalURL:     fmt.Sprintf("http://%s:%s", containerName, DefaultHTTPPort),
 			InternalP2PUrl:  fmt.Sprintf("http://%s:%s", containerName, DefaultP2PPort),
+			K8sService:      svc,
 		}, nil
 	}
 	// local deployment
diff --git a/framework/components/fake/container.go b/framework/components/fake/container.go
index 282e60673..d2729f430 100644
--- a/framework/components/fake/container.go
+++ b/framework/components/fake/container.go
@@ -25,6 +25,8 @@ type Output struct {
 	UseCache      bool   `toml:"use_cache" comment:"Whether to respect caching or not, if cache = true component won't be deployed again"`
 	BaseURLHost   string `toml:"base_url_host" comment:"Base URL which can be used when running locally"`
 	BaseURLDocker string `toml:"base_url_docker" comment:"Base URL to reach fakes service from other Docker containers"`
+	// K8sService is a Kubernetes service spec used to connect locally
+	K8sService *v1.Service `toml:"k8s_service" comment:"Kubernetes service spec used to connect locally"`
 }
 
 // NewDockerFakeDataProvider creates new fake data provider in Docker using testcontainers-go
@@ -40,7 +42,7 @@ func NewWithContext(ctx context.Context, in *Input) (*Output, error) {
 	bindPort := fmt.Sprintf("%d/tcp", in.Port)
 	containerName := framework.DefaultTCName("fake")
 	if pods.K8sEnabled() {
-		_, err := pods.Run(ctx, &pods.Config{
+		_, svc, err := pods.Run(ctx, &pods.Config{
 			Pods: []*pods.PodConfig{
 				{
 					Name: pods.Ptr(containerName),
@@ -59,13 +61,12 @@ func NewWithContext(ctx context.Context, in *Input) (*Output, error) {
 			return nil, err
 		}
 		in.Out = &Output{
-			BaseURLHost:   fmt.Sprintf("http://%s:%d", fmt.Sprintf("%s-svc", containerName), in.Port),
-			BaseURLDocker: fmt.Sprintf("http://%s:%d", containerName, in.Port),
+			K8sService:    svc,
+			BaseURLHost:   fmt.Sprintf("http://%s:%d", "localhost", in.Port),
+			BaseURLDocker: fmt.Sprintf("http://%s:%d", fmt.Sprintf("%s-svc", containerName), in.Port),
 		}
 		return in.Out, nil
 	}
-	// if pods.K8sEnabled() {
-	// }
 	req := tc.ContainerRequest{
 		Name:  containerName,
 		Image: in.Image,
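Note the swap in K8s mode: `BaseURLHost` now goes through the localhost port-forward, while `BaseURLDocker` uses the generated `<name>-svc` Service DNS name so in-cluster consumers keep working. A sketch of reaching the fake from the test process; the endpoint path is hypothetical:

```go
package example

import (
	"fmt"
	"net/http"

	"github.com/smartcontractkit/chainlink-testing-framework/framework/components/fake"
)

// ping calls the fake data provider through the local port-forward;
// pods inside the cluster would use out.BaseURLDocker instead
func ping(out *fake.Output) error {
	resp, err := http.Get(out.BaseURLHost + "/health") // hypothetical endpoint
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	fmt.Println("fake service status:", resp.Status)
	return nil
}
```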
diff --git a/framework/components/postgres/postgres.go b/framework/components/postgres/postgres.go
index 55d2b29ce..a47e789b4 100644
--- a/framework/components/postgres/postgres.go
+++ b/framework/components/postgres/postgres.go
@@ -64,6 +64,8 @@ type Output struct {
 	JDUrl string `toml:"jd_url" comment:"PostgreSQL internal connection URL to JobDistributor database"`
 	// JDInternalURL PostgreSQL internal connection URL to JobDistributor database
 	JDInternalURL string `toml:"jd_internal_url" comment:"PostgreSQL internal connection URL to JobDistributor database"`
+	// K8sService is a Kubernetes service spec used to connect locally
+	K8sService *v1.Service `toml:"k8s_service" comment:"Kubernetes service spec used to connect locally"`
 }
 
 func NewPostgreSQL(in *Input) (*Output, error) {
@@ -127,7 +129,7 @@ func NewWithContext(ctx context.Context, in *Input) (*Output, error) {
 
 	// k8s deployment
 	if pods.K8sEnabled() {
-		_, err := pods.Run(ctx, &pods.Config{
+		_, svc, err := pods.Run(ctx, &pods.Config{
 			Pods: []*pods.PodConfig{
 				{
 					Name: pods.Ptr(in.Name),
@@ -147,8 +149,8 @@ func NewWithContext(ctx context.Context, in *Input) (*Output, error) {
 							Value: Database,
 						},
 					},
-					Requests: pods.ResourcesLarge(),
-					Limits:   pods.ResourcesLarge(),
+					Requests: pods.ResourcesMedium(),
+					Limits:   pods.ResourcesMedium(),
 					// container and pod security settings are specific to
 					// 'postgres' Docker image
 					ContainerSecurityContext: &v1.SecurityContext{
@@ -172,6 +174,7 @@ func NewWithContext(ctx context.Context, in *Input) (*Output, error) {
 			return nil, err
 		}
 		o = &Output{
+			K8sService:    svc,
 			ContainerName: containerName,
 			InternalURL: fmt.Sprintf(
 				"postgresql://%s:%s@%s:%d/%s?sslmode=disable",
@@ -186,7 +189,7 @@ func NewWithContext(ctx context.Context, in *Input) (*Output, error) {
 				"postgresql://%s:%s@%s:%d/%s?sslmode=disable",
 				User,
 				Password,
-				fmt.Sprintf("%s-svc", in.Name),
+				"localhost",
 				portToExpose,
 				Database,
 			),
@@ -204,7 +207,7 @@ func NewWithContext(ctx context.Context, in *Input) (*Output, error) {
 			"postgresql://%s:%s@%s:%d/%s?sslmode=disable",
 			User,
 			Password,
-			fmt.Sprintf("%s-svc", in.Name),
+			"localhost",
 			portToExpose,
 			JDDatabase,
 		)
"localhost", portToExpose, Database, ), @@ -204,7 +207,7 @@ func NewWithContext(ctx context.Context, in *Input) (*Output, error) { "postgresql://%s:%s@%s:%d/%s?sslmode=disable", User, Password, - fmt.Sprintf("%s-svc", in.Name), + "localhost", portToExpose, JDDatabase, ) diff --git a/framework/pods/defaults.go b/framework/pods/defaults.go index f99c0b029..025eea792 100644 --- a/framework/pods/defaults.go +++ b/framework/pods/defaults.go @@ -6,8 +6,25 @@ import ( v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" ) +// TCPReadyProbe is a default TCP port probe +func TCPReadyProbe(port int) *v1.Probe { + return &v1.Probe{ + ProbeHandler: v1.ProbeHandler{ + TCPSocket: &v1.TCPSocketAction{ + Port: intstr.FromInt(port), + }, + }, + InitialDelaySeconds: 5, + PeriodSeconds: 10, + FailureThreshold: 3, + SuccessThreshold: 1, + TimeoutSeconds: 5, + } +} + func Resources(cpu, mem string) map[string]string { return map[string]string{ "cpu": cpu, diff --git a/framework/pods/forward.go b/framework/pods/forward.go index cf6a95c97..1b087e048 100644 --- a/framework/pods/forward.go +++ b/framework/pods/forward.go @@ -7,6 +7,7 @@ import ( "sync" "time" + "golang.org/x/sync/errgroup" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" @@ -17,16 +18,21 @@ import ( ) const ( - RetryDelay = 1 * time.Second - K8sAPITimeout = 2 * time.Minute + // ClientReadyTimeout is a timeout client (tests) will wait until abandoning attempts + ClientReadyTimeout = 1 * time.Minute + // RetryDelay is a delay before retrying forwarding + RetryDelay = 1 * time.Second + // K8sFunctionCallTimeout is a common K8s API timeout we use in functions + // function may contain multiple calls + K8sFunctionCallTimeout = 2 * time.Minute ) // PortForwardConfig represents a single port forward configuration type PortForwardConfig struct { - ServiceName string - LocalPort int - ContainerPort int - Namespace string + ServiceName string + LocalPort int + ServicePort int + Namespace string } func (c PortForwardConfig) validate() error { @@ -36,8 +42,8 @@ func (c PortForwardConfig) validate() error { if c.LocalPort == 0 { return fmt.Errorf("empty local port") } - if c.ContainerPort == 0 { - return fmt.Errorf("empty container port") + if c.ServicePort == 0 { + return fmt.Errorf("empty service port") } if c.ServiceName == "" { return fmt.Errorf("empty service name") @@ -57,7 +63,9 @@ type PortForwardManager struct { // forwardInfo holds information about a running port forward type forwardInfo struct { stopChan chan struct{} - cleanup func() + // signalClientReadyChan is used to signal the caller when first connection is established + signalClientReadyChan chan struct{} + cleanup func() } // NewForwarder creates a new manager for multiple port forwards @@ -72,20 +80,21 @@ func NewForwarder(api *API) *PortForwardManager { // startForwardService starts forwarding a single service port with retry logic func (m *PortForwardManager) startForwardService(cfg PortForwardConfig) { key := fmt.Sprintf("%s:%d", cfg.ServiceName, cfg.LocalPort) - stopChan := make(chan struct{}) + stopChan, readyChan := make(chan struct{}), make(chan struct{}) m.mu.Lock() m.forwards[key] = &forwardInfo{ - stopChan: stopChan, - cleanup: func() { close(stopChan) }, + stopChan: stopChan, + signalClientReadyChan: readyChan, + cleanup: func() { close(stopChan) }, } m.mu.Unlock() - go m.forwardAndRetry(cfg, stopChan) + go 
diff --git a/framework/pods/forward.go b/framework/pods/forward.go
index cf6a95c97..1b087e048 100644
--- a/framework/pods/forward.go
+++ b/framework/pods/forward.go
@@ -7,6 +7,7 @@ import (
 	"sync"
 	"time"
 
+	"golang.org/x/sync/errgroup"
 	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/labels"
@@ -17,16 +18,21 @@ import (
 )
 
 const (
-	RetryDelay    = 1 * time.Second
-	K8sAPITimeout = 2 * time.Minute
+	// ClientReadyTimeout is how long a client (tests) waits before abandoning attempts
+	ClientReadyTimeout = 1 * time.Minute
+	// RetryDelay is a delay before retrying forwarding
+	RetryDelay = 1 * time.Second
+	// K8sFunctionCallTimeout is a common K8s API timeout we use in functions;
+	// a function may contain multiple calls
+	K8sFunctionCallTimeout = 2 * time.Minute
 )
 
 // PortForwardConfig represents a single port forward configuration
 type PortForwardConfig struct {
-	ServiceName   string
-	LocalPort     int
-	ContainerPort int
-	Namespace     string
+	ServiceName string
+	LocalPort   int
+	ServicePort int
+	Namespace   string
 }
 
 func (c PortForwardConfig) validate() error {
@@ -36,8 +42,8 @@ func (c PortForwardConfig) validate() error {
 	if c.LocalPort == 0 {
 		return fmt.Errorf("empty local port")
 	}
-	if c.ContainerPort == 0 {
-		return fmt.Errorf("empty container port")
+	if c.ServicePort == 0 {
+		return fmt.Errorf("empty service port")
 	}
 	if c.ServiceName == "" {
 		return fmt.Errorf("empty service name")
@@ -57,7 +63,9 @@ type PortForwardManager struct {
 // forwardInfo holds information about a running port forward
 type forwardInfo struct {
 	stopChan chan struct{}
-	cleanup  func()
+	// signalClientReadyChan is used to signal the caller when the first connection is established
+	signalClientReadyChan chan struct{}
+	cleanup               func()
 }
 
 // NewForwarder creates a new manager for multiple port forwards
@@ -72,20 +80,21 @@ func NewForwarder(api *API) *PortForwardManager {
 // startForwardService starts forwarding a single service port with retry logic
 func (m *PortForwardManager) startForwardService(cfg PortForwardConfig) {
 	key := fmt.Sprintf("%s:%d", cfg.ServiceName, cfg.LocalPort)
-	stopChan := make(chan struct{})
+	stopChan, readyChan := make(chan struct{}), make(chan struct{})
 
 	m.mu.Lock()
 	m.forwards[key] = &forwardInfo{
-		stopChan: stopChan,
-		cleanup:  func() { close(stopChan) },
+		stopChan:              stopChan,
+		signalClientReadyChan: readyChan,
+		cleanup:               func() { close(stopChan) },
 	}
 	m.mu.Unlock()
 
-	go m.forwardAndRetry(cfg, stopChan)
+	go m.forwardAndRetry(cfg, stopChan, readyChan)
 }
 
 // forwardAndRetry continuously attempts to forward the port with retries
-func (m *PortForwardManager) forwardAndRetry(cfg PortForwardConfig, stopChan <-chan struct{}) {
+func (m *PortForwardManager) forwardAndRetry(cfg PortForwardConfig, stopChan <-chan struct{}, readyChan chan struct{}) {
 	key := fmt.Sprintf("%s:%d", cfg.ServiceName, cfg.LocalPort)
 	consecutiveFailures := 0
 
@@ -95,21 +104,21 @@ func (m *PortForwardManager) forwardAndRetry(cfg PortForwardConfig, stopChan <-c
 			L.Info().Msgf("Stopped retry loop for %s", key)
 			return
 		default:
-			L.Info().
+			L.Debug().
 				Str("ServiceName", cfg.ServiceName).
 				Int("LocalPort", cfg.LocalPort).
 				Msg("Starting forwarder")
-			err := m.attemptForward(cfg)
+			err := m.attemptForward(cfg, readyChan)
 			if err != nil {
 				// Connection failed or broke - retry
 				consecutiveFailures++
-				L.Error().
+				L.Debug().
 					Err(err).
 					Str("Key", key).
 					Int("Attempt", consecutiveFailures).
 					Msg("Port forward failed")
-				L.Info().Msgf("Retrying %s in %v", key, RetryDelay)
+				L.Debug().Msgf("Retrying %s in %v", key, RetryDelay)
 				select {
 				case <-stopChan:
 					return
@@ -126,12 +135,8 @@ func (m *PortForwardManager) forwardAndRetry(cfg PortForwardConfig, stopChan <-c
 }
 
 // attemptForward establishes and monitors a single port forward connection
-func (m *PortForwardManager) attemptForward(cfg PortForwardConfig) error {
+func (m *PortForwardManager) attemptForward(cfg PortForwardConfig, signalReadyChan chan struct{}) error {
 	namespace := cfg.Namespace
-	if namespace == "" {
-		namespace = "default"
-	}
-
 	// Get target pod for forwarding
 	targetPod, targetPort, err := m.getTargetPodAndPort(cfg, namespace)
 	if err != nil {
@@ -143,7 +148,7 @@
 		Int("LocalPort", cfg.LocalPort).Logger()
 
 	l.Info().Msgf("Forwarding service %s:%d -> pod %s:%d -> localhost:%d",
-		cfg.ServiceName, cfg.ContainerPort, targetPod.Name, targetPort, cfg.LocalPort)
+		cfg.ServiceName, cfg.ServicePort, targetPod.Name, targetPort, cfg.LocalPort)
 
 	// run port forward
 	stopChan := make(chan struct{})
@@ -179,7 +184,12 @@
 	select {
 	case <-readyChan:
 		l.Info().
-			Msg("🟢 established connection for %s:%d")
+			Msg("🟢 established connection")
+		// signal if a client is waiting for the first established connection
+		select {
+		case signalReadyChan <- struct{}{}:
+		default:
+		}
 		// error is received when someone is trying to call the local port
 		// and connection is broken, it is not proactively checked
 		select {
@@ -211,7 +221,7 @@
 
 // getTargetPodAndPort finds the target pod and resolves the port
 func (m *PortForwardManager) getTargetPodAndPort(cfg PortForwardConfig, namespace string) (*corev1.Pod, int, error) {
-	ctx, cancel := context.WithTimeout(context.Background(), K8sAPITimeout)
+	ctx, cancel := context.WithTimeout(context.Background(), K8sFunctionCallTimeout)
 	defer cancel()
 	service, err := m.cs.CoreV1().Services(namespace).Get(ctx, cfg.ServiceName, metav1.GetOptions{})
 	if err != nil {
@@ -243,7 +253,7 @@ func (m *PortForwardManager) getTargetPodAndPort(cfg PortForwardConfig, namespac
 	if targetPod == nil {
 		return nil, 0, fmt.Errorf("no running pods found for service %s", cfg.ServiceName)
 	}
-	targetPort, err := m.resolveServicePort(service, cfg.ContainerPort, targetPod)
+	targetPort, err := m.resolveServicePort(service, cfg.ServicePort, targetPod)
 	if err != nil {
 		return nil, 0, fmt.Errorf("failed to resolve port: %w", err)
 	}
@@ -251,13 +261,30 @@ func (m *PortForwardManager) getTargetPodAndPort(cfg PortForwardConfig, namespac
 }
 
 // Forward starts multiple port forwards concurrently
-func (m *PortForwardManager) Forward(configs []PortForwardConfig) error {
+func (m *PortForwardManager) Forward(configs []PortForwardConfig, waitForConnection bool) error {
 	for _, cfg := range configs {
 		if err := cfg.validate(); err != nil {
 			return err
 		}
 		m.startForwardService(cfg)
 	}
+	if waitForConnection {
+		// lets library users block until the first successful connection
+		eg := &errgroup.Group{}
+		for _, fwd := range m.forwards {
+			eg.Go(func() error {
+				L.Info().Msg("Awaiting first established connection")
+				select {
+				case <-fwd.signalClientReadyChan:
+					L.Info().Msg("Connection established")
+					return nil
+				case <-time.After(ClientReadyTimeout):
+					return fmt.Errorf("failed to forward ports before deadline")
+				}
+			})
+		}
+		return eg.Wait()
+	}
 	return nil
 }
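`Forward` now accepts a `waitForConnection` flag so callers can block until every forward has signalled its first established connection instead of racing the forwarder. A sketch of driving the manager directly; the namespace, service name, and ports are hypothetical:

```go
package example

import "github.com/smartcontractkit/chainlink-testing-framework/framework/pods"

// forwardNode forwards a single service port to localhost and blocks
// until the first connection is established
func forwardNode(api *pods.API) error {
	f := pods.NewForwarder(api)
	return f.Forward([]pods.PortForwardConfig{
		{
			Namespace:   "my-namespace",
			ServiceName: "node0-svc",
			LocalPort:   6688,
			ServicePort: 6688,
		},
	}, true) // true: wait for the first established connection
}
```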
- Msg("🟢 established connection for %s:%d") + Msg("🟢 established connection") + // signal if client is waiting for the first connection established + select { + case signalReadyChan <- struct{}{}: + default: + } // error is received when someone is trying to call the local port // and connection is broken, it is not proactively checked select { @@ -211,7 +221,7 @@ func (m *PortForwardManager) attemptForward(cfg PortForwardConfig) error { // getTargetPodAndPort finds the target pod and resolves the port func (m *PortForwardManager) getTargetPodAndPort(cfg PortForwardConfig, namespace string) (*corev1.Pod, int, error) { - ctx, cancel := context.WithTimeout(context.Background(), K8sAPITimeout) + ctx, cancel := context.WithTimeout(context.Background(), K8sFunctionCallTimeout) defer cancel() service, err := m.cs.CoreV1().Services(namespace).Get(ctx, cfg.ServiceName, metav1.GetOptions{}) if err != nil { @@ -243,7 +253,7 @@ func (m *PortForwardManager) getTargetPodAndPort(cfg PortForwardConfig, namespac if targetPod == nil { return nil, 0, fmt.Errorf("no running pods found for service %s", cfg.ServiceName) } - targetPort, err := m.resolveServicePort(service, cfg.ContainerPort, targetPod) + targetPort, err := m.resolveServicePort(service, cfg.ServicePort, targetPod) if err != nil { return nil, 0, fmt.Errorf("failed to resolve port: %w", err) } @@ -251,13 +261,30 @@ func (m *PortForwardManager) getTargetPodAndPort(cfg PortForwardConfig, namespac } // Forward starts multiple port forwards concurrently -func (m *PortForwardManager) Forward(configs []PortForwardConfig) error { +func (m *PortForwardManager) Forward(configs []PortForwardConfig, waitForConnection bool) error { for _, cfg := range configs { if err := cfg.validate(); err != nil { return err } m.startForwardService(cfg) } + if waitForConnection { + // this code is used so library users can block until first successful connection + eg := &errgroup.Group{} + for _, fwd := range m.forwards { + eg.Go(func() error { + L.Info().Msg("Awaiting for first established connection") + select { + case <-fwd.signalClientReadyChan: + L.Info().Msg("Connection established") + return nil + case <-time.After(2 * time.Minute): + return fmt.Errorf("failed to forward ports until deadline") + } + }) + } + return eg.Wait() + } return nil } diff --git a/framework/pods/pods.go b/framework/pods/pods.go index 5effbc9f7..6fb582cce 100644 --- a/framework/pods/pods.go +++ b/framework/pods/pods.go @@ -79,39 +79,50 @@ type PodConfig struct { VolumeClaimTemplates []corev1.PersistentVolumeClaim } -// App is an application context with generated manifests +// App is an application context with a generated Kubernetes manifest type App struct { cfg *Config objects []any + svcObj *corev1.Service manifest string } +// K8sEnabled is a flag that means Kubernetes in enabled, used in framework components func K8sEnabled() bool { return os.Getenv(K8sNamespaceEnvVar) != "" } // Run generates and applies a new K8s YAML manifest -func Run(ctx context.Context, cfg *Config) (string, error) { +func Run(ctx context.Context, cfg *Config) (string, *corev1.Service, error) { var err error if cfg.Namespace == "" { cfg.Namespace = os.Getenv(K8sNamespaceEnvVar) } Client, err = NewAPI(cfg.Namespace) if err != nil { - return "", fmt.Errorf("failed to create K8s client: %w", err) + return "", nil, fmt.Errorf("failed to create K8s client: %w", err) } if Client != nil { if err := Client.CreateNamespace(ctx, cfg.Namespace); err != nil { - return "", fmt.Errorf("failed to create namespace: %s, %w", 
diff --git a/framework/pods/pods_test.go b/framework/pods/pods_test.go
index 15aba36a1..431e71e96 100644
--- a/framework/pods/pods_test.go
+++ b/framework/pods/pods_test.go
@@ -182,7 +182,7 @@ func TestPods(t *testing.T) {
 			if os.Getenv("CI") == "true" && tt.skipCI {
 				t.Skip("this test can't be run in CI because of GHA limitations")
 			}
-			manifest, err := p.Run(context.Background(), tt.props)
+			manifest, _, err := p.Run(context.Background(), tt.props)
 			tt.validateManifest(t, err)
 			if err == nil {
 				snaps.MatchSnapshot(t, manifest)