From d35646de605a7a03b41297285351ee7d3f6558d3 Mon Sep 17 00:00:00 2001 From: Francesco Pepe <3891780+francescopepe@users.noreply.github.com> Date: Thu, 15 Feb 2024 20:17:46 +0100 Subject: [PATCH 01/18] chore: add automated release --- .github/workflows/release.yml | 34 +++++++++++++++++ .releaserc.yml | 72 +++++++++++++++++++++++++++++++++++ 2 files changed, 106 insertions(+) create mode 100644 .github/workflows/release.yml create mode 100644 .releaserc.yml diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml new file mode 100644 index 0000000..9aa5f48 --- /dev/null +++ b/.github/workflows/release.yml @@ -0,0 +1,34 @@ +name: Release +on: + push: + branches: + - main + +permissions: + contents: read # for checkout + +jobs: + release: + name: Release + runs-on: ubuntu-latest + permissions: + contents: write # to be able to publish a GitHub release + issues: write # to be able to comment on released issues + pull-requests: write # to be able to comment on released pull requests + steps: + - name: Checkout + uses: actions/checkout@v3 + with: + fetch-depth: 0 + - name: Setup Node.js + uses: actions/setup-node@v3 + with: + node-version: "lts/*" + - name: Install dependencies + run: npm install --save-dev @semantic-release/commit-analyzer @semantic-release/release-notes-generator @semantic-release/changelog @semantic-release/git @semantic-release/github + - name: Verify the integrity of provenance attestations and registry signatures for installed dependencies + run: npm audit signatures + - name: Release + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + run: npx semantic-release diff --git a/.releaserc.yml b/.releaserc.yml new file mode 100644 index 0000000..e92538f --- /dev/null +++ b/.releaserc.yml @@ -0,0 +1,72 @@ +--- +branch: main +preset: "angular" +tagFormat: "v${version}" +plugins: + - "@semantic-release/commit-analyzer" + - "@semantic-release/release-notes-generator" + - "@semantic-release/changelog" + - "@semantic-release/git" + - "@semantic-release/github" + +verifyConditions: + - '@semantic-release/git' + - "@semantic-release/github" +analyzeCommits: + - path: "@semantic-release/commit-analyzer" + releaseRules: + - type: "feat" + release: "patch" + - type: "hotfix" + release: "patch" + - type: "patch" + release: "patch" + - type: "minor" + release: "minor" + - type: "breaking" + release: "major" +generateNotes: + - path: "@semantic-release/release-notes-generator" + writerOpts: + groupBy: "type" + commitGroupsSort: + - "feat" + - "perf" + - "fix" + commitsSort: "header" + types: + - type: "feat" + - section: "Features" + # Tracked bug fix with a hotfix branch + - type: "hotfix" + - section: "Bug Fixes" + # Uninmportent fix (CI testing, etc) + - type: "fix" + - hidden: true + - type: "chore" + - hidden: true + - type: "docs" + - hidden: true + - type: "doc" + - hidden: true + - type: "style" + - hidden: true + - type: "refactor" + - hidden: true + - type: "perf" + - hidden: true + - type: "test" + - hidden: true + presetConfig: true +prepare: + - path: "@semantic-release/git" + - path: "@semantic-release/changelog" + changelogFile: "docs/CHANGELOG.md" +publish: + - path: "@semantic-release/github" + +success: + - "@semantic-release/github" + +fail: + - "@semantic-release/github" From 24d1260a034503cd97b017e250826e8d75668061 Mon Sep 17 00:00:00 2001 From: Francesco Pepe <3891780+francescopepe@users.noreply.github.com> Date: Fri, 16 Feb 2024 21:38:09 +0100 Subject: [PATCH 02/18] chore: fix releaserc.yml --- .releaserc.yml | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/.releaserc.yml b/.releaserc.yml index e92538f..5a2bd93 100644 --- a/.releaserc.yml +++ b/.releaserc.yml @@ -1,16 +1,14 @@ --- -branch: main +branches: + - main preset: "angular" tagFormat: "v${version}" plugins: - "@semantic-release/commit-analyzer" - "@semantic-release/release-notes-generator" - - "@semantic-release/changelog" - - "@semantic-release/git" - "@semantic-release/github" verifyConditions: - - '@semantic-release/git' - "@semantic-release/github" analyzeCommits: - path: "@semantic-release/commit-analyzer" @@ -58,10 +56,6 @@ generateNotes: - type: "test" - hidden: true presetConfig: true -prepare: - - path: "@semantic-release/git" - - path: "@semantic-release/changelog" - changelogFile: "docs/CHANGELOG.md" publish: - path: "@semantic-release/github" From ab739c863fecf273f9808232c20ca164cd41a204 Mon Sep 17 00:00:00 2001 From: francescopepe <3891780+francescopepe@users.noreply.github.com> Date: Mon, 17 Jun 2024 16:54:58 +0200 Subject: [PATCH 03/18] feat: `reportFunc` returns a boolean value to decide whether the error counts towards to threshold BREAKING CHANGE: `reportFunc` must return a boolean --- config.go | 8 +++++--- consumers.go | 2 +- controller.go | 7 ++++++- go.mod | 10 +++++----- go.sum | 18 ++++++++++++++++++ internal/messages/messages.go | 2 +- retriever.go | 8 ++++---- 7 files changed, 40 insertions(+), 15 deletions(-) diff --git a/config.go b/config.go index d8bc8da..bcd9848 100644 --- a/config.go +++ b/config.go @@ -33,8 +33,8 @@ type ErrorConfiguration struct { // Default: 120s. Period time.Duration - // The error report function - ReportFunc func(err error) + // The error report function, returns a boolean value to decide whether the error counts towards to threshold + ReportFunc func(err error) bool } // The MultiMessageBufferConfiguration defines a buffer which is consumed by the worker when either @@ -107,8 +107,10 @@ func setWorkerConfigValues(config Configuration) Configuration { } if config.ErrorConfig.ReportFunc == nil { - config.ErrorConfig.ReportFunc = func(err error) { + config.ErrorConfig.ReportFunc = func(err error) bool { log.Println("ERROR", err) + + return true } } diff --git a/consumers.go b/consumers.go index 83834e4..d2983a1 100644 --- a/consumers.go +++ b/consumers.go @@ -34,7 +34,7 @@ func makeAvailableConsumers(concurrency int) chan struct{} { return consumers } -// wrapHandler catches any panic error, logs it and returns the error that generated it. +// wrapHandler catches any panic error and returns the error that generated it. // It prevents the worker from crashing in case of an unexpected error. func wrapHandler(handler func() error) (err error) { defer func() { diff --git a/controller.go b/controller.go index 858b881..9d05dd2 100644 --- a/controller.go +++ b/controller.go @@ -19,8 +19,10 @@ type controller struct { // the controller. func (c *controller) decreaseCounterAfterTimeout() { time.Sleep(c.errorConfig.Period) + c.mutex.Lock() defer c.mutex.Unlock() + c.errorCounter-- } @@ -29,6 +31,7 @@ func (c *controller) increaseCounter() { // Increase counter c.mutex.Lock() defer c.mutex.Unlock() + c.errorCounter++ } @@ -40,7 +43,9 @@ func (c *controller) shouldStop() bool { } func (c *controller) reportError(err error) { - c.errorConfig.ReportFunc(err) + if shouldIncreaseCounter := c.errorConfig.ReportFunc(err); !shouldIncreaseCounter { + return + } c.increaseCounter() go c.decreaseCounterAfterTimeout() diff --git a/go.mod b/go.mod index 93d117b..bf8797f 100644 --- a/go.mod +++ b/go.mod @@ -2,11 +2,11 @@ module github.com/francescopepe/formigo go 1.20 -require github.com/aws/aws-sdk-go-v2/service/sqs v1.23.2 +require github.com/aws/aws-sdk-go-v2/service/sqs v1.32.6 require ( - github.com/aws/aws-sdk-go-v2 v1.18.1 // indirect - github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.34 // indirect - github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.28 // indirect - github.com/aws/smithy-go v1.13.5 // indirect + github.com/aws/aws-sdk-go-v2 v1.27.2 // indirect + github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.9 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.9 // indirect + github.com/aws/smithy-go v1.20.2 // indirect ) diff --git a/go.sum b/go.sum index 644a8f6..a0bd7e8 100644 --- a/go.sum +++ b/go.sum @@ -1,13 +1,31 @@ github.com/aws/aws-sdk-go-v2 v1.18.1 h1:+tefE750oAb7ZQGzla6bLkOwfcQCEtC5y2RqoqCeqKo= github.com/aws/aws-sdk-go-v2 v1.18.1/go.mod h1:uzbQtefpm44goOPmdKyAlXSNcwlRgF3ePWVW6EtJvvw= +github.com/aws/aws-sdk-go-v2 v1.26.1 h1:5554eUqIYVWpU0YmeeYZ0wU64H2VLBs8TlhRB2L+EkA= +github.com/aws/aws-sdk-go-v2 v1.26.1/go.mod h1:ffIFB97e2yNsv4aTSGkqtHnppsIJzw7G7BReUZ3jCXM= +github.com/aws/aws-sdk-go-v2 v1.27.2 h1:pLsTXqX93rimAOZG2FIYraDQstZaaGVVN4tNw65v0h8= +github.com/aws/aws-sdk-go-v2 v1.27.2/go.mod h1:ffIFB97e2yNsv4aTSGkqtHnppsIJzw7G7BReUZ3jCXM= github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.34 h1:A5UqQEmPaCFpedKouS4v+dHCTUo2sKqhoKO9U5kxyWo= github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.34/go.mod h1:wZpTEecJe0Btj3IYnDx/VlUzor9wm3fJHyvLpQF0VwY= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.5 h1:aw39xVGeRWlWx9EzGVnhOR4yOjQDHPQ6o6NmBlscyQg= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.5/go.mod h1:FSaRudD0dXiMPK2UjknVwwTYyZMRsHv3TtkabsZih5I= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.9 h1:cy8ahBJuhtM8GTTSyOkfy6WVPV1IE+SS5/wfXUYuulw= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.9/go.mod h1:CZBXGLaJnEZI6EVNcPd7a6B5IC5cA/GkRWtu9fp3S6Y= github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.28 h1:srIVS45eQuewqz6fKKu6ZGXaq6FuFg5NzgQBAM6g8Y4= github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.28/go.mod h1:7VRpKQQedkfIEXb4k52I7swUnZP0wohVajJMRn3vsUw= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.5 h1:PG1F3OD1szkuQPzDw3CIQsRIrtTlUC3lP84taWzHlq0= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.5/go.mod h1:jU1li6RFryMz+so64PpKtudI+QzbKoIEivqdf6LNpOc= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.9 h1:A4SYk07ef04+vxZToz9LWvAXl9LW0NClpPpMsi31cz0= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.9/go.mod h1:5jJcHuwDagxN+ErjQ3PU3ocf6Ylc/p9x+BLO/+X4iXw= github.com/aws/aws-sdk-go-v2/service/sqs v1.23.2 h1:Y2vfLiY3HmaMisuwx6fS2kMRYbajRXXB+9vesGVPseY= github.com/aws/aws-sdk-go-v2/service/sqs v1.23.2/go.mod h1:TaV67b6JMD1988x/uMDop/JnMFK6v5d4Ru+sDmFg+ww= +github.com/aws/aws-sdk-go-v2/service/sqs v1.31.4 h1:mE2ysZMEeQ3ulHWs4mmc4fZEhOfeY1o6QXAfDqjbSgw= +github.com/aws/aws-sdk-go-v2/service/sqs v1.31.4/go.mod h1:lCN2yKnj+Sp9F6UzpoPPTir+tSaC9Jwf6LcmTqnXFZw= +github.com/aws/aws-sdk-go-v2/service/sqs v1.32.6 h1:FrGnU+Ggf+jUFj1O7Pdw5hCk42dmyO9TOTCVL7mDISk= +github.com/aws/aws-sdk-go-v2/service/sqs v1.32.6/go.mod h1:2Ef3ZgVWL7lyz5YZf854YkMboK6qF1NbG/0hc9StZsg= github.com/aws/smithy-go v1.13.5 h1:hgz0X/DX0dGqTYpGALqXJoRKRj5oQ7150i5FdTePzO8= github.com/aws/smithy-go v1.13.5/go.mod h1:Tg+OJXh4MB2R/uN61Ko2f6hTZwB/ZYGOtib8J3gBHzA= +github.com/aws/smithy-go v1.20.2 h1:tbp628ireGtzcHDDmLT/6ADHidqnwgF57XOXZe6tp4Q= +github.com/aws/smithy-go v1.20.2/go.mod h1:krry+ya/rV9RDcV/Q16kpu6ypI4K2czasz0NC3qS14E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg= github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= diff --git a/internal/messages/messages.go b/internal/messages/messages.go index 13c9ba4..d4b0aa1 100644 --- a/internal/messages/messages.go +++ b/internal/messages/messages.go @@ -106,7 +106,7 @@ type BufferWithContextTimeoutConfiguration struct { Size int } -// bufferWithContextTimeout is used to construct a buffer that has a context timeout +// BufferWithContextTimeout is used to construct a buffer that has a context timeout // along with the standard buffer timeout. This is used because the messages have to // be processed within a certain period and if this doesn't happen, the buffer should // delete the messages in it and reset. diff --git a/retriever.go b/retriever.go index 1ae23e1..d0939fc 100644 --- a/retriever.go +++ b/retriever.go @@ -17,7 +17,7 @@ func retriever(ctx context.Context, receiver client.MessageReceiver, ctrl *contr case <-ctx.Done(): return default: - messages, err := receiver.ReceiveMessages() + msgs, err := receiver.ReceiveMessages() if err != nil { ctrl.reportError(fmt.Errorf("unable to receive message: %w", err)) continue @@ -27,9 +27,9 @@ func retriever(ctx context.Context, receiver client.MessageReceiver, ctrl *contr // This means that the retriever won't listen for context cancellation // at this stage. func() { - for _, message := range messages { + for _, msg := range msgs { select { - case <-message.Ctx.Done(): + case <-msg.Ctx.Done(): // If consumers don't pick up the messages within the messages' timeout we raise // an error. // This could be due to one or more of the following reasons: @@ -42,7 +42,7 @@ func retriever(ctx context.Context, receiver client.MessageReceiver, ctrl *contr ctrl.reportError(errors.New("message didn't get picked up by any consumer within its timeout")) return // Avoid publishing all the messages downstream - case messageCh <- message: + case messageCh <- msg: // Message pushed to the channel } } From 10cdba402468646c320929d4fd3bbbdfefe99082 Mon Sep 17 00:00:00 2001 From: francescopepe <3891780+francescopepe@users.noreply.github.com> Date: Sat, 17 Aug 2024 11:57:40 +0200 Subject: [PATCH 04/18] chore: update dependencies --- go.mod | 10 +++++----- go.sum | 47 ++++++++++------------------------------------- 2 files changed, 15 insertions(+), 42 deletions(-) diff --git a/go.mod b/go.mod index bf8797f..822ea9b 100644 --- a/go.mod +++ b/go.mod @@ -2,11 +2,11 @@ module github.com/francescopepe/formigo go 1.20 -require github.com/aws/aws-sdk-go-v2/service/sqs v1.32.6 +require github.com/aws/aws-sdk-go-v2/service/sqs v1.34.4 require ( - github.com/aws/aws-sdk-go-v2 v1.27.2 // indirect - github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.9 // indirect - github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.9 // indirect - github.com/aws/smithy-go v1.20.2 // indirect + github.com/aws/aws-sdk-go-v2 v1.30.4 // indirect + github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.16 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.16 // indirect + github.com/aws/smithy-go v1.20.4 // indirect ) diff --git a/go.sum b/go.sum index a0bd7e8..3609e04 100644 --- a/go.sum +++ b/go.sum @@ -1,37 +1,10 @@ -github.com/aws/aws-sdk-go-v2 v1.18.1 h1:+tefE750oAb7ZQGzla6bLkOwfcQCEtC5y2RqoqCeqKo= -github.com/aws/aws-sdk-go-v2 v1.18.1/go.mod h1:uzbQtefpm44goOPmdKyAlXSNcwlRgF3ePWVW6EtJvvw= -github.com/aws/aws-sdk-go-v2 v1.26.1 h1:5554eUqIYVWpU0YmeeYZ0wU64H2VLBs8TlhRB2L+EkA= -github.com/aws/aws-sdk-go-v2 v1.26.1/go.mod h1:ffIFB97e2yNsv4aTSGkqtHnppsIJzw7G7BReUZ3jCXM= -github.com/aws/aws-sdk-go-v2 v1.27.2 h1:pLsTXqX93rimAOZG2FIYraDQstZaaGVVN4tNw65v0h8= -github.com/aws/aws-sdk-go-v2 v1.27.2/go.mod h1:ffIFB97e2yNsv4aTSGkqtHnppsIJzw7G7BReUZ3jCXM= -github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.34 h1:A5UqQEmPaCFpedKouS4v+dHCTUo2sKqhoKO9U5kxyWo= -github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.34/go.mod h1:wZpTEecJe0Btj3IYnDx/VlUzor9wm3fJHyvLpQF0VwY= -github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.5 h1:aw39xVGeRWlWx9EzGVnhOR4yOjQDHPQ6o6NmBlscyQg= -github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.5/go.mod h1:FSaRudD0dXiMPK2UjknVwwTYyZMRsHv3TtkabsZih5I= -github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.9 h1:cy8ahBJuhtM8GTTSyOkfy6WVPV1IE+SS5/wfXUYuulw= -github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.9/go.mod h1:CZBXGLaJnEZI6EVNcPd7a6B5IC5cA/GkRWtu9fp3S6Y= -github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.28 h1:srIVS45eQuewqz6fKKu6ZGXaq6FuFg5NzgQBAM6g8Y4= -github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.28/go.mod h1:7VRpKQQedkfIEXb4k52I7swUnZP0wohVajJMRn3vsUw= -github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.5 h1:PG1F3OD1szkuQPzDw3CIQsRIrtTlUC3lP84taWzHlq0= -github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.5/go.mod h1:jU1li6RFryMz+so64PpKtudI+QzbKoIEivqdf6LNpOc= -github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.9 h1:A4SYk07ef04+vxZToz9LWvAXl9LW0NClpPpMsi31cz0= -github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.9/go.mod h1:5jJcHuwDagxN+ErjQ3PU3ocf6Ylc/p9x+BLO/+X4iXw= -github.com/aws/aws-sdk-go-v2/service/sqs v1.23.2 h1:Y2vfLiY3HmaMisuwx6fS2kMRYbajRXXB+9vesGVPseY= -github.com/aws/aws-sdk-go-v2/service/sqs v1.23.2/go.mod h1:TaV67b6JMD1988x/uMDop/JnMFK6v5d4Ru+sDmFg+ww= -github.com/aws/aws-sdk-go-v2/service/sqs v1.31.4 h1:mE2ysZMEeQ3ulHWs4mmc4fZEhOfeY1o6QXAfDqjbSgw= -github.com/aws/aws-sdk-go-v2/service/sqs v1.31.4/go.mod h1:lCN2yKnj+Sp9F6UzpoPPTir+tSaC9Jwf6LcmTqnXFZw= -github.com/aws/aws-sdk-go-v2/service/sqs v1.32.6 h1:FrGnU+Ggf+jUFj1O7Pdw5hCk42dmyO9TOTCVL7mDISk= -github.com/aws/aws-sdk-go-v2/service/sqs v1.32.6/go.mod h1:2Ef3ZgVWL7lyz5YZf854YkMboK6qF1NbG/0hc9StZsg= -github.com/aws/smithy-go v1.13.5 h1:hgz0X/DX0dGqTYpGALqXJoRKRj5oQ7150i5FdTePzO8= -github.com/aws/smithy-go v1.13.5/go.mod h1:Tg+OJXh4MB2R/uN61Ko2f6hTZwB/ZYGOtib8J3gBHzA= -github.com/aws/smithy-go v1.20.2 h1:tbp628ireGtzcHDDmLT/6ADHidqnwgF57XOXZe6tp4Q= -github.com/aws/smithy-go v1.20.2/go.mod h1:krry+ya/rV9RDcV/Q16kpu6ypI4K2czasz0NC3qS14E= -github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg= -github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= -github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= -github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U= -github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +github.com/aws/aws-sdk-go-v2 v1.30.4 h1:frhcagrVNrzmT95RJImMHgabt99vkXGslubDaDagTk8= +github.com/aws/aws-sdk-go-v2 v1.30.4/go.mod h1:CT+ZPWXbYrci8chcARI3OmI/qgd+f6WtuLOoaIA8PR0= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.16 h1:TNyt/+X43KJ9IJJMjKfa3bNTiZbUP7DeCxfbTROESwY= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.16/go.mod h1:2DwJF39FlNAUiX5pAc0UNeiz16lK2t7IaFcm0LFHEgc= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.16 h1:jYfy8UPmd+6kJW5YhY0L1/KftReOGxI/4NtVSTh9O/I= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.16/go.mod h1:7ZfEPZxkW42Afq4uQB8H2E2e6ebh6mXTueEpYzjCzcs= +github.com/aws/aws-sdk-go-v2/service/sqs v1.34.4 h1:FXPO72iKC5YmYNEANltl763bUj8A6qT20wx8Jwvxlsw= +github.com/aws/aws-sdk-go-v2/service/sqs v1.34.4/go.mod h1:7idt3XszF6sE9WPS1GqZRiDJOxw4oPtlRBXodWnCGjU= +github.com/aws/smithy-go v1.20.4 h1:2HK1zBdPgRbjFOHlfeQZfpC4r72MOb9bZkiFwggKO+4= +github.com/aws/smithy-go v1.20.4/go.mod h1:irrKGvNn1InZwb2d7fkIRNucdfwR8R+Ts3wxYa/cJHg= From 1967ed6b95dbd5be3e04edf03ea814a4b31bea70 Mon Sep 17 00:00:00 2001 From: francescopepe <3891780+francescopepe@users.noreply.github.com> Date: Sat, 17 Aug 2024 12:02:53 +0200 Subject: [PATCH 05/18] feat: pass formigo message to handlers to get more info BREAKING CHANGE: messages passed to the handlers are not the queue messages, but formigo ones. --- README.md | 24 ++++++++++++------------ consumers.go | 22 ++++++++++++---------- internal/messages/messages.go | 15 ++++++++++++--- message.go | 9 +++++++++ sqs.go | 3 ++- 5 files changed, 47 insertions(+), 26 deletions(-) create mode 100644 message.go diff --git a/README.md b/README.md index 6d48cb5..c79bc3f 100644 --- a/README.md +++ b/README.md @@ -72,11 +72,11 @@ func main() { Client: sqsClient, Concurrency: 100, Consumer: formigo.NewSingleMessageConsumer(formigo.SingleMessageConsumerConfiguration{ - Handler: func(ctx context.Context, msg interface{}) error { - log.Println("Got Message", msgs) + Handler: func(ctx context.Context, msg formigo.Message) error { + log.Println("Got Message", msg.Content()) // Assert the type of message to get the body or any other attributes - log.Println("Message body", *msg.(types.Message).Body) + log.Println("Message body", *msg.Content().(types.Message).Body) return nil }, @@ -145,13 +145,13 @@ func main() { Size: 100, Timeout: time.Second * 5, }, - Handler: func(ctx context.Context, msgs []interface{}) error { + Handler: func(ctx context.Context, msgs []formigo.Message) error { log.Printf("Got %d messages to process\n", len(msgs) // Assert the type of message to get the body or any other attributes for i, msg := range msgs { - log.Printf("Message %d body: %s", i, *msg.(types.Message).Body) + log.Printf("Message %d body: %s", i, *msg.Content().(types.Message).Body) } return nil @@ -174,13 +174,13 @@ By processing messages in batches, the worker can significantly enhance throughp ## Configuration -| Configuration | Explanation | Default Value | -|-------------- | ----------- | ------------- | -| Client | The client is used for receiving messages from the queue and deleting them once they are processed correctly. This is a required configuration. | None | -| Concurrency | Number of Go routines that process the messages from the Queue. Higher values are useful for slow I/O operations in the consumer's handler. | 100 | -| Retrievers | Number of Go routines that retrieve messages from the Queue. Higher values are helpful for slow networks or when consumers are quicker. | 1 | -| ErrorConfig | Defines the error threshold and interval for worker termination and error reporting function. | None | -| Consumer | The message consumer, either SingleMessageConsumer or MultipleMessageConsumer. | None | +| Configuration | Explanation | Default Value | +|---------------|-------------------------------------------------------------------------------------------------------------------------------------------------|---------------| +| Client | The client is used for receiving messages from the queue and deleting them once they are processed correctly. This is a required configuration. | None | +| Concurrency | Number of Go routines that process the messages from the Queue. Higher values are useful for slow I/O operations in the consumer's handler. | 100 | +| Retrievers | Number of Go routines that retrieve messages from the Queue. Higher values are helpful for slow networks or when consumers are quicker. | 1 | +| ErrorConfig | Defines the error threshold and interval for worker termination and error reporting function. | None | +| Consumer | The message consumer, either SingleMessageConsumer or MultipleMessageConsumer. | None | ## License diff --git a/consumers.go b/consumers.go index d2983a1..52a9472 100644 --- a/consumers.go +++ b/consumers.go @@ -10,8 +10,8 @@ import ( "github.com/francescopepe/formigo/internal/messages" ) -type singleMessageHandler = func(ctx context.Context, msg interface{}) error -type multiMessageHandler = func(ctx context.Context, msgs []interface{}) error +type singleMessageHandler = func(ctx context.Context, msg Message) error +type multiMessageHandler = func(ctx context.Context, msgs []Message) error // This means that the buffered messages didn't get passed to the handler within // the first message's timeout. @@ -62,7 +62,7 @@ func (c *singleMessageConsumer) processMessage(msg messages.Message) error { // Process Message return wrapHandler(func() error { - return c.handler(msg.Ctx, msg.Msg) + return c.handler(msg.Ctx, msg) }) } @@ -110,13 +110,15 @@ type multiMessageConsumer struct { } // It processes the messages and push them downstream for deletion. -func (c *multiMessageConsumer) processMessages(ctrl *controller, deleteCh chan<- messages.Message, ctx context.Context, messages []messages.Message) { - msgs := make([]interface{}, 0, len(messages)) - for _, msg := range messages { - msgs = append(msgs, msg.Msg) - } +func (c *multiMessageConsumer) processMessages(ctrl *controller, deleteCh chan<- messages.Message, ctx context.Context, msgs []messages.Message) { err := wrapHandler(func() error { - return c.handler(ctx, msgs) + // Convert slice to the abstraction + converted := make([]Message, len(msgs)) + for _, msg := range msgs { + converted = append(converted, msg) + } + + return c.handler(ctx, converted) }) if err != nil { ctrl.reportError(fmt.Errorf("failed to process messages: %w", err)) @@ -124,7 +126,7 @@ func (c *multiMessageConsumer) processMessages(ctrl *controller, deleteCh chan<- } // Push messages for deletion - for _, msg := range messages { + for _, msg := range msgs { deleteCh <- msg } } diff --git a/internal/messages/messages.go b/internal/messages/messages.go index d4b0aa1..c26acee 100644 --- a/internal/messages/messages.go +++ b/internal/messages/messages.go @@ -6,9 +6,18 @@ import ( ) type Message struct { - Ctx context.Context - CancelCtx context.CancelFunc - Msg interface{} + Ctx context.Context + CancelCtx context.CancelFunc + Msg interface{} + ReceivedTime time.Time +} + +func (m Message) Content() interface{} { + return m.Msg +} + +func (m Message) ReceivedAt() time.Time { + return m.ReceivedTime } type BufferConfiguration struct { diff --git a/message.go b/message.go new file mode 100644 index 0000000..7aaff57 --- /dev/null +++ b/message.go @@ -0,0 +1,9 @@ +package formigo + +import "time" + +type Message interface { + ReceivedAt() time.Time + Raw() interface{} + Content() interface{} +} diff --git a/sqs.go b/sqs.go index 01bbb2c..8bc82c7 100644 --- a/sqs.go +++ b/sqs.go @@ -82,7 +82,8 @@ func (c sqsClient) prepareMessagesForDeletion(messages []messages.Message) []typ func (c sqsClient) createMessage(sqsMessage types.Message) messages.Message { msg := messages.Message{ - Msg: sqsMessage, + Msg: sqsMessage, + ReceivedTime: time.Now(), } // Set a context with timeout From 0936e34ab6aa1d1529166a807b2160eac54fcb69 Mon Sep 17 00:00:00 2001 From: francescopepe <3891780+francescopepe@users.noreply.github.com> Date: Sat, 17 Aug 2024 18:08:27 +0200 Subject: [PATCH 06/18] chore: rename single message handler and consumer to message handler and consumer BREAKING CHANGE: single message handler and consumer renamed --- README.md | 4 ++-- config.go | 4 ++-- consumers.go | 18 ++++++++++-------- 3 files changed, 14 insertions(+), 12 deletions(-) diff --git a/README.md b/README.md index c79bc3f..9b37f3e 100644 --- a/README.md +++ b/README.md @@ -71,7 +71,7 @@ func main() { wkr := formigo.NewWorker(formigo.Configuration{ Client: sqsClient, Concurrency: 100, - Consumer: formigo.NewSingleMessageConsumer(formigo.SingleMessageConsumerConfiguration{ + Consumer: formigo.NewMessageConsumer(formigo.MessageConsumerConfiguration{ Handler: func(ctx context.Context, msg formigo.Message) error { log.Println("Got Message", msg.Content()) @@ -180,7 +180,7 @@ By processing messages in batches, the worker can significantly enhance throughp | Concurrency | Number of Go routines that process the messages from the Queue. Higher values are useful for slow I/O operations in the consumer's handler. | 100 | | Retrievers | Number of Go routines that retrieve messages from the Queue. Higher values are helpful for slow networks or when consumers are quicker. | 1 | | ErrorConfig | Defines the error threshold and interval for worker termination and error reporting function. | None | -| Consumer | The message consumer, either SingleMessageConsumer or MultipleMessageConsumer. | None | +| Consumer | The message consumer, either MessageConsumer or MultipleMessageConsumer. | None | ## License diff --git a/config.go b/config.go index bcd9848..83bb2e3 100644 --- a/config.go +++ b/config.go @@ -52,8 +52,8 @@ type MultiMessageBufferConfiguration struct { Timeout time.Duration } -type SingleMessageConsumerConfiguration struct { - Handler singleMessageHandler +type MessageConsumerConfiguration struct { + Handler messageHandler } type MultiMessageConsumerConfiguration struct { diff --git a/consumers.go b/consumers.go index 52a9472..da5a31a 100644 --- a/consumers.go +++ b/consumers.go @@ -12,6 +12,7 @@ import ( type singleMessageHandler = func(ctx context.Context, msg Message) error type multiMessageHandler = func(ctx context.Context, msgs []Message) error +type messageHandler = func(ctx context.Context, msg Message) error // This means that the buffered messages didn't get passed to the handler within // the first message's timeout. @@ -49,15 +50,15 @@ func wrapHandler(handler func() error) (err error) { return err } -// singleMessageConsumer defines a message handler that consumes only one message at a +// messageConsumer defines a message handler that consumes only one message at a // time. // It can be useful when the workload is specific per message, for example for sending // an email. -type singleMessageConsumer struct { - handler singleMessageHandler +type messageConsumer struct { + handler messageHandler } -func (c *singleMessageConsumer) processMessage(msg messages.Message) error { +func (c *messageConsumer) processMessage(msg messages.Message) error { defer msg.CancelCtx() // This must be called to release resources associated with the context. // Process Message @@ -68,12 +69,12 @@ func (c *singleMessageConsumer) processMessage(msg messages.Message) error { // Consumes and deletes a single message, it stops only when the `messageCh` gets closed // and doesn't have any messages in it. -func (c *singleMessageConsumer) consume(concurrency int, ctrl *controller, messageCh <-chan messages.Message, deleteCh chan<- messages.Message) { +func (c *messageConsumer) consume(concurrency int, ctrl *controller, messageCh <-chan messages.Message, deleteCh chan<- messages.Message) { consumers := makeAvailableConsumers(concurrency) var wg sync.WaitGroup for msg := range messageCh { - <-consumers // Use an available comsumer + <-consumers // Use an available consumer wg.Add(1) go func(message messages.Message) { @@ -96,8 +97,8 @@ func (c *singleMessageConsumer) consume(concurrency int, ctrl *controller, messa wg.Wait() } -func NewSingleMessageConsumer(config SingleMessageConsumerConfiguration) *singleMessageConsumer { - return &singleMessageConsumer{ +func NewMessageConsumer(config MessageConsumerConfiguration) *messageConsumer { + return &messageConsumer{ handler: config.Handler, } } @@ -229,4 +230,5 @@ func NewMultiMessageConsumer(config MultiMessageConsumerConfiguration) *multiMes var ( _ consumer = (*singleMessageConsumer)(nil) _ consumer = (*multiMessageConsumer)(nil) + _ consumer = (*messageConsumer)(nil) ) From a9f90dbd75c66ea4beeadcdb412891265f9208bc Mon Sep 17 00:00:00 2001 From: francescopepe <3891780+francescopepe@users.noreply.github.com> Date: Sat, 17 Aug 2024 18:26:28 +0200 Subject: [PATCH 07/18] feat: add batch response to support partial failures BREAKING CHANGE: multiMessageHandler renamed to batchHandler. It now requires a BatchResponse --- README.md | 8 ++-- config.go | 10 ++--- consumers.go | 71 ++++++++++++++++++++++++----------- internal/messages/messages.go | 7 +++- message.go | 2 +- sqs.go | 1 + 6 files changed, 66 insertions(+), 33 deletions(-) diff --git a/README.md b/README.md index 9b37f3e..2808923 100644 --- a/README.md +++ b/README.md @@ -6,7 +6,7 @@ Formigo is a powerful and flexible Golang library designed to simplify the proce - **Efficient Throughput Management**: it offers optimal throughput management, allowing you to fine-tune the number of Go routines responsible for both polling messages from the queue and processing them. This dynamic control ensures maximum efficiency in various scenarios, making the library highly adaptable to your application's needs. -- **Configurable Batch Processing**: it uses powerful batch processing capabilities, enabling you to handle messages efficiently in customizable batches. With the Multiple Message Handler, messages can be processed in batches of a size you define, granting you full control over the processing logic. Moreover, you can adjust the batch buffer size and timeout settings, providing a flexible and optimal solution to process messages under various workloads. +- **Configurable Batch Processing**: it uses powerful batch processing capabilities, enabling you to handle messages efficiently in customizable batches. With the Batch Handler, messages can be processed in batches of a size you define, granting you full control over the processing logic. Moreover, you can adjust the batch buffer size and timeout settings, providing a flexible and optimal solution to process messages under various workloads. - **Context Cancellation**: Effortlessly stop the QueueWorker by canceling its context. This feature guarantees smooth and controlled termination of the worker whenever required. @@ -140,8 +140,8 @@ func main() { wkr := formigo.NewWorker(formigo.Configuration{ Client: sqsClient, Concurrency: 100, - Consumer: formigo.NewMultiMessageConsumer(formigo.MultiMessageConsumerConfiguration{ - BufferConfig: formigo.MultiMessageBufferConfiguration{ + Consumer: formigo.BatchConsumer(formigo.BatchConsumerConfiguration{ + BufferConfig: formigo.BatchBufferConfiguration{ Size: 100, Timeout: time.Second * 5, }, @@ -180,7 +180,7 @@ By processing messages in batches, the worker can significantly enhance throughp | Concurrency | Number of Go routines that process the messages from the Queue. Higher values are useful for slow I/O operations in the consumer's handler. | 100 | | Retrievers | Number of Go routines that retrieve messages from the Queue. Higher values are helpful for slow networks or when consumers are quicker. | 1 | | ErrorConfig | Defines the error threshold and interval for worker termination and error reporting function. | None | -| Consumer | The message consumer, either MessageConsumer or MultipleMessageConsumer. | None | +| Consumer | The message consumer, either MessageConsumer or BatchConsumer. | None | ## License diff --git a/config.go b/config.go index 83bb2e3..507b644 100644 --- a/config.go +++ b/config.go @@ -37,9 +37,9 @@ type ErrorConfiguration struct { ReportFunc func(err error) bool } -// The MultiMessageBufferConfiguration defines a buffer which is consumed by the worker when either +// The BatchConsumerBufferConfiguration defines a buffer which is consumed by the worker when either // the buffer is full or the timeout has passed since the first message got added. -type MultiMessageBufferConfiguration struct { +type BatchConsumerBufferConfiguration struct { // Max number of messages that the buffer can contain. // Default: 10. Size int @@ -56,9 +56,9 @@ type MessageConsumerConfiguration struct { Handler messageHandler } -type MultiMessageConsumerConfiguration struct { - Handler multiMessageHandler - BufferConfig MultiMessageBufferConfiguration +type BatchConsumerConfiguration struct { + Handler batchHandler + BufferConfig BatchConsumerBufferConfiguration } type Configuration struct { diff --git a/consumers.go b/consumers.go index da5a31a..7cf8ebe 100644 --- a/consumers.go +++ b/consumers.go @@ -10,9 +10,12 @@ import ( "github.com/francescopepe/formigo/internal/messages" ) -type singleMessageHandler = func(ctx context.Context, msg Message) error -type multiMessageHandler = func(ctx context.Context, msgs []Message) error +type BatchResponse struct { + FailedMessagesId []interface{} +} + type messageHandler = func(ctx context.Context, msg Message) error +type batchHandler = func(ctx context.Context, msgs []Message) (BatchResponse, error) // This means that the buffered messages didn't get passed to the handler within // the first message's timeout. @@ -103,31 +106,35 @@ func NewMessageConsumer(config MessageConsumerConfiguration) *messageConsumer { } } -// multiMessageConsumer allows to process multiple messages at a time. This can be useful +// batchConsumer allows to process multiple messages at a time. This can be useful // for batch updates or use cases with high throughput. -type multiMessageConsumer struct { - handler multiMessageHandler - bufferConfig MultiMessageBufferConfiguration +type batchConsumer struct { + handler batchHandler + bufferConfig BatchConsumerBufferConfiguration } // It processes the messages and push them downstream for deletion. -func (c *multiMessageConsumer) processMessages(ctrl *controller, deleteCh chan<- messages.Message, ctx context.Context, msgs []messages.Message) { - err := wrapHandler(func() error { - // Convert slice to the abstraction - converted := make([]Message, len(msgs)) - for _, msg := range msgs { - converted = append(converted, msg) +func (c *batchConsumer) processMessages(ctrl *controller, deleteCh chan<- messages.Message, ctx context.Context, msgs []messages.Message) { + defer func() { + if r := recover(); r != nil { + ctrl.reportError(fmt.Errorf("panic error: %s", r)) } + }() - return c.handler(ctx, converted) - }) + // Convert slice to the abstraction + converted := make([]Message, 0, len(msgs)) + for _, msg := range msgs { + converted = append(converted, msg) + } + + resp, err := c.handler(ctx, converted) if err != nil { - ctrl.reportError(fmt.Errorf("failed to process messages: %w", err)) - return + ctrl.reportError(fmt.Errorf("failed to process batch: %w", err)) } + toDelete := c.buildMessagesToDeleteFromBatchResponse(msgs, resp) // Push messages for deletion - for _, msg := range msgs { + for _, msg := range toDelete { deleteCh <- msg } } @@ -135,7 +142,7 @@ func (c *multiMessageConsumer) processMessages(ctrl *controller, deleteCh chan<- // Consumes and deletes a number of messages in the interval [1, N] based on configuration // provided in the BufferConfiguration. // It stops only when the messageCh gets closed and doesn't have any messages in it. -func (c *multiMessageConsumer) consume(concurrency int, ctrl *controller, messageCh <-chan messages.Message, deleteCh chan<- messages.Message) { +func (c *batchConsumer) consume(concurrency int, ctrl *controller, messageCh <-chan messages.Message, deleteCh chan<- messages.Message) { consumers := makeAvailableConsumers(concurrency) // Create buffer @@ -211,7 +218,28 @@ func (c *multiMessageConsumer) consume(concurrency int, ctrl *controller, messag wg.Wait() } -func NewMultiMessageConsumer(config MultiMessageConsumerConfiguration) *multiMessageConsumer { +func (c *batchConsumer) buildMessagesToDeleteFromBatchResponse(msgs []messages.Message, resp BatchResponse) []messages.Message { + if len(resp.FailedMessagesId) == 0 { + return msgs + } + + toDelete := make([]messages.Message, 0, len(msgs)) + + failedMessagesIdIndexed := make(map[interface{}]struct{}, len(resp.FailedMessagesId)) + for _, id := range resp.FailedMessagesId { + failedMessagesIdIndexed[id] = struct{}{} + } + + for _, msg := range msgs { + if _, ok := failedMessagesIdIndexed[msg.Id()]; !ok { + toDelete = append(toDelete, msg) + } + } + + return toDelete +} + +func NewBatchConsumer(config BatchConsumerConfiguration) *batchConsumer { if config.BufferConfig.Size == 0 { config.BufferConfig.Size = 10 } @@ -220,7 +248,7 @@ func NewMultiMessageConsumer(config MultiMessageConsumerConfiguration) *multiMes config.BufferConfig.Timeout = time.Second } - return &multiMessageConsumer{ + return &batchConsumer{ handler: config.Handler, bufferConfig: config.BufferConfig, } @@ -228,7 +256,6 @@ func NewMultiMessageConsumer(config MultiMessageConsumerConfiguration) *multiMes // Interface guards var ( - _ consumer = (*singleMessageConsumer)(nil) - _ consumer = (*multiMessageConsumer)(nil) _ consumer = (*messageConsumer)(nil) + _ consumer = (*batchConsumer)(nil) ) diff --git a/internal/messages/messages.go b/internal/messages/messages.go index c26acee..3e4a882 100644 --- a/internal/messages/messages.go +++ b/internal/messages/messages.go @@ -9,9 +9,14 @@ type Message struct { Ctx context.Context CancelCtx context.CancelFunc Msg interface{} + MsgId interface{} ReceivedTime time.Time } +func (m Message) Id() interface{} { + return m.MsgId +} + func (m Message) Content() interface{} { return m.Msg } @@ -145,7 +150,7 @@ func (b *BufferWithContextTimeout) Add(msg Message) { // Reset resets its internal buffer, cancel the current context created and // reset any timeout. -// It's important to call this function avoid memory leaks. In fact, the +// It's important to call this function to avoid memory leaks. In fact, the // GC won't collect any timer or resources allocated within the context. // NOTE: this function should be always called to clean up any buffer // created. Used in defer can guarantee that it always run. diff --git a/message.go b/message.go index 7aaff57..d1b118a 100644 --- a/message.go +++ b/message.go @@ -4,6 +4,6 @@ import "time" type Message interface { ReceivedAt() time.Time - Raw() interface{} Content() interface{} + Id() interface{} } diff --git a/sqs.go b/sqs.go index 8bc82c7..9beb547 100644 --- a/sqs.go +++ b/sqs.go @@ -82,6 +82,7 @@ func (c sqsClient) prepareMessagesForDeletion(messages []messages.Message) []typ func (c sqsClient) createMessage(sqsMessage types.Message) messages.Message { msg := messages.Message{ + MsgId: *sqsMessage.MessageId, Msg: sqsMessage, ReceivedTime: time.Now(), } From ed8a2bca3d7a726759f4646a8196e48d7eed9c6b Mon Sep 17 00:00:00 2001 From: francescopepe <3891780+francescopepe@users.noreply.github.com> Date: Sat, 17 Aug 2024 18:47:25 +0200 Subject: [PATCH 08/18] ci: update go version --- .github/workflows/lint.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml index 5ff49ea..ff9ecd7 100644 --- a/.github/workflows/lint.yml +++ b/.github/workflows/lint.yml @@ -18,7 +18,7 @@ jobs: - uses: actions/checkout@v3 - uses: actions/setup-go@v4 with: - go-version: '1.22' + go-version-file: 'go.mod' cache: false - name: golangci-lint uses: golangci/golangci-lint-action@v3 @@ -32,7 +32,7 @@ jobs: - name: govulncheck uses: golang/govulncheck-action@v1 with: - go-version-input: '~1.22.0' + go-version-file: 'go.mod' check-latest: true commitlint: From 32ae9a6f899f9f89c9af08a99fda4ce5782d1b19 Mon Sep 17 00:00:00 2001 From: francescopepe <3891780+francescopepe@users.noreply.github.com> Date: Sat, 17 Aug 2024 18:49:48 +0200 Subject: [PATCH 09/18] chore: update go version in go.mod --- go.mod | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go.mod b/go.mod index 822ea9b..4307c56 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/francescopepe/formigo -go 1.20 +go 1.21 require github.com/aws/aws-sdk-go-v2/service/sqs v1.34.4 From 9d72b46bdfc077cb673b8ea0f4ae53b56b362262 Mon Sep 17 00:00:00 2001 From: Francesco Pepe <3891780+francescopepe@users.noreply.github.com> Date: Sun, 18 Aug 2024 17:47:44 +0200 Subject: [PATCH 10/18] feat: make formigo.Message JSON encodable --- internal/messages/messages.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/internal/messages/messages.go b/internal/messages/messages.go index 3e4a882..ba91502 100644 --- a/internal/messages/messages.go +++ b/internal/messages/messages.go @@ -6,11 +6,11 @@ import ( ) type Message struct { - Ctx context.Context - CancelCtx context.CancelFunc - Msg interface{} - MsgId interface{} - ReceivedTime time.Time + Ctx context.Context `json:"-"` // Exclude from JSON + CancelCtx context.CancelFunc `json:"-"` // Exclude from JSON + MsgId interface{} `json:"id"` + Msg interface{} `json:"content"` + ReceivedTime time.Time `json:"receivedAt"` } func (m Message) Id() interface{} { From 0ffeeae1d53a868f01e5c1cdd8de89390bcf16ea Mon Sep 17 00:00:00 2001 From: francescopepe <3891780+francescopepe@users.noreply.github.com> Date: Tue, 27 Aug 2024 23:35:11 +0200 Subject: [PATCH 11/18] feat: make the retriever stop immediately if the context is canceled --- go.mod | 8 +++++++- go.sum | 12 ++++++++++++ internal/client/client.go | 8 ++++++-- retriever.go | 8 +++++++- sqs.go | 4 ++-- 5 files changed, 34 insertions(+), 6 deletions(-) diff --git a/go.mod b/go.mod index 4307c56..f22b95a 100644 --- a/go.mod +++ b/go.mod @@ -2,11 +2,17 @@ module github.com/francescopepe/formigo go 1.21 -require github.com/aws/aws-sdk-go-v2/service/sqs v1.34.4 +require ( + github.com/aws/aws-sdk-go-v2/service/sqs v1.34.5 + github.com/stretchr/testify v1.9.0 +) require ( github.com/aws/aws-sdk-go-v2 v1.30.4 // indirect github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.16 // indirect github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.16 // indirect github.com/aws/smithy-go v1.20.4 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 3609e04..c419e11 100644 --- a/go.sum +++ b/go.sum @@ -6,5 +6,17 @@ github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.16 h1:jYfy8UPmd+6kJW5YhY github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.16/go.mod h1:7ZfEPZxkW42Afq4uQB8H2E2e6ebh6mXTueEpYzjCzcs= github.com/aws/aws-sdk-go-v2/service/sqs v1.34.4 h1:FXPO72iKC5YmYNEANltl763bUj8A6qT20wx8Jwvxlsw= github.com/aws/aws-sdk-go-v2/service/sqs v1.34.4/go.mod h1:7idt3XszF6sE9WPS1GqZRiDJOxw4oPtlRBXodWnCGjU= +github.com/aws/aws-sdk-go-v2/service/sqs v1.34.5 h1:HYyVDOC2/PIg+3oBX1q0wtDU5kONki6lrgIG0afrBkY= +github.com/aws/aws-sdk-go-v2/service/sqs v1.34.5/go.mod h1:7idt3XszF6sE9WPS1GqZRiDJOxw4oPtlRBXodWnCGjU= github.com/aws/smithy-go v1.20.4 h1:2HK1zBdPgRbjFOHlfeQZfpC4r72MOb9bZkiFwggKO+4= github.com/aws/smithy-go v1.20.4/go.mod h1:irrKGvNn1InZwb2d7fkIRNucdfwR8R+Ts3wxYa/cJHg= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/client/client.go b/internal/client/client.go index 9e24aeb..7b4c7b0 100644 --- a/internal/client/client.go +++ b/internal/client/client.go @@ -1,9 +1,13 @@ package client -import "github.com/francescopepe/formigo/internal/messages" +import ( + "context" + + "github.com/francescopepe/formigo/internal/messages" +) type MessageReceiver interface { - ReceiveMessages() ([]messages.Message, error) + ReceiveMessages(ctx context.Context) ([]messages.Message, error) } type MessageDeleter interface { diff --git a/retriever.go b/retriever.go index d0939fc..3765e74 100644 --- a/retriever.go +++ b/retriever.go @@ -17,8 +17,14 @@ func retriever(ctx context.Context, receiver client.MessageReceiver, ctrl *contr case <-ctx.Done(): return default: - msgs, err := receiver.ReceiveMessages() + msgs, err := receiver.ReceiveMessages(ctx) if err != nil { + if errors.Is(err, context.Canceled) && errors.Is(ctx.Err(), context.Canceled) { + // The worker's context was canceled. We can exit. + return + } + + // Report the error to the controller and continue. ctrl.reportError(fmt.Errorf("unable to receive message: %w", err)) continue } diff --git a/sqs.go b/sqs.go index 9beb547..1b09a30 100644 --- a/sqs.go +++ b/sqs.go @@ -35,8 +35,8 @@ type sqsClient struct { messageCtxTimeout time.Duration } -func (c sqsClient) ReceiveMessages() ([]messages.Message, error) { - out, err := c.svc.ReceiveMessage(context.Background(), c.receiveMessageInput) +func (c sqsClient) ReceiveMessages(ctx context.Context) ([]messages.Message, error) { + out, err := c.svc.ReceiveMessage(ctx, c.receiveMessageInput) if err != nil { return nil, fmt.Errorf("unable to receive messages: %w", err) } From 01021115ac15f8cf3edef45b4a8b92df033e42bf Mon Sep 17 00:00:00 2001 From: francescopepe <3891780+francescopepe@users.noreply.github.com> Date: Tue, 27 Aug 2024 23:35:37 +0200 Subject: [PATCH 12/18] ci: add some simple tests and workflow --- .github/workflows/test.yml | 19 +++ worker_test.go | 250 +++++++++++++++++++++++++++++++++++++ 2 files changed, 269 insertions(+) create mode 100644 .github/workflows/test.yml create mode 100644 worker_test.go diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml new file mode 100644 index 0000000..21744eb --- /dev/null +++ b/.github/workflows/test.yml @@ -0,0 +1,19 @@ +name: Tests + +on: + push: + branches: + - main + pull_request: + +jobs: + tests: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + - uses: actions/setup-go@v4 + with: + go-version-file: 'go.mod' + cache: false + - name: Run tests + run: go test ./... diff --git a/worker_test.go b/worker_test.go new file mode 100644 index 0000000..7731880 --- /dev/null +++ b/worker_test.go @@ -0,0 +1,250 @@ +package formigo + +import ( + "context" + "fmt" + "sync" + "testing" + "time" + + "github.com/francescopepe/formigo/internal/messages" + "github.com/stretchr/testify/assert" +) + +type SimpleInMemoryBrokerMessage struct { + messageId string + body string + deleteReqCh chan struct{} + deleteAckCh chan struct{} + timer *time.Timer +} + +type SimpleInMemoryBroker struct { + visibilityTimeout time.Duration + queue chan *SimpleInMemoryBrokerMessage + inFlights chan *SimpleInMemoryBrokerMessage + expired chan *SimpleInMemoryBrokerMessage + + statics struct { + rwMutex sync.RWMutex + enqueuedMessages int + inFlightMessages int + } +} + +func NewSimpleInMemoryBroker(visibilityTimeout time.Duration) *SimpleInMemoryBroker { + return &SimpleInMemoryBroker{ + visibilityTimeout: visibilityTimeout, + queue: make(chan *SimpleInMemoryBrokerMessage, 1000), + inFlights: make(chan *SimpleInMemoryBrokerMessage), + expired: make(chan *SimpleInMemoryBrokerMessage, 1000), + } +} + +func (b *SimpleInMemoryBroker) run(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + case msg := <-b.inFlights: + go func(ctx context.Context) { + select { + case <-ctx.Done(): + return + case <-msg.deleteReqCh: + msg.deleteAckCh <- struct{}{} + case <-msg.timer.C: + b.expired <- msg + } + }(ctx) + } + } +} + +func (b *SimpleInMemoryBroker) AddMessages(msgs []*SimpleInMemoryBrokerMessage) { + for _, msg := range msgs { + b.queue <- msg + b.statics.rwMutex.Lock() + b.statics.enqueuedMessages++ + b.statics.rwMutex.Unlock() + } +} + +func (b *SimpleInMemoryBroker) DeleteMessages(msgs []messages.Message) error { + requestTimer := time.NewTimer(time.Second * 5) + defer requestTimer.Stop() + + for _, msg := range msgs { + brokerMsg := msg.Content().(*SimpleInMemoryBrokerMessage) + + select { + case <-requestTimer.C: + return fmt.Errorf("failed to delete message %s: request timeout", brokerMsg.messageId) + case brokerMsg.deleteReqCh <- struct{}{}: + } + + if !brokerMsg.timer.Stop() { + return fmt.Errorf("failed to delete message %s: visibility timeout exipired", brokerMsg.messageId) + } + + <-brokerMsg.deleteAckCh + + b.statics.rwMutex.Lock() + b.statics.inFlightMessages-- + b.statics.rwMutex.Unlock() + } + + return nil +} + +func (b *SimpleInMemoryBroker) ReceiveMessages(ctx context.Context) ([]messages.Message, error) { + var polledMessage *SimpleInMemoryBrokerMessage + select { + case polledMessage = <-b.expired: + default: + timer := time.NewTimer(time.Millisecond * 500) + defer timer.Stop() + + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-timer.C: + return nil, nil + case polledMessage = <-b.expired: + case polledMessage = <-b.queue: + } + } + + polledMessage.timer = time.NewTimer(b.visibilityTimeout) + polledMessage.deleteReqCh = make(chan struct{}) + polledMessage.deleteAckCh = make(chan struct{}) + + time.After(time.Millisecond * 5) + + msg := messages.Message{ + MsgId: polledMessage.messageId, + Msg: polledMessage, + ReceivedTime: time.Now(), + } + + // Set a context with timeout + msg.Ctx, msg.CancelCtx = context.WithTimeout(context.Background(), b.visibilityTimeout) + + // Move the message to inflight + b.inFlights <- polledMessage + b.statics.rwMutex.Lock() + b.statics.enqueuedMessages-- + b.statics.inFlightMessages++ + b.statics.rwMutex.Unlock() + + return []messages.Message{msg}, nil +} + +func (b *SimpleInMemoryBroker) EnqueuedMessages() int { + b.statics.rwMutex.RLock() + defer b.statics.rwMutex.RUnlock() + return b.statics.enqueuedMessages +} + +func (b *SimpleInMemoryBroker) InFlightMessages() int { + b.statics.rwMutex.RLock() + defer b.statics.rwMutex.RUnlock() + return b.statics.inFlightMessages +} + +func TestWorker(t *testing.T) { + inMemoryBroker := NewSimpleInMemoryBroker(time.Second * 10) + go inMemoryBroker.run(context.Background()) + + t.Run("can receive a message", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + msgs := []*SimpleInMemoryBrokerMessage{ + { + messageId: "1", + body: "Hello, world!", + }, + } + + inMemoryBroker.AddMessages(msgs) + + wkr := NewWorker(Configuration{ + Client: inMemoryBroker, + Concurrency: 1, + Retrievers: 1, + ErrorConfig: ErrorConfiguration{ + ReportFunc: func(err error) bool { + t.Fatalf("unexpected error: %v", err) + return true + }, + }, + Consumer: NewMessageConsumer(MessageConsumerConfiguration{ + Handler: func(ctx context.Context, msg Message) error { + defer cancel() + + assert.Equal(t, "Hello, world!", msg.Content().(*SimpleInMemoryBrokerMessage).body) + + return nil + }, + }), + }) + + assert.NoError(t, wkr.Run(ctx)) + assert.Equal(t, 0, inMemoryBroker.EnqueuedMessages()) + assert.Equal(t, 0, inMemoryBroker.InFlightMessages()) + }) + + t.Run("can receive a batch of messages", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + msgs := []*SimpleInMemoryBrokerMessage{ + { + messageId: "1", + body: "Hello, world 1!", + }, + { + messageId: "2", + body: "Hello, world 2!", + }, + { + messageId: "3", + body: "Hello, world 3!", + }, + } + + inMemoryBroker.AddMessages(msgs) + + wkr := NewWorker(Configuration{ + Client: inMemoryBroker, + Concurrency: 1, + Retrievers: 1, + ErrorConfig: ErrorConfiguration{ + ReportFunc: func(err error) bool { + t.Fatalf("unexpected error: %v", err) + return true + }, + }, + Consumer: NewBatchConsumer(BatchConsumerConfiguration{ + BufferConfig: BatchConsumerBufferConfiguration{ + Size: 3, + Timeout: time.Second, + }, + Handler: func(ctx context.Context, msgs []Message) (BatchResponse, error) { + defer cancel() + + if len(msgs) < 3 { + t.Fatalf("expected 3 messages, got %d", len(msgs)) + } + + assert.Equal(t, "Hello, world 1!", msgs[0].Content().(*SimpleInMemoryBrokerMessage).body) + assert.Equal(t, "Hello, world 2!", msgs[1].Content().(*SimpleInMemoryBrokerMessage).body) + assert.Equal(t, "Hello, world 3!", msgs[2].Content().(*SimpleInMemoryBrokerMessage).body) + + return BatchResponse{}, nil + }, + }), + }) + + assert.NoError(t, wkr.Run(ctx)) + assert.Equal(t, 0, inMemoryBroker.EnqueuedMessages()) + assert.Equal(t, 0, inMemoryBroker.InFlightMessages()) + }) +} From 3e4b72c249da55eb5aeaa8b3896f79b23458b772 Mon Sep 17 00:00:00 2001 From: francescopepe <3891780+francescopepe@users.noreply.github.com> Date: Sun, 15 Sep 2024 19:58:23 +0200 Subject: [PATCH 13/18] feat: export Worker, Consumer and Handlers --- config.go | 6 +++--- consumers.go | 14 +++++++------- worker.go | 14 +++++++------- 3 files changed, 17 insertions(+), 17 deletions(-) diff --git a/config.go b/config.go index 507b644..86233ea 100644 --- a/config.go +++ b/config.go @@ -53,11 +53,11 @@ type BatchConsumerBufferConfiguration struct { } type MessageConsumerConfiguration struct { - Handler messageHandler + Handler MessageHandler } type BatchConsumerConfiguration struct { - Handler batchHandler + Handler BatchHandler BufferConfig BatchConsumerBufferConfiguration } @@ -83,7 +83,7 @@ type Configuration struct { ErrorConfig ErrorConfiguration // The messages Consumer. - Consumer consumer + Consumer Consumer // Configuration for the deleter DeleterConfig DeleterConfiguration diff --git a/consumers.go b/consumers.go index 7cf8ebe..f9231da 100644 --- a/consumers.go +++ b/consumers.go @@ -14,8 +14,8 @@ type BatchResponse struct { FailedMessagesId []interface{} } -type messageHandler = func(ctx context.Context, msg Message) error -type batchHandler = func(ctx context.Context, msgs []Message) (BatchResponse, error) +type MessageHandler = func(ctx context.Context, msg Message) error +type BatchHandler = func(ctx context.Context, msgs []Message) (BatchResponse, error) // This means that the buffered messages didn't get passed to the handler within // the first message's timeout. @@ -25,7 +25,7 @@ type batchHandler = func(ctx context.Context, msgs []Message) (BatchResponse, er // - Consumer to slow var errBufferCtxExpired = errors.New("buffer context expired, buffer will Reset") -type consumer interface { +type Consumer interface { consume(concurrency int, ctrl *controller, messageCh <-chan messages.Message, deleteCh chan<- messages.Message) } @@ -58,7 +58,7 @@ func wrapHandler(handler func() error) (err error) { // It can be useful when the workload is specific per message, for example for sending // an email. type messageConsumer struct { - handler messageHandler + handler MessageHandler } func (c *messageConsumer) processMessage(msg messages.Message) error { @@ -109,7 +109,7 @@ func NewMessageConsumer(config MessageConsumerConfiguration) *messageConsumer { // batchConsumer allows to process multiple messages at a time. This can be useful // for batch updates or use cases with high throughput. type batchConsumer struct { - handler batchHandler + handler BatchHandler bufferConfig BatchConsumerBufferConfiguration } @@ -256,6 +256,6 @@ func NewBatchConsumer(config BatchConsumerConfiguration) *batchConsumer { // Interface guards var ( - _ consumer = (*messageConsumer)(nil) - _ consumer = (*batchConsumer)(nil) + _ Consumer = (*messageConsumer)(nil) + _ Consumer = (*batchConsumer)(nil) ) diff --git a/worker.go b/worker.go index 9621dbf..4ffe257 100644 --- a/worker.go +++ b/worker.go @@ -9,16 +9,16 @@ import ( "github.com/francescopepe/formigo/internal/messages" ) -type worker struct { +type Worker struct { client client.Client concurrency int retrievers int errorConfig ErrorConfiguration - consumer consumer + consumer Consumer deleterConfig DeleterConfiguration } -func (w worker) Run(ctx context.Context) error { +func (w Worker) Run(ctx context.Context) error { // Create a new context with a cancel function used to stop the worker from the // controller in case too many errors occur. ctx, cancel := context.WithCancelCause(ctx) @@ -60,7 +60,7 @@ func (w worker) Run(ctx context.Context) error { // It returns a channel where the messages will be published and, only when all the // retrievers have stopped, it will close it to broadcast the signal to stop to the // consumers. -func (w worker) runRetrievers(ctx context.Context, ctrl *controller) <-chan messages.Message { +func (w Worker) runRetrievers(ctx context.Context, ctrl *controller) <-chan messages.Message { messageCh := make(chan messages.Message) var wg sync.WaitGroup @@ -84,7 +84,7 @@ func (w worker) runRetrievers(ctx context.Context, ctrl *controller) <-chan mess // It returns a channel where the messages will be published for deletion and, // only when the consumer has stopped, it will close it to broadcast the // signal to stop to the deleter. -func (w worker) runConsumer(ctrl *controller, messageCh <-chan messages.Message) <-chan messages.Message { +func (w Worker) runConsumer(ctrl *controller, messageCh <-chan messages.Message) <-chan messages.Message { deleteCh := make(chan messages.Message) go func() { @@ -96,10 +96,10 @@ func (w worker) runConsumer(ctrl *controller, messageCh <-chan messages.Message) return deleteCh } -func NewWorker(config Configuration) worker { +func NewWorker(config Configuration) Worker { config = setWorkerConfigValues(config) - return worker{ + return Worker{ client: config.Client, concurrency: config.Concurrency, retrievers: config.Retrievers, From a1a0e54ff553d3035cb417a78e7c326a546ae02b Mon Sep 17 00:00:00 2001 From: francescopepe <3891780+francescopepe@users.noreply.github.com> Date: Tue, 22 Oct 2024 15:51:19 +0200 Subject: [PATCH 14/18] chore: retract v1.0.0 --- go.mod | 2 ++ 1 file changed, 2 insertions(+) diff --git a/go.mod b/go.mod index f22b95a..3612c7b 100644 --- a/go.mod +++ b/go.mod @@ -16,3 +16,5 @@ require ( github.com/pmezard/go-difflib v1.0.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) + +retract v1.0.0 // Accidentally added From e361d826b4d51d1e47cc7560c403740f4b476111 Mon Sep 17 00:00:00 2001 From: Luke Morrigan Date: Thu, 20 Feb 2025 14:33:02 +0000 Subject: [PATCH 15/18] chore: retract v1.0.0 correctly --- go.mod | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/go.mod b/go.mod index 3612c7b..c07aec7 100644 --- a/go.mod +++ b/go.mod @@ -17,4 +17,7 @@ require ( gopkg.in/yaml.v3 v3.0.1 // indirect ) -retract v1.0.0 // Accidentally added +retract ( + v1.0.0 // Accidentally added + v1.0.1 // Contains retractions only. +) From f7ec269854454ad5b49c5ffb1ef6382e15bbdbc0 Mon Sep 17 00:00:00 2001 From: Richard Toms Date: Thu, 15 Feb 2024 09:42:32 +0000 Subject: [PATCH 16/18] chore: rename forked module for internal use --- README.md | 10 ++++++---- config.go | 2 +- consumers.go | 2 +- deleter.go | 4 ++-- go.mod | 2 +- internal/client/client.go | 2 +- retriever.go | 4 ++-- sqs.go | 4 ++-- worker.go | 4 ++-- 9 files changed, 18 insertions(+), 16 deletions(-) diff --git a/README.md b/README.md index 2808923..69db507 100644 --- a/README.md +++ b/README.md @@ -23,7 +23,7 @@ Make sure you have Go installed [download](https://go.dev/dl/) Initialize your project by creating a folder and then running `go mod init github.com/your/repo` inside the folder. Then install the library with the [`go get`](https://pkg.go.dev/cmd/go/#hdr-Add_dependencies_to_current_module_and_install_them) command: ```bash -go get -u github.com/francescopepe/formigo +go get -u github.com/Pod-Point/go-queue-worker ``` ## Examples @@ -37,8 +37,9 @@ import ( "context" "fmt" "log" - - "github.com/francescopepe/formigo" + + "github.com/Pod-Point/go-queue-worker" + workerSqs "github.com/Pod-Point/go-queue-worker/clients/sqs" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/config" @@ -107,7 +108,8 @@ import ( "fmt" "log" - "github.com/francescopepe/formigo" + "github.com/Pod-Point/go-queue-worker" + workerSqs "github.com/Pod-Point/go-queue-worker/clients/sqs" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/config" diff --git a/config.go b/config.go index 86233ea..0fe18e7 100644 --- a/config.go +++ b/config.go @@ -4,7 +4,7 @@ import ( "log" "time" - "github.com/francescopepe/formigo/internal/client" + "github.com/Pod-Point/go-queue-worker/internal/client" ) const ( diff --git a/consumers.go b/consumers.go index f9231da..fdd90e1 100644 --- a/consumers.go +++ b/consumers.go @@ -7,7 +7,7 @@ import ( "sync" "time" - "github.com/francescopepe/formigo/internal/messages" + "github.com/Pod-Point/go-queue-worker/internal/messages" ) type BatchResponse struct { diff --git a/deleter.go b/deleter.go index 289037b..3a180d7 100644 --- a/deleter.go +++ b/deleter.go @@ -4,8 +4,8 @@ import ( "fmt" "sync" - "github.com/francescopepe/formigo/internal/client" - "github.com/francescopepe/formigo/internal/messages" + "github.com/Pod-Point/go-queue-worker/internal/client" + "github.com/Pod-Point/go-queue-worker/internal/messages" ) // deleter will delete messages from SQS until the delete channel gets closed. diff --git a/go.mod b/go.mod index c07aec7..b310d80 100644 --- a/go.mod +++ b/go.mod @@ -1,4 +1,4 @@ -module github.com/francescopepe/formigo +module github.com/Pod-Point/go-queue-worker go 1.21 diff --git a/internal/client/client.go b/internal/client/client.go index 7b4c7b0..9db03d5 100644 --- a/internal/client/client.go +++ b/internal/client/client.go @@ -3,7 +3,7 @@ package client import ( "context" - "github.com/francescopepe/formigo/internal/messages" + "github.com/Pod-Point/go-queue-worker/internal/messages" ) type MessageReceiver interface { diff --git a/retriever.go b/retriever.go index 3765e74..bde055c 100644 --- a/retriever.go +++ b/retriever.go @@ -5,8 +5,8 @@ import ( "errors" "fmt" - "github.com/francescopepe/formigo/internal/client" - "github.com/francescopepe/formigo/internal/messages" + "github.com/Pod-Point/go-queue-worker/internal/client" + "github.com/Pod-Point/go-queue-worker/internal/messages" ) // retriever will get messages from SQS until the given context gets canceled. diff --git a/sqs.go b/sqs.go index 1b09a30..96026c1 100644 --- a/sqs.go +++ b/sqs.go @@ -9,8 +9,8 @@ import ( awsSqs "github.com/aws/aws-sdk-go-v2/service/sqs" "github.com/aws/aws-sdk-go-v2/service/sqs/types" - "github.com/francescopepe/formigo/internal/client" - "github.com/francescopepe/formigo/internal/messages" + "github.com/Pod-Point/go-queue-worker/internal/client" + "github.com/Pod-Point/go-queue-worker/internal/messages" ) type SqsClientConfiguration struct { diff --git a/worker.go b/worker.go index 4ffe257..e6d5666 100644 --- a/worker.go +++ b/worker.go @@ -5,8 +5,8 @@ import ( "errors" "sync" - "github.com/francescopepe/formigo/internal/client" - "github.com/francescopepe/formigo/internal/messages" + "github.com/Pod-Point/go-queue-worker/internal/client" + "github.com/Pod-Point/go-queue-worker/internal/messages" ) type Worker struct { From 84934ce141f58cb8fc4b2527483a87a311a65857 Mon Sep 17 00:00:00 2001 From: leegm-pp Date: Tue, 1 Jul 2025 12:19:12 +0100 Subject: [PATCH 17/18] fix: update formigo reference and delete retract block in go.mod --- README.md | 4 ++-- go.mod | 5 ----- go.sum | 2 -- worker_test.go | 3 ++- 4 files changed, 4 insertions(+), 10 deletions(-) diff --git a/README.md b/README.md index 69db507..eef1373 100644 --- a/README.md +++ b/README.md @@ -38,7 +38,7 @@ import ( "fmt" "log" - "github.com/Pod-Point/go-queue-worker" + formigo "github.com/Pod-Point/go-queue-worker" workerSqs "github.com/Pod-Point/go-queue-worker/clients/sqs" "github.com/aws/aws-sdk-go-v2/aws" @@ -108,7 +108,7 @@ import ( "fmt" "log" - "github.com/Pod-Point/go-queue-worker" + formigo "github.com/Pod-Point/go-queue-worker" workerSqs "github.com/Pod-Point/go-queue-worker/clients/sqs" "github.com/aws/aws-sdk-go-v2/aws" diff --git a/go.mod b/go.mod index b310d80..b9949ea 100644 --- a/go.mod +++ b/go.mod @@ -16,8 +16,3 @@ require ( github.com/pmezard/go-difflib v1.0.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) - -retract ( - v1.0.0 // Accidentally added - v1.0.1 // Contains retractions only. -) diff --git a/go.sum b/go.sum index c419e11..5c94707 100644 --- a/go.sum +++ b/go.sum @@ -4,8 +4,6 @@ github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.16 h1:TNyt/+X43KJ9IJJMj github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.16/go.mod h1:2DwJF39FlNAUiX5pAc0UNeiz16lK2t7IaFcm0LFHEgc= github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.16 h1:jYfy8UPmd+6kJW5YhY0L1/KftReOGxI/4NtVSTh9O/I= github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.16/go.mod h1:7ZfEPZxkW42Afq4uQB8H2E2e6ebh6mXTueEpYzjCzcs= -github.com/aws/aws-sdk-go-v2/service/sqs v1.34.4 h1:FXPO72iKC5YmYNEANltl763bUj8A6qT20wx8Jwvxlsw= -github.com/aws/aws-sdk-go-v2/service/sqs v1.34.4/go.mod h1:7idt3XszF6sE9WPS1GqZRiDJOxw4oPtlRBXodWnCGjU= github.com/aws/aws-sdk-go-v2/service/sqs v1.34.5 h1:HYyVDOC2/PIg+3oBX1q0wtDU5kONki6lrgIG0afrBkY= github.com/aws/aws-sdk-go-v2/service/sqs v1.34.5/go.mod h1:7idt3XszF6sE9WPS1GqZRiDJOxw4oPtlRBXodWnCGjU= github.com/aws/smithy-go v1.20.4 h1:2HK1zBdPgRbjFOHlfeQZfpC4r72MOb9bZkiFwggKO+4= diff --git a/worker_test.go b/worker_test.go index 7731880..3ccf1f5 100644 --- a/worker_test.go +++ b/worker_test.go @@ -7,8 +7,9 @@ import ( "testing" "time" - "github.com/francescopepe/formigo/internal/messages" "github.com/stretchr/testify/assert" + + "github.com/Pod-Point/go-queue-worker/internal/messages" ) type SimpleInMemoryBrokerMessage struct { From e1dea93eb229b728815f2766722987930d639cde Mon Sep 17 00:00:00 2001 From: leegm-pp Date: Tue, 1 Jul 2025 12:34:05 +0100 Subject: [PATCH 18/18] fix: try running commitlint from base to head insead --- .github/workflows/lint.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml index ff9ecd7..1c045b6 100644 --- a/.github/workflows/lint.yml +++ b/.github/workflows/lint.yml @@ -57,4 +57,4 @@ jobs: - name: Validate PR commits with commitlint if: github.event_name == 'pull_request' - run: npx commitlint --from ${{ github.event.pull_request.head.sha }}~${{ github.event.pull_request.commits }} --to ${{ github.event.pull_request.head.sha }} --verbose + run: npx commitlint --from ${{ github.event.pull_request.base.sha }} --to ${{ github.event.pull_request.head.sha }} --verbose