[Bf-blender-cvs] SVN commit: /data/svn/bf-blender [26190] trunk/blender/source/blender/ blenlib: Threads: added queue for passing data between threads.

Brecht Van Lommel brecht at blender.org
Fri Jan 22 12:06:58 CET 2010


Revision: 26190
          http://projects.blender.org/plugins/scmsvn/viewcvs.php?view=rev&root=bf-blender&revision=26190
Author:   blendix
Date:     2010-01-22 12:06:57 +0100 (Fri, 22 Jan 2010)

Log Message:
-----------
Threads: added queue for passing data between threads. Includes a function
to wait for an item to be put in the queue and then pop immediately without,
this makes it possible to avoid sleep() while waiting for the results of a
thread.

Modified Paths:
--------------
    trunk/blender/source/blender/blenlib/BLI_gsqueue.h
    trunk/blender/source/blender/blenlib/BLI_threads.h
    trunk/blender/source/blender/blenlib/intern/threads.c

Modified: trunk/blender/source/blender/blenlib/BLI_gsqueue.h
===================================================================
--- trunk/blender/source/blender/blenlib/BLI_gsqueue.h	2010-01-22 11:03:55 UTC (rev 26189)
+++ trunk/blender/source/blender/blenlib/BLI_gsqueue.h	2010-01-22 11:06:57 UTC (rev 26190)
@@ -49,6 +49,11 @@
 int			BLI_gsqueue_is_empty(GSQueue *gq);
 
 	/**
+	 * Query number elements in the queue
+	 */
+int			BLI_gsqueue_size(GSQueue *gq);
+
+	/**
 	 * Access the item at the head of the queue
 	 * without removing it.
 	 * 

Modified: trunk/blender/source/blender/blenlib/BLI_threads.h
===================================================================
--- trunk/blender/source/blender/blenlib/BLI_threads.h	2010-01-22 11:03:55 UTC (rev 26189)
+++ trunk/blender/source/blender/blenlib/BLI_threads.h	2010-01-22 11:06:57 UTC (rev 26190)
@@ -107,6 +107,21 @@
  * NOTE: inserting work is NOT thread safe, so make sure it is only done from one thread */
 void BLI_insert_work(struct ThreadedWorker *worker, void *param);
 
+/* ThreadWorkQueue
+ *
+ * Thread-safe work queue to push work/pointers between threads. */
 
+typedef struct ThreadQueue ThreadQueue;
+
+ThreadQueue *BLI_thread_queue_init();
+void BLI_thread_queue_free(ThreadQueue *queue);
+
+void BLI_thread_queue_push(ThreadQueue *queue, void *work);
+void *BLI_thread_queue_pop(ThreadQueue *queue);
+void *BLI_thread_queue_pop_timeout(ThreadQueue *queue, int ms);
+int BLI_thread_queue_size(ThreadQueue *queue);
+
+void BLI_thread_queue_nowait(ThreadQueue *queue);
+
 #endif
 

Modified: trunk/blender/source/blender/blenlib/intern/threads.c
===================================================================
--- trunk/blender/source/blender/blenlib/intern/threads.c	2010-01-22 11:03:55 UTC (rev 26189)
+++ trunk/blender/source/blender/blenlib/intern/threads.c	2010-01-22 11:06:57 UTC (rev 26190)
@@ -32,12 +32,14 @@
 #include <stdlib.h>
 #include <string.h>
 #include <pthread.h>
+#include <errno.h>
 
 #include "MEM_guardedalloc.h"
 
 #include "DNA_listBase.h"
 
 #include "BLI_blenlib.h"
+#include "BLI_gsqueue.h"
 #include "BLI_threads.h"
 
 #include "PIL_time.h"
@@ -458,4 +460,132 @@
 	BLI_insert_thread(&worker->threadbase, p);
 }
 
