Navigation


Changeset 767:c47c16436f71 in freeDiameter


Ignore:
Timestamp:
Oct 24, 2011 6:43:32 AM (19 months ago)
Author:
Sebastien Decugis <sdecugis@nict.go.jp>
Branch:
default
Message:

Added a limit on fifo queues to avoid memory exaustion when messages are received faster than handled

Files:
12 edited

Legend:

Unmodified
Added
Removed
  • extensions/app_radgw/rgw_worker.c

    r741 r767  
    327327        memset(workers, 0, sizeof(workers)); 
    328328         
    329         CHECK_FCT( fd_fifo_new ( &work_stack ) ); 
     329        CHECK_FCT( fd_fifo_new ( &work_stack, 30 ) ); 
    330330         
    331331        /* Create the worker thread(s) */ 
  • extensions/dbg_interactive/queues.i

    r741 r767  
    4242 
    4343%extend fifo { 
    44         fifo() { 
     44        fifo(int max = 0) { 
    4545                struct fifo * q = NULL; 
    46                 int ret = fd_fifo_new(&q); 
     46                int ret = fd_fifo_new(&q, max); 
    4747                if (ret != 0) { 
    4848                        DI_ERROR(ret, NULL, NULL); 
     
    147147                ret = fd_fifo_tryget($self, &obj); 
    148148                if (ret == EWOULDBLOCK) { 
    149                         Py_XINCREF(Py_None); 
     149                        Py_INCREF(Py_None); 
    150150                        return Py_None; 
    151151                } 
     
    182182                ret = fd_fifo_timedget($self, &obj, &ts); 
    183183                if (ret == ETIMEDOUT) { 
    184                         Py_XINCREF(Py_None); 
     184                        Py_INCREF(Py_None); 
    185185                        return Py_None; 
    186186                } 
  • include/freeDiameter/libfdproto.h

    r764 r767  
    27592759 * PARAMETERS: 
    27602760 *  queue       : Upon success, a pointer to the new queue is saved here. 
     2761 *  max         : max number of items in the queue. Above this number, adding a new item becomes a 
     2762 *                blocking operation. Use 0 to disable this maximum. 
    27612763 * 
    27622764 * DESCRIPTION:  
     
    27682770 *  ENOMEM      : Not enough memory to complete the creation.   
    27692771 */ 
    2770 int fd_fifo_new ( struct fifo ** queue ); 
     2772int fd_fifo_new ( struct fifo ** queue, int max ); 
    27712773 
    27722774/* 
  • libfdcore/cnxctx.c

    r740 r767  
    9090 
    9191        if (full) { 
    92                 CHECK_FCT_DO( fd_fifo_new ( &conn->cc_incoming ), return NULL ); 
     92                CHECK_FCT_DO( fd_fifo_new ( &conn->cc_incoming, 5 ), return NULL ); 
    9393        } 
    9494 
  • libfdcore/config.c

    r740 r767  
    6969         
    7070        CHECK_FCT( fd_dict_init(&fd_g_config->cnf_dict) ); 
    71         CHECK_FCT( fd_fifo_new(&fd_g_config->cnf_main_ev) ); 
     71        CHECK_FCT( fd_fifo_new(&fd_g_config->cnf_main_ev, 0) ); 
    7272         
    7373        /* TLS parameters */ 
  • libfdcore/p_psm.c

    r740 r767  
    856856         
    857857        /* Create the FIFO for events */ 
    858         CHECK_FCT( fd_fifo_new(&peer->p_events) ); 
     858        CHECK_FCT( fd_fifo_new(&peer->p_events, 0) ); 
    859859         
    860860        /* Create the PSM controler thread */ 
  • libfdcore/peers.c

    r749 r767  
    7777        fd_list_init(&p->p_actives, p); 
    7878        fd_list_init(&p->p_expiry, p); 
    79         CHECK_FCT( fd_fifo_new(&p->p_tosend) ); 
     79        CHECK_FCT( fd_fifo_new(&p->p_tosend, 5) ); 
    8080        p->p_hbh = lrand48(); 
    8181         
  • libfdcore/queues.c

    r740 r767  
    4545{ 
    4646        TRACE_ENTRY(); 
    47         CHECK_FCT( fd_fifo_new ( &fd_g_incoming ) ); 
    48         CHECK_FCT( fd_fifo_new ( &fd_g_outgoing ) ); 
    49         CHECK_FCT( fd_fifo_new ( &fd_g_local ) ); 
     47        CHECK_FCT( fd_fifo_new ( &fd_g_incoming, 20 ) ); 
     48        CHECK_FCT( fd_fifo_new ( &fd_g_outgoing, 30 ) ); 
     49        CHECK_FCT( fd_fifo_new ( &fd_g_local, 25 ) ); 
    5050        return 0; 
    5151} 
  • libfdcore/sctps.c

    r740 r767  
    497497                conn->cc_sctps_data.array[i].parent = conn; 
    498498                conn->cc_sctps_data.array[i].strid  = i; 
    499                 CHECK_FCT( fd_fifo_new(&conn->cc_sctps_data.array[i].raw_recv) ); 
     499                CHECK_FCT( fd_fifo_new(&conn->cc_sctps_data.array[i].raw_recv, 10) ); 
    500500        } 
    501501         
  • libfdproto/fifo.c

    r740 r767  
    5353         
    5454        pthread_mutex_t mtx;    /* Mutex protecting this queue */ 
    55         pthread_cond_t  cond;   /* condition variable of the list */ 
     55        pthread_cond_t  cond_pull;      /* condition variable for pulling threads */ 
     56        pthread_cond_t  cond_push;      /* condition variable for pushing threads */ 
    5657         
    5758        struct fd_list  list;   /* sentinel for the list of elements */ 
    5859        int             count;  /* number of objects in the list */ 
    5960        int             thrs;   /* number of threads waiting for a new element (when count is 0) */ 
     61         
     62        int             max;    /* maximum number of items to accept if not 0 */ 
     63        int             thrs_push; /* number of threads waitnig to push an item */ 
    6064         
    6165        uint16_t        high;   /* High level threshold (see libfreeDiameter.h for details) */ 
     
    7579 
    7680 
    77 /* Create a new queue */ 
    78 int fd_fifo_new ( struct fifo ** queue ) 
     81/* Create a new queue, with max number of items -- use 0 for no max */ 
     82int fd_fifo_new ( struct fifo ** queue, int max ) 
    7983{ 
    8084        struct fifo * new; 
     
    9296        new->eyec = FIFO_EYEC; 
    9397        CHECK_POSIX( pthread_mutex_init(&new->mtx, NULL) ); 
    94         CHECK_POSIX( pthread_cond_init(&new->cond, NULL) ); 
     98        CHECK_POSIX( pthread_cond_init(&new->cond_pull, NULL) ); 
     99        CHECK_POSIX( pthread_cond_init(&new->cond_push, NULL) ); 
     100        new->max = max; 
    95101         
    96102        fd_list_init(&new->list, NULL); 
     
    119125        CHECK_POSIX_DO(  pthread_mutex_lock( &queue->mtx ), /* continue */  ); 
    120126        fd_log_debug("   %d elements in queue / %d threads waiting\n", queue->count, queue->thrs); 
     127        fd_log_debug("   %d elements max / %d threads waiting to push\n", queue->max, queue->thrs_push); 
    121128        fd_log_debug("   thresholds: %d / %d (h:%d), cb: %p,%p (%p), highest: %d\n", 
    122129                        queue->high, queue->low, queue->highest,  
     
    162169        while (q->thrs) { 
    163170                CHECK_POSIX(  pthread_mutex_unlock( &q->mtx )); 
    164                 CHECK_POSIX(  pthread_cond_signal(&q->cond)  ); 
    165                 sched_yield(); 
    166                 if (loops >= 10) 
    167                         /* sleep for a few milliseconds */ 
    168                         usleep(50000); 
     171                CHECK_POSIX(  pthread_cond_signal(&q->cond_pull)  ); 
     172                usleep(1000); 
    169173                 
    170174                CHECK_POSIX(  pthread_mutex_lock( &q->mtx )  ); 
     
    178182        CHECK_POSIX(  pthread_mutex_unlock( &q->mtx )  ); 
    179183         
    180         CHECK_POSIX_DO(  pthread_cond_destroy( &q->cond ),  ); 
     184        CHECK_POSIX_DO(  pthread_cond_destroy( &q->cond_pull ),  ); 
     185         
     186        CHECK_POSIX_DO(  pthread_cond_destroy( &q->cond_push ),  ); 
    181187         
    182188        CHECK_POSIX_DO(  pthread_mutex_destroy( &q->mtx ),  ); 
     
    207213        /* Lock the queues */ 
    208214        CHECK_POSIX(  pthread_mutex_lock( &old->mtx )  ); 
     215         
     216        CHECK_PARAMS_DO( (! old->thrs_push), { 
     217                        pthread_mutex_unlock( &old->mtx ); 
     218                        return EINVAL; 
     219                } ); 
     220         
    209221        CHECK_POSIX(  pthread_mutex_lock( &new->mtx )  ); 
    210222         
     
    213225        while (old->thrs) { 
    214226                CHECK_POSIX(  pthread_mutex_unlock( &old->mtx )); 
    215                 CHECK_POSIX(  pthread_cond_signal(&old->cond)  ); 
    216                 sched_yield(); 
    217                 if (loops >= 10) 
    218                         /* sleep for a few milliseconds */ 
    219                         usleep(50000); 
     227                CHECK_POSIX(  pthread_cond_signal( &old->cond_pull )  ); 
     228                usleep(1000); 
    220229                 
    221230                CHECK_POSIX(  pthread_mutex_lock( &old->mtx )  ); 
    222                 ASSERT( ++loops < 20 ); /* detect infinite loops */ 
     231                ASSERT( loops < 20 ); /* detect infinite loops */ 
    223232        } 
    224233         
     
    226235        fd_list_move_end( &new->list, &old->list ); 
    227236        if (old->count && (!new->count)) { 
    228                 CHECK_POSIX(  pthread_cond_signal(&new->cond)  ); 
     237                CHECK_POSIX(  pthread_cond_signal(&new->cond_pull)  ); 
    229238        } 
    230239        new->count += old->count; 
     
    296305} 
    297306 
     307 
     308/* This handler is called when a thread is blocked on a queue, and cancelled */ 
     309static void fifo_cleanup_push(void * queue) 
     310{ 
     311        struct fifo * q = (struct fifo *)queue; 
     312        TRACE_ENTRY( "%p", queue ); 
     313         
     314        /* The thread has been cancelled, therefore it does not wait on the queue anymore */ 
     315        q->thrs_push--; 
     316         
     317        /* Now unlock the queue, and we're done */ 
     318        CHECK_POSIX_DO(  pthread_mutex_unlock( &q->mtx ),  /* nothing */  ); 
     319         
     320        /* End of cleanup handler */ 
     321        return; 
     322} 
     323 
     324 
    298325/* Post a new item in the queue */ 
    299326int fd_fifo_post_int ( struct fifo * queue, void ** item ) 
     
    307334        CHECK_PARAMS( CHECK_FIFO( queue ) && item && *item ); 
    308335         
     336        /* lock the queue */ 
     337        CHECK_POSIX(  pthread_mutex_lock( &queue->mtx )  ); 
     338         
     339        if (queue->max) { 
     340                while (queue->count >= queue->max) { 
     341                        int ret = 0; 
     342                         
     343                        /* We have to wait for an item to be pulled */ 
     344                        queue->thrs_push++ ; 
     345                        pthread_cleanup_push( fifo_cleanup_push, queue); 
     346                        ret = pthread_cond_wait( &queue->cond_push, &queue->mtx ); 
     347                        pthread_cleanup_pop(0); 
     348                        queue->thrs_push-- ; 
     349                         
     350                        ASSERT( ret == 0 ); 
     351                } 
     352        } 
     353         
    309354        /* Create a new list item */ 
    310         CHECK_MALLOC(  new = malloc (sizeof (struct fd_list))  ); 
     355        CHECK_MALLOC_DO(  new = malloc (sizeof (struct fd_list)) , { 
     356                        pthread_mutex_unlock( &queue->mtx ); 
     357                } ); 
    311358         
    312359        fd_list_init(new, *item); 
    313360        *item = NULL; 
    314          
    315         /* lock the queue */ 
    316         CHECK_POSIX(  pthread_mutex_lock( &queue->mtx )  ); 
    317361         
    318362        /* Add the new item at the end */ 
     
    328372        /* Signal if threads are asleep */ 
    329373        if (queue->thrs > 0) { 
    330                 CHECK_POSIX(  pthread_cond_signal(&queue->cond)  ); 
     374                CHECK_POSIX(  pthread_cond_signal(&queue->cond_pull)  ); 
     375        } 
     376        if (queue->thrs_push > 0) { 
     377                /* cascade */ 
     378                CHECK_POSIX(  pthread_cond_signal(&queue->cond_push)  ); 
    331379        } 
    332380         
     
    355403        free(li); 
    356404         
     405        if (queue->thrs_push) { 
     406                CHECK_POSIX_DO( pthread_cond_signal( &queue->cond_push ), ); 
     407        } 
     408         
    357409        return ret; 
    358410} 
     
    388440        /* Check queue status */ 
    389441        if (queue->count > 0) { 
     442got_item: 
    390443                /* There are elements in the queue, so pick the first one */ 
    391444                *item = mq_pop(queue); 
    392445                call_cb = test_l_cb(queue); 
    393446        } else { 
     447                if (queue->thrs_push > 0) { 
     448                        /* A thread is trying to push something, let's give it a chance */ 
     449                        CHECK_POSIX(  pthread_mutex_unlock( &queue->mtx )  ); 
     450                        CHECK_POSIX(  pthread_cond_signal( &queue->cond_push )  ); 
     451                        usleep(1000); 
     452                        CHECK_POSIX(  pthread_mutex_lock( &queue->mtx )  ); 
     453                        if (queue->count > 0) 
     454                                goto got_item; 
     455                } 
     456                 
    394457                wouldblock = 1; 
    395458                *item = NULL; 
     
    457520                pthread_cleanup_push( fifo_cleanup, queue); 
    458521                if (istimed) { 
    459                         ret = pthread_cond_timedwait( &queue->cond, &queue->mtx, abstime ); 
     522                        ret = pthread_cond_timedwait( &queue->cond_pull, &queue->mtx, abstime ); 
    460523                } else { 
    461                         ret = pthread_cond_wait( &queue->cond, &queue->mtx ); 
     524                        ret = pthread_cond_wait( &queue->cond_pull, &queue->mtx ); 
    462525                } 
    463526                pthread_cleanup_pop(0); 
  • tests/testcnx.c

    r740 r767  
    14991499                /* fd_cnx_recv_setaltfifo */ 
    15001500                CHECK( 0, fd_cnx_send(client_side, cer_buf, cer_sz, 0)); 
    1501                 CHECK( 0, fd_fifo_new(&myfifo) ); 
     1501                CHECK( 0, fd_fifo_new(&myfifo, 0) ); 
    15021502                CHECK( 0, fd_cnx_recv_setaltfifo(server_side, myfifo) ); 
    15031503                CHECK( 0, clock_gettime(CLOCK_REALTIME, &now) ); 
     
    15911591                /* fd_cnx_recv_setaltfifo */ 
    15921592                CHECK( 0, fd_cnx_send(client_side, cer_buf, cer_sz, 0)); 
    1593                 CHECK( 0, fd_fifo_new(&myfifo) ); 
     1593                CHECK( 0, fd_fifo_new(&myfifo, 0) ); 
    15941594                CHECK( 0, fd_cnx_recv_setaltfifo(server_side, myfifo) ); 
    15951595                CHECK( 0, clock_gettime(CLOCK_REALTIME, &now) ); 
  • tests/testfifo.c

    r751 r767  
    111111} 
    112112 
     113/* The test function, to be threaded */ 
     114static int iter  = 0; 
     115static void * test_fct2(void * data) 
     116{ 
     117        int i; 
     118        int * item; 
     119        struct test_data * td = (struct test_data *) data; 
     120         
     121        for (i=0; i< td->nbr; i++) { 
     122                item = malloc(sizeof(int)); 
     123                CHECK( 1, item ? 1 : 0 ); 
     124                *item = i; 
     125                CHECK( 0, fd_fifo_post(td->queue, &item) ); 
     126                iter++; 
     127        } 
     128         
     129        return NULL; 
     130} 
     131 
    113132 
    114133/* Main test routine */ 
     
    145164                 
    146165                /* Create the queue */ 
    147                 CHECK( 0, fd_fifo_new(&queue) ); 
     166                CHECK( 0, fd_fifo_new(&queue, 0) ); 
    148167                 
    149168                /* Check the count is 0 */ 
     
    233252                 
    234253                /* Create the queue */ 
    235                 CHECK( 0, fd_fifo_new(&queue) ); 
     254                CHECK( 0, fd_fifo_new(&queue, 0) ); 
    236255                 
    237256                /* Create the barrier */ 
     
    303322                 
    304323                /* Create the queue */ 
    305                 CHECK( 0, fd_fifo_new(&queue) ); 
     324                CHECK( 0, fd_fifo_new(&queue, 0) ); 
    306325                 
    307326                /* Create the barrier */ 
     
    370389                 
    371390                /* Create the queue */ 
    372                 CHECK( 0, fd_fifo_new(&queue) ); 
     391                CHECK( 0, fd_fifo_new(&queue, 0) ); 
    373392                 
    374393                /* Prepare the test data */ 
     
    442461        } 
    443462         
     463        /* Test max queue limit */ 
     464        { 
     465                struct fifo             *queue = NULL; 
     466                struct test_data         td; 
     467                pthread_t                th; 
     468                int *                   item, i; 
     469                 
     470                /* Create the queue */ 
     471                CHECK( 0, fd_fifo_new(&queue, 10) ); 
     472                 
     473                /* Initialize the test data structures */ 
     474                td.queue = queue; 
     475                td.nbr = 15; 
     476                 
     477                CHECK( 0, pthread_create( &th, NULL, test_fct2, &td ) ); 
     478                 
     479                usleep(1000); /* 1 millisec */ 
     480                 
     481                CHECK( 10, iter ); 
     482                 
     483                CHECK( 0, fd_fifo_tryget(queue, &item) ); 
     484                CHECK( 0, *item); 
     485                free(item); 
     486                 
     487                usleep(1000); /* 1 millisec */ 
     488                 
     489                CHECK( 11, iter ); 
     490                 
     491                for (i=1; i<4; i++) { 
     492                        CHECK( 0, fd_fifo_get(queue, &item) ); 
     493                        CHECK( i, *item); 
     494                        free(item); 
     495                } 
     496                 
     497                usleep(1000); /* 1 millisec */ 
     498                 
     499                CHECK( 14, iter ); 
     500                 
     501                /* fd_fifo_dump(0, "test", queue, NULL); */ 
     502                 
     503                for (; i < td.nbr; i++) { 
     504                        CHECK( 0, fd_fifo_tryget(queue, &item) ); 
     505                        CHECK( i, *item); 
     506                        free(item); 
     507                } 
     508                 
     509                CHECK( 0, pthread_join( th, NULL ) ); 
     510                CHECK( 15, iter ); 
     511                 
     512        } 
     513         
    444514        /* Delete the messages */ 
    445515        CHECK( 0, fd_msg_free( msg1 ) ); 
Note: See TracChangeset for help on using the changeset viewer.