-
Notifications
You must be signed in to change notification settings - Fork 3
/
send_queue.c
87 lines (79 loc) · 2.62 KB
/
send_queue.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
// Copyright 2019 ETH Zurich
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "send_queue.h"
#include "hercules.h"
#include <stdlib.h>
#include <string.h>
// Allocates zeroed backing storage for `queue` and resets it to the empty
// state (head == tail == 0) with `num_entries` slots.
//
// One unit more than requested is allocated so that the start of the usable
// array (`units`) can be moved forward to a cache-line boundary. The pointer
// returned by calloc is kept in `units_base`, which is what must be freed.
// If the allocation fails the process is terminated, because the queue would
// otherwise be left pointing at invalid memory.
void init_send_queue(struct send_queue *queue, u32 num_entries)
{
	// widen before adding so that num_entries + 1 cannot wrap around in u32
	queue->units_base = calloc((size_t)num_entries + 1, sizeof(struct send_queue_unit));
	if(queue->units_base == NULL) {
		debug_printf("cannot allocate send_queue");
		exit(EXIT_FAILURE);
	}

	// make sure units are aligned to cache lines: advance to the next boundary
	// strictly after units_base (a whole cache line if it is already aligned)
	// NOTE(review): this relies on the one spare unit being at least
	// CACHELINE_SIZE bytes large -- confirm against the struct definition
	size_t offset = (size_t)queue->units_base % CACHELINE_SIZE;
	queue->units = (void *)((size_t)queue->units_base + CACHELINE_SIZE - offset);

	queue->size = num_entries;
	queue->head = 0;
	queue->tail = 0;
}
// Releases the queue's backing allocation and clears both unit pointers so
// that no dangling reference to the freed memory remains in the queue.
void destroy_send_queue(struct send_queue *queue)
{
	// units_base is the pointer that came from the allocator; units only
	// points into that same allocation
	void *allocation = queue->units_base;

	queue->units_base = NULL;
	queue->units = NULL;
	free(allocation);
}
// Returns the slot at the current tail so that the producer can fill it, or
// NULL if the queue is full (advancing the tail would make it meet the head).
// The tail is not moved here; the slot is only handed out.
// Meant for a single producer, so this does not need to be thread-safe.
struct send_queue_unit *send_queue_reserve(struct send_queue *queue)
{
	const u32 tail = atomic_load(&queue->tail);
	const u32 next = (tail + 1) % queue->size;

	if(next != atomic_load(&queue->head)) {
		return &queue->units[tail];
	}
	return NULL; // no free slot
}
// Publishes the slot at the current tail by advancing the tail by one
// (wrapping at queue->size). Pushing into a full queue is treated as a fatal
// error: a debug message is printed and the process exits with status 129.
// Meant for a single producer, so this does not need to be thread-safe.
void send_queue_push(struct send_queue *queue)
{
	const u32 tail = atomic_load(&queue->tail);
	const u32 next = (tail + 1) % queue->size;

	if(next != atomic_load(&queue->head)) {
		atomic_store(&queue->tail, next);
		return;
	}

	// the advanced tail would collide with the head: queue is full
	debug_printf("cannot push into full send_queue");
	exit(129);
}
// Copies the unit at the head of the queue into `unit` and advances the head
// by one (wrapping at queue->size).
//
// Returns true if a unit was copied out, false if the queue is empty (in
// which case `unit` is left untouched).
//
// The head is advanced with a compare-and-swap: if the head changed between
// reading it and the swap, the copy is discarded and the whole attempt is
// repeated with the new head.
bool send_queue_pop(struct send_queue *queue, struct send_queue_unit *unit)
{
	while(true) {
		u32 current_head = atomic_load(&queue->head);
		if(current_head == atomic_load(&queue->tail)) {
			return false; // queue is empty
		}

		// TODO optimize: reserve before copying, release after
		memcpy(unit, &queue->units[current_head], sizeof(*unit));

		// claim the slot only if the head is still where we read it
		u32 next_head = (current_head + 1) % queue->size;
		if(atomic_compare_exchange_strong(&queue->head, &current_head, next_head)) {
			return true;
		}
	}
}
// Pops one unit into `unit`, spinning for as long as the queue is empty.
// If `block` is non-NULL it is re-read after every failed attempt; once it
// reads false the function gives up and returns without having filled `unit`.
// With block == NULL the function only returns after a successful pop.
void send_queue_pop_wait(struct send_queue *queue, struct send_queue_unit *unit, bool *block)
{
	for(;;) {
		if(send_queue_pop(queue, unit)) {
			return; // got a unit
		}
		if(block != NULL && !atomic_load(block)) {
			return; // caller no longer wants to wait
		}
		// TODO back-off?
	}
}