[Bf-blender-cvs] SVN commit: /data/svn/bf-blender [57912] branches/soc-2013-depsgraph_mt/ source/blender/blenlib: Task scheduler ported to C

Sergey Sharybin sergey.vfx at gmail.com
Mon Jul 1 23:23:21 CEST 2013


Revision: 57912
          http://projects.blender.org/scm/viewvc.php?view=rev&root=bf-blender&revision=57912
Author:   nazgul
Date:     2013-07-01 21:23:20 +0000 (Mon, 01 Jul 2013)
Log Message:
-----------
Task scheduler ported to C

Patch by Brecht; the original version also
gets rid of ThreadedWorker in favor of the new task
scheduler and ports some areas to it.

Kudos to Brecht for this work!

Modified Paths:
--------------
    branches/soc-2013-depsgraph_mt/source/blender/blenlib/BLI_threads.h
    branches/soc-2013-depsgraph_mt/source/blender/blenlib/CMakeLists.txt
    branches/soc-2013-depsgraph_mt/source/blender/blenlib/intern/threads.c

Added Paths:
-----------
    branches/soc-2013-depsgraph_mt/source/blender/blenlib/BLI_task.h
    branches/soc-2013-depsgraph_mt/source/blender/blenlib/intern/task.c

Added: branches/soc-2013-depsgraph_mt/source/blender/blenlib/BLI_task.h
===================================================================
--- branches/soc-2013-depsgraph_mt/source/blender/blenlib/BLI_task.h	                        (rev 0)
+++ branches/soc-2013-depsgraph_mt/source/blender/blenlib/BLI_task.h	2013-07-01 21:23:20 UTC (rev 57912)
@@ -0,0 +1,108 @@
+/*
+ * ***** BEGIN GPL LICENSE BLOCK *****
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ *
+ * ***** END GPL LICENSE BLOCK *****
+ */
+
+#ifndef __BLI_TASK_H__
+#define __BLI_TASK_H__ 
+
+/** \file BLI_task.h
+ *  \ingroup bli
+ */
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include "BLI_threads.h"
+#include "BLI_utildefines.h"
+
+/* Task Scheduler
+ * 
+ * Central scheduler that holds running threads ready to execute tasks. A single
+ * queue holds the tasks from all pools.
+ *
+ * Init/exit must be called before/after any task pools are created/freed, and
+ * must be called from the main thread. All other scheduler and pool functions
+ * are thread-safe. */
+
+typedef struct TaskScheduler TaskScheduler;
+
+enum {
+	TASK_SCHEDULER_AUTO_THREADS = 0,
+	TASK_SCHEDULER_SINGLE_THREAD = 1
+};
+
+TaskScheduler *BLI_task_scheduler_create(int num_threads);
+void BLI_task_scheduler_free(TaskScheduler *scheduler);
+
+int BLI_task_scheduler_num_threads(TaskScheduler *scheduler);
+
+/* Task Pool
+ *
+ * Pool of tasks that will be executed by the central TaskScheduler. For each
+ * pool, we can wait for all tasks to be done, or cancel them before they are
+ * done.
+ *
+ * Running tasks may spawn new tasks.
+ *
+ * Pools may be nested, i.e. a thread running a task can create another task
+ * pool with smaller tasks. When other threads are busy they will continue
+ * working on their own tasks; if not, they will join in. No new threads will
+ * be launched.
+ */
+
+typedef enum TaskPriority {
+	TASK_PRIORITY_LOW,
+	TASK_PRIORITY_HIGH
+} TaskPriority;
+
+typedef struct TaskPool TaskPool;
+typedef void (*TaskRunFunction)(TaskPool *pool, void *taskdata, int threadid);
+
+TaskPool *BLI_task_pool_create(TaskScheduler *scheduler, void *userdata);
+void BLI_task_pool_free(TaskPool *pool);
+
+void BLI_task_pool_push(TaskPool *pool, TaskRunFunction run,
+	void *taskdata, bool free_taskdata, TaskPriority priority);
+
+/* work and wait until all tasks are done */
+void BLI_task_pool_work_and_wait(TaskPool *pool);
+/* cancel all tasks, keep worker threads running */
+void BLI_task_pool_cancel(TaskPool *pool);
+/* stop all worker threads */
+void BLI_task_pool_stop(TaskPool *pool);
+
+/* for worker threads, test if cancelled */
+bool BLI_task_pool_cancelled(TaskPool *pool);
+
+/* optional userdata pointer to pass along to run function */
+void *BLI_task_pool_userdata(TaskPool *pool);
+
+/* optional mutex to use from run function */
+ThreadMutex *BLI_task_pool_user_mutex(TaskPool *pool);
+
+/* number of tasks done, for stats, don't use this to make decisions */
+size_t BLI_task_pool_tasks_done(TaskPool *pool);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif
+

