[Bf-blender-cvs] SVN commit: /data/svn/bf-blender [47671] trunk/blender/source/blender: Applied and completed a compositor patch by Brecht to use signalling and waiting in scheduling and worker threads instead of continuous loops with sleep times.
Lukas Toenne
lukas.toenne at googlemail.com
Sun Jun 10 14:26:34 CEST 2012
Revision: 47671
http://projects.blender.org/scm/viewvc.php?view=rev&root=bf-blender&revision=47671
Author: lukastoenne
Date: 2012-06-10 12:26:33 +0000 (Sun, 10 Jun 2012)
Log Message:
-----------
Applied and completed a compositor patch by Brecht to use signalling and waiting in scheduling and worker threads instead of continuous loops with sleep times. This should help reduce unnecessary wait times in the Tile compositor.
Modified Paths:
--------------
trunk/blender/source/blender/blenlib/BLI_threads.h
trunk/blender/source/blender/blenlib/intern/threads.c
trunk/blender/source/blender/compositor/intern/COM_ExecutionGroup.cpp
trunk/blender/source/blender/compositor/intern/COM_WorkScheduler.cpp
trunk/blender/source/blender/compositor/intern/COM_WorkScheduler.h
Modified: trunk/blender/source/blender/blenlib/BLI_threads.h
===================================================================
--- trunk/blender/source/blender/blenlib/BLI_threads.h 2012-06-10 12:23:44 UTC (rev 47670)
+++ trunk/blender/source/blender/blenlib/BLI_threads.h 2012-06-10 12:26:33 UTC (rev 47671)
@@ -136,6 +136,7 @@
void *BLI_thread_queue_pop_timeout(ThreadQueue *queue, int ms);
int BLI_thread_queue_size(ThreadQueue *queue);
+void BLI_thread_queue_wait_finish(ThreadQueue *queue);
void BLI_thread_queue_nowait(ThreadQueue *queue);
#endif
Modified: trunk/blender/source/blender/blenlib/intern/threads.c
===================================================================
--- trunk/blender/source/blender/blenlib/intern/threads.c 2012-06-10 12:23:44 UTC (rev 47670)
+++ trunk/blender/source/blender/blenlib/intern/threads.c 2012-06-10 12:26:33 UTC (rev 47671)
@@ -520,8 +520,10 @@
struct ThreadQueue {
GSQueue *queue;
pthread_mutex_t mutex;
- pthread_cond_t cond;
- int nowait;
+ pthread_cond_t push_cond;
+ pthread_cond_t finish_cond;
+ volatile int nowait;
+ volatile int cancelled;
};
ThreadQueue *BLI_thread_queue_init(void)
@@ -532,14 +534,17 @@
queue->queue = BLI_gsqueue_new(sizeof(void *));
pthread_mutex_init(&queue->mutex, NULL);
- pthread_cond_init(&queue->cond, NULL);
+ pthread_cond_init(&queue->push_cond, NULL);
+ pthread_cond_init(&queue->finish_cond, NULL);
return queue;
}
void BLI_thread_queue_free(ThreadQueue *queue)
{
- pthread_cond_destroy(&queue->cond);
+ /* destroy everything, assumes no one is using queue anymore */
+ pthread_cond_destroy(&queue->finish_cond);
+ pthread_cond_destroy(&queue->push_cond);
pthread_mutex_destroy(&queue->mutex);
BLI_gsqueue_free(queue->queue);
@@ -554,7 +559,7 @@
BLI_gsqueue_push(queue->queue, &work);
/* signal threads waiting to pop */
- pthread_cond_signal(&queue->cond);
+ pthread_cond_signal(&queue->push_cond);
pthread_mutex_unlock(&queue->mutex);
}
@@ -565,11 +570,15 @@
/* wait until there is work */
pthread_mutex_lock(&queue->mutex);
while (BLI_gsqueue_is_empty(queue->queue) && !queue->nowait)
- pthread_cond_wait(&queue->cond, &queue->mutex);
-
+ pthread_cond_wait(&queue->push_cond, &queue->mutex);
+
/* if we have something, pop it */
- if (!BLI_gsqueue_is_empty(queue->queue))
+ if (!BLI_gsqueue_is_empty(queue->queue)) {
BLI_gsqueue_pop(queue->queue, &work);
+
+ if(BLI_gsqueue_is_empty(queue->queue))
+ pthread_cond_broadcast(&queue->finish_cond);
+ }
pthread_mutex_unlock(&queue->mutex);
@@ -623,16 +632,20 @@
/* wait until there is work */
pthread_mutex_lock(&queue->mutex);
while (BLI_gsqueue_is_empty(queue->queue) && !queue->nowait) {
- if (pthread_cond_timedwait(&queue->cond, &queue->mutex, &timeout) == ETIMEDOUT)
+ if (pthread_cond_timedwait(&queue->push_cond, &queue->mutex, &timeout) == ETIMEDOUT)
break;
else if (PIL_check_seconds_timer() - t >= ms * 0.001)
break;
}
/* if we have something, pop it */
- if (!BLI_gsqueue_is_empty(queue->queue))
+ if (!BLI_gsqueue_is_empty(queue->queue)) {
BLI_gsqueue_pop(queue->queue, &work);
-
+
+ if(BLI_gsqueue_is_empty(queue->queue))
+ pthread_cond_broadcast(&queue->finish_cond);
+ }
+
pthread_mutex_unlock(&queue->mutex);
return work;
@@ -656,10 +669,23 @@
queue->nowait = 1;
/* signal threads waiting to pop */
- pthread_cond_signal(&queue->cond);
+ pthread_cond_broadcast(&queue->push_cond);
pthread_mutex_unlock(&queue->mutex);
}
+void BLI_thread_queue_wait_finish(ThreadQueue *queue)
+{
+ /* wait for finish condition */
+ pthread_mutex_lock(&queue->mutex);
+
+ while(!BLI_gsqueue_is_empty(queue->queue))
+ pthread_cond_wait(&queue->finish_cond, &queue->mutex);
+
+ pthread_mutex_unlock(&queue->mutex);
+}
+
+/* ************************************************ */
+
void BLI_begin_threaded_malloc(void)
{
if (thread_levels == 0) {
@@ -674,3 +700,4 @@
if (thread_levels == 0)
MEM_set_lock_callback(NULL, NULL);
}
+
Modified: trunk/blender/source/blender/compositor/intern/COM_ExecutionGroup.cpp
===================================================================
--- trunk/blender/source/blender/compositor/intern/COM_ExecutionGroup.cpp 2012-06-10 12:23:44 UTC (rev 47670)
+++ trunk/blender/source/blender/compositor/intern/COM_ExecutionGroup.cpp 2012-06-10 12:26:33 UTC (rev 47671)
@@ -351,8 +351,9 @@
startIndex = index+1;
}
}
- PIL_sleep_ms(10);
+ WorkScheduler::finish();
+
if (bTree->test_break && bTree->test_break(bTree->tbh)) {
breaked = true;
}
Modified: trunk/blender/source/blender/compositor/intern/COM_WorkScheduler.cpp
===================================================================
--- trunk/blender/source/blender/compositor/intern/COM_WorkScheduler.cpp 2012-06-10 12:23:44 UTC (rev 47670)
+++ trunk/blender/source/blender/compositor/intern/COM_WorkScheduler.cpp 2012-06-10 12:26:33 UTC (rev 47671)
@@ -39,8 +39,6 @@
#endif
-/// @brief global state of the WorkScheduler.
-static WorkSchedulerState state;
/// @brief list of all CPUDevices. for every hardware thread an instance of CPUDevice is created
static vector<CPUDevice*> cpudevices;
@@ -68,43 +66,29 @@
#if COM_CURRENT_THREADING_MODEL == COM_TM_QUEUE
void *WorkScheduler::thread_execute_cpu(void *data)
{
- bool continueLoop = true;
Device *device = (Device*)data;
- while (continueLoop) {
- WorkPackage *work = (WorkPackage*)BLI_thread_queue_pop(cpuqueue);
- if (work) {
- device->execute(work);
- delete work;
- }
- PIL_sleep_ms(10);
-
- if (WorkScheduler::isStopping()) {
- continueLoop = false;
- }
+ WorkPackage *work;
+
+ while ((work = (WorkPackage*)BLI_thread_queue_pop(cpuqueue))) {
+ device->execute(work);
+ delete work;
}
+
return NULL;
}
void *WorkScheduler::thread_execute_gpu(void *data)
{
- bool continueLoop = true;
Device *device = (Device*)data;
- while (continueLoop) {
- WorkPackage *work = (WorkPackage*)BLI_thread_queue_pop(gpuqueue);
- if (work) {
- device->execute(work);
- delete work;
- }
- PIL_sleep_ms(10);
-
- if (WorkScheduler::isStopping()) {
- continueLoop = false;
- }
+ WorkPackage *work;
+
+ while ((work = (WorkPackage*)BLI_thread_queue_pop(gpuqueue))) {
+ device->execute(work);
+ delete work;
}
+
return NULL;
}
-
-bool WorkScheduler::isStopping() {return state == COM_WSS_STOPPING;}
#endif
@@ -135,7 +119,6 @@
#if COM_CURRENT_THREADING_MODEL == COM_TM_QUEUE
unsigned int index;
cpuqueue = BLI_thread_queue_init();
- BLI_thread_queue_nowait(cpuqueue);
BLI_init_threads(&cputhreads, thread_execute_cpu, cpudevices.size());
for (index = 0 ; index < cpudevices.size() ; index ++) {
Device *device = cpudevices[index];
@@ -144,7 +127,6 @@
#ifdef COM_OPENCL_ENABLED
if (context.getHasActiveOpenCLDevices()) {
gpuqueue = BLI_thread_queue_init();
- BLI_thread_queue_nowait(gpuqueue);
BLI_init_threads(&gputhreads, thread_execute_gpu, gpudevices.size());
for (index = 0 ; index < gpudevices.size() ; index ++) {
Device *device = gpudevices[index];
@@ -157,45 +139,39 @@
}
#endif
#endif
- state = COM_WSS_STARTED;
}
void WorkScheduler::finish()
{
#if COM_CURRENT_THREADING_MODEL == COM_TM_QUEUE
#ifdef COM_OPENCL_ENABLED
if (openclActive) {
- while (BLI_thread_queue_size(gpuqueue) + BLI_thread_queue_size(cpuqueue) > 0) {
- PIL_sleep_ms(10);
- }
+ BLI_thread_queue_wait_finish(gpuqueue);
+ BLI_thread_queue_wait_finish(cpuqueue);
}
else {
- while (BLI_thread_queue_size(cpuqueue) > 0) {
- PIL_sleep_ms(10);
- }
+ BLI_thread_queue_wait_finish(cpuqueue);
}
#else
- while (BLI_thread_queue_size(cpuqueue) > 0) {
- PIL_sleep_ms(10);
- }
+ BLI_thread_queue_wait_finish(cpuqueue);
#endif
#endif
}
void WorkScheduler::stop()
{
- state = COM_WSS_STOPPING;
#if COM_CURRENT_THREADING_MODEL == COM_TM_QUEUE
+ BLI_thread_queue_nowait(cpuqueue);
BLI_end_threads(&cputhreads);
BLI_thread_queue_free(cpuqueue);
cpuqueue = NULL;
#ifdef COM_OPENCL_ENABLED
if (openclActive) {
+ BLI_thread_queue_nowait(gpuqueue);
BLI_end_threads(&gputhreads);
BLI_thread_queue_free(gpuqueue);
gpuqueue = NULL;
}
#endif
#endif
- state = COM_WSS_STOPPED;
}
bool WorkScheduler::hasGPUDevices()
@@ -218,8 +194,6 @@
void WorkScheduler::initialize()
{
- state = COM_WSS_UNKNOWN;
-
#if COM_CURRENT_THREADING_MODEL == COM_TM_QUEUE
int numberOfCPUThreads = BLI_system_thread_count();
@@ -298,8 +272,6 @@
}
#endif
#endif
-
- state = COM_WSS_INITIALIZED;
}
void WorkScheduler::deinitialize()
@@ -329,5 +301,4 @@
}
#endif
#endif
- state = COM_WSS_DEINITIALIZED;
}
Modified: trunk/blender/source/blender/compositor/intern/COM_WorkScheduler.h
===================================================================
--- trunk/blender/source/blender/compositor/intern/COM_WorkScheduler.h 2012-06-10 12:23:44 UTC (rev 47670)
+++ trunk/blender/source/blender/compositor/intern/COM_WorkScheduler.h 2012-06-10 12:26:33 UTC (rev 47671)
@@ -31,19 +31,6 @@
#include "COM_defines.h"
#include "COM_Device.h"
-// STATES
-/** @brief states of the WorkScheduler
- * @ingroup execution
- */
-typedef enum WorkSchedulerState {
- COM_WSS_UNKNOWN = -1,
- COM_WSS_INITIALIZED = 0,
- COM_WSS_STARTED = 1,
- COM_WSS_STOPPING = 2,
- COM_WSS_STOPPED = 3,
- COM_WSS_DEINITIALIZED = 4
-} WorkSchedulerState;
-
/** @brief the workscheduler
* @ingroup execution
*/
More information about the Bf-blender-cvs mailing list