changeset 90:c2f0ff9a5774

Started queues.c; backup
author Sebastien Decugis <sdecugis@nict.go.jp>
date Tue, 15 Jul 2008 17:26:58 +0900
parents e15ba31853ef
children f5576d74ef5d
files waaad/queues.c
diffstat 1 files changed, 119 insertions(+), 14 deletions(-) [+]
line wrap: on
line diff
--- a/waaad/queues.c	Tue Jul 15 16:55:41 2008 +0900
+++ b/waaad/queues.c	Tue Jul 15 17:26:58 2008 +0900
@@ -43,6 +43,7 @@
 #include <errno.h>
 #include <string.h>
 #include <stdlib.h>
+#include <assert.h>
 
 #include "waaad-internal.h"
 
@@ -55,19 +56,30 @@
 	msg_t		* msg;
 } _li_t;
 
+/* Initialize a list element */
 #define LI_init( _li, _msg ) {	\
 	(_li)->next = (_li);	\
 	(_li)->prev = (_li);	\
 	(_li)->msg  = (_msg);	\
 }
 
+/* Test if a list is empty */
 #define LI_isempty( _li ) ((_li)->next == (_li))
+
+/* Add an element in a list at the end */
+#define LI_push( _sentinel, _item) {		\
+	(_item)->prev = (_sentinel)->prev;	\
+	(_item)->next = (_sentinel);		\
+	(_sentinel)->prev->next = (_item);	\
+	(_sentinel)->prev = (_item);		\
+}
 	
 
 /* Internal representation of a queue */
 typedef struct {
 	int		eyec;	/* An eye catcher, also used to check a queue is valid. _MEQ_EYEC */
-	int		count;	/* number of objects in the list if >0, number of waiters if <0 */
+	int		count;	/* number of objects in the list */
+	int		thrs;	/* number of threads waiting in the list */
 	_li_t		list;	/* sentinel for the list */
 	pthread_mutex_t	mtx;	/* Mutex protecting this queue */
 	pthread_cond_t	cond;	/* condition variable of the list */
@@ -207,27 +219,120 @@
 		return ret;
 	}
 	
+	if ((_Q(queue)->count != 0) || (_Q(queue)->thrs != 0)) {
+		TRACE_DEBUG(INFO, "The queue count is %d, thread count %d, cannot destroy", _Q(queue)->count, _Q(queue)->thrs);
+		ret = pthread_mutex_unlock( &_Q(queue)->mtx );
+		if (ret != 0) {
+			TRACE_DEBUG(INFO, "pthread_mutex_unlock failed (ignored): %s", strerror(ret));
+		}
+		return EINVAL;
+	}
 	
+	/* sanity check */
+	assert(LI_isempty(&_Q(queue)->list));
+	
+	/* Ok, now invalidate the queue */
+	_Q(queue)->eyec = 0xdead;
+	
+	/* And destroy it */
+	ret = pthread_mutex_unlock( &_Q(queue)->mtx );
+	if (ret != 0) {
+		TRACE_DEBUG(INFO, "pthread_mutex_unlock failed: %s", strerror(ret));
+		return ret;
+	}
+	
+	ret = pthread_cond_destroy( &_Q(queue)->cond );
+	if (ret != 0) {
+		TRACE_DEBUG(INFO, "pthread_cond_destroy failed: %s", strerror(ret));
+		return ret;
+	}
+	
+	ret = pthread_mutex_destroy( &_Q(queue)->mtx );
+	if (ret != 0) {
+		TRACE_DEBUG(INFO, "pthread_mutex_destroy failed: %s", strerror(ret));
+		return ret;
+	}
+	
+	free(queue);
 	return 0;
 }
 
-
+/* Retrieve the number of messages of a queue */
 int meq_length ( meq_t * queue, int * length )
 {
+	int ret = 0;
+	
 	TRACE_ENTRY( "%p %p", queue, length );
-	TRACE_DEBUG (INFO, "@@@ %s: not implemented yet.", __FUNCTION__ );
+	
+	/* Check the parameters */
+	if ( (! CHECK_QUEUE( queue )) || (length == NULL) ) {
+		TRACE_DEBUG(INFO, "Invalid parameter");
+		return EINVAL;
+	}
+	
+	/* lock the queue */
+	ret = pthread_mutex_lock( &_Q(queue)->mtx );
+	if (ret != 0) {
+		TRACE_DEBUG(INFO, "pthread_mutex_lock failed: %s", strerror(ret));
+		return ret;
+	}
+	
+	/* Retrieve the count */
+	*length = _Q(queue)->count;
+	
+	/* Unlock */
+	ret = pthread_mutex_unlock( &_Q(queue)->mtx );
+	if (ret != 0) {
+		TRACE_DEBUG(INFO, "pthread_mutex_unlock failed: %s", strerror(ret));
+		return ret;
+	}
+	
+	/* Done */
+	return 0;
+}
+
+/* Enqueue a message */
+int meq_post ( meq_t * queue, msg_t * msg )
+{
+	_li_t * new;
+	int ret = 0;
+	
+	TRACE_ENTRY( "%p %p", queue, msg );
+	
+	/* Check the parameters */
+	if ( (! CHECK_QUEUE( queue )) || (msg == NULL) ) {
+		TRACE_DEBUG(INFO, "Invalid parameter");
+		return EINVAL;
+	}
+	
+	/* Create a new list item */
+	new = (_li_t *) malloc (sizeof (_li_t));
+	if (new == NULL) {
+		log_error("Memory allocation failed: %s", strerror(errno));
+		TRACE_DEBUG(INFO, "malloc failed");
+		return ENOMEM;
+	}
+	
+	LI_init(new, msg);
+	
+	/* lock the queue */
+	ret = pthread_mutex_lock( &_Q(queue)->mtx );
+	if (ret != 0) {
+		TRACE_DEBUG(INFO, "pthread_mutex_lock failed: %s", strerror(ret));
+		return ret;
+	}
+	
+	/* Add the new message */
+	LI_push( &_Q(queue)->list, new);
+	
+	/* Signal if threads are asleep */
+#warning "to complete..."
+	
+	
 	return ENOTSUP;
 }
 
-
-int meq_post ( meq_t * queue, msg_t * msg )
-{
-	TRACE_ENTRY( "%p %p", queue, msg );
-	TRACE_DEBUG (INFO, "@@@ %s: not implemented yet.", __FUNCTION__ );
-	return ENOTSUP;
-}
-
-
+/* Retrieve a message */
 int meq_get ( meq_t * queue, msg_t ** msg )
 {
 	TRACE_ENTRY( "%p %p", queue, msg );
@@ -235,7 +340,7 @@
 	return ENOTSUP;
 }
 
-
+/* Retrieve a message, if any */
 int meq_tryget ( meq_t * queue, msg_t ** msg )
 {
 	TRACE_ENTRY( "%p %p", queue, msg );
@@ -243,7 +348,7 @@
 	return ENOTSUP;
 }
 
-
+/* Retrieve a message, if any. wait only a certain time */
 int meq_timedget ( meq_t * queue, msg_t ** msg, const struct timespec *abstime )
 {
 	TRACE_ENTRY( "%p %p %p", queue, msg, abstime );
"Welcome to our mercurial repository"