[Bf-blender-cvs] SVN commit: /data/svn/bf-blender [47671] trunk/blender/source/blender: Applied and completed a compositor patch by Brecht to use signalling and waiting in scheduling and worker threads instead of continuous loops with sleep times .

Lukas Toenne lukas.toenne at googlemail.com
Sun Jun 10 14:26:34 CEST 2012


Revision: 47671
          http://projects.blender.org/scm/viewvc.php?view=rev&root=bf-blender&revision=47671
Author:   lukastoenne
Date:     2012-06-10 12:26:33 +0000 (Sun, 10 Jun 2012)
Log Message:
-----------
Applied and completed a compositor patch by Brecht to use signalling and waiting in scheduling and worker threads instead of continuous loops with sleep times. This should help reduce unnecessary wait times in Tile.

Modified Paths:
--------------
    trunk/blender/source/blender/blenlib/BLI_threads.h
    trunk/blender/source/blender/blenlib/intern/threads.c
    trunk/blender/source/blender/compositor/intern/COM_ExecutionGroup.cpp
    trunk/blender/source/blender/compositor/intern/COM_WorkScheduler.cpp
    trunk/blender/source/blender/compositor/intern/COM_WorkScheduler.h

Modified: trunk/blender/source/blender/blenlib/BLI_threads.h
===================================================================
--- trunk/blender/source/blender/blenlib/BLI_threads.h	2012-06-10 12:23:44 UTC (rev 47670)
+++ trunk/blender/source/blender/blenlib/BLI_threads.h	2012-06-10 12:26:33 UTC (rev 47671)
@@ -136,6 +136,7 @@
 void *BLI_thread_queue_pop_timeout(ThreadQueue *queue, int ms);
 int BLI_thread_queue_size(ThreadQueue *queue);
 
+void BLI_thread_queue_wait_finish(ThreadQueue *queue);
 void BLI_thread_queue_nowait(ThreadQueue *queue);
 
 #endif

Modified: trunk/blender/source/blender/blenlib/intern/threads.c
===================================================================
--- trunk/blender/source/blender/blenlib/intern/threads.c	2012-06-10 12:23:44 UTC (rev 47670)
+++ trunk/blender/source/blender/blenlib/intern/threads.c	2012-06-10 12:26:33 UTC (rev 47671)
@@ -520,8 +520,10 @@
 struct ThreadQueue {
 	GSQueue *queue;
 	pthread_mutex_t mutex;
-	pthread_cond_t cond;
-	int nowait;
+	pthread_cond_t push_cond;
+	pthread_cond_t finish_cond;
+	volatile int nowait;
+	volatile int cancelled;
 };
 
 ThreadQueue *BLI_thread_queue_init(void)
@@ -532,14 +534,17 @@
 	queue->queue = BLI_gsqueue_new(sizeof(void *));
 
 	pthread_mutex_init(&queue->mutex, NULL);
-	pthread_cond_init(&queue->cond, NULL);
+	pthread_cond_init(&queue->push_cond, NULL);
+	pthread_cond_init(&queue->finish_cond, NULL);
 
 	return queue;
 }
 
 void BLI_thread_queue_free(ThreadQueue *queue)
 {
-	pthread_cond_destroy(&queue->cond);
+	/* destroy everything, assumes no one is using queue anymore */
+	pthread_cond_destroy(&queue->finish_cond);
+	pthread_cond_destroy(&queue->push_cond);
 	pthread_mutex_destroy(&queue->mutex);
 
 	BLI_gsqueue_free(queue->queue);
@@ -554,7 +559,7 @@
 	BLI_gsqueue_push(queue->queue, &work);
 
 	/* signal threads waiting to pop */
-	pthread_cond_signal(&queue->cond);
+	pthread_cond_signal(&queue->push_cond);
 	pthread_mutex_unlock(&queue->mutex);
 }
 
@@ -565,11 +570,15 @@
 	/* wait until there is work */
 	pthread_mutex_lock(&queue->mutex);
 	while (BLI_gsqueue_is_empty(queue->queue) && !queue->nowait)
-		pthread_cond_wait(&queue->cond, &queue->mutex);
-
+		pthread_cond_wait(&queue->push_cond, &queue->mutex);
+	
 	/* if we have something, pop it */
-	if (!BLI_gsqueue_is_empty(queue->queue))
+	if (!BLI_gsqueue_is_empty(queue->queue)) {
 		BLI_gsqueue_pop(queue->queue, &work);
+		
+		if(BLI_gsqueue_is_empty(queue->queue))
+			pthread_cond_broadcast(&queue->finish_cond);
+	}
 
 	pthread_mutex_unlock(&queue->mutex);
 
