[Bf-blender-cvs] [7efa34d] master: Task scheduler: Add thread-aware task push routines

Sergey Sharybin noreply at git.blender.org
Tue May 10 10:01:16 CEST 2016


Commit: 7efa34d078336bb366ac5b099e56a2896176f599
Author: Sergey Sharybin
Date:   Tue May 10 09:55:58 2016 +0200
Branches: master
https://developer.blender.org/rB7efa34d078336bb366ac5b099e56a2896176f599

Task scheduler: Add thread-aware task push routines

This commit implements a new function, BLI_task_pool_push_from_thread(),
whose main goal is to put less parasitic load on the CPU by avoiding
memory allocations as much as possible, making task pushing cheaper.

This function expects a thread ID, which must be 0 for the thread the
pool was created from (and from which wait_work() is called); for other
threads it must be the ID which was passed to the thread's working
function.
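
For reference, here is a minimal usage sketch (not part of this commit;
the callback and data names are hypothetical, and the TaskRunFunction
signature and TASK_PRIORITY_* values are assumed from the existing
BLI_task API). A task which spawns follow-up work from a worker thread
passes its own thread ID, so the push can re-use that thread's
pre-allocated task memory:

    static void my_task_func(TaskPool *pool, void *taskdata, int thread_id)
    {
        /* Hypothetical helper doing the actual work on taskdata. */
        MyNodeData *next_data = do_work_and_get_followup(taskdata);
        if (next_data != NULL) {
            /* Passing thread_id lets the scheduler grab a pre-allocated
             * Task from this thread's memory pool instead of mallocing. */
            BLI_task_pool_push_from_thread(pool, my_task_func, next_data,
                                           false, TASK_PRIORITY_HIGH,
                                           thread_id);
        }
    }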

This reduces allocations quite a bit in the new dependency graph,
hopefully gaining some visible speedup on fewzillion-core machines
(on my own machine I can only see the benefit in a profiler, which
shows a significant reduction of time wasted in memory allocation).

===================================================================

M	source/blender/blenlib/BLI_task.h
M	source/blender/blenlib/intern/task.c
M	source/blender/depsgraph/intern/depsgraph_eval.cc

===================================================================

diff --git a/source/blender/blenlib/BLI_task.h b/source/blender/blenlib/BLI_task.h
index 63a0795..4cf1d8b 100644
--- a/source/blender/blenlib/BLI_task.h
+++ b/source/blender/blenlib/BLI_task.h
@@ -84,7 +84,9 @@ void BLI_task_pool_push_ex(
         TaskPool *pool, TaskRunFunction run, void *taskdata,
         bool free_taskdata, TaskFreeFunction freedata, TaskPriority priority);
 void BLI_task_pool_push(TaskPool *pool, TaskRunFunction run,
-	void *taskdata, bool free_taskdata, TaskPriority priority);
+        void *taskdata, bool free_taskdata, TaskPriority priority);
+void BLI_task_pool_push_from_thread(TaskPool *pool, TaskRunFunction run,
+        void *taskdata, bool free_taskdata, TaskPriority priority, int thread_id);
 
 /* work and wait until all tasks are done */
 void BLI_task_pool_work_and_wait(TaskPool *pool);
diff --git a/source/blender/blenlib/intern/task.c b/source/blender/blenlib/intern/task.c
index b657120..9fefd71 100644
--- a/source/blender/blenlib/intern/task.c
+++ b/source/blender/blenlib/intern/task.c
@@ -35,8 +35,17 @@
 
 #include "atomic_ops.h"
 
+/* Define this to enable some detailed statistics printing. */
+#undef DEBUG_STATS
+
 /* Types */
 
+/* Number of per-thread pre-allocated tasks.
+ *
+ * For more details, see the description of TaskMemPool.
+ */
+#define MEMPOOL_SIZE 256
+
 typedef struct Task {
 	struct Task *next, *prev;
 
@@ -47,6 +56,50 @@ typedef struct Task {
 	TaskPool *pool;
 } Task;
 
+/* This is per-thread storage of pre-allocated tasks.
+ *
+ * The idea behind this is simple: reduce the number of malloc() calls when
+ * pushing a new task to the pool. This is done by keeping memory from tasks
+ * which have already finished, so instead of freeing that memory we put it
+ * into the pool for later re-use.
+ *
+ * The tricky part here is to avoid any inter-thread synchronization, hence
+ * no lock may exist around this pool. The pool becomes the owner of the
+ * pointer from the freed task, and only the corresponding thread is able
+ * to use this pool (no memory stealing and such).
+ *
+ * This leads to the following use of the pool:
+ *
+ * - task_push() should provide the proper thread ID from which the task is
+ *   being pushed.
+ *
+ * - The task allocation function checks the corresponding memory pool, and
+ *   if there is any memory in there it marks the memory as re-used, removes
+ *   it from the pool and uses that memory for the new task.
+ *
+ *   At this moment the task queue owns the memory.
+ *
+ * - When the task is done and task_free() is called, the memory is put into
+ *   the pool which corresponds to the thread which handled the task.
+ */
+typedef struct TaskMemPool {
+	/* Number of pre-allocated tasks in the pool. */
+	int num_tasks;
+	/* Pre-allocated task memory pointers. */
+	Task *tasks[MEMPOOL_SIZE];
+} TaskMemPool;
+
+#ifdef DEBUG_STATS
+typedef struct TaskMemPoolStats {
+	/* Number of allocations. */
+	int num_alloc;
+	/* Number of avoided allocations (pointer was re-used from the pool). */
+	int num_reuse;
+	/* Number of tasks discarded due to pool saturation. */
+	int num_discard;
+} TaskMemPoolStats;
+#endif
+
 struct TaskPool {
 	TaskScheduler *scheduler;
 
@@ -62,14 +115,32 @@ struct TaskPool {
 
 	volatile bool do_cancel;
 
-	/* If set, this pool may never be work_and_wait'ed, which means TaskScheduler has to use its special
-	 * background fallback thread in case we are in single-threaded situation. */
+	/* If set, this pool may never be work_and_wait'ed, which means TaskScheduler
+	 * has to use its special background fallback thread in case we are in a
+	 * single-threaded situation.
+	 */
 	bool run_in_background;
+
+	/* This pool is used for caching task pointers for thread id 0.
+	 * It either points to the global scheduler's task_mempool[0] if the
+	 * pool is handled from the main thread, or to task_mempool_local
+	 * otherwise.
+	 *
+	 * This way we avoid possible threading conflicts when accessing the same
+	 * global memory pool from multiple threads from which wait_work() is called.
+	 */
+	TaskMemPool *task_mempool;
+	TaskMemPool task_mempool_local;
+
+#ifdef DEBUG_STATS
+	TaskMemPoolStats *mempool_stats;
+#endif
 };
 
 struct TaskScheduler {
 	pthread_t *threads;
 	struct TaskThread *task_threads;
+	TaskMemPool *task_mempool;
 	int num_threads;
 	bool background_thread_only;
 
@@ -98,6 +169,63 @@ static void task_data_free(Task *task, const int thread_id)
 	}
 }
 
+BLI_INLINE TaskMemPool *get_task_mempool(TaskPool *pool, const int thread_id)
+{
+	if (thread_id == 0) {
+		return pool->task_mempool;
+	}
+	return &pool->scheduler->task_mempool[thread_id];
+}
+
+static Task *task_alloc(TaskPool *pool, const int thread_id)
+{
+	assert(thread_id <= pool->scheduler->num_threads);
+	if (thread_id != -1) {
+		assert(thread_id >= 0);
+		TaskMemPool *mem_pool = get_task_mempool(pool, thread_id);
+		/* Try to re-use task memory from the thread-local storage. */
+		if (mem_pool->num_tasks > 0) {
+			--mem_pool->num_tasks;
+			/* Success! We've just avoided task allocation. */
+#ifdef DEBUG_STATS
+			pool->mempool_stats[thread_id].num_reuse++;
+#endif
+			return mem_pool->tasks[mem_pool->num_tasks];
+		}
+		/* We are doomed to allocate new task data. */
+#ifdef DEBUG_STATS
+		pool->mempool_stats[thread_id].num_alloc++;
+#endif
+	}
+	return MEM_mallocN(sizeof(Task), "New task");
+}
+
+static void task_free(TaskPool *pool, Task *task, const int thread_id)
+{
+	task_data_free(task, thread_id);
+	assert(thread_id >= 0);
+	assert(thread_id <= pool->scheduler->num_threads);
+	TaskMemPool *mem_pool = get_task_mempool(pool, thread_id);
+	if (mem_pool->num_tasks < MEMPOOL_SIZE - 1) {
+		/* Successfully allowed the task to be re-used later. */
+		mem_pool->tasks[mem_pool->num_tasks] = task;
+		++mem_pool->num_tasks;
+	}
+	else {
+		/* Local storage is saturated, there is no other way than to
+		 * just discard the memory.
+		 *
+		 * TODO(sergey): We can perhaps store such pointer in a global
+		 * scheduler pool, maybe it'll be faster than discarding and
+		 * allocating again.
+		 */
+		MEM_freeN(task);
+#ifdef DEBUG_STATS
+		pool->mempool_stats[thread_id].num_discard++;
+#endif
+	}
+}
+
 /* Task Scheduler */
 
 static void task_pool_num_decrease(TaskPool *pool, size_t done)
@@ -196,8 +324,7 @@ static void *task_scheduler_thread_run(void *thread_p)
 		task->run(pool, task->taskdata, thread_id);
 
 		/* delete task */
-		task_data_free(task, thread_id);
-		MEM_freeN(task);
+		task_free(pool, task, thread_id);
 
 		/* notify pool task was done */
 		task_pool_num_decrease(pool, 1);
@@ -249,6 +376,9 @@ TaskScheduler *BLI_task_scheduler_create(int num_threads)
 				fprintf(stderr, "TaskScheduler failed to launch thread %d/%d\n", i, num_threads);
 			}
 		}
+
+		scheduler->task_mempool = MEM_callocN(sizeof(*scheduler->task_mempool) * (num_threads + 1),
+		                                      "TaskScheduler task_mempool");
 	}
 
 	return scheduler;
@@ -281,6 +411,16 @@ void BLI_task_scheduler_free(TaskScheduler *scheduler)
 		MEM_freeN(scheduler->task_threads);
 	}
 
+	/* Delete task memory pool */
+	if (scheduler->task_mempool) {
+		for (int i = 0; i <= scheduler->num_threads; ++i) {
+			for (int j = 0; j < scheduler->task_mempool[i].num_tasks; ++j) {
+				MEM_freeN(scheduler->task_mempool[i].tasks[j]);
+			}
+		}
+		MEM_freeN(scheduler->task_mempool);
+	}
+
 	/* delete leftover tasks */
 	for (task = scheduler->queue.first; task; task = task->next) {
 		task_data_free(task, 0);
@@ -372,6 +512,20 @@ static TaskPool *task_pool_create_ex(TaskScheduler *scheduler, void *userdata, c
 	pool->userdata = userdata;
 	BLI_mutex_init(&pool->user_mutex);
 
+	if (BLI_thread_is_main()) {
+		pool->task_mempool = scheduler->task_mempool;
+	}
+	else {
+		pool->task_mempool = &pool->task_mempool_local;
+		pool->task_mempool_local.num_tasks = 0;
+	}
+
+#ifdef DEBUG_STATS
+	pool->mempool_stats =
+	        MEM_callocN(sizeof(*pool->mempool_stats) * (scheduler->num_threads + 1),
+	                    "per-taskpool mempool stats");
+#endif
+
 	/* Ensure malloc will go fine from threads,
 	 *
 	 * This is needed because we could be in main thread here
@@ -417,16 +571,36 @@ void BLI_task_pool_free(TaskPool *pool)
 
 	BLI_mutex_end(&pool->user_mutex);
 
+	/* Free local memory pool, those pointers are lost forever. */
+	if (pool->task_mempool == &pool->task_mempool_local) {
+		for (int i = 0; i < pool->task_mempool_local.num_tasks; i++) {
+			MEM_freeN(pool->task_mempool_local.tasks[i]);
+		}
+	}
+
+#ifdef DEBUG_STATS
+	printf("Thread ID    Allocated   Reused   Discarded\n");
+	for (int i = 0; i < pool->scheduler->num_threads + 1; ++i) {
+		printf("%02d           %05d       %05d    %05d\n",
+		       i,
+		       pool->mempool_stats[i].num_alloc,
+		       pool->mempool_stats[i].num_reuse,
+		       pool->mempool_stats[i].num_discard);
+	}
+	MEM_freeN(pool->mempool_stats);
+#endif
+
 	MEM_freeN(pool);
 
 	BLI_end_threaded_malloc();
 }
 
-void BLI_task_pool_push_ex(
+static void task_pool_push(
         TaskPool *pool, TaskRunFunction run, void *taskdata,
-        bool free_taskdata, TaskFreeFunction freedata, TaskPriority priority)
+        bool free_taskdata, TaskFreeFunction freedata, TaskPriority priority,
+        int thread_id)
 {
-	Task *task = MEM_mallocN(sizeof(Task), "Task");
+	Task *task = task_alloc(pool, thread_id);
 
 	task->run = run;
 	task->taskdata = taskdata;
@@ -437,12 +611,25 @@ void BLI_task_pool_push_ex(
 	task_scheduler_push(pool->scheduler, task, priority);
 }
 
+void BLI_task_pool_push_ex(
+        TaskPool *pool, TaskRunFunction run, void *taskdata,
+        bool free_taskdata, TaskFreeFunction freedata, TaskPriority priority)
+{
+	task_pool_push(pool, run, taskdata, free_taskdata, freedata, priority, -1);
+}
+
 void BLI_task_pool_push(
         TaskPool *pool, TaskRunFunction run, void *taskdata, bool free_taskdata, TaskPriority priority)
 {
 	BLI_task_pool_push_ex(pool, run, taskdata, free_taskdata, NULL, priority);
 }
 
+void BLI_task_pool_push_from_thread(TaskPool *pool, TaskRunFunction run,
+        void *taskdata, bool free_taskdata, TaskPriority priority, int thread_id)
+{
+	task_pool_push(pool, run, taskdata, free_taskdata, NULL, priority, thread_id);
+}
+
 void BLI_task_pool_work_and_wait(TaskPool *pool)
 {
 	

@@ Diff output truncated at 10240 characters. @@
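
The change to depsgraph_eval.cc is truncated above. Purely as an
illustration (not the actual depsgraph code; the scheduler, callback and
data names are hypothetical), driving a pool from the thread that created
it would look as follows; that thread must pass thread_id 0:

    TaskPool *pool = BLI_task_pool_create(scheduler, userdata);
    /* The initial task is pushed from the pool's owning thread, hence 0. */
    BLI_task_pool_push_from_thread(pool, my_task_func, root_node,
                                   false, TASK_PRIORITY_HIGH, 0);
    /* Workers re-push follow-up tasks with their own thread_id;
     * this thread processes tasks and waits for completion. */
    BLI_task_pool_work_and_wait(pool);
    BLI_task_pool_free(pool);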