Navigation



Ignore:
File:
1 edited

Legend:

Unmodified
Added
Removed
  • libfreeDiameter/fifo.c

    r14 r25  
    136136}
    137137
    138 /* Delete a queue. It must be unused. */
     138/* Delete a queue. It must be empty. */
    139139int fd_fifo_del ( struct fifo  ** queue )
    140140{
    141141        struct fifo * q;
     142        int loops = 0;
    142143       
    143144        TRACE_ENTRY( "%p", queue );
     
    149150        CHECK_POSIX(  pthread_mutex_lock( &q->mtx )  );
    150151       
    151         if ((q->count != 0) || (q->thrs != 0) || (q->data != NULL)) {
    152                 TRACE_DEBUG(INFO, "The queue cannot be destroyed (%d, %d, %p)", q->count, q->thrs, q->data);
     152        if ((q->count != 0) || (q->data != NULL)) {
     153                TRACE_DEBUG(INFO, "The queue cannot be destroyed (%d, %p)", q->count, q->data);
    153154                CHECK_POSIX_DO(  pthread_mutex_unlock( &q->mtx ), /* no fallback */  );
    154155                return EINVAL;
    155156        }
    156157       
     158        /* Ok, now invalidate the queue */
     159        q->eyec = 0xdead;
     160       
     161        while (q->thrs) {
     162                CHECK_POSIX(  pthread_cond_signal(&q->cond)  );
     163                CHECK_POSIX(  pthread_mutex_unlock( &q->mtx ));
     164                pthread_yield();
     165                CHECK_POSIX(  pthread_mutex_lock( &q->mtx )  );
     166                ASSERT( ++loops < 10 ); /* detect infinite loops */
     167        }
     168       
    157169        /* sanity check */
    158170        ASSERT(FD_IS_LIST_EMPTY(&q->list));
    159171       
    160         /* Ok, now invalidate the queue */
    161         q->eyec = 0xdead;
    162        
    163172        /* And destroy it */
    164173        CHECK_POSIX(  pthread_mutex_unlock( &q->mtx )  );
     
    170179        free(q);
    171180        *queue = NULL;
     181       
     182        return 0;
     183}
     184
     185/* Move the content of old into new, and update loc_update atomically */
     186int fd_fifo_move ( struct fifo ** old, struct fifo * new, struct fifo ** loc_update )
     187{
     188        struct fifo * q;
     189        int loops = 0;
     190       
     191        TRACE_ENTRY("%p %p %p", old, new, loc_update);
     192        CHECK_PARAMS( old && CHECK_FIFO( *old ) && CHECK_FIFO( new ));
     193       
     194        q = *old;
     195        CHECK_PARAMS( ! q->data );
     196        if (new->high) {
     197                TODO("Implement support for thresholds in fd_fifo_move...");
     198        }
     199       
     200        /* Update loc_update */
     201        *old = NULL;
     202        if (loc_update)
     203                *loc_update = new;
     204       
     205        /* Lock the queues */
     206        CHECK_POSIX(  pthread_mutex_lock( &q->mtx )  );
     207        CHECK_POSIX(  pthread_mutex_lock( &new->mtx )  );
     208       
     209        /* Any waiting thread on the old queue returns an error */
     210        q->eyec = 0xdead;
     211        while (q->thrs) {
     212                CHECK_POSIX(  pthread_cond_signal(&q->cond)  );
     213                CHECK_POSIX(  pthread_mutex_unlock( &q->mtx ));
     214                pthread_yield();
     215                CHECK_POSIX(  pthread_mutex_lock( &q->mtx )  );
     216                ASSERT( ++loops < 10 ); /* detect infinite loops */
     217        }
     218       
     219        /* Move all data from old to new */
     220        fd_list_move_end( &new->list, &q->list );
     221        if (q->count && (!new->count)) {
     222                CHECK_POSIX(  pthread_cond_signal(&new->cond)  );
     223        }
     224        new->count += q->count;
     225       
     226        /* Destroy old */
     227        CHECK_POSIX(  pthread_mutex_unlock( &q->mtx )  );
     228        CHECK_POSIX(  pthread_cond_destroy( &q->cond )  );
     229        CHECK_POSIX(  pthread_mutex_destroy( &q->mtx )  );
     230        free(q);
     231       
     232        /* Unlock new, we're done */
     233        CHECK_POSIX(  pthread_mutex_unlock( &new->mtx )  );
    172234       
    173235        return 0;
     
    379441awaken:
    380442        /* Check queue status */
     443        if (!CHECK_FIFO( queue )) {
     444                /* The queue is being destroyed */
     445                CHECK_POSIX(  pthread_mutex_unlock( &queue->mtx )  );
     446                TRACE_DEBUG(FULL, "The queue is being destroyed -> EPIPE");
     447                return EPIPE;
     448        }
     449               
    381450        if (queue->count > 0) {
    382451                /* There are items in the queue, so pick the first one */
Note: See TracChangeset for help on using the changeset viewer.