@@ -623,16 +632,20 @@
 	/* wait until there is work */
 	pthread_mutex_lock(&queue->mutex);
 	while (BLI_gsqueue_is_empty(queue->queue) && !queue->nowait) {
-		if (pthread_cond_timedwait(&queue->cond, &queue->mutex, &timeout) == ETIMEDOUT)
+		if (pthread_cond_timedwait(&queue->push_cond, &queue->mutex, &timeout) == ETIMEDOUT)
 			break;
 		else if (PIL_check_seconds_timer() - t >= ms * 0.001)
 			break;
 	}
 
 	/* if we have something, pop it */
-	if (!BLI_gsqueue_is_empty(queue->queue))
+	if (!BLI_gsqueue_is_empty(queue->queue)) {
 		BLI_gsqueue_pop(queue->queue, &work);
-
+		
+		if(BLI_gsqueue_is_empty(queue->queue))
+			pthread_cond_broadcast(&queue->finish_cond);
+	}
+	
 	pthread_mutex_unlock(&queue->mutex);
 
 	return work;
@@ -656,10 +669,23 @@
 	queue->nowait = 1;
 
 	/* signal threads waiting to pop */
-	pthread_cond_signal(&queue->cond);
+	pthread_cond_broadcast(&queue->push_cond);
 	pthread_mutex_unlock(&queue->mutex);
 }
 
+void BLI_thread_queue_wait_finish(ThreadQueue *queue)
+{
+	/* wait for finish condition */
+	pthread_mutex_lock(&queue->mutex);
+
+    while(!BLI_gsqueue_is_empty(queue->queue))
+		pthread_cond_wait(&queue->finish_cond, &queue->mutex);
+
+	pthread_mutex_unlock(&queue->mutex);
+}
+
+/* ************************************************ */
+
 void BLI_begin_threaded_malloc(void)
 {
 	if (thread_levels == 0) {
@@ -674,3 +700,4 @@
 	if (thread_levels == 0)
 		MEM_set_lock_callback(NULL, NULL);
 }
+

Modified: trunk/blender/source/blender/compositor/intern/COM_ExecutionGroup.cpp
===================================================================
--- trunk/blender/source/blender/compositor/intern/COM_ExecutionGroup.cpp	2012-06-10 12:23:44 UTC (rev 47670)
+++ trunk/blender/source/blender/compositor/intern/COM_ExecutionGroup.cpp	2012-06-10 12:26:33 UTC (rev 47671)
@@ -351,8 +351,9 @@
 				startIndex = index+1;
 			}
 		}
-		PIL_sleep_ms(10);
 
+		WorkScheduler::finish();
+
 		if (bTree->test_break && bTree->test_break(bTree->tbh)) {
 			breaked = true;
 		}

Modified: trunk/blender/source/blender/compositor/intern/COM_WorkScheduler.cpp
===================================================================
--- trunk/blender/source/blender/compositor/intern/COM_WorkScheduler.cpp	2012-06-10 12:23:44 UTC (rev 47670)
+++ trunk/blender/source/blender/compositor/intern/COM_WorkScheduler.cpp	2012-06-10 12:26:33 UTC (rev 47671)
@@ -39,8 +39,6 @@
 #endif
 
 
-/// @brief global state of the WorkScheduler.
-static WorkSchedulerState state;
 /// @brief list of all CPUDevices. for every hardware thread an instance of CPUDevice is created
 static vector<CPUDevice*> cpudevices;
 
