diff --git a/internal/data/dataexport/cmd/create/create.go b/internal/data/dataexport/cmd/create/create.go index b01ec8dc..6eccb9ce 100644 --- a/internal/data/dataexport/cmd/create/create.go +++ b/internal/data/dataexport/cmd/create/create.go @@ -99,7 +99,6 @@ func Run(ctx context.Context, log *slog.Logger, cmd *cobra.Command, args []strin defer cancel() namespace, _ := cmd.Flags().GetString("namespace") ttl, _ := cmd.Flags().GetString("ttl") - publish, _ := cmd.Flags().GetBool("publish") deName, volumeKind, volumeName, err := parseArgs(args) if err != nil { @@ -107,11 +106,21 @@ func Run(ctx context.Context, log *slog.Logger, cmd *cobra.Command, args []strin } flags := cmd.PersistentFlags() - safeClient, err := safeClient.NewSafeClient(flags) + sc, err := safeClient.NewSafeClient(flags) if err != nil { return err } - rtClient, err := safeClient.NewRTClient(v1alpha1.AddToScheme) + rtClient, err := sc.NewRTClient(v1alpha1.AddToScheme) + if err != nil { + return err + } + + publishFlag, err := dataio.ParsePublishFlag(cmd.Flags()) + if err != nil { + return err + } + + publish, err := dataio.ResolvePublish(ctx, publishFlag, rtClient, sc, log) if err != nil { return err } diff --git a/internal/data/dataexport/cmd/download/download.go b/internal/data/dataexport/cmd/download/download.go index 9e1d740f..97274647 100644 --- a/internal/data/dataexport/cmd/download/download.go +++ b/internal/data/dataexport/cmd/download/download.go @@ -243,7 +243,6 @@ func recursiveDownload(ctx context.Context, sClient *safeClient.SafeClient, log func Run(ctx context.Context, log *slog.Logger, cmd *cobra.Command, args []string) error { namespace, _ := cmd.Flags().GetString("namespace") dstPath, _ := cmd.Flags().GetString("output") - publish, _ := cmd.Flags().GetBool("publish") ttl, _ := cmd.Flags().GetString("ttl") dataName, srcPath, err := dataio.ParseArgs(args) @@ -262,6 +261,16 @@ func Run(ctx context.Context, log *slog.Logger, cmd *cobra.Command, args []strin return err } + publishFlag, 
err := dataio.ParsePublishFlag(cmd.Flags()) + if err != nil { + return err + } + + publish, err := dataio.ResolvePublish(ctx, publishFlag, rtClient, sClient, log) + if err != nil { + return err + } + deName, err := util.CreateDataExporterIfNeededFunc(ctx, log, dataName, namespace, publish, ttl, rtClient) if err != nil { return err diff --git a/internal/data/dataexport/cmd/download/download_http_test.go b/internal/data/dataexport/cmd/download/download_http_test.go index c1b58d9a..bfe67ece 100644 --- a/internal/data/dataexport/cmd/download/download_http_test.go +++ b/internal/data/dataexport/cmd/download/download_http_test.go @@ -57,7 +57,7 @@ func TestDownloadFilesystem_OK(t *testing.T) { outFile := filepath.Join(t.TempDir(), "out.txt") cmd := NewCommand(context.TODO(), slog.Default()) - cmd.SetArgs([]string{"myexport", "foo.txt", "-o", outFile}) + cmd.SetArgs([]string{"myexport", "foo.txt", "-o", outFile, "--publish=false"}) var buf bytes.Buffer cmd.SetOut(&buf) cmd.SetErr(&buf) @@ -87,7 +87,7 @@ func TestDownloadFilesystem_BadPath(t *testing.T) { defer func() { util.PrepareDownloadFunc = origPrep; util.CreateDataExporterIfNeededFunc = origCreate }() cmd := NewCommand(context.TODO(), slog.Default()) - cmd.SetArgs([]string{"myexport", "foo.txt", "-o", filepath.Join(t.TempDir(), "out.txt")}) + cmd.SetArgs([]string{"myexport", "foo.txt", "-o", filepath.Join(t.TempDir(), "out.txt"), "--publish=false"}) require.NoError(t, cmd.Execute()) } @@ -115,7 +115,7 @@ func TestDownloadBlock_OK(t *testing.T) { outFile := filepath.Join(t.TempDir(), "raw.img") cmd := NewCommand(context.TODO(), slog.Default()) - cmd.SetArgs([]string{"myexport", "-o", outFile}) + cmd.SetArgs([]string{"myexport", "-o", outFile, "--publish=false"}) cmd.SetOut(io.Discard) cmd.SetErr(io.Discard) require.NoError(t, cmd.Execute()) @@ -141,7 +141,7 @@ func TestDownloadBlock_WrongEndpoint(t *testing.T) { defer func() { util.PrepareDownloadFunc = origPrep; util.CreateDataExporterIfNeededFunc = origCreate }() 
cmd := NewCommand(context.TODO(), slog.Default()) - cmd.SetArgs([]string{"myexport", "-o", filepath.Join(t.TempDir(), "raw.img")}) + cmd.SetArgs([]string{"myexport", "-o", filepath.Join(t.TempDir(), "raw.img"), "--publish=false"}) cmd.SetOut(io.Discard) cmd.SetErr(io.Discard) require.NoError(t, cmd.Execute()) diff --git a/internal/data/dataexport/cmd/list/list.go b/internal/data/dataexport/cmd/list/list.go index 165335a6..3e5de98e 100644 --- a/internal/data/dataexport/cmd/list/list.go +++ b/internal/data/dataexport/cmd/list/list.go @@ -162,7 +162,6 @@ func Run(ctx context.Context, log *slog.Logger, cmd *cobra.Command, args []strin defer cancel() namespace, _ := cmd.Flags().GetString("namespace") - publish, _ := cmd.Flags().GetBool("publish") ttl, _ := cmd.Flags().GetString("ttl") dataName, srcPath, err := parseArgs(args) @@ -181,6 +180,17 @@ func Run(ctx context.Context, log *slog.Logger, cmd *cobra.Command, args []strin if err != nil { return err } + + publishFlag, err := dataio.ParsePublishFlag(cmd.Flags()) + if err != nil { + return err + } + + publish, err := dataio.ResolvePublish(ctx, publishFlag, rtClient, sClient, log) + if err != nil { + return err + } + deName, err := util.CreateDataExporterIfNeededFunc(ctx, log, dataName, namespace, publish, ttl, rtClient) if err != nil { return err diff --git a/internal/data/dataexport/cmd/list/list_http_test.go b/internal/data/dataexport/cmd/list/list_http_test.go index bff3df82..1a8c5f3d 100644 --- a/internal/data/dataexport/cmd/list/list_http_test.go +++ b/internal/data/dataexport/cmd/list/list_http_test.go @@ -55,7 +55,7 @@ func TestListFilesystem_OK(t *testing.T) { os.Stdout = w cmd := NewCommand(context.TODO(), slog.Default()) - cmd.SetArgs([]string{"myexport", "/"}) + cmd.SetArgs([]string{"myexport", "/", "--publish=false"}) require.NoError(t, cmd.Execute()) w.Close() @@ -92,7 +92,7 @@ func TestListBlock_OK(t *testing.T) { os.Stdout = w cmd := NewCommand(context.TODO(), slog.Default()) - 
cmd.SetArgs([]string{"myexport"}) + cmd.SetArgs([]string{"myexport", "--publish=false"}) require.NoError(t, cmd.Execute()) w.Close() @@ -124,7 +124,8 @@ func TestListFilesystem_NotDir(t *testing.T) { cmd := NewCommand(context.TODO(), slog.Default()) cmd.SetOut(&bytes.Buffer{}) cmd.SetErr(&bytes.Buffer{}) - cmd.SetArgs([]string{"myexport", "some/invalid"}) + cmd.SetArgs([]string{"myexport", "some/invalid", "--publish=false"}) err := cmd.Execute() require.Error(t, err) + require.Contains(t, err.Error(), "invalid source path") } diff --git a/internal/data/dataexport/util/util.go b/internal/data/dataexport/util/util.go index d689182f..7ec97bc4 100644 --- a/internal/data/dataexport/util/util.go +++ b/internal/data/dataexport/util/util.go @@ -67,8 +67,9 @@ func GetDataExport(ctx context.Context, deName, namespace string, rtClient ctrlr return deObj, nil } -func GetDataExportWithRestart(ctx context.Context, deName, namespace string, rtClient ctrlrtclient.Client, log *slog.Logger) (*v1alpha1.DataExport, error) { +func GetDataExportWithRestart(ctx context.Context, deName, namespace string, publish bool, rtClient ctrlrtclient.Client, log *slog.Logger) (*v1alpha1.DataExport, error) { deObj := &v1alpha1.DataExport{} + publishReconciled := false for i := 0; ; i++ { var returnErr error @@ -79,6 +80,19 @@ func GetDataExportWithRestart(ctx context.Context, deName, namespace string, rtC return nil, fmt.Errorf("kube Get dataexport with restart: %s", err.Error()) } + // On the first iteration, reconcile Spec.Publish with the resolved value. + // If the object was patched, restart the loop to pick up the updated status. 
+ if !publishReconciled { + patched, err := EnsureDataExportPublish(ctx, deObj, publish, rtClient) + if err != nil { + return nil, err + } + publishReconciled = true + if patched { + continue + } + } + for _, condition := range deObj.Status.Conditions { // restart DataExport if Expired if condition.Type == "Expired" { @@ -241,7 +255,7 @@ func getExportStatus(ctx context.Context, log *slog.Logger, deName, namespace st var podURL, volumeMode, internalCAData string log.Info("Waiting for DataExport to be ready", slog.String("name", deName), slog.String("namespace", namespace)) - deObj, err := GetDataExportWithRestart(ctx, deName, namespace, rtClient, log) + deObj, err := GetDataExportWithRestart(ctx, deName, namespace, public, rtClient, log) if err != nil { return "", "", "", err } @@ -318,3 +332,35 @@ func PrepareDownload(ctx context.Context, log *slog.Logger, deName, namespace st return url, volumeMode, subClient, nil } + +// EnsureDataExportPublish patches DataExport.Spec.Publish to match the resolved value. +// Only upgrades publish: false -> true is patched, true -> false is intentionally skipped +// to avoid downgrading already-published resources. +// Returns (true, nil) if the object was patched and the caller should re-read it. 
+func EnsureDataExportPublish( + ctx context.Context, + deObj *v1alpha1.DataExport, + publish bool, + rtClient ctrlrtclient.Client, +) (bool, error) { + if !publish { + return false, nil + } + + if deObj == nil { + return false, fmt.Errorf("nil DataExport object") + } + + if deObj.Spec.Publish == publish { + return false, nil + } + + patch := ctrlrtclient.MergeFrom(deObj.DeepCopy()) + deObj.Spec.Publish = publish + + if err := rtClient.Patch(ctx, deObj, patch); err != nil { + return false, fmt.Errorf("patch DataExport publish: %w", err) + } + + return true, nil +} diff --git a/internal/data/dataimport/cmd/create/create.go b/internal/data/dataimport/cmd/create/create.go index 38002832..d960c3b6 100644 --- a/internal/data/dataimport/cmd/create/create.go +++ b/internal/data/dataimport/cmd/create/create.go @@ -77,7 +77,6 @@ func Run(ctx context.Context, log *slog.Logger, cmd *cobra.Command, args []strin name := args[0] namespace, _ := cmd.Flags().GetString("namespace") ttl, _ := cmd.Flags().GetString("ttl") - publish, _ := cmd.Flags().GetBool("publish") pvcFilePath, _ := cmd.Flags().GetString("file") wffc, _ := cmd.Flags().GetBool("wffc") @@ -109,6 +108,16 @@ func Run(ctx context.Context, log *slog.Logger, cmd *cobra.Command, args []strin namespace = pvcSpec.Namespace } + publishFlag, err := dataio.ParsePublishFlag(cmd.Flags()) + if err != nil { + return err + } + + publish, err := dataio.ResolvePublish(ctx, publishFlag, rtClient, sc, log) + if err != nil { + return err + } + if err := util.CreateDataImport(ctx, name, namespace, ttl, publish, wffc, pvcSpec, rtClient); err != nil { return err } diff --git a/internal/data/dataimport/cmd/upload/upload.go b/internal/data/dataimport/cmd/upload/upload.go index 77958aed..a2509abc 100644 --- a/internal/data/dataimport/cmd/upload/upload.go +++ b/internal/data/dataimport/cmd/upload/upload.go @@ -17,6 +17,7 @@ import ( "github.com/spf13/cobra" dataio "github.com/deckhouse/deckhouse-cli/internal/data" + v1alpha1 
"github.com/deckhouse/deckhouse-cli/internal/data/dataimport/api/v1alpha1" "github.com/deckhouse/deckhouse-cli/internal/data/dataimport/util" client "github.com/deckhouse/deckhouse-cli/pkg/libsaferequest/client" ) @@ -65,7 +66,6 @@ func cmdExamples() string { func Run(ctx context.Context, log *slog.Logger, cmd *cobra.Command, args []string) error { pathToFile, _ := cmd.Flags().GetString("file") chunks, _ := cmd.Flags().GetInt("chunks") - publish, _ := cmd.Flags().GetBool("publish") namespace, _ := cmd.Flags().GetString("namespace") dstPath, _ := cmd.Flags().GetString("dstPath") resume, _ := cmd.Flags().GetBool("resume") @@ -83,6 +83,22 @@ func Run(ctx context.Context, log *slog.Logger, cmd *cobra.Command, args []strin log.Info("Run") + // Create runtime client for publish auto-detection and reconciliation. + rtClient, err := httpClient.NewRTClient(v1alpha1.AddToScheme) + if err != nil { + return err + } + + publishFlag, err := dataio.ParsePublishFlag(cmd.Flags()) + if err != nil { + return err + } + + publish, err := dataio.ResolvePublish(ctx, publishFlag, rtClient, httpClient, log) + if err != nil { + return err + } + permOctal := defaultFilePermissions uid := os.Getuid() gid := os.Getgid() diff --git a/internal/data/dataimport/cmd/upload/upload_windows.go b/internal/data/dataimport/cmd/upload/upload_windows.go index 3b25553b..e0d9ca8e 100644 --- a/internal/data/dataimport/cmd/upload/upload_windows.go +++ b/internal/data/dataimport/cmd/upload/upload_windows.go @@ -13,10 +13,12 @@ import ( "strconv" "strings" + "github.com/spf13/cobra" + dataio "github.com/deckhouse/deckhouse-cli/internal/data" + v1alpha1 "github.com/deckhouse/deckhouse-cli/internal/data/dataimport/api/v1alpha1" "github.com/deckhouse/deckhouse-cli/internal/data/dataimport/util" client "github.com/deckhouse/deckhouse-cli/pkg/libsaferequest/client" - "github.com/spf13/cobra" ) const ( @@ -63,7 +65,6 @@ func cmdExamples() string { func Run(ctx context.Context, log *slog.Logger, cmd *cobra.Command, 
args []string) error { pathToFile, _ := cmd.Flags().GetString("file") chunks, _ := cmd.Flags().GetInt("chunks") - publish, _ := cmd.Flags().GetBool("publish") namespace, _ := cmd.Flags().GetString("namespace") dstPath, _ := cmd.Flags().GetString("dstPath") resume, _ := cmd.Flags().GetBool("resume") @@ -91,6 +92,22 @@ func Run(ctx context.Context, log *slog.Logger, cmd *cobra.Command, args []strin } } + // Create runtime client for publish auto-detection and reconciliation. + rtClient, err := httpClient.NewRTClient(v1alpha1.AddToScheme) + if err != nil { + return err + } + + publishFlag, err := dataio.ParsePublishFlag(cmd.Flags()) + if err != nil { + return err + } + + publish, err := dataio.ResolvePublish(ctx, publishFlag, rtClient, httpClient, log) + if err != nil { + return err + } + podUrl, _, subClient, err := util.PrepareUpload(ctx, diName, namespace, publish, httpClient, log) if err != nil { return err diff --git a/internal/data/dataimport/util/util.go b/internal/data/dataimport/util/util.go index 4e1440ee..e189c6b0 100644 --- a/internal/data/dataimport/util/util.go +++ b/internal/data/dataimport/util/util.go @@ -97,8 +97,11 @@ func GetDataImportWithRestart( ctx context.Context, diName, namespace string, rtClient ctrlrtclient.Client, + publish bool, log *slog.Logger, ) (*v1alpha1.DataImport, error) { + publishReconciled := false + for i := 0; ; i++ { if err := ctx.Err(); err != nil { return nil, err @@ -109,6 +112,19 @@ func GetDataImportWithRestart( return nil, fmt.Errorf("kube Get dataimport with ready: %s", err.Error()) } + // On the first iteration, reconcile Spec.Publish with the resolved value. + // If the object was patched, restart the loop to pick up the updated status. 
+ if !publishReconciled { + patched, err := EnsureDataImportPublish(ctx, diObj, publish, rtClient) + if err != nil { + return nil, err + } + publishReconciled = true + if patched { + continue + } + } + var notReadyErr error for _, condition := range diObj.Status.Conditions { if condition.Type == "Expired" && condition.Status == "True" { @@ -188,7 +204,7 @@ func PrepareUpload( return "", "", nil, err } - diObj, err := GetDataImportWithRestart(ctx, diName, namespace, rtClient, log) + diObj, err := GetDataImportWithRestart(ctx, diName, namespace, rtClient, publish, log) if err != nil { return "", "", nil, err } @@ -237,6 +253,38 @@ func PrepareUpload( return url, volumeMode, subClient, nil } +// EnsureDataImportPublish patches DataImport.Spec.Publish to match the resolved value. +// Only upgrades publish: false -> true is patched, true -> false is intentionally skipped +// to avoid downgrading already-published resources. +// Returns (true, nil) if the object was patched and the caller should re-read it. 
+func EnsureDataImportPublish( + ctx context.Context, + diObj *v1alpha1.DataImport, + publish bool, + rtClient ctrlrtclient.Client, +) (bool, error) { + if !publish { + return false, nil + } + + if diObj == nil { + return false, fmt.Errorf("nil DataImport") + } + + if diObj.Spec.Publish == publish { + return false, nil + } + + patch := ctrlrtclient.MergeFrom(diObj.DeepCopy()) + diObj.Spec.Publish = publish + + if err := rtClient.Patch(ctx, diObj, patch); err != nil { + return false, fmt.Errorf("patch DataImport publish: %w", err) + } + + return true, nil +} + func CheckUploadProgress(ctx context.Context, httpClient *safeClient.SafeClient, targetURL string) (int64, error) { req, err := http.NewRequest(http.MethodHead, targetURL, nil) if err != nil { diff --git a/internal/data/publish.go b/internal/data/publish.go new file mode 100644 index 00000000..0ef81353 --- /dev/null +++ b/internal/data/publish.go @@ -0,0 +1,50 @@ +/* +Copyright 2026 Flant JSC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package dataio + +import ( + "fmt" + + "github.com/spf13/pflag" +) + +// PublishFlag represents the three-state publish flag: +// - Explicit=true, Value=true: user explicitly requested public (published) access +// - Explicit=true, Value=false: user explicitly requested internal (in-cluster) access +// - Explicit=false: auto-detect mode (Value is meaningless) +type PublishFlag struct { + Explicit bool + Value bool +} + +// ParsePublishFlag reads --publish as a three-state value. 
+// Explicit is true only when user provided --publish/--publish=true/--publish=false. +func ParsePublishFlag(flags *pflag.FlagSet) (PublishFlag, error) { + if flags == nil { + return PublishFlag{}, fmt.Errorf("publish flag parse: nil flag set") + } + + value, err := flags.GetBool("publish") + if err != nil { + return PublishFlag{}, fmt.Errorf("publish flag parse: %w", err) + } + + return PublishFlag{ + Explicit: flags.Changed("publish"), + Value: value, + }, nil +} diff --git a/internal/data/publish_detect.go b/internal/data/publish_detect.go new file mode 100644 index 00000000..10d95e34 --- /dev/null +++ b/internal/data/publish_detect.go @@ -0,0 +1,284 @@ +/* +Copyright 2026 Flant JSC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package dataio + +import ( + "context" + "crypto/tls" + "crypto/x509" + "errors" + "log/slog" + "net" + "time" + + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/types" + ctrlrtclient "sigs.k8s.io/controller-runtime/pkg/client" + + safeClient "github.com/deckhouse/deckhouse-cli/pkg/libsaferequest/client" +) + +const ( + kubeServiceNamespace = "default" + kubeServiceName = "kubernetes" + kubeServiceServerName = "kubernetes.default.svc" + ProbeTimeout = 3 * time.Second +) + +var ErrAutoDetectWithHint = errors.New("cannot auto-detect publish mode, specify --publish=true or --publish=false") + +// ResolvePublish returns explicit publish value if user set the flag, +// otherwise runs autodetection. 
+func ResolvePublish( + ctx context.Context, + publishFlag PublishFlag, + rtClient ctrlrtclient.Client, + sClient *safeClient.SafeClient, + log *slog.Logger, +) (bool, error) { + if log == nil { + log = slog.Default() + } + + if publishFlag.Explicit { + // User set the flag, return value without autodetection. + log.Info("Using explicit publish mode", slog.Bool("publish", publishFlag.Value)) + return publishFlag.Value, nil + } + + // User didn't set the flag, run autodetection. + log.Info("Auto-detecting publish mode") + return DetectPublish(ctx, rtClient, sClient, log) +} + +// DetectPublish decides default publish mode when user did not set --publish. +// +// Detection strategy: +// 1. Read Service default/kubernetes via the normal kubeconfig endpoint. +// 2. Read the same Service via https://<ClusterIP>:443 with ServerName override. +// 3. Compare UIDs of both objects. +// +// Decision matrix: +// - same UID: internal path is reachable -> publish=false +// - UID mismatch: ClusterIP reached a different cluster (e.g. local minikube/kind) -> publish=true +// - network-unreachable on probe: internal path is not reachable -> publish=true +// - TLS/auth rejection on probe: ClusterIP reached a different server -> publish=true +// - any other probe error (transient 5xx, cancellation, deserialization): ambiguous -> fail fast with hint +func DetectPublish( + ctx context.Context, + rtClient ctrlrtclient.Client, + sClient *safeClient.SafeClient, + log *slog.Logger, +) (bool, error) { + if log == nil { + log = slog.Default() + } + + firstSvc, err := getKubeService(ctx, rtClient) + if err != nil { + return false, ErrAutoDetectWithHint + } + + targetURL := "https://" + net.JoinHostPort(firstSvc.Spec.ClusterIP, "443") + + // Clone the original client to avoid mutating command-wide kubeconfig settings. + // Keep auth/CA from kubeconfig, but switch endpoint to ClusterIP and set ServerName + // so TLS validation uses service DNS name instead of raw IP. 
+ probeClient := sClient.Copy() + // Timeout in restConfig.Timeout is required in addition to context.WithTimeout below: + // context limits Go-level read/write, but restConfig.Timeout sets http.Client.Timeout + // which also covers TLS handshake and DNS resolve. Without it the HTTP client inherits + // the default kubeconfig timeout (typically 30s). + probeClient.SetProbeEndpoint(ProbeTimeout, targetURL, kubeServiceServerName) + probeRtClient, err := probeClient.NewRTClient() + + if err != nil { + return false, ErrAutoDetectWithHint + } + + // Probe timeout limits only autodetect latency + // main command context stays unchanged. + probeCtx, cancel := context.WithTimeout(ctx, ProbeTimeout) + defer cancel() + + secondSvc, err := getKubeService(probeCtx, probeRtClient) + if err != nil { + // Network-level failure means in-cluster endpoint is not reachable + // from current environment. + if isNetworkUnreachable(err) { + log.Info("Publish autodetect: internal endpoint is unreachable, selecting publish=true") + return true, nil + } + // TLS/auth/RBAC rejection: the first request via kubeconfig succeeded + // with the same credentials, so a rejection here means ClusterIP + // reached a different server. 
+ if isProbeRejected(err) { + log.Info("Publish autodetect: internal endpoint rejected, selecting publish=true") + return true, nil + } + // Remaining errors are ambiguous - the probe endpoint may be the same + // server experiencing a transient issue: + // - context.Canceled: deliberate cancellation, not a detection result + // - apierrors 500 InternalError: transient API server failure + // - apierrors 503 ServiceUnavailable / ServerTimeout: server overloaded + // - apierrors 504 Timeout: API-level processing timeout + // - apierrors 429 TooManyRequests: rate limiting + // - apierrors 400 BadRequest, 404 NotFound: unexpected but not clearly "different server" + // - response deserialization errors (malformed JSON, unexpected content type) + return false, ErrAutoDetectWithHint + } + + // UID mismatch: both endpoints responded but belong to different clusters. + // Typical case: a local cluster (minikube, kind) has the same ClusterIP + // as the remote target cluster. The user is not inside the target cluster. + if firstSvc.UID != secondSvc.UID { + log.Info("Publish autodetect: UID mismatch between external and internal endpoints, selecting publish=true") + return true, nil + } + + // Same service identity via both paths -> internal endpoint is reachable. + log.Info("Publish autodetect: internal endpoint is reachable, selecting publish=false") + return false, nil +} + +// isNetworkUnreachable classifies transport-level failures that indicate +// the in-cluster endpoint is not reachable from the current environment. +// +// Returns true for errors that clearly mean "no network path to ClusterIP": +// - context.DeadlineExceeded: probe timed out waiting for any response +// - net.OpError: low-level socket failures (EHOSTUNREACH, ENETUNREACH, +// ECONNREFUSED, ETIMEDOUT, etc.) - all indicate the ClusterIP is not +// routable from here. 
+// - net.DNSError: DNS resolution failed for the target host +// - net.Error with Timeout(): any other network-level timeout +// +// Returns false for: +// - nil: no error +// - context.Canceled: deliberate cancellation, not a network issue +// - everything else (TLS, RBAC, HTTP-level errors): the endpoint is +// reachable but rejected the request - ambiguous for autodetect +func isNetworkUnreachable(err error) bool { + if err == nil { + return false + } + + if errors.Is(err, context.DeadlineExceeded) { + return true + } + + if errors.Is(err, context.Canceled) { + return false + } + + var opErr *net.OpError + if errors.As(err, &opErr) { + return true + } + + var dnsErr *net.DNSError + if errors.As(err, &dnsErr) { + return true + } + + var netErr net.Error + if errors.As(err, &netErr) && netErr.Timeout() { + return true + } + + return false +} + +// isProbeRejected classifies errors that indicate the ClusterIP endpoint +// is reachable but belongs to a different server than the kubeconfig endpoint. +// +// Precondition: the first request via kubeconfig already succeeded with the +// same CA, token, and RBAC permissions. If the probe to ClusterIP gets a TLS +// or auth rejection, it means ClusterIP reached a different server. +// +// Returns true for: +// - x509.UnknownAuthorityError: server certificate signed by a different CA +// - x509.CertificateInvalidError: server certificate expired, not yet valid, +// constraint violation, incompatible key usage, etc. +// (also covers errors wrapped in tls.CertificateVerificationError, which +// has Unwrap() so errors.As finds the inner x509 error automatically) +// - x509.HostnameError: server certificate CN/SAN doesn't match +// kubernetes.default.svc +// - tls.RecordHeaderError: server doesn't speak TLS at all +// (plain HTTP on HTTPS port, or a non-HTTP service) +// - tls.AlertError: server sent a TLS alert rejecting the handshake +// (bad_certificate, handshake_failure, protocol_version, unknown_ca, etc.) 
+// - apierrors 401 Unauthorized: server doesn't accept our token +// - apierrors 403 Forbidden: server has different RBAC rules +// +// Returns false for: +// - nil: no error +// - everything else: ambiguous, handled by the caller as ErrAutoDetectWithHint +func isProbeRejected(err error) bool { + if err == nil { + return false + } + + // TLS: certificate signed by unknown authority + var unknownAuthErr x509.UnknownAuthorityError + if errors.As(err, &unknownAuthErr) { + return true + } + + // TLS: certificate expired, not yet valid, etc. + var certInvalidErr x509.CertificateInvalidError + if errors.As(err, &certInvalidErr) { + return true + } + + // TLS: certificate CN/SAN doesn't match server name + var hostnameErr x509.HostnameError + if errors.As(err, &hostnameErr) { + return true + } + + // TLS: server doesn't speak TLS (plain HTTP on HTTPS port) + var recordHeaderErr tls.RecordHeaderError + if errors.As(err, &recordHeaderErr) { + return true + } + + // TLS: server reject handshake + var tlsAlertErr tls.AlertError + if errors.As(err, &tlsAlertErr) { + return true + } + + // Auth/RBAC: 401 or 403 from a different API server + if apierrors.IsUnauthorized(err) || apierrors.IsForbidden(err) { + return true + } + + return false +} + +func getKubeService(ctx context.Context, rtClient ctrlrtclient.Client) (*corev1.Service, error) { + var svc corev1.Service + if err := rtClient.Get(ctx, types.NamespacedName{ + Name: kubeServiceName, + Namespace: kubeServiceNamespace, + }, &svc); err != nil { + return nil, err + } + + return &svc, nil +} diff --git a/internal/data/publish_detect_test.go b/internal/data/publish_detect_test.go new file mode 100644 index 00000000..2bc05620 --- /dev/null +++ b/internal/data/publish_detect_test.go @@ -0,0 +1,214 @@ +/* +Copyright 2026 Flant JSC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package dataio + +import ( + "context" + "errors" + "fmt" + "log/slog" + "net" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + kubescheme "k8s.io/client-go/kubernetes/scheme" + "sigs.k8s.io/controller-runtime/pkg/client/fake" +) + +// timeoutError implements net.Error with Timeout()=true but is neither +// net.OpError nor net.DNSError, so it exercises the final net.Error branch. +type timeoutError struct{} + +func (e *timeoutError) Error() string { return "i/o timeout" } +func (e *timeoutError) Timeout() bool { return true } +func (e *timeoutError) Temporary() bool { return false } + +// nonTimeoutNetError implements net.Error with Timeout()=false. 
+type nonTimeoutNetError struct{} + +func (e *nonTimeoutNetError) Error() string { return "net error" } +func (e *nonTimeoutNetError) Timeout() bool { return false } +func (e *nonTimeoutNetError) Temporary() bool { return false } + +func TestIsNetworkUnreachable(t *testing.T) { + tests := []struct { + name string + err error + want bool + }{ + { + name: "nil error", + err: nil, + want: false, + }, + { + name: "deadline exceeded", + err: context.DeadlineExceeded, + want: true, + }, + { + name: "wrapped deadline exceeded", + err: fmt.Errorf("get service: %w", context.DeadlineExceeded), + want: true, + }, + { + name: "context canceled", + err: context.Canceled, + want: false, + }, + { + name: "wrapped context canceled", + err: fmt.Errorf("probe: %w", context.Canceled), + want: false, + }, + { + name: "net.OpError dial", + err: &net.OpError{Op: "dial", Net: "tcp", Err: errors.New("connection refused")}, + want: true, + }, + { + name: "net.OpError wrapped", + err: fmt.Errorf("probe: %w", &net.OpError{Op: "dial", Net: "tcp", Err: errors.New("no route to host")}), + want: true, + }, + { + name: "net.DNSError", + err: &net.DNSError{Err: "no such host", Name: "kubernetes.default.svc"}, + want: true, + }, + { + name: "net.DNSError wrapped", + err: fmt.Errorf("resolve: %w", &net.DNSError{Err: "server misbehaving", Name: "example.com"}), + want: true, + }, + { + name: "net.Error with timeout", + err: &timeoutError{}, + want: true, + }, + { + name: "net.Error without timeout", + err: &nonTimeoutNetError{}, + want: false, + }, + { + name: "generic error", + err: errors.New("something went wrong"), + want: false, + }, + { + name: "wrapped generic error", + err: fmt.Errorf("outer: %w", errors.New("inner")), + want: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := isNetworkUnreachable(tt.err) + assert.Equal(t, tt.want, got) + }) + } +} + +func TestResolvePublish_Explicit(t *testing.T) { + ctx := context.Background() + log := 
slog.Default() + + tests := []struct { + name string + flag PublishFlag + want bool + }{ + { + name: "explicit true", + flag: PublishFlag{Explicit: true, Value: true}, + want: true, + }, + { + name: "explicit false", + flag: PublishFlag{Explicit: true, Value: false}, + want: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // rtClient and sClient are nil — explicit path returns early. + got, err := ResolvePublish(ctx, tt.flag, nil, nil, log) + require.NoError(t, err) + assert.Equal(t, tt.want, got) + }) + } +} + +func TestResolvePublish_NilLogger(t *testing.T) { + ctx := context.Background() + + got, err := ResolvePublish(ctx, PublishFlag{Explicit: true, Value: true}, nil, nil, nil) + require.NoError(t, err) + assert.True(t, got) +} + +func TestGetKubeService(t *testing.T) { + scheme := runtime.NewScheme() + require.NoError(t, kubescheme.AddToScheme(scheme)) + + ctx := context.Background() + + t.Run("service exists", func(t *testing.T) { + svc := &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: kubeServiceName, + Namespace: kubeServiceNamespace, + UID: types.UID("test-uid-123"), + }, + Spec: corev1.ServiceSpec{ + ClusterIP: "10.96.0.1", + }, + } + c := fake.NewClientBuilder().WithScheme(scheme).WithObjects(svc).Build() + + got, err := getKubeService(ctx, c) + require.NoError(t, err) + assert.Equal(t, types.UID("test-uid-123"), got.UID) + assert.Equal(t, "10.96.0.1", got.Spec.ClusterIP) + }) + + t.Run("service not found", func(t *testing.T) { + c := fake.NewClientBuilder().WithScheme(scheme).Build() + + _, err := getKubeService(ctx, c) + require.Error(t, err) + }) +} + +func TestDetectPublish_ServiceNotFound(t *testing.T) { + scheme := runtime.NewScheme() + require.NoError(t, kubescheme.AddToScheme(scheme)) + + c := fake.NewClientBuilder().WithScheme(scheme).Build() + log := slog.Default() + + _, err := DetectPublish(context.Background(), c, nil, log) + require.ErrorIs(t, err, ErrAutoDetectWithHint) +} diff --git 
a/pkg/libsaferequest/client/http.go b/pkg/libsaferequest/client/http.go index c63e49a7..a448aaf2 100644 --- a/pkg/libsaferequest/client/http.go +++ b/pkg/libsaferequest/client/http.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "net/http" + "time" "github.com/spf13/pflag" apiruntime "k8s.io/apimachinery/pkg/runtime" @@ -57,6 +58,13 @@ func NewSafeClient(flags ...*pflag.FlagSet) (*SafeClient, error) { return &SafeClient{restConfig}, nil } +// SetProbeEndpoint configures host, TLS ServerName and timeout for probe requests. +func (c *SafeClient) SetProbeEndpoint(timeout time.Duration, targetHost, kubeServiceServerName string) { + c.restConfig.Host = targetHost + c.restConfig.TLSClientConfig.ServerName = kubeServiceServerName + c.restConfig.Timeout = timeout +} + func (c *SafeClient) HTTPDo(req *http.Request) (*http.Response, error) { if len(req.Header.Get("Authorization")) != 0 { httpClient, err := rest.HTTPClientFor(c.restConfig) @@ -132,7 +140,7 @@ func (c *SafeClient) HTTPDo(req *http.Request) (*http.Response, error) { return nil, errors.New("No auth") } -func (c *SafeClient) NewRTClient(schemeFuncs ...(func(s *apiruntime.Scheme) error)) (ctrlrtclient.Client, error) { +func (c *SafeClient) NewRTClient(schemeFuncs ...func(s *apiruntime.Scheme) error) (ctrlrtclient.Client, error) { if c.restConfig == nil { return nil, fmt.Errorf("No rest config") }