changeset 94:9862eceddf8d

Added the queues source code and test
author Sebastien Decugis <sdecugis@nict.go.jp>
date Wed, 16 Jul 2008 15:59:53 +0900
parents be49e121160e
children aed4363ff77e
files waaad/queues.c waaad/tests/Makefile.am waaad/tests/testmeq.c waaad/tests/tests.h
diffstat 4 files changed, 516 insertions(+), 17 deletions(-) [+]
line wrap: on
line diff
--- a/waaad/queues.c	Wed Jul 16 14:57:31 2008 +0900
+++ b/waaad/queues.c	Wed Jul 16 15:59:53 2008 +0900
@@ -37,6 +37,15 @@
  * 
  * This module provides the support functions for queueing and picking messages.
  * See queues.h for more information.
+ *
+ * The threads that call these functions must be in the cancellation state PTHREAD_CANCEL_ENABLE and type PTHREAD_CANCEL_DEFERRED.
+ * This is the default state and type on thread creation.
+ *
+ * In order to destroy properly a queue, the application must:
+ *  -> shutdown any process that can add into the queue first.
+ *  -> pthread_cancel any thread that could be waiting on the queue.
+ *  -> consume any message that is in the queue, using meq_tryget.
+ *  -> then destroy the queue using meq_del.
  */
 
 #include <pthread.h>
@@ -66,6 +75,14 @@
 /* Test if a list is empty */
 #define LI_isempty( _li ) ((_li)->next == (_li))
 
+/* Unlink an element */
+#define LI_unlink( _li_ ) {			\
+	(_li_)->prev->next = (_li_)->next;	\
+	(_li_)->next->prev = (_li_)->prev;	\
+	(_li_)->next = (_li_);			\
+	(_li_)->prev = (_li_);			\
+}
+
 /* Add an element in a list at the end */
 #define LI_push( _sentinel, _item) {		\
 	(_item)->prev = (_sentinel)->prev;	\
@@ -73,6 +90,13 @@
 	(_sentinel)->prev->next = (_item);	\
 	(_sentinel)->prev = (_item);		\
 }
+
+/* Remove an element from the head of a list */
+#define LI_pop( _sentinel, _item) {		\
+	(_item) = (_sentinel)->next;		\
+	assert((_item) != (_sentinel));		\
+	LI_unlink(_item);			\
+}
 	
 
 /* Internal representation of a queue */
@@ -324,36 +348,206 @@
 	
 	/* Add the new message */
 	LI_push( &_Q(queue)->list, new);
+	_Q(queue)->count++;
 	
 	/* Signal if threads are asleep */
-#warning "to complete..."
-	
+	if (_Q(queue)->thrs > 0) {
+		ret = pthread_cond_signal(&_Q(queue)->cond);
+		if (ret != 0) {
+			TRACE_DEBUG(INFO, "pthread_cond_signal failed: %s", strerror(ret));
+			return ret;
+		}
+	}
 	