Modified: branches/soc-2013-depsgraph_mt/source/blender/blenlib/BLI_threads.h
===================================================================
--- branches/soc-2013-depsgraph_mt/source/blender/blenlib/BLI_threads.h	2013-07-01 21:07:48 UTC (rev 57911)
+++ branches/soc-2013-depsgraph_mt/source/blender/blenlib/BLI_threads.h	2013-07-01 21:23:20 UTC (rev 57912)
@@ -100,6 +100,7 @@
 void BLI_mutex_free(ThreadMutex *mutex);
 
 void BLI_mutex_lock(ThreadMutex *mutex);
+bool BLI_mutex_trylock(ThreadMutex *mutex);
 void BLI_mutex_unlock(ThreadMutex *mutex);
 
 /* Spin Lock */
@@ -170,6 +171,16 @@
 void BLI_thread_queue_wait_finish(ThreadQueue *queue);
 void BLI_thread_queue_nowait(ThreadQueue *queue);
 
+/* Condition */
+
+typedef pthread_cond_t ThreadCondition;
+
+void BLI_condition_init(ThreadCondition *cond);
+void BLI_condition_wait(ThreadCondition *cond, ThreadMutex *mutex);
+void BLI_condition_notify_one(ThreadCondition *cond);
+void BLI_condition_notify_all(ThreadCondition *cond);
+void BLI_condition_end(ThreadCondition *cond);
+
 #ifdef __cplusplus
 }
 #endif

Modified: branches/soc-2013-depsgraph_mt/source/blender/blenlib/CMakeLists.txt
===================================================================
--- branches/soc-2013-depsgraph_mt/source/blender/blenlib/CMakeLists.txt	2013-07-01 21:07:48 UTC (rev 57911)
+++ branches/soc-2013-depsgraph_mt/source/blender/blenlib/CMakeLists.txt	2013-07-01 21:23:20 UTC (rev 57912)
@@ -91,6 +91,7 @@
 	intern/string.c
 	intern/string_cursor_utf8.c
 	intern/string_utf8.c
+	intern/task.c
 	intern/threads.c
 	intern/time.c
 	intern/uvproject.c
@@ -150,6 +151,7 @@
 	BLI_string_cursor_utf8.h
 	BLI_string_utf8.h
 	BLI_sys_types.h
+	BLI_task.h
 	BLI_threads.h
 	BLI_utildefines.h
 	BLI_uvproject.h

