diff --git a/images/dvcr-artifact/pkg/registry/registry.go b/images/dvcr-artifact/pkg/registry/registry.go index 20e613e3ce..03fb693d9f 100644 --- a/images/dvcr-artifact/pkg/registry/registry.go +++ b/images/dvcr-artifact/pkg/registry/registry.go @@ -24,6 +24,7 @@ import ( "crypto/tls" "encoding/hex" "encoding/json" + "errors" "fmt" "io" "net/http" @@ -40,7 +41,6 @@ import ( "github.com/google/go-containerregistry/pkg/v1/mutate" "github.com/google/go-containerregistry/pkg/v1/remote" "github.com/google/go-containerregistry/pkg/v1/stream" - "github.com/pkg/errors" "golang.org/x/sync/errgroup" "k8s.io/klog/v2" "kubevirt.io/containerized-data-importer/pkg/importer" @@ -140,17 +140,51 @@ func (p DataProcessor) Process(ctx context.Context) (ImportRes, error) { informer := NewImageInformer() - errsGroup, ctx := errgroup.WithContext(ctx) - errsGroup.Go(func() error { - return p.inspectAndStreamSourceImage(ctx, sourceImageFilename, sourceImageSize, progressMeter, pipeWriter, informer) - }) - errsGroup.Go(func() error { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + errsCh := make(chan error, 2) + + go func() { + err := p.inspectAndStreamSourceImage( + ctx, sourceImageFilename, sourceImageSize, + progressMeter, pipeWriter, informer, + ) + if err != nil { + if errors.Is(err, io.ErrClosedPipe) { + klog.Infof("source streaming: pipe is closed by upload: %s", err.Error()) + err = nil + } else { + if ctx.Err() != context.Canceled { + cancel() + } + err = fmt.Errorf("source streaming error: %w", err) + klog.Errorln(err) + } + } + errsCh <- err + }() + + go func() { defer pipeReader.Close() - return p.uploadLayersAndImage(ctx, pipeReader, sourceImageSize, informer) - }) - err = errsGroup.Wait() - if err != nil { + err := p.uploadLayersAndImage( + ctx, pipeReader, sourceImageSize, informer, + ) + if err != nil { + if ctx.Err() != context.Canceled { + cancel() + } + err = fmt.Errorf("layers uploading error: %w", err) + klog.Errorln(err) + } + errsCh <- err + }() + + err1 := <-errsCh + err2 := <-errsCh + + if err := errors.Join(err1, err2); err != nil { return ImportRes{}, err } diff --git a/images/virtualization-artifact/pkg/controller/service/stat_service.go b/images/virtualization-artifact/pkg/controller/service/stat_service.go index efa67b5dcf..7974be0ab2 100644 --- a/images/virtualization-artifact/pkg/controller/service/stat_service.go +++ b/images/virtualization-artifact/pkg/controller/service/stat_service.go @@ -19,7 +19,11 @@ package service import ( "errors" "fmt" + "net/http" + "regexp" "strconv" + "strings" + "syscall" "time" corev1 "k8s.io/api/core/v1" @@ -99,9 +103,12 @@ func (s StatService) GetSize(pod *corev1.Pod) v1alpha2.ImageStatusSize { } var ( - ErrNotInitialized = errors.New("not initialized") - ErrNotScheduled = errors.New("not scheduled") - ErrProvisioningFailed = errors.New("provisioning failed") + ErrNotInitialized = errors.New("not initialized") + ErrNotScheduled = errors.New("not scheduled") + ErrProvisioningFailed = errors.New("provisioning failed") + ErrDVCRNoSpaceImageError = errors.New("DVCR is out of space; please contact the cluster administrator") + // ErrDVCRNoSpaceDiskError is intended to avoid confusion by clarifying that DVCR is needed as an intermediary when creating a virtual disk. + ErrDVCRNoSpaceDiskError = errors.New("DVCR is out of space to create the virtual disk; please contact the cluster administrator") ) func (s StatService) CheckPod(pod *corev1.Pod) error { @@ -125,6 +132,12 @@ func (s StatService) CheckPod(pod *corev1.Pod) error { } if report != nil && report.ErrMessage != "" { + if s.isDVCRNoSpaceError(report.ErrMessage) { + if strings.HasPrefix(pod.Name, "d8v-vd-") { + return fmt.Errorf("%w: %w", ErrProvisioningFailed, ErrDVCRNoSpaceDiskError) + } + return fmt.Errorf("%w: %w", ErrProvisioningFailed, ErrDVCRNoSpaceImageError) + } return fmt.Errorf("%w: Pod %s/%s termination message: %s", ErrProvisioningFailed, pod.Namespace, pod.Name, report.ErrMessage) } @@ -135,6 +148,22 @@ func (s StatService) CheckPod(pod *corev1.Pod) error { return nil } +func (s StatService) isDVCRNoSpaceError(terminationMessage string) bool { + dvcrSvc := "dvcr.d8-virtualization.svc" + + noSpaceErrorPattern := fmt.Sprintf("Err:%d", syscall.ENOSPC) + noDigitPattern := `\D` + re := regexp.MustCompile(noSpaceErrorPattern + noDigitPattern) + + if strings.Contains(terminationMessage, dvcrSvc) && + strings.Contains(terminationMessage, http.MethodPost) && + re.MatchString(terminationMessage) { + return true + } + + return false +} + func (s StatService) GetDownloadSpeed(ownerUID types.UID, pod *corev1.Pod) *v1alpha2.StatusSpeed { report, err := monitoring.GetFinalReportFromPod(pod) if err != nil && !errors.Is(err, monitoring.ErrTerminationMessageNotFound) {