-	return ENOTSUP;
-}
-
-/* Retrieve a message */
-int meq_get ( meq_t * queue, msg_t ** msg )
-{
-	TRACE_ENTRY( "%p %p", queue, msg );
-	TRACE_DEBUG (INFO, "@@@ %s: not implemented yet.", __FUNCTION__ );
-	return ENOTSUP;
+	/* Unlock */
+	ret = pthread_mutex_unlock( &_Q(queue)->mtx );
+	if (ret != 0) {
+		TRACE_DEBUG(INFO, "pthread_mutex_unlock failed: %s", strerror(ret));
+		return ret;
+	}
+	
+	/* Done */
+	return 0;
 }
 
 /* Retrieve a message, if any */
 int meq_tryget ( meq_t * queue, msg_t ** msg )
 {
+	_li_t * li;
+	int ret = 0;
+	int wouldblock = 0;
+	
 	TRACE_ENTRY( "%p %p", queue, msg );
-	TRACE_DEBUG (INFO, "@@@ %s: not implemented yet.", __FUNCTION__ );
-	return ENOTSUP;
+	
+	/* Check the parameters */
+	if ( (! CHECK_QUEUE( queue )) || (msg == NULL) ) {
+		TRACE_DEBUG(INFO, "Invalid parameter");
+		return EINVAL;
+	}
+	
+	/* lock the queue */
+	ret = pthread_mutex_lock( &_Q(queue)->mtx );
+	if (ret != 0) {
+		TRACE_DEBUG(INFO, "pthread_mutex_lock failed: %s", strerror(ret));
+		return ret;
+	}
+	
+	/* Check queue status */
+	if (_Q(queue)->count > 0) {
+		/* There are messages in the queue, so pick the first one */
+		LI_pop( &_Q(queue)->list, li );
+		_Q(queue)->count--;
+		*msg = li->msg;
+		free(li);
+	} else {
+		wouldblock = 1;
+		*msg = NULL;
+	}
+		
+	/* Unlock */
+	ret = pthread_mutex_unlock( &_Q(queue)->mtx );
+	if (ret != 0) {
+		TRACE_DEBUG(INFO, "pthread_mutex_unlock failed: %s", strerror(ret));
+		return ret;
+	}
+	
+	/* Done */
+	return wouldblock ? EWOULDBLOCK : 0;
+}
+
+/* This handler is called when a thread is blocked on a queue, and cancelled */
+static void _meq_cleanup(void * queue)
+{
+	int ret;
+	
+	TRACE_ENTRY( "%p", queue );
+	
+	/* Check the parameter */
+	if ( ! CHECK_QUEUE( queue )) {
+		TRACE_DEBUG(INFO, "Invalid queue, skipping handler");
+		return;
+	}
+	
+	/* The thread has been cancelled, therefore it does not wait on the queue anymore */
+	_Q(queue)->thrs--;
+	
+	/* Now unlock the queue, and we're done */
+	ret = pthread_mutex_unlock( &_Q(queue)->mtx );
+	if (ret != 0) {
+		TRACE_DEBUG(INFO, "pthread_mutex_unlock failed: %s", strerror(ret));
+	}
+	
+	/* End of cleanup handler */
+	return;
+}
+
+/* The internal function for meq_timedget and meq_get */
+static int _meq_tget ( _meq_t * queue, msg_t ** msg, int istimed, const struct timespec *abstime)
+{
+	int ret = 0;
+	int timedout = 0;
+	_li_t * li;
+	
+	/* Check the parameters */
+	if ( (! CHECK_QUEUE( queue )) || (msg == NULL) || ( istimed && (abstime == NULL)) ) {
+		TRACE_DEBUG(INFO, "Invalid parameter");
+		return EINVAL;
+	}
+	
+	/* Check the thread cancellation state */
+	{
+		int oldstate;
+		ret = pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &oldstate);
+		if (ret != 0) {
+			TRACE_DEBUG(INFO, "pthread_setcancelstate failed: %s", strerror(ret));
+			return ret;
+		}
+		if (oldstate != PTHREAD_CANCEL_ENABLE) {
+			(void) pthread_setcancelstate(oldstate, &ret);
+			TRACE_DEBUG(INFO, "Invalid thread cancellation state to call this function");
+			return EINVAL;
+		}
+	}
+	/* Check the thread cancellation type */
+	{
+		int oldtype;
+		ret = pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, &oldtype);
+		if (ret != 0) {
+			TRACE_DEBUG(INFO, "pthread_setcanceltype failed: %s", strerror(ret));
+			return ret;
+		}
+		if (oldtype != PTHREAD_CANCEL_DEFERRED) {
+			(void) pthread_setcanceltype(oldtype, &ret);
+			TRACE_DEBUG(INFO, "Invalid thread cancellation type to call this function");
+			return EINVAL;
+		}
+	}
+	
+	/* Initialize the msg value */
+	*msg = NULL;
+	
+	/* lock the queue */
+	ret = pthread_mutex_lock( &queue->mtx );
+	if (ret != 0) {
+		TRACE_DEBUG(INFO, "pthread_mutex_lock failed: %s", strerror(ret));
+		return ret;
+	}
+	
+awaken:
+	/* Check queue status */
+	if (queue->count > 0) {
+		/* There are messages in the queue, so pick the first one */
+		LI_pop( &queue->list, li );
+		queue->count--;
+		*msg = li->msg;
+		free(li);
+	} else {
+		/* We have to wait for a new message */
+		queue->thrs++ ;
+		pthread_cleanup_push(_meq_cleanup, queue);
+		if (istimed) {
+			ret = pthread_cond_timedwait( &queue->cond, &queue->mtx, abstime );
+		} else {
+			ret = pthread_cond_wait( &queue->cond, &queue->mtx );
+		}
+		pthread_cleanup_pop(0);
+		queue->thrs-- ;
+		if (ret == 0)
+			goto awaken;  /* test for spurious wake-ups */
+		
+		if (istimed && (ret == ETIMEDOUT)) {
+			timedout = 1;
+		} else {
+			/* Unexpected error condition */
+			TRACE_DEBUG(INFO, "pthread_cond_wait or pthread_cond_timedwait failed: %s", strerror(ret));
+			return ret;
+		}
+	}
+	
+	/* Unlock */
+	ret = pthread_mutex_unlock( &queue->mtx );
+	if (ret != 0) {
+		TRACE_DEBUG(INFO, "pthread_mutex_unlock failed: %s", strerror(ret));
+		return ret;
+	}
+	
+	/* Done */
+	return timedout ? ETIMEDOUT : 0;
+}
+
+	
+/* Retrieve a message */
+int meq_get ( meq_t * queue, msg_t ** msg )
+{
+	TRACE_ENTRY( "%p %p", queue, msg );
+	return _meq_tget(_Q(queue), msg, 0, NULL);
 }
 
 /* Retrieve a message, if any. wait only a certain time */
 int meq_timedget ( meq_t * queue, msg_t ** msg, const struct timespec *abstime )
 {
 	TRACE_ENTRY( "%p %p %p", queue, msg, abstime );
-	TRACE_DEBUG (INFO, "@@@ %s: not implemented yet.", __FUNCTION__ );
-	return ENOTSUP;
+	return _meq_tget(_Q(queue), msg, 1, abstime);
 }
 
 
