-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathmpsc.go
More file actions
84 lines (73 loc) · 2.12 KB
/
mpsc.go
File metadata and controls
84 lines (73 loc) · 2.12 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
// Copyright 2022 tink <qietingfy@gmail.com>. All rights reserved.
// Use of this source code is governed by a MIT style
// license that can be found in the LICENSE file.
package ringbuffer
import (
"runtime"
"sync/atomic"
"unsafe"
)
// MpscRingBuffer is a multi-producer/single-consumer ring buffer.
// It embeds ringbuffer, whose head, tail, capacity and elements fields hold
// the queue state used by Enqueue and Dequeue.
type MpscRingBuffer struct {
ringbuffer
}
// Compile-time assertion that *MpscRingBuffer satisfies the RingBuffer interface.
var _ RingBuffer = (*MpscRingBuffer)(nil)
// NewMpscRingBuffer creates an empty MpscRingBuffer whose backing slice has
// room for capacity elements. The head and tail counters start at zero.
func NewMpscRingBuffer(capacity int) *MpscRingBuffer {
	rb := new(MpscRingBuffer) // head and tail are already zero-valued
	rb.capacity = capacity
	rb.elements = make([]interface{}, capacity)
	return rb
}
// Enqueue appends elem to the ring buffer. It claims a slot by advancing tail
// with a compare-and-swap, so it may be called from several goroutines at once.
// If the ring buffer is full, it returns ErrIsFull.
//
// A slot is published by writing its data word first and its type word last;
// Dequeue treats a non-nil type word as the "slot is ready" signal. This lets
// elements whose data word is nil (typed nil pointers, nil maps, nil funcs,
// nil channels) pass through the queue.
func (q *MpscRingBuffer) Enqueue(elem interface{}) error {
	// A nil interface has a nil type word and would be indistinguishable from
	// an empty slot, so it is stored as nilPlaceholder instead.
	// NOTE(review): assumes nilPlaceholder (defined elsewhere) is a non-nil
	// interface value - confirm.
	if elem == nil {
		elem = nilPlaceholder
	}
	// View the interface as its two raw words once, outside the retry loop.
	eval := *(*eface)(unsafe.Pointer(&elem))

	for {
		h := atomic.LoadUint64(&q.head)
		t := atomic.LoadUint64(&q.tail)
		// head is loaded before tail, so h may already be stale: if the queue
		// was full and a dequeue followed by an enqueue happened in between,
		// t can exceed h+capacity. Hence ">=" rather than "==".
		if t >= h+uint64(q.capacity) {
			return ErrIsFull
		}

		slot := (*eface)(unsafe.Pointer(&q.elements[t%uint64(q.capacity)]))
		if !atomic.CompareAndSwapUint64(&q.tail, t, t+1) {
			// Another producer claimed this position; yield and try again.
			runtime.Gosched()
			continue
		}

		// Publish: data word first, type word last (see the method comment).
		atomic.StorePointer(&slot.val, eval.val)
		atomic.StorePointer(&slot.typ, eval.typ)
		return nil
	}
}

// Dequeue removes and returns the oldest element of the ring buffer.
// If the ring buffer is empty, it returns ErrIsEmpty.
//
// It must only be called from a single consumer goroutine. When a producer has
// already claimed the head slot but has not finished writing it, Dequeue does
// not return an error: it yields the processor and waits until the slot is
// published.
func (q *MpscRingBuffer) Dequeue() (interface{}, error) {
	// Dequeue is the only writer of head, so the consumer can read it plainly.
	h := q.head
	if atomic.LoadUint64(&q.tail) == h {
		return nil, ErrIsEmpty
	}

	idx := h % uint64(q.capacity)
	slot := (*eface)(unsafe.Pointer(&q.elements[idx]))
	// tail has moved past h, so a producer owns this slot. Wait for the type
	// word, which Enqueue writes last, instead of the data word: the data word
	// is legitimately nil for typed nil values.
	for atomic.LoadPointer(&slot.typ) == nil {
		runtime.Gosched()
	}

	elem := q.elements[idx]
	// Clear the slot before releasing it so the next producer starts from an
	// unpublished (nil type word) state.
	q.elements[idx] = nil
	atomic.AddUint64(&q.head, 1)

	if elem == nilPlaceholder {
		return nil, nil
	}
	return elem, nil
}