[Bf-blender-cvs] [7b0c529] master: Task scheduler: Add an option to limit number of threads per pool

Sergey Sharybin noreply at git.blender.org
Fri Nov 21 11:32:10 CET 2014


Commit: 7b0c529fe211e4481fe8e459cbf11159857c611d
Author: Sergey Sharybin
Date:   Fri Nov 21 11:31:30 2014 +0100
Branches: master
https://developer.blender.org/rB7b0c529fe211e4481fe8e459cbf11159857c611d

Task scheduler: Add an option to limit number of threads per pool

This way we can have a scheduler capable of scheduling tasks on all the CPUs,
but at the same time we can limit tasks like baking (in the future) to use
no more than a given number of threads.

===================================================================

M	source/blender/blenlib/BLI_task.h
M	source/blender/blenlib/intern/task.c

===================================================================

diff --git a/source/blender/blenlib/BLI_task.h b/source/blender/blenlib/BLI_task.h
index 28da673..8f85bc4 100644
--- a/source/blender/blenlib/BLI_task.h
+++ b/source/blender/blenlib/BLI_task.h
@@ -88,6 +88,9 @@ void BLI_task_pool_cancel(TaskPool *pool);
 /* stop all worker threads */
 void BLI_task_pool_stop(TaskPool *pool);
 
+/* set number of threads allowed to be used by this pool */
+void BLI_pool_set_num_threads(TaskPool *pool, int num_threads);
+
 /* for worker threads, test if canceled */
 bool BLI_task_pool_canceled(TaskPool *pool);
 
diff --git a/source/blender/blenlib/intern/task.c b/source/blender/blenlib/intern/task.c
index 219ccb1..e4cded1 100644
--- a/source/blender/blenlib/intern/task.c
+++ b/source/blender/blenlib/intern/task.c
@@ -49,6 +49,8 @@ struct TaskPool {
 
 	volatile size_t num;
 	volatile size_t done;
+	volatile int num_threads;
+	volatile int currently_running_tasks;
 	ThreadMutex num_mutex;
 	ThreadCondition num_cond;
 
@@ -84,6 +86,7 @@ static void task_pool_num_decrease(TaskPool *pool, size_t done)
 	BLI_assert(pool->num >= done);
 
 	pool->num -= done;
+	pool->currently_running_tasks -= done;
 	pool->done += done;
 
 	if (pool->num == 0)
@@ -104,19 +107,37 @@ static void task_pool_num_increase(TaskPool *pool)
 
 static bool task_scheduler_thread_wait_pop(TaskScheduler *scheduler, Task **task)
 {
+	bool found_task = false;
 	BLI_mutex_lock(&scheduler->queue_mutex);
 
 	while (!scheduler->queue.first && !scheduler->do_exit)
 		BLI_condition_wait(&scheduler->queue_cond, &scheduler->queue_mutex);
 
-	if (!scheduler->queue.first) {
-		BLI_mutex_unlock(&scheduler->queue_mutex);
-		BLI_assert(scheduler->do_exit);
-		return false;
-	}
-	
-	*task = scheduler->queue.first;
-	BLI_remlink(&scheduler->queue, *task);
+	do {
+		Task *current_task;
+		if (!scheduler->queue.first) {
+			BLI_mutex_unlock(&scheduler->queue_mutex);
+			BLI_assert(scheduler->do_exit);
+			return false;
+		}
+		for (current_task = scheduler->queue.first;
+		     current_task != NULL;
+		     current_task = current_task->next)
+		{
+			TaskPool *pool = current_task->pool;
+			if (pool->num_threads == 0 ||
+			    pool->currently_running_tasks < pool->num_threads)
+			{
+				*task = current_task;
+				found_task = true;
+				pool->currently_running_tasks++;
+				BLI_remlink(&scheduler->queue, *task);
+				break;
+			}
+		}
+		if (!found_task)
+			BLI_condition_wait(&scheduler->queue_cond, &scheduler->queue_mutex);
+	} while (!found_task);
 
 	BLI_mutex_unlock(&scheduler->queue_mutex);
 
@@ -288,6 +309,8 @@ TaskPool *BLI_task_pool_create(TaskScheduler *scheduler, void *userdata)
 
 	pool->scheduler = scheduler;
 	pool->num = 0;
+	pool->num_threads = 0;
+	pool->currently_running_tasks = 0;
 	pool->do_cancel = false;
 
 	BLI_mutex_init(&pool->num_mutex);
@@ -351,12 +374,16 @@ void BLI_task_pool_work_and_wait(TaskPool *pool)
 		/* find task from this pool. if we get a task from another pool,
 		 * we can get into deadlock */
 
-		for (task = scheduler->queue.first; task; task = task->next) {
-			if (task->pool == pool) {
-				work_task = task;
-				found_task = true;
-				BLI_remlink(&scheduler->queue, task);
-				break;
+		if (pool->num_threads == 0 ||
+		    pool->currently_running_tasks < pool->num_threads)
+		{
+			for (task = scheduler->queue.first; task; task = task->next) {
+				if (task->pool == pool) {
+					work_task = task;
+					found_task = true;
+					BLI_remlink(&scheduler->queue, task);
+					break;
+				}
 			}
 		}
 
@@ -365,6 +392,7 @@ void BLI_task_pool_work_and_wait(TaskPool *pool)
 		/* if found task, do it, otherwise wait until other tasks are done */
 		if (found_task) {
 			/* run task */
+			pool->currently_running_tasks++;
 			work_task->run(pool, work_task->taskdata, 0);
 
 			/* delete task */
@@ -387,6 +415,12 @@ void BLI_task_pool_work_and_wait(TaskPool *pool)
 	BLI_mutex_unlock(&pool->num_mutex);
 }
 
+void BLI_pool_set_num_threads(TaskPool *pool, int num_threads)
+{
+	/* NOTE: Don't try to modify threads while tasks are running! */
+	pool->num_threads = num_threads;
+}
+
 void BLI_task_pool_cancel(TaskPool *pool)
 {
 	pool->do_cancel = true;




More information about the Bf-blender-cvs mailing list