changeset 767:c47c16436f71

Added a limit on fifo queues to avoid memory exaustion when messages are received faster than handled
author Sebastien Decugis <sdecugis@nict.go.jp>
date Sun, 23 Oct 2011 23:43:32 +0200
parents 734bf3e1487b
children a5a82d50c25e
files extensions/app_radgw/rgw_worker.c extensions/dbg_interactive/queues.i include/freeDiameter/libfdproto.h libfdcore/cnxctx.c libfdcore/config.c libfdcore/p_psm.c libfdcore/peers.c libfdcore/queues.c libfdcore/sctps.c libfdproto/fifo.c tests/testcnx.c tests/testfifo.c
diffstat 12 files changed, 179 insertions(+), 44 deletions(-) [+]
line wrap: on
line diff
--- a/extensions/app_radgw/rgw_worker.c	Sat Oct 22 22:56:55 2011 +0200
+++ b/extensions/app_radgw/rgw_worker.c	Sun Oct 23 23:43:32 2011 +0200
@@ -326,7 +326,7 @@
 	
 	memset(workers, 0, sizeof(workers));
 	
-	CHECK_FCT( fd_fifo_new ( &work_stack ) );
+	CHECK_FCT( fd_fifo_new ( &work_stack, 30 ) );
 	
 	/* Create the worker thread(s) */
 	for (i = 0; i < NB_WORKERS; i++) {
--- a/extensions/dbg_interactive/queues.i	Sat Oct 22 22:56:55 2011 +0200
+++ b/extensions/dbg_interactive/queues.i	Sun Oct 23 23:43:32 2011 +0200
@@ -41,9 +41,9 @@
 };
 
 %extend fifo {
-	fifo() {
+	fifo(int max = 0) {
 		struct fifo * q = NULL;
-		int ret = fd_fifo_new(&q);
+		int ret = fd_fifo_new(&q, max);
 		if (ret != 0) {
 			DI_ERROR(ret, NULL, NULL);
 			return NULL;
@@ -146,7 +146,7 @@
 		
 		ret = fd_fifo_tryget($self, &obj);
 		if (ret == EWOULDBLOCK) {
-			Py_XINCREF(Py_None);
+			Py_INCREF(Py_None);
 			return Py_None;
 		}
 		if (ret != 0) {
@@ -181,7 +181,7 @@
 		
 		ret = fd_fifo_timedget($self, &obj, &ts);
 		if (ret == ETIMEDOUT) {
-			Py_XINCREF(Py_None);
+			Py_INCREF(Py_None);
 			return Py_None;
 		}
 		if (ret != 0) {
--- a/include/freeDiameter/libfdproto.h	Sat Oct 22 22:56:55 2011 +0200
+++ b/include/freeDiameter/libfdproto.h	Sun Oct 23 23:43:32 2011 +0200
@@ -2758,6 +2758,8 @@
  *
  * PARAMETERS:
  *  queue	: Upon success, a pointer to the new queue is saved here.
+ *  max		: max number of items in the queue. Above this number, adding a new item becomes a
+ *		  blocking operation. Use 0 to disable this maximum.
  *
  * DESCRIPTION: 
  *  Create a new empty queue.
@@ -2767,7 +2769,7 @@
  *  EINVAL 	: The parameter is invalid.
  *  ENOMEM	: Not enough memory to complete the creation.  
  */
-int fd_fifo_new ( struct fifo ** queue );
+int fd_fifo_new ( struct fifo ** queue, int max );
 
 /*
  * FUNCTION:	fd_fifo_del
--- a/libfdcore/cnxctx.c	Sat Oct 22 22:56:55 2011 +0200
+++ b/libfdcore/cnxctx.c	Sun Oct 23 23:43:32 2011 +0200
@@ -89,7 +89,7 @@
 	memset(conn, 0, sizeof(struct cnxctx));
 
 	if (full) {
-		CHECK_FCT_DO( fd_fifo_new ( &conn->cc_incoming ), return NULL );
+		CHECK_FCT_DO( fd_fifo_new ( &conn->cc_incoming, 5 ), return NULL );
 	}
 
 	return conn;
--- a/libfdcore/config.c	Sat Oct 22 22:56:55 2011 +0200
+++ b/libfdcore/config.c	Sun Oct 23 23:43:32 2011 +0200
@@ -68,7 +68,7 @@
 	fd_g_config->cnf_orstateid = (uint32_t) time(NULL);
 	
 	CHECK_FCT( fd_dict_init(&fd_g_config->cnf_dict) );
-	CHECK_FCT( fd_fifo_new(&fd_g_config->cnf_main_ev) );
+	CHECK_FCT( fd_fifo_new(&fd_g_config->cnf_main_ev, 0) );
 	
 	/* TLS parameters */
 	CHECK_GNUTLS_DO( gnutls_certificate_allocate_credentials (&fd_g_config->cnf_sec_data.credentials), return ENOMEM );
--- a/libfdcore/p_psm.c	Sat Oct 22 22:56:55 2011 +0200
+++ b/libfdcore/p_psm.c	Sun Oct 23 23:43:32 2011 +0200
@@ -855,7 +855,7 @@
 	CHECK_PARAMS( fd_peer_getstate(peer) == STATE_NEW );
 	
 	/* Create the FIFO for events */
-	CHECK_FCT( fd_fifo_new(&peer->p_events) );
+	CHECK_FCT( fd_fifo_new(&peer->p_events, 0) );
 	
 	/* Create the PSM controler thread */
 	CHECK_POSIX( pthread_create( &peer->p_psm, NULL, p_psm_th, peer ) );
--- a/libfdcore/peers.c	Sat Oct 22 22:56:55 2011 +0200
+++ b/libfdcore/peers.c	Sun Oct 23 23:43:32 2011 +0200
@@ -76,7 +76,7 @@
 	
 	fd_list_init(&p->p_actives, p);
 	fd_list_init(&p->p_expiry, p);
-	CHECK_FCT( fd_fifo_new(&p->p_tosend) );
+	CHECK_FCT( fd_fifo_new(&p->p_tosend, 5) );
 	p->p_hbh = lrand48();
 	
 	fd_list_init(&p->p_sr.srs, p);
--- a/libfdcore/queues.c	Sat Oct 22 22:56:55 2011 +0200
+++ b/libfdcore/queues.c	Sun Oct 23 23:43:32 2011 +0200
@@ -44,9 +44,9 @@
 int fd_queues_init(void)
 {
 	TRACE_ENTRY();
-	CHECK_FCT( fd_fifo_new ( &fd_g_incoming ) );
-	CHECK_FCT( fd_fifo_new ( &fd_g_outgoing ) );
-	CHECK_FCT( fd_fifo_new ( &fd_g_local ) );
+	CHECK_FCT( fd_fifo_new ( &fd_g_incoming, 20 ) );
+	CHECK_FCT( fd_fifo_new ( &fd_g_outgoing, 30 ) );
+	CHECK_FCT( fd_fifo_new ( &fd_g_local, 25 ) );
 	return 0;
 }
 
--- a/libfdcore/sctps.c	Sat Oct 22 22:56:55 2011 +0200
+++ b/libfdcore/sctps.c	Sun Oct 23 23:43:32 2011 +0200
@@ -496,7 +496,7 @@
 	for (i = 0; i < conn->cc_sctp_para.pairs; i++) {
 		conn->cc_sctps_data.array[i].parent = conn;
 		conn->cc_sctps_data.array[i].strid  = i;
-		CHECK_FCT( fd_fifo_new(&conn->cc_sctps_data.array[i].raw_recv) );
+		CHECK_FCT( fd_fifo_new(&conn->cc_sctps_data.array[i].raw_recv, 10) );
 	}
 	
 	/* Set push/pull functions in the master session, using fifo in array[0] */
--- a/libfdproto/fifo.c	Sat Oct 22 22:56:55 2011 +0200
+++ b/libfdproto/fifo.c	Sun Oct 23 23:43:32 2011 +0200
@@ -52,12 +52,16 @@
 	int		eyec;	/* An eye catcher, also used to check a queue is valid. FIFO_EYEC */
 	
 	pthread_mutex_t	mtx;	/* Mutex protecting this queue */
-	pthread_cond_t	cond;	/* condition variable of the list */
+	pthread_cond_t	cond_pull;	/* condition variable for pulling threads */
+	pthread_cond_t	cond_push;	/* condition variable for pushing threads */
 	
 	struct fd_list	list;	/* sentinel for the list of elements */
 	int		count;	/* number of objects in the list */
 	int		thrs;	/* number of threads waiting for a new element (when count is 0) */
 	
+	int 		max;	/* maximum number of items to accept if not 0 */
+	int		thrs_push; /* number of threads waitnig to push an item */
+	
 	uint16_t	high;	/* High level threshold (see libfreeDiameter.h for details) */
 	uint16_t	low;	/* Low level threshhold */
 	void 		*data;	/* Opaque pointer for threshold callbacks */
@@ -74,8 +78,8 @@
 #define CHECK_FIFO( _queue ) (( (_queue) != NULL) && ( (_queue)->eyec == FIFO_EYEC) )
 
 
-/* Create a new queue */
-int fd_fifo_new ( struct fifo ** queue )
+/* Create a new queue, with max number of items -- use 0 for no max */
+int fd_fifo_new ( struct fifo ** queue, int max )
 {
 	struct fifo * new;
 	
@@ -91,7 +95,9 @@
 	
 	new->eyec = FIFO_EYEC;
 	CHECK_POSIX( pthread_mutex_init(&new->mtx, NULL) );
-	CHECK_POSIX( pthread_cond_init(&new->cond, NULL) );
+	CHECK_POSIX( pthread_cond_init(&new->cond_pull, NULL) );
+	CHECK_POSIX( pthread_cond_init(&new->cond_push, NULL) );
+	new->max = max;
 	
 	fd_list_init(&new->list, NULL);
 	
@@ -118,6 +124,7 @@
 	
 	CHECK_POSIX_DO(  pthread_mutex_lock( &queue->mtx ), /* continue */  );
 	fd_log_debug("   %d elements in queue / %d threads waiting\n", queue->count, queue->thrs);
+	fd_log_debug("   %d elements max / %d threads waiting to push\n", queue->max, queue->thrs_push);
 	fd_log_debug("   thresholds: %d / %d (h:%d), cb: %p,%p (%p), highest: %d\n",
 			queue->high, queue->low, queue->highest, 
 			queue->h_cb, queue->l_cb, queue->data,
@@ -161,11 +168,8 @@
 	/* Have all waiting threads return an error */
 	while (q->thrs) {
 		CHECK_POSIX(  pthread_mutex_unlock( &q->mtx ));
-		CHECK_POSIX(  pthread_cond_signal(&q->cond)  );
-		sched_yield();
-		if (loops >= 10)
-			/* sleep for a few milliseconds */
-			usleep(50000);
+		CHECK_POSIX(  pthread_cond_signal(&q->cond_pull)  );
+		usleep(1000);
 		
 		CHECK_POSIX(  pthread_mutex_lock( &q->mtx )  );
 		ASSERT( ++loops < 20 ); /* detect infinite loops */
@@ -177,7 +181,9 @@
 	/* And destroy it */
 	CHECK_POSIX(  pthread_mutex_unlock( &q->mtx )  );
 	
-	CHECK_POSIX_DO(  pthread_cond_destroy( &q->cond ),  );
+	CHECK_POSIX_DO(  pthread_cond_destroy( &q->cond_pull ),  );
+	
+	CHECK_POSIX_DO(  pthread_cond_destroy( &q->cond_push ),  );
 	
 	CHECK_POSIX_DO(  pthread_mutex_destroy( &q->mtx ),  );
 	
@@ -206,26 +212,29 @@
 	
 	/* Lock the queues */
 	CHECK_POSIX(  pthread_mutex_lock( &old->mtx )  );
+	
+	CHECK_PARAMS_DO( (! old->thrs_push), {
+			pthread_mutex_unlock( &old->mtx );
+			return EINVAL;
+		} );
+	
 	CHECK_POSIX(  pthread_mutex_lock( &new->mtx )  );
 	
 	/* Any waiting thread on the old queue returns an error */
 	old->eyec = 0xdead;
 	while (old->thrs) {
 		CHECK_POSIX(  pthread_mutex_unlock( &old->mtx ));
-		CHECK_POSIX(  pthread_cond_signal(&old->cond)  );
-		sched_yield();
-		if (loops >= 10)
-			/* sleep for a few milliseconds */
-			usleep(50000);
+		CHECK_POSIX(  pthread_cond_signal( &old->cond_pull )  );
+		usleep(1000);
 		
 		CHECK_POSIX(  pthread_mutex_lock( &old->mtx )  );
-		ASSERT( ++loops < 20 ); /* detect infinite loops */
+		ASSERT( loops < 20 ); /* detect infinite loops */
 	}
 	
 	/* Move all data from old to new */
 	fd_list_move_end( &new->list, &old->list );
 	if (old->count && (!new->count)) {
-		CHECK_POSIX(  pthread_cond_signal(&new->cond)  );
+		CHECK_POSIX(  pthread_cond_signal(&new->cond_pull)  );
 	}
 	new->count += old->count;
 	
@@ -295,6 +304,24 @@
 	return 0;
 }
 
+
+/* This handler is called when a thread is blocked on a queue, and cancelled */
+static void fifo_cleanup_push(void * queue)
+{
+	struct fifo * q = (struct fifo *)queue;
+	TRACE_ENTRY( "%p", queue );
+	
+	/* The thread has been cancelled, therefore it does not wait on the queue anymore */
+	q->thrs_push--;
+	
+	/* Now unlock the queue, and we're done */
+	CHECK_POSIX_DO(  pthread_mutex_unlock( &q->mtx ),  /* nothing */  );
+	
+	/* End of cleanup handler */
+	return;
+}
+
+
 /* Post a new item in the queue */
 int fd_fifo_post_int ( struct fifo * queue, void ** item )
 {
@@ -306,15 +333,32 @@
 	/* Check the parameters */
 	CHECK_PARAMS( CHECK_FIFO( queue ) && item && *item );
 	
+	/* lock the queue */
+	CHECK_POSIX(  pthread_mutex_lock( &queue->mtx )  );
+	
+	if (queue->max) {
+		while (queue->count >= queue->max) {
+			int ret = 0;
+			
+			/* We have to wait for an item to be pulled */
+			queue->thrs_push++ ;
+			pthread_cleanup_push( fifo_cleanup_push, queue);
+			ret = pthread_cond_wait( &queue->cond_push, &queue->mtx );
+			pthread_cleanup_pop(0);
+			queue->thrs_push-- ;
+			
+			ASSERT( ret == 0 );
+		}
+	}
+	
 	/* Create a new list item */
-	CHECK_MALLOC(  new = malloc (sizeof (struct fd_list))  );
+	CHECK_MALLOC_DO(  new = malloc (sizeof (struct fd_list)) , {
+			pthread_mutex_unlock( &queue->mtx );
+		} );
 	
 	fd_list_init(new, *item);
 	*item = NULL;
 	
-	/* lock the queue */
-	CHECK_POSIX(  pthread_mutex_lock( &queue->mtx )  );
-	
 	/* Add the new item at the end */
 	fd_list_insert_before( &queue->list, new);
 	queue->count++;
@@ -327,7 +371,11 @@
 	
 	/* Signal if threads are asleep */
 	if (queue->thrs > 0) {
-		CHECK_POSIX(  pthread_cond_signal(&queue->cond)  );
+		CHECK_POSIX(  pthread_cond_signal(&queue->cond_pull)  );
+	}
+	if (queue->thrs_push > 0) {
+		/* cascade */
+		CHECK_POSIX(  pthread_cond_signal(&queue->cond_push)  );
 	}
 	
 	/* Unlock */
@@ -354,6 +402,10 @@
 	ret = li->o;
 	free(li);
 	
+	if (queue->thrs_push) {
+		CHECK_POSIX_DO( pthread_cond_signal( &queue->cond_push ), );
+	}
+	
 	return ret;
 }
 
@@ -387,10 +439,21 @@
 	
 	/* Check queue status */
 	if (queue->count > 0) {
+got_item:
 		/* There are elements in the queue, so pick the first one */
 		*item = mq_pop(queue);
 		call_cb = test_l_cb(queue);
 	} else {
+		if (queue->thrs_push > 0) {
+			/* A thread is trying to push something, let's give it a chance */
+			CHECK_POSIX(  pthread_mutex_unlock( &queue->mtx )  );
+			CHECK_POSIX(  pthread_cond_signal( &queue->cond_push )  );
+			usleep(1000);
+			CHECK_POSIX(  pthread_mutex_lock( &queue->mtx )  );
+			if (queue->count > 0)
+				goto got_item;
+		}
+		
 		wouldblock = 1;
 		*item = NULL;
 	}
@@ -456,9 +519,9 @@
 		queue->thrs++ ;
 		pthread_cleanup_push( fifo_cleanup, queue);
 		if (istimed) {
-			ret = pthread_cond_timedwait( &queue->cond, &queue->mtx, abstime );
+			ret = pthread_cond_timedwait( &queue->cond_pull, &queue->mtx, abstime );
 		} else {
-			ret = pthread_cond_wait( &queue->cond, &queue->mtx );
+			ret = pthread_cond_wait( &queue->cond_pull, &queue->mtx );
 		}
 		pthread_cleanup_pop(0);
 		queue->thrs-- ;
--- a/tests/testcnx.c	Sat Oct 22 22:56:55 2011 +0200
+++ b/tests/testcnx.c	Sun Oct 23 23:43:32 2011 +0200
@@ -1498,7 +1498,7 @@
 		
 		/* fd_cnx_recv_setaltfifo */
 		CHECK( 0, fd_cnx_send(client_side, cer_buf, cer_sz, 0));
-		CHECK( 0, fd_fifo_new(&myfifo) );
+		CHECK( 0, fd_fifo_new(&myfifo, 0) );
 		CHECK( 0, fd_cnx_recv_setaltfifo(server_side, myfifo) );
 		CHECK( 0, clock_gettime(CLOCK_REALTIME, &now) );
 		do {
@@ -1590,7 +1590,7 @@
 		
 		/* fd_cnx_recv_setaltfifo */
 		CHECK( 0, fd_cnx_send(client_side, cer_buf, cer_sz, 0));
-		CHECK( 0, fd_fifo_new(&myfifo) );
+		CHECK( 0, fd_fifo_new(&myfifo, 0) );
 		CHECK( 0, fd_cnx_recv_setaltfifo(server_side, myfifo) );
 		CHECK( 0, clock_gettime(CLOCK_REALTIME, &now) );
 		do {
--- a/tests/testfifo.c	Sat Oct 22 22:56:55 2011 +0200
+++ b/tests/testfifo.c	Sun Oct 23 23:43:32 2011 +0200
@@ -110,6 +110,25 @@
 	return NULL;
 }
 
+/* The test function, to be threaded */
+static int iter  = 0;
+static void * test_fct2(void * data)
+{
+	int i;
+	int * item;
+	struct test_data * td = (struct test_data *) data;
+	
+	for (i=0; i< td->nbr; i++) {
+		item = malloc(sizeof(int));
+		CHECK( 1, item ? 1 : 0 );
+		*item = i;
+		CHECK( 0, fd_fifo_post(td->queue, &item) );
+		iter++;
+	}
+	
+	return NULL;
+}
+
 
 /* Main test routine */
 int main(int argc, char *argv[])
@@ -144,7 +163,7 @@
 		struct msg * msg  = NULL;
 		
 		/* Create the queue */
-		CHECK( 0, fd_fifo_new(&queue) );
+		CHECK( 0, fd_fifo_new(&queue, 0) );
 		
 		/* Check the count is 0 */
 		CHECK( 0, fd_fifo_length(queue, &count) );
@@ -232,7 +251,7 @@
 		}
 		
 		/* Create the queue */
-		CHECK( 0, fd_fifo_new(&queue) );
+		CHECK( 0, fd_fifo_new(&queue, 0) );
 		
 		/* Create the barrier */
 		CHECK( 0, pthread_barrier_init(&bar, NULL, nbr_threads * 2 + 1) );
@@ -302,7 +321,7 @@
 		pthread_t		 th;
 		
 		/* Create the queue */
-		CHECK( 0, fd_fifo_new(&queue) );
+		CHECK( 0, fd_fifo_new(&queue, 0) );
 		
 		/* Create the barrier */
 		CHECK( 0, pthread_barrier_init(&bar, NULL, 2) );
@@ -369,7 +388,7 @@
 		struct msg * msg  = NULL;
 		
 		/* Create the queue */
-		CHECK( 0, fd_fifo_new(&queue) );
+		CHECK( 0, fd_fifo_new(&queue, 0) );
 		
 		/* Prepare the test data */
 		memset(&thrh_td, 0, sizeof(thrh_td));
@@ -441,6 +460,57 @@
 		CHECK( 0, fd_fifo_del(&queue) );
 	}
 	
+	/* Test max queue limit */
+	{
+		struct fifo      	*queue = NULL;
+		struct test_data	 td;
+		pthread_t		 th;
+		int *			item, i;
+		
+		/* Create the queue */
+		CHECK( 0, fd_fifo_new(&queue, 10) );
+		
+		/* Initialize the test data structures */
+		td.queue = queue;
+		td.nbr = 15;
+		
+		CHECK( 0, pthread_create( &th, NULL, test_fct2, &td ) );
+		
+		usleep(1000); /* 1 millisec */
+		
+		CHECK( 10, iter );
+		
+		CHECK( 0, fd_fifo_tryget(queue, &item) );
+		CHECK( 0, *item);
+		free(item);
+		
+		usleep(1000); /* 1 millisec */
+		
+		CHECK( 11, iter );
+		
+		for (i=1; i<4; i++) {
+			CHECK( 0, fd_fifo_get(queue, &item) );
+			CHECK( i, *item);
+			free(item);
+		}
+		
+		usleep(1000); /* 1 millisec */
+		
+		CHECK( 14, iter );
+		
+		/* fd_fifo_dump(0, "test", queue, NULL); */
+		
+		for (; i < td.nbr; i++) {
+			CHECK( 0, fd_fifo_tryget(queue, &item) );
+			CHECK( i, *item);
+			free(item);
+		}
+		
+		CHECK( 0, pthread_join( th, NULL ) );
+		CHECK( 15, iter );
+		
+	}
+	
 	/* Delete the messages */
 	CHECK( 0, fd_msg_free( msg1 ) );
 	CHECK( 0, fd_msg_free( msg2 ) );
"Welcome to our mercurial repository"