Mercurial > hg > waaad
changeset 90:c2f0ff9a5774
Started queues.c; backup
author | Sebastien Decugis <sdecugis@nict.go.jp> |
---|---|
date | Tue, 15 Jul 2008 17:26:58 +0900 |
parents | e15ba31853ef |
children | f5576d74ef5d |
files | waaad/queues.c |
diffstat | 1 files changed, 119 insertions(+), 14 deletions(-) [+] |
line wrap: on
line diff
--- a/waaad/queues.c Tue Jul 15 16:55:41 2008 +0900 +++ b/waaad/queues.c Tue Jul 15 17:26:58 2008 +0900 @@ -43,6 +43,7 @@ #include <errno.h> #include <string.h> #include <stdlib.h> +#include <assert.h> #include "waaad-internal.h" @@ -55,19 +56,30 @@ msg_t * msg; } _li_t; +/* Initialize a list element */ #define LI_init( _li, _msg ) { \ (_li)->next = (_li); \ (_li)->prev = (_li); \ (_li)->msg = (_msg); \ } +/* Test if a list is empty */ #define LI_isempty( _li ) ((_li)->next == (_li)) + +/* Add an element in a list at the end */ +#define LI_push( _sentinel, _item) { \ + (_item)->prev = (_sentinel)->prev; \ + (_item)->next = (_sentinel); \ + (_sentinel)->prev->next = (_item); \ + (_sentinel)->prev = (_item); \ +} /* Internal representation of a queue */ typedef struct { int eyec; /* An eye catcher, also used to check a queue is valid. _MEQ_EYEC */ - int count; /* number of objects in the list if >0, number of waiters if <0 */ + int count; /* number of objects in the list */ + int thrs; /* number of threads waiting in the list */ _li_t list; /* sentinel for the list */ pthread_mutex_t mtx; /* Mutex protecting this queue */ pthread_cond_t cond; /* condition variable of the list */ @@ -207,27 +219,120 @@ return ret; } + if ((_Q(queue)->count != 0) || (_Q(queue)->thrs != 0)) { + TRACE_DEBUG(INFO, "The queue count is %d, thread count %d, cannot destroy", _Q(queue)->count, _Q(queue)->thrs); + ret = pthread_mutex_unlock( &_Q(queue)->mtx ); + if (ret != 0) { + TRACE_DEBUG(INFO, "pthread_mutex_unlock failed (ignored): %s", strerror(ret)); + } + return EINVAL; + } + /* sanity check */ + assert(LI_isempty(&_Q(queue)->list)); + + /* Ok, now invalidate the queue */ + _Q(queue)->eyec = 0xdead; + + /* And destroy it */ + ret = pthread_mutex_unlock( &_Q(queue)->mtx ); + if (ret != 0) { + TRACE_DEBUG(INFO, "pthread_mutex_unlock failed: %s", strerror(ret)); + return ret; + } + + ret = pthread_cond_destroy( &_Q(queue)->cond ); + if (ret != 0) { + TRACE_DEBUG(INFO, "pthread_cond_destroy failed: %s", strerror(ret)); + return ret; + } + + ret = pthread_mutex_destroy( &_Q(queue)->mtx ); + if (ret != 0) { + TRACE_DEBUG(INFO, "pthread_mutex_destroy failed: %s", strerror(ret)); + return ret; + } + + free(queue); return 0; } - +/* Retrieve the number of messages of a queue */ int meq_length ( meq_t * queue, int * length ) { + int ret = 0; + TRACE_ENTRY( "%p %p", queue, length ); - TRACE_DEBUG (INFO, "@@@ %s: not implemented yet.", __FUNCTION__ ); + + /* Check the parameters */ + if ( (! CHECK_QUEUE( queue )) || (length == NULL) ) { + TRACE_DEBUG(INFO, "Invalid parameter"); + return EINVAL; + } + + /* lock the queue */ + ret = pthread_mutex_lock( &_Q(queue)->mtx ); + if (ret != 0) { + TRACE_DEBUG(INFO, "pthread_mutex_lock failed: %s", strerror(ret)); + return ret; + } + + /* Retrieve the count */ + *length = _Q(queue)->count; + + /* Unlock */ + ret = pthread_mutex_unlock( &_Q(queue)->mtx ); + if (ret != 0) { + TRACE_DEBUG(INFO, "pthread_mutex_unlock failed: %s", strerror(ret)); + return ret; + } + + /* Done */ + return 0; +} + +/* Enqueue a message */ +int meq_post ( meq_t * queue, msg_t * msg ) +{ + _li_t * new; + int ret = 0; + + TRACE_ENTRY( "%p %p", queue, msg ); + + /* Check the parameters */ + if ( (! CHECK_QUEUE( queue )) || (msg == NULL) ) { + TRACE_DEBUG(INFO, "Invalid parameter"); + return EINVAL; + } + + /* Create a new list item */ + new = (_li_t *) malloc (sizeof (_li_t)); + if (new == NULL) { + log_error("Memory allocation failed: %s", strerror(errno)); + TRACE_DEBUG(INFO, "malloc failed"); + return ENOMEM; + } + + LI_init(new, msg); + + /* lock the queue */ + ret = pthread_mutex_lock( &_Q(queue)->mtx ); + if (ret != 0) { + TRACE_DEBUG(INFO, "pthread_mutex_lock failed: %s", strerror(ret)); + return ret; + } + + /* Add the new message */ + LI_push( &_Q(queue)->list, new); + + /* Signal if threads are asleep */ +#warning "to complete..." + + return ENOTSUP; } - -int meq_post ( meq_t * queue, msg_t * msg ) -{ - TRACE_ENTRY( "%p %p", queue, msg ); - TRACE_DEBUG (INFO, "@@@ %s: not implemented yet.", __FUNCTION__ ); - return ENOTSUP; -} - - +/* Retrieve a message */ int meq_get ( meq_t * queue, msg_t ** msg ) { TRACE_ENTRY( "%p %p", queue, msg ); @@ -235,7 +340,7 @@ return ENOTSUP; } - +/* Retrieve a message, if any */ int meq_tryget ( meq_t * queue, msg_t ** msg ) { TRACE_ENTRY( "%p %p", queue, msg ); @@ -243,7 +348,7 @@ return ENOTSUP; } - +/* Retrieve a message, if any. wait only a certain time */ int meq_timedget ( meq_t * queue, msg_t ** msg, const struct timespec *abstime ) { TRACE_ENTRY( "%p %p %p", queue, msg, abstime );