Project Alice
Loading...
Searching...
No Matches
SPSCQueue.h
Go to the documentation of this file.
1/*
2Copyright (c) 2020 Erik Rigtorp <erik@rigtorp.se>
3
4Permission is hereby granted, free of charge, to any person obtaining a copy
5of this software and associated documentation files (the "Software"), to deal
6in the Software without restriction, including without limitation the rights
7to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8copies of the Software, and to permit persons to whom the Software is
9furnished to do so, subject to the following conditions:
10
11The above copyright notice and this permission notice shall be included in all
12copies or substantial portions of the Software.
13
14THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
20SOFTWARE.
21 */
22
23#pragma once
24
25#include <atomic>
26#include <cassert>
27#include <cstddef>
28#include <memory> // std::allocator
29#include <new> // std::hardware_destructive_interference_size
30#include <stdexcept>
31#include <type_traits> // std::enable_if, std::is_*_constructible
32
// RIGTORP_NODISCARD expands to [[nodiscard]] when the compiler reports support
// for that attribute and to nothing otherwise.
#ifdef __has_cpp_attribute
#if __has_cpp_attribute(nodiscard)
#define RIGTORP_NODISCARD [[nodiscard]]
#endif
#endif
#ifndef RIGTORP_NODISCARD
#define RIGTORP_NODISCARD
#endif