@@ -68,43 +66,29 @@
 #if COM_CURRENT_THREADING_MODEL == COM_TM_QUEUE
 void *WorkScheduler::thread_execute_cpu(void *data)
 {
-	bool continueLoop = true;
 	Device *device = (Device*)data;
-	while (continueLoop) {
-		WorkPackage *work = (WorkPackage*)BLI_thread_queue_pop(cpuqueue);
-		if (work) {
-			device->execute(work);
-			delete work;
-		}
-		PIL_sleep_ms(10);
-
-		if (WorkScheduler::isStopping()) {
-			continueLoop = false;
-		}
+	WorkPackage *work;
+	
+	while ((work = (WorkPackage*)BLI_thread_queue_pop(cpuqueue))) {
+		device->execute(work);
+		delete work;
 	}
+	
 	return NULL;
 }
 
 void *WorkScheduler::thread_execute_gpu(void *data)
 {
-	bool continueLoop = true;
 	Device *device = (Device*)data;
-	while (continueLoop) {
-		WorkPackage *work = (WorkPackage*)BLI_thread_queue_pop(gpuqueue);
-		if (work) {
-			device->execute(work);
-			delete work;
-		}
-		PIL_sleep_ms(10);
-
-		if (WorkScheduler::isStopping()) {
-			continueLoop = false;
-		}
+	WorkPackage *work;
+	
+	while ((work = (WorkPackage*)BLI_thread_queue_pop(gpuqueue))) {
+		device->execute(work);
+		delete work;
 	}
+	
 	return NULL;
 }
-
-bool WorkScheduler::isStopping() {return state == COM_WSS_STOPPING;}
 #endif
 
 
@@ -135,7 +119,6 @@
 #if COM_CURRENT_THREADING_MODEL == COM_TM_QUEUE
 	unsigned int index;
 	cpuqueue = BLI_thread_queue_init();
-	BLI_thread_queue_nowait(cpuqueue);
 	BLI_init_threads(&cputhreads, thread_execute_cpu, cpudevices.size());
 	for (index = 0 ; index < cpudevices.size() ; index ++) {
 		Device *device = cpudevices[index];
@@ -144,7 +127,6 @@
 #ifdef COM_OPENCL_ENABLED
 	if (context.getHasActiveOpenCLDevices()) {
 		gpuqueue = BLI_thread_queue_init();
-		BLI_thread_queue_nowait(gpuqueue);
 		BLI_init_threads(&gputhreads, thread_execute_gpu, gpudevices.size());
 		for (index = 0 ; index < gpudevices.size() ; index ++) {
 			Device *device = gpudevices[index];
@@ -157,45 +139,39 @@
 	}
 #endif
 #endif
-	state = COM_WSS_STARTED;
 }
 void WorkScheduler::finish()
 {
 #if COM_CURRENT_THREADING_MODEL == COM_TM_QUEUE
 #ifdef COM_OPENCL_ENABLED
 	if (openclActive) {
-		while (BLI_thread_queue_size(gpuqueue) + BLI_thread_queue_size(cpuqueue) > 0) {
-			PIL_sleep_ms(10);
-		}
+		BLI_thread_queue_wait_finish(gpuqueue);
+		BLI_thread_queue_wait_finish(cpuqueue);
 	}
 	else {
-		while (BLI_thread_queue_size(cpuqueue) > 0) {
-			PIL_sleep_ms(10);
-		}
+		BLI_thread_queue_wait_finish(cpuqueue);
 	}
 #else
-	while (BLI_thread_queue_size(cpuqueue) > 0) {
-		PIL_sleep_ms(10);
-	}
+	BLI_thread_queue_wait_finish(cpuqueue);
 #endif
 #endif
 }
 void WorkScheduler::stop()
 {
-	state = COM_WSS_STOPPING;
 #if COM_CURRENT_THREADING_MODEL == COM_TM_QUEUE
+	BLI_thread_queue_nowait(cpuqueue);
 	BLI_end_threads(&cputhreads);
 	BLI_thread_queue_free(cpuqueue);
 	cpuqueue = NULL;
 #ifdef COM_OPENCL_ENABLED
 	if (openclActive) {
+		BLI_thread_queue_nowait(gpuqueue);
 		BLI_end_threads(&gputhreads);
 		BLI_thread_queue_free(gpuqueue);
 		gpuqueue = NULL;
 	}
 #endif
 #endif
-	state = COM_WSS_STOPPED;
 }
 
 bool WorkScheduler::hasGPUDevices()
@@ -218,8 +194,6 @@
 
 void WorkScheduler::initialize()
 {
-	state = COM_WSS_UNKNOWN;
-
 #if COM_CURRENT_THREADING_MODEL == COM_TM_QUEUE
 	int numberOfCPUThreads = BLI_system_thread_count();
 
@@ -298,8 +272,6 @@
 	}
 #endif
 #endif
-
-	state = COM_WSS_INITIALIZED;
 }
 
 void WorkScheduler::deinitialize()
@@ -329,5 +301,4 @@
 	}
 #endif
 #endif
-	state = COM_WSS_DEINITIALIZED;
 }

Modified: trunk/blender/source/blender/compositor/intern/COM_WorkScheduler.h
===================================================================
--- trunk/blender/source/blender/compositor/intern/COM_WorkScheduler.h	2012-06-10 12:23:44 UTC (rev 47670)
+++ trunk/blender/source/blender/compositor/intern/COM_WorkScheduler.h	2012-06-10 12:26:33 UTC (rev 47671)
@@ -31,19 +31,6 @@
 #include "COM_defines.h"
 #include "COM_Device.h"
 
-// STATES
-/** @brief states of the WorkScheduler
-  * @ingroup execution
-  */
-typedef enum WorkSchedulerState {
-	COM_WSS_UNKNOWN = -1,
-	COM_WSS_INITIALIZED = 0,
-	COM_WSS_STARTED = 1,
-	COM_WSS_STOPPING = 2,
-	COM_WSS_STOPPED = 3,
-	COM_WSS_DEINITIALIZED = 4
-} WorkSchedulerState;
-
 /** @brief the workscheduler
   * @ingroup execution
   */




More information about the Bf-blender-cvs mailing list