Navigation


Changeset 767:c47c16436f71 in freeDiameter


Ignore:
Timestamp:
Oct 24, 2011, 6:43:32 AM (13 years ago)
Author:
Sebastien Decugis <sdecugis@nict.go.jp>
Branch:
default
Phase:
public
Message:

Added a limit on fifo queues to avoid memory exaustion when messages are received faster than handled

Files:
12 edited

Legend:

Unmodified
Added
Removed
  • extensions/app_radgw/rgw_worker.c

    r741 r767  
    327327        memset(workers, 0, sizeof(workers));
    328328       
    329         CHECK_FCT( fd_fifo_new ( &work_stack ) );
     329        CHECK_FCT( fd_fifo_new ( &work_stack, 30 ) );
    330330       
    331331        /* Create the worker thread(s) */
  • extensions/dbg_interactive/queues.i

    r741 r767  
    4242
    4343%extend fifo {
    44         fifo() {
     44        fifo(int max = 0) {
    4545                struct fifo * q = NULL;
    46                 int ret = fd_fifo_new(&q);
     46                int ret = fd_fifo_new(&q, max);
    4747                if (ret != 0) {
    4848                        DI_ERROR(ret, NULL, NULL);
     
    147147                ret = fd_fifo_tryget($self, &obj);
    148148                if (ret == EWOULDBLOCK) {
    149                         Py_XINCREF(Py_None);
     149                        Py_INCREF(Py_None);
    150150                        return Py_None;
    151151                }
     
    182182                ret = fd_fifo_timedget($self, &obj, &ts);
    183183                if (ret == ETIMEDOUT) {
    184                         Py_XINCREF(Py_None);
     184                        Py_INCREF(Py_None);
    185185                        return Py_None;
    186186                }
  • include/freeDiameter/libfdproto.h

    r764 r767  
    27592759 * PARAMETERS:
    27602760 *  queue       : Upon success, a pointer to the new queue is saved here.
     2761 *  max         : max number of items in the queue. Above this number, adding a new item becomes a
     2762 *                blocking operation. Use 0 to disable this maximum.
    27612763 *
    27622764 * DESCRIPTION:
     
    27682770 *  ENOMEM      : Not enough memory to complete the creation. 
    27692771 */
    2770 int fd_fifo_new ( struct fifo ** queue );
     2772int fd_fifo_new ( struct fifo ** queue, int max );
    27712773
    27722774/*
  • libfdcore/cnxctx.c

    r740 r767  
    9090
    9191        if (full) {
    92                 CHECK_FCT_DO( fd_fifo_new ( &conn->cc_incoming ), return NULL );
     92                CHECK_FCT_DO( fd_fifo_new ( &conn->cc_incoming, 5 ), return NULL );
    9393        }
    9494
  • libfdcore/config.c

    r740 r767  
    6969       
    7070        CHECK_FCT( fd_dict_init(&fd_g_config->cnf_dict) );
    71         CHECK_FCT( fd_fifo_new(&fd_g_config->cnf_main_ev) );
     71        CHECK_FCT( fd_fifo_new(&fd_g_config->cnf_main_ev, 0) );
    7272       
    7373        /* TLS parameters */
  • libfdcore/p_psm.c

    r740 r767  
    856856       
    857857        /* Create the FIFO for events */
    858         CHECK_FCT( fd_fifo_new(&peer->p_events) );
     858        CHECK_FCT( fd_fifo_new(&peer->p_events, 0) );
    859859       
    860860        /* Create the PSM controler thread */
  • libfdcore/peers.c

    r749 r767  
    7777        fd_list_init(&p->p_actives, p);
    7878        fd_list_init(&p->p_expiry, p);
    79         CHECK_FCT( fd_fifo_new(&p->p_tosend) );
     79        CHECK_FCT( fd_fifo_new(&p->p_tosend, 5) );
    8080        p->p_hbh = lrand48();
    8181       
  • libfdcore/queues.c

    r740 r767  
    4545{
    4646        TRACE_ENTRY();
    47         CHECK_FCT( fd_fifo_new ( &fd_g_incoming ) );
    48         CHECK_FCT( fd_fifo_new ( &fd_g_outgoing ) );
    49         CHECK_FCT( fd_fifo_new ( &fd_g_local ) );
     47        CHECK_FCT( fd_fifo_new ( &fd_g_incoming, 20 ) );
     48        CHECK_FCT( fd_fifo_new ( &fd_g_outgoing, 30 ) );
     49        CHECK_FCT( fd_fifo_new ( &fd_g_local, 25 ) );
    5050        return 0;
    5151}
  • libfdcore/sctps.c

    r740 r767  
    497497                conn->cc_sctps_data.array[i].parent = conn;
    498498                conn->cc_sctps_data.array[i].strid  = i;
    499                 CHECK_FCT( fd_fifo_new(&conn->cc_sctps_data.array[i].raw_recv) );
     499                CHECK_FCT( fd_fifo_new(&conn->cc_sctps_data.array[i].raw_recv, 10) );
    500500        }
    501501       
  • libfdproto/fifo.c

    r740 r767  
    5353       
    5454        pthread_mutex_t mtx;    /* Mutex protecting this queue */
    55         pthread_cond_t  cond;   /* condition variable of the list */
     55        pthread_cond_t  cond_pull;      /* condition variable for pulling threads */
     56        pthread_cond_t  cond_push;      /* condition variable for pushing threads */
    5657       
    5758        struct fd_list  list;   /* sentinel for the list of elements */
    5859        int             count;  /* number of objects in the list */
    5960        int             thrs;   /* number of threads waiting for a new element (when count is 0) */
     61       
     62        int             max;    /* maximum number of items to accept if not 0 */
     63        int             thrs_push; /* number of threads waitnig to push an item */
    6064       
    6165        uint16_t        high;   /* High level threshold (see libfreeDiameter.h for details) */
     
    7579
    7680
    77 /* Create a new queue */
    78 int fd_fifo_new ( struct fifo ** queue )
     81/* Create a new queue, with max number of items -- use 0 for no max */
     82int fd_fifo_new ( struct fifo ** queue, int max )
    7983{
    8084        struct fifo * new;
     
    9296        new->eyec = FIFO_EYEC;
    9397        CHECK_POSIX( pthread_mutex_init(&new->mtx, NULL) );
    94         CHECK_POSIX( pthread_cond_init(&new->cond, NULL) );
     98        CHECK_POSIX( pthread_cond_init(&new->cond_pull, NULL) );
     99        CHECK_POSIX( pthread_cond_init(&new->cond_push, NULL) );
     100        new->max = max;
    95101       
    96102        fd_list_init(&new->list, NULL);
     
    119125        CHECK_POSIX_DO(  pthread_mutex_lock( &queue->mtx ), /* continue */  );
    120126        fd_log_debug("   %d elements in queue / %d threads waiting\n", queue->count, queue->thrs);
     127        fd_log_debug("   %d elements max / %d threads waiting to push\n", queue->max, queue->thrs_push);
    121128        fd_log_debug("   thresholds: %d / %d (h:%d), cb: %p,%p (%p), highest: %d\n",
    122129                        queue->high, queue->low, queue->highest,
     
    162169        while (q->thrs) {
    163170                CHECK_POSIX(  pthread_mutex_unlock( &q->mtx ));
    164                 CHECK_POSIX(  pthread_cond_signal(&q->cond)  );
    165                 sched_yield();
    166                 if (loops >= 10)
    167                         /* sleep for a few milliseconds */
    168                         usleep(50000);
     171                CHECK_POSIX(  pthread_cond_signal(&q->cond_pull)  );
     172                usleep(1000);
    169173               
    170174                CHECK_POSIX(  pthread_mutex_lock( &q->mtx )  );
     
    178182        CHECK_POSIX(  pthread_mutex_unlock( &q->mtx )  );
    179183       
    180         CHECK_POSIX_DO(  pthread_cond_destroy( &q->cond ),  );
     184        CHECK_POSIX_DO(  pthread_cond_destroy( &q->cond_pull ),  );
     185       
     186        CHECK_POSIX_DO(  pthread_cond_destroy( &q->cond_push ),  );
    181187       
    182188        CHECK_POSIX_DO(  pthread_mutex_destroy( &q->mtx ),  );
     
    207213        /* Lock the queues */
    208214        CHECK_POSIX(  pthread_mutex_lock( &old->mtx )  );
     215       
     216        CHECK_PARAMS_DO( (! old->thrs_push), {
     217                        pthread_mutex_unlock( &old->mtx );
     218                        return EINVAL;
     219                } );
     220       
    209221        CHECK_POSIX(  pthread_mutex_lock( &new->mtx )  );
    210222       
     
    213225        while (old->thrs) {
    214226                CHECK_POSIX(  pthread_mutex_unlock( &old->mtx ));
    215                 CHECK_POSIX(  pthread_cond_signal(&old->cond)  );
    216                 sched_yield();
    217                 if (loops >= 10)
    218                         /* sleep for a few milliseconds */
    219                         usleep(50000);
     227                CHECK_POSIX(  pthread_cond_signal( &old->cond_pull )  );
     228                usleep(1000);
    220229               
    221230                CHECK_POSIX(  pthread_mutex_lock( &old->mtx )  );
    222                 ASSERT( ++loops < 20 ); /* detect infinite loops */
     231                ASSERT( loops < 20 ); /* detect infinite loops */
    223232        }
    224233       
     
    226235        fd_list_move_end( &new->list, &old->list );
    227236        if (old->count && (!new->count)) {
    228                 CHECK_POSIX(  pthread_cond_signal(&new->cond)  );
     237                CHECK_POSIX(  pthread_cond_signal(&new->cond_pull)  );
    229238        }
    230239        new->count += old->count;
     
    296305}
    297306
     307
     308/* This handler is called when a thread is blocked on a queue, and cancelled */
     309static void fifo_cleanup_push(void * queue)
     310{
     311        struct fifo * q = (struct fifo *)queue;
     312        TRACE_ENTRY( "%p", queue );
     313       
     314        /* The thread has been cancelled, therefore it does not wait on the queue anymore */
     315        q->thrs_push--;
     316       
     317        /* Now unlock the queue, and we're done */
     318        CHECK_POSIX_DO(  pthread_mutex_unlock( &q->mtx ),  /* nothing */  );
     319       
     320        /* End of cleanup handler */
     321        return;
     322}
     323
     324
    298325/* Post a new item in the queue */
    299326int fd_fifo_post_int ( struct fifo * queue, void ** item )
     
    307334        CHECK_PARAMS( CHECK_FIFO( queue ) && item && *item );
    308335       
     336        /* lock the queue */
     337        CHECK_POSIX(  pthread_mutex_lock( &queue->mtx )  );
     338       
     339        if (queue->max) {
     340                while (queue->count >= queue->max) {
     341                        int ret = 0;
     342                       
     343                        /* We have to wait for an item to be pulled */
     344                        queue->thrs_push++ ;
     345                        pthread_cleanup_push( fifo_cleanup_push, queue);
     346                        ret = pthread_cond_wait( &queue->cond_push, &queue->mtx );
     347                        pthread_cleanup_pop(0);
     348                        queue->thrs_push-- ;
     349                       
     350                        ASSERT( ret == 0 );
     351                }
     352        }
     353       
    309354        /* Create a new list item */
    310         CHECK_MALLOC(  new = malloc (sizeof (struct fd_list))  );
     355        CHECK_MALLOC_DO(  new = malloc (sizeof (struct fd_list)) , {
     356                        pthread_mutex_unlock( &queue->mtx );
     357                } );
    311358       
    312359        fd_list_init(new, *item);
    313360        *item = NULL;
    314        
    315         /* lock the queue */
    316         CHECK_POSIX(  pthread_mutex_lock( &queue->mtx )  );
    317361       
    318362        /* Add the new item at the end */
     
    328372        /* Signal if threads are asleep */
    329373        if (queue->thrs > 0) {
    330                 CHECK_POSIX(  pthread_cond_signal(&queue->cond)  );
     374                CHECK_POSIX(  pthread_cond_signal(&queue->cond_pull)  );
     375        }
     376        if (queue->thrs_push > 0) {
     377                /* cascade */
     378                CHECK_POSIX(  pthread_cond_signal(&queue->cond_push)  );
    331379        }
    332380       
     
    355403        free(li);
    356404       
     405        if (queue->thrs_push) {
     406                CHECK_POSIX_DO( pthread_cond_signal( &queue->cond_push ), );
     407        }
     408       
    357409        return ret;
    358410}
     
    388440        /* Check queue status */
    389441        if (queue->count > 0) {
     442got_item:
    390443                /* There are elements in the queue, so pick the first one */
    391444                *item = mq_pop(queue);
    392445                call_cb = test_l_cb(queue);
    393446        } else {
     447                if (queue->thrs_push > 0) {
     448                        /* A thread is trying to push something, let's give it a chance */
     449                        CHECK_POSIX(  pthread_mutex_unlock( &queue->mtx )  );
     450                        CHECK_POSIX(  pthread_cond_signal( &queue->cond_push )  );
     451                        usleep(1000);
     452                        CHECK_POSIX(  pthread_mutex_lock( &queue->mtx )  );
     453                        if (queue->count > 0)
     454                                goto got_item;
     455                }
     456               
    394457                wouldblock = 1;
    395458                *item = NULL;
     
    457520                pthread_cleanup_push( fifo_cleanup, queue);
    458521                if (istimed) {
    459                         ret = pthread_cond_timedwait( &queue->cond, &queue->mtx, abstime );
     522                        ret = pthread_cond_timedwait( &queue->cond_pull, &queue->mtx, abstime );
    460523                } else {
    461                         ret = pthread_cond_wait( &queue->cond, &queue->mtx );
     524                        ret = pthread_cond_wait( &queue->cond_pull, &queue->mtx );
    462525                }
    463526                pthread_cleanup_pop(0);
  • tests/testcnx.c

    r740 r767  
    14991499                /* fd_cnx_recv_setaltfifo */
    15001500                CHECK( 0, fd_cnx_send(client_side, cer_buf, cer_sz, 0));
    1501                 CHECK( 0, fd_fifo_new(&myfifo) );
     1501                CHECK( 0, fd_fifo_new(&myfifo, 0) );
    15021502                CHECK( 0, fd_cnx_recv_setaltfifo(server_side, myfifo) );
    15031503                CHECK( 0, clock_gettime(CLOCK_REALTIME, &now) );
     
    15911591                /* fd_cnx_recv_setaltfifo */
    15921592                CHECK( 0, fd_cnx_send(client_side, cer_buf, cer_sz, 0));
    1593                 CHECK( 0, fd_fifo_new(&myfifo) );
     1593                CHECK( 0, fd_fifo_new(&myfifo, 0) );
    15941594                CHECK( 0, fd_cnx_recv_setaltfifo(server_side, myfifo) );
    15951595                CHECK( 0, clock_gettime(CLOCK_REALTIME, &now) );
  • tests/testfifo.c

    r751 r767  
    111111}
    112112
     113/* The test function, to be threaded */
     114static int iter  = 0;
     115static void * test_fct2(void * data)
     116{
     117        int i;
     118        int * item;
     119        struct test_data * td = (struct test_data *) data;
     120       
     121        for (i=0; i< td->nbr; i++) {
     122                item = malloc(sizeof(int));
     123                CHECK( 1, item ? 1 : 0 );
     124                *item = i;
     125                CHECK( 0, fd_fifo_post(td->queue, &item) );
     126                iter++;
     127        }
     128       
     129        return NULL;
     130}
     131
    113132
    114133/* Main test routine */
     
    145164               
    146165                /* Create the queue */
    147                 CHECK( 0, fd_fifo_new(&queue) );
     166                CHECK( 0, fd_fifo_new(&queue, 0) );
    148167               
    149168                /* Check the count is 0 */
     
    233252               
    234253                /* Create the queue */
    235                 CHECK( 0, fd_fifo_new(&queue) );
     254                CHECK( 0, fd_fifo_new(&queue, 0) );
    236255               
    237256                /* Create the barrier */
     
    303322               
    304323                /* Create the queue */
    305                 CHECK( 0, fd_fifo_new(&queue) );
     324                CHECK( 0, fd_fifo_new(&queue, 0) );
    306325               
    307326                /* Create the barrier */
     
    370389               
    371390                /* Create the queue */
    372                 CHECK( 0, fd_fifo_new(&queue) );
     391                CHECK( 0, fd_fifo_new(&queue, 0) );
    373392               
    374393                /* Prepare the test data */
     
    442461        }
    443462       
     463        /* Test max queue limit */
     464        {
     465                struct fifo             *queue = NULL;
     466                struct test_data         td;
     467                pthread_t                th;
     468                int *                   item, i;
     469               
     470                /* Create the queue */
     471                CHECK( 0, fd_fifo_new(&queue, 10) );
     472               
     473                /* Initialize the test data structures */
     474                td.queue = queue;
     475                td.nbr = 15;
     476               
     477                CHECK( 0, pthread_create( &th, NULL, test_fct2, &td ) );
     478               
     479                usleep(1000); /* 1 millisec */
     480               
     481                CHECK( 10, iter );
     482               
     483                CHECK( 0, fd_fifo_tryget(queue, &item) );
     484                CHECK( 0, *item);
     485                free(item);
     486               
     487                usleep(1000); /* 1 millisec */
     488               
     489                CHECK( 11, iter );
     490               
     491                for (i=1; i<4; i++) {
     492                        CHECK( 0, fd_fifo_get(queue, &item) );
     493                        CHECK( i, *item);
     494                        free(item);
     495                }
     496               
     497                usleep(1000); /* 1 millisec */
     498               
     499                CHECK( 14, iter );
     500               
     501                /* fd_fifo_dump(0, "test", queue, NULL); */
     502               
     503                for (; i < td.nbr; i++) {
     504                        CHECK( 0, fd_fifo_tryget(queue, &item) );
     505                        CHECK( i, *item);
     506                        free(item);
     507                }
     508               
     509                CHECK( 0, pthread_join( th, NULL ) );
     510                CHECK( 15, iter );
     511               
     512        }
     513       
    444514        /* Delete the messages */
    445515        CHECK( 0, fd_msg_free( msg1 ) );
Note: See TracChangeset for help on using the changeset viewer.