--- a/waaad/tests/Makefile.am	Wed Jul 16 14:57:31 2008 +0900
+++ b/waaad/tests/Makefile.am	Wed Jul 16 15:59:53 2008 +0900
@@ -33,6 +33,9 @@
 # Testing the messages:
 testmesg_SOURCES = testmesg.c tests.h $(WAAADSOURCES)
 
-check_PROGRAMS = testdict testmesg
+# Testing the queues:
+testmeq_SOURCES = testmeq.c tests.h $(WAAADSOURCES)
+
+check_PROGRAMS = testdict testmesg testmeq
 
 TESTS = $(check_PROGRAMS)
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/waaad/tests/testmeq.c	Wed Jul 16 15:59:53 2008 +0900
@@ -0,0 +1,300 @@
+/*********************************************************************************************************
+* Software License Agreement (BSD License)                                                               *
+* Author: Sebastien Decugis <sdecugis@nict.go.jp>							 *
+*													 *
+* Copyright (c) 2008, WIDE Project and NICT								 *
+* All rights reserved.											 *
+* 													 *
+* Redistribution and use of this software in source and binary forms, with or without modification, are  *
+* permitted provided that the following conditions are met:						 *
+* 													 *
+* * Redistributions of source code must retain the above 						 *
+*   copyright notice, this list of conditions and the 							 *
+*   following disclaimer.										 *
+*    													 *
+* * Redistributions in binary form must reproduce the above 						 *
+*   copyright notice, this list of conditions and the 							 *
+*   following disclaimer in the documentation and/or other						 *
+*   materials provided with the distribution.								 *
+* 													 *
+* * Neither the name of the WIDE Project or NICT nor the 						 *
+*   names of its contributors may be used to endorse or 						 *
+*   promote products derived from this software without 						 *
+*   specific prior written permission of WIDE Project and 						 *
+*   NICT.												 *
+* 													 *
+* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED *
+* WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A *
+* PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR *
+* ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 	 *
+* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 	 *
+* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR *
+* TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF   *
+* ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.								 *
+*********************************************************************************************************/
+
+#include "tests.h"
+
+/* Test for the message queues */
+
+/* Structure that is passed to the test function */
+typedef struct {
+	meq_t             * queue; /* pointer to the queue */
+	pthread_barrier_t * bar;   /* if not NULL, barrier to synchronize before getting messages */
+	struct timespec   * ts;	   /* if not NULL, use a timedget instead of a get */
+	int		  nbr;	   /* number of messages to retrieve from the queue */
+} test_data_t;
+
+/* The test function to be threaded */
+static void * test_fct(void * data)
+{
+	int ret = 0, i;
+	msg_t * msg = NULL;
+	
+	test_data_t * td = (test_data_t *) data;
+	
+	if (td->bar != NULL) {
+		ret = pthread_barrier_wait(td->bar);
+		if (ret != PTHREAD_BARRIER_SERIAL_THREAD) {
+			CHECK( 0, ret);
+		}
+	}
+	
+	for (i=0; i< td->nbr; i++) {
+		if (td->ts != NULL) {
+			CHECK( 0, meq_timedget(td->queue, &msg, td->ts) );
+		} else {
+			CHECK( 0, meq_get(td->queue, &msg) );
+		}
+	}
+	
+	return NULL;
+}
+
+/* Main test routine */
+int main(int argc, char *argv[])
+{
+	struct timespec ts;
+	
+	msg_t * msg1 = NULL;
+	msg_t * msg2 = NULL;
+	msg_t * msg3 = NULL;
+	
+	/* First, initialize the daemon modules */
+	INIT_WAAAD();
+	
+	/* Set an alarm */
+	alarm(5);
+	
+	/* Prolog: create the messages */
+	{
+		dict_object_t * acr_model = NULL;
+		dict_object_t * cer_model = NULL;
+		dict_object_t * dwr_model = NULL;
+
+		CHECK( 0, dict_search ( DICT_COMMAND, CMD_BY_NAME, "Accounting-Request", 		&acr_model ) );
+		CHECK( 0, dict_search ( DICT_COMMAND, CMD_BY_NAME, "Capabilities-Exchange-Request", 	&cer_model ) );
+		CHECK( 0, dict_search ( DICT_COMMAND, CMD_BY_NAME, "Device-Watchdog-Request",		&dwr_model ) );
+		CHECK( 0, msg_new ( acr_model, MSGFL_FROM_TEMPLATE, &msg1 ) );
+		CHECK( 0, msg_new ( cer_model, MSGFL_FROM_TEMPLATE, &msg2 ) );
+		CHECK( 0, msg_new ( dwr_model, MSGFL_FROM_TEMPLATE, &msg3 ) );
+	}
+
+	/* Basic operation */
+	{
+		meq_t * queue = NULL;
+		int count;
+		msg_t * msg  = NULL;
+		
+		/* Create the queue */
+		CHECK( 0, meq_new(&queue) );
+		
+		/* Check the count is 0 */
+		CHECK( 0, meq_length(queue, &count) );
+		CHECK( 0, count);
+		
+		/* Now enqueue */
+		CHECK( 0, meq_post(queue, msg1) );
+		CHECK( 0, meq_post(queue, msg2) );
+		CHECK( 0, meq_post(queue, msg3) );
+		
+		/* Check the count is 3 */
+		CHECK( 0, meq_length(queue, &count) );
+		CHECK( 3, count);
+		
+		/* Retrieve the first message using meq_get */
+		CHECK( 0, meq_get(queue, &msg) );
+		CHECK( msg1, msg);
+		CHECK( 0, meq_length(queue, &count) );
+		CHECK( 2, count);
+		
+		/* Retrieve the second message using meq_timedget */
+		CHECK(0, clock_gettime(CLOCK_REALTIME, &ts));
+		ts.tv_sec += 1; /* Set the timeout to 1 second */
+		CHECK( 0, meq_timedget(queue, &msg, &ts) );
+		CHECK( msg2, msg);
+		CHECK( 0, meq_length(queue, &count) );
+		CHECK( 1, count);
+		
+		/* Retrieve the third message using meq_tryget */
+		CHECK( 0, meq_tryget(queue, &msg) );
+		CHECK( msg3, msg);
+		CHECK( 0, meq_length(queue, &count) );
+		CHECK( 0, count);
+		
+		/* Check that another meq_tryget does not block */
+		CHECK( EWOULDBLOCK, meq_tryget(queue, &msg) );
+		CHECK( 0, meq_length(queue, &count) );
+		CHECK( 0, count);
+		
+		/* We're done for basic tests */
+		CHECK( 0, meq_del(queue) );
+	}
+	
+	/* Test robustness, ensure no messages are lost */
+	{
+#define NBR_MSG		200
+#define NBR_THREADS	60
+		meq_t      		*queue = NULL;
+		pthread_barrier_t	 bar;
+		test_data_t		 td_1;
+		test_data_t		 td_2;
+		msg_t	   		*msgs[NBR_MSG * NBR_THREADS * 2];
+		pthread_t  		 thr [NBR_THREADS * 2];
+		dict_object_t 		*dwr_model = NULL;
+		int 			 count;
+		int			 i;
+		
+		/* Create the queue */
+		CHECK( 0, meq_new(&queue) );
+		
+		/* Create the barrier */
+		CHECK( 0, pthread_barrier_init(&bar, NULL, NBR_THREADS * 2 + 1) );
+		
+		/* Initialize the ts */
+		CHECK(0, clock_gettime(CLOCK_REALTIME, &ts));
+		ts.tv_sec += 2; /* Set the timeout to 2 second */
+		
+		/* Create the messages */
+		CHECK( 0, dict_search ( DICT_COMMAND, CMD_BY_NAME, "Device-Watchdog-Request", &dwr_model ) );
+		for (i = 0; i < NBR_MSG * NBR_THREADS * 2; i++) {
+			CHECK( 0, msg_new ( dwr_model, 0, &msgs[i] ) );
+		}
+		
+		/* Initialize the test data structures */
+		td_1.queue = queue;
+		td_1.bar = &bar;
+		td_1.ts  = &ts;
+		td_1.nbr = NBR_MSG;
+		td_2.queue = queue;
+		td_2.bar = &bar;
+		td_2.ts  = NULL;
+		td_2.nbr = NBR_MSG;
+		
+		/* Create the threads */
+		for (i=0; i < NBR_THREADS * 2; i++) {
+			CHECK( 0, pthread_create( &thr[i], NULL, test_fct, (i & 1) ? &td_1 : &td_2 ) );
+		}
+		
+		/* Synchronize everyone */
+		{
+			int ret = pthread_barrier_wait(&bar);
+			if (ret != PTHREAD_BARRIER_SERIAL_THREAD) {
+				CHECK( 0, ret);
+			}
+		}
+		
+		/* Now post all the messages */
+		for (i=0; i < NBR_MSG * NBR_THREADS * 2; i++) {
+			CHECK( 0, meq_post(queue, msgs[i]) );
+		}
+		
+		/* Join all threads. This would block if messages are lost... */
+		for (i=0; i < NBR_THREADS * 2; i++) {
+			CHECK( 0, pthread_join( thr[i], NULL ) );
+		}
+		
+		/* Check the count of the queue is back to 0 */
+		CHECK( 0, meq_length(queue, &count) );
+		CHECK( 0, count);
+		
+		/* Destroy this queue and the messages */
+		CHECK( 0, meq_del(queue) );
+		for (i=0; i < NBR_MSG * NBR_THREADS * 2; i++) {
+			CHECK( 0, msg_free(  msgs[i], 1 ) );
+		}
+	}
+	
+	/* Test thread cancelation */
+	{
+		meq_t      		*queue = NULL;
+		pthread_barrier_t	 bar;
+		test_data_t		 td;
+		pthread_t		 th;
+		
+		/* Create the queue */
+		CHECK( 0, meq_new(&queue) );
+		
+		/* Create the barrier */
+		CHECK( 0, pthread_barrier_init(&bar, NULL, 2) );
+		
+		/* Initialize the ts */
+		CHECK(0, clock_gettime(CLOCK_REALTIME, &ts));
+		ts.tv_sec += 2; /* Set the timeout to 2 second */
+		
+		/* Initialize the test data structures */
+		td.queue = queue;
+		td.bar = &bar;
+		td.ts  = &ts;
+		td.nbr = 1;
+		
+		/* Create the thread */
+		CHECK( 0, pthread_create( &th, NULL, test_fct, &td ) );
+		
+		/* Wait for the thread to be running */
+		{
+			int ret = pthread_barrier_wait(&bar);
+			if (ret != PTHREAD_BARRIER_SERIAL_THREAD) {
+				CHECK( 0, ret);
+			}
+		}
+		
+		/* Now cancel the thread */
+		CHECK( 0, pthread_cancel( th ) );
+		
+		/* Join it */
+		CHECK( 0, pthread_join( th, NULL ) );
+		
+		/* Do the same with the other function */
+		td.ts  = NULL;
+		
+		/* Create the thread */
+		CHECK( 0, pthread_create( &th, NULL, test_fct, &td ) );
+		
+		/* Wait for the thread to be running */
+		{
+			int ret = pthread_barrier_wait(&bar);
+			if (ret != PTHREAD_BARRIER_SERIAL_THREAD) {
+				CHECK( 0, ret);
+			}
+		}
+		
+		/* Now cancel the thread */
+		CHECK( 0, pthread_cancel( th ) );
+		
+		/* Join it */
+		CHECK( 0, pthread_join( th, NULL ) );
+		
+		/* Destroy the queue */
+		CHECK( 0, meq_del(queue) );
+	}
+	
+	/* Delete the messages */
+	CHECK( 0, msg_free( msg1, 1 ) );
+	CHECK( 0, msg_free( msg2, 1 ) );
+	CHECK( 0, msg_free( msg3, 1 ) );
+
+	/* That's all for the tests yet */
+	PASSTEST();
+} 
+	
--- a/waaad/tests/tests.h	Wed Jul 16 14:57:31 2008 +0900
+++ b/waaad/tests/tests.h	Wed Jul 16 15:59:53 2008 +0900
@@ -41,6 +41,9 @@
 #ifndef _TESTS_H
 #define _TESTS_H
 
+#include <pthread.h>
+#include <errno.h>
+
 /* The waaad API */
 #include <waaad/waaad.h>
 
@@ -48,7 +51,6 @@
 #include <waaad-internal.h>
 
 /* Standard includes */
-#include <errno.h>
 #include <stdio.h>
 #include <stdlib.h>
 #include <getopt.h>
"Welcome to our mercurial repository"