-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathspsc.go
More file actions
62 lines (53 loc) · 1.39 KB
/
spsc.go
File metadata and controls
62 lines (53 loc) · 1.39 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
// Copyright 2022 tink <qietingfy@gmail.com>. All rights reserved.
// Use of this source code is governed by a MIT style
// license that can be found in the LICENSE file.
package ringbuffer
import (
"sync/atomic"
)
// SpscRingBuffer define Single Producer/Single Consumer ring buffer
type SpscRingBuffer struct {
ringbuffer
}
var _ RingBuffer = (*SpscRingBuffer)(nil)
// NewSpscRingBuffer return the spsc ring buffer with specified capacity
func NewSpscRingBuffer(capacity int) *SpscRingBuffer {
return &SpscRingBuffer{
ringbuffer{
head: 0,
tail: 0,
capacity: capacity,
elements: make([]interface{}, capacity),
},
}
}
// Enqueue element to the ring buffer
// if the ring buffer is full, then return ErrIsFull
func (q *SpscRingBuffer) Enqueue(elem interface{}) error {
if elem == nil {
elem = nilPlaceholder
}
h := atomic.LoadUint64(&q.head)
t := q.tail
if t >= h+uint64(q.capacity) {
return ErrIsFull
}
q.elements[t%uint64(q.capacity)] = elem
atomic.AddUint64(&q.tail, 1)
return nil
}
// Dequeue an element from the ring buffer
// if the ring buffer is empty, then return ErrIsEmpty
func (q *SpscRingBuffer) Dequeue() (interface{}, error) {
h := q.head
t := atomic.LoadUint64(&q.tail)
if t == h {
return nil, ErrIsEmpty
}
elem := q.elements[h%uint64(q.capacity)]
atomic.AddUint64(&q.head, 1)
if elem == nilPlaceholder {
return nil, nil
}
return elem, nil
}