namespace rigtorp {

// Bounded FIFO ring buffer of T stored in memory obtained from Allocator.
//
// The write side (emplace/try_emplace/push/try_push) updates writeIdx_ and the
// plain, non-atomic readIdxCache_; the read side (front/pop) updates readIdx_
// and the plain, non-atomic writeIdxCache_. Because the two cache fields are
// not synchronized, at most one thread may use the write side and at most one
// thread may use the read side at any time.
template <typename T, typename Allocator = std::allocator<T>> class SPSCQueue {

#if defined(__cpp_if_constexpr) && defined(__cpp_lib_void_t)
  // Detects whether Alloc2 provides allocate_at_least(size_t).
  template <typename Alloc2, typename = void>
  struct has_allocate_at_least : std::false_type {};

  template <typename Alloc2>
  struct has_allocate_at_least<
      Alloc2, std::void_t<typename Alloc2::value_type,
                          decltype(std::declval<Alloc2 &>().allocate_at_least(
                              size_t{}))>> : std::true_type {};
#endif

public:
  // Creates a queue able to hold at least `capacity` elements (a request of 0
  // is treated as 1). Storage is obtained from a copy of `allocator`; any
  // exception thrown by the allocation propagates to the caller.
  explicit SPSCQueue(const size_t capacity,
                     const Allocator &allocator = Allocator())
      : capacity_(capacity), allocator_(allocator) {
    // Largest slot count for which adding the 2 * kPadding guard elements
    // cannot overflow size_t.
    constexpr size_t kMaxSlots = static_cast<size_t>(-1) - 2 * kPadding;

    // The queue needs at least one element
    if (capacity_ < 1) {
      capacity_ = 1;
    }
    // Needs one slack element so that a full queue can be told apart from an
    // empty one. Clamp before incrementing: incrementing first would wrap a
    // request of the maximum size_t value around to zero slots.
    if (capacity_ >= kMaxSlots) {
      capacity_ = kMaxSlots;
    } else {
      ++capacity_;
    }

#if defined(__cpp_if_constexpr) && defined(__cpp_lib_void_t)
    if constexpr (has_allocate_at_least<Allocator>::value) {
      auto res = allocator_.allocate_at_least(capacity_ + 2 * kPadding);
      slots_ = res.ptr;
      // Use whatever extra room the allocator handed back.
      capacity_ = res.count - 2 * kPadding;
    } else {
      slots_ = std::allocator_traits<Allocator>::allocate(
          allocator_, capacity_ + 2 * kPadding);
    }
#else
    slots_ = std::allocator_traits<Allocator>::allocate(
        allocator_, capacity_ + 2 * kPadding);
#endif

    // Check the layout of this very specialization (including Allocator), not
    // of the default-allocator one.
    static_assert(alignof(SPSCQueue) == kCacheLineSize, "");
    static_assert(sizeof(SPSCQueue) >= 3 * kCacheLineSize, "");
    assert(reinterpret_cast<char *>(&readIdx_) -
               reinterpret_cast<char *>(&writeIdx_) >=
           static_cast<std::ptrdiff_t>(kCacheLineSize));
  }

  // Destroys every element still in the queue, then releases the storage.
  ~SPSCQueue() {
    while (front()) {
      pop();
    }
    std::allocator_traits<Allocator>::deallocate(allocator_, slots_,
                                                 capacity_ + 2 * kPadding);
  }

  // non-copyable and non-movable
  SPSCQueue(const SPSCQueue &) = delete;
  SPSCQueue &operator=(const SPSCQueue &) = delete;

  // Constructs a T in place at the back of the queue from `args`. Spins until
  // a slot is free when the queue is full.
  template <typename... Args>
  void emplace(Args &&...args) noexcept(
      std::is_nothrow_constructible<T, Args &&...>::value) {
    static_assert(std::is_constructible<T, Args &&...>::value,
                  "T must be constructible with Args&&...");
    auto const writeIdx = writeIdx_.load(std::memory_order_relaxed);
    auto nextWriteIdx = writeIdx + 1;
    if (nextWriteIdx == capacity_) {
      nextWriteIdx = 0;
    }
    // Only reload the shared read index while the cached copy says "full".
    while (nextWriteIdx == readIdxCache_) {
      readIdxCache_ = readIdx_.load(std::memory_order_acquire);
    }
    new (&slots_[writeIdx + kPadding]) T(std::forward<Args>(args)...);
    // Publish the element only after it has been fully constructed.
    writeIdx_.store(nextWriteIdx, std::memory_order_release);
  }

  // Like emplace(), but returns false instead of waiting when the queue is
  // full. Returns true when the element was enqueued.
  template <typename... Args>
  RIGTORP_NODISCARD bool try_emplace(Args &&...args) noexcept(
      std::is_nothrow_constructible<T, Args &&...>::value) {
    static_assert(std::is_constructible<T, Args &&...>::value,
                  "T must be constructible with Args&&...");
    auto const writeIdx = writeIdx_.load(std::memory_order_relaxed);
    auto nextWriteIdx = writeIdx + 1;
    if (nextWriteIdx == capacity_) {
      nextWriteIdx = 0;
    }
    if (nextWriteIdx == readIdxCache_) {
      readIdxCache_ = readIdx_.load(std::memory_order_acquire);
      if (nextWriteIdx == readIdxCache_) {
        return false;
      }
    }
    new (&slots_[writeIdx + kPadding]) T(std::forward<Args>(args)...);
    writeIdx_.store(nextWriteIdx, std::memory_order_release);
    return true;
  }

  // Enqueues a copy of `v`; waits while the queue is full (see emplace()).
  void push(const T &v) noexcept(std::is_nothrow_copy_constructible<T>::value) {
    static_assert(std::is_copy_constructible<T>::value,
                  "T must be copy constructible");
    emplace(v);
  }

  // Enqueues a T constructed from the forwarded `v`; waits while the queue is
  // full (see emplace()).
  template <typename P, typename = typename std::enable_if<
                            std::is_constructible<T, P &&>::value>::type>
  void push(P &&v) noexcept(std::is_nothrow_constructible<T, P &&>::value) {
    emplace(std::forward<P>(v));
  }

  // Tries to enqueue a copy of `v`; returns false when the queue is full.
  RIGTORP_NODISCARD bool
  try_push(const T &v) noexcept(std::is_nothrow_copy_constructible<T>::value) {
    static_assert(std::is_copy_constructible<T>::value,
                  "T must be copy constructible");
    return try_emplace(v);
  }

  // Tries to enqueue a T constructed from the forwarded `v`; returns false
  // when the queue is full.
  template <typename P, typename = typename std::enable_if<
                            std::is_constructible<T, P &&>::value>::type>
  RIGTORP_NODISCARD bool
  try_push(P &&v) noexcept(std::is_nothrow_constructible<T, P &&>::value) {
    return try_emplace(std::forward<P>(v));
  }

  // Returns a pointer to the oldest element, or nullptr when the queue is
  // empty. The element stays in the queue until pop() is called.
  RIGTORP_NODISCARD T *front() noexcept {
    auto const readIdx = readIdx_.load(std::memory_order_relaxed);
    // Only reload the shared write index while the cached copy says "empty".
    if (readIdx == writeIdxCache_) {
      writeIdxCache_ = writeIdx_.load(std::memory_order_acquire);
      if (writeIdxCache_ == readIdx) {
        return nullptr;
      }
    }
    return &slots_[readIdx + kPadding];
  }

  // Destroys the oldest element and frees its slot. Must only be called after
  // front() has returned a non-null pointer.
  void pop() noexcept {
    static_assert(std::is_nothrow_destructible<T>::value,
                  "T must be nothrow destructible");
    auto const readIdx = readIdx_.load(std::memory_order_relaxed);
    assert(writeIdx_.load(std::memory_order_acquire) != readIdx &&
           "Can only call pop() after front() has returned a non-nullptr");
    slots_[readIdx + kPadding].~T();
    auto nextReadIdx = readIdx + 1;
    if (nextReadIdx == capacity_) {
      nextReadIdx = 0;
    }
    readIdx_.store(nextReadIdx, std::memory_order_release);
  }

  // Returns the number of elements between the read and write indices. The
  // two indices are loaded separately, so the result is a snapshot that may
  // already be stale if another thread is using the queue.
  RIGTORP_NODISCARD size_t size() const noexcept {
    const size_t writeIdx = writeIdx_.load(std::memory_order_acquire);
    const size_t readIdx = readIdx_.load(std::memory_order_acquire);
    // Stay in unsigned arithmetic; when the write index has wrapped around
    // past the end of the ring, count the distance the other way.
    if (writeIdx >= readIdx) {
      return writeIdx - readIdx;
    }
    return capacity_ - (readIdx - writeIdx);
  }

  // Returns true when the read and write indices are equal.
  RIGTORP_NODISCARD bool empty() const noexcept {
    return writeIdx_.load(std::memory_order_acquire) ==
           readIdx_.load(std::memory_order_acquire);
  }

  // Returns the number of elements the queue can hold (excludes the slack
  // slot).
  RIGTORP_NODISCARD size_t capacity() const noexcept { return capacity_ - 1; }

private:
#ifdef __cpp_lib_hardware_interference_size
  static constexpr size_t kCacheLineSize =
      std::hardware_destructive_interference_size;
#else
  static constexpr size_t kCacheLineSize = 64;
#endif

  // Padding to avoid false sharing between slots_ and adjacent allocations
  static constexpr size_t kPadding = (kCacheLineSize - 1) / sizeof(T) + 1;

  size_t capacity_; // number of usable slots, including the one slack slot
  T *slots_;        // capacity_ slots with kPadding unused elements each side
  // Nested checks so that compilers without __has_cpp_attribute never see the
  // function-like invocation.
#ifdef __has_cpp_attribute
#if __has_cpp_attribute(no_unique_address)
  Allocator allocator_ [[no_unique_address]];
#else
  Allocator allocator_;
#endif
#else
  Allocator allocator_;
#endif

  // Align to cache line size in order to avoid false sharing
  // readIdxCache_ and writeIdxCache_ are used to reduce the amount of cache
  // coherency traffic
  alignas(kCacheLineSize) std::atomic<size_t> writeIdx_ = {0};
  alignas(kCacheLineSize) size_t readIdxCache_ = 0;
  alignas(kCacheLineSize) std::atomic<size_t> readIdx_ = {0};
  alignas(kCacheLineSize) size_t writeIdxCache_ = 0;
};
} // namespace rigtorp
#define RIGTORP_NODISCARD
Definition: SPSCQueue.h:39
RIGTORP_NODISCARD T * front() noexcept
Definition: SPSCQueue.h:168
SPSCQueue(const SPSCQueue &)=delete
SPSCQueue & operator=(const SPSCQueue &)=delete
void emplace(Args &&...args) noexcept(std::is_nothrow_constructible< T, Args &&... >::value)
Definition: SPSCQueue.h:105
void push(P &&v) noexcept(std::is_nothrow_constructible< T, P && >::value)
Definition: SPSCQueue.h:150
RIGTORP_NODISCARD bool empty() const noexcept
Definition: SPSCQueue.h:202
RIGTORP_NODISCARD bool try_push(P &&v) noexcept(std::is_nothrow_constructible< T, P && >::value)
Definition: SPSCQueue.h:164
RIGTORP_NODISCARD bool try_emplace(Args &&...args) noexcept(std::is_nothrow_constructible< T, Args &&... >::value)
Definition: SPSCQueue.h:122
RIGTORP_NODISCARD size_t capacity() const noexcept
Definition: SPSCQueue.h:207
RIGTORP_NODISCARD size_t size() const noexcept
Definition: SPSCQueue.h:193
void push(const T &v) noexcept(std::is_nothrow_copy_constructible< T >::value)
Definition: SPSCQueue.h:142
SPSCQueue(const size_t capacity, const Allocator &allocator=Allocator())
Definition: SPSCQueue.h:58
void pop() noexcept
Definition: SPSCQueue.h:179
RIGTORP_NODISCARD bool try_push(const T &v) noexcept(std::is_nothrow_copy_constructible< T >::value)
Definition: SPSCQueue.h:155
#define assert(condition)
Definition: debug.h:74
Definition: json.hpp:5434