Navigation


Changeset 767:c47c16436f71 in freeDiameter for libfdproto


Ignore:
Timestamp:
Oct 24, 2011, 6:43:32 AM (13 years ago)
Author:
Sebastien Decugis <sdecugis@nict.go.jp>
Branch:
default
Phase:
public
Message:

Added a limit on fifo queues to avoid memory exaustion when messages are received faster than handled

File:
1 edited

Legend:

Unmodified
Added
Removed
  • libfdproto/fifo.c

    r740 r767  
    5353       
    5454        pthread_mutex_t mtx;    /* Mutex protecting this queue */
    55         pthread_cond_t  cond;   /* condition variable of the list */
     55        pthread_cond_t  cond_pull;      /* condition variable for pulling threads */
     56        pthread_cond_t  cond_push;      /* condition variable for pushing threads */
    5657       
    5758        struct fd_list  list;   /* sentinel for the list of elements */
    5859        int             count;  /* number of objects in the list */
    5960        int             thrs;   /* number of threads waiting for a new element (when count is 0) */
     61       
     62        int             max;    /* maximum number of items to accept if not 0 */
     63        int             thrs_push; /* number of threads waitnig to push an item */
    6064       
    6165        uint16_t        high;   /* High level threshold (see libfreeDiameter.h for details) */
     
    7579
    7680
    77 /* Create a new queue */
    78 int fd_fifo_new ( struct fifo ** queue )
     81/* Create a new queue, with max number of items -- use 0 for no max */
     82int fd_fifo_new ( struct fifo ** queue, int max )
    7983{
    8084        struct fifo * new;
     
    9296        new->eyec = FIFO_EYEC;
    9397        CHECK_POSIX( pthread_mutex_init(&new->mtx, NULL) );
    94         CHECK_POSIX( pthread_cond_init(&new->cond, NULL) );
     98        CHECK_POSIX( pthread_cond_init(&new->cond_pull, NULL) );
     99        CHECK_POSIX( pthread_cond_init(&new->cond_push, NULL) );
     100        new->max = max;
    95101       
    96102        fd_list_init(&new->list, NULL);
     
    119125        CHECK_POSIX_DO(  pthread_mutex_lock( &queue->mtx ), /* continue */  );
    120126        fd_log_debug("   %d elements in queue / %d threads waiting\n", queue->count, queue->thrs);
     127        fd_log_debug("   %d elements max / %d threads waiting to push\n", queue->max, queue->thrs_push);
    121128        fd_log_debug("   thresholds: %d / %d (h:%d), cb: %p,%p (%p), highest: %d\n",
    122129                        queue->high, queue->low, queue->highest,
     
    162169        while (q->thrs) {
    163170                CHECK_POSIX(  pthread_mutex_unlock( &q->mtx ));
    164                 CHECK_POSIX(  pthread_cond_signal(&q->cond)  );
    165                 sched_yield();
    166                 if (loops >= 10)
    167                         /* sleep for a few milliseconds */
    168                         usleep(50000);
     171                CHECK_POSIX(  pthread_cond_signal(&q->cond_pull)  );
     172                usleep(1000);
    169173               
    170174                CHECK_POSIX(  pthread_mutex_lock( &q->mtx )  );
     
    178182        CHECK_POSIX(  pthread_mutex_unlock( &q->mtx )  );
    179183       
    180         CHECK_POSIX_DO(  pthread_cond_destroy( &q->cond ),  );
     184        CHECK_POSIX_DO(  pthread_cond_destroy( &q->cond_pull ),  );
     185       
     186        CHECK_POSIX_DO(  pthread_cond_destroy( &q->cond_push ),  );
    181187       
    182188        CHECK_POSIX_DO(  pthread_mutex_destroy( &q->mtx ),  );
     
    207213        /* Lock the queues */
    208214        CHECK_POSIX(  pthread_mutex_lock( &old->mtx )  );
     215       
     216        CHECK_PARAMS_DO( (! old->thrs_push), {
     217                        pthread_mutex_unlock( &old->mtx );
     218                        return EINVAL;
     219                } );
     220       
    209221        CHECK_POSIX(  pthread_mutex_lock( &new->mtx )  );
    210222       
     
    213225        while (old->thrs) {
    214226                CHECK_POSIX(  pthread_mutex_unlock( &old->mtx ));
    215                 CHECK_POSIX(  pthread_cond_signal(&old->cond)  );
    216                 sched_yield();
    217                 if (loops >= 10)
    218                         /* sleep for a few milliseconds */
    219                         usleep(50000);
     227                CHECK_POSIX(  pthread_cond_signal( &old->cond_pull )  );
     228                usleep(1000);
    220229               
    221230                CHECK_POSIX(  pthread_mutex_lock( &old->mtx )  );
    222                 ASSERT( ++loops < 20 ); /* detect infinite loops */
     231                ASSERT( loops < 20 ); /* detect infinite loops */
    223232        }
    224233       
     
    226235        fd_list_move_end( &new->list, &old->list );
    227236        if (old->count && (!new->count)) {
    228                 CHECK_POSIX(  pthread_cond_signal(&new->cond)  );
     237                CHECK_POSIX(  pthread_cond_signal(&new->cond_pull)  );
    229238        }
    230239        new->count += old->count;
     
    296305}
    297306
     307
     308/* This handler is called when a thread is blocked on a queue, and cancelled */
     309static void fifo_cleanup_push(void * queue)
     310{
     311        struct fifo * q = (struct fifo *)queue;
     312        TRACE_ENTRY( "%p", queue );
     313       
     314        /* The thread has been cancelled, therefore it does not wait on the queue anymore */
     315        q->thrs_push--;
     316       
     317        /* Now unlock the queue, and we're done */
     318        CHECK_POSIX_DO(  pthread_mutex_unlock( &q->mtx ),  /* nothing */  );
     319       
     320        /* End of cleanup handler */
     321        return;
     322}
     323
     324
    298325/* Post a new item in the queue */
    299326int fd_fifo_post_int ( struct fifo * queue, void ** item )
     
    307334        CHECK_PARAMS( CHECK_FIFO( queue ) && item && *item );
    308335       
     336        /* lock the queue */
     337        CHECK_POSIX(  pthread_mutex_lock( &queue->mtx )  );
     338       
     339        if (queue->max) {
     340                while (queue->count >= queue->max) {
     341                        int ret = 0;
     342                       
     343                        /* We have to wait for an item to be pulled */
     344                        queue->thrs_push++ ;
     345                        pthread_cleanup_push( fifo_cleanup_push, queue);
     346                        ret = pthread_cond_wait( &queue->cond_push, &queue->mtx );
     347                        pthread_cleanup_pop(0);
     348                        queue->thrs_push-- ;
     349                       
     350                        ASSERT( ret == 0 );
     351                }
     352        }
     353       
    309354        /* Create a new list item */
    310         CHECK_MALLOC(  new = malloc (sizeof (struct fd_list))  );
     355        CHECK_MALLOC_DO(  new = malloc (sizeof (struct fd_list)) , {
     356                        pthread_mutex_unlock( &queue->mtx );
     357                } );
    311358       
    312359        fd_list_init(new, *item);
    313360        *item = NULL;
    314        
    315         /* lock the queue */
    316         CHECK_POSIX(  pthread_mutex_lock( &queue->mtx )  );
    317361       
    318362        /* Add the new item at the end */
     
    328372        /* Signal if threads are asleep */
    329373        if (queue->thrs > 0) {
    330                 CHECK_POSIX(  pthread_cond_signal(&queue->cond)  );
     374                CHECK_POSIX(  pthread_cond_signal(&queue->cond_pull)  );
     375        }
     376        if (queue->thrs_push > 0) {
     377                /* cascade */
     378                CHECK_POSIX(  pthread_cond_signal(&queue->cond_push)  );
    331379        }
    332380       
     
    355403        free(li);
    356404       
     405        if (queue->thrs_push) {
     406                CHECK_POSIX_DO( pthread_cond_signal( &queue->cond_push ), );
     407        }
     408       
    357409        return ret;
    358410}
     
    388440        /* Check queue status */
    389441        if (queue->count > 0) {
     442got_item:
    390443                /* There are elements in the queue, so pick the first one */
    391444                *item = mq_pop(queue);
    392445                call_cb = test_l_cb(queue);
    393446        } else {
     447                if (queue->thrs_push > 0) {
     448                        /* A thread is trying to push something, let's give it a chance */
     449                        CHECK_POSIX(  pthread_mutex_unlock( &queue->mtx )  );
     450                        CHECK_POSIX(  pthread_cond_signal( &queue->cond_push )  );
     451                        usleep(1000);
     452                        CHECK_POSIX(  pthread_mutex_lock( &queue->mtx )  );
     453                        if (queue->count > 0)
     454                                goto got_item;
     455                }
     456               
    394457                wouldblock = 1;
    395458                *item = NULL;
     
    457520                pthread_cleanup_push( fifo_cleanup, queue);
    458521                if (istimed) {
    459                         ret = pthread_cond_timedwait( &queue->cond, &queue->mtx, abstime );
     522                        ret = pthread_cond_timedwait( &queue->cond_pull, &queue->mtx, abstime );
    460523                } else {
    461                         ret = pthread_cond_wait( &queue->cond, &queue->mtx );
     524                        ret = pthread_cond_wait( &queue->cond_pull, &queue->mtx );
    462525                }
    463526                pthread_cleanup_pop(0);
Note: See TracChangeset for help on using the changeset viewer.