-/* eof */
+/* ************************************************ */
+
+struct ThreadQueue {
+	GSQueue *queue;
+	pthread_mutex_t mutex;
+	pthread_cond_t cond;
+	int nowait;
+};
+
+ThreadQueue *BLI_thread_queue_init()
+{
+	ThreadQueue *queue;
+
+	queue= MEM_callocN(sizeof(ThreadQueue), "ThreadQueue");
+	queue->queue= BLI_gsqueue_new(sizeof(void*));
+
+	pthread_mutex_init(&queue->mutex, NULL);
+	pthread_cond_init(&queue->cond, NULL);
+
+	return queue;
+}
+
+void BLI_thread_queue_free(ThreadQueue *queue)
+{
+	pthread_cond_destroy(&queue->cond);
+	pthread_mutex_destroy(&queue->mutex);
+
+	BLI_gsqueue_free(queue->queue);
+
+	MEM_freeN(queue);
+}
+
+void BLI_thread_queue_push(ThreadQueue *queue, void *work)
+{
+	pthread_mutex_lock(&queue->mutex);
+
+	BLI_gsqueue_push(queue->queue, &work);
+
+	/* signal threads waiting to pop */
+	pthread_cond_signal(&queue->cond);
+	pthread_mutex_unlock(&queue->mutex);
+}
+
+void *BLI_thread_queue_pop(ThreadQueue *queue)
+{
+	void *work= NULL;
+
+	/* wait until there is work */
+	pthread_mutex_lock(&queue->mutex);
+	while(BLI_gsqueue_is_empty(queue->queue) && !queue->nowait)
+		pthread_cond_wait(&queue->cond, &queue->mutex);
+
+	/* if we have something, pop it */
+	if(!BLI_gsqueue_is_empty(queue->queue))
+		BLI_gsqueue_pop(queue->queue, &work);
+
+	pthread_mutex_unlock(&queue->mutex);
+
+	return work;
+}
+
+static void wait_timeout(struct timespec *timeout, int ms)
+{
+	struct timeval now;
+	ldiv_t div_result;
+	long x;
+
+	gettimeofday(&now, NULL);
+	div_result = ldiv(ms, 1000);
+	timeout->tv_sec = now.tv_sec + div_result.quot;
+	x = now.tv_usec + (div_result.rem*1000);
+
+	if (x >= 1000000) {
+		timeout->tv_sec++;
+		x -= 1000000;
+	}
+
+	timeout->tv_nsec = x*1000;
+}
+
+void *BLI_thread_queue_pop_timeout(ThreadQueue *queue, int ms)
+{
+	double t;
+	void *work= NULL;
+	struct timespec timeout;
+
+	t= PIL_check_seconds_timer();
+	wait_timeout(&timeout, ms);
+
+	/* wait until there is work */
+	pthread_mutex_lock(&queue->mutex);
+	while(BLI_gsqueue_is_empty(queue->queue) && !queue->nowait) {
+		if(pthread_cond_timedwait(&queue->cond, &queue->mutex, &timeout) == ETIMEDOUT)
+			break;
+		else if(PIL_check_seconds_timer() - t >= ms*0.001)
+			break;
+	}
+
+	/* if we have something, pop it */
+	if(!BLI_gsqueue_is_empty(queue->queue))
+		BLI_gsqueue_pop(queue->queue, &work);
+
+	pthread_mutex_unlock(&queue->mutex);
+
+	return work;
+}
+
+int BLI_thread_queue_size(ThreadQueue *queue)
+{
+	int size;
+
+	pthread_mutex_lock(&queue->mutex);
+	size= BLI_gsqueue_size(queue->queue);
+	pthread_mutex_unlock(&queue->mutex);
+
+	return size;
+}
+
+void BLI_thread_queue_nowait(ThreadQueue *queue)
+{
+	pthread_mutex_lock(&queue->mutex);
+
+	queue->nowait= 1;
+
+	/* signal threads waiting to pop */
+	pthread_cond_signal(&queue->cond);
+	pthread_mutex_unlock(&queue->mutex);
+}
+





More information about the Bf-blender-cvs mailing list