# HG changeset patch # User Sebastien Decugis # Date 1216191593 -32400 # Node ID 9862eceddf8d073283d5180c2019144dff165adf # Parent be49e121160ea7ab941ffafe449995be89d4a79d Added the queues source code and test diff -r be49e121160e -r 9862eceddf8d waaad/queues.c --- a/waaad/queues.c Wed Jul 16 14:57:31 2008 +0900 +++ b/waaad/queues.c Wed Jul 16 15:59:53 2008 +0900 @@ -37,6 +37,15 @@ * * This module provides the support functions for queueing and picking messages. * See queues.h for more information. + * + * The threads that call these functions must be in the cancellation state PTHREAD_CANCEL_ENABLE and type PTHREAD_CANCEL_DEFERRED. + * This is the default state and type on thread creation. + * + * In order to destroy properly a queue, the application must: + * -> shutdown any process that can add into the queue first. + * -> pthread_cancel any thread that could be waiting on the queue. + * -> consume any message that is in the queue, using meq_tryget. + * -> then destroy the queue using meq_del. */ #include @@ -66,6 +75,14 @@ /* Test if a list is empty */ #define LI_isempty( _li ) ((_li)->next == (_li)) +/* Unlink an element */ +#define LI_unlink( _li_ ) { \ + (_li_)->prev->next = (_li_)->next; \ + (_li_)->next->prev = (_li_)->prev; \ + (_li_)->next = (_li_); \ + (_li_)->prev = (_li_); \ +} + /* Add an element in a list at the end */ #define LI_push( _sentinel, _item) { \ (_item)->prev = (_sentinel)->prev; \ @@ -73,6 +90,13 @@ (_sentinel)->prev->next = (_item); \ (_sentinel)->prev = (_item); \ } + +/* Remove an element from the head of a list */ +#define LI_pop( _sentinel, _item) { \ + (_item) = (_sentinel)->next; \ + assert((_item) != (_sentinel)); \ + LI_unlink(_item); \ +} /* Internal representation of a queue */ @@ -324,36 +348,206 @@ /* Add the new message */ LI_push( &_Q(queue)->list, new); + _Q(queue)->count++; /* Signal if threads are asleep */ -#warning "to complete..." - + if (_Q(queue)->thrs > 0) { + ret = pthread_cond_signal(&_Q(queue)->cond); + if (ret != 0) { + TRACE_DEBUG(INFO, "pthread_cond_signal failed: %s", strerror(ret)); + return ret; + } + } - return ENOTSUP; -} - -/* Retrieve a message */ -int meq_get ( meq_t * queue, msg_t ** msg ) -{ - TRACE_ENTRY( "%p %p", queue, msg ); - TRACE_DEBUG (INFO, "@@@ %s: not implemented yet.", __FUNCTION__ ); - return ENOTSUP; + /* Unlock */ + ret = pthread_mutex_unlock( &_Q(queue)->mtx ); + if (ret != 0) { + TRACE_DEBUG(INFO, "pthread_mutex_unlock failed: %s", strerror(ret)); + return ret; + } + + /* Done */ + return 0; } /* Retrieve a message, if any */ int meq_tryget ( meq_t * queue, msg_t ** msg ) { + _li_t * li; + int ret = 0; + int wouldblock = 0; + TRACE_ENTRY( "%p %p", queue, msg ); - TRACE_DEBUG (INFO, "@@@ %s: not implemented yet.", __FUNCTION__ ); - return ENOTSUP; + + /* Check the parameters */ + if ( (! CHECK_QUEUE( queue )) || (msg == NULL) ) { + TRACE_DEBUG(INFO, "Invalid parameter"); + return EINVAL; + } + + /* lock the queue */ + ret = pthread_mutex_lock( &_Q(queue)->mtx ); + if (ret != 0) { + TRACE_DEBUG(INFO, "pthread_mutex_lock failed: %s", strerror(ret)); + return ret; + } + + /* Check queue status */ + if (_Q(queue)->count > 0) { + /* There are messages in the queue, so pick the first one */ + LI_pop( &_Q(queue)->list, li ); + _Q(queue)->count--; + *msg = li->msg; + free(li); + } else { + wouldblock = 1; + *msg = NULL; + } + + /* Unlock */ + ret = pthread_mutex_unlock( &_Q(queue)->mtx ); + if (ret != 0) { + TRACE_DEBUG(INFO, "pthread_mutex_unlock failed: %s", strerror(ret)); + return ret; + } + + /* Done */ + return wouldblock ? EWOULDBLOCK : 0; +} + +/* This handler is called when a thread is blocked on a queue, and cancelled */ +static void _meq_cleanup(void * queue) +{ + int ret; + + TRACE_ENTRY( "%p", queue ); + + /* Check the parameter */ + if ( ! CHECK_QUEUE( queue )) { + TRACE_DEBUG(INFO, "Invalid queue, skipping handler"); + return; + } + + /* The thread has been cancelled, therefore it does not wait on the queue anymore */ + _Q(queue)->thrs--; + + /* Now unlock the queue, and we're done */ + ret = pthread_mutex_unlock( &_Q(queue)->mtx ); + if (ret != 0) { + TRACE_DEBUG(INFO, "pthread_mutex_unlock failed: %s", strerror(ret)); + } + + /* End of cleanup handler */ + return; +} + +/* The internal function for meq_timedget and meq_get */ +static int _meq_tget ( _meq_t * queue, msg_t ** msg, int istimed, const struct timespec *abstime) +{ + int ret = 0; + int timedout = 0; + _li_t * li; + + /* Check the parameters */ + if ( (! CHECK_QUEUE( queue )) || (msg == NULL) || ( istimed && (abstime == NULL)) ) { + TRACE_DEBUG(INFO, "Invalid parameter"); + return EINVAL; + } + + /* Check the thread cancellation state */ + { + int oldstate; + ret = pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &oldstate); + if (ret != 0) { + TRACE_DEBUG(INFO, "pthread_setcancelstate failed: %s", strerror(ret)); + return ret; + } + if (oldstate != PTHREAD_CANCEL_ENABLE) { + (void) pthread_setcancelstate(oldstate, &ret); + TRACE_DEBUG(INFO, "Invalid thread cancellation state to call this function"); + return EINVAL; + } + } + /* Check the thread cancellation type */ + { + int oldtype; + ret = pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, &oldtype); + if (ret != 0) { + TRACE_DEBUG(INFO, "pthread_setcanceltype failed: %s", strerror(ret)); + return ret; + } + if (oldtype != PTHREAD_CANCEL_DEFERRED) { + (void) pthread_setcanceltype(oldtype, &ret); + TRACE_DEBUG(INFO, "Invalid thread cancellation type to call this function"); + return EINVAL; + } + } + + /* Initialize the msg value */ + *msg = NULL; + + /* lock the queue */ + ret = pthread_mutex_lock( &queue->mtx ); + if (ret != 0) { + TRACE_DEBUG(INFO, "pthread_mutex_lock failed: %s", strerror(ret)); + return ret; + } + +awaken: + /* Check queue status */ + if (queue->count > 0) { + /* There are messages in the queue, so pick the first one */ + LI_pop( &queue->list, li ); + queue->count--; + *msg = li->msg; + free(li); + } else { + /* We have to wait for a new message */ + queue->thrs++ ; + pthread_cleanup_push(_meq_cleanup, queue); + if (istimed) { + ret = pthread_cond_timedwait( &queue->cond, &queue->mtx, abstime ); + } else { + ret = pthread_cond_wait( &queue->cond, &queue->mtx ); + } + pthread_cleanup_pop(0); + queue->thrs-- ; + if (ret == 0) + goto awaken; /* test for spurious wake-ups */ + + if (istimed && (ret == ETIMEDOUT)) { + timedout = 1; + } else { + /* Unexpected error condition */ + TRACE_DEBUG(INFO, "pthread_cond_wait or pthread_cond_timedwait failed: %s", strerror(ret)); + return ret; + } + } + + /* Unlock */ + ret = pthread_mutex_unlock( &queue->mtx ); + if (ret != 0) { + TRACE_DEBUG(INFO, "pthread_mutex_unlock failed: %s", strerror(ret)); + return ret; + } + + /* Done */ + return timedout ? ETIMEDOUT : 0; +} + + +/* Retrieve a message */ +int meq_get ( meq_t * queue, msg_t ** msg ) +{ + TRACE_ENTRY( "%p %p", queue, msg ); + return _meq_tget(_Q(queue), msg, 0, NULL); } /* Retrieve a message, if any. wait only a certain time */ int meq_timedget ( meq_t * queue, msg_t ** msg, const struct timespec *abstime ) { TRACE_ENTRY( "%p %p %p", queue, msg, abstime ); - TRACE_DEBUG (INFO, "@@@ %s: not implemented yet.", __FUNCTION__ ); - return ENOTSUP; + return _meq_tget(_Q(queue), msg, 1, abstime); } diff -r be49e121160e -r 9862eceddf8d waaad/tests/Makefile.am --- a/waaad/tests/Makefile.am Wed Jul 16 14:57:31 2008 +0900 +++ b/waaad/tests/Makefile.am Wed Jul 16 15:59:53 2008 +0900 @@ -33,6 +33,9 @@ # Testing the messages: testmesg_SOURCES = testmesg.c tests.h $(WAAADSOURCES) -check_PROGRAMS = testdict testmesg +# Testing the queues: +testmeq_SOURCES = testmeq.c tests.h $(WAAADSOURCES) + +check_PROGRAMS = testdict testmesg testmeq TESTS = $(check_PROGRAMS) diff -r be49e121160e -r 9862eceddf8d waaad/tests/testmeq.c --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/waaad/tests/testmeq.c Wed Jul 16 15:59:53 2008 +0900 @@ -0,0 +1,300 @@ +/********************************************************************************************************* +* Software License Agreement (BSD License) * +* Author: Sebastien Decugis * +* * +* Copyright (c) 2008, WIDE Project and NICT * +* All rights reserved. * +* * +* Redistribution and use of this software in source and binary forms, with or without modification, are * +* permitted provided that the following conditions are met: * +* * +* * Redistributions of source code must retain the above * +* copyright notice, this list of conditions and the * +* following disclaimer. * +* * +* * Redistributions in binary form must reproduce the above * +* copyright notice, this list of conditions and the * +* following disclaimer in the documentation and/or other * +* materials provided with the distribution. * +* * +* * Neither the name of the WIDE Project or NICT nor the * +* names of its contributors may be used to endorse or * +* promote products derived from this software without * +* specific prior written permission of WIDE Project and * +* NICT. * +* * +* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED * +* WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A * +* PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR * +* ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT * +* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * +* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR * +* TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF * +* ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * +*********************************************************************************************************/ + +#include "tests.h" + +/* Test for the message queues */ + +/* Structure that is passed to the test function */ +typedef struct { + meq_t * queue; /* pointer to the queue */ + pthread_barrier_t * bar; /* if not NULL, barrier to synchronize before getting messages */ + struct timespec * ts; /* if not NULL, use a timedget instead of a get */ + int nbr; /* number of messages to retrieve from the queue */ +} test_data_t; + +/* The test function to be threaded */ +static void * test_fct(void * data) +{ + int ret = 0, i; + msg_t * msg = NULL; + + test_data_t * td = (test_data_t *) data; + + if (td->bar != NULL) { + ret = pthread_barrier_wait(td->bar); + if (ret != PTHREAD_BARRIER_SERIAL_THREAD) { + CHECK( 0, ret); + } + } + + for (i=0; i< td->nbr; i++) { + if (td->ts != NULL) { + CHECK( 0, meq_timedget(td->queue, &msg, td->ts) ); + } else { + CHECK( 0, meq_get(td->queue, &msg) ); + } + } + + return NULL; +} + +/* Main test routine */ +int main(int argc, char *argv[]) +{ + struct timespec ts; + + msg_t * msg1 = NULL; + msg_t * msg2 = NULL; + msg_t * msg3 = NULL; + + /* First, initialize the daemon modules */ + INIT_WAAAD(); + + /* Set an alarm */ + alarm(5); + + /* Prolog: create the messages */ + { + dict_object_t * acr_model = NULL; + dict_object_t * cer_model = NULL; + dict_object_t * dwr_model = NULL; + + CHECK( 0, dict_search ( DICT_COMMAND, CMD_BY_NAME, "Accounting-Request", &acr_model ) ); + CHECK( 0, dict_search ( DICT_COMMAND, CMD_BY_NAME, "Capabilities-Exchange-Request", &cer_model ) ); + CHECK( 0, dict_search ( DICT_COMMAND, CMD_BY_NAME, "Device-Watchdog-Request", &dwr_model ) ); + CHECK( 0, msg_new ( acr_model, MSGFL_FROM_TEMPLATE, &msg1 ) ); + CHECK( 0, msg_new ( cer_model, MSGFL_FROM_TEMPLATE, &msg2 ) ); + CHECK( 0, msg_new ( dwr_model, MSGFL_FROM_TEMPLATE, &msg3 ) ); + } + + /* Basic operation */ + { + meq_t * queue = NULL; + int count; + msg_t * msg = NULL; + + /* Create the queue */ + CHECK( 0, meq_new(&queue) ); + + /* Check the count is 0 */ + CHECK( 0, meq_length(queue, &count) ); + CHECK( 0, count); + + /* Now enqueue */ + CHECK( 0, meq_post(queue, msg1) ); + CHECK( 0, meq_post(queue, msg2) ); + CHECK( 0, meq_post(queue, msg3) ); + + /* Check the count is 3 */ + CHECK( 0, meq_length(queue, &count) ); + CHECK( 3, count); + + /* Retrieve the first message using meq_get */ + CHECK( 0, meq_get(queue, &msg) ); + CHECK( msg1, msg); + CHECK( 0, meq_length(queue, &count) ); + CHECK( 2, count); + + /* Retrieve the second message using meq_timedget */ + CHECK(0, clock_gettime(CLOCK_REALTIME, &ts)); + ts.tv_sec += 1; /* Set the timeout to 1 second */ + CHECK( 0, meq_timedget(queue, &msg, &ts) ); + CHECK( msg2, msg); + CHECK( 0, meq_length(queue, &count) ); + CHECK( 1, count); + + /* Retrieve the third message using meq_tryget */ + CHECK( 0, meq_tryget(queue, &msg) ); + CHECK( msg3, msg); + CHECK( 0, meq_length(queue, &count) ); + CHECK( 0, count); + + /* Check that another meq_tryget does not block */ + CHECK( EWOULDBLOCK, meq_tryget(queue, &msg) ); + CHECK( 0, meq_length(queue, &count) ); + CHECK( 0, count); + + /* We're done for basic tests */ + CHECK( 0, meq_del(queue) ); + } + + /* Test robustness, ensure no messages are lost */ + { +#define NBR_MSG 200 +#define NBR_THREADS 60 + meq_t *queue = NULL; + pthread_barrier_t bar; + test_data_t td_1; + test_data_t td_2; + msg_t *msgs[NBR_MSG * NBR_THREADS * 2]; + pthread_t thr [NBR_THREADS * 2]; + dict_object_t *dwr_model = NULL; + int count; + int i; + + /* Create the queue */ + CHECK( 0, meq_new(&queue) ); + + /* Create the barrier */ + CHECK( 0, pthread_barrier_init(&bar, NULL, NBR_THREADS * 2 + 1) ); + + /* Initialize the ts */ + CHECK(0, clock_gettime(CLOCK_REALTIME, &ts)); + ts.tv_sec += 2; /* Set the timeout to 2 second */ + + /* Create the messages */ + CHECK( 0, dict_search ( DICT_COMMAND, CMD_BY_NAME, "Device-Watchdog-Request", &dwr_model ) ); + for (i = 0; i < NBR_MSG * NBR_THREADS * 2; i++) { + CHECK( 0, msg_new ( dwr_model, 0, &msgs[i] ) ); + } + + /* Initialize the test data structures */ + td_1.queue = queue; + td_1.bar = &bar; + td_1.ts = &ts; + td_1.nbr = NBR_MSG; + td_2.queue = queue; + td_2.bar = &bar; + td_2.ts = NULL; + td_2.nbr = NBR_MSG; + + /* Create the threads */ + for (i=0; i < NBR_THREADS * 2; i++) { + CHECK( 0, pthread_create( &thr[i], NULL, test_fct, (i & 1) ? &td_1 : &td_2 ) ); + } + + /* Synchronize everyone */ + { + int ret = pthread_barrier_wait(&bar); + if (ret != PTHREAD_BARRIER_SERIAL_THREAD) { + CHECK( 0, ret); + } + } + + /* Now post all the messages */ + for (i=0; i < NBR_MSG * NBR_THREADS * 2; i++) { + CHECK( 0, meq_post(queue, msgs[i]) ); + } + + /* Join all threads. This would block if messages are lost... */ + for (i=0; i < NBR_THREADS * 2; i++) { + CHECK( 0, pthread_join( thr[i], NULL ) ); + } + + /* Check the count of the queue is back to 0 */ + CHECK( 0, meq_length(queue, &count) ); + CHECK( 0, count); + + /* Destroy this queue and the messages */ + CHECK( 0, meq_del(queue) ); + for (i=0; i < NBR_MSG * NBR_THREADS * 2; i++) { + CHECK( 0, msg_free( msgs[i], 1 ) ); + } + } + + /* Test thread cancelation */ + { + meq_t *queue = NULL; + pthread_barrier_t bar; + test_data_t td; + pthread_t th; + + /* Create the queue */ + CHECK( 0, meq_new(&queue) ); + + /* Create the barrier */ + CHECK( 0, pthread_barrier_init(&bar, NULL, 2) ); + + /* Initialize the ts */ + CHECK(0, clock_gettime(CLOCK_REALTIME, &ts)); + ts.tv_sec += 2; /* Set the timeout to 2 second */ + + /* Initialize the test data structures */ + td.queue = queue; + td.bar = &bar; + td.ts = &ts; + td.nbr = 1; + + /* Create the thread */ + CHECK( 0, pthread_create( &th, NULL, test_fct, &td ) ); + + /* Wait for the thread to be running */ + { + int ret = pthread_barrier_wait(&bar); + if (ret != PTHREAD_BARRIER_SERIAL_THREAD) { + CHECK( 0, ret); + } + } + + /* Now cancel the thread */ + CHECK( 0, pthread_cancel( th ) ); + + /* Join it */ + CHECK( 0, pthread_join( th, NULL ) ); + + /* Do the same with the other function */ + td.ts = NULL; + + /* Create the thread */ + CHECK( 0, pthread_create( &th, NULL, test_fct, &td ) ); + + /* Wait for the thread to be running */ + { + int ret = pthread_barrier_wait(&bar); + if (ret != PTHREAD_BARRIER_SERIAL_THREAD) { + CHECK( 0, ret); + } + } + + /* Now cancel the thread */ + CHECK( 0, pthread_cancel( th ) ); + + /* Join it */ + CHECK( 0, pthread_join( th, NULL ) ); + + /* Destroy the queue */ + CHECK( 0, meq_del(queue) ); + } + + /* Delete the messages */ + CHECK( 0, msg_free( msg1, 1 ) ); + CHECK( 0, msg_free( msg2, 1 ) ); + CHECK( 0, msg_free( msg3, 1 ) ); + + /* That's all for the tests yet */ + PASSTEST(); +} + diff -r be49e121160e -r 9862eceddf8d waaad/tests/tests.h --- a/waaad/tests/tests.h Wed Jul 16 14:57:31 2008 +0900 +++ b/waaad/tests/tests.h Wed Jul 16 15:59:53 2008 +0900 @@ -41,6 +41,9 @@ #ifndef _TESTS_H #define _TESTS_H +#include +#include + /* The waaad API */ #include @@ -48,7 +51,6 @@ #include /* Standard includes */ -#include #include #include #include