[Bf-blender-cvs] [a7536df] input_method_editor: Task scheduler: Add an option to limit number of threads per pool

Sergey Sharybin noreply at git.blender.org
Mon Nov 24 00:45:13 CET 2014


Commit: a7536dfac5b33486be769dad68c164af04d4167c
Author: Sergey Sharybin
Date:   Fri Nov 21 11:31:30 2014 +0100
Branches: input_method_editor
https://developer.blender.org/rBa7536dfac5b33486be769dad68c164af04d4167c

Task scheduler: Add an option to limit number of threads per pool

This way we can have a scheduler capable of scheduling tasks on all the CPUs
while at the same time limiting tasks like baking (in the future) to use
no more than a given number of threads.

===================================================================

M	source/blender/blenlib/BLI_task.h
M	source/blender/blenlib/intern/task.c

===================================================================

diff --git a/source/blender/blenlib/BLI_task.h b/source/blender/blenlib/BLI_task.h
index 28da673..8f85bc4 100644
--- a/source/blender/blenlib/BLI_task.h
+++ b/source/blender/blenlib/BLI_task.h
@@ -88,6 +88,9 @@ void BLI_task_pool_cancel(TaskPool *pool);
 /* stop all worker threads */
 void BLI_task_pool_stop(TaskPool *pool);
 
+/* set number of threads allowed to be used by this pool */
+void BLI_pool_set_num_threads(TaskPool *pool, int num_threads);
+
 /* for worker threads, test if canceled */
 bool BLI_task_pool_canceled(TaskPool *pool);
 
diff --git a/source/blender/blenlib/intern/task.c b/source/blender/blenlib/intern/task.c
index 219ccb1..e4cded1 100644
--- a/source/blender/blenlib/intern/task.c
+++ b/source/blender/blenlib/intern/task.c
@@ -49,6 +49,8 @@ struct TaskPool {
 
 	volatile size_t num;
 	volatile size_t done;
+	volatile int num_threads;
+	volatile int currently_running_tasks;
 	ThreadMutex num_mutex;
 	ThreadCondition num_cond;
 
@@ -84,6 +86,7 @@ static void task_pool_num_decrease(TaskPool *pool, size_t done)
 	BLI_assert(pool->num >= done);
 
 	pool->num -= done;
+	pool->currently_running_tasks -= done;
 	pool->done += done;
 
 	if (pool->num == 0)
@@ -104,19 +107,37 @@ static void task_pool_num_increase(TaskPool *pool)
 
 static bool task_scheduler_thread_wait_pop(TaskScheduler *scheduler, Task **task)
 {
+	bool found_task = false;
 	BLI_mutex_lock(&scheduler->queue_mutex);
 
 	while (!scheduler->queue.first && !scheduler->do_exit)
 		BLI_condition_wait(&scheduler->queue_cond, &scheduler->queue_mutex);
 
-	if (!scheduler->queue.first) {
-		BLI_mutex_unlock(&scheduler->queue_mutex);
-		BLI_assert(scheduler->do_exit);
-		return false;
-	}
-	
-	*task = scheduler->queue.first;
-	BLI_remlink(&scheduler->queue, *task);
+	do {
+		Task *current_task;
+		if (!scheduler->queue.first) {
+			BLI_mutex_unlock(&scheduler->queue_mutex);
+			BLI_assert(scheduler->do_exit);
+			return false;
+		}
+		for (current_task = scheduler->queue.first;
+		     current_task != NULL;
+		     current_task = current_task->next)
+		{
+			TaskPool *pool = current_task->pool;
+			if (pool->num_threads == 0 ||
+			    pool->currently_running_tasks < pool->num_threads)
+			{
+				*task = current_task;
+				found_task = true;
+				pool->currently_running_tasks++;
+				BLI_remlink(&scheduler->queue, *task);
+				break;
+			}
+		}
+		if (!found_task)
+			BLI_condition_wait(&scheduler->queue_cond, &scheduler->queue_mutex);
+	} while (!found_task);
 
 	BLI_mutex_unlock(&scheduler->queue_mutex);
 
@@ -288,6 +309,8 @@ TaskPool *BLI_task_pool_create(TaskScheduler *scheduler, void *userdata)
 
 	pool->scheduler = scheduler;
 	pool->num = 0;
+	pool->num_threads = 0;
+	pool->currently_running_tasks = 0;
 	pool->do_cancel = false;
 
 	BLI_mutex_init(&pool->num_mutex);
@@ -351,12 +374,16 @@ void BLI_task_pool_work_and_wait(TaskPool *pool)
 		/* find task from this pool. if we get a task from another pool,
 		 * we can get into deadlock */
 
-		for (task = scheduler->queue.first; task; task = task->next) {
-			if (task->pool == pool) {
-				work_task = task;
-				found_task = true;
-				BLI_remlink(&scheduler->queue, task);
-				break;
+		if (pool->num_threads == 0 ||
+		    pool->currently_running_tasks < pool->num_threads)
+		{
+			for (task = scheduler->queue.first; task; task = task->next) {
+				if (task->pool == pool) {
+					work_task = task;
+					found_task = true;
+					BLI_remlink(&scheduler->queue, task);
+					break;
+				}
 			}
 		}
 
@@ -365,6 +392,7 @@ void BLI_task_pool_work_and_wait(TaskPool *pool)
 		/* if found task, do it, otherwise wait until other tasks are done */
 		if (found_task) {
 			/* run task */
+			pool->currently_running_tasks++;
 			work_task->run(pool, work_task->taskdata, 0);
 
 			/* delete task */
@@ -387,6 +415,12 @@ void BLI_task_pool_work_and_wait(TaskPool *pool)
 	BLI_mutex_unlock(&pool->num_mutex);
 }
 
+void BLI_pool_set_num_threads(TaskPool *pool, int num_threads)
+{
+	/* NOTE: Don't try to modify threads while tasks are running! */
+	pool->num_threads = num_threads;
+}
+
 void BLI_task_pool_cancel(TaskPool *pool)
 {
 	pool->do_cancel = true;




More information about the Bf-blender-cvs mailing list