[Bf-blender-cvs] [29433da4c6e] master: BLI_task: Add new generic `BLI_task_parallel_iterator()`.

Bastien Montagne noreply at git.blender.org
Wed Oct 30 12:24:33 CET 2019


Commit: 29433da4c6e4b2da6f3fa3d1868c5039b4e6be70
Author: Bastien Montagne
Date:   Wed Oct 30 12:23:45 2019 +0100
Branches: master
https://developer.blender.org/rB29433da4c6e4b2da6f3fa3d1868c5039b4e6be70

BLI_task: Add new generic `BLI_task_parallel_iterator()`.

This new function is part of the 'parallel for loops' functions. It
takes an iterator callback to generate items to be processed, in
addition to the usual 'process' func callback.
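
As a rough illustration of that split between an 'iter' callback and a
'process' callback, here is a standalone, single-threaded sketch. It does
not use BLI_task: every `Sketch*`/`sketch_*` name is hypothetical, the TLS
argument is dropped, and the list-walking iterator is only an assumed
example of what user code might supply.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for a list node; not Blender's `Link`. */
typedef struct SketchLink {
  struct SketchLink *next;
  int value;
} SketchLink;

/* Mirrors the shape of TaskParallelIteratorIterFunc, without the TLS argument. */
typedef void (*SketchIterFunc)(void *userdata,
                               void **r_next_item,
                               int *r_next_index,
                               bool *r_do_abort);

/* Mirrors the shape of TaskParallelIteratorFunc, without the TLS argument. */
typedef void (*SketchItemFunc)(void *userdata, void *item, int index);

enum { SKETCH_MAX_CHUNK = 64 };

/* Single-threaded driver: acquire a chunk from the iterator, then process it. */
static void sketch_iterate(void *userdata,
                           SketchIterFunc iter_func,
                           void *init_item,
                           int init_index,
                           int chunk_size,
                           SketchItemFunc func)
{
  void *items[SKETCH_MAX_CHUNK];
  int indices[SKETCH_MAX_CHUNK];
  void *next_item = init_item;
  int next_index = init_index;
  bool is_finished = (init_item == NULL); /* Nothing to do for an empty sequence. */

  assert(chunk_size > 0 && chunk_size <= SKETCH_MAX_CHUNK);

  while (!is_finished) {
    int count = 0;
    /* 'Acquire' up to chunk_size items; the iterator advances item and index. */
    for (; count < chunk_size && !is_finished; count++) {
      items[count] = next_item;
      indices[count] = next_index;
      iter_func(userdata, &next_item, &next_index, &is_finished);
    }
    /* Process the acquired chunk. */
    for (int i = 0; i < count; i++) {
      func(userdata, items[i], indices[i]);
    }
  }
}

/* Assumed example iterator: step to the next node, flag the end of the list. */
static void sketch_list_next(void *userdata,
                             void **r_next_item,
                             int *r_next_index,
                             bool *r_do_abort)
{
  SketchLink *link = (SketchLink *)*r_next_item;
  (void)userdata;
  if (link->next == NULL) {
    *r_do_abort = true;
  }
  *r_next_item = link->next;
  (*r_next_index)++;
}

/* Example 'process' callback: accumulate value * (index + 1) into userdata. */
static void sketch_accumulate(void *userdata, void *item, int index)
{
  long *sum = (long *)userdata;
  *sum += (long)((SketchLink *)item)->value * (index + 1);
}

/* Builds a five-node list (10, 20, 30, 40, 50) and runs the driver over it. */
long sketch_demo(int chunk_size)
{
  SketchLink nodes[5];
  long sum = 0;
  for (int i = 0; i < 5; i++) {
    nodes[i].value = (i + 1) * 10;
    nodes[i].next = (i < 4) ? &nodes[i + 1] : NULL;
  }
  sketch_iterate(&sum, sketch_list_next, &nodes[0], 0, chunk_size, sketch_accumulate);
  return sum;
}
```

The result should not depend on the chunk size, since chunking only changes
how many items are acquired per round, not which items are processed.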

This makes it possible to reuse common BLI_task code for a wide range of
custom iterators, without having to re-invent the wheel of the whole
tasks & data chunks handling.

This supports all settings features from `BLI_task_parallel_range()`,
including dynamic and static (if total number of items is known)
scheduling, TLS data and its finalize callback, etc.
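
The static versus dynamic choice can be sketched as below. This is only a
standalone approximation of the final step of the chunk size helper in this
patch: `base_chunk_size` is an assumed input standing for whatever the
earlier (not shown here) heuristics computed, and all `sketch_*`/`SKETCH_*`
names are hypothetical.

```c
#include <assert.h>

typedef enum SketchSchedulingMode {
  SKETCH_SCHEDULING_STATIC,
  SKETCH_SCHEDULING_DYNAMIC,
} SketchSchedulingMode;

/* Pick the final chunk size from an already computed base chunk size.
 * A non-positive tot_items means the total number of items is unknown. */
int sketch_final_chunk_size(SketchSchedulingMode mode,
                            int base_chunk_size,
                            int tot_items,
                            int num_tasks)
{
  assert(base_chunk_size > 0 && num_tasks > 0);

  if (tot_items > 0 && mode == SKETCH_SCHEDULING_STATIC) {
    /* Static: spread the items evenly over the tasks, never below the base size. */
    const int per_task = tot_items / num_tasks;
    return (per_task > base_chunk_size) ? per_task : base_chunk_size;
  }
  /* Dynamic, or unknown total: only dynamic scheduling is possible. */
  return base_chunk_size;
}
```

With 1000 items over 4 tasks and a base chunk size of 32, static scheduling
would hand out chunks of 250 items, while dynamic scheduling (or an unknown
total) keeps chunks of 32.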

One question here is whether we should provide user code with a spinlock
by default, or require it to always handle its own sync mechanism.
I kept it, since imho it will be needed very often, and generating one
is pretty cheap even if unused...
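
To illustrate the 'optional lock' idea, here is a minimal standalone sketch
in which the shared iteration state carries a lock pointer that may be NULL
(as when everything runs in a single thread). It is not the BLI_task code:
the spinlock is built on C11 `atomic_flag` rather than Blender's `SpinLock`,
a plain counter replaces the generic item pointer, and all `Sketch*` names
are hypothetical.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>

/* Assumed minimal spinlock, built on C11 atomic_flag. */
typedef atomic_flag SketchSpinLock;

static void sketch_spin_lock(SketchSpinLock *lock)
{
  while (atomic_flag_test_and_set_explicit(lock, memory_order_acquire)) {
    /* Busy-wait until the flag is cleared by the current owner. */
  }
}

static void sketch_spin_unlock(SketchSpinLock *lock)
{
  atomic_flag_clear_explicit(lock, memory_order_release);
}

/* Shared between all tasks; the lock may be NULL when only one thread runs. */
typedef struct SketchShared {
  int chunk_size;
  int next_index;
  int tot_items;
  SketchSpinLock *spin_lock;
} SketchShared;

/* Claim up to chunk_size indices; returns how many were claimed. */
static int sketch_acquire_chunk(SketchShared *shared, int *r_first_index)
{
  if (shared->spin_lock != NULL) {
    sketch_spin_lock(shared->spin_lock);
  }

  const int remaining = shared->tot_items - shared->next_index;
  const int count = (remaining < shared->chunk_size) ? remaining : shared->chunk_size;
  *r_first_index = shared->next_index;
  shared->next_index += count;

  if (shared->spin_lock != NULL) {
    sketch_spin_unlock(shared->spin_lock);
  }
  return count;
}

/* Counts how many chunks are handed out, with or without the lock. */
int sketch_count_chunks(int tot_items, int chunk_size, bool use_lock)
{
  SketchSpinLock lock = ATOMIC_FLAG_INIT;
  SketchShared shared = {chunk_size, 0, tot_items, use_lock ? &lock : NULL};
  int chunks = 0;
  int first_index;

  assert(chunk_size > 0);

  while (sketch_acquire_chunk(&shared, &first_index) > 0) {
    chunks++;
  }
  return chunks;
}
```

User code that prefers atomics or its own locking could simply ignore the
provided lock pointer, which is the trade-off raised above.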

----------

Additionally, this commit converts (currently unused)
`BLI_task_parallel_listbase()` to use that generic code. This was done
mostly as proof of concept, but performance-wise it shows some
interesting data, roughly:
 - Very light processing (that should not be threaded anyway) is several
   times slower, which is expected due to more overhead in loop management
   code.
 - Heavier processing can be up to 10% quicker (probably thanks to the
   switch from dynamic to static scheduling, which greatly reduces the
   locking needed to fill in the per-task chunks of data). A similar
   speed-up in the non-threaded case comes as a surprise though, not sure
   what can explain that.

While this conversion is not really needed, imho we should keep it
(instead of the existing code for that function): it is easier to
maintain and improve complex handling logic when it lives in as few
places as possible.

Note: That work was initially done to make D5372 possible...
Unfortunately that one proved to be no better than the original code
from a performance point of view.

Reviewed By: sergey

Differential Revision: https://developer.blender.org/D5371

===================================================================

M	source/blender/blenlib/BLI_task.h
M	source/blender/blenlib/intern/task.c
M	tests/gtests/blenlib/BLI_task_performance_test.cc
M	tests/gtests/blenlib/BLI_task_test.cc

===================================================================

diff --git a/source/blender/blenlib/BLI_task.h b/source/blender/blenlib/BLI_task.h
index 568d6c9a84a..7ef5e518cc8 100644
--- a/source/blender/blenlib/BLI_task.h
+++ b/source/blender/blenlib/BLI_task.h
@@ -198,11 +198,45 @@ void BLI_task_parallel_range(const int start,
                              TaskParallelRangeFunc func,
                              const TaskParallelSettings *settings);
 
-typedef void (*TaskParallelListbaseFunc)(void *userdata, struct Link *iter, int index);
+/* This data is shared between all tasks, its access needs thread lock or similar protection. */
+typedef struct TaskParallelIteratorStateShared {
+  /* Maximum amount of items to acquire at once. */
+  int chunk_size;
+  /* Next item to be acquired. */
+  void *next_item;
+  /* Index of the next item to be acquired. */
+  int next_index;
+  /* Indicates that end of iteration has been reached. */
+  bool is_finished;
+  /* Helper lock to protect access to this data in iterator getter callback,
+   * can be ignored (if the callback implements its own protection system, using atomics e.g.).
+   * Will be NULL when iterator is actually processed in a single thread. */
+  SpinLock *spin_lock;
+} TaskParallelIteratorStateShared;
+
+typedef void (*TaskParallelIteratorIterFunc)(void *__restrict userdata,
+                                             const TaskParallelTLS *__restrict tls,
+                                             void **r_next_item,
+                                             int *r_next_index,
+                                             bool *r_do_abort);
+
+typedef void (*TaskParallelIteratorFunc)(void *__restrict userdata,
+                                         void *item,
+                                         int index,
+                                         const TaskParallelTLS *__restrict tls);
+
+void BLI_task_parallel_iterator(void *userdata,
+                                TaskParallelIteratorIterFunc iter_func,
+                                void *init_item,
+                                const int init_index,
+                                const int tot_items,
+                                TaskParallelIteratorFunc func,
+                                const TaskParallelSettings *settings);
+
 void BLI_task_parallel_listbase(struct ListBase *listbase,
                                 void *userdata,
-                                TaskParallelListbaseFunc func,
-                                const bool use_threading);
+                                TaskParallelIteratorFunc func,
+                                const TaskParallelSettings *settings);
 
 typedef struct MempoolIterData MempoolIterData;
 typedef void (*TaskParallelMempoolFunc)(void *userdata, MempoolIterData *iter);
diff --git a/source/blender/blenlib/intern/task.c b/source/blender/blenlib/intern/task.c
index 6cdaec97d9a..bb69dc6452f 100644
--- a/source/blender/blenlib/intern/task.c
+++ b/source/blender/blenlib/intern/task.c
@@ -149,7 +149,7 @@ typedef struct TaskThreadLocalStorage {
    * without "interrupting" for task execution.
    *
    * We try to accumulate as much tasks as possible in a local queue without
-   * any locks first, and then we push all of them into a schedulers queue
+   * any locks first, and then we push all of them into a scheduler's queue
    * from within a single mutex lock.
    */
   bool do_delayed_push;
@@ -1052,14 +1052,20 @@ typedef struct ParallelRangeState {
   int chunk_size;
 } ParallelRangeState;
 
-BLI_INLINE void task_parallel_range_calc_chunk_size(const TaskParallelSettings *settings,
-                                                    const int num_tasks,
-                                                    ParallelRangeState *state)
+BLI_INLINE void task_parallel_calc_chunk_size(const TaskParallelSettings *settings,
+                                              const int tot_items,
+                                              int num_tasks,
+                                              int *r_chunk_size)
 {
-  const int tot_items = state->stop - state->start;
   int chunk_size = 0;
 
-  if (settings->min_iter_per_thread > 0) {
+  if (!settings->use_threading) {
+    /* Some users of this helper will still need a valid chunk size in case processing is not
+     * threaded. We can use a bigger one than in default threaded case then. */
+    chunk_size = 1024;
+    num_tasks = 1;
+  }
+  else if (settings->min_iter_per_thread > 0) {
     /* Already set by user, no need to do anything here. */
     chunk_size = settings->min_iter_per_thread;
   }
@@ -1091,16 +1097,30 @@ BLI_INLINE void task_parallel_range_calc_chunk_size(const TaskParallelSettings *
 
   BLI_assert(chunk_size > 0);
 
-  switch (settings->scheduling_mode) {
-    case TASK_SCHEDULING_STATIC:
-      state->chunk_size = max_ii(chunk_size, tot_items / (num_tasks));
-      break;
-    case TASK_SCHEDULING_DYNAMIC:
-      state->chunk_size = chunk_size;
-      break;
+  if (tot_items > 0) {
+    switch (settings->scheduling_mode) {
+      case TASK_SCHEDULING_STATIC:
+        *r_chunk_size = max_ii(chunk_size, tot_items / num_tasks);
+        break;
+      case TASK_SCHEDULING_DYNAMIC:
+        *r_chunk_size = chunk_size;
+        break;
+    }
+  }
+  else {
+    /* If total amount of items is unknown, we can only use dynamic scheduling. */
+    *r_chunk_size = chunk_size;
   }
 }
 
+BLI_INLINE void task_parallel_range_calc_chunk_size(const TaskParallelSettings *settings,
+                                                    const int num_tasks,
+                                                    ParallelRangeState *state)
+{
+  task_parallel_calc_chunk_size(
+      settings, state->stop - state->start, num_tasks, &state->chunk_size);
+}
+
 BLI_INLINE bool parallel_range_next_iter_get(ParallelRangeState *__restrict state,
                                              int *__restrict iter,
                                              int *__restrict count)
@@ -1256,77 +1276,239 @@ void BLI_task_parallel_range(const int start,
   }
 }
 
-#undef MALLOCA
-#undef MALLOCA_FREE
-
-typedef struct ParallelListbaseState {
+typedef struct TaskParallelIteratorState {
   void *userdata;
-  TaskParallelListbaseFunc func;
+  TaskParallelIteratorIterFunc iter_func;
+  TaskParallelIteratorFunc func;
+
+  /* *** Data used to 'acquire' chunks of items from the iterator. *** */
+  /* Common data also passed to the generator callback. */
+  TaskParallelIteratorStateShared iter_shared;
+  /* Total number of items. If unknown, set it to a negative number. */
+  int tot_items;
+} TaskParallelIteratorState;
+
+BLI_INLINE void task_parallel_iterator_calc_chunk_size(const TaskParallelSettings *settings,
+                                                       const int num_tasks,
+                                                       TaskParallelIteratorState *state)
+{
+  task_parallel_calc_chunk_size(
+      settings, state->tot_items, num_tasks, &state->iter_shared.chunk_size);
+}
 
-  int chunk_size;
-  int index;
-  Link *link;
-  SpinLock lock;
-} ParallelListState;
-
-BLI_INLINE Link *parallel_listbase_next_iter_get(ParallelListState *__restrict state,
-                                                 int *__restrict index,
-                                                 int *__restrict count)
+static void parallel_iterator_func_do(TaskParallelIteratorState *__restrict state,
+                                      void *userdata_chunk,
+                                      int threadid)
 {
-  int task_count = 0;
-  BLI_spin_lock(&state->lock);
-  Link *result = state->link;
-  if (LIKELY(result != NULL)) {
-    *index = state->index;
-    while (state->link != NULL && task_count < state->chunk_size) {
-      task_count++;
-      state->link = state->link->next;
+  TaskParallelTLS tls = {
+      .thread_id = threadid,
+      .userdata_chunk = userdata_chunk,
+  };
+
+  void **current_chunk_items;
+  int *current_chunk_indices;
+  int current_chunk_size;
+
+  const size_t items_size = sizeof(*current_chunk_items) * (size_t)state->iter_shared.chunk_size;
+  const size_t indices_size = sizeof(*current_chunk_indices) *
+                              (size_t)state->iter_shared.chunk_size;
+
+  current_chunk_items = MALLOCA(items_size);
+  current_chunk_indices = MALLOCA(indices_size);
+  current_chunk_size = 0;
+
+  for (bool do_abort = false; !do_abort;) {
+    if (state->iter_shared.spin_lock != NULL) {
+      BLI_spin_lock(state->iter_shared.spin_lock);
+    }
+
+    /* Get current status. */
+    int index = state->iter_shared.next_index;
+    void *item = state->iter_shared.next_item;
+    int i;
+
+    /* 'Acquire' a chunk of items from the iterator function. */
+    for (i = 0; i < state->iter_shared.chunk_size && !state->iter_shared.is_finished; i++) {
+      current_chunk_indices[i] = index;
+      current_chunk_items[i] = item;
+      state->iter_func(state->userdata, &tls, &item, &index, &state->iter_shared.is_finished);
+    }
+
+    /* Update current status. */
+    state->iter_shared.next_index = index;
+    state->iter_shared.next_item = item;
+    current_chunk_size = i;
+
+    do_abort = state->iter_shared.is_finished;
+
+    if (state->iter_shared.spin_lock != NULL) {
+      BLI_spin_unlock(state->iter_shared.spin_lock);
+    }
+
+    for (i = 0; i < current_chunk_size; ++i) {
+      state->func(state->userdata, current_chunk_items[i], current_chunk_indices[i], &tls);
     }
-    state->index += task_count;
   }
-  BLI_spin_unlock(&state->lock);
-  *count = task_count;
-  return result;
+
+  MALLOCA_FREE(current_chunk_items, items_size);
+  MALLOCA_FREE(current_chunk_indices, indices_size);
 }
 
-static void parallel_listbase_func(TaskPool *__restrict pool,
-                                   void *UNUSED(taskdata),
-                                   int UNUSED(threadid))
+static void parallel_iterator_func(TaskPool *__restrict pool, void *userdata_chunk, int threadid)
 {
-  ParallelListState *__restrict state = BLI_task_pool_userdata(pool);
-  Link *link;
-  int index, count;
+  TaskParallelIteratorState *__restrict state = BLI_task_pool_userdata(pool);
 
-  while ((link = parallel_listbase_next_iter_get(state, &index, &count)) != NULL) {
-    for 

@@ Diff output truncated at 10240 characters. @@


