diff --git a/cmd/containerd-shim-lcow-v2/main.go b/cmd/containerd-shim-lcow-v2/main.go index 9951144a8a..9cba0e8a25 100644 --- a/cmd/containerd-shim-lcow-v2/main.go +++ b/cmd/containerd-shim-lcow-v2/main.go @@ -1,4 +1,4 @@ -//go:build windows +//go:build windows && lcow // containerd-shim-lcow-v2 is a containerd shim implementation for Linux Containers on Windows (LCOW). package main @@ -16,6 +16,7 @@ import ( "github.com/Microsoft/hcsshim/internal/log" "github.com/Microsoft/hcsshim/internal/oc" "github.com/Microsoft/hcsshim/internal/shim" + "github.com/Microsoft/hcsshim/osversion" "github.com/containerd/errdefs" "github.com/sirupsen/logrus" @@ -42,6 +43,13 @@ func main() { os.Exit(1) } + // This shim is supported on Windows Build 26100 and later. + if osversion.Build() < osversion.V25H1Server { + _, _ = fmt.Fprintf(os.Stderr, + "%s: Windows version [%v] is not supported\n", service.ShimName, osversion.Build()) + os.Exit(1) + } + // Start the shim manager event loop. The manager is responsible for // handling containerd start/stop lifecycle calls for the shim process.
shim.Run(context.Background(), newShimManager(service.ShimName), func(c *shim.Config) { diff --git a/cmd/containerd-shim-lcow-v2/manager.go b/cmd/containerd-shim-lcow-v2/manager.go index e0a997dcc5..d17397a8da 100644 --- a/cmd/containerd-shim-lcow-v2/manager.go +++ b/cmd/containerd-shim-lcow-v2/manager.go @@ -1,4 +1,4 @@ -//go:build windows +//go:build windows && lcow package main @@ -217,7 +217,7 @@ func (m *shimManager) Stop(_ context.Context, id string) (resp shim.StopStatus, if sys, _ := hcs.OpenComputeSystem(ctx, id); sys != nil { defer sys.Close() if err := sys.Terminate(ctx); err != nil { - fmt.Fprintf(os.Stderr, "failed to terminate '%s': %v", id, err) + fmt.Fprintf(os.Stderr, "failed to terminate %q: %v", id, err) } else { ch := make(chan error, 1) go func() { ch <- sys.Wait() }() @@ -225,11 +225,11 @@ func (m *shimManager) Stop(_ context.Context, id string) (resp shim.StopStatus, select { case <-t.C: sys.Close() - return resp, fmt.Errorf("timed out waiting for '%s' to terminate", id) + return resp, fmt.Errorf("timed out waiting for %q to terminate", id) case err := <-ch: t.Stop() if err != nil { - fmt.Fprintf(os.Stderr, "failed to wait for '%s' to terminate: %v", id, err) + fmt.Fprintf(os.Stderr, "failed to wait for %q to terminate: %v", id, err) } } } @@ -249,12 +249,12 @@ func (m *shimManager) Stop(_ context.Context, id string) (resp shim.StopStatus, func limitedRead(filePath string, readLimitBytes int64) ([]byte, error) { f, err := os.Open(filePath) if err != nil { - return nil, fmt.Errorf("limited read failed to open file: %s: %w", filePath, err) + return nil, fmt.Errorf("open file %s: %w", filePath, err) } defer f.Close() fi, err := f.Stat() if err != nil { - return []byte{}, fmt.Errorf("limited read failed during file stat: %s: %w", filePath, err) + return []byte{}, fmt.Errorf("stat file %s: %w", filePath, err) } if fi.Size() < readLimitBytes { readLimitBytes = fi.Size() @@ -262,7 +262,7 @@ func limitedRead(filePath string, readLimitBytes int64) 
([]byte, error) { buf := make([]byte, readLimitBytes) _, err = f.Read(buf) if err != nil { - return []byte{}, fmt.Errorf("limited read failed during file read: %s: %w", filePath, err) + return []byte{}, fmt.Errorf("read file %s: %w", filePath, err) } return buf, nil } diff --git a/cmd/containerd-shim-lcow-v2/manager_test.go b/cmd/containerd-shim-lcow-v2/manager_test.go index fd93692299..c080040233 100644 --- a/cmd/containerd-shim-lcow-v2/manager_test.go +++ b/cmd/containerd-shim-lcow-v2/manager_test.go @@ -1,4 +1,4 @@ -//go:build windows +//go:build windows && lcow package main diff --git a/cmd/containerd-shim-lcow-v2/service/plugin/plugin.go b/cmd/containerd-shim-lcow-v2/service/plugin/plugin.go index dd7c1124ae..84b490c189 100644 --- a/cmd/containerd-shim-lcow-v2/service/plugin/plugin.go +++ b/cmd/containerd-shim-lcow-v2/service/plugin/plugin.go @@ -1,4 +1,4 @@ -//go:build windows +//go:build windows && lcow package plugin @@ -7,6 +7,7 @@ import ( "os" "github.com/Microsoft/hcsshim/cmd/containerd-shim-lcow-v2/service" + "github.com/Microsoft/hcsshim/internal/logfields" "github.com/Microsoft/hcsshim/internal/shim" "github.com/Microsoft/hcsshim/internal/shimdiag" hcsversion "github.com/Microsoft/hcsshim/internal/version" @@ -30,7 +31,7 @@ const ( var svc *service.Service func init() { - // Provider ID: 64F6FC7F-8326-5EE8-B890-3734AE584136 + // Provider ID: A6BD4B70-8A0B-5913-5C8E-E2780DC7F06F // Provider and hook aren't closed explicitly, as they will exist until process exit. 
provider, err := etw.NewProvider(etwProviderName, etwCallback) if err != nil { @@ -106,7 +107,7 @@ func etwCallback(sourceID guid.GUID, state etw.ProviderState, level etw.Level, m if err != nil { return } - log := logrus.WithField("sandboxID", svc.SandboxID()) + log := logrus.WithField(logfields.SandboxID, svc.SandboxID()) log.WithField("stack", resp.Stacks).Info("goroutine stack dump") if resp.GuestStacks != "" { log.WithField("stack", resp.GuestStacks).Info("guest stack dump") diff --git a/cmd/containerd-shim-lcow-v2/service/service.go b/cmd/containerd-shim-lcow-v2/service/service.go index 3c2968c53f..1181efca14 100644 --- a/cmd/containerd-shim-lcow-v2/service/service.go +++ b/cmd/containerd-shim-lcow-v2/service/service.go @@ -1,22 +1,25 @@ -//go:build windows +//go:build windows && lcow package service import ( "context" + "fmt" "sync" "github.com/Microsoft/hcsshim/internal/builder/vm/lcow" + "github.com/Microsoft/hcsshim/internal/controller/pod" "github.com/Microsoft/hcsshim/internal/controller/vm" "github.com/Microsoft/hcsshim/internal/log" "github.com/Microsoft/hcsshim/internal/shim" "github.com/Microsoft/hcsshim/internal/shimdiag" sandboxsvc "github.com/containerd/containerd/api/runtime/sandbox/v1" - tasksvc "github.com/containerd/containerd/api/runtime/task/v3" + tasksvc "github.com/containerd/containerd/api/runtime/task/v2" "github.com/containerd/containerd/v2/core/runtime" "github.com/containerd/containerd/v2/pkg/namespaces" "github.com/containerd/containerd/v2/pkg/shutdown" + "github.com/containerd/errdefs" "github.com/containerd/ttrpc" ) @@ -29,7 +32,7 @@ const ( // All Service methods (sandbox, task, and shimdiag) operate on this shared struct. type Service struct { // mu is used to synchronize access to shared state within the Service. - mu sync.Mutex + mu sync.RWMutex // publisher is used to publish events from the shim to containerd. 
publisher shim.Publisher @@ -45,7 +48,15 @@ type Service struct { sandboxOptions *lcow.SandboxOptions // vmController is responsible for managing the lifecycle of the underlying utility VM and its associated resources. - vmController vm.Controller + vmController *vm.Controller + + // podControllers maps podID -> PodController for each active pod. + podControllers map[string]*pod.Controller + + // containerPodMapping maps containerID -> podID, allowing callers to look up + // which pod a container belongs to and then retrieve the corresponding controller + // from podControllers. + containerPodMapping map[string]string // shutdown manages graceful shutdown operations and allows registration of cleanup callbacks. shutdown shutdown.Service @@ -56,10 +67,12 @@ var _ shim.TTRPCService = (*Service)(nil) // NewService creates a new instance of the Service with the shared state. func NewService(ctx context.Context, eventsPublisher shim.Publisher, sd shutdown.Service) *Service { svc := &Service{ - publisher: eventsPublisher, - events: make(chan interface{}, 128), // Buffered channel for events - vmController: vm.NewController(), - shutdown: sd, + publisher: eventsPublisher, + events: make(chan interface{}, 128), // Buffered channel for events + vmController: vm.New(), + podControllers: make(map[string]*pod.Controller), + containerPodMapping: make(map[string]string), + shutdown: sd, } go svc.forward(ctx, eventsPublisher) @@ -83,12 +96,20 @@ func NewService(ctx context.Context, eventsPublisher shim.Publisher, sd shutdown // RegisterTTRPC registers the Task, Sandbox, and ShimDiag TTRPC services on // the provided server so that containerd can call into the shim over TTRPC. 
func (s *Service) RegisterTTRPC(server *ttrpc.Server) error { - tasksvc.RegisterTTRPCTaskService(server, s) + tasksvc.RegisterTaskService(server, s) sandboxsvc.RegisterTTRPCSandboxService(server, s) shimdiag.RegisterShimDiagService(server, s) return nil } +// ensureVMRunning returns an error if the VM is not in the running state. +func (s *Service) ensureVMRunning() error { + if state := s.vmController.State(); state != vm.StateRunning { + return fmt.Errorf("vm is not running (state: %s): %w", state, errdefs.ErrFailedPrecondition) + } + return nil +} + // SandboxID returns the unique identifier for the sandbox managed by this Service. func (s *Service) SandboxID() string { return s.sandboxID @@ -96,10 +117,6 @@ func (s *Service) SandboxID() string { // send enqueues an event onto the internal events channel so that it can be // forwarded to containerd asynchronously by the forward goroutine. -// -// TODO: wire up send() for task events once task lifecycle methods are implemented. -// -//nolint:unused func (s *Service) send(evt interface{}) { s.events <- evt } diff --git a/cmd/containerd-shim-lcow-v2/service/service_sandbox.go b/cmd/containerd-shim-lcow-v2/service/service_sandbox.go index 3e2a3ea437..9e433cb3c0 100644 --- a/cmd/containerd-shim-lcow-v2/service/service_sandbox.go +++ b/cmd/containerd-shim-lcow-v2/service/service_sandbox.go @@ -1,4 +1,4 @@ -//go:build windows +//go:build windows && lcow package service diff --git a/cmd/containerd-shim-lcow-v2/service/service_sandbox_internal.go b/cmd/containerd-shim-lcow-v2/service/service_sandbox_internal.go index f8cd3dfd97..2ca22ebd9a 100644 --- a/cmd/containerd-shim-lcow-v2/service/service_sandbox_internal.go +++ b/cmd/containerd-shim-lcow-v2/service/service_sandbox_internal.go @@ -1,4 +1,4 @@ -//go:build windows +//go:build windows && lcow package service @@ -124,7 +124,7 @@ func (s *Service) startSandboxInternal(ctx context.Context, request *sandbox.Sta } } - // VM controller ensures that only once of the Start 
call goes through. + // VM controller ensures that only one of the Start calls goes through. err := s.vmController.StartVM(ctx, &vm.StartOptions{ GCSServiceID: winio.VsockServiceID(prot.LinuxGcsVsockPort), ConfidentialOptions: confidentialOpts, @@ -264,7 +264,7 @@ func (s *Service) pingSandboxInternal(_ context.Context, _ *sandbox.PingRequest) // The sandbox must already be in the stopped state before shutdown is accepted. func (s *Service) shutdownSandboxInternal(ctx context.Context, request *sandbox.ShutdownSandboxRequest) (*sandbox.ShutdownSandboxResponse, error) { if s.sandboxID != request.SandboxID { - return &sandbox.ShutdownSandboxResponse{}, fmt.Errorf("sandbox ID mismatch, expected %s, got %s", s.sandboxID, request.SandboxID) + return nil, fmt.Errorf("sandbox ID mismatch, expected %s, got %s", s.sandboxID, request.SandboxID) } // Ensure the VM is terminated. If the VM is already terminated, @@ -298,17 +298,17 @@ func (s *Service) shutdownSandboxInternal(ctx context.Context, request *sandbox. // It collects and returns runtime statistics from the vmController. 
func (s *Service) sandboxMetricsInternal(ctx context.Context, request *sandbox.SandboxMetricsRequest) (*sandbox.SandboxMetricsResponse, error) { if s.sandboxID != request.SandboxID { - return &sandbox.SandboxMetricsResponse{}, fmt.Errorf("sandbox ID mismatch, expected %s, got %s", s.sandboxID, request.SandboxID) + return nil, fmt.Errorf("sandbox ID mismatch, expected %s, got %s", s.sandboxID, request.SandboxID) } stats, err := s.vmController.Stats(ctx) if err != nil { - return &sandbox.SandboxMetricsResponse{}, fmt.Errorf("failed to get sandbox metrics: %w", err) + return nil, fmt.Errorf("failed to get sandbox metrics: %w", err) } anyStat, err := typeurl.MarshalAny(stats) if err != nil { - return &sandbox.SandboxMetricsResponse{}, fmt.Errorf("failed to marshal sandbox metrics: %w", err) + return nil, fmt.Errorf("failed to marshal sandbox metrics: %w", err) } return &sandbox.SandboxMetricsResponse{ diff --git a/cmd/containerd-shim-lcow-v2/service/service_shimdiag.go b/cmd/containerd-shim-lcow-v2/service/service_shimdiag.go index 503982d59e..b17a0d3657 100644 --- a/cmd/containerd-shim-lcow-v2/service/service_shimdiag.go +++ b/cmd/containerd-shim-lcow-v2/service/service_shimdiag.go @@ -1,4 +1,4 @@ -//go:build windows +//go:build windows && lcow package service @@ -7,11 +7,13 @@ import ( "os" "strings" + "github.com/Microsoft/hcsshim/internal/log" "github.com/Microsoft/hcsshim/internal/logfields" "github.com/Microsoft/hcsshim/internal/oc" "github.com/Microsoft/hcsshim/internal/shimdiag" "github.com/containerd/errdefs/pkg/errgrpc" + "github.com/sirupsen/logrus" "go.opencensus.io/trace" ) @@ -34,6 +36,9 @@ func (s *Service) DiagExecInHost(ctx context.Context, request *shimdiag.ExecProc trace.StringAttribute(logfields.Stdout, request.Stdout), trace.StringAttribute(logfields.Stderr, request.Stderr)) + // Set the sandbox ID in the logger context for all subsequent logs in this request. 
+ ctx, _ = log.WithContext(ctx, logrus.WithField(logfields.SandboxID, s.sandboxID)) + r, e := s.diagExecInHostInternal(ctx, request) return r, errgrpc.ToGRPC(e) } @@ -49,6 +54,9 @@ func (s *Service) DiagTasks(ctx context.Context, request *shimdiag.TasksRequest) trace.StringAttribute(logfields.SandboxID, s.sandboxID), trace.BoolAttribute(logfields.Execs, request.Execs)) + // Set the sandbox ID in the logger context for all subsequent logs in this request. + ctx, _ = log.WithContext(ctx, logrus.WithField(logfields.SandboxID, s.sandboxID)) + r, e := s.diagTasksInternal(ctx, request) return r, errgrpc.ToGRPC(e) } @@ -66,20 +74,26 @@ func (s *Service) DiagShare(ctx context.Context, request *shimdiag.ShareRequest) trace.StringAttribute(logfields.UVMPath, request.UvmPath), trace.BoolAttribute(logfields.ReadOnly, request.ReadOnly)) + // Set the sandbox ID in the logger context for all subsequent logs in this request. + ctx, _ = log.WithContext(ctx, logrus.WithField(logfields.SandboxID, s.sandboxID)) + r, e := s.diagShareInternal(ctx, request) return r, errgrpc.ToGRPC(e) } // DiagStacks returns the stack traces of all goroutines in the shim. // This method is part of the instrumentation layer and business logic is included in diagStacksInternal. -func (s *Service) DiagStacks(ctx context.Context, request *shimdiag.StacksRequest) (resp *shimdiag.StacksResponse, err error) { +func (s *Service) DiagStacks(ctx context.Context, _ *shimdiag.StacksRequest) (resp *shimdiag.StacksResponse, err error) { ctx, span := oc.StartSpan(ctx, "DiagStacks") defer span.End() defer func() { oc.SetSpanStatus(span, err) }() span.AddAttributes(trace.StringAttribute(logfields.SandboxID, s.sandboxID)) - r, e := s.diagStacksInternal(ctx, request) + // Set the sandbox ID in the logger context for all subsequent logs in this request. 
+ ctx, _ = log.WithContext(ctx, logrus.WithField(logfields.SandboxID, s.sandboxID)) + + r, e := s.diagStacksInternal(ctx) return r, errgrpc.ToGRPC(e) } diff --git a/cmd/containerd-shim-lcow-v2/service/service_shimdiag_internal.go b/cmd/containerd-shim-lcow-v2/service/service_shimdiag_internal.go index a835ade320..2144fcc33f 100644 --- a/cmd/containerd-shim-lcow-v2/service/service_shimdiag_internal.go +++ b/cmd/containerd-shim-lcow-v2/service/service_shimdiag_internal.go @@ -1,19 +1,28 @@ -//go:build windows +//go:build windows && lcow package service import ( "context" "fmt" + "os" + "path/filepath" + "runtime" + "time" + plan9Mount "github.com/Microsoft/hcsshim/internal/controller/device/plan9/mount" + "github.com/Microsoft/hcsshim/internal/controller/device/plan9/share" "github.com/Microsoft/hcsshim/internal/shimdiag" - "github.com/containerd/errdefs" ) // diagExecInHostInternal is the implementation for DiagExecInHost. // // It is used to create an exec session into the hosting UVM. func (s *Service) diagExecInHostInternal(ctx context.Context, request *shimdiag.ExecProcessRequest) (*shimdiag.ExecProcessResponse, error) { + if err := s.ensureVMRunning(); err != nil { + return nil, err + } + ec, err := s.vmController.ExecIntoHost(ctx, request) if err != nil { return nil, fmt.Errorf("failed to exec into host: %w", err) @@ -22,14 +31,122 @@ func (s *Service) diagExecInHostInternal(ctx context.Context, request *shimdiag. return &shimdiag.ExecProcessResponse{ExitCode: int32(ec)}, nil } -func (s *Service) diagTasksInternal(_ context.Context, _ *shimdiag.TasksRequest) (*shimdiag.TasksResponse, error) { - return nil, errdefs.ErrNotImplemented +// diagTasksInternal is the implementation for DiagTasks. +// +// It returns all tasks running in the UVM across all pods. 
+func (s *Service) diagTasksInternal(_ context.Context, request *shimdiag.TasksRequest) (*shimdiag.TasksResponse, error) { + if err := s.ensureVMRunning(); err != nil { + return nil, err + } + + // Originally this method was intended to be used in a single pod setup and therefore, + // we do not specify a TaskID in the request. Since this shim can support multiple pods, + // we will return all tasks running in the UVM, regardless of which pod they belong to. + + resp := &shimdiag.TasksResponse{} + + // This is a diagnostic method and therefore, locking for entire duration + // should not have performance implications in prod. + s.mu.Lock() + defer s.mu.Unlock() + + // For all pods, get all the containers. + for _, podCtrl := range s.podControllers { + containers := podCtrl.ListContainers() + + // For each container, get their processes and status. + for containerID, ctrCtrl := range containers { + t := &shimdiag.Task{ID: containerID} + + if request.Execs { + processes, err := ctrCtrl.ListProcesses() + if err != nil { + return nil, fmt.Errorf("failed to list processes for container %s: %w", containerID, err) + } + + for _, proc := range processes { + status := proc.Status(false) + t.Execs = append(t.Execs, &shimdiag.Exec{ + ID: status.ExecID, + State: status.Status.String(), + }) + } + } + + resp.Tasks = append(resp.Tasks, t) + } + } + + return resp, nil } -func (s *Service) diagShareInternal(_ context.Context, _ *shimdiag.ShareRequest) (*shimdiag.ShareResponse, error) { - return nil, errdefs.ErrNotImplemented +// diagShareInternal is the implementation for DiagShare. +// +// It shares a host path into the guest VM via the plan9 controller. 
+func (s *Service) diagShareInternal(ctx context.Context, request *shimdiag.ShareRequest) (*shimdiag.ShareResponse, error) { + if err := s.ensureVMRunning(); err != nil { + return nil, err + } + + fileInfo, err := os.Stat(request.HostPath) + if err != nil { + return nil, fmt.Errorf("failed to open source path %s: %w", request.HostPath, err) + } + + shareConfig := share.Config{ + HostPath: request.HostPath, + ReadOnly: request.ReadOnly, + } + + if !fileInfo.IsDir() { + // Map the containing directory in, but restrict the share to a single file. + hostPath, fileName := filepath.Split(request.HostPath) + shareConfig.HostPath = hostPath + shareConfig.Restrict = true + shareConfig.AllowedNames = append(shareConfig.AllowedNames, fileName) + } + + ctrl := s.vmController.Plan9Controller() + + reservationID, err := ctrl.Reserve(ctx, shareConfig, plan9Mount.Config{ReadOnly: request.ReadOnly}) + if err != nil { + return nil, fmt.Errorf("failed to reserve plan9 resource for request %+v: %w", request, err) + } + + _, err = ctrl.MapToGuest(ctx, reservationID) + if err != nil { + return nil, fmt.Errorf("failed to map guest resource for request %+v: %w", request, err) + } + + return &shimdiag.ShareResponse{}, nil } -func (s *Service) diagStacksInternal(_ context.Context, _ *shimdiag.StacksRequest) (*shimdiag.StacksResponse, error) { - return nil, errdefs.ErrNotImplemented +// diagStacksInternal is the implementation for DiagStacks. +// +// It collects goroutine stacks from the host shim and the guest VM. 
+func (s *Service) diagStacksInternal(ctx context.Context) (*shimdiag.StacksResponse, error) { + if err := s.ensureVMRunning(); err != nil { + return nil, err + } + + buf := make([]byte, 4096) + for { + buf = buf[:runtime.Stack(buf, true)] + if len(buf) < cap(buf) { + break + } + buf = make([]byte, 2*len(buf)) + } + + timedCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + + resp := &shimdiag.StacksResponse{Stacks: string(buf)} + stacks, err := s.vmController.DumpStacks(timedCtx) + if err != nil { + return nil, fmt.Errorf("failed to dump stacks: %w", err) + } + + resp.GuestStacks = stacks + return resp, nil } diff --git a/cmd/containerd-shim-lcow-v2/service/service_task.go b/cmd/containerd-shim-lcow-v2/service/service_task.go index f7f7dda5af..52d4c5b340 100644 --- a/cmd/containerd-shim-lcow-v2/service/service_task.go +++ b/cmd/containerd-shim-lcow-v2/service/service_task.go @@ -1,21 +1,22 @@ -//go:build windows +//go:build windows && lcow package service import ( "context" + "os" "github.com/Microsoft/hcsshim/internal/logfields" "github.com/Microsoft/hcsshim/internal/oc" - "github.com/containerd/containerd/api/runtime/task/v3" + "github.com/containerd/containerd/api/runtime/task/v2" "github.com/containerd/errdefs/pkg/errgrpc" "go.opencensus.io/trace" "google.golang.org/protobuf/types/known/emptypb" ) // Ensure Service implements the TTRPCTaskService interface at compile time. -var _ task.TTRPCTaskService = &Service{} +var _ task.TaskService = &Service{} // State returns the current state of a task or process. // This method is part of the instrumentation layer and business logic is included in stateInternal. 
@@ -319,8 +320,13 @@ func (s *Service) Connect(ctx context.Context, request *task.ConnectRequest) (re trace.StringAttribute(logfields.SandboxID, s.sandboxID), trace.StringAttribute(logfields.ID, request.ID)) - r, e := s.connectInternal(ctx, request) - return r, errgrpc.ToGRPC(e) + // We treat the shim/task as the same pid on the Windows host. + pid := uint32(os.Getpid()) + + return &task.ConnectResponse{ + ShimPid: pid, + TaskPid: pid, + }, nil } // Shutdown gracefully shuts down the Service. diff --git a/cmd/containerd-shim-lcow-v2/service/service_task_internal.go b/cmd/containerd-shim-lcow-v2/service/service_task_internal.go index 254199873b..5c21bf7ab9 100644 --- a/cmd/containerd-shim-lcow-v2/service/service_task_internal.go +++ b/cmd/containerd-shim-lcow-v2/service/service_task_internal.go @@ -1,79 +1,669 @@ -//go:build windows +//go:build windows && lcow package service import ( "context" + "encoding/json" + "fmt" + "os" + "path/filepath" - "github.com/containerd/containerd/api/runtime/task/v3" + "github.com/Microsoft/hcsshim/internal/controller/linuxcontainer" + "github.com/Microsoft/hcsshim/internal/controller/network" + "github.com/Microsoft/hcsshim/internal/controller/pod" + "github.com/Microsoft/hcsshim/internal/controller/process" + "github.com/Microsoft/hcsshim/internal/hcs" + "github.com/Microsoft/hcsshim/internal/log" + "github.com/Microsoft/hcsshim/internal/logfields" + "github.com/Microsoft/hcsshim/internal/oci" + + "github.com/Microsoft/hcsshim/pkg/ctrdtaskapi" + eventstypes "github.com/containerd/containerd/api/events" + "github.com/containerd/containerd/api/runtime/task/v2" "github.com/containerd/errdefs" + "github.com/containerd/typeurl/v2" + "github.com/opencontainers/runtime-spec/specs-go" + "github.com/sirupsen/logrus" + "golang.org/x/sync/errgroup" "google.golang.org/protobuf/types/known/emptypb" ) -func (s *Service) stateInternal(_ context.Context, _ *task.StateRequest) (*task.StateResponse, error) { - return nil, errdefs.ErrNotImplemented 
+// getContainerController looks up the container controller for the given container ID. +func (s *Service) getContainerController(containerID string) (*linuxcontainer.Controller, error) { + s.mu.RLock() + defer s.mu.RUnlock() + + // Resolve the owning pod ID from the container-to-pod mapping. + podID, ok := s.containerPodMapping[containerID] + if !ok { + return nil, fmt.Errorf("container %s not found: %w", containerID, errdefs.ErrNotFound) + } + + // Fetch the pod controller responsible for this pod. + podCtrl, ok := s.podControllers[podID] + if !ok { + return nil, fmt.Errorf("pod controller for pod %s not found: %w", podID, errdefs.ErrNotFound) + } + + // Retrieve the container controller from the pod. + ctrCtrl, err := podCtrl.GetContainer(containerID) + if err != nil { + return nil, fmt.Errorf("failed to get container controller for container %s in pod %s: %w", containerID, podID, err) + } + + return ctrCtrl, nil } -func (s *Service) createInternal(_ context.Context, _ *task.CreateTaskRequest) (*task.CreateTaskResponse, error) { - return nil, errdefs.ErrNotImplemented +// getPodController returns pod controller for given pod ID. +func (s *Service) getPodController(podID string) (*pod.Controller, bool) { + s.mu.RLock() + defer s.mu.RUnlock() + + ctrl, ok := s.podControllers[podID] + return ctrl, ok } -func (s *Service) startInternal(_ context.Context, _ *task.StartRequest) (*task.StartResponse, error) { - return nil, errdefs.ErrNotImplemented +// stateInternal returns the current status of a process within a container. +func (s *Service) stateInternal(_ context.Context, request *task.StateRequest) (*task.StateResponse, error) { + if err := s.ensureVMRunning(); err != nil { + return nil, err + } + + // Look up the container controller for the requested container. 
+ ctrCtrl, err := s.getContainerController(request.ID) + if err != nil { + return nil, fmt.Errorf("failed to find container for state request: %w", err) + } + + // Retrieve the process controller for the target exec (or init) process. + proc, err := ctrCtrl.GetProcess(request.ExecID) + if err != nil { + return nil, fmt.Errorf("failed to get process (execID=%q) in container %s: %w", request.ExecID, request.ID, err) + } + + // Return the current status snapshot for the process. + return proc.Status(true), nil } -func (s *Service) deleteInternal(_ context.Context, _ *task.DeleteRequest) (*task.DeleteResponse, error) { - return nil, errdefs.ErrNotImplemented +// createInternal creates a new pod sandbox or workload container based on the OCI spec. +func (s *Service) createInternal(ctx context.Context, request *task.CreateTaskRequest) (*task.CreateTaskResponse, error) { + if err := s.ensureVMRunning(); err != nil { + return nil, err + } + + // Parse the OCI spec from the bundle. + var spec specs.Spec + f, err := os.Open(filepath.Join(request.Bundle, "config.json")) + if err != nil { + return nil, fmt.Errorf("failed to open config.json: %w", err) + } + if err := json.NewDecoder(f).Decode(&spec); err != nil { + _ = f.Close() + return nil, fmt.Errorf("failed to decode config.json: %w", err) + } + _ = f.Close() + + // Determine the sandbox type and ID. + // Sandbox type can be "sandbox" or "container". + ct, sid, err := oci.GetSandboxTypeAndID(spec.Annotations) + if err != nil { + return nil, err + } + + s.mu.Lock() + defer s.mu.Unlock() + + var ctrCtrl *linuxcontainer.Controller + + switch ct { + case oci.KubernetesContainerTypeSandbox: + // This is a pod creation request. Create a new pod controller. + if _, ok := s.podControllers[sid]; ok { + return nil, fmt.Errorf("pod controller for pod %s already exists: %w", sid, errdefs.ErrAlreadyExists) + } + + // Validate that required config fields are present for a sandbox. 
+ if spec.Windows == nil || spec.Windows.Network == nil { + return nil, fmt.Errorf("spec is missing required Windows network configuration: %w", errdefs.ErrInvalidArgument) + } + + // If any unsupported param is specified, return an explicit error. + if len(spec.Windows.Network.EndpointList) > 0 { + return nil, fmt.Errorf("spec has unsupported network configuration: endpoints should not be part of spec: %w", errdefs.ErrInvalidArgument) + } + + // Create a new pod. + podCtrl := pod.New(sid, s.vmController) + + // Setup network for the pod based on the provided namespace. + err = podCtrl.SetupNetwork(ctx, &network.SetupOptions{ + NetworkNamespace: spec.Windows.Network.NetworkNamespace, + PolicyBasedRouting: s.sandboxOptions.PolicyBasedRouting, + }) + if err != nil { + // No cleanup on failure since containerd will send a Delete request. + return nil, fmt.Errorf("failed to setup network for pod %s: %w", sid, err) + } + + // Store the created controller in the map. + s.podControllers[sid] = podCtrl + + // Create a container within the pod with the same ID as the pod. + ctrCtrl, err = podCtrl.NewContainer(ctx, request.ID) + if err != nil { + return nil, fmt.Errorf("failed to create sandbox container %s in pod %s: %w", request.ID, sid, err) + } + + s.containerPodMapping[request.ID] = sid + + case oci.KubernetesContainerTypeContainer: + // This is a regular container creation request. Look up the existing pod. + podCtrl, ok := s.podControllers[sid] + if !ok { + return nil, fmt.Errorf("pod controller for pod %s not found: %w", sid, errdefs.ErrNotFound) + } + + // Create a container within the pod with the provided ID. 
+ ctrCtrl, err = podCtrl.NewContainer(ctx, request.ID) + if err != nil { + return nil, fmt.Errorf("failed to create container %s in pod %s: %w", request.ID, sid, err) + } + + s.containerPodMapping[request.ID] = sid + + default: + return nil, fmt.Errorf("unsupported container type %q: %w", ct, errdefs.ErrInvalidArgument) + } + + // Call Create on the container controller. + if err := ctrCtrl.Create( + ctx, + &spec, + request, + &linuxcontainer.CreateOpts{ + IsScratchEncryptionEnabled: s.sandboxOptions.EnableScratchEncryption, + }, + ); err != nil { + return nil, fmt.Errorf("failed to create container %s: %w", request.ID, err) + } + + // Get the init process pid to return in the response. + initProc, err := ctrCtrl.GetProcess("") + if err != nil { + return nil, fmt.Errorf("failed to get init process for container %s: %w", request.ID, err) + } + + // Publish the TaskCreate event to notify containerd that the container has been created. + s.send(&eventstypes.TaskCreate{ + ContainerID: request.ID, + Bundle: request.Bundle, + Rootfs: request.Rootfs, + IO: &eventstypes.TaskIO{ + Stdin: request.Stdin, + Stdout: request.Stdout, + Stderr: request.Stderr, + Terminal: request.Terminal, + }, + Pid: uint32(initProc.Pid()), + }) + + return &task.CreateTaskResponse{ + Pid: uint32(initProc.Pid()), + }, nil } -func (s *Service) pidsInternal(_ context.Context, _ *task.PidsRequest) (*task.PidsResponse, error) { - return nil, errdefs.ErrNotImplemented +// startInternal starts the init process of a container or an exec process within it. +func (s *Service) startInternal(ctx context.Context, request *task.StartRequest) (*task.StartResponse, error) { + if err := s.ensureVMRunning(); err != nil { + return nil, err + } + + // Get the container controller for the requested task. 
+ ctrCtrl, err := s.getContainerController(request.ID) + if err != nil { + return nil, fmt.Errorf("failed to find container for start request: %w", err) + } + + resp := &task.StartResponse{} + + // If the start was meant for container, + // call start on Container controller. + if request.ExecID == "" { + pid, err := ctrCtrl.Start(ctx, s.events) + if err != nil { + return nil, fmt.Errorf("failed to start container %s: %w", request.ID, err) + } + resp.Pid = pid + + // Publish the TaskStart event for the init process. + s.send(&eventstypes.TaskStart{ + ContainerID: request.ID, + Pid: pid, + }) + + return resp, nil + } + + // If the start was meant for exec process, + // call start on Process controller. + proc, err := ctrCtrl.GetProcess(request.ExecID) + if err != nil { + return nil, fmt.Errorf("failed to get process (execID=%q) in container %s: %w", request.ExecID, request.ID, err) + } + + p, err := proc.Start(ctx, s.events) + if err != nil { + return nil, fmt.Errorf("failed to start process (execID=%q) in container %s: %w", request.ExecID, request.ID, err) + } + resp.Pid = uint32(p) + + // Publish the TaskExecStarted event for the exec process. + s.send(&eventstypes.TaskExecStarted{ + ContainerID: request.ID, + ExecID: request.ExecID, + Pid: uint32(p), + }) + + return resp, nil } +// deleteInternal deletes a process, container, or pod sandbox depending on the request. +func (s *Service) deleteInternal(ctx context.Context, request *task.DeleteRequest) (*task.DeleteResponse, error) { + if err := s.ensureVMRunning(); err != nil { + return nil, err + } + + // Look up the container controller for the target ID. + ctrCtrl, err := s.getContainerController(request.ID) + if err != nil { + return nil, fmt.Errorf("failed to find container for delete request: %w", err) + } + + // Delete the process from the container controller. + // For the init process this is request.ExecID == "". 
+ status, err := ctrCtrl.DeleteProcess(ctx, request.ExecID) + if err != nil { + return nil, fmt.Errorf("failed to delete process (execID=%q) in container %s: %w", request.ExecID, request.ID, err) + } + + // Build the response from the process status returned by DeleteProcess. + resp := &task.DeleteResponse{ + Pid: status.Pid, + ExitStatus: status.ExitStatus, + ExitedAt: status.ExitedAt, + } + + // Publish the TaskDelete event to notify containerd the process/task has been deleted. + s.send(&eventstypes.TaskDelete{ + ContainerID: request.ID, + ID: request.ExecID, + Pid: status.Pid, + ExitStatus: status.ExitStatus, + ExitedAt: status.ExitedAt, + }) + + // If this was an exec process deletion, we are done. + if request.ExecID != "" { + return resp, nil + } + + // We need to delete either a pod or a container. + s.mu.Lock() + defer s.mu.Unlock() + + podID := s.containerPodMapping[request.ID] + + // If the container ID matches a pod ID, this is the sandbox container + // being torn down. + if podCtrl, isPod := s.podControllers[request.ID]; isPod { + // Ensure no workload containers remain in the pod. The only container + // left should be the sandbox container itself (request.ID). + remaining := podCtrl.ListContainers() + delete(remaining, request.ID) // exclude the sandbox container itself + if len(remaining) > 0 { + return nil, fmt.Errorf("cannot delete sandbox container %s: %d workload container(s) still exist in the pod: %w", + request.ID, len(remaining), errdefs.ErrFailedPrecondition) + } + + // Tear down the pod network before removing the pod controller. + if err := podCtrl.TeardownNetwork(ctx); err != nil { + return nil, fmt.Errorf("failed to teardown network for pod %s: %w", request.ID, err) + } + + // Remove the sandbox container from the pod's internal container map. 
+ if err := podCtrl.DeleteContainer(ctx, request.ID); err != nil { + return nil, fmt.Errorf("failed to delete sandbox container %s from pod: %w", request.ID, err) + } + + delete(s.podControllers, request.ID) + delete(s.containerPodMapping, request.ID) + return resp, nil + } + + // Regular (non-sandbox) container: delete the container from the owning + // pod controller first, then remove the mapping. + podCtrl, ok := s.podControllers[podID] + if !ok { + return nil, fmt.Errorf("pod controller for pod %s not found while deleting container %s: %w", podID, request.ID, errdefs.ErrNotFound) + } + + if err := podCtrl.DeleteContainer(ctx, request.ID); err != nil { + return nil, fmt.Errorf("failed to delete container %s from pod %s: %w", request.ID, podID, err) + } + + delete(s.containerPodMapping, request.ID) + + return resp, nil +} + +// pidsInternal returns the list of process IDs running inside the specified container. +func (s *Service) pidsInternal(ctx context.Context, request *task.PidsRequest) (*task.PidsResponse, error) { + if err := s.ensureVMRunning(); err != nil { + return nil, err + } + + ctrCtrl, err := s.getContainerController(request.ID) + if err != nil { + return nil, fmt.Errorf("failed to find container for pids request: %w", err) + } + + pids, err := ctrCtrl.Pids(ctx) + if err != nil { + err = enrichNotFoundError(err) + return nil, fmt.Errorf("failed to get pids for container %s: %w", request.ID, err) + } + + return &task.PidsResponse{ + Processes: pids, + }, nil +} + +// pauseInternal is not implemented for this shim. func (s *Service) pauseInternal(_ context.Context, _ *task.PauseRequest) (*emptypb.Empty, error) { return nil, errdefs.ErrNotImplemented } +// resumeInternal is not implemented for this shim. func (s *Service) resumeInternal(_ context.Context, _ *task.ResumeRequest) (*emptypb.Empty, error) { return nil, errdefs.ErrNotImplemented } +// checkpointInternal is not implemented for this shim. 
func (s *Service) checkpointInternal(_ context.Context, _ *task.CheckpointTaskRequest) (*emptypb.Empty, error) { return nil, errdefs.ErrNotImplemented } -func (s *Service) killInternal(_ context.Context, _ *task.KillRequest) (*emptypb.Empty, error) { - return nil, errdefs.ErrNotImplemented +// killInternal sends a signal to a process or, when All is set, to every process in the pod. +func (s *Service) killInternal(ctx context.Context, request *task.KillRequest) (*emptypb.Empty, error) { + if err := s.ensureVMRunning(); err != nil { + return nil, err + } + + ctrCtrl, err := s.getContainerController(request.ID) + if err != nil { + return nil, fmt.Errorf("failed to find container for kill request: %w", err) + } + + // If "all" is set and this is a sandbox (pod) container, collect all + // workload containers so we can fan out the kill to the entire pod. + var workloadContainers map[string]*linuxcontainer.Controller + if request.All { + if podCtrl, ok := s.getPodController(request.ID); ok { + workloadContainers = podCtrl.ListContainers() + // Exclude the sandbox container — it is killed below. + delete(workloadContainers, request.ID) + } + } + + // Fan out kill to all workload containers and the target container concurrently. + killGroup := errgroup.Group{} + for _, workloadCtr := range workloadContainers { + killGroup.Go(func() error { + return workloadCtr.KillProcess(ctx, request.ExecID, request.Signal, request.All) + }) + } + // Target container. + killGroup.Go(func() error { + return ctrCtrl.KillProcess(ctx, request.ExecID, request.Signal, request.All) + }) + + // Wait for all kill to complete. + if err = killGroup.Wait(); err != nil { + return nil, err + } + + return &emptypb.Empty{}, nil } -func (s *Service) execInternal(_ context.Context, _ *task.ExecProcessRequest) (*emptypb.Empty, error) { - return nil, errdefs.ErrNotImplemented +// execInternal creates a new exec process inside the specified container. 
+func (s *Service) execInternal(ctx context.Context, request *task.ExecProcessRequest) (*emptypb.Empty, error) { + if err := s.ensureVMRunning(); err != nil { + return nil, err + } + + var spec specs.Process + if err := json.Unmarshal(request.Spec.Value, &spec); err != nil { + return nil, fmt.Errorf("unmarshal process spec: %w", err) + } + + ctrCtrl, err := s.getContainerController(request.ID) + if err != nil { + return nil, fmt.Errorf("failed to find container for exec request: %w", err) + } + + proc, err := ctrCtrl.NewProcess(request.ExecID) + if err != nil { + return nil, fmt.Errorf("failed to create new process (execID=%q) in container %s: %w", request.ExecID, request.ID, err) + } + + if err := proc.Create(ctx, &process.CreateOptions{ + Spec: &spec, + Terminal: request.Terminal, + Stdin: request.Stdin, + Stdout: request.Stdout, + Stderr: request.Stderr, + }); err != nil { + return nil, fmt.Errorf("failed to create exec process (execID=%q) in container %s: %w", request.ExecID, request.ID, err) + } + + // Publish the TaskExecAdded event to notify containerd that a new exec has been created. + s.send(&eventstypes.TaskExecAdded{ + ContainerID: request.ID, + ExecID: request.ExecID, + }) + + return &emptypb.Empty{}, nil } -func (s *Service) resizePtyInternal(_ context.Context, _ *task.ResizePtyRequest) (*emptypb.Empty, error) { - return nil, errdefs.ErrNotImplemented +// resizePtyInternal resizes the pseudo-terminal for the specified process. 
+func (s *Service) resizePtyInternal(ctx context.Context, request *task.ResizePtyRequest) (*emptypb.Empty, error) { + if err := s.ensureVMRunning(); err != nil { + return nil, err + } + + ctrCtrl, err := s.getContainerController(request.ID) + if err != nil { + return nil, fmt.Errorf("failed to find container for resize pty request: %w", err) + } + + proc, err := ctrCtrl.GetProcess(request.ExecID) + if err != nil { + return nil, fmt.Errorf("failed to get process (execID=%q) in container %s: %w", request.ExecID, request.ID, err) + } + + if err := proc.ResizeConsole(ctx, request.Width, request.Height); err != nil { + return nil, fmt.Errorf("failed to resize pty for process (execID=%q) in container %s: %w", request.ExecID, request.ID, err) + } + + return &emptypb.Empty{}, nil } -func (s *Service) closeIOInternal(_ context.Context, _ *task.CloseIORequest) (*emptypb.Empty, error) { - return nil, errdefs.ErrNotImplemented +// closeIOInternal closes the stdin stream for the specified process. +func (s *Service) closeIOInternal(ctx context.Context, request *task.CloseIORequest) (*emptypb.Empty, error) { + if err := s.ensureVMRunning(); err != nil { + return nil, err + } + + ctrCtrl, err := s.getContainerController(request.ID) + if err != nil { + return nil, fmt.Errorf("failed to find container for close IO request: %w", err) + } + + proc, err := ctrCtrl.GetProcess(request.ExecID) + if err != nil { + return nil, fmt.Errorf("failed to get process (execID=%q) in container %s: %w", request.ExecID, request.ID, err) + } + + proc.CloseIO(ctx) + + return &emptypb.Empty{}, nil } -func (s *Service) updateInternal(_ context.Context, _ *task.UpdateTaskRequest) (*emptypb.Empty, error) { - return nil, errdefs.ErrNotImplemented +// updateInternal applies a resource update to a pod VM or an individual container. 
+func (s *Service) updateInternal(ctx context.Context, request *task.UpdateTaskRequest) (*emptypb.Empty, error) { + if err := s.ensureVMRunning(); err != nil { + return nil, err + } + + if request.Resources == nil { + return nil, fmt.Errorf("update container %s: resources cannot be empty: %w", request.ID, errdefs.ErrInvalidArgument) + } + + resources, err := typeurl.UnmarshalAny(request.Resources) + if err != nil { + return nil, fmt.Errorf("failed to unmarshal resources for container %s update request: %w", request.ID, err) + } + + switch resources.(type) { + case *specs.LinuxResources: + case *ctrdtaskapi.PolicyFragment: + default: + return nil, fmt.Errorf("unsupported resource type %T: %w", resources, errdefs.ErrInvalidArgument) + } + + // Check if the ID in request matches any podID in podController map. + // If so, this is a pod-level update — call Update on the VMController. + if _, ok := s.getPodController(request.ID); ok { + if err := s.vmController.Update(ctx, resources, request.Annotations); err != nil { + return nil, fmt.Errorf("failed to update VM resources for pod %s: %w", request.ID, err) + } + return &emptypb.Empty{}, nil + } + + // Otherwise, find the container controller and call Update on it. + ctrCtrl, err := s.getContainerController(request.ID) + if err != nil { + return nil, fmt.Errorf("failed to update resources for container %s: %w", request.ID, err) + } + + if err := ctrCtrl.Update(ctx, resources); err != nil { + return nil, fmt.Errorf("failed to update resources for container %s: %w", request.ID, err) + } + + return &emptypb.Empty{}, nil } -func (s *Service) waitInternal(_ context.Context, _ *task.WaitRequest) (*task.WaitResponse, error) { - return nil, errdefs.ErrNotImplemented +// waitInternal blocks until the specified process exits and returns its exit status. 
+func (s *Service) waitInternal(ctx context.Context, request *task.WaitRequest) (*task.WaitResponse, error) { + if err := s.ensureVMRunning(); err != nil { + return nil, err + } + + ctrCtrl, err := s.getContainerController(request.ID) + if err != nil { + return nil, fmt.Errorf("failed to find container for wait request: %w", err) + } + + // An empty ExecID means the caller is waiting for the container itself + // (i.e. the init process + full teardown). Wait on the container + // controller, which blocks until the container reaches StateTerminated + // and has finished the teardown. + if request.ExecID == "" { + ctrCtrl.Wait(ctx) + } + + // Get the process controller associated with the ExecID. + proc, err := ctrCtrl.GetProcess(request.ExecID) + if err != nil { + return nil, fmt.Errorf("failed to get process (execID=%q) in container %s: %w", request.ExecID, request.ID, err) + } + + // Call Wait on the process controller itself. + proc.Wait(ctx) + + // Get the Process status. + status := proc.Status(true) + + return &task.WaitResponse{ + ExitStatus: status.ExitStatus, + ExitedAt: status.ExitedAt, + }, nil } -func (s *Service) statsInternal(_ context.Context, _ *task.StatsRequest) (*task.StatsResponse, error) { - return nil, errdefs.ErrNotImplemented +// statsInternal returns resource usage statistics for the specified container and, for pods, the VM. +func (s *Service) statsInternal(ctx context.Context, request *task.StatsRequest) (*task.StatsResponse, error) { + if err := s.ensureVMRunning(); err != nil { + return nil, err + } + + ctrCtrl, err := s.getContainerController(request.ID) + if err != nil { + return nil, fmt.Errorf("failed to find container for stats request: %w", err) + } + + // Get the stats for the requested container. 
+ ctrStats, err := ctrCtrl.Stats(ctx) + if err != nil { + err = enrichNotFoundError(err) + return nil, fmt.Errorf("failed to get container stats for %s: %w", request.ID, err) + } + + // Fetch and attach VM stats only for pod-level requests. + if _, ok := s.getPodController(request.ID); ok { + vmStats, err := s.vmController.Stats(ctx) + if err != nil { + err = enrichNotFoundError(err) + return nil, fmt.Errorf("failed to get VM stats: %w", err) + } + ctrStats.VM = vmStats + } + + // Marshal the stats into an Any for the response. + anyStats, err := typeurl.MarshalAny(ctrStats) + if err != nil { + return nil, fmt.Errorf("failed to marshal stats: %w", err) + } + + return &task.StatsResponse{ + Stats: typeurl.MarshalProto(anyStats), + }, nil } -func (s *Service) connectInternal(_ context.Context, _ *task.ConnectRequest) (*task.ConnectResponse, error) { - return nil, errdefs.ErrNotImplemented +// shutdownInternal is a no-op; shim teardown is deferred to SandboxService.ShutdownSandbox. +func (s *Service) shutdownInternal(ctx context.Context, request *task.ShutdownRequest) (*emptypb.Empty, error) { + // Because this shim strictly implements the Sandbox API, + // the TaskService no longer has the authority to shut down the shim process. + // Shim teardown is completely deferred to SandboxService.ShutdownSandbox. + + // Simply log the call for debugging purposes and return. + log.G(ctx).WithFields(logrus.Fields{ + logfields.SandboxID: s.sandboxID, + logfields.ID: request.ID, + }).Debug("ignoring TaskService.Shutdown request") + + return &emptypb.Empty{}, nil } -func (s *Service) shutdownInternal(_ context.Context, _ *task.ShutdownRequest) (*emptypb.Empty, error) { - return nil, errdefs.ErrNotImplemented +// enrichNotFoundError wraps HCS-specific "not found" errors with errdefs.ErrNotFound. 
func enrichNotFoundError(err error) error {
	// NOTE(review): besides genuine not-found errors, this also treats HCS
	// invalid-state, access-denied, and invalid-handle errors as "not found" —
	// presumably because HCS surfaces those when a compute system has already
	// gone away; confirm this matches the HCS failure modes callers expect.
	isNotFound := errdefs.IsNotFound(err) ||
		hcs.IsNotExist(err) ||
		hcs.IsOperationInvalidState(err) ||
		hcs.IsAccessIsDenied(err) ||
		hcs.IsErrorInvalidHandle(err)
	if isNotFound {
		// Wrap rather than replace: the original error text is preserved
		// while errors.Is(err, errdefs.ErrNotFound) now succeeds for callers.
		return fmt.Errorf("%w: %w", errdefs.ErrNotFound, err)
	}
	return err
}