# HG changeset patch # User Sebastien Decugis # Date 1319406212 -7200 # Node ID c47c16436f717ad42867ff0c2443defc27f4b070 # Parent 734bf3e1487b2d76563f51ae290c6547c2563923 Added a limit on fifo queues to avoid memory exaustion when messages are received faster than handled diff -r 734bf3e1487b -r c47c16436f71 extensions/app_radgw/rgw_worker.c --- a/extensions/app_radgw/rgw_worker.c Sat Oct 22 22:56:55 2011 +0200 +++ b/extensions/app_radgw/rgw_worker.c Sun Oct 23 23:43:32 2011 +0200 @@ -326,7 +326,7 @@ memset(workers, 0, sizeof(workers)); - CHECK_FCT( fd_fifo_new ( &work_stack ) ); + CHECK_FCT( fd_fifo_new ( &work_stack, 30 ) ); /* Create the worker thread(s) */ for (i = 0; i < NB_WORKERS; i++) { diff -r 734bf3e1487b -r c47c16436f71 extensions/dbg_interactive/queues.i --- a/extensions/dbg_interactive/queues.i Sat Oct 22 22:56:55 2011 +0200 +++ b/extensions/dbg_interactive/queues.i Sun Oct 23 23:43:32 2011 +0200 @@ -41,9 +41,9 @@ }; %extend fifo { - fifo() { + fifo(int max = 0) { struct fifo * q = NULL; - int ret = fd_fifo_new(&q); + int ret = fd_fifo_new(&q, max); if (ret != 0) { DI_ERROR(ret, NULL, NULL); return NULL; @@ -146,7 +146,7 @@ ret = fd_fifo_tryget($self, &obj); if (ret == EWOULDBLOCK) { - Py_XINCREF(Py_None); + Py_INCREF(Py_None); return Py_None; } if (ret != 0) { @@ -181,7 +181,7 @@ ret = fd_fifo_timedget($self, &obj, &ts); if (ret == ETIMEDOUT) { - Py_XINCREF(Py_None); + Py_INCREF(Py_None); return Py_None; } if (ret != 0) { diff -r 734bf3e1487b -r c47c16436f71 include/freeDiameter/libfdproto.h --- a/include/freeDiameter/libfdproto.h Sat Oct 22 22:56:55 2011 +0200 +++ b/include/freeDiameter/libfdproto.h Sun Oct 23 23:43:32 2011 +0200 @@ -2758,6 +2758,8 @@ * * PARAMETERS: * queue : Upon success, a pointer to the new queue is saved here. + * max : max number of items in the queue. Above this number, adding a new item becomes a + * blocking operation. Use 0 to disable this maximum. * * DESCRIPTION: * Create a new empty queue. @@ -2767,7 +2769,7 @@ * EINVAL : The parameter is invalid. * ENOMEM : Not enough memory to complete the creation. */ -int fd_fifo_new ( struct fifo ** queue ); +int fd_fifo_new ( struct fifo ** queue, int max ); /* * FUNCTION: fd_fifo_del diff -r 734bf3e1487b -r c47c16436f71 libfdcore/cnxctx.c --- a/libfdcore/cnxctx.c Sat Oct 22 22:56:55 2011 +0200 +++ b/libfdcore/cnxctx.c Sun Oct 23 23:43:32 2011 +0200 @@ -89,7 +89,7 @@ memset(conn, 0, sizeof(struct cnxctx)); if (full) { - CHECK_FCT_DO( fd_fifo_new ( &conn->cc_incoming ), return NULL ); + CHECK_FCT_DO( fd_fifo_new ( &conn->cc_incoming, 5 ), return NULL ); } return conn; diff -r 734bf3e1487b -r c47c16436f71 libfdcore/config.c --- a/libfdcore/config.c Sat Oct 22 22:56:55 2011 +0200 +++ b/libfdcore/config.c Sun Oct 23 23:43:32 2011 +0200 @@ -68,7 +68,7 @@ fd_g_config->cnf_orstateid = (uint32_t) time(NULL); CHECK_FCT( fd_dict_init(&fd_g_config->cnf_dict) ); - CHECK_FCT( fd_fifo_new(&fd_g_config->cnf_main_ev) ); + CHECK_FCT( fd_fifo_new(&fd_g_config->cnf_main_ev, 0) ); /* TLS parameters */ CHECK_GNUTLS_DO( gnutls_certificate_allocate_credentials (&fd_g_config->cnf_sec_data.credentials), return ENOMEM ); diff -r 734bf3e1487b -r c47c16436f71 libfdcore/p_psm.c --- a/libfdcore/p_psm.c Sat Oct 22 22:56:55 2011 +0200 +++ b/libfdcore/p_psm.c Sun Oct 23 23:43:32 2011 +0200 @@ -855,7 +855,7 @@ CHECK_PARAMS( fd_peer_getstate(peer) == STATE_NEW ); /* Create the FIFO for events */ - CHECK_FCT( fd_fifo_new(&peer->p_events) ); + CHECK_FCT( fd_fifo_new(&peer->p_events, 0) ); /* Create the PSM controler thread */ CHECK_POSIX( pthread_create( &peer->p_psm, NULL, p_psm_th, peer ) ); diff -r 734bf3e1487b -r c47c16436f71 libfdcore/peers.c --- a/libfdcore/peers.c Sat Oct 22 22:56:55 2011 +0200 +++ b/libfdcore/peers.c Sun Oct 23 23:43:32 2011 +0200 @@ -76,7 +76,7 @@ fd_list_init(&p->p_actives, p); fd_list_init(&p->p_expiry, p); - CHECK_FCT( fd_fifo_new(&p->p_tosend) ); + CHECK_FCT( fd_fifo_new(&p->p_tosend, 5) ); p->p_hbh = lrand48(); fd_list_init(&p->p_sr.srs, p); diff -r 734bf3e1487b -r c47c16436f71 libfdcore/queues.c --- a/libfdcore/queues.c Sat Oct 22 22:56:55 2011 +0200 +++ b/libfdcore/queues.c Sun Oct 23 23:43:32 2011 +0200 @@ -44,9 +44,9 @@ int fd_queues_init(void) { TRACE_ENTRY(); - CHECK_FCT( fd_fifo_new ( &fd_g_incoming ) ); - CHECK_FCT( fd_fifo_new ( &fd_g_outgoing ) ); - CHECK_FCT( fd_fifo_new ( &fd_g_local ) ); + CHECK_FCT( fd_fifo_new ( &fd_g_incoming, 20 ) ); + CHECK_FCT( fd_fifo_new ( &fd_g_outgoing, 30 ) ); + CHECK_FCT( fd_fifo_new ( &fd_g_local, 25 ) ); return 0; } diff -r 734bf3e1487b -r c47c16436f71 libfdcore/sctps.c --- a/libfdcore/sctps.c Sat Oct 22 22:56:55 2011 +0200 +++ b/libfdcore/sctps.c Sun Oct 23 23:43:32 2011 +0200 @@ -496,7 +496,7 @@ for (i = 0; i < conn->cc_sctp_para.pairs; i++) { conn->cc_sctps_data.array[i].parent = conn; conn->cc_sctps_data.array[i].strid = i; - CHECK_FCT( fd_fifo_new(&conn->cc_sctps_data.array[i].raw_recv) ); + CHECK_FCT( fd_fifo_new(&conn->cc_sctps_data.array[i].raw_recv, 10) ); } /* Set push/pull functions in the master session, using fifo in array[0] */ diff -r 734bf3e1487b -r c47c16436f71 libfdproto/fifo.c --- a/libfdproto/fifo.c Sat Oct 22 22:56:55 2011 +0200 +++ b/libfdproto/fifo.c Sun Oct 23 23:43:32 2011 +0200 @@ -52,12 +52,16 @@ int eyec; /* An eye catcher, also used to check a queue is valid. FIFO_EYEC */ pthread_mutex_t mtx; /* Mutex protecting this queue */ - pthread_cond_t cond; /* condition variable of the list */ + pthread_cond_t cond_pull; /* condition variable for pulling threads */ + pthread_cond_t cond_push; /* condition variable for pushing threads */ struct fd_list list; /* sentinel for the list of elements */ int count; /* number of objects in the list */ int thrs; /* number of threads waiting for a new element (when count is 0) */ + int max; /* maximum number of items to accept if not 0 */ + int thrs_push; /* number of threads waitnig to push an item */ + uint16_t high; /* High level threshold (see libfreeDiameter.h for details) */ uint16_t low; /* Low level threshhold */ void *data; /* Opaque pointer for threshold callbacks */ @@ -74,8 +78,8 @@ #define CHECK_FIFO( _queue ) (( (_queue) != NULL) && ( (_queue)->eyec == FIFO_EYEC) ) -/* Create a new queue */ -int fd_fifo_new ( struct fifo ** queue ) +/* Create a new queue, with max number of items -- use 0 for no max */ +int fd_fifo_new ( struct fifo ** queue, int max ) { struct fifo * new; @@ -91,7 +95,9 @@ new->eyec = FIFO_EYEC; CHECK_POSIX( pthread_mutex_init(&new->mtx, NULL) ); - CHECK_POSIX( pthread_cond_init(&new->cond, NULL) ); + CHECK_POSIX( pthread_cond_init(&new->cond_pull, NULL) ); + CHECK_POSIX( pthread_cond_init(&new->cond_push, NULL) ); + new->max = max; fd_list_init(&new->list, NULL); @@ -118,6 +124,7 @@ CHECK_POSIX_DO( pthread_mutex_lock( &queue->mtx ), /* continue */ ); fd_log_debug(" %d elements in queue / %d threads waiting\n", queue->count, queue->thrs); + fd_log_debug(" %d elements max / %d threads waiting to push\n", queue->max, queue->thrs_push); fd_log_debug(" thresholds: %d / %d (h:%d), cb: %p,%p (%p), highest: %d\n", queue->high, queue->low, queue->highest, queue->h_cb, queue->l_cb, queue->data, @@ -161,11 +168,8 @@ /* Have all waiting threads return an error */ while (q->thrs) { CHECK_POSIX( pthread_mutex_unlock( &q->mtx )); - CHECK_POSIX( pthread_cond_signal(&q->cond) ); - sched_yield(); - if (loops >= 10) - /* sleep for a few milliseconds */ - usleep(50000); + CHECK_POSIX( pthread_cond_signal(&q->cond_pull) ); + usleep(1000); CHECK_POSIX( pthread_mutex_lock( &q->mtx ) ); ASSERT( ++loops < 20 ); /* detect infinite loops */ @@ -177,7 +181,9 @@ /* And destroy it */ CHECK_POSIX( pthread_mutex_unlock( &q->mtx ) ); - CHECK_POSIX_DO( pthread_cond_destroy( &q->cond ), ); + CHECK_POSIX_DO( pthread_cond_destroy( &q->cond_pull ), ); + + CHECK_POSIX_DO( pthread_cond_destroy( &q->cond_push ), ); CHECK_POSIX_DO( pthread_mutex_destroy( &q->mtx ), ); @@ -206,26 +212,29 @@ /* Lock the queues */ CHECK_POSIX( pthread_mutex_lock( &old->mtx ) ); + + CHECK_PARAMS_DO( (! old->thrs_push), { + pthread_mutex_unlock( &old->mtx ); + return EINVAL; + } ); + CHECK_POSIX( pthread_mutex_lock( &new->mtx ) ); /* Any waiting thread on the old queue returns an error */ old->eyec = 0xdead; while (old->thrs) { CHECK_POSIX( pthread_mutex_unlock( &old->mtx )); - CHECK_POSIX( pthread_cond_signal(&old->cond) ); - sched_yield(); - if (loops >= 10) - /* sleep for a few milliseconds */ - usleep(50000); + CHECK_POSIX( pthread_cond_signal( &old->cond_pull ) ); + usleep(1000); CHECK_POSIX( pthread_mutex_lock( &old->mtx ) ); - ASSERT( ++loops < 20 ); /* detect infinite loops */ + ASSERT( loops < 20 ); /* detect infinite loops */ } /* Move all data from old to new */ fd_list_move_end( &new->list, &old->list ); if (old->count && (!new->count)) { - CHECK_POSIX( pthread_cond_signal(&new->cond) ); + CHECK_POSIX( pthread_cond_signal(&new->cond_pull) ); } new->count += old->count; @@ -295,6 +304,24 @@ return 0; } + +/* This handler is called when a thread is blocked on a queue, and cancelled */ +static void fifo_cleanup_push(void * queue) +{ + struct fifo * q = (struct fifo *)queue; + TRACE_ENTRY( "%p", queue ); + + /* The thread has been cancelled, therefore it does not wait on the queue anymore */ + q->thrs_push--; + + /* Now unlock the queue, and we're done */ + CHECK_POSIX_DO( pthread_mutex_unlock( &q->mtx ), /* nothing */ ); + + /* End of cleanup handler */ + return; +} + + /* Post a new item in the queue */ int fd_fifo_post_int ( struct fifo * queue, void ** item ) { @@ -306,15 +333,32 @@ /* Check the parameters */ CHECK_PARAMS( CHECK_FIFO( queue ) && item && *item ); + /* lock the queue */ + CHECK_POSIX( pthread_mutex_lock( &queue->mtx ) ); + + if (queue->max) { + while (queue->count >= queue->max) { + int ret = 0; + + /* We have to wait for an item to be pulled */ + queue->thrs_push++ ; + pthread_cleanup_push( fifo_cleanup_push, queue); + ret = pthread_cond_wait( &queue->cond_push, &queue->mtx ); + pthread_cleanup_pop(0); + queue->thrs_push-- ; + + ASSERT( ret == 0 ); + } + } + /* Create a new list item */ - CHECK_MALLOC( new = malloc (sizeof (struct fd_list)) ); + CHECK_MALLOC_DO( new = malloc (sizeof (struct fd_list)) , { + pthread_mutex_unlock( &queue->mtx ); + } ); fd_list_init(new, *item); *item = NULL; - /* lock the queue */ - CHECK_POSIX( pthread_mutex_lock( &queue->mtx ) ); - /* Add the new item at the end */ fd_list_insert_before( &queue->list, new); queue->count++; @@ -327,7 +371,11 @@ /* Signal if threads are asleep */ if (queue->thrs > 0) { - CHECK_POSIX( pthread_cond_signal(&queue->cond) ); + CHECK_POSIX( pthread_cond_signal(&queue->cond_pull) ); + } + if (queue->thrs_push > 0) { + /* cascade */ + CHECK_POSIX( pthread_cond_signal(&queue->cond_push) ); } /* Unlock */ @@ -354,6 +402,10 @@ ret = li->o; free(li); + if (queue->thrs_push) { + CHECK_POSIX_DO( pthread_cond_signal( &queue->cond_push ), ); + } + return ret; } @@ -387,10 +439,21 @@ /* Check queue status */ if (queue->count > 0) { +got_item: /* There are elements in the queue, so pick the first one */ *item = mq_pop(queue); call_cb = test_l_cb(queue); } else { + if (queue->thrs_push > 0) { + /* A thread is trying to push something, let's give it a chance */ + CHECK_POSIX( pthread_mutex_unlock( &queue->mtx ) ); + CHECK_POSIX( pthread_cond_signal( &queue->cond_push ) ); + usleep(1000); + CHECK_POSIX( pthread_mutex_lock( &queue->mtx ) ); + if (queue->count > 0) + goto got_item; + } + wouldblock = 1; *item = NULL; } @@ -456,9 +519,9 @@ queue->thrs++ ; pthread_cleanup_push( fifo_cleanup, queue); if (istimed) { - ret = pthread_cond_timedwait( &queue->cond, &queue->mtx, abstime ); + ret = pthread_cond_timedwait( &queue->cond_pull, &queue->mtx, abstime ); } else { - ret = pthread_cond_wait( &queue->cond, &queue->mtx ); + ret = pthread_cond_wait( &queue->cond_pull, &queue->mtx ); } pthread_cleanup_pop(0); queue->thrs-- ; diff -r 734bf3e1487b -r c47c16436f71 tests/testcnx.c --- a/tests/testcnx.c Sat Oct 22 22:56:55 2011 +0200 +++ b/tests/testcnx.c Sun Oct 23 23:43:32 2011 +0200 @@ -1498,7 +1498,7 @@ /* fd_cnx_recv_setaltfifo */ CHECK( 0, fd_cnx_send(client_side, cer_buf, cer_sz, 0)); - CHECK( 0, fd_fifo_new(&myfifo) ); + CHECK( 0, fd_fifo_new(&myfifo, 0) ); CHECK( 0, fd_cnx_recv_setaltfifo(server_side, myfifo) ); CHECK( 0, clock_gettime(CLOCK_REALTIME, &now) ); do { @@ -1590,7 +1590,7 @@ /* fd_cnx_recv_setaltfifo */ CHECK( 0, fd_cnx_send(client_side, cer_buf, cer_sz, 0)); - CHECK( 0, fd_fifo_new(&myfifo) ); + CHECK( 0, fd_fifo_new(&myfifo, 0) ); CHECK( 0, fd_cnx_recv_setaltfifo(server_side, myfifo) ); CHECK( 0, clock_gettime(CLOCK_REALTIME, &now) ); do { diff -r 734bf3e1487b -r c47c16436f71 tests/testfifo.c --- a/tests/testfifo.c Sat Oct 22 22:56:55 2011 +0200 +++ b/tests/testfifo.c Sun Oct 23 23:43:32 2011 +0200 @@ -110,6 +110,25 @@ return NULL; } +/* The test function, to be threaded */ +static int iter = 0; +static void * test_fct2(void * data) +{ + int i; + int * item; + struct test_data * td = (struct test_data *) data; + + for (i=0; i< td->nbr; i++) { + item = malloc(sizeof(int)); + CHECK( 1, item ? 1 : 0 ); + *item = i; + CHECK( 0, fd_fifo_post(td->queue, &item) ); + iter++; + } + + return NULL; +} + /* Main test routine */ int main(int argc, char *argv[]) @@ -144,7 +163,7 @@ struct msg * msg = NULL; /* Create the queue */ - CHECK( 0, fd_fifo_new(&queue) ); + CHECK( 0, fd_fifo_new(&queue, 0) ); /* Check the count is 0 */ CHECK( 0, fd_fifo_length(queue, &count) ); @@ -232,7 +251,7 @@ } /* Create the queue */ - CHECK( 0, fd_fifo_new(&queue) ); + CHECK( 0, fd_fifo_new(&queue, 0) ); /* Create the barrier */ CHECK( 0, pthread_barrier_init(&bar, NULL, nbr_threads * 2 + 1) ); @@ -302,7 +321,7 @@ pthread_t th; /* Create the queue */ - CHECK( 0, fd_fifo_new(&queue) ); + CHECK( 0, fd_fifo_new(&queue, 0) ); /* Create the barrier */ CHECK( 0, pthread_barrier_init(&bar, NULL, 2) ); @@ -369,7 +388,7 @@ struct msg * msg = NULL; /* Create the queue */ - CHECK( 0, fd_fifo_new(&queue) ); + CHECK( 0, fd_fifo_new(&queue, 0) ); /* Prepare the test data */ memset(&thrh_td, 0, sizeof(thrh_td)); @@ -441,6 +460,57 @@ CHECK( 0, fd_fifo_del(&queue) ); } + /* Test max queue limit */ + { + struct fifo *queue = NULL; + struct test_data td; + pthread_t th; + int * item, i; + + /* Create the queue */ + CHECK( 0, fd_fifo_new(&queue, 10) ); + + /* Initialize the test data structures */ + td.queue = queue; + td.nbr = 15; + + CHECK( 0, pthread_create( &th, NULL, test_fct2, &td ) ); + + usleep(1000); /* 1 millisec */ + + CHECK( 10, iter ); + + CHECK( 0, fd_fifo_tryget(queue, &item) ); + CHECK( 0, *item); + free(item); + + usleep(1000); /* 1 millisec */ + + CHECK( 11, iter ); + + for (i=1; i<4; i++) { + CHECK( 0, fd_fifo_get(queue, &item) ); + CHECK( i, *item); + free(item); + } + + usleep(1000); /* 1 millisec */ + + CHECK( 14, iter ); + + /* fd_fifo_dump(0, "test", queue, NULL); */ + + for (; i < td.nbr; i++) { + CHECK( 0, fd_fifo_tryget(queue, &item) ); + CHECK( i, *item); + free(item); + } + + CHECK( 0, pthread_join( th, NULL ) ); + CHECK( 15, iter ); + + } + /* Delete the messages */ CHECK( 0, fd_msg_free( msg1 ) ); CHECK( 0, fd_msg_free( msg2 ) );