[Bf-blender-cvs] [a481908232e] master: Task scheduler: Optimize subsequent pushing bunch of tasks

Sergey Sharybin noreply at git.blender.org
Wed May 31 15:48:10 CEST 2017


Commit: a481908232ef20449e6ad6951769677e0b108ca8
Author: Sergey Sharybin
Date:   Wed May 31 15:24:09 2017 +0200
Branches: master
https://developer.blender.org/rBa481908232ef20449e6ad6951769677e0b108ca8

Task scheduler: Optimize subsequent pushing bunch of tasks

The idea is to accumulate all new tasks in a thread local queue
first without doing any thread synchronization (aka, locks and
conditional variables) and move those tasks to a scheduler queue
once they are all ready. This way we avoid per-task-pool lock
and only have one lock per bunch of tasks.

This is particularly handy when scheduling new dependency graph
node children. Brings FPS of cached simulation from the linked
below file from ~30 to ~50.

See documentation for BLI_task_pool_delayed_push_{begin, end}
and for TaskThreadLocalStorage::do_delayed_push.

Fixes T50027: Rigidbody playback and simulation performance regression with new depsgraph

Thanks Bastien for the review!

===================================================================

M	source/blender/blenlib/BLI_task.h
M	source/blender/blenlib/intern/task.c
M	source/blender/depsgraph/intern/eval/deg_eval.cc

===================================================================

diff --git a/source/blender/blenlib/BLI_task.h b/source/blender/blenlib/BLI_task.h
index c3c587275e1..721327d26a8 100644
--- a/source/blender/blenlib/BLI_task.h
+++ b/source/blender/blenlib/BLI_task.h
@@ -106,6 +106,13 @@ void *BLI_task_pool_userdata(TaskPool *pool);
 /* optional mutex to use from run function */
 ThreadMutex *BLI_task_pool_user_mutex(TaskPool *pool);
 
+/* Delayed push, use that to reduce thread overhead by accumulating
+ * all new tasks into local queue first and pushing it to scheduler
+ * from within a single mutex lock.
+ */
+void BLI_task_pool_delayed_push_begin(TaskPool *pool, int thread_id);
+void BLI_task_pool_delayed_push_end(TaskPool *pool, int thread_id);
+
 /* Parallel for routines */
 typedef void (*TaskParallelRangeFunc)(void *userdata, const int iter);
 typedef void (*TaskParallelRangeFuncEx)(void *userdata, void *userdata_chunk, const int iter, const int thread_id);
diff --git a/source/blender/blenlib/intern/task.c b/source/blender/blenlib/intern/task.c
index 3766a72319b..a1eae8f1955 100644
--- a/source/blender/blenlib/intern/task.c
+++ b/source/blender/blenlib/intern/task.c
@@ -54,6 +54,13 @@
  */
 #define LOCAL_QUEUE_SIZE 1
 