Added: branches/soc-2013-depsgraph_mt/source/blender/blenlib/intern/task.c
===================================================================
--- branches/soc-2013-depsgraph_mt/source/blender/blenlib/intern/task.c	                        (rev 0)
+++ branches/soc-2013-depsgraph_mt/source/blender/blenlib/intern/task.c	2013-07-01 21:23:20 UTC (rev 57912)
@@ -0,0 +1,402 @@
+/*
+ * ***** BEGIN GPL LICENSE BLOCK *****
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ *
+ * ***** END GPL LICENSE BLOCK *****
+ */
+
+#include <stdlib.h>
+
+#include "MEM_guardedalloc.h"
+
+#include "BLI_listbase.h"
+#include "BLI_task.h"
+#include "BLI_threads.h"
+
+/* Types */
+
+/* One unit of work that sits in the scheduler queue until a worker pops it. */
+typedef struct Task {
+	struct Task *next, *prev;  /* links used by the scheduler's ListBase queue */
+
+	TaskRunFunction run;       /* callback invoked by the worker thread */
+	void *taskdata;            /* argument handed to run() */
+	bool free_taskdata;        /* if set, the worker MEM_freeN()s taskdata after run() */
+	TaskPool *pool;            /* passed to run() and notified once the task is done */
+} Task;
+
+/* Pool of tasks executed by the central TaskScheduler (see BLI_task.h). */
+struct TaskPool {
+	TaskScheduler *scheduler;
+
+	volatile size_t num;       /* raised by task_pool_num_increase(), lowered by task_pool_num_decrease() */
+	volatile size_t done;      /* running total of finished tasks */
+	ThreadMutex num_mutex;     /* held while num and done are modified */
+	ThreadCondition num_cond;  /* notified on every increase of num and when num drops to zero */
+
+	void *userdata;            /* NOTE(review): presumably what BLI_task_pool_userdata() returns -- confirm */
+	ThreadMutex user_mutex;    /* NOTE(review): presumably what BLI_task_pool_user_mutex() returns -- confirm */
+
+	volatile bool do_cancel;   /* NOTE(review): presumably the flag behind BLI_task_pool_cancelled() -- confirm */
+};
+
+/* Worker threads plus the single queue they pull tasks from. */
+struct TaskScheduler {
+	pthread_t *threads;          /* worker thread handles, allocated by BLI_task_scheduler_create() */
+	int num_threads;             /* number of worker threads; the main thread is not counted */
+
+	ListBase queue;              /* pending Task items */
+	ThreadMutex queue_mutex;     /* held by workers while they inspect or pop from queue */
+	ThreadCondition queue_cond;  /* workers wait on this while queue is empty */
+
+	volatile bool do_exit;       /* set by BLI_task_scheduler_free(); lets idle workers return */
+};
+
+/* Startup argument for one worker thread; the worker frees it when it exits. */
+typedef struct TaskThread {
+	TaskScheduler *scheduler;  /* scheduler whose queue the worker serves */
+	int id;                    /* passed to every task as threadid; worker ids start at 1 */
+} TaskThread;
+
+/* Task Scheduler */
+
+/* Account for `done` finished tasks of `pool`.
+ *
+ * Moves `done` from the outstanding counter to the finished counter under
+ * num_mutex and, once no tasks remain outstanding, wakes every thread
+ * waiting on num_cond. */
+static void task_pool_num_decrease(TaskPool *pool, size_t done)
+{
+	BLI_mutex_lock(&pool->num_mutex);
+
+	/* num is unsigned: an underflow would wrap around instead of going
+	 * negative, so it has to be checked before subtracting. A ">= 0" test
+	 * after the subtraction can never fail. */
+	BLI_assert(pool->num >= done);
+
+	pool->num -= done;
+	pool->done += done;
+
+	if(pool->num == 0)
+		BLI_condition_notify_all(&pool->num_cond);
+
+	BLI_mutex_unlock(&pool->num_mutex);
+}
+
+/* Register one more outstanding task on `pool` and wake every thread that
+ * is blocked on the pool's counter condition. */
+static void task_pool_num_increase(TaskPool *pool)
+{
+	ThreadMutex *counter_lock = &pool->num_mutex;
+
+	BLI_mutex_lock(counter_lock);
+	pool->num += 1;
+	BLI_condition_notify_all(&pool->num_cond);
+	BLI_mutex_unlock(counter_lock);
+}
+
+/* Block until a task is available or the scheduler is shutting down.
+ *
+ * On success the first queued task is unlinked, stored in *task and true is
+ * returned. When the queue is empty and do_exit is set, *task is left
+ * untouched and false is returned. Tasks still queued at shutdown are
+ * handed out before false is ever returned. */
+static bool task_scheduler_thread_wait_pop(TaskScheduler *scheduler, Task **task)
+{
+	Task *head;
+
+	BLI_mutex_lock(&scheduler->queue_mutex);
+
+	/* re-check after every wakeup, the queue may have been drained by
+	 * another worker in the meantime */
+	for(;;) {
+		head = scheduler->queue.first;
+		if(head || scheduler->do_exit)
+			break;
+
+		BLI_condition_wait(&scheduler->queue_cond, &scheduler->queue_mutex);
+	}
+
+	if(head) {
+		*task = head;
+		BLI_remlink(&scheduler->queue, head);
+	}
+
+	BLI_mutex_unlock(&scheduler->queue_mutex);
+
+	if(!head) {
+		/* nothing to hand out: only reachable on shutdown */
+		BLI_assert(scheduler->do_exit);
+		return false;
+	}
+
+	return true;
+}
+
+/* Entry point of a worker thread.
+ *
+ * thread_p is the TaskThread allocated for this worker; it is released here
+ * when the worker leaves its loop. Always returns NULL. */
+static void *task_scheduler_thread_run(void *thread_p)
+{
+	TaskThread *self = (TaskThread*)thread_p;
+	TaskScheduler *sched = self->scheduler;
+	const int worker_id = self->id;
+
+	for(;;) {
+		Task *current;
+
+		/* returns false once the scheduler asks the workers to exit */
+		if(!task_scheduler_thread_wait_pop(sched, &current))
+			break;
+
+		/* execute the task on this worker */
+		current->run(current->pool, current->taskdata, worker_id);
+
+		/* let the pool know one of its tasks finished */
+		task_pool_num_decrease(current->pool, 1);
+
+		/* release the task, and its data when the task owns it */
+		if(current->free_taskdata)
+			MEM_freeN(current->taskdata);
+		MEM_freeN(current);
+	}
+
+	MEM_freeN(self);
+
+	return NULL;
+}
+
+/* Create a task scheduler and launch its worker threads.
+ *
+ * num_threads is the total number of threads that will work on tasks,
+ * including the calling (main) thread. TASK_SCHEDULER_AUTO_THREADS uses
+ * BLI_system_thread_count(). One less than that total is launched as
+ * worker threads, so a value of 1 launches none.
+ *
+ * If a worker fails to launch, an error is printed and the scheduler simply
+ * runs with fewer workers. scheduler->num_threads only counts workers that
+ * were actually started, and their handles are stored contiguously at the
+ * start of scheduler->threads.
+ *
+ * Returns the newly allocated scheduler. */
+TaskScheduler *BLI_task_scheduler_create(int num_threads)
+{
+	TaskScheduler *scheduler = MEM_callocN(sizeof(TaskScheduler), "TaskScheduler");
+
+	/* workers keep running until this flag is raised */
+	scheduler->do_exit = false;
+
+	scheduler->queue.first = scheduler->queue.last = NULL;
+	BLI_mutex_init(&scheduler->queue_mutex);
+	BLI_condition_init(&scheduler->queue_cond);
+
+	if(num_threads == TASK_SCHEDULER_AUTO_THREADS) {
+		/* automatic number of threads: the system thread count, which
+		 * includes the main thread */
+		num_threads = BLI_system_thread_count();
+	}
+
+	/* main thread will also work, so we count it too */
+	num_threads -= 1;
+
+	/* launch threads that will be waiting for work */
+	if(num_threads > 0) {
+		int i;
+
+		scheduler->threads = MEM_callocN(sizeof(pthread_t)*num_threads, "TaskScheduler threads");
+
+		for(i = 0; i < num_threads; i++) {
+			/* next free slot; stays put when a launch fails so that the
+			 * handle array never contains holes */
+			int slot = scheduler->num_threads;
+			TaskThread *thread = MEM_callocN(sizeof(TaskThread), "TaskThread");
+
+			thread->scheduler = scheduler;
+			thread->id = slot+1;
+
+			if(pthread_create(&scheduler->threads[slot], NULL, task_scheduler_thread_run, thread) != 0) {
+				fprintf(stderr, "TaskScheduler failed to launch thread %d/%d\n", i, num_threads);
+				MEM_freeN(thread);
+				continue;
+			}
+
+			/* only count workers that are really running */
+			scheduler->num_threads = slot+1;
+		}
+	}
+
+	return scheduler;
+}
+
+void BLI_task_scheduler_free(TaskScheduler *scheduler)
+{
+	Task *task;
+
+	/* stop all waiting threads */
+	scheduler->do_exit = true;

@@ Diff output truncated at 10240 characters. @@



More information about the Bf-blender-cvs mailing list