Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 19 additions & 2 deletions core/consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ type consumer struct {
logger *zap.SugaredLogger
metricsScope tally.Scope
registry TopicRegistry
classifiers []errs.Classifier

mu sync.Mutex
stopped bool
Expand All @@ -76,12 +77,22 @@ type activeSubscription struct {
}

// New creates a new consumer.
// registry provides queue and subscription config for topics.
func New(logger *zap.SugaredLogger, scope tally.Scope, registry TopicRegistry) Consumer {
//
// registry provides queue and subscription config for topics. classifiers are
// the per-backend error classifiers used to decide whether an error returned
// by a controller is retryable (nack for redelivery) or non-retryable (reject
// to DLQ). The consumer runs errs.Classify(err, classifiers...) exactly once
// per failing delivery and then drives ack/nack/reject from the resulting
// chain via plain errs.IsRetryable type checks. Pass any backend-specific
// classifiers the controllers rely on (e.g. core/errs/mysql.Classifier);
// passing none is fine for tests where controllers always return explicit
// framework-wrapped errors.
func New(logger *zap.SugaredLogger, scope tally.Scope, registry TopicRegistry, classifiers ...errs.Classifier) Consumer {
return &consumer{
logger: logger,
metricsScope: scope.SubScope("consumer"),
registry: registry,
classifiers: classifiers,
subscriptions: make(map[TopicKey]*activeSubscription),
}
}
Expand Down Expand Up @@ -370,6 +381,12 @@ func (m *consumer) processDelivery(ctx context.Context, controller Controller, d
metrics.NamedTimer(controllerScope, opName, "controller_latency", elapsed, metrics.NewTag("success", successTag))

if err != nil {
// Single explicit classification pass: if err's chain does not already
// carry a framework wrap, ask the configured classifiers and prepend
// the matching framework type. Downstream errs.IsRetryable / IsUserError
// calls then only do a plain type check on the result.
err = errs.Classify(err, m.classifiers...)

// Check if the error is non-retryable (poison pill message)
if !errs.IsRetryable(err) {
m.logger.Errorw("non-retryable controller error, rejecting message",
Expand Down
109 changes: 95 additions & 14 deletions core/errs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,30 +18,109 @@ Errors are classified along two axes:

**Infra by default.** Any error that is not explicitly wrapped with `NewUserError` is an infra error. There is no `NewInfraError` constructor — infra is the default classification.

## Who Classifies Errors
## Two Routes to a Classification

**Extensions return plain Go errors.** Extension interfaces (`MergeChecker`, `Storage`, `Publisher`) return standard `error` values. They may define their own domain-specific sentinel errors (e.g. `storage.ErrNotFound`, `storage.ErrVersionMismatch`) but they do not classify errors as user or infra.
A returned `error` reaches `IsUserError` / `IsRetryable` / `IsDependencyError` carrying one of the framework types (`*userError` / `*infraError`). It gets there one of two ways:

**Service controllers classify errors.** The controller that calls an extension decides whether the error is user-caused or infrastructure-caused, and whether it should be retried:
1. **Explicit wrap by the controller** — the controller knows the meaning of the failure and wraps the cause with `NewUserError`, `NewRetryableError`, `NewDependencyError`, or `NewRetryableDependencyError` before returning.
2. **Automatic wrap by `Classify`** — the controller returns a raw driver/library/sentinel error, and a per-backend `Classifier` recognises it later in the pipeline (typically inside the consumer) and adds the appropriate framework wrap.

Both routes feed the same downstream helpers; the chain that reaches `IsRetryable` looks identical regardless of who wrapped it.

## `Classify` and the `Classifier` Interface

`Classifier` inspects a **single error node** and returns a `Verdict`:

```go
func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) error {
// Extension returns a plain error
result, err := c.mergeChecker.Check(ctx, queue, change)
if err != nil {
// Controller classifies: merge checker failure is infra, worth retrying
return errs.NewRetryableError(fmt.Errorf("merge check failed: %w", err))
}
type Classifier interface {
Classify(err error) Verdict
}
```

Verdicts: `Unknown` (this node carries no signal), `User`, `Infra`, `InfraRetryable`, `InfraDependency`, `InfraDependencyRetryable`.

`Classify(err, classifiers...)` is the single, explicit pass that turns a raw chain into a wrapped one. It is called **exactly once per chain** — typically by the consumer immediately after the controller returns. After that point, callers use only the `IsXxx` helpers, which are pure type checks.

`Classify` walks the chain twice:

1. **Pass 1 — framework-wrap check.** A cheap type switch looks for an existing `*userError` / `*infraError` anywhere in the chain. If found, the chain is already interpretable and `Classify` returns `err` unchanged. **No classifier is invoked.**
2. **Pass 2 — classifier walk.** From outermost to innermost node, each registered classifier is asked for a verdict. The first non-`Unknown` verdict wins and `err` is wrapped with the matching framework constructor.

If no classifier recognises anything, `err` is returned unchanged — and behaves as non-retryable infra at the helper layer.

## Adding a Backend-Specific Classifier

Backend classifiers live alongside the extension they classify, under `core/errs/<backend>/`. The canonical examples are `core/errs/mysql` (MySQL driver errors) and `core/errs/generic` (transport-agnostic concerns such as `context.Canceled`).

A classifier:

- Inspects exactly one node — the `err` argument passed in. **Do not call `errors.Is` / `errors.As`** from inside `Classify`; the framework owns the chain walk. Calling it yourself can shadow a deeper-but-different verdict and breaks the controller-override rules described below.
- Returns `Unknown` for anything it does not recognise, so the surrounding walker can continue.
- Is stateless. The convention is to expose a package-level singleton value rather than a constructor:

```go
// core/errs/foo/foo.go
package foo

import "github.com/uber/submitqueue/core/errs"

if !result.Mergeable {
// Controller classifies: not mergeable is a user error, never retry
return errs.NewUserError(fmt.Errorf("not mergeable: %s", result.Reason))
var Classifier errs.Classifier = classifier{}

type classifier struct{}

func (classifier) Classify(err error) errs.Verdict {
// Type-assert / sentinel-compare on err directly, never errors.As / errors.Is.
if fe, ok := err.(*FooError); ok {
return classifyFooCode(fe.Code)
}
return errs.Unknown
}
```

Servers wire each classifier into the consumer as a vararg. Order matters only when two classifiers might both match a node — earlier classifiers win:

```go
import (
genericerrs "github.com/uber/submitqueue/core/errs/generic"
mysqlerrs "github.com/uber/submitqueue/core/errs/mysql"
)

c := consumer.New(logger, scope, registry,
genericerrs.Classifier,
mysqlerrs.Classifier,
)
```

Tests follow the same shape: assert per-node behaviour against `Classifier.Classify(node)` directly, and assert end-to-end behaviour by running `errs.Classify(err, Classifier)` and checking the helpers (`IsRetryable`, `IsUserError`, …) on the result. See `core/errs/mysql/mysql_test.go` and `core/errs/generic/generic_test.go`.

// ...
## Overriding Classification from a Controller

Because pass 1 short-circuits on the first framework wrap it finds, **an explicit wrap by the controller always wins over any classifier**. Use this when the controller has context the classifier cannot — typically when the same low-level error means different things in different call sites.

```go
result, err := c.storage.Get(ctx, id)
if errors.Is(err, storage.ErrNotFound) {
// This caller treats "not found" as a user error: the user asked for an
// unknown resource. The mysql classifier never gets a vote because the
// framework wrap short-circuits pass 1.
return errs.NewUserError(fmt.Errorf("request %s: %w", id, err))
}
if err != nil {
// Hand the raw error to Classify — the mysql classifier will recognise
// deadlocks, lock-wait timeouts, etc. and wrap them as retryable infra.
return fmt.Errorf("get %s: %w", id, err)
}
```

Two practical rules fall out of the short-circuit semantics:

- **Wrap with a framework constructor as soon as the controller knows the right verdict.** Any wrap added later in the chain still wins, but wrapping early keeps the intent close to the decision.
- **A wrap anywhere in the chain blocks all classifiers — including for nodes deeper than the wrap.** If you want a classifier to still get a look at the cause, do not wrap above it. (In practice this is rare: controllers wrap because they have the final answer.)

## Extensions Return Plain Go Errors

Extension interfaces (`MergeChecker`, `Storage`, `Publisher`) return standard `error` values. They may define their own domain-specific sentinel errors (e.g. `storage.ErrNotFound`, `storage.ErrVersionMismatch`) but they do **not** classify errors as user or infra — that is the controller's (and `Classify`'s) job.

This separation keeps extensions reusable across contexts. The same `storage.ErrNotFound` might be a user error in one controller (user requested a non-existent resource) and an infra error in another (expected record is missing).

## Error Chain Compatibility
Expand Down Expand Up @@ -71,3 +150,5 @@ errors.Is(err, ErrNotFound) // true — cause is in the chain
| `IsUserError(err)` | `err` is or wraps a `userError` |
| `IsRetryable(err)` | `err` is or wraps an infra error with the retryable flag set |
| `IsDependencyError(err)` | `err` is or wraps an infra error marked as dependency |

All three are type-only checks. They do not invoke classifiers — pair them with a preceding `Classify` call when the controller's error may not carry an explicit wrap.
156 changes: 134 additions & 22 deletions core/errs/errs.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package errs

import (
"context"
"errors"
)

Expand Down Expand Up @@ -107,34 +106,147 @@ func (e *infraError) Is(target error) bool {
return ok
}

// IsUserError checks if err is or wraps a user error.
// Verdict is the classification of a single error node, returned by a
// Classifier. Unknown means the node carries no signal and the chain walker
// should keep looking; every other value names a terminal classification.
type Verdict int

const (
// Unknown means this node carries no classification. The chain walker
// will move on to the next node in the unwrap chain.
Unknown Verdict = iota
// User means the error is caused by the user's input or action (e.g. a
// merge conflict or invalid request) and must not be retried.
User
// Infra means a non-retryable infrastructure failure: something below the
// caller broke in a way that retrying will not fix (e.g. a schema or
// programmer bug). This is the implicit verdict for an unclassified chain,
// so Classify does not add a wrap for it.
Infra
// InfraRetryable means a transient infrastructure failure that is
// expected to succeed on retry (e.g. a deadlock, lock-wait timeout, or
// dropped connection).
InfraRetryable
// InfraDependency means a non-retryable failure originating in a
// downstream dependency outside the caller's control (e.g. an external
// service rejecting the request).
InfraDependency
// InfraDependencyRetryable means a transient failure originating in a
// downstream dependency (e.g. an external service is briefly unavailable)
// that is expected to succeed on retry.
InfraDependencyRetryable
)

// Classifier inspects a single error node (not the whole chain) and returns a
// Verdict. Implementations should return Unknown for nodes they do not
// recognize so the chain walker can continue down the unwrap chain.
//
// Classifiers must not call errors.As / errors.Is themselves, which would walk
// the chain and could shadow a classification carried by an outer node (such
// as a controller's explicit NewUserError wrap). The package-level Classify
// function owns the walk.
//
// Classifiers are typically stateless; the canonical convention is to expose a
// package-level singleton value (e.g. mysqlerrs.Classifier) rather than a
// constructor.
type Classifier interface {
Classify(err error) Verdict
}

// Classify is the single, explicit classification pass. It is intended to be
// called exactly once per error chain — typically by the consumer immediately
// after a controller returns — and produces a chain that subsequent IsUserError
// / IsRetryable / IsDependencyError calls can interpret with simple type
// checks (no further classifier walks).
//
// Semantics:
//
// - nil in, nil out.
// - If err's chain already carries a framework classification (*userError or
// *infraError anywhere in the chain), returns err unchanged — the chain is
// already interpretable by IsUserError / IsRetryable / IsDependencyError.
// - Otherwise, walks the chain from outermost to innermost, asking each
// classifier per node. The FIRST non-Unknown verdict wins; the outermost
// such node determines the wrap. err is wrapped with the framework
// constructor matching that verdict (User -> NewUserError, InfraRetryable
// -> NewRetryableError, etc.) and the wrapped error is returned.
// - Verdict Infra means "non-retryable infra" — which is already the default
// behavior for an unwrapped chain, so no wrap is added.
// - If no classifier recognises anything, err is returned unchanged.
//
// Implementation: two passes over the chain. Pass 1 is a cheap type check
// looking for an existing framework wrap and short-circuits if one is found —
// no classifier is invoked. Pass 2 runs the configured classifiers per node.
// Walking the chain is cheap relative to a classifier call, so this avoids
// running classifiers whenever the chain is already classified deeper down.
//
// NOTE: this central classifier model cannot disambiguate errors of the same
// underlying type produced by different extensions (e.g. a net.OpError from a
// mysql connection vs the same type from an HTTP caller would both match the
// mysql classifier here). Resolving that requires per-extension provenance
// tagging; intentionally deferred.
func Classify(err error, classifiers ...Classifier) error {
if err == nil {
return nil
}

// Pass 1 — cheap framework-wrap check. If any node already carries a
// framework type, the chain is interpretable as-is and classifiers are
// never invoked.
for cur := err; cur != nil; cur = errors.Unwrap(cur) {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we expect mutli-errors to work in this case?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what are multi-errors? This function inspects the whole error chain, same as Errors.As/Is does

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

errors.Join(...) stuff which returns array of errors... i don't think we are doing it anywhere yet, just making sure if we do it later at some point, i can be calssified..

switch cur.(type) {
case *userError, *infraError:
return err
}
}

// Pass 2 — run classifiers per node from outermost to innermost. Stop at
// the first non-Unknown verdict.
var verdict Verdict
for cur := err; cur != nil && verdict == Unknown; cur = errors.Unwrap(cur) {
for _, c := range classifiers {
if v := c.Classify(cur); v != Unknown {
verdict = v
break
}
}
}

switch verdict {
case User:
return NewUserError(err)
case InfraRetryable:
return NewRetryableError(err)
case InfraDependency:
return NewDependencyError(err)
case InfraDependencyRetryable:
return NewRetryableDependencyError(err)
}
// Unknown or Infra — no wrap needed; the existing chain already behaves as
// non-retryable infra at the IsRetryable / IsUserError layer.
return err
}

// IsUserError reports whether err is or wraps a user error, i.e. an error
// produced by NewUserError. Inspects only the framework types in the chain.
func IsUserError(err error) bool {
var target *userError
return errors.As(err, &target)
var ue *userError
return errors.As(err, &ue)
}

// IsRetryable checks if err is retryable. Returns true when err is or
// wraps an infrastructure error whose retryable flag is set or when err is context.Canceled. User errors are
// never retryable. A generic error (not wrapped) returns false, consistent
// with the convention that unclassified errors are non-retryable.
// IsRetryable reports whether err is or wraps an infra error marked
// retryable, i.e. an error produced by NewRetryableError or
// NewRetryableDependencyError. Inspects only the framework types in the chain.
func IsRetryable(err error) bool {
if errors.Is(err, context.Canceled) {
return true
}
var ie *infraError
if errors.As(err, &ie) {
return ie.retryable
}
return false
return errors.As(err, &ie) && ie.retryable
}

// IsDependencyError checks if err is or wraps an infra error that originated
// in a downstream dependency. Returns false for user errors, generic errors,
// and infra errors not marked as dependency.
// IsDependencyError reports whether err is or wraps an infra error marked as
// originating in a downstream dependency, i.e. an error produced by
// NewDependencyError or NewRetryableDependencyError. Inspects only the
// framework types in the chain.
func IsDependencyError(err error) bool {
var ie *infraError
if errors.As(err, &ie) {
return ie.dependency
}
return false
return errors.As(err, &ie) && ie.dependency
}
19 changes: 19 additions & 0 deletions core/errs/generic/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
load("@rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "generic",
srcs = ["generic.go"],
importpath = "github.com/uber/submitqueue/core/errs/generic",
visibility = ["//visibility:public"],
deps = ["//core/errs"],
)

go_test(
name = "generic_test",
srcs = ["generic_test.go"],
embed = [":generic"],
deps = [
"//core/errs",
"@com_github_stretchr_testify//assert",
],
)
Loading
Loading