[Bf-blender-cvs] [e0d486eb36] unlock_task_scheduler: Attempt to address performance issues of task scheduler with lots of very small tasks.

Bastien Montagne noreply at git.blender.org
Fri Mar 3 17:49:01 CET 2017


Commit: e0d486eb368dc1a0df2c4ab4a99f9bc29e2dd7db
Author: Bastien Montagne
Date:   Wed Dec 21 20:33:42 2016 +0100
Branches: unlock_task_scheduler
https://developer.blender.org/rBe0d486eb368dc1a0df2c4ab4a99f9bc29e2dd7db

Attempt to address performance issues of task scheduler with lots of very small tasks.

This is partially based on Sergey's work from D2421, but pushes things
a bit further. Basically:
- We keep a scheduler-level counter of TODO tasks, which lets workers
  avoid any locking (even of the spinlock) when the queue is empty.
- We spin/nanosleep a bit (less than a millisecond) when we cannot find
  a task, before going into real condition-waiting sleep.
- We keep a counter of condition-sleeping threads, and only send
  condition notifications when we know some threads are actually waiting.

In other words, when no tasks are available, we spend a bit of time in a
rather high-activity but very cheap and totally lock-free loop, before
going into the more expensive, real condition-waiting sleep.
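
To make the wait logic concrete, here is a minimal standalone sketch of
the worker side, using plain pthreads and C11 atomics instead of
Blender's BLI_* wrappers. Only the num_queued / workers_sleeping
counters and the spin parameters (1000 attempts, 200ns naps) come from
the patch; queue_try_pop, the dummy task and the omission of do_exit
handling are purely illustrative.

#include <pthread.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <time.h>

#define SPIN_MAX 1000  /* failed attempts before real sleep */

static atomic_size_t num_queued;        /* scheduler-wide TODO counter */
static atomic_size_t workers_sleeping;  /* threads in condition-wait */
static pthread_mutex_t workers_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t workers_condition = PTHREAD_COND_INITIALIZER;

/* Stand-in for "lock the spinlock, scan the queue, unlink a task":
 * here the queue is reduced to its counter, and a dummy task is
 * returned whenever the counter could be decremented. */
static bool queue_try_pop(void **r_task)
{
	size_t n = atomic_load(&num_queued);
	while (n != 0) {
		/* On failure the CAS reloads n, so the loop re-checks it. */
		if (atomic_compare_exchange_weak(&num_queued, &n, n - 1)) {
			*r_task = (void *)1;  /* dummy task */
			return true;
		}
	}
	return false;
}

static void *worker_wait_pop(void)
{
	const struct timespec nap = {0, 200L};  /* 200ns, well under 1ms */
	void *task = NULL;
	int loop_count = 0;

	for (;;) {
		/* Lock-free fast path: only touch the queue at all when the
		 * shared counter says a task may be available. */
		if (atomic_load(&num_queued) != 0 && queue_try_pop(&task)) {
			return task;
		}
		if (++loop_count <= SPIN_MAX) {
			/* Cheap, high-frequency nap while work may show up soon. */
			nanosleep(&nap, NULL);
		}
		else {
			/* Spun long enough: go into real condition-waiting sleep,
			 * registering ourselves so pushers know to notify. */
			pthread_mutex_lock(&workers_mutex);
			atomic_fetch_add(&workers_sleeping, 1);
			pthread_cond_wait(&workers_condition, &workers_mutex);
			atomic_fetch_sub(&workers_sleeping, 1);
			pthread_mutex_unlock(&workers_mutex);
			loop_count = 0;
		}
	}
}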

No noticeable speedup in a complex production scene (the barbershop
one); here master, D2421 and this code give roughly the same performance
(about 30% slower with the new depsgraph than with the old one).

But with the test file from T50027 and the new depsgraph, after the
initial bake, I get ~14fps with master, ~14.5fps with D2421, and
~19.5fps with this code.

Note that in theory we could get rid of the condition entirely and stay
in the nanosleep loop, but this implies rather high 'noise' (about 3%
of CPU usage here with 8 cores), while falling back to the
condition-waiting state after a few hundred microseconds gives no
measurable slowdown for me.
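
The matching push side (same hypothetical names as in the sketch above)
shows why keeping the condition around costs nearly nothing: the queue
counter is bumped atomically, and the comparatively expensive
mutex/broadcast pair is only touched when some worker has actually gone
to sleep.

static void push_task(void *task)
{
	/* In the real patch the task is linked into the queue under the
	 * spinlock here; this sketch only keeps the counter. */
	(void)task;
	atomic_fetch_add(&num_queued, 1);

	/* Lock-free check: spinning workers pick the task up by
	 * themselves, only sleeping ones need an explicit wake-up. */
	if (atomic_load(&workers_sleeping) != 0) {
		pthread_mutex_lock(&workers_mutex);
		pthread_cond_broadcast(&workers_condition);
		pthread_mutex_unlock(&workers_mutex);
	}
}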

Also note that this code only works on POSIX systems (so not on
Windows; not sure how to do our nanosleeps on that OS :/ ).
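
(Purely as an illustration, not part of this patch: if nanosleep() is
the only missing piece, a hypothetical shim along these lines might be
one direction to try on Windows, yielding the timeslice instead of
doing a true sub-microsecond sleep; completely untested here.)

#ifdef _WIN32
#  include <windows.h>
static void tiny_nap(void)
{
	/* Win32 has no sub-millisecond sleep; yielding the rest of the
	 * timeslice is a (much coarser) hypothetical substitute. */
	SwitchToThread();
}
#else
#  include <time.h>
static void tiny_nap(void)
{
	const struct timespec req = {0, 200L};  /* 200 nanoseconds */
	nanosleep(&req, NULL);
}
#endif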

Reviewers: sergey

Differential Revision: https://developer.blender.org/D2426

===================================================================

M	source/blender/blenlib/intern/task.c

===================================================================

diff --git a/source/blender/blenlib/intern/task.c b/source/blender/blenlib/intern/task.c
index 91e821a8f1..21c02c31a9 100644
--- a/source/blender/blenlib/intern/task.c
+++ b/source/blender/blenlib/intern/task.c
@@ -35,6 +35,8 @@
 #include "BLI_task.h"
 #include "BLI_threads.h"
 
+#include "PIL_time.h"
+
 #include "atomic_ops.h"
 
 /* Define this to enable some detailed statistic print. */
@@ -48,6 +50,10 @@
  */
 #define MEMPOOL_SIZE 256
 
+/* Parameters controlling how much we spin in nanosleeps before switching to real condition-controlled sleeping. */
+#define NANOSLEEP_MAX_SPINNING 1000   /* Number of failed attempt to get a task before going to condition waiting. */
+#define NANOSLEEP_DURATION (const struct timespec[]){{0, 200L}}  /* Nanosleep duration (in nano-seconds). */
+
 typedef struct Task {
 	struct Task *next, *prev;
 
@@ -105,11 +111,9 @@ typedef struct TaskMemPoolStats {
 struct TaskPool {
 	TaskScheduler *scheduler;
 
-	volatile size_t num;
+	size_t num;
 	size_t num_threads;
 	size_t currently_running_tasks;
-	ThreadMutex num_mutex;
-	ThreadCondition num_cond;
 
 	void *userdata;
 	ThreadMutex user_mutex;
@@ -146,10 +150,14 @@ struct TaskScheduler {
 	bool background_thread_only;
 
 	ListBase queue;
-	ThreadMutex queue_mutex;
-	ThreadCondition queue_cond;
+	size_t num_queued;
+	SpinLock queue_spinlock;
+
+	ThreadMutex workers_mutex;
+	ThreadCondition workers_condition;
+	size_t workers_sleeping;
 
-	volatile bool do_exit;
+	uint8_t do_exit;
 };
 
 typedef struct TaskThread {
@@ -231,36 +239,33 @@ static void task_free(TaskPool *pool, Task *task, const int thread_id)
 
 static void task_pool_num_decrease(TaskPool *pool, size_t done)
 {
-	BLI_mutex_lock(&pool->num_mutex);
-
 	BLI_assert(pool->num >= done);
 
-	pool->num -= done;
+	atomic_sub_and_fetch_z(&pool->num, done);
 	atomic_sub_and_fetch_z(&pool->currently_running_tasks, done);
 
-	if (pool->num == 0)
-		BLI_condition_notify_all(&pool->num_cond);
-
-	BLI_mutex_unlock(&pool->num_mutex);
+	if (pool->num == 0 && pool->scheduler->workers_sleeping != 0) {
+		BLI_mutex_lock(&pool->scheduler->workers_mutex);
+		BLI_condition_notify_all(&pool->scheduler->workers_condition);
+		BLI_mutex_unlock(&pool->scheduler->workers_mutex);
+	}
 }
 
 static void task_pool_num_increase(TaskPool *pool)
 {
-	BLI_mutex_lock(&pool->num_mutex);
-
-	pool->num++;
-	BLI_condition_notify_all(&pool->num_cond);
+	atomic_add_and_fetch_z(&pool->num, 1);
 
-	BLI_mutex_unlock(&pool->num_mutex);
+	if (pool->scheduler->workers_sleeping != 0) {
+		BLI_mutex_lock(&pool->scheduler->workers_mutex);
+		BLI_condition_notify_all(&pool->scheduler->workers_condition);
+		BLI_mutex_unlock(&pool->scheduler->workers_mutex);
+	}
 }
 
 static bool task_scheduler_thread_wait_pop(TaskScheduler *scheduler, Task **task)
 {
 	bool found_task = false;
-	BLI_mutex_lock(&scheduler->queue_mutex);
-
-	while (!scheduler->queue.first && !scheduler->do_exit)
-		BLI_condition_wait(&scheduler->queue_cond, &scheduler->queue_mutex);
+	int loop_count = 0;
 
 	do {
 		Task *current_task;
@@ -276,38 +281,52 @@ static bool task_scheduler_thread_wait_pop(TaskScheduler *scheduler, Task **task
 		 * So we only abort here if do_exit is set.
 		 */
 		if (scheduler->do_exit) {
-			BLI_mutex_unlock(&scheduler->queue_mutex);
 			return false;
 		}
 
-		for (current_task = scheduler->queue.first;
-		     current_task != NULL;
-		     current_task = current_task->next)
-		{
-			TaskPool *pool = current_task->pool;
+		if (scheduler->num_queued != 0) {
+			BLI_spin_lock(&scheduler->queue_spinlock);
+			for (current_task = scheduler->queue.first;
+				 current_task != NULL;
+				 current_task = current_task->next)
+			{
+				TaskPool *pool = current_task->pool;
+
+				if (scheduler->background_thread_only && !pool->run_in_background) {
+					continue;
+				}
 
-			if (scheduler->background_thread_only && !pool->run_in_background) {
-				continue;
+				if (atomic_add_and_fetch_z(&pool->currently_running_tasks, 1) <= pool->num_threads ||
+					pool->num_threads == 0)
+				{
+					*task = current_task;
+					found_task = true;
+					BLI_remlink(&scheduler->queue, *task);
+					atomic_sub_and_fetch_z(&scheduler->num_queued, 1);
+					break;
+				}
+				else {
+					atomic_sub_and_fetch_z(&pool->currently_running_tasks, 1);
+				}
 			}
+			BLI_spin_unlock(&scheduler->queue_spinlock);
+		}
 
-			if (atomic_add_and_fetch_z(&pool->currently_running_tasks, 1) <= pool->num_threads ||
-			    pool->num_threads == 0)
-			{
-				*task = current_task;
-				found_task = true;
-				BLI_remlink(&scheduler->queue, *task);
-				break;
+		if (!found_task) {
+			if (++loop_count > NANOSLEEP_MAX_SPINNING) {
+				BLI_mutex_lock(&scheduler->workers_mutex);
+				atomic_add_and_fetch_z(&scheduler->workers_sleeping, 1);
+				BLI_condition_wait(&scheduler->workers_condition, &scheduler->workers_mutex);
+				atomic_sub_and_fetch_z(&scheduler->workers_sleeping, 1);
+				BLI_mutex_unlock(&scheduler->workers_mutex);
 			}
 			else {
-				atomic_sub_and_fetch_z(&pool->currently_running_tasks, 1);
+				nanosleep(NANOSLEEP_DURATION, NULL);
 			}
+//			PIL_sleep_ms(1);
 		}
-		if (!found_task)
-			BLI_condition_wait(&scheduler->queue_cond, &scheduler->queue_mutex);
 	} while (!found_task);
 
-	BLI_mutex_unlock(&scheduler->queue_mutex);
-
 	return true;
 }
 
@@ -341,11 +360,13 @@ TaskScheduler *BLI_task_scheduler_create(int num_threads)
 
 	/* multiple places can use this task scheduler, sharing the same
 	 * threads, so we keep track of the number of users. */
-	scheduler->do_exit = false;
+	scheduler->do_exit = 0;
 
 	BLI_listbase_clear(&scheduler->queue);
-	BLI_mutex_init(&scheduler->queue_mutex);
-	BLI_condition_init(&scheduler->queue_cond);
+	BLI_spin_init(&scheduler->queue_spinlock);
+
+	BLI_mutex_init(&scheduler->workers_mutex);
+	BLI_condition_init(&scheduler->workers_condition);
 
 	if (num_threads == 0) {
 		/* automatic number of threads will be main thread + num cores */
@@ -391,10 +412,12 @@ void BLI_task_scheduler_free(TaskScheduler *scheduler)
 	Task *task;
 
 	/* stop all waiting threads */
-	BLI_mutex_lock(&scheduler->queue_mutex);
-	scheduler->do_exit = true;
-	BLI_condition_notify_all(&scheduler->queue_cond);
-	BLI_mutex_unlock(&scheduler->queue_mutex);
+	atomic_fetch_and_or_uint8(&scheduler->do_exit, 1);
+	if (scheduler->workers_sleeping != 0) {
+		BLI_mutex_lock(&scheduler->workers_mutex);
+		BLI_condition_notify_all(&scheduler->workers_condition);
+		BLI_mutex_unlock(&scheduler->workers_mutex);
+	}
 
 	/* delete threads */
 	if (scheduler->threads) {
@@ -430,8 +453,10 @@ void BLI_task_scheduler_free(TaskScheduler *scheduler)
 	BLI_freelistN(&scheduler->queue);
 
 	/* delete mutex/condition */
-	BLI_mutex_end(&scheduler->queue_mutex);
-	BLI_condition_end(&scheduler->queue_cond);
+	BLI_spin_end(&scheduler->queue_spinlock);
+
+	BLI_mutex_end(&scheduler->workers_mutex);
+	BLI_condition_end(&scheduler->workers_condition);
 
 	MEM_freeN(scheduler);
 }
@@ -446,15 +471,16 @@ static void task_scheduler_push(TaskScheduler *scheduler, Task *task, TaskPriori
 	task_pool_num_increase(task->pool);
 
 	/* add task to queue */
-	BLI_mutex_lock(&scheduler->queue_mutex);
+	BLI_spin_lock(&scheduler->queue_spinlock);
 
 	if (priority == TASK_PRIORITY_HIGH)
 		BLI_addhead(&scheduler->queue, task);
 	else
 		BLI_addtail(&scheduler->queue, task);
 
-	BLI_condition_notify_one(&scheduler->queue_cond);
-	BLI_mutex_unlock(&scheduler->queue_mutex);
+	BLI_spin_unlock(&scheduler->queue_spinlock);
+
+	atomic_add_and_fetch_z(&scheduler->num_queued, 1);
 }
 
 static void task_scheduler_clear(TaskScheduler *scheduler, TaskPool *pool)
@@ -462,7 +488,7 @@ static void task_scheduler_clear(TaskScheduler *scheduler, TaskPool *pool)
 	Task *task, *nexttask;
 	size_t done = 0;
 
-	BLI_mutex_lock(&scheduler->queue_mutex);
+	BLI_spin_lock(&scheduler->queue_spinlock);
 
 	/* free all tasks from this pool from the queue */
 	for (task = scheduler->queue.first; task; task = nexttask) {
@@ -471,12 +497,13 @@ static void task_scheduler_clear(TaskScheduler *scheduler, TaskPool *pool)
 		if (task->pool == pool) {
 			task_data_free(task, 0);
 			BLI_freelinkN(&scheduler->queue, task);
+			atomic_sub_and_fetch_z(&scheduler->num_queued, 1);
 
 			done++;
 		}
 	}
 
-	BLI_mutex_unlock(&scheduler->queue_mutex);
+	BLI_spin_unlock(&scheduler->queue_spinlock);
 
 	/* notify done */
 	task_pool_num_decrease(pool, done);
@@ -507,9 +534,6 @@ static TaskPool *task_pool_create_ex(TaskScheduler *scheduler, void *userdata, c
 	pool->do_cancel = false;
 	pool->run_in_background = is_background;
 
-	BLI_mutex_init(&pool->num_mutex);
-	BLI_condition_init(&pool->num_cond);
-
 	pool->userdata = userdata;
 	BLI_mutex_init(&pool->user_mutex);
 
@@ -567,9 +591,6 @@ void BLI_task_pool_free(TaskPool *pool)
 {
 	BLI_task_pool_cancel(pool);
 
-	BLI_mutex_end(&pool->num_mutex);
-	BLI_condition_end(&pool->num_cond);
-
 	BLI_mutex_end(&pool->user_mutex);
 
 	/* Free local memory pool, those pointers are lost forever. */
@@ -634,35 +655,31 @@ void BLI_task_pool_push_from_thread(TaskPool *pool, TaskRunFunction run,
 void BLI_task_pool_work_and_wait(TaskPool *pool)
 {
 	TaskScheduler *scheduler = pool->scheduler;
-
-	BLI_mutex_lock(&pool->num_mutex);
+	int loop_count = 0;
 
 	while (pool->num != 0) {
 		Task *task, *work_task = NULL;
 		bool found_task = false;
 
-		BLI_mutex_unlock(&pool->num_mutex);
-
-		BLI_mutex_lock(&scheduler->queue_mutex);
-
 		/* find task from this pool. if we get a task from another pool,
 		 * we can get into deadlock */
 
 		if (pool->num_threads == 0 ||
 		    pool->currently_running_tasks < pool->num_threads)
 		{
+			BLI_spin_lock(&scheduler->queue_spinlock);
 			for (task = scheduler->queue.first; task; task = task->next) {
 				if (task->pool == pool) {
 					work_task = task;
 					found_task = true;
 					BLI_remlink(&scheduler->queue, task);
+					atomic_sub_and_fetch_z(&scheduler->num_queued, 1);
 					break;
 				}
 			}
+			BLI_spin_unlock(&scheduler->queue_spinlock);
 		}
 
-		BLI_mutex_unlock(&scheduler->queue_mutex);
-
 		/* if found task, do it, otherwise wait until other tasks are done */
 		if (found_task) {
 			/* run task */
@@ -674,17 +691,27 @@ void BLI_task_pool_work_and_wait(TaskPool *pool)
 
 			/* 

@@ Diff output truncated at 10240 characters. @@



