Skip to content

Commit bb93e0b

Browse files
committed
Init commit
0 parents  commit bb93e0b

18 files changed

Lines changed: 1120 additions & 0 deletions

.github/workflows/go.yml

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
name: Go
2+
3+
on:
4+
push:
5+
branches: [ main ]
6+
pull_request:
7+
branches: [ main ]
8+
9+
jobs:
10+
11+
test:
12+
strategy:
13+
matrix:
14+
go_version: [ '1.14', '1.15', '1.16', '1.17' ]
15+
os: [ 'ubuntu-latest', 'windows-latest', 'macOS-latest' ]
16+
runs-on: ${{ matrix.os }}
17+
steps:
18+
- uses: actions/checkout@v2
19+
20+
- name: Use GO ${{ matrix.go_version }}
21+
uses: actions/setup-go@v2
22+
with:
23+
go-version: ${{ matrix.go_version }}
24+
25+
- name: Test GO ${{ matrix.go_version }}
26+
run: go test ./... -race -coverprofile=coverage.txt -covermode=atomic
27+
28+
- name: Upload to codecov
29+
if: ${{ matrix.os == 'ubuntu-latest' && matrix.go_version == '1.17' }}
30+
run: bash <(curl -s https://codecov.io/bash)

.gitignore

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
2+
# Created by https://www.toptal.com/developers/gitignore/api/go
3+
# Edit at https://www.toptal.com/developers/gitignore?templates=go
4+
5+
### Go ###
6+
# Binaries for programs and plugins
7+
*.exe
8+
*.exe~
9+
*.dll
10+
*.so
11+
*.dylib
12+
13+
# Test binary, built with `go test -c`
14+
*.test
15+
16+
# Output of the go coverage tool, specifically when used with LiteIDE
17+
*.out
18+
19+
# Dependency directories (remove the comment below to include it)
20+
# vendor/
21+
22+
### Go Patch ###
23+
/vendor/
24+
/Godeps/
25+
26+
# End of https://www.toptal.com/developers/gitignore/api/go

.idea/.gitignore

Lines changed: 8 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

.idea/goqueue.iml

Lines changed: 9 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

.idea/modules.xml

Lines changed: 8 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

.idea/vcs.xml

Lines changed: 6 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

LICENSE

Lines changed: 504 additions & 0 deletions
Large diffs are not rendered by default.

README.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
# goqueue
2+
3+
[![Go](https://github.com/memclutter/goqueue/actions/workflows/go.yml/badge.svg)](https://github.com/memclutter/goqueue/actions/workflows/go.yml)
4+
[![codecov](https://codecov.io/gh/memclutter/goqueue/branch/main/graph/badge.svg?token=57IA9OCZFD)](https://codecov.io/gh/memclutter/goqueue)
5+
6+
Simple rabbitmq queue wrapper for golang.
7+

callback.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
package goqueue
2+
3+
import (
4+
log "github.com/sirupsen/logrus"
5+
"github.com/streadway/amqp"
6+
)
7+
8+
type Callback func(amqp.Delivery, *log.Entry) (Retry, error)

consumer.go

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
package goqueue
2+
3+
import (
4+
"fmt"
5+
log "github.com/sirupsen/logrus"
6+
"github.com/streadway/amqp"
7+
"time"
8+
)
9+
10+
type Consumer interface {
11+
Consume() error
12+
}
13+
14+
func NewConsumer(queue string, retryIntervals []time.Duration, callback Callback, amqpConn *amqp.Connection, consumerLog *log.Entry) Consumer {
15+
if consumerLog == nil {
16+
consumerLog = log.WithFields(log.Fields{"_default": true})
17+
}
18+
consumerLog = consumerLog.WithFields(log.Fields{"queue": queue})
19+
20+
return &DefaultConsumer{
21+
queue: queue,
22+
retryIntervals: retryIntervals,
23+
callback: callback,
24+
amqpConn: amqpConn,
25+
log: consumerLog,
26+
}
27+
}
28+
29+
type DefaultConsumer struct {
30+
queue string
31+
retryIntervals []time.Duration
32+
callback Callback
33+
amqpConn *amqp.Connection
34+
amqpCh *amqp.Channel
35+
log *log.Entry
36+
}
37+
38+
func (c DefaultConsumer) Consume() error {
39+
40+
var err error
41+
42+
// Open channel in rabbitmq
43+
c.amqpCh, err = c.amqpConn.Channel()
44+
if err != nil {
45+
return fmt.Errorf("open channel error: %v", err)
46+
}
47+
defer func() {
48+
if err := c.amqpCh.Close(); err != nil {
49+
c.log.Errorf("close channel error (ignore): %v", err)
50+
}
51+
}()
52+
53+
// Declare exchanges, queues, set qos etc
54+
if err = c.declare(); err != nil {
55+
return fmt.Errorf("declare error: %v", err)
56+
}
57+
58+
// Create co channel for consume deliveries
59+
deliveries, err := c.amqpCh.Consume(c.queue, "", false, false, false, false, nil)
60+
if err != nil {
61+
return fmt.Errorf("error create consume channel: %v", err)
62+
}
63+
64+
// Process deliveries
65+
for delivery := range deliveries {
66+
67+
// How more this delivery retry
68+
retry := getRetry(delivery.Headers)
69+
70+
// Context logger for delivery
71+
deliveryLog := c.log.WithFields(log.Fields{
72+
"deliveryTag": delivery.DeliveryTag,
73+
"retry": retry,
74+
})
75+
deliveryLog.Infof("receive delivery")
76+
deliveryLog.Debugf("body: %s", delivery.Body)
77+
78+
// Process with retries logic
79+
c.ProcessDelivery(delivery, deliveryLog, retry)
80+
}
81+
82+
return nil
83+
}
84+
85+
func (c DefaultConsumer) ProcessDelivery(delivery amqp.Delivery, deliveryLog *log.Entry, retry int) {
86+
defer func() {
87+
if err := delivery.Ack(false); err != nil {
88+
deliveryLog.Errorf("ack error (ignored): %v", err)
89+
}
90+
}()
91+
92+
// Callback for user code to process delivery. If error, check retryStatus
93+
// - RetryNext - redelivery this delivery for retry after some intervals
94+
// - RetryStop - ignore
95+
retryStatus, err := c.callback(delivery, deliveryLog)
96+
if err != nil {
97+
deliveryLog.Warnf("callback error: %v", err)
98+
switch retryStatus {
99+
case RetryNext:
100+
deliveryLog.Infof("retry next")
101+
if err := Publish(c.amqpCh, c.queue, delivery.Body, retry); err != nil {
102+
deliveryLog.Errorf("error retry: %v", err)
103+
}
104+
case RetryStop:
105+
deliveryLog.Infof("retry stop")
106+
default:
107+
deliveryLog.Warnf("retry ignore")
108+
}
109+
}
110+
}

0 commit comments

Comments
 (0)