[Bf-blender-cvs] [599f314f1d] unlock_task_scheduler: Cleanup, factorization, comments, and some fixes for potential issues.

Bastien Montagne noreply at git.blender.org
Fri Mar 3 17:49:03 CET 2017


Commit: 599f314f1da2ece12c3ed4ed0dcf6037ef3ad355
Author: Bastien Montagne
Date:   Thu Dec 22 00:08:43 2016 +0100
Branches: unlock_task_scheduler
https://developer.blender.org/rB599f314f1da2ece12c3ed4ed0dcf6037ef3ad355

Cleanup, factorization, comments, and some fixes for potential issues.

===================================================================

M	source/blender/blenlib/intern/task.c

===================================================================

diff --git a/source/blender/blenlib/intern/task.c b/source/blender/blenlib/intern/task.c
index 21c02c31a9..d1e27c97ab 100644
--- a/source/blender/blenlib/intern/task.c
+++ b/source/blender/blenlib/intern/task.c
@@ -241,10 +241,13 @@ static void task_pool_num_decrease(TaskPool *pool, size_t done)
 {
 	BLI_assert(pool->num >= done);
 
-	atomic_sub_and_fetch_z(&pool->num, done);
-	atomic_sub_and_fetch_z(&pool->currently_running_tasks, done);
+	const size_t num = atomic_sub_and_fetch_z(&pool->num, done);
 
-	if (pool->num == 0 && pool->scheduler->workers_sleeping != 0) {
+	/* This is needed for several things:
+	 *   - Wake up all sleeping threads on exit, before we join them.
+	 *   - Wake up 'main' thread itself in case it called BLI_task_pool_work_and_wait() and ended up sleeping there.
+	 *   - Wake up 'main' thread itself in case it called BLI_task_pool_cancel() and ended up sleeping there. */
+	if (num == 0 && pool->scheduler->workers_sleeping != 0) {
 		BLI_mutex_lock(&pool->scheduler->workers_mutex);
 		BLI_condition_notify_all(&pool->scheduler->workers_condition);
 		BLI_mutex_unlock(&pool->scheduler->workers_mutex);
@@ -257,19 +260,95 @@ static void task_pool_num_increase(TaskPool *pool)
 
 	if (pool->scheduler->workers_sleeping != 0) {
 		BLI_mutex_lock(&pool->scheduler->workers_mutex);
+		/* NOTE: Even though only a single task is added here, we notify all threads.
+		 * The reason is that there might be many more tasks coming
+		 * right after this one, so waking up all threads earlier seems to give
+		 * a bit less threading overhead.
+		 */
 		BLI_condition_notify_all(&pool->scheduler->workers_condition);
 		BLI_mutex_unlock(&pool->scheduler->workers_mutex);
 	}
 }
 
+BLI_INLINE bool task_find(TaskScheduler *scheduler, Task **task, TaskPool *pool)
+{
+	Task *current_task;
+	bool found_task = false;
+
+	/* This check allows us to completely avoid a spinlock in case the queue is reported empty.
+	 * There is a possible race condition here (the check being done after a task has been added to the queue,
+	 * and before the counter is increased), but this should not be an issue in practice: it is quite unlikely,
+	 * and would just slightly delay that thread going back to work. */
+	if (scheduler->num_queued != 0) {
+		/* NOTE: We almost always do a single iteration here, so spin time is really low most of the time. */
+		BLI_spin_lock(&scheduler->queue_spinlock);
+		for (current_task = scheduler->queue.first;
+			 current_task != NULL;
+			 current_task = current_task->next)
+		{
+			TaskPool *current_pool = current_task->pool;
+
+			if (!ELEM(pool, NULL, current_pool)) {
+				continue;
+			}
+
+			if (scheduler->background_thread_only && !current_pool->run_in_background) {
+				continue;
+			}
+
+			if (atomic_add_and_fetch_z(&current_pool->currently_running_tasks, 1) <= current_pool->num_threads ||
+				current_pool->num_threads == 0)
+			{
+				*task = current_task;
+				found_task = true;
+				BLI_remlink(&scheduler->queue, *task);
+				atomic_sub_and_fetch_z(&scheduler->num_queued, 1);
+				break;
+			}
+			else {
+				atomic_sub_and_fetch_z(&current_pool->currently_running_tasks, 1);
+			}
+		}
+		BLI_spin_unlock(&scheduler->queue_spinlock);
+	}
+	return found_task;
+}
+
+BLI_INLINE bool task_wait(TaskScheduler *scheduler, int *loop_count)
+{
+	/* If we have iterated NANOSLEEP_MAX_SPINNING times without finding a task, go into real sleep. */
+	if (++(*loop_count) > NANOSLEEP_MAX_SPINNING) {
+		BLI_mutex_lock(&scheduler->workers_mutex);
+
+		/* Check again inside the mutex: a bad race condition is possible here (though unlikely),
+		 * leading to an undying thread... */
+		if (scheduler->do_exit) {
+			BLI_mutex_unlock(&scheduler->workers_mutex);
+			return true;
+		}
+
+		/* Even though this is read outside of mutex lock, there is no real need to use atomic ops here,
+		 * changing the value inside mutex should be enough to ensure safety. */
+		scheduler->workers_sleeping++;
+
+		BLI_condition_wait(&scheduler->workers_condition, &scheduler->workers_mutex);
+
+		scheduler->workers_sleeping--;
+
+		BLI_mutex_unlock(&scheduler->workers_mutex);
+	}
+	else {
+		nanosleep(NANOSLEEP_DURATION, NULL);
+	}
+	return false;
+}
+
 static bool task_scheduler_thread_wait_pop(TaskScheduler *scheduler, Task **task)
 {
 	bool found_task = false;
 	int loop_count = 0;
 
 	do {
-		Task *current_task;
-
 		/* Assuming we can only have a void queue in 'exit' case here seems logical (we should only be here after
 		 * our worker thread has been woken up from a condition_wait(), which only happens after a new task was
 		 * added to the queue), but it is wrong.
@@ -284,46 +363,10 @@ static bool task_scheduler_thread_wait_pop(TaskScheduler *scheduler, Task **task
 			return false;
 		}
 
-		if (scheduler->num_queued != 0) {
-			BLI_spin_lock(&scheduler->queue_spinlock);
-			for (current_task = scheduler->queue.first;
-				 current_task != NULL;
-				 current_task = current_task->next)
-			{
-				TaskPool *pool = current_task->pool;
-
-				if (scheduler->background_thread_only && !pool->run_in_background) {
-					continue;
-				}
-
-				if (atomic_add_and_fetch_z(&pool->currently_running_tasks, 1) <= pool->num_threads ||
-					pool->num_threads == 0)
-				{
-					*task = current_task;
-					found_task = true;
-					BLI_remlink(&scheduler->queue, *task);
-					atomic_sub_and_fetch_z(&scheduler->num_queued, 1);
-					break;
-				}
-				else {
-					atomic_sub_and_fetch_z(&pool->currently_running_tasks, 1);
-				}
-			}
-			BLI_spin_unlock(&scheduler->queue_spinlock);
-		}
-
-		if (!found_task) {
-			if (++loop_count > NANOSLEEP_MAX_SPINNING) {
-				BLI_mutex_lock(&scheduler->workers_mutex);
-				atomic_add_and_fetch_z(&scheduler->workers_sleeping, 1);
-				BLI_condition_wait(&scheduler->workers_condition, &scheduler->workers_mutex);
-				atomic_sub_and_fetch_z(&scheduler->workers_sleeping, 1);
-				BLI_mutex_unlock(&scheduler->workers_mutex);
+		if (!(found_task = task_find(scheduler, task, NULL))) {
+			if (task_wait(scheduler, &loop_count)) {
+				return false;
 			}
-			else {
-				nanosleep(NANOSLEEP_DURATION, NULL);
-			}
-//			PIL_sleep_ms(1);
 		}
 	} while (!found_task);
 
@@ -347,6 +390,8 @@ static void *task_scheduler_thread_run(void *thread_p)
 		/* delete task */
 		task_free(pool, task, thread_id);
 
+		atomic_sub_and_fetch_z(&pool->currently_running_tasks, 1);
+
 		/* notify pool task was done */
 		task_pool_num_decrease(pool, 1);
 	}
@@ -468,8 +513,6 @@ int BLI_task_scheduler_num_threads(TaskScheduler *scheduler)
 
 static void task_scheduler_push(TaskScheduler *scheduler, Task *task, TaskPriority priority)
 {
-	task_pool_num_increase(task->pool);
-
 	/* add task to queue */
 	BLI_spin_lock(&scheduler->queue_spinlock);
 
@@ -480,6 +523,7 @@ static void task_scheduler_push(TaskScheduler *scheduler, Task *task, TaskPriori
 
 	BLI_spin_unlock(&scheduler->queue_spinlock);
 
+	task_pool_num_increase(task->pool);
 	atomic_add_and_fetch_z(&scheduler->num_queued, 1);
 }
 
@@ -497,7 +541,6 @@ static void task_scheduler_clear(TaskScheduler *scheduler, TaskPool *pool)
 		if (task->pool == pool) {
 			task_data_free(task, 0);
 			BLI_freelinkN(&scheduler->queue, task);
-			atomic_sub_and_fetch_z(&scheduler->num_queued, 1);
 
 			done++;
 		}
@@ -505,6 +548,8 @@ static void task_scheduler_clear(TaskScheduler *scheduler, TaskPool *pool)
 
 	BLI_spin_unlock(&scheduler->queue_spinlock);
 
+	atomic_sub_and_fetch_z(&scheduler->num_queued, done);
+
 	/* notify done */
 	task_pool_num_decrease(pool, done);
 }
@@ -658,58 +703,35 @@ void BLI_task_pool_work_and_wait(TaskPool *pool)
 	int loop_count = 0;
 
 	while (pool->num != 0) {
-		Task *task, *work_task = NULL;
+		Task *task;
 		bool found_task = false;
 
 		/* find task from this pool. if we get a task from another pool,
 		 * we can get into deadlock */
 
-		if (pool->num_threads == 0 ||
-		    pool->currently_running_tasks < pool->num_threads)
-		{
-			BLI_spin_lock(&scheduler->queue_spinlock);
-			for (task = scheduler->queue.first; task; task = task->next) {
-				if (task->pool == pool) {
-					work_task = task;
-					found_task = true;
-					BLI_remlink(&scheduler->queue, task);
-					atomic_sub_and_fetch_z(&scheduler->num_queued, 1);
-					break;
-				}
-			}
-			BLI_spin_unlock(&scheduler->queue_spinlock);
-		}
+		found_task = task_find(scheduler, &task, pool);
 
 		/* if found task, do it, otherwise wait until other tasks are done */
 		if (found_task) {
 			/* run task */
-			atomic_add_and_fetch_z(&pool->currently_running_tasks, 1);
-			work_task->run(pool, work_task->taskdata, 0);
+			task->run(pool, task->taskdata, 0);
 
 			/* delete task */
 			task_free(pool, task, 0);
 
+			atomic_sub_and_fetch_z(&pool->currently_running_tasks, 1);
+
 			/* notify pool task was done */
 			task_pool_num_decrease(pool, 1);
 
-			/* Reset the 'failde' counter to zero. */
+			/* Reset the 'failed' counter to zero. */
 			loop_count = 0;
 		}
-
-		if (pool->num == 0)
+		else if (pool->num == 0) {
 			break;
-
-		if (!found_task) {
-			if (++loop_count > NANOSLEEP_MAX_SPINNING) {
-				BLI_mutex_lock(&scheduler->workers_mutex);
-				atomic_add_and_fetch_z(&scheduler->workers_sleeping, 1);
-				BLI_condition_wait(&scheduler->workers_condition, &scheduler->workers_mutex);
-				atomic_sub_and_fetch_z(&scheduler->workers_sleeping, 1);
-				BLI_mutex_unlock(&scheduler->workers_mutex);
-			}
-			else {
-				nanosleep(NANOSLEEP_DURATION, NULL);
-			}
+		}
+		else {
+			task_wait(scheduler, &loop_count);  /* We do not care about return value here. */
 		}
 	}
 }
@@ -730,9 +752,9 @@ void BLI_task_pool_cancel(TaskPool *pool)
 	while (pool->num) {
 		/* No real point in spinning here... */
 		BLI_mutex_lock(&pool->scheduler->workers_mutex);
-		atomic_add_and_fetch_z(&pool->scheduler->workers_sleeping, 1);
+		pool->scheduler->workers_sleeping++;
 		BLI_condition_wait(&pool->scheduler->workers_condition, &pool->scheduler->workers_mutex);
-		atomic_sub_and_fetch_z(&pool->scheduler->workers_sleeping, 1);
+		pool->scheduler->workers_sleeping--;
 		BLI_mutex_unlock(&pool->scheduler->workers_mutex);
 	}




More information about the Bf-blender-cvs mailing list