[Bf-blender-cvs] [33dd648605c] profiler-editor: add queue implementation
Jacques Lucke
noreply at git.blender.org
Thu Apr 29 11:30:46 CEST 2021
Commit: 33dd648605c69a0ce472c5ff7be25c0b752b4bd2
Author: Jacques Lucke
Date: Tue Apr 27 11:41:38 2021 +0200
Branches: profiler-editor
https://developer.blender.org/rB33dd648605c69a0ce472c5ff7be25c0b752b4bd2
add queue implementation
===================================================================
A source/blender/blenlib/BLI_single_producer_chunk_consumer.hh
M source/blender/blenlib/CMakeLists.txt
M source/blender/blenlib/intern/profile.cc
===================================================================
diff --git a/source/blender/blenlib/BLI_single_producer_chunk_consumer.hh b/source/blender/blenlib/BLI_single_producer_chunk_consumer.hh
new file mode 100644
index 00000000000..cd8b647a276
--- /dev/null
+++ b/source/blender/blenlib/BLI_single_producer_chunk_consumer.hh
@@ -0,0 +1,211 @@
+/*
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+#pragma once
+
+/** \file
+ * \ingroup bli
+ *
+ * A `SingleProducerChunkConsumerQueue<T>` is designed to handle the case when
+ * - A single producer thread wants to append elements to the queue very efficiently.
+ * - A single consumer thread wants to consume large chunks from the queue at a time.
+ * - The producer and consumer might run on different threads.
+ */
+
+#include <atomic>
+
+#include "BLI_allocator.hh"
+#include "BLI_function_ref.hh"
+#include "BLI_span.hh"
+
+namespace blender {
+
+/**
+ * A single chunk of a `SingleProducerChunkConsumerQueue<T>`. It references a separately allocated
+ * element buffer and is linked to the chunk that follows it through #next. The comments on the
+ * individual members state which operation (append or consume) is allowed to touch them.
+ */
+template<typename T> struct SingleProducerChunkConsumerQueueChunk {
+ /**
+ * Points to the next chunk that contains the elements added after the elements in this chunk.
+ * This is only modified during the append-operation. When it is not null, it means that the
+ * append-operation will not look at this chunk anymore.
+ */
+ std::atomic<SingleProducerChunkConsumerQueueChunk *> next = nullptr;
+
+ /**
+ * Number of elements that have been committed to the chunk and won't be modified anymore.
+ * This is modified during the append-operation and is only increasing.
+ * It is stored with release semantics when appending and loaded with acquire semantics when
+ * consuming.
+ */
+ std::atomic<int64_t> committed_size = 0;
+
+ /**
+ * Number of elements that have been consumed already from this chunk.
+ * This is only accessed by the consume-operation.
+ */
+ int64_t consumed_size = 0;
+
+ /**
+ * Begin and end of the entire chunk buffer. Those are only set during construction and don't
+ * change anymore afterwards.
+ */
+ T *begin = nullptr;
+ T *capacity_end = nullptr;
+
+ /**
+ * Points to the slot that the next appended element is written to, i.e. one past the last
+ * committed element. It is equal to #capacity_end when the chunk is full.
+ * This is modified by the append-operation and not accessed by the consume-operation.
+ */
+ T *end = nullptr;
+};
+
+/**
+ * Queue that stores its elements in a singly linked list of chunks with a fixed capacity.
+ *
+ * - The append-operation consists of #prepare_append followed by #commit_append.
+ * - The consume-operation consists of #consume and #free_consumed.
+ *
+ * One append-operation and one consume-operation may run at the same time on different threads.
+ * The queue owns all of its chunks and frees them in the destructor, therefore it cannot be
+ * copied.
+ */
+template<typename T, typename Allocator = GuardedAllocator>
+class SingleProducerChunkConsumerQueue {
+ private:
+  using Chunk = SingleProducerChunkConsumerQueueChunk<T>;
+
+  /* Number of elements that fit into a single chunk. */
+  static constexpr inline int64_t ChunkCapacity = 1000;
+
+  Allocator allocator_;
+
+  /* Oldest chunk that has not been freed yet.
+   * Is only modified in the constructor and in #free_consumed. */
+  Chunk *begin_;
+
+  /* Chunk that new elements are appended to. Is only accessed when appending. */
+  Chunk *current_;
+
+ public:
+  SingleProducerChunkConsumerQueue()
+  {
+    /* Create the first chunk in the constructor, so that the append-operation does not have to
+     * handle this case. */
+    begin_ = this->new_chunk();
+    current_ = begin_;
+  }
+
+  /* The destructor frees every chunk reachable from `begin_`. An implicitly generated copy would
+   * share those raw pointers and free them a second time, so copying is disabled. Because the
+   * copy operations are user-declared, no implicit move operations are generated either. */
+  SingleProducerChunkConsumerQueue(const SingleProducerChunkConsumerQueue &) = delete;
+  SingleProducerChunkConsumerQueue &operator=(const SingleProducerChunkConsumerQueue &) = delete;
+
+  /**
+   * Destructs all committed elements that are still stored and frees all remaining chunks.
+   */
+  ~SingleProducerChunkConsumerQueue()
+  {
+    Chunk *chunk = begin_;
+    while (chunk) {
+      /* Read the link before the chunk is freed. */
+      Chunk *next_chunk = chunk->next;
+      this->delete_chunk(chunk);
+      chunk = next_chunk;
+    }
+  }
+
+  /**
+   * Start appending a new element.
+   * Returns a pointer to uninitialized memory.
+   * The caller is responsible for constructing the new element in the returned buffer.
+   * This must be used in conjunction with #commit_append.
+   */
+  T *prepare_append()
+  {
+    if (current_->end == current_->capacity_end) {
+      /* Create a new chunk when the current one is full. */
+      Chunk *new_chunk = this->new_chunk();
+      /* This tells the consume-operation that the append-operation does not look at this chunk
+       * anymore. */
+      current_->next.store(new_chunk, std::memory_order_release);
+      current_ = new_chunk;
+    }
+    /* Return a pointer to the next uninitialized element. */
+    return current_->end;
+  }
+
+  /**
+   * Tell the queue that the element has been constructed and is ready to be committed.
+   * Once it is committed, the next consumer can read it.
+   */
+  void commit_append()
+  {
+    current_->end++;
+    /* Compute the committed size like that instead of doing an increment to avoid having a
+     * read-modify-write operation on an atomic variable which could be more expensive than just
+     * writing to it. */
+    const int64_t new_committed_size = current_->end - current_->begin;
+    current_->committed_size.store(new_committed_size, std::memory_order_release);
+  }
+
+  /**
+   * Get access to all newly committed elements in this queue.
+   * Only a single thread is allowed to enter this function.
+   * However, another thread is allowed to perform an append-operation at the same time.
+   * The spans passed to `consume_fn` are valid until `free_consumed` has been called.
+   *
+   * `consume_fn` is called once for every visited chunk. The passed span is empty when the chunk
+   * does not contain elements that have not been consumed before.
+   */
+  void consume(const FunctionRef<void(Span<T>)> consume_fn)
+  {
+    Chunk *chunk = begin_;
+    while (chunk) {
+      const int64_t already_consumed_size = chunk->consumed_size;
+      /* The acquire-load pairs with the release-store in #commit_append. */
+      const int64_t committed_chunk_size = chunk->committed_size.load(std::memory_order_acquire);
+      const int64_t newly_consumed_size = committed_chunk_size - already_consumed_size;
+
+      const Span<T> committed_data{chunk->begin + already_consumed_size, newly_consumed_size};
+      consume_fn(committed_data);
+      chunk->consumed_size = committed_chunk_size;
+
+      /* Only try to consume the next chunk, if all elements from this chunk have been consumed. */
+      if (committed_chunk_size == ChunkCapacity) {
+        /* This can still be null when the next chunk has not been created yet. */
+        chunk = chunk->next.load(std::memory_order_acquire);
+      }
+      else {
+        break;
+      }
+    }
+  }
+
+  /**
+   * Free chunks that have been consumed already and won't be accessed anymore.
+   * Calling this will invalidate the spans provided by #consume.
+   */
+  void free_consumed()
+  {
+    Chunk *chunk = begin_;
+    while (chunk) {
+      const int64_t consumed_size = chunk->consumed_size;
+      /* Check if the entire capacity of the chunk has been consumed. */
+      if (consumed_size == ChunkCapacity) {
+        /* Check if the append-operation might still access this chunk. */
+        Chunk *next_chunk = chunk->next.load(std::memory_order_acquire);
+        if (next_chunk != nullptr) {
+          begin_ = next_chunk;
+          this->delete_chunk(chunk);
+        }
+        /* When there is no next chunk yet, this ends the loop and keeps the chunk alive. */
+        chunk = next_chunk;
+      }
+      else {
+        break;
+      }
+    }
+  }
+
+ private:
+  /**
+   * Allocate a new empty chunk together with an uninitialized buffer for `ChunkCapacity`
+   * elements.
+   */
+  Chunk *new_chunk()
+  {
+    /* We could also combine both allocations in a single one. */
+    void *chunk_buffer = allocator_.allocate(sizeof(Chunk), alignof(Chunk), __func__);
+    Chunk *chunk = new (chunk_buffer) Chunk();
+    chunk->begin = static_cast<T *>(
+        allocator_.allocate(sizeof(T) * static_cast<size_t>(ChunkCapacity), alignof(T), __func__));
+    chunk->end = chunk->begin;
+    chunk->capacity_end = chunk->begin + ChunkCapacity;
+    return chunk;
+  }
+
+  /**
+   * Destruct all committed elements in the chunk and free the element buffer as well as the
+   * chunk itself.
+   */
+  void delete_chunk(Chunk *chunk)
+  {
+    /* Only committed elements are known to be constructed. */
+    destruct_n(chunk->begin, chunk->committed_size.load());
+    /* The element buffer is referenced by `begin`, the chunk has no other buffer pointer. */
+    allocator_.deallocate(chunk->begin);
+    chunk->~Chunk();
+    allocator_.deallocate(chunk);
+  }
+};
+
+} // namespace blender
diff --git a/source/blender/blenlib/CMakeLists.txt b/source/blender/blenlib/CMakeLists.txt
index 95789c1fa00..06f937ce5ca 100644
--- a/source/blender/blenlib/CMakeLists.txt
+++ b/source/blender/blenlib/CMakeLists.txt
@@ -268,6 +268,7 @@ set(SRC
BLI_set.hh
BLI_set_slots.hh
BLI_simd.h
+ BLI_single_producer_chunk_consumer.hh
BLI_smallhash.h
BLI_sort.h
BLI_sort_utils.h
diff --git a/source/blender/blenlib/intern/profile.cc b/source/blender/blenlib/intern/profile.cc
index e9f409c8649..e55857b68fb 100644
--- a/source/blender/blenlib/intern/profile.cc
+++ b/source/blender/blenlib/intern/profile.cc
@@ -17,8 +17,10 @@
#include <atomic>
#include <mutex>
+#include "BLI_function_ref.hh"
#include "BLI_profile.hh"
#include "BLI_profile_manage.hh"
+#include "BLI_single_producer_chunk_consumer.hh"
#include "BLI_stack.hh"
using namespace blender;
More information about the Bf-blender-cvs
mailing list