[Bf-blender-cvs] [55c2cd85f0] master: Task scheduler: Initial implementation of local tasks queues

Sergey Sharybin noreply at git.blender.org
Tue Mar 7 17:33:39 CET 2017


Commit: 55c2cd85f0bcf39820013c6ebba1d375d078323d
Author: Sergey Sharybin
Date:   Mon Mar 6 15:40:05 2017 +0100
Branches: master
https://developer.blender.org/rB55c2cd85f0bcf39820013c6ebba1d375d078323d

Task scheduler: Initial implementation of local tasks queues

The idea is to allow some amount of tasks to be pushed from working
thread to it's local queue, so we can acquire some work without doing
whole mutex lock.

This should allow us to remove some hacks from depsgraph which was
added there to keep threads alive.

===================================================================

M	source/blender/blenlib/intern/task.c

===================================================================

diff --git a/source/blender/blenlib/intern/task.c b/source/blender/blenlib/intern/task.c
index a7449f97bf..4e73ff6633 100644
--- a/source/blender/blenlib/intern/task.c
+++ b/source/blender/blenlib/intern/task.c
@@ -48,6 +48,27 @@
  */
 #define MEMPOOL_SIZE 256
 
+/* Number of tasks which are pushed directly to local thread queue.
+ *
+ * This allows thread to fetch next task without locking the whole queue.
+ */
+#define LOCALQUEUE_SIZE 1
+
+#ifndef NDEBUG
+#  define ASSERT_THREAD_ID(scheduler, thread_id) \
+	do { \
+		if (!BLI_thread_is_main()) { \
+			TaskThread *thread = pthread_getspecific(scheduler->tls_id_key); \
+			BLI_assert(thread_id == thread->id); \
+		} \
+		else { \
+			BLI_assert(thread_id == 0); \
+		} \
+	} while (false)
+#else
+#  define ASSERT_THREAD_ID(scheduler, thread_id)
+#endif
+
 typedef struct Task {
 	struct Task *next, *prev;
 
@@ -104,6 +125,8 @@ typedef struct TaskMemPoolStats {
 
 typedef struct TaskThreadLocalStorage {
 	TaskMemPool task_mempool;
+	int num_local_queue;
+	Task *local_queue[LOCALQUEUE_SIZE];
 } TaskThreadLocalStorage;
 
 struct TaskPool {
@@ -117,6 +140,7 @@ struct TaskPool {
 	ThreadMutex user_mutex;
 
 	volatile bool do_cancel;
+	volatile bool do_work;
 
 	/* If set, this pool may never be work_and_wait'ed, which means TaskScheduler
 	 * has to use its special background fallback thread in case we are in
@@ -175,6 +199,9 @@ BLI_INLINE TaskThreadLocalStorage *get_task_tls(TaskPool *pool,
 	TaskScheduler *scheduler = pool->scheduler;
 	BLI_assert(thread_id >= 0);
 	BLI_assert(thread_id <= scheduler->num_threads);
+	if (thread_id == 0) {
+		return &scheduler->task_threads[pool->thread_id].tls;
+	}
 	return &scheduler->task_threads[thread_id].tls;
 }
 
@@ -314,9 +341,28 @@ static bool task_scheduler_thread_wait_pop(TaskScheduler *scheduler, Task **task
 	return true;
 }
 
+BLI_INLINE void handle_local_queue(TaskThreadLocalStorage *tls,
+                                   const int thread_id)
+{
+	while (tls->num_local_queue > 0) {
+		/* We pop task from queue before handling it so handler of the task can
+		 * push next job to the local queue.
+		 */
+		tls->num_local_queue--;
+		Task *local_task = tls->local_queue[tls->num_local_queue];
+		/* TODO(sergey): Double-check work_and_wait() doesn't handle other's
+		 * pool tasks.
+		 */
+		TaskPool *local_pool = local_task->pool;
+		local_task->run(local_pool, local_task->taskdata, thread_id);
+		task_free(local_pool, local_task, thread_id);
+	}
+}
+
 static void *task_scheduler_thread_run(void *thread_p)
 {
 	TaskThread *thread = (TaskThread *) thread_p;
+	TaskThreadLocalStorage *tls = &thread->tls;
 	TaskScheduler *scheduler = thread->scheduler;
 	int thread_id = thread->id;
 	Task *task;
@@ -333,6 +379,9 @@ static void *task_scheduler_thread_run(void *thread_p)
 		/* delete task */
 		task_free(pool, task, thread_id);
 
+		/* Handle all tasks from local queue. */
+		handle_local_queue(tls, thread_id);
+
 		/* notify pool task was done */
 		task_pool_num_decrease(pool, 1);
 	}
@@ -506,6 +555,7 @@ static TaskPool *task_pool_create_ex(TaskScheduler *scheduler, void *userdata, c
 	pool->scheduler = scheduler;
 	pool->num = 0;
 	pool->do_cancel = false;
+	pool->do_work = false;
 	pool->run_in_background = is_background;
 
 	BLI_mutex_init(&pool->num_mutex);
@@ -603,6 +653,19 @@ static void task_pool_push(
 	task->freedata = freedata;
 	task->pool = pool;
 
+	if (thread_id != -1 &&
+	    (thread_id != pool->thread_id || pool->do_work))
+	{
+		ASSERT_THREAD_ID(pool->scheduler, thread_id);
+
+		TaskThreadLocalStorage *tls = get_task_tls(pool, thread_id);
+		if (tls->num_local_queue < LOCALQUEUE_SIZE) {
+			tls->local_queue[tls->num_local_queue] = task;
+			tls->num_local_queue++;
+			return;
+		}
+	}
+
 	task_scheduler_push(pool->scheduler, task, priority);
 }
 
@@ -627,14 +690,12 @@ void BLI_task_pool_push_from_thread(TaskPool *pool, TaskRunFunction run,
 
 void BLI_task_pool_work_and_wait(TaskPool *pool)
 {
+	TaskThreadLocalStorage *tls = get_task_tls(pool, pool->thread_id);
 	TaskScheduler *scheduler = pool->scheduler;
 
-#ifndef NDEBUG
-	if (!BLI_thread_is_main()) {
-		TaskThread *thread = pthread_getspecific(scheduler->tls_id_key);
-		BLI_assert(pool->thread_id == thread->id);
-	}
-#endif
+	pool->do_work = true;
+
+	ASSERT_THREAD_ID(pool->scheduler, pool->thread_id);
 
 	BLI_mutex_lock(&pool->num_mutex);
 
@@ -668,6 +729,9 @@ void BLI_task_pool_work_and_wait(TaskPool *pool)
 			/* delete task */
 			task_free(pool, task, pool->thread_id);
 
+			/* Handle all tasks from local queue. */
+			handle_local_queue(tls, pool->thread_id);
+
 			/* notify pool task was done */
 			task_pool_num_decrease(pool, 1);
 		}




More information about the Bf-blender-cvs mailing list