Navigation


Changeset 1377:ce257e43085d in freeDiameter for libfdproto


Ignore:
Timestamp:
Jun 20, 2019, 7:01:50 PM (5 years ago)
Author:
Thomas Klausner <tk@giga.or.at>
Branch:
default
Phase:
public
Message:

fd_fifo_del: check if queue is already (being) destroyed and return success in that case.

Improves shutdown behaviour.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • libfdproto/fifo.c

    r1188 r1377  
    5151struct fifo {
    5252        int             eyec;   /* An eye catcher, also used to check a queue is valid. FIFO_EYEC */
    53        
     53
    5454        pthread_mutex_t mtx;    /* Mutex protecting this queue */
    5555        pthread_cond_t  cond_pull;      /* condition variable for pulling threads */
    5656        pthread_cond_t  cond_push;      /* condition variable for pushing threads */
    57        
     57
    5858        struct fd_list  list;   /* sentinel for the list of elements */
    5959        int             count;  /* number of objects in the list */
    6060        int             thrs;   /* number of threads waiting for a new element (when count is 0) */
    61        
     61
    6262        int             max;    /* maximum number of items to accept if not 0 */
    6363        int             thrs_push; /* number of threads waitnig to push an item */
    64        
     64
    6565        uint16_t        high;   /* High level threshold (see libfreeDiameter.h for details) */
    6666        uint16_t        low;    /* Low level threshhold */
     
    7070        int             highest;/* The highest count value for which h_cb has been called */
    7171        int             highest_ever; /* The max count value this queue has reached (for tweaking) */
    72        
     72
    7373        long long       total_items;   /* Cumulated number of items that went through this fifo (excluding current count), always increasing. */
    7474        struct timespec total_time;    /* Cumulated time all items spent in this queue, including blocking time (always growing, use deltas for monitoring) */
    7575        struct timespec blocking_time; /* Cumulated time threads trying to post new items were blocked (queue full). */
    7676        struct timespec last_time;     /* For the last element retrieved from the queue, how long it take between posting (including blocking) and poping */
    77        
     77
    7878};
    7979
     
    9494{
    9595        struct fifo * new;
    96        
     96
    9797        TRACE_ENTRY( "%p", queue );
    98        
     98
    9999        CHECK_PARAMS( queue );
    100        
     100
    101101        /* Create a new object */
    102102        CHECK_MALLOC( new = malloc (sizeof (struct fifo) )  );
    103        
     103
    104104        /* Initialize the content */
    105105        memset(new, 0, sizeof(struct fifo));
    106        
     106
    107107        new->eyec = FIFO_EYEC;
    108108        CHECK_POSIX( pthread_mutex_init(&new->mtx, NULL) );
     
    110110        CHECK_POSIX( pthread_cond_init(&new->cond_push, NULL) );
    111111        new->max = max;
    112        
     112
    113113        fd_list_init(&new->list, NULL);
    114        
     114
    115115        /* We're done */
    116116        *queue = new;
     
    122122{
    123123        FD_DUMP_HANDLE_OFFSET();
    124        
     124
    125125        if (name) {
    126                 CHECK_MALLOC_DO( fd_dump_extend( FD_DUMP_STD_PARAMS, "'%s'(@%p): ", name, queue), return NULL); 
     126                CHECK_MALLOC_DO( fd_dump_extend( FD_DUMP_STD_PARAMS, "'%s'(@%p): ", name, queue), return NULL);
    127127        } else {
    128128                CHECK_MALLOC_DO( fd_dump_extend( FD_DUMP_STD_PARAMS, "{fifo}(@%p): ", queue), return NULL);
    129129        }
    130        
     130
    131131        if (!CHECK_FIFO( queue )) {
    132132                return fd_dump_extend(FD_DUMP_STD_PARAMS, "INVALID/NULL");
    133133        }
    134        
     134
    135135        CHECK_POSIX_DO(  pthread_mutex_lock( &queue->mtx ), /* continue */  );
    136         CHECK_MALLOC_DO( fd_dump_extend( FD_DUMP_STD_PARAMS, "items:%d,%d,%d threads:%d,%d stats:%lld/%ld.%06ld,%ld.%06ld,%ld.%06ld thresholds:%d,%d,%d,%p,%p,%p", 
     136        CHECK_MALLOC_DO( fd_dump_extend( FD_DUMP_STD_PARAMS, "items:%d,%d,%d threads:%d,%d stats:%lld/%ld.%06ld,%ld.%06ld,%ld.%06ld thresholds:%d,%d,%d,%p,%p,%p",
    137137                                                queue->count, queue->highest_ever, queue->max,
    138138                                                queue->thrs, queue->thrs_push,
    139139                                                queue->total_items,(long)queue->total_time.tv_sec,(long)(queue->total_time.tv_nsec/1000),(long)queue->blocking_time.tv_sec,(long)(queue->blocking_time.tv_nsec/1000),(long)queue->last_time.tv_sec,(long)(queue->last_time.tv_nsec/1000),
    140                                                 queue->high, queue->low, queue->highest, queue->h_cb, queue->l_cb, queue->data), 
     140                                                queue->high, queue->low, queue->highest, queue->h_cb, queue->l_cb, queue->data),
    141141                         goto error);
    142        
     142
    143143        if (dump_item) {
    144144                struct fd_list * li;
     
    146146                for (li = queue->list.next; li != &queue->list; li = li->next) {
    147147                        struct fifo_item * fi = (struct fifo_item *)li;
    148                         CHECK_MALLOC_DO( fd_dump_extend( FD_DUMP_STD_PARAMS, "\n [#%i](@%p)@%ld.%06ld: ", 
    149                                                 i++, fi->item.o, (long)fi->posted_on.tv_sec,(long)(fi->posted_on.tv_nsec/1000)), 
     148                        CHECK_MALLOC_DO( fd_dump_extend( FD_DUMP_STD_PARAMS, "\n [#%i](@%p)@%ld.%06ld: ",
     149                                                i++, fi->item.o, (long)fi->posted_on.tv_sec,(long)(fi->posted_on.tv_nsec/1000)),
    150150                                         goto error);
    151151                        CHECK_MALLOC_DO( (*dump_item)(FD_DUMP_STD_PARAMS, fi->item.o), goto error);
     
    153153        }
    154154        CHECK_POSIX_DO(  pthread_mutex_unlock( &queue->mtx ), /* continue */  );
    155        
     155
    156156        return *buf;
    157157error:
     
    160160}
    161161
    162 /* Delete a queue. It must be empty. */ 
     162/* Delete a queue. It must be empty. */
    163163int fd_fifo_del ( struct fifo  ** queue )
    164164{
    165165        struct fifo * q;
    166166        int loops = 0;
    167        
     167
    168168        TRACE_ENTRY( "%p", queue );
    169169
     170        if (queue && *queue == NULL) {
     171                /* Queue already (in the process of being) deleted */
     172                return 0;
     173        }
     174
    170175        CHECK_PARAMS( queue && CHECK_FIFO( *queue ) );
    171        
     176
    172177        q = *queue;
    173        
     178
    174179        CHECK_POSIX(  pthread_mutex_lock( &q->mtx )  );
    175        
     180
    176181        if ((q->count != 0) || (q->data != NULL)) {
    177182                TRACE_DEBUG(INFO, "The queue cannot be destroyed (%d, %p)", q->count, q->data);
     
    179184                return EINVAL;
    180185        }
    181        
     186
    182187        /* Ok, now invalidate the queue */
    183188        q->eyec = 0xdead;
    184        
     189
    185190        /* Have all waiting threads return an error */
    186191        while (q->thrs) {
     
    188193                CHECK_POSIX(  pthread_cond_signal(&q->cond_pull)  );
    189194                usleep(1000);
    190                
     195
    191196                CHECK_POSIX(  pthread_mutex_lock( &q->mtx )  );
    192197                ASSERT( ++loops < 20 ); /* detect infinite loops */
    193198        }
    194        
     199
    195200        /* sanity check */
    196201        ASSERT(FD_IS_LIST_EMPTY(&q->list));
    197        
     202
    198203        /* And destroy it */
    199204        CHECK_POSIX(  pthread_mutex_unlock( &q->mtx )  );
    200        
     205
    201206        CHECK_POSIX_DO(  pthread_cond_destroy( &q->cond_pull ),  );
    202        
     207
    203208        CHECK_POSIX_DO(  pthread_cond_destroy( &q->cond_push ),  );
    204        
     209
    205210        CHECK_POSIX_DO(  pthread_mutex_destroy( &q->mtx ),  );
    206        
     211
    207212        free(q);
    208213        *queue = NULL;
    209        
     214
    210215        return 0;
    211216}
     
    215220{
    216221        int loops = 0;
    217        
     222
    218223        TRACE_ENTRY("%p %p %p", old, new, loc_update);
    219224        CHECK_PARAMS( CHECK_FIFO( old ) && CHECK_FIFO( new ));
    220        
     225
    221226        CHECK_PARAMS( ! old->data );
    222227        if (new->high) {
    223228                TODO("Implement support for thresholds in fd_fifo_move...");
    224229        }
    225        
     230
    226231        /* Update loc_update */
    227232        if (loc_update)
    228233                *loc_update = new;
    229        
     234
    230235        /* Lock the queues */
    231236        CHECK_POSIX(  pthread_mutex_lock( &old->mtx )  );
    232        
     237
    233238        CHECK_PARAMS_DO( (! old->thrs_push), {
    234239                        pthread_mutex_unlock( &old->mtx );
    235240                        return EINVAL;
    236241                } );
    237        
     242
    238243        CHECK_POSIX(  pthread_mutex_lock( &new->mtx )  );
    239        
     244
    240245        /* Any waiting thread on the old queue returns an error */
    241246        old->eyec = 0xdead;
     
    244249                CHECK_POSIX(  pthread_cond_signal( &old->cond_pull )  );
    245250                usleep(1000);
    246                
     251
    247252                CHECK_POSIX(  pthread_mutex_lock( &old->mtx )  );
    248253                ASSERT( loops < 20 ); /* detect infinite loops */
    249254        }
    250        
     255
    251256        /* Move all data from old to new */
    252257        fd_list_move_end( &new->list, &old->list );
     
    255260        }
    256261        new->count += old->count;
    257        
     262
    258263        /* Reset old */
    259264        old->count = 0;
    260265        old->eyec = FIFO_EYEC;
    261        
     266
    262267        /* Merge the stats in the new queue */
    263268        new->total_items += old->total_items;
    264269        old->total_items = 0;
    265        
     270
    266271        new->total_time.tv_nsec += old->total_time.tv_nsec;
    267272        new->total_time.tv_sec += old->total_time.tv_sec + (new->total_time.tv_nsec / 1000000000);
     
    269274        old->total_time.tv_nsec = 0;
    270275        old->total_time.tv_sec = 0;
    271        
     276
    272277        new->blocking_time.tv_nsec += old->blocking_time.tv_nsec;
    273278        new->blocking_time.tv_sec += old->blocking_time.tv_sec + (new->blocking_time.tv_nsec / 1000000000);
     
    275280        old->blocking_time.tv_nsec = 0;
    276281        old->blocking_time.tv_sec = 0;
    277        
     282
    278283        /* Unlock, we're done */
    279284        CHECK_POSIX(  pthread_mutex_unlock( &new->mtx )  );
    280285        CHECK_POSIX(  pthread_mutex_unlock( &old->mtx )  );
    281        
     286
    282287        return 0;
    283288}
    284289
    285290/* Get the information on the queue */
    286 int fd_fifo_getstats( struct fifo * queue, int * current_count, int * limit_count, int * highest_count, long long * total_count, 
     291int fd_fifo_getstats( struct fifo * queue, int * current_count, int * limit_count, int * highest_count, long long * total_count,
    287292                                           struct timespec * total, struct timespec * blocking, struct timespec * last)
    288293{
    289294        TRACE_ENTRY( "%p %p %p %p %p %p %p %p", queue, current_count, limit_count, highest_count, total_count, total, blocking, last);
    290        
     295
    291296        /* Check the parameters */
    292297        CHECK_PARAMS( CHECK_FIFO( queue ) );
    293        
     298
    294299        /* lock the queue */
    295300        CHECK_POSIX(  pthread_mutex_lock( &queue->mtx )  );
    296        
     301
    297302        if (current_count)
    298303                *current_count = queue->count;
    299        
     304
    300305        if (limit_count)
    301306                *limit_count = queue->max;
    302        
     307
    303308        if (highest_count)
    304309                *highest_count = queue->highest_ever;
    305        
     310
    306311        if (total_count)
    307312                *total_count = queue->total_items;
    308        
     313
    309314        if (total)
    310315                memcpy(total, &queue->total_time, sizeof(struct timespec));
    311        
     316
    312317        if (blocking)
    313318                memcpy(blocking, &queue->blocking_time, sizeof(struct timespec));
    314        
     319
    315320        if (last)
    316321                memcpy(last, &queue->last_time, sizeof(struct timespec));
    317        
     322
    318323        /* Unlock */
    319324        CHECK_POSIX(  pthread_mutex_unlock( &queue->mtx )  );
    320        
     325
    321326        /* Done */
    322327        return 0;
     
    329334        if ( !CHECK_FIFO( queue ) )
    330335                return 0;
    331        
     336
    332337        return queue->count; /* Let's hope it's read atomically, since we are not locking... */
    333338}
     
    337342{
    338343        TRACE_ENTRY( "%p %p %hu %p %hu %p", queue, data, high, h_cb, low, l_cb );
    339        
     344
    340345        /* Check the parameters */
    341346        CHECK_PARAMS( CHECK_FIFO( queue ) && (high > low) && (queue->data == NULL) );
    342        
     347
    343348        /* lock the queue */
    344349        CHECK_POSIX(  pthread_mutex_lock( &queue->mtx )  );
    345        
     350
    346351        /* Save the values */
    347352        queue->high = high;
     
    350355        queue->h_cb = h_cb;
    351356        queue->l_cb = l_cb;
    352        
     357
    353358        /* Unlock */
    354359        CHECK_POSIX(  pthread_mutex_unlock( &queue->mtx )  );
    355        
     360
    356361        /* Done */
    357362        return 0;
     
    364369        struct fifo * q = (struct fifo *)queue;
    365370        TRACE_ENTRY( "%p", queue );
    366        
     371
    367372        /* The thread has been cancelled, therefore it does not wait on the queue anymore */
    368373        q->thrs_push--;
    369        
     374
    370375        /* Now unlock the queue, and we're done */
    371376        CHECK_POSIX_DO(  pthread_mutex_unlock( &q->mtx ),  /* nothing */  );
    372        
     377
    373378        /* End of cleanup handler */
    374379        return;
     
    382387        int call_cb = 0;
    383388        struct timespec posted_on, queued_on;
    384        
     389
    385390        /* Get the timing of this call */
    386391        CHECK_SYS(  clock_gettime(CLOCK_REALTIME, &posted_on)  );
    387        
     392
    388393        /* lock the queue */
    389394        CHECK_POSIX(  pthread_mutex_lock( &queue->mtx )  );
    390        
     395
    391396        if ((!skip_max) && (queue->max)) {
    392397                while (queue->count >= queue->max) {
    393398                        int ret = 0;
    394                        
     399
    395400                        /* We have to wait for an item to be pulled */
    396401                        queue->thrs_push++ ;
     
    399404                        pthread_cleanup_pop(0);
    400405                        queue->thrs_push-- ;
    401                        
     406
    402407                        ASSERT( ret == 0 );
    403408                }
    404409        }
    405        
     410
    406411        /* Create a new list item */
    407412        CHECK_MALLOC_DO(  new = malloc (sizeof (struct fifo_item)) , {
     
    409414                        return ENOMEM;
    410415                } );
    411        
     416
    412417        fd_list_init(&new->item, *item);
    413418        *item = NULL;
    414        
     419
    415420        /* Add the new item at the end */
    416421        fd_list_insert_before( &queue->list, &new->item);
     
    422427                queue->highest = queue->count;
    423428        }
    424        
     429
    425430        /* store timing */
    426431        memcpy(&new->posted_on, &posted_on, sizeof(struct timespec));
    427        
     432
    428433        /* update queue timing info "blocking time" */
    429434        {
     
    436441                queue->blocking_time.tv_nsec = blocked_ns % 1000000000;
    437442        }
    438        
     443
    439444        /* Signal if threads are asleep */
    440445        if (queue->thrs > 0) {
     
    445450                CHECK_POSIX(  pthread_cond_signal(&queue->cond_push)  );
    446451        }
    447        
     452
    448453        /* Unlock */
    449454        CHECK_POSIX(  pthread_mutex_unlock( &queue->mtx )  );
    450        
     455
    451456        /* Call high-watermark cb as needed */
    452457        if (call_cb && queue->h_cb)
    453458                (*queue->h_cb)(queue, &queue->data);
    454        
     459
    455460        /* Done */
    456461        return 0;
     
    461466{
    462467        TRACE_ENTRY( "%p %p", queue, item );
    463        
     468
    464469        /* Check the parameters */
    465470        CHECK_PARAMS( CHECK_FIFO( queue ) && item && *item );
    466        
     471
    467472        return fd_fifo_post_internal ( queue,item, 0 );
    468        
     473
    469474}
    470475
     
    473478{
    474479        TRACE_ENTRY( "%p %p", queue, item );
    475        
     480
    476481        /* Check the parameters */
    477482        CHECK_PARAMS( CHECK_FIFO( queue ) && item && *item );
    478        
     483
    479484        return fd_fifo_post_internal ( queue,item, 1 );
    480        
     485
    481486}
    482487
     
    487492        struct fifo_item * fi;
    488493        struct timespec now;
    489        
     494
    490495        ASSERT( ! FD_IS_LIST_EMPTY(&queue->list) );
    491        
     496
    492497        fi = (struct fifo_item *)(queue->list.next);
    493498        ret = fi->item.o;
     
    495500        queue->count--;
    496501        queue->total_items++;
    497        
     502
    498503        /* Update the timings */
    499504        CHECK_SYS_DO(  clock_gettime(CLOCK_REALTIME, &now), goto skip_timing  );
     
    501506                long long elapsed = (now.tv_sec - fi->posted_on.tv_sec) * 1000000000;
    502507                elapsed += now.tv_nsec - fi->posted_on.tv_nsec;
    503                
     508
    504509                queue->last_time.tv_sec = elapsed / 1000000000;
    505510                queue->last_time.tv_nsec = elapsed % 1000000000;
    506                
     511
    507512                elapsed += queue->total_time.tv_nsec;
    508513                queue->total_time.tv_sec += elapsed / 1000000000;
    509514                queue->total_time.tv_nsec = elapsed % 1000000000;
    510515        }
    511 skip_timing:   
     516skip_timing:
    512517        free(fi);
    513        
     518
    514519        if (queue->thrs_push) {
    515520                CHECK_POSIX_DO( pthread_cond_signal( &queue->cond_push ), );
    516521        }
    517        
     522
    518523        return ret;
    519524}
     
    524529        if ((queue->high == 0) || (queue->low == 0) || (queue->l_cb == 0))
    525530                return 0;
    526        
     531
    527532        if (((queue->count % queue->high) == queue->low) && (queue->highest > queue->count)) {
    528533                queue->highest -= queue->high;
    529534                return 1;
    530535        }
    531        
     536
    532537        return 0;
    533538}
     
    538543        int wouldblock = 0;
    539544        int call_cb = 0;
    540        
     545
    541546        TRACE_ENTRY( "%p %p", queue, item );
    542        
     547
    543548        /* Check the parameters */
    544549        CHECK_PARAMS( CHECK_FIFO( queue ) && item );
    545        
     550
    546551        /* lock the queue */
    547552        CHECK_POSIX(  pthread_mutex_lock( &queue->mtx )  );
    548        
     553
    549554        /* Check queue status */
    550555        if (queue->count > 0) {
     
    563568                                goto got_item;
    564569                }
    565                
     570
    566571                wouldblock = 1;
    567572                *item = NULL;
    568573        }
    569                
     574
    570575        /* Unlock */
    571576        CHECK_POSIX(  pthread_mutex_unlock( &queue->mtx )  );
    572        
     577
    573578        /* Call low watermark callback as needed */
    574579        if (call_cb)
    575580                (*queue->l_cb)(queue, &queue->data);
    576        
     581
    577582        /* Done */
    578583        return wouldblock ? EWOULDBLOCK : 0;
     
    584589        struct fifo * q = (struct fifo *)queue;
    585590        TRACE_ENTRY( "%p", queue );
    586        
     591
    587592        /* The thread has been cancelled, therefore it does not wait on the queue anymore */
    588593        q->thrs--;
    589        
     594
    590595        /* Now unlock the queue, and we're done */
    591596        CHECK_POSIX_DO(  pthread_mutex_unlock( &q->mtx ),  /* nothing */  );
    592        
     597
    593598        /* End of cleanup handler */
    594599        return;
     
    600605        int call_cb = 0;
    601606        int ret = 0;
    602        
     607
    603608        /* Check the parameters */
    604609        CHECK_PARAMS( CHECK_FIFO( queue ) && item && (abstime || !istimed) );
    605        
     610
    606611        /* Initialize the return value */
    607612        *item = NULL;
    608        
     613
    609614        /* lock the queue */
    610615        CHECK_POSIX(  pthread_mutex_lock( &queue->mtx )  );
    611        
     616
    612617awaken:
    613618        /* Check queue status */
     
    618623                return EPIPE;
    619624        }
    620        
     625
    621626        if (queue->count > 0) {
    622627                /* There are items in the queue, so pick the first one */
     
    636641                if (ret == 0)
    637642                        goto awaken;  /* test for spurious wake-ups */
    638                
     643
    639644                /* otherwise (ETIMEDOUT / other error) just continue */
    640645        }
    641        
     646
    642647        /* Unlock */
    643648        CHECK_POSIX(  pthread_mutex_unlock( &queue->mtx )  );
    644        
     649
    645650        /* Call low watermark callback as needed */
    646651        if (call_cb)
    647652                (*queue->l_cb)(queue, &queue->data);
    648        
     653
    649654        /* Done */
    650655        return ret;
     
    670675        int ret = 0;
    671676        TRACE_ENTRY( "%p %p", queue, abstime );
    672        
     677
    673678        CHECK_PARAMS_DO( CHECK_FIFO( queue ), return -EINVAL );
    674        
     679
    675680        /* lock the queue */
    676681        CHECK_POSIX_DO(  pthread_mutex_lock( &queue->mtx ), return -__ret__  );
    677        
    678 awaken: 
     682
     683awaken:
    679684        ret = (queue->count > 0 ) ? queue->count : 0;
    680685        if ((ret == 0) && (abstime != NULL)) {
     
    687692                if (ret == 0)
    688693                        goto awaken;  /* test for spurious wake-ups */
    689                
     694
    690695                if (ret == ETIMEDOUT)
    691696                        ret = 0;
    692                 else 
     697                else
    693698                        ret = -ret;
    694699        }
    695        
     700
    696701        /* Unlock */
    697702        CHECK_POSIX_DO(  pthread_mutex_unlock( &queue->mtx ), return -__ret__  );
    698        
     703
    699704        return ret;
    700705}
Note: See TracChangeset for help on using the changeset viewer.