+/* Number of tasks which are allowed to be scheduled in a delayed manner.
+ *
+ * This allows to use less locks per graph node children schedule. More details
+ * could be found at TaskThreadLocalStorage::do_delayed_push.
+ */
+#define DELAYED_QUEUE_SIZE 4096
+
 #ifndef NDEBUG
 #  define ASSERT_THREAD_ID(scheduler, thread_id)                              \
 	do {                                                                      \
@@ -129,9 +136,28 @@ typedef struct TaskMemPoolStats {
 #endif
 
 typedef struct TaskThreadLocalStorage {
+	/* Memory pool for faster task allocation.
+	 * The idea is to re-use memory of finished/discarded tasks by this thread.
+	 */
 	TaskMemPool task_mempool;
+
+	/* Local queue keeps thread alive by keeping small amount of tasks ready
+	 * to be picked up without causing global thread locks for synchronization.
+	 */
 	int num_local_queue;
 	Task *local_queue[LOCAL_QUEUE_SIZE];
+
+	/* Thread can be marked for delayed tasks push. This is helpful when it's
+	 * know that lots of subsequent task pushed will happen from the same thread
+	 * without "interrupting" for task execution.
+	 *
+	 * We try to accumulate as much tasks as possible in a local queue without
+	 * any locks first, and then we push all of them into a scheduler's queue
+	 * from within a single mutex lock.
+	 */
+	bool do_delayed_push;
+	int num_delayed_queue;
+	Task *delayed_queue[DELAYED_QUEUE_SIZE];
 } TaskThreadLocalStorage;
 
 struct TaskPool {
@@ -378,6 +404,7 @@ static bool task_scheduler_thread_wait_pop(TaskScheduler *scheduler, Task **task
 BLI_INLINE void handle_local_queue(TaskThreadLocalStorage *tls,
                                    const int thread_id)
 {
+	BLI_assert(!tls->do_delayed_push);
 	while (tls->num_local_queue > 0) {
 		/* We pop task from queue before handling it so handler of the task can
 		 * push next job to the local queue.
@@ -391,6 +418,7 @@ BLI_INLINE void handle_local_queue(TaskThreadLocalStorage *tls,
 		local_task->run(local_pool, local_task->taskdata, thread_id);
 		task_free(local_pool, local_task, thread_id);
 	}
+	BLI_assert(!tls->do_delayed_push);
 }
 
 static void *task_scheduler_thread_run(void *thread_p)
@@ -408,7 +436,9 @@ static void *task_scheduler_thread_run(void *thread_p)
 		TaskPool *pool = task->pool;
 
 		/* run task */
+		BLI_assert(!tls->do_delayed_push);
 		task->run(pool, task->taskdata, thread_id);
+		BLI_assert(!tls->do_delayed_push);
 
 		/* delete task */
 		task_free(pool, task, thread_id);
@@ -547,6 +577,27 @@ static void task_scheduler_push(TaskScheduler *scheduler, Task *task, TaskPriori
 	BLI_mutex_unlock(&scheduler->queue_mutex);
 }
 
+static void task_scheduler_push_all(TaskScheduler *scheduler,
+                                    TaskPool *pool,
+                                    Task **tasks,
+                                    int num_tasks)
+{
+	if (num_tasks == 0) {
+		return;
+	}
+
+	task_pool_num_increase(pool, num_tasks);
+
+	BLI_mutex_lock(&scheduler->queue_mutex);
+
+	for (int i = 0; i < num_tasks; i++) {
+		BLI_addhead(&scheduler->queue, tasks[i]);
+	}
+
+	BLI_condition_notify_all(&scheduler->queue_cond);
+	BLI_mutex_unlock(&scheduler->queue_mutex);
+}
+
 static void task_scheduler_clear(TaskScheduler *scheduler, TaskPool *pool)
 {
 	Task *task, *nexttask;
@@ -714,38 +765,59 @@ void BLI_task_pool_free(TaskPool *pool)
 	BLI_end_threaded_malloc();
 }
 
+BLI_INLINE bool task_can_use_local_queues(TaskPool *pool, int thread_id)
+{
+	return (thread_id != -1 && (thread_id != pool->thread_id || pool->do_work));
+}
+
 static void task_pool_push(
         TaskPool *pool, TaskRunFunction run, void *taskdata,
         bool free_taskdata, TaskFreeFunction freedata, TaskPriority priority,
         int thread_id)
 {
+	/* Allocate task and fill it's properties. */
 	Task *task = task_alloc(pool, thread_id);
-
 	task->run = run;
 	task->taskdata = taskdata;
 	task->free_taskdata = free_taskdata;
 	task->freedata = freedata;
 	task->pool = pool;
-
+	/* For suspended pools we put everything yo a global queue first
+	 * and exit as soon as possible.
+	 *
+	 * This tasks will be moved to actual execution when pool is
+	 * activated by work_and_wait().
+	 */
 	if (pool->is_suspended) {
 		BLI_addhead(&pool->suspended_queue, task);
 		atomic_fetch_and_add_z(&pool->num_suspended, 1);
 		return;
 	}
-
-	if (thread_id != -1 &&
-	    (thread_id != pool->thread_id || pool->do_work))
-	{
+	/* Populate to any local queue first, this is cheapest push ever. */
+	if (task_can_use_local_queues(pool, thread_id)) {
 		ASSERT_THREAD_ID(pool->scheduler, thread_id);
-
 		TaskThreadLocalStorage *tls = get_task_tls(pool, thread_id);
+		/* Try to push to a local execution queue.
+		 * These tasks will be picked up next.
+		 */
 		if (tls->num_local_queue < LOCAL_QUEUE_SIZE) {
 			tls->local_queue[tls->num_local_queue] = task;
 			tls->num_local_queue++;
 			return;
 		}
+		/* If we are in the delayed tasks push mode, we push tasks to a
+		 * temporary local queue first without any locks, and then move them
+		 * to global execution queue with a single lock.
+		 */
+		if (tls->do_delayed_push && tls->num_delayed_queue < DELAYED_QUEUE_SIZE) {
+			tls->delayed_queue[tls->num_delayed_queue] = task;
+			tls->num_delayed_queue++;
+			return;
+		}
 	}
-
+	/* Do push to a global execution ppol, slowest possible method,
+	 * causes quite reasonable amount of threading overhead.
+	 */
 	task_scheduler_push(pool->scheduler, task, priority);
 }
 
@@ -816,7 +888,9 @@ void BLI_task_pool_work_and_wait(TaskPool *pool)
 		/* if found task, do it, otherwise wait until other tasks are done */
 		if (found_task) {
 			/* run task */
+			BLI_assert(!tls->do_delayed_push);
 			work_task->run(pool, work_task->taskdata, pool->thread_id);
+			BLI_assert(!tls->do_delayed_push);
 
 			/* delete task */
 			task_free(pool, task, pool->thread_id);
@@ -871,6 +945,30 @@ ThreadMutex *BLI_task_pool_user_mutex(TaskPool *pool)
 	return &pool->user_mutex;
 }
 
+void BLI_task_pool_delayed_push_begin(TaskPool *pool, int thread_id)
+{
+	if (task_can_use_local_queues(pool, thread_id)) {
+		ASSERT_THREAD_ID(pool->scheduler, thread_id);
+		TaskThreadLocalStorage *tls = get_task_tls(pool, thread_id);
+		tls->do_delayed_push = true;
+	}
+}
+
+void BLI_task_pool_delayed_push_end(TaskPool *pool, int thread_id)
+{
+	if (task_can_use_local_queues(pool, thread_id)) {
+		ASSERT_THREAD_ID(pool->scheduler, thread_id);
+		TaskThreadLocalStorage *tls = get_task_tls(pool, thread_id);
+		BLI_assert(tls->do_delayed_push);
+		task_scheduler_push_all(pool->scheduler,
+		                        pool,
+		                        tls->delayed_queue,
+		                        tls->num_delayed_queue);
+		tls->do_delayed_push = false;
+		tls->num_delayed_queue = 0;
+	}
+}
+
 /* Parallel range routines */
 
 /**
diff --git a/source/blender/depsgraph/intern/eval/deg_eval.cc b/source/blender/depsgraph/intern/eval/deg_eval.cc
index e739bc9dbb5..54947ddbb5e 100644
--- a/source/blender/depsgraph/intern/eval/deg_eval.cc
+++ b/source/blender/depsgraph/intern/eval/deg_eval.cc
@@ -126,7 +126,9 @@ static void deg_task_run_func(TaskPool *pool,
 #endif
 	}
 
+	BLI_task_pool_delayed_push_begin(pool, thread_id);
 	schedule_children(pool, state->graph, node, state->layers, thread_id);
+	BLI_task_pool_delayed_push_end(pool, thread_id);
 }
 
 typedef struct CalculatePengindData {




More information about the Bf-blender-cvs mailing list