#ifndef _SYS_BUF_RING_H_
#define _SYS_BUF_RING_H_

#include <machine/cpu.h>

#if defined(INVARIANTS) && !defined(DEBUG_BUFRING)
#define DEBUG_BUFRING 1
#endif

#ifdef DEBUG_BUFRING
#include <sys/lock.h>
#include <sys/mutex.h>
#endif

struct buf_ring {
	volatile uint32_t	br_prod_head;
	volatile uint32_t	br_prod_tail;
	int			br_prod_size;
	int			br_prod_mask;
	uint64_t		br_drops;
	uint64_t		br_prod_bufs;
	uint64_t		br_prod_bytes;
	/*
	 * Pad out to next L2 cache line.
	 */
	uint64_t		_pad0[11];

	volatile uint32_t	br_cons_head;
	volatile uint32_t	br_cons_tail;
	int			br_cons_size;
	int			br_cons_mask;
	/*
	 * Pad out to next L2 cache line.
	 */
	uint64_t		_pad1[14];
#ifdef DEBUG_BUFRING
	struct mtx		*br_lock;
#endif
	void			*br_ring[0];
};
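
/*
 * Note: the br_*_mask fields imply the ring size is a power of two; an
 * index advances as (i + 1) & mask, which wraps to 0 past the end of
 * br_ring.  As a small worked example: with a 4-entry ring, mask = 3
 * and the head walks 0, 1, 2, 3, 0, ...  One slot is always left empty
 * to distinguish full from empty, so a 4-entry ring holds at most 3
 * buffers (see buf_ring_full() below).
 */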

/*
 * Multi-producer safe lock-free ring buffer enqueue.
 */
static __inline int
buf_ring_enqueue_bytes(struct buf_ring *br, void *buf, int nbytes)
{
	uint32_t prod_head, prod_next;
	uint32_t cons_tail;
	int success;

#ifdef DEBUG_BUFRING
	int i;

	for (i = br->br_cons_head; i != br->br_prod_head;
	     i = ((i + 1) & br->br_cons_mask))
		if (br->br_ring[i] == buf)
			panic("buf=%p already enqueued at %d prod=%d cons=%d",
			    buf, i, br->br_prod_tail, br->br_cons_tail);
#endif
	critical_enter();
	do {
		prod_head = br->br_prod_head;
		cons_tail = br->br_cons_tail;

		prod_next = (prod_head + 1) & br->br_prod_mask;

		if (prod_next == cons_tail) {
			critical_exit();
			return (ENOBUFS);
		}

		success = atomic_cmpset_int(&br->br_prod_head, prod_head,
		    prod_next);
	} while (success == 0);
#ifdef DEBUG_BUFRING
	if (br->br_ring[prod_head] != NULL)
		panic("dangling value in enqueue");
#endif
	br->br_ring[prod_head] = buf;
	wmb();

	/*
	 * If there are other enqueues in progress
	 * that preceded us, we need to wait for them
	 * to complete.
	 */
	while (br->br_prod_tail != prod_head)
		cpu_spinwait();
	br->br_prod_bufs++;
	br->br_prod_bytes += nbytes;
	br->br_prod_tail = prod_next;
	critical_exit();
	return (0);
}
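
/*
 * A hedged usage sketch of the producer side (illustrative only;
 * example_produce is a hypothetical name, not part of this header).
 * Any number of threads may call it concurrently on the same ring.
 */
#if 0
static int
example_produce(struct buf_ring *br, void *buf, int len)
{
	int error;

	/* Lock-free enqueue; no external lock is needed on this path. */
	error = buf_ring_enqueue_bytes(br, buf, len);
	/* On ENOBUFS the ring was full and the caller still owns buf. */
	return (error);
}
#endif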

static __inline int
buf_ring_enqueue(struct buf_ring *br, void *buf)
{

	return (buf_ring_enqueue_bytes(br, buf, 0));
}

/*
 * Multi-consumer safe dequeue.
 */
static __inline void *
buf_ring_dequeue_mc(struct buf_ring *br)
{
	uint32_t cons_head, cons_next;
	uint32_t prod_tail;
	void *buf;
	int success;

	critical_enter();
	do {
		cons_head = br->br_cons_head;
		prod_tail = br->br_prod_tail;

		cons_next = (cons_head + 1) & br->br_cons_mask;

		if (cons_head == prod_tail) {
			critical_exit();
			return (NULL);
		}

		success = atomic_cmpset_int(&br->br_cons_head, cons_head,
		    cons_next);
	} while (success == 0);

	buf = br->br_ring[cons_head];
#ifdef DEBUG_BUFRING
	br->br_ring[cons_head] = NULL;
#endif
	rmb();

	/*
	 * If there are other dequeues in progress
	 * that preceded us, we need to wait for them
	 * to complete.
	 */
	while (br->br_cons_tail != cons_head)
		cpu_spinwait();

	br->br_cons_tail = cons_next;
	critical_exit();
	return (buf);
}
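
/*
 * A hedged sketch of a multi-consumer drain loop (illustrative only;
 * example_consume_all and process_buf are hypothetical names).  Any
 * number of threads may run this concurrently against the same ring.
 */
#if 0
static void
example_consume_all(struct buf_ring *br)
{
	void *buf;

	/* A NULL return means the ring was observed empty. */
	while ((buf = buf_ring_dequeue_mc(br)) != NULL)
		process_buf(buf);
}
#endif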

/*
 * Single-consumer dequeue: use where dequeue is protected by a lock,
 * e.g. a network driver's tx queue lock.
 */
static __inline void *
buf_ring_dequeue_sc(struct buf_ring *br)
{
	uint32_t cons_head, cons_next, cons_next_next;
	uint32_t prod_tail;
	void *buf;

	cons_head = br->br_cons_head;
	prod_tail = br->br_prod_tail;

	cons_next = (cons_head + 1) & br->br_cons_mask;
	cons_next_next = (cons_head + 2) & br->br_cons_mask;

	if (cons_head == prod_tail)
		return (NULL);

#ifdef PREFETCH_DEFINED
	if (cons_next != prod_tail) {
		prefetch(br->br_ring[cons_next]);
		if (cons_next_next != prod_tail)
			prefetch(br->br_ring[cons_next_next]);
	}
#endif
	br->br_cons_head = cons_next;
	buf = br->br_ring[cons_head];

#ifdef DEBUG_BUFRING
	br->br_ring[cons_head] = NULL;
	if (!mtx_owned(br->br_lock))
		panic("lock not held on single consumer dequeue");
	if (br->br_cons_tail != cons_head)
		panic("inconsistent list cons_tail=%d cons_head=%d",
		    br->br_cons_tail, cons_head);
#endif
	br->br_cons_tail = cons_next;
	return (buf);
}
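
/*
 * A hedged sketch of the single-consumer pattern a driver's tx path
 * might use (illustrative only; example_tx_drain and example_transmit
 * are hypothetical names).  The mutex serializes consumers, which is
 * what the _sc variant requires; under DEBUG_BUFRING it should be the
 * same mutex passed to buf_ring_alloc() so mtx_owned() can check it.
 */
#if 0
static void
example_tx_drain(struct buf_ring *br, struct mtx *tx_lock)
{
	void *buf;

	mtx_lock(tx_lock);
	while ((buf = buf_ring_dequeue_sc(br)) != NULL)
		example_transmit(buf);
	mtx_unlock(tx_lock);
}
#endif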

/*
 * Return a pointer to the first entry in the ring
 * without modifying it, or NULL if the ring is empty.
 * Race-prone if not protected by a lock.
 */
static __inline void *
buf_ring_peek(struct buf_ring *br)
{

#ifdef DEBUG_BUFRING
	if ((br->br_lock != NULL) && !mtx_owned(br->br_lock))
		panic("lock not held on single consumer dequeue");
#endif
	/*
	 * It should be safe to omit a memory barrier here because we
	 * control cons, and tail is at worst a lagging indicator, so
	 * the worst case is that we return NULL immediately after a
	 * buffer has been enqueued.
	 */
	if (br->br_cons_head == br->br_prod_tail)
		return (NULL);

	return (br->br_ring[br->br_cons_head]);
}
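
/*
 * A hedged sketch of the peek-then-dequeue idiom (illustrative only;
 * example_tx_try and example_hw_can_transmit are hypothetical names).
 * Peeking first lets the consumer leave the buffer on the ring when it
 * cannot make progress, instead of dequeuing and re-enqueuing it.
 */
#if 0
static void
example_tx_try(struct buf_ring *br, struct mtx *tx_lock)
{
	void *buf;

	mtx_lock(tx_lock);
	while ((buf = buf_ring_peek(br)) != NULL) {
		if (!example_hw_can_transmit())
			break;			/* leave buf on the ring */
		(void)buf_ring_dequeue_sc(br);	/* now actually remove it */
		example_transmit(buf);
	}
	mtx_unlock(tx_lock);
}
#endif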

static __inline int
buf_ring_full(struct buf_ring *br)
{

	return (((br->br_prod_head + 1) & br->br_prod_mask) == br->br_cons_tail);
}

static __inline int
buf_ring_empty(struct buf_ring *br)
{

	return (br->br_cons_head == br->br_prod_tail);
}

static __inline int
buf_ring_count(struct buf_ring *br)
{

	return ((br->br_prod_size + br->br_prod_tail - br->br_cons_tail)
	    & br->br_prod_mask);
}

struct buf_ring *buf_ring_alloc(int count, struct malloc_type *type, int flags,
    struct mtx *);
void buf_ring_free(struct buf_ring *br, struct malloc_type *type);
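
/*
 * A hedged sketch of the alloc/free lifecycle (illustrative only; the
 * ring size, malloc type, and mutex below are placeholders).  count
 * must be a power of two for the index masks above to work, and the
 * mutex is only consulted by the DEBUG_BUFRING assertions.
 */
#if 0
static struct buf_ring *
example_ring_setup(struct mtx *tx_lock)
{
	/* 1024-entry ring; returns NULL if the M_NOWAIT allocation fails. */
	return (buf_ring_alloc(1024, M_DEVBUF, M_NOWAIT, tx_lock));
}

static void
example_ring_teardown(struct buf_ring *br)
{
	/* The caller is expected to have drained the ring first. */
	buf_ring_free(br, M_DEVBUF);
}
#endif
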
#endif