diff --git a/.golangci.yaml b/.golangci.yaml deleted file mode 100644 index b05dbe4..0000000 --- a/.golangci.yaml +++ /dev/null @@ -1,56 +0,0 @@ -linters-settings: - gocyclo: - min-complexity: 50 - dupl: - threshold: 5 - goconst: - min-len: 2 - min-occurrences: 2 - misspell: - locale: US - revive: - confidence: 0.8 - lll: - line-length: 160 - # tab width in spaces. Default to 1. - tab-width: 1 - funlen: - lines: 150 - statements: 80 - -linters: - # please, do not use `enable-all`: it's deprecated and will be removed soon. - # inverted configuration with `enable-all` and `disable` is not scalable during updates of golangci-lint - disable-all: true - enable: - - errcheck - - funlen - - goconst - - gocyclo - - gosec - - gosimple - - govet - - ineffassign - - lll - - misspell - - revive - - staticcheck - - typecheck - - unconvert - - unparam - - unused - - # don't enable: - # - gochecknoglobals - # - gocognit - # - godox - # - maligned - # - prealloc - -run: - skip-dirs: - # - test/testdata_etc -issues: - exclude-rules: - exclude-files: - - ".*_test\\.go$" diff --git a/.golangci.yml b/.golangci.yml new file mode 100644 index 0000000..d2b645e --- /dev/null +++ b/.golangci.yml @@ -0,0 +1,171 @@ +# This configuration file is not a recommendation. +# +# We intentionally use a limited set of linters. +# This configuration file is used with different version of golangci-lint to avoid regressions: +# the linters can change between version, +# their configuration may be not compatible or their reports can be different, +# and this can break some of our tests. +# Also, some linters are not relevant for the project (e.g. linters related to SQL). +# +# We have specific constraints, so we use a specific configuration. +# +# See the file `.golangci.reference.yml` to have a list of all available configuration options. + +version: "2" + +linters: + default: none + # This list of linters is not a recommendation (same thing for all this configuration file). + # We intentionally use a limited set of linters. + # See the comment on top of this file. + enable: + - bodyclose + - copyloopvar + - depguard + - dogsled + - dupl + - errcheck + - errorlint + - funlen + - gocheckcompilerdirectives + - gochecknoinits + - goconst + - gocritic + - gocyclo + - godox + - mnd + - goprintffuncname + - gosec + - govet + - intrange + - ineffassign + - lll + - misspell + - nakedret + - noctx + - nolintlint + - revive + - staticcheck + - testifylint + - unconvert + - unparam + - unused + - whitespace + + settings: + depguard: + rules: + logger: + deny: + # logging is allowed only by logutils.Log, + - pkg: "github.com/sirupsen/logrus" + desc: logging is allowed only by logutils.Log. + - pkg: "github.com/pkg/errors" + desc: Should be replaced by standard lib errors package. + - pkg: "github.com/instana/testify" + desc: It's a fork of github.com/stretchr/testify. + dupl: + threshold: 100 + funlen: + lines: -1 # the number of lines (code + empty lines) is not a right metric and leads to code without empty line or one-liner. + statements: 50 + goconst: + min-len: 2 + min-occurrences: 3 + gocritic: + settings: + hugeParam: + sizeThreshold: 300 + enabled-tags: + - diagnostic + - experimental + - opinionated + - performance + - style + disabled-checks: + - dupImport # https://github.com/go-critic/go-critic/issues/845 + - ifElseChain + - octalLiteral + - whyNoLint + gocyclo: + min-complexity: 15 + godox: + keywords: + - FIXME + mnd: + # don't include the "operation" and "assign" + checks: + - argument + - case + - condition + - return + ignored-numbers: + - "0" + - "1" + - "2" + - "3" + ignored-functions: + - strings.SplitN + govet: + settings: + printf: + funcs: + - (github.com/rs/zerolog/log).Infof + - (github.com/rs/zerolog/log).Warnf + - (github.com/rs/zerolog/log).Errorf + - (github.com/rs/zerolog/log).Fatalf + enable: + - nilness + - shadow + errorlint: + asserts: false + lll: + line-length: 140 + misspell: + locale: US + ignore-rules: + - "importas" # linter name + nolintlint: + allow-unused: false # report any unused nolint directives + require-explanation: true # require an explanation for nolint directives + require-specific: true # require nolint directives to be specific about which linter is being skipped + revive: + rules: + - name: indent-error-flow + - name: unexported-return + disabled: true + - name: unused-parameter + - name: unused-receiver + + exclusions: + # presets: + # - comments + # - std-error-handling + # - common-false-positives + # - legacy + paths: + - misc/ + - examples/ + rules: + - path: (.+)_test\.go + linters: + - dupl + - mnd + - lll + - testifylint + +formatters: + enable: + - gofmt + - goimports + settings: + gofmt: + rewrite-rules: + - pattern: "interface{}" + replacement: "any" + goimports: + local-prefixes: + - github.com/bxcodec/goqueue + exclusions: + paths: + - misc/ diff --git a/Makefile b/Makefile index 7cb83e9..0fbd30f 100644 --- a/Makefile +++ b/Makefile @@ -35,7 +35,7 @@ run-tests: $(GOTESTSUM) test: run-tests $(TPARSE) ## Run Tests & parse details @cat gotestsum.json.out | $(TPARSE) -all -notests docker-test: - @docker-compose -f test.compose.yaml up -d --build + @docker compose -f test.compose.yaml up -d --build integration-test: docker-test @echo "Running Integration Tests" @@ -48,12 +48,12 @@ integration-test-ci: $(GOTESTSUM) $(TPARSE) @cat gotestsum.json.out | $(TPARSE) -all -notests docker-clean: - @docker-compose -f test.compose.yaml down + @docker compose -f test.compose.yaml down lint: $(GOLANGCI) ## Runs golangci-lint with predefined configuration @echo "Applying linter" golangci-lint version - golangci-lint run -c .golangci.yaml ./... + golangci-lint run -c .golangci.yml ./... .PHONY: lint lint-prepare clean build unittest diff --git a/consumer/service.go b/consumer/service.go index 347d78d..d94ff2e 100644 --- a/consumer/service.go +++ b/consumer/service.go @@ -3,6 +3,7 @@ package consumer import ( "github.com/bxcodec/goqueue/internal/consumer" "github.com/bxcodec/goqueue/internal/consumer/rabbitmq" + _ "github.com/bxcodec/goqueue/internal/shared" // Auto-setup logging "github.com/bxcodec/goqueue/options" consumerOpts "github.com/bxcodec/goqueue/options/consumer" ) diff --git a/encoding.go b/encoding.go index e95f3e4..f0880da 100644 --- a/encoding.go +++ b/encoding.go @@ -20,11 +20,11 @@ type DecoderFn func(ctx context.Context, data []byte) (m interfaces.Message, err var ( // JSONEncoder is an implementation of the EncoderFn interface // that encodes a Message into JSON format. - JSONEncoder EncoderFn = func(ctx context.Context, m interfaces.Message) (data []byte, err error) { + JSONEncoder EncoderFn = func(_ context.Context, m interfaces.Message) (data []byte, err error) { return json.Marshal(m) } // JSONDecoder is a DecoderFn implementation that decodes JSON data into a Message. - JSONDecoder DecoderFn = func(ctx context.Context, data []byte) (m interfaces.Message, err error) { + JSONDecoder DecoderFn = func(_ context.Context, data []byte) (m interfaces.Message, err error) { err = json.Unmarshal(data, &m) return } @@ -73,6 +73,7 @@ var ( DefaultEncoding = JSONEncoding ) +//nolint:gochecknoinits // Required for auto-registration of default JSON encoding func init() { AddGoQueueEncoding(JSONEncoding.ContentType, JSONEncoding) } diff --git a/go.mod b/go.mod index de368ba..d5a4bc0 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.23.0 require ( github.com/google/uuid v1.6.0 github.com/rabbitmq/amqp091-go v1.10.0 - github.com/sirupsen/logrus v1.9.3 + github.com/rs/zerolog v1.34.0 github.com/stretchr/testify v1.10.0 go.uber.org/multierr v1.11.0 golang.org/x/sync v0.16.0 @@ -13,7 +13,10 @@ require ( require ( github.com/davecgh/go-spew v1.1.1 // indirect + github.com/mattn/go-colorable v0.1.13 // indirect + github.com/mattn/go-isatty v0.0.19 // indirect + github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect - golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 // indirect + golang.org/x/sys v0.12.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 99227c1..e126ee6 100644 --- a/go.sum +++ b/go.sum @@ -1,16 +1,23 @@ -github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= +github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= +github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= +github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APPA= +github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw= github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o= -github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= -github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= -github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/rs/xid v1.6.0/go.mod h1:7XoLgs4eV+QndskICGsho+ADou8ySMSjJKDIan90Nz0= +github.com/rs/zerolog v1.34.0 h1:k43nTLIwcTVQAncfCw4KZ2VY6ukYoZaBPNOE8txlOeY= +github.com/rs/zerolog v1.34.0/go.mod h1:bJsvje4Z08ROH4Nhs5iH600c3IkWhwp44iRc54W6wYQ= github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= @@ -19,10 +26,11 @@ go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= golang.org/x/sync v0.16.0 h1:ycBJEhp9p4vXvUZNszeOq0kGTPghopOL8q0fq3vstxw= golang.org/x/sync v0.16.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= -golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 h1:0A+M6Uqn+Eje4kHMK80dtF3JCXC4ykBgQG4Fe06QRhQ= -golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.12.0 h1:CM0HF96J0hcLAwsHPJZjfdNzs0gftsLfgKt57wWHJ0o= +golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/interfaces/delayfn.go b/interfaces/delayfn.go index 1c639b3..ef5d0ee 100644 --- a/interfaces/delayfn.go +++ b/interfaces/delayfn.go @@ -1,5 +1,7 @@ package interfaces +// DelayFn is a function type that represents a delay function. +// It takes the current number of retries as input and returns the delay in seconds. type DelayFn func(currenRetries int64) (delay int64) var ( @@ -16,8 +18,10 @@ var ( } // NoDelayFn is a DelayFn implementation that returns 0 delay for retries. - NoDelayFn DelayFn = func(currenRetries int64) (delay int64) { + NoDelayFn DelayFn = func(_ int64) (delay int64) { return 0 } + // DefaultDelayFn is the default delay function that will be used if no delay function is provided. + // It is set to LinearDelayFn by default. DefaultDelayFn DelayFn = LinearDelayFn ) diff --git a/interfaces/inboundmessagehandler.go b/interfaces/inboundmessagehandler.go index df7209c..1305a82 100644 --- a/interfaces/inboundmessagehandler.go +++ b/interfaces/inboundmessagehandler.go @@ -17,8 +17,12 @@ type InboundMessageHandlerMiddlewareFunc func(next InboundMessageHandlerFunc) In type InboundMessage struct { Message - RetryCount int64 `json:"retryCount"` - Metadata map[string]interface{} `json:"metadata"` + // RetryCount is the number of times the message has been retried. + // This is set by the library to identify the number of times the message has been retried. + RetryCount int64 `json:"retryCount"` + // Metadata is the metadata of the message. + // This is set by the library to identify the metadata of the message. + Metadata map[string]any `json:"metadata"` // Ack is used for confirming the message. It will drop the message from the queue. Ack func(ctx context.Context) (err error) `json:"-"` // Nack is used for rejecting the message. It will requeue the message to be re-delivered again. diff --git a/interfaces/message.go b/interfaces/message.go index cec23c9..5720c09 100644 --- a/interfaces/message.go +++ b/interfaces/message.go @@ -12,21 +12,56 @@ import ( // The message is used to publish messages to the queue. // Read the concept of message publishing in the documentation, here: TODO(bxcodec): Add link to the documentation type Message struct { - ID string `json:"id"` - Action string `json:"action"` - Topic string `json:"topic"` - Data any `json:"data"` - ContentType headerVal.ContentType `json:"-"` - Timestamp time.Time `json:"timestamp"` - Headers map[string]interface{} `json:"-"` - ServiceAgent headerVal.GoquServiceAgent `json:"-"` + // ID is the unique identifier for the message. + // This is set by the publisher to identify the message in the queue. + // The id is auto-generated by the the library if not provided. + ID string `json:"id"` + // Action is the action that will be performed on the message. + // This is set by the publisher to identify the action that will be performed on the message. + // For RabbitMQ, the action is the routing key. + Action string `json:"action"` + // Topic is the topic that the message will be published to. + // This is set by the publisher to identify the topic that the message will be published to. + // For RabbitMQ, the topic is the exchange name. + Topic string `json:"topic"` + // Data is the data that will be published to the queue. + // This is set by the publisher to identify the data that will be published to the queue. + // It should be a valid JSON object. + Data any `json:"data"` + // ContentType is the content type of the message. + // This is set by the publisher to identify the content type of the message. + // Default value is "application/json". + ContentType headerVal.ContentType `json:"-"` + // Timestamp is the timestamp of the message. + // This is set by the publisher to identify the timestamp of the message. + // Default value is the current time. + Timestamp time.Time `json:"timestamp"` + // Headers is the headers of the message. + // This is set by the publisher to identify the headers of the message. + // This library will provide extra headers values by default based on the library type. + // Don't use any prefix with :goqueue-, it will conflicted and overrided by the library. + Headers map[string]any `json:"-"` + // ServiceAgent is the service agent that will be used to publish the message. + // This is set by the publisher to identify the service agent that will be used to publish the message. + // This will be set by the library and override any value + ServiceAgent headerVal.GoquServiceAgent `json:"-"` + // SchemaVersion is the schema version of the message. + // This is set by the publisher to identify the schema version of the message. + // Default value is the library type. + // This will be set by the library and override any value schemaVersion string } +// SetSchemaVersion is a method to set the schema version of the message. +// This is used to set the schema version of the message. +// This will be set by the library and override any value func (m *Message) SetSchemaVersion(v string) { m.schemaVersion = v } +// GetSchemaVersion is a method to get the schema version of the message. +// This is used to get the schema version of the message. +// This will be set by the library and override any value func (m *Message) GetSchemaVersion() string { return m.schemaVersion } diff --git a/interfaces/publisher.go b/interfaces/publisher.go index 895ba61..a3b3df8 100644 --- a/interfaces/publisher.go +++ b/interfaces/publisher.go @@ -20,4 +20,6 @@ func (f PublisherFunc) Publish(ctx context.Context, m Message) (err error) { return f(ctx, m) } +// PublisherMiddlewareFunc is a function type that represents a publisher middleware function. +// It takes a next PublisherFunc as input parameter and returns a PublisherFunc. type PublisherMiddlewareFunc func(next PublisherFunc) PublisherFunc diff --git a/internal/consumer/consumer.go b/internal/consumer/consumer.go index b01ec73..adf032d 100644 --- a/internal/consumer/consumer.go +++ b/internal/consumer/consumer.go @@ -13,7 +13,7 @@ type Consumer interface { // Consume consumes messages from the queue and passes them to the provided handler. // It takes a context, an InboundMessageHandler, and a map of metadata as parameters. // It returns an error if there was a problem consuming the messages. - Consume(ctx context.Context, handler interfaces.InboundMessageHandler, meta map[string]interface{}) (err error) + Consume(ctx context.Context, handler interfaces.InboundMessageHandler, meta map[string]any) (err error) // Stop stops the consumer from consuming messages. // It takes a context as a parameter and returns an error if there was a problem stopping the consumer. diff --git a/internal/consumer/rabbitmq/blackbox_consumer_test.go b/internal/consumer/rabbitmq/blackbox_consumer_test.go index a169ff3..c114816 100644 --- a/internal/consumer/rabbitmq/blackbox_consumer_test.go +++ b/internal/consumer/rabbitmq/blackbox_consumer_test.go @@ -7,6 +7,13 @@ import ( "testing" "time" + "github.com/google/uuid" + amqp "github.com/rabbitmq/amqp091-go" + "github.com/rs/zerolog/log" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + "github.com/bxcodec/goqueue" headerKey "github.com/bxcodec/goqueue/headers/key" headerVal "github.com/bxcodec/goqueue/headers/value" @@ -15,12 +22,6 @@ import ( "github.com/bxcodec/goqueue/middleware" "github.com/bxcodec/goqueue/options" consumerOpts "github.com/bxcodec/goqueue/options/consumer" - "github.com/google/uuid" - amqp "github.com/rabbitmq/amqp091-go" - "github.com/sirupsen/logrus" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "github.com/stretchr/testify/suite" ) const ( @@ -53,13 +54,12 @@ func TestSuiteRabbitMQConsumer(t *testing.T) { rabbitMQTestSuite := &rabbitMQTestSuite{ rmqURL: rmqURL, } - logrus.SetLevel(logrus.DebugLevel) - logrus.SetFormatter(&logrus.JSONFormatter{}) + log.Logger = log.With().Caller().Logger() rabbitMQTestSuite.initConnection(t) suite.Run(t, rabbitMQTestSuite) } -func (s *rabbitMQTestSuite) BeforeTest(_, _ string) { +func (*rabbitMQTestSuite) BeforeTest(_, _ string) { } func (s *rabbitMQTestSuite) AfterTest(_, _ string) { @@ -111,8 +111,11 @@ func (s *rabbitMQTestSuite) initQueueForTesting(t *testing.T, exchangePattern .. require.NoError(t, err) for _, patternRoutingKey := range exchangePattern { - logrus.Printf("binding queue %s to exchange %s with routing key %s", - q.Name, testExchange, patternRoutingKey) + log.Info(). + Str("queue_name", q.Name). + Str("exchange", testExchange). + Str("routing_key", patternRoutingKey). + Msg("binding queue to exchange") err = s.consumerChannel.QueueBind( rabbitMQTestQueueName, // queue name @@ -125,11 +128,11 @@ func (s *rabbitMQTestSuite) initQueueForTesting(t *testing.T, exchangePattern .. } } -func (s *rabbitMQTestSuite) getMockData(action string) (res interfaces.Message) { - res = interfaces.Message{ +func (*rabbitMQTestSuite) getMockData(action string) (res *interfaces.Message) { + res = &interfaces.Message{ Action: action, Topic: testExchange, - Data: map[string]interface{}{ + Data: map[string]any{ "message": "hello-world-test", }, Timestamp: time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC), @@ -138,7 +141,7 @@ func (s *rabbitMQTestSuite) getMockData(action string) (res interfaces.Message) return res } -func (s *rabbitMQTestSuite) seedPublish(contentType string, action string) { +func (s *rabbitMQTestSuite) seedPublish(contentType, action string) { mockData := s.getMockData(action) jsonData, err := json.Marshal(mockData) s.Require().NoError(err) @@ -216,16 +219,16 @@ func (s *rabbitMQTestSuite) TestConsumerWithExchangePatternProvided() { s.Require().NoError(err) } -func handler(t *testing.T, expected interfaces.Message) interfaces.InboundMessageHandlerFunc { +func handler(t *testing.T, expected *interfaces.Message) interfaces.InboundMessageHandlerFunc { return func(ctx context.Context, m interfaces.InboundMessage) (err error) { switch m.ContentType { case headerVal.ContentTypeText: assert.Equal(t, expected.Data, m.Data) case headerVal.ContentTypeJSON: - expectedJSON, err := json.Marshal(expected.Data) - require.NoError(t, err) - actualJSON, err := json.Marshal(m.Data) - require.NoError(t, err) + expectedJSON, marshalErr := json.Marshal(expected.Data) + require.NoError(t, marshalErr) + actualJSON, marshalErr := json.Marshal(m.Data) + require.NoError(t, marshalErr) assert.JSONEq(t, string(expectedJSON), string(actualJSON)) } @@ -269,7 +272,7 @@ func (s *rabbitMQTestSuite) TestRequeueWithouthExchangePatternProvided() { func handlerRequeue(t *testing.T) interfaces.InboundMessageHandlerFunc { return func(ctx context.Context, m interfaces.InboundMessage) (err error) { delayFn := func(retries int64) int64 { - assert.Equal(t, int64(m.RetryCount)+1, retries) // because the retry++ is done before this delayfn is called + assert.Equal(t, m.RetryCount+1, retries) // because the retry++ is done before this delayfn is called return m.RetryCount } diff --git a/internal/consumer/rabbitmq/consumer.go b/internal/consumer/rabbitmq/consumer.go index 1736553..97ff8f9 100644 --- a/internal/consumer/rabbitmq/consumer.go +++ b/internal/consumer/rabbitmq/consumer.go @@ -3,14 +3,15 @@ package rabbitmq import ( "context" "encoding/json" + "errors" "fmt" "time" "github.com/google/uuid" amqp "github.com/rabbitmq/amqp091-go" - "github.com/sirupsen/logrus" + "github.com/rs/zerolog/log" - "github.com/bxcodec/goqueue/errors" + goqueueErrors "github.com/bxcodec/goqueue/errors" headerKey "github.com/bxcodec/goqueue/headers/key" headerVal "github.com/bxcodec/goqueue/headers/value" "github.com/bxcodec/goqueue/interfaces" @@ -19,6 +20,11 @@ import ( consumerOpts "github.com/bxcodec/goqueue/options/consumer" ) +const ( + // millisecondsMultiplier converts seconds to milliseconds for RabbitMQ expiration + millisecondsMultiplier = 10_000 +) + // rabbitMQ is the subscriber handler for rabbitmq type rabbitMQ struct { consumerChannel *amqp.Channel @@ -70,7 +76,7 @@ func (r *rabbitMQ) initQueue() { r.option.RabbitMQConsumerConfig.QueueDeclareConfig.Args, ) if err != nil { - logrus.Fatal("error declaring the queue, ", err) + log.Fatal().Err(err).Msg("error declaring the queue") } for _, eventType := range r.option.RabbitMQConsumerConfig.QueueBindConfig.RoutingKeys { @@ -82,7 +88,7 @@ func (r *rabbitMQ) initQueue() { r.option.RabbitMQConsumerConfig.QueueBindConfig.Args, ) if err != nil { - logrus.Fatal("error binding the queue, ", err) + log.Fatal().Err(err).Msg("error binding the queue") } } } @@ -100,7 +106,7 @@ func (r *rabbitMQ) initConsumer() { err := r.consumerChannel.Qos(r.option.BatchMessageSize, 0, false) if err != nil { - logrus.Fatal("error when setting the prefetch count, ", err) + log.Fatal().Err(err).Msg("error when setting the prefetch count") } receiver, err := r.consumerChannel.Consume( @@ -127,7 +133,7 @@ func (r *rabbitMQ) initConsumer() { nil, ) if err != nil { - logrus.Fatal(err, "error consuming message") + log.Fatal().Err(err).Msg("error consuming message") } r.msgReceiver = receiver } @@ -148,7 +154,7 @@ func (r *rabbitMQ) initRetryModule() { ) if err != nil { - logrus.Fatal("error declaring the retry exchange, ", err) + log.Fatal().Err(err).Msg("error declaring the retry exchange") } // declare dead letter exchange @@ -163,7 +169,7 @@ func (r *rabbitMQ) initRetryModule() { ) if err != nil { - logrus.Fatal("error declaring the retry dead letter exchange, ", err) + log.Fatal().Err(err).Msg("error declaring the retry dead letter exchange") } // bind dead letter exchange to original queue @@ -175,7 +181,7 @@ func (r *rabbitMQ) initRetryModule() { nil, ) if err != nil { - logrus.Fatal("error binding the dead letter exchange to the original queue, ", err) + log.Fatal().Err(err).Msg("error binding the dead letter exchange to the original queue") } // declare retry queue @@ -192,7 +198,7 @@ func (r *rabbitMQ) initRetryModule() { }, ) if err != nil { - logrus.Fatal("error declaring the retry queue, ", err) + log.Fatal().Err(err).Msg("error declaring the retry queue") } // bind retry queue to retry exchange @@ -204,7 +210,7 @@ func (r *rabbitMQ) initRetryModule() { nil, ) if err != nil { - logrus.Fatal("error binding the retry queue, ", err) + log.Fatal().Err(err).Msg("error binding the retry queue") } } } @@ -213,7 +219,8 @@ func (r *rabbitMQ) initRetryModule() { // It takes a context, an inbound message handler, and a map of metadata as input parameters. // The function continuously listens for messages from the queue and processes them until the context is canceled. // If the context is canceled, the function stops consuming messages and returns. -// For each received message, the function builds an inbound message, extracts the retry count, and checks if the maximum retry count has been reached. +// For each received message, the function builds an inbound message, extracts the retry count, +// and checks if the maximum retry count has been reached. // If the maximum retry count has been reached, the message is moved to the dead letter queue. // Otherwise, the message is passed to the message handler for processing. // The message handler is responsible for handling the message and returning an error if any. @@ -225,38 +232,38 @@ func (r *rabbitMQ) initRetryModule() { // The function returns an error if any occurred during message handling or if the context was canceled. func (r *rabbitMQ) Consume(ctx context.Context, h interfaces.InboundMessageHandler, - meta map[string]interface{}) (err error) { - logrus.WithFields(logrus.Fields{ - "queue_name": r.option.QueueName, - "consumer_meta": meta, - }).Info("starting the worker") + meta map[string]any) (err error) { + log.Info(). + Str("queue_name", r.option.QueueName). + Interface("consumer_meta", meta). + Msg("starting the worker") for { select { case <-ctx.Done(): - logrus.WithFields(logrus.Fields{ - "queue_name": r.option.QueueName, - "consumer_meta": meta, - }).Info("stopping the worker") - return + log.Info(). + Str("queue_name", r.option.QueueName). + Interface("consumer_meta", meta). + Msg("stopping the worker") + return err case receivedMsg, ok := <-r.msgReceiver: if !ok { // deliveries channel closed (e.g., due to Stop/Cancel or connection closure) - logrus.WithFields(logrus.Fields{ - "queue_name": r.option.QueueName, - "consumer_meta": meta, - }).Info("message receiver closed, stopping the worker") - return + log.Info(). + Str("queue_name", r.option.QueueName). + Interface("consumer_meta", meta). + Msg("message receiver closed, stopping the worker") + return err } msg, err := buildMessage(meta, receivedMsg) if err != nil { - if err == errors.ErrInvalidMessageFormat { + if errors.Is(err, goqueueErrors.ErrInvalidMessageFormat) { nackErr := receivedMsg.Nack(false, false) // nack with requeue false if nackErr != nil { - logrus.WithFields(logrus.Fields{ - "consumer_meta": meta, - "error": nackErr, - }).Error("failed to nack the message") + log.Error(). + Interface("consumer_meta", meta). + Err(nackErr). + Msg("failed to nack the message") } } continue @@ -264,26 +271,26 @@ func (r *rabbitMQ) Consume(ctx context.Context, retryCount := extractHeaderInt(receivedMsg.Headers, headerKey.RetryCount) if retryCount > r.option.MaxRetryFailedMessage { - logrus.WithFields(logrus.Fields{ - "consumer_meta": meta, - "message_id": msg.ID, - "topic": msg.Topic, - "action": msg.Action, - "timestamp": msg.Timestamp, - }).Error("max retry failed message reached, moving message to dead letter queue") + log.Error(). + Interface("consumer_meta", meta). + Str("message_id", msg.ID). + Str("topic", msg.Topic). + Str("action", msg.Action). + Time("timestamp", msg.Timestamp). + Msg("max retry failed message reached, moving message to dead letter queue") err = receivedMsg.Nack(false, false) if err != nil { - logrus.WithFields(logrus.Fields{ - "consumer_meta": meta, - "error": err, - }).Error("failed to nack the message") + log.Error(). + Interface("consumer_meta", meta). + Err(err). + Msg("failed to nack the message") } continue } m := interfaces.InboundMessage{ Message: msg, RetryCount: retryCount, - Metadata: map[string]interface{}{ + Metadata: map[string]any{ "app-id": receivedMsg.AppId, "consumer-tag": receivedMsg.ConsumerTag, "content-encoding": receivedMsg.ContentEncoding, @@ -299,17 +306,17 @@ func (r *rabbitMQ) Consume(ctx context.Context, "type": receivedMsg.Type, "user-id": receivedMsg.UserId, }, - Ack: func(ctx context.Context) (err error) { + Ack: func(_ context.Context) (err error) { err = receivedMsg.Ack(false) return }, - Nack: func(ctx context.Context) (err error) { + Nack: func(_ context.Context) (err error) { // receivedMsg.Nack(false, true) => will redelivered again instantly (same with receivedMsg.reject) // receivedMsg.Nack(false, false) => will put the message to dead letter queue (same with receivedMsg.reject) err = receivedMsg.Nack(false, true) return }, - MoveToDeadLetterQueue: func(ctx context.Context) (err error) { + MoveToDeadLetterQueue: func(_ context.Context) (err error) { // receivedMsg.Nack(false, true) => will redelivered again instantly (same with receivedMsg.reject) // receivedMsg.Nack(false, false) => will put the message to dead letter queue (same with receivedMsg.reject) err = receivedMsg.Nack(false, false) @@ -318,47 +325,47 @@ func (r *rabbitMQ) Consume(ctx context.Context, RetryWithDelayFn: r.requeueMessageWithDLQ(meta, msg, receivedMsg), } - logrus.WithFields(logrus.Fields{ - "consumer_meta": meta, - "message_id": msg.ID, - "topic": msg.Topic, - "action": msg.Action, - "timestamp": msg.Timestamp, - }).Info("message received") + log.Info(). + Interface("consumer_meta", meta). + Str("message_id", msg.ID). + Str("topic", msg.Topic). + Str("action", msg.Action). + Time("timestamp", msg.Timestamp). + Msg("message received") handleCtx := middleware.ApplyHandlerMiddleware(h.HandleMessage, r.option.Middlewares...) err = handleCtx(ctx, m) if err != nil { - logrus.WithFields(logrus.Fields{ - "consumer_meta": meta, - "message_id": msg.ID, - "topic": msg.Topic, - "action": msg.Action, - "timestamp": msg.Timestamp, - }).Error("error handling message, ", err) + log.Error(). + Interface("consumer_meta", meta). + Str("message_id", msg.ID). + Str("topic", msg.Topic). + Str("action", msg.Action). + Time("timestamp", msg.Timestamp). + Err(err). + Msg("error handling message") } } } } -func buildMessage(consumerMeta map[string]interface{}, receivedMsg amqp.Delivery) (msg interfaces.Message, err error) { +func buildMessage(consumerMeta map[string]any, receivedMsg amqp.Delivery) (msg interfaces.Message, err error) { if len(receivedMsg.Body) == 0 { - logrus.WithFields(logrus.Fields{ - "consumer_meta": consumerMeta, - "msg": string(receivedMsg.Body), - }).Error("message body is empty, removing the message due to wrong message format") - return msg, errors.ErrInvalidMessageFormat + log.Error(). + Interface("consumer_meta", consumerMeta). + Str("msg", string(receivedMsg.Body)). + Msg("message body is empty, removing the message due to wrong message format") + return msg, goqueueErrors.ErrInvalidMessageFormat } err = json.Unmarshal(receivedMsg.Body, &msg) if err != nil { - logrus.Error("failed to unmarshal the message, got err: ", err) - logrus.WithFields(logrus.Fields{ - "consumer_meta": consumerMeta, - "msg": string(receivedMsg.Body), - "error": err, - }).Error("failed to unmarshal the message, removing the message due to wrong message format") - return msg, errors.ErrInvalidMessageFormat + log.Error(). + Interface("consumer_meta", consumerMeta). + Str("msg", string(receivedMsg.Body)). + Err(err). + Msg("failed to unmarshal the message, removing the message due to wrong message format") + return msg, goqueueErrors.ErrInvalidMessageFormat } if msg.ID == "" { @@ -380,18 +387,18 @@ func buildMessage(consumerMeta map[string]interface{}, receivedMsg amqp.Delivery msg.Headers = receivedMsg.Headers } if msg.Data == "" || msg.Data == nil { - logrus.WithFields(logrus.Fields{ - "consumer_meta": consumerMeta, - "msg": msg, - }).Error("message data is empty, removing the message due to wrong message format") - return msg, errors.ErrInvalidMessageFormat + log.Error(). + Interface("consumer_meta", consumerMeta). + Interface("msg", msg). + Msg("message data is empty, removing the message due to wrong message format") + return msg, goqueueErrors.ErrInvalidMessageFormat } msg.SetSchemaVersion(extractHeaderString(receivedMsg.Headers, headerKey.SchemaVer)) return msg, nil } -func (r *rabbitMQ) requeueMessageWithDLQ(consumerMeta map[string]interface{}, msg interfaces.Message, +func (r *rabbitMQ) requeueMessageWithDLQ(consumerMeta map[string]any, msg interfaces.Message, receivedMsg amqp.Delivery) func(ctx context.Context, delayFn interfaces.DelayFn) (err error) { return func(ctx context.Context, delayFn interfaces.DelayFn) (err error) { if delayFn == nil { @@ -417,21 +424,21 @@ func (r *rabbitMQ) requeueMessageWithDLQ(consumerMeta map[string]interface{}, ms Body: receivedMsg.Body, Timestamp: time.Now(), AppId: r.tagName, - Expiration: fmt.Sprintf("%d", delayInSeconds*10000), + Expiration: fmt.Sprintf("%d", delayInSeconds*millisecondsMultiplier), }, ) if requeueErr != nil { - logrus.WithFields(logrus.Fields{ - "consumer_meta": consumerMeta, - "error": requeueErr, - }).Error("failed to requeue the message") + log.Error(). + Interface("consumer_meta", consumerMeta). + Err(requeueErr). + Msg("failed to requeue the message") err = receivedMsg.Nack(false, false) // move to DLQ instead (depend on the RMQ server configuration) if err != nil { - logrus.WithFields(logrus.Fields{ - "consumer_meta": consumerMeta, - "error": err, - }).Error("failed to nack the message") + log.Error(). + Interface("consumer_meta", consumerMeta). + Err(err). + Msg("failed to nack the message") return err } return requeueErr @@ -441,10 +448,10 @@ func (r *rabbitMQ) requeueMessageWithDLQ(consumerMeta map[string]interface{}, ms // ack the message err = receivedMsg.Ack(false) if err != nil { - logrus.WithFields(logrus.Fields{ - "consumer_meta": consumerMeta, - "error": err, - }).Error("failed to ack the message") + log.Error(). + Interface("consumer_meta", consumerMeta). + Err(err). + Msg("failed to ack the message") return err } return nil diff --git a/internal/publisher/rabbitmq/blackbox_publisher_test.go b/internal/publisher/rabbitmq/blackbox_publisher_test.go index 6ad45ce..3be0457 100644 --- a/internal/publisher/rabbitmq/blackbox_publisher_test.go +++ b/internal/publisher/rabbitmq/blackbox_publisher_test.go @@ -8,6 +8,11 @@ import ( "testing" "time" + amqp "github.com/rabbitmq/amqp091-go" + "github.com/rs/zerolog/log" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + "github.com/bxcodec/goqueue" headerKey "github.com/bxcodec/goqueue/headers/key" headerVal "github.com/bxcodec/goqueue/headers/value" @@ -15,10 +20,6 @@ import ( rmq "github.com/bxcodec/goqueue/internal/publisher/rabbitmq" "github.com/bxcodec/goqueue/options" publisherOpts "github.com/bxcodec/goqueue/options/publisher" - amqp "github.com/rabbitmq/amqp091-go" - "github.com/sirupsen/logrus" - "github.com/stretchr/testify/require" - "github.com/stretchr/testify/suite" ) const ( @@ -50,13 +51,12 @@ func TestSuiteRabbitMQPublisher(t *testing.T) { rabbitMQTestSuite := &rabbitMQTestSuite{ rmqURL: rmqURL, } - logrus.SetLevel(logrus.DebugLevel) - logrus.SetFormatter(&logrus.JSONFormatter{}) + log.Logger = log.With().Caller().Logger() rabbitMQTestSuite.initConnection(t) suite.Run(t, rabbitMQTestSuite) } -func (s *rabbitMQTestSuite) BeforeTest(_, _ string) { +func (*rabbitMQTestSuite) BeforeTest(_, _ string) { } func (s *rabbitMQTestSuite) AfterTest(_, _ string) { @@ -107,8 +107,11 @@ func (s *rabbitMQTestSuite) initQueueForTesting(t *testing.T, exchangePattern .. require.NoError(t, err) for _, patternRoutingKey := range exchangePattern { - logrus.Printf("binding queue %s to exchange %s with routing key %s", - q.Name, testExchange, patternRoutingKey) + log.Info(). + Str("queue_name", q.Name). + Str("exchange", testExchange). + Str("routing_key", patternRoutingKey). + Msg("binding queue to exchange") err = s.consumerChannel.QueueBind( rabbitMQTestQueueName, // queue name @@ -120,13 +123,13 @@ func (s *rabbitMQTestSuite) initQueueForTesting(t *testing.T, exchangePattern .. require.NoError(t, err) } } -func (s *rabbitMQTestSuite) getMockData(action string, identifier string) (res interfaces.Message) { +func (*rabbitMQTestSuite) getMockData(action, identifier string) (res interfaces.Message) { res = interfaces.Message{ Action: action, ID: identifier, Topic: testExchange, ContentType: headerVal.ContentTypeJSON, - Data: map[string]interface{}{ + Data: map[string]any{ "message": "hello-world-test", }, Timestamp: time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC), @@ -150,7 +153,7 @@ func (s *rabbitMQTestSuite) TestPublisher() { ) var err error totalPublishedMessage := 10 - for i := 0; i < totalPublishedMessage; i++ { + for i := range totalPublishedMessage { err = queueSvc.Publish(context.Background(), s.getMockData(testAction, fmt.Sprintf("test-id-%d", i))) s.Require().NoError(err) } @@ -178,7 +181,7 @@ func (s *rabbitMQTestSuite) TestPublisher() { case <-ctx.Done(): done <- true case d := <-msgs: - var content map[string]interface{} + var content map[string]any inErr := json.Unmarshal(d.Body, &content) /* @@ -210,7 +213,8 @@ func (s *rabbitMQTestSuite) TestPublisher() { } }() - logrus.Printf("waiting for the message to be consumed") + log.Info(). + Msg("waiting for the message to be consumed") <-done err = publisher.Close(context.Background()) diff --git a/internal/publisher/rabbitmq/publisher.go b/internal/publisher/rabbitmq/publisher.go index facc61b..ec66cd1 100644 --- a/internal/publisher/rabbitmq/publisher.go +++ b/internal/publisher/rabbitmq/publisher.go @@ -4,6 +4,10 @@ import ( "context" "time" + "github.com/google/uuid" + amqp "github.com/rabbitmq/amqp091-go" + "github.com/rs/zerolog/log" + "github.com/bxcodec/goqueue" "github.com/bxcodec/goqueue/errors" headerKey "github.com/bxcodec/goqueue/headers/key" @@ -12,9 +16,6 @@ import ( "github.com/bxcodec/goqueue/internal/publisher" "github.com/bxcodec/goqueue/middleware" publisherOpts "github.com/bxcodec/goqueue/options/publisher" - "github.com/google/uuid" - amqp "github.com/rabbitmq/amqp091-go" - "github.com/sirupsen/logrus" ) const ( @@ -64,7 +65,7 @@ func NewPublisher( channelPool := NewChannelPool(conn, opt.RabbitMQPublisherConfig.PublisherChannelPoolSize) ch, err := channelPool.Get() if err != nil { - logrus.Fatal(err) + log.Fatal().Err(err).Msg("error getting channel from pool") } defer channelPool.Return(ch) @@ -100,7 +101,7 @@ func (r *rabbitMQ) buildPublisher() interfaces.PublisherFunc { timestamp = time.Now() } - defaultHeaders := map[string]interface{}{ + defaultHeaders := map[string]any{ headerKey.AppID: r.option.PublisherID, headerKey.MessageID: id, headerKey.PublishedTimestamp: timestamp.Format(time.RFC3339), diff --git a/internal/shared/logging.go b/internal/shared/logging.go new file mode 100644 index 0000000..84242ba --- /dev/null +++ b/internal/shared/logging.go @@ -0,0 +1,29 @@ +package shared + +import ( + "sync" + + "github.com/rs/zerolog" + "github.com/rs/zerolog/pkgerrors" +) + +var loggingSetupOnce sync.Once + +// SetupLogging configures zerolog with sensible defaults for goqueue. +// This function is safe to call multiple times - it will only execute once. +// +// It sets: +// - TimeFieldFormat to Unix timestamp format +// - ErrorStackMarshaler to include stack traces in error logs +func SetupLogging() { + loggingSetupOnce.Do(func() { + zerolog.TimeFieldFormat = zerolog.TimeFormatUnix + zerolog.ErrorStackMarshaler = pkgerrors.MarshalStack + }) +} + +//nolint:gochecknoinits // Required for auto-setup of logging when any goqueue package is imported +func init() { + // Automatically setup logging when any goqueue package is imported + SetupLogging() +} diff --git a/middleware/default_errormapper.go b/middleware/default_errormapper.go index 2150c43..295972f 100644 --- a/middleware/default_errormapper.go +++ b/middleware/default_errormapper.go @@ -2,11 +2,31 @@ package middleware import ( "context" + "errors" - "github.com/bxcodec/goqueue/errors" + goqueueErrors "github.com/bxcodec/goqueue/errors" "github.com/bxcodec/goqueue/interfaces" ) +// mapError maps generic errors to specific goqueue error types +func mapError(err error) error { + if err == nil { + return nil + } + + switch { + case errors.Is(err, goqueueErrors.ErrInvalidMessageFormat): + return goqueueErrors.ErrInvalidMessageFormat + case errors.Is(err, goqueueErrors.ErrEncodingFormatNotSupported): + return goqueueErrors.ErrEncodingFormatNotSupported + default: + return goqueueErrors.Error{ + Code: goqueueErrors.UnKnownError, + Message: err.Error(), + } + } +} + // PublisherDefaultErrorMapper returns a middleware function that maps publisher errors to specific error types. // It takes a next PublisherFunc as input and returns a new PublisherFunc that performs error mapping. // If an error occurs during publishing, it will be mapped to a specific error type based on the error code. @@ -15,20 +35,7 @@ func PublisherDefaultErrorMapper() interfaces.PublisherMiddlewareFunc { return func(next interfaces.PublisherFunc) interfaces.PublisherFunc { return func(ctx context.Context, e interfaces.Message) (err error) { err = next(ctx, e) - if err != nil { - switch err { - case errors.ErrInvalidMessageFormat: - return errors.ErrInvalidMessageFormat - case errors.ErrEncodingFormatNotSupported: - return errors.ErrEncodingFormatNotSupported - default: - return errors.Error{ - Code: errors.UnKnownError, - Message: err.Error(), - } - } - } - return nil + return mapError(err) } } } @@ -42,20 +49,7 @@ func InboundMessageHandlerDefaultErrorMapper() interfaces.InboundMessageHandlerM return func(next interfaces.InboundMessageHandlerFunc) interfaces.InboundMessageHandlerFunc { return func(ctx context.Context, m interfaces.InboundMessage) (err error) { err = next(ctx, m) - if err != nil { - switch err { - case errors.ErrInvalidMessageFormat: - return errors.ErrInvalidMessageFormat - case errors.ErrEncodingFormatNotSupported: - return errors.ErrEncodingFormatNotSupported - default: - return errors.Error{ - Code: errors.UnKnownError, - Message: err.Error(), - } - } - } - return nil + return mapError(err) } } } diff --git a/middleware/example.go b/middleware/example.go index a958944..5d505a8 100644 --- a/middleware/example.go +++ b/middleware/example.go @@ -3,14 +3,16 @@ package middleware import ( "context" + "github.com/rs/zerolog/log" + "github.com/bxcodec/goqueue/interfaces" - "github.com/sirupsen/logrus" ) // HelloWorldMiddlewareExecuteAfterInboundMessageHandler returns an inbound message handler middleware function. // This middleware function executes after the inbound message handler and performs additional tasks. // It logs any errors that occur during the execution of the next handler and provides an opportunity to handle them. -// You can customize the error handling logic by adding your own error handler, such as sending errors to Sentry or other error tracking tools. +// You can customize the error handling logic by adding your own error handler, +// such as sending errors to Sentry or other error tracking tools. // After error handling, it logs a message indicating that the hello-world-last-middleware has been executed. // The function signature follows the `interfaces.InboundMessageHandlerMiddlewareFunc` type. func HelloWorldMiddlewareExecuteAfterInboundMessageHandler() interfaces.InboundMessageHandlerMiddlewareFunc { @@ -18,9 +20,9 @@ func HelloWorldMiddlewareExecuteAfterInboundMessageHandler() interfaces.InboundM return func(ctx context.Context, m interfaces.InboundMessage) (err error) { err = next(ctx, m) if err != nil { - logrus.Error("Error: ", err, "add your custom error handler here, eg send to Sentry or other error tracking tools") + log.Error().Err(err).Msg("Error: add your custom error handler here, eg send to Sentry or other error tracking tools") } - logrus.Info("hello-world-last-middleware executed") + log.Info().Msg("hello-world-last-middleware executed") return err } } @@ -31,7 +33,7 @@ func HelloWorldMiddlewareExecuteAfterInboundMessageHandler() interfaces.InboundM func HelloWorldMiddlewareExecuteBeforeInboundMessageHandler() interfaces.InboundMessageHandlerMiddlewareFunc { return func(next interfaces.InboundMessageHandlerFunc) interfaces.InboundMessageHandlerFunc { return func(ctx context.Context, m interfaces.InboundMessage) (err error) { - logrus.Info("hello-world-first-middleware executed") + log.Info().Msg("hello-world-first-middleware executed") return next(ctx, m) } } @@ -44,21 +46,22 @@ func HelloWorldMiddlewareExecuteAfterPublisher() interfaces.PublisherMiddlewareF return func(ctx context.Context, m interfaces.Message) (err error) { err = next(ctx, m) if err != nil { - logrus.Error("got error while publishing the message: ", err) + log.Error().Err(err).Msg("got error while publishing the message") return err } - logrus.Info("hello-world-last-middleware executed") + log.Info().Msg("hello-world-last-middleware executed") return nil } } } // HelloWorldMiddlewareExecuteBeforePublisher is a function that returns a PublisherMiddlewareFunc. -// It wraps the provided PublisherFunc with a middleware that logs a message before executing the next middleware or the actual publisher function. +// It wraps the provided PublisherFunc with a middleware that logs a message before executing +// the next middleware or the actual publisher function. func HelloWorldMiddlewareExecuteBeforePublisher() interfaces.PublisherMiddlewareFunc { return func(next interfaces.PublisherFunc) interfaces.PublisherFunc { return func(ctx context.Context, e interfaces.Message) (err error) { - logrus.Info("hello-world-first-middleware executed") + log.Info().Msg("hello-world-first-middleware executed") return next(ctx, e) } } diff --git a/misc/makefile/tools.Makefile b/misc/makefile/tools.Makefile index b126a4f..921ea91 100644 --- a/misc/makefile/tools.Makefile +++ b/misc/makefile/tools.Makefile @@ -67,7 +67,7 @@ bin/mockery: bin GOLANGCI := $(shell command -v golangci-lint || echo "bin/golangci-lint") golangci-lint: bin/golangci-lint ## Installs golangci-lint (linter) -bin/golangci-lint: VERSION := 1.59.0 +bin/golangci-lint: VERSION := 2.3.1 bin/golangci-lint: GITHUB := golangci/golangci-lint bin/golangci-lint: ARCHIVE := golangci-lint-$(VERSION)-$(OSTYPE)-$(ARCH).tar.gz bin/golangci-lint: bin diff --git a/options/consumer/consumer.go b/options/consumer/consumer.go index 47e89d6..3c6f427 100644 --- a/options/consumer/consumer.go +++ b/options/consumer/consumer.go @@ -1,9 +1,10 @@ package consumer import ( + amqp "github.com/rabbitmq/amqp091-go" + "github.com/bxcodec/goqueue/interfaces" "github.com/bxcodec/goqueue/options" - amqp "github.com/rabbitmq/amqp091-go" ) const ( diff --git a/options/publisher/publisher.go b/options/publisher/publisher.go index 83ed0ed..9611bcd 100644 --- a/options/publisher/publisher.go +++ b/options/publisher/publisher.go @@ -1,11 +1,12 @@ package publisher import ( + "github.com/google/uuid" + amqp "github.com/rabbitmq/amqp091-go" + headerVal "github.com/bxcodec/goqueue/headers/value" "github.com/bxcodec/goqueue/interfaces" "github.com/bxcodec/goqueue/options" - "github.com/google/uuid" - amqp "github.com/rabbitmq/amqp091-go" ) const ( diff --git a/publisher/service.go b/publisher/service.go index 0ce36b2..219d114 100644 --- a/publisher/service.go +++ b/publisher/service.go @@ -3,6 +3,7 @@ package publisher import ( "github.com/bxcodec/goqueue/internal/publisher" "github.com/bxcodec/goqueue/internal/publisher/rabbitmq" + _ "github.com/bxcodec/goqueue/internal/shared" // Auto-setup logging "github.com/bxcodec/goqueue/options" publisherOpts "github.com/bxcodec/goqueue/options/publisher" ) diff --git a/queueservice.go b/queueservice.go index 43457f0..4ff2a52 100644 --- a/queueservice.go +++ b/queueservice.go @@ -5,11 +5,12 @@ import ( "errors" "time" + "golang.org/x/sync/errgroup" + "github.com/bxcodec/goqueue/interfaces" "github.com/bxcodec/goqueue/internal/consumer" "github.com/bxcodec/goqueue/internal/publisher" "github.com/bxcodec/goqueue/options" - "golang.org/x/sync/errgroup" ) // QueueService represents a service that handles message queuing operations. @@ -49,8 +50,8 @@ func (qs *QueueService) Start(ctx context.Context) (err error) { } g, ctx := errgroup.WithContext(ctx) - for i := 0; i < qs.NumberOfConsumer; i++ { - meta := map[string]interface{}{ + for i := range qs.NumberOfConsumer { + meta := map[string]any{ "consumer_id": i, "started_time": time.Now(), } diff --git a/static.go b/static.go new file mode 100644 index 0000000..cbc6bbe --- /dev/null +++ b/static.go @@ -0,0 +1,32 @@ +package goqueue + +import ( + "os" + + "github.com/rs/zerolog" + "github.com/rs/zerolog/log" + + "github.com/bxcodec/goqueue/internal/shared" +) + +// SetupLogging configures zerolog with sensible defaults for goqueue. +// This is automatically called when importing consumer or publisher packages, +// but can be called explicitly for custom configuration. +// +// Note: This function is safe to call multiple times. +func SetupLogging() { + shared.SetupLogging() +} + +// SetupLoggingWithDefaults configures zerolog and sets up a default global logger +// with console output and reasonable formatting for development. +// This is useful for development environments or when you want pretty-printed logs. +func SetupLoggingWithDefaults() { + SetupLogging() + + // Set up a nice console logger for development + log.Logger = log.Output(zerolog.ConsoleWriter{Out: os.Stdout}). + With(). + Caller(). + Logger() +}