Navigation


Changeset 1104:757df62cadb6 in freeDiameter


Ignore:
Timestamp:
May 10, 2013, 7:49:19 PM (11 years ago)
Author:
Sebastien Decugis <sdecugis@freediameter.net>
Branch:
default
Parents:
1103:d8591b1c56cd (diff), 1059:a1d6e1980132 (diff)
Note: this is a merge changeset, the changes displayed below correspond to the merge itself.
Use the (diff) links above to see all the changes relative to each parent.
Phase:
public
Message:

Merge

Files:
4 edited

Legend:

Unmodified
Added
Removed
  • libfdcore/p_psm.c

    r1058 r1104  
    241241                        case FDEVP_CNX_INCOMING: {
    242242                                struct cnx_incoming * evd = ev->data;
    243                                 fd_msg_log( FD_MSG_LOG_DROPPED, evd->cer, "Message discarded while cleaning peer state machine queue." );
     243                                //fd_msg_log( FD_MSG_LOG_DROPPED, evd->cer, "Message discarded while cleaning peer state machine queue." );
    244244                                CHECK_FCT_DO( fd_msg_free(evd->cer), /* continue */);
    245245                                fd_cnx_destroy(evd->cnx);
     
    451451                TRACE_DEBUG(INFO, "Invalid event received in PSM '%s' : %d", peer->p_hdr.info.pi_diamid, event);
    452452                ASSERT(0); /* we should investigate this situation */
    453                 goto psm_loop;
    454         }
    455 
    456         /* Handle the (easy) debug event now */
    457         if (event == FDEVP_DUMP_ALL) {
    458                 fd_peer_dump(peer, ANNOYING);
    459453                goto psm_loop;
    460454        }
     
    489483                struct msg * msg = NULL;
    490484                struct msg_hdr * hdr;
    491                 struct timespec rcvon;
    492                
    493                 /* Retrieve the piggytailed timestamp */
    494                 memcpy(&rcvon, ev_data+ev_sz, sizeof(struct timespec));
     485                struct fd_cnx_rcvdata rcv_data;
     486                struct fd_msg_pmdl * pmdl = NULL;
     487               
     488                rcv_data.buffer = ev_data;
     489                rcv_data.length = ev_sz;
     490                pmdl = fd_msg_pmdl_get_inbuf(rcv_data.buffer, rcv_data.length);
    495491               
    496492                /* Parse the received buffer */
    497493                CHECK_FCT_DO( fd_msg_parse_buffer( (void *)&ev_data, ev_sz, &msg),
    498494                        {
    499                                 fd_log_debug("Received invalid data from peer '%s', closing the connection", peer->p_hdr.info.pi_diamid);
     495                                fd_hook_call(HOOK_MESSAGE_PARSING_ERROR, NULL, peer, &rcv_data, pmdl );
    500496                                free(ev_data);
    501497                                CHECK_FCT_DO( fd_event_send(peer->p_events, FDEVP_CNX_ERROR, 0, NULL), goto psm_reset );
     
    503499                        } );
    504500                       
    505                 CHECK_FCT_DO( fd_msg_ts_set_recv(msg, &rcvon), /* ... */ );
    506                
     501                fd_hook_associate(msg, pmdl);
     502       
    507503                /* If the current state does not allow receiving messages, just drop it */
    508504                if (cur_state == STATE_CLOSED) {
    509505                        /* In such case, just discard the message */
    510                         fd_msg_log( FD_MSG_LOG_DROPPED, msg, "Purged from peer '%s''s queue (CLOSED state).", peer->p_hdr.info.pi_diamid );
     506                        fd_hook_call(HOOK_MESSAGE_DROPPED, msg, peer, "Message purged from queue, peer in CLOSED state", fd_msg_pmdl_get(msg));
    511507                        fd_msg_free(msg);
    512508                        goto psm_loop;
    513509                }
    514510               
    515                 /* Log incoming message */
    516                 fd_msg_log( FD_MSG_LOG_RECEIVED, msg, "Received %zdb from '%s' (%s)", ev_sz, peer->p_hdr.info.pi_diamid, STATE_STR(cur_state) );
    517        
    518511                /* Extract the header */
    519512                CHECK_FCT_DO( fd_msg_hdr(msg, &hdr), goto psm_end );
     
    525518                        CHECK_FCT_DO( fd_p_sr_fetch(&peer->p_sr, hdr->msg_hbhid, &req), goto psm_end );
    526519                        if (req == NULL) {
    527                                 fd_msg_log( FD_MSG_LOG_DROPPED, msg, "Answer received with no corresponding sent request." );
     520                                fd_hook_call(HOOK_MESSAGE_DROPPED, msg, peer, "Answer received with no corresponding sent request.", fd_msg_pmdl_get(msg));
    528521                                fd_msg_free(msg);
    529522                                goto psm_loop;
     
    533526                        CHECK_FCT_DO( fd_msg_answ_associate( msg, req ), goto psm_end );
    534527                       
    535                         /* Display the delay to receive the answer */
    536                         {
    537                                 struct timespec reqsent, delay;
    538                                 (void) fd_msg_ts_get_sent(req, &reqsent);
    539                                 TS_DIFFERENCE( &delay, &reqsent, &rcvon );
    540                                 fd_msg_log( FD_MSG_LOG_TIMING, msg, "Answer received in %ld.%6.6ld sec.", (long)delay.tv_sec, delay.tv_nsec / 1000 );
    541                         }
    542528                } else {
    543529                        /* Mark the incoming request so that we know we have pending answers for this peer */
     
    546532                        CHECK_POSIX_DO( pthread_mutex_unlock(&peer->p_state_mtx), goto psm_end  );
    547533                }
     534               
     535                /* Log incoming message */
     536                fd_hook_call(HOOK_MESSAGE_RECEIVED, msg, peer, NULL, fd_msg_pmdl_get(msg));
    548537               
    549538                if (cur_state == STATE_OPEN_NEW) {
     
    584573                                case STATE_WAITCEA:
    585574                                case STATE_CLOSED:
    586                                 default:
     575                                default: {
    587576                                        /* In such case, just discard the message */
    588                                         fd_msg_log( FD_MSG_LOG_DROPPED, msg, "Received from peer '%s' while connection was not in state %s.", peer->p_hdr.info.pi_diamid, STATE_STR(cur_state) );
     577                                        char buf[128];
     578                                        snprintf(buf, sizeof(buf), "Received while peer state machine was in state %s.", STATE_STR(cur_state));
     579                                        fd_hook_call(HOOK_MESSAGE_DROPPED, msg, peer, buf, fd_msg_pmdl_get(msg));
    589580                                        fd_msg_free(msg);
     581                                }
    590582                        }
    591583                        goto psm_loop;
     
    594586                /* Link-local message: They must be understood by our dictionary, otherwise we return an error */
    595587                {
    596                         int ret = fd_msg_parse_or_error( &msg );
     588                        struct msg * error = NULL;
     589                        int ret = fd_msg_parse_or_error( &msg, &error );
    597590                        if (ret != EBADMSG) {
    598                                 CHECK_FCT_DO( ret, goto psm_end );
     591                                CHECK_FCT_DO( ret,
     592                                        {
     593                                                LOG_E("%s: An unexpected error occurred while parsing a link-local message", peer->p_hdr.info.pi_diamid);
     594                                                fd_msg_free(msg);
     595                                                goto psm_end;
     596                                        } );
    599597                        } else {
    600                                 if (msg) {
     598                                if (msg == NULL) {
    601599                                        /* Send the error back to the peer */
    602                                         CHECK_FCT_DO( ret = fd_out_send(&msg, NULL, peer, FD_CNX_ORDERED),  );
    603                                         if (msg) {
     600                                        CHECK_FCT_DO( ret = fd_out_send(&error, NULL, peer, FD_CNX_ORDERED),  );
     601                                        if (error) {
    604602                                                /* Only if an error occurred & the message was not saved / dumped */
    605                                                 fd_msg_log( FD_MSG_LOG_DROPPED, msg, "Internal error: Problem while sending (%s)", strerror(ret) );
    606                                                 CHECK_FCT_DO( fd_msg_free(msg), goto psm_end);
     603                                                LOG_E("%s: error sending a message", peer->p_hdr.info.pi_diamid);
     604                                                CHECK_FCT_DO( fd_msg_free(error), goto psm_end);
    607605                                        }
    608606                                } else {
    609607                                        /* We received an invalid answer, let's disconnect */
     608                                        LOG_E("%s: Received invalid answer to Base protocol message, disconnecting...", peer->p_hdr.info.pi_diamid);
     609                                        CHECK_FCT_DO( fd_msg_free(msg), goto psm_end);
    610610                                        CHECK_FCT_DO( fd_event_send(peer->p_events, FDEVP_CNX_ERROR, 0, NULL), goto psm_reset );
    611611                                }
     
    617617                switch (hdr->msg_code) {
    618618                        case CC_CAPABILITIES_EXCHANGE:
    619                                 CHECK_FCT_DO( fd_p_ce_msgrcv(&msg, (hdr->msg_flags & CMD_FLAG_REQUEST), peer), goto psm_reset );
     619                                CHECK_FCT_DO( fd_p_ce_msgrcv(&msg, (hdr->msg_flags & CMD_FLAG_REQUEST), peer),
     620                                        {
     621                                                if (msg)
     622                                                        CHECK_FCT_DO( fd_msg_free(msg), );
     623                                                goto psm_reset;
     624                                        } );
    620625                                break;
    621626                       
     
    652657                                /* Cleanup the message if not done */
    653658                                if (msg) {
    654                                         fd_msg_log( FD_MSG_LOG_DROPPED, msg, "Received un-handled non-routable command from peer '%s'.", peer->p_hdr.info.pi_diamid );
     659                                        //fd_msg_log( FD_MSG_LOG_DROPPED, msg, "Received un-handled non-routable command from peer '%s'.", peer->p_hdr.info.pi_diamid );
    655660                                        CHECK_FCT_DO( fd_msg_free(msg), /* continue */);
    656661                                        msg = NULL;
     
    660665                /* At this point the message must have been fully handled already */
    661666                if (msg) {
    662                         fd_msg_log( FD_MSG_LOG_DROPPED, msg, "Internal error ('%s'): unhandled message.", peer->p_hdr.info.pi_diamid );
     667                        //fd_msg_log( FD_MSG_LOG_DROPPED, msg, "Internal error ('%s'): unhandled message.", peer->p_hdr.info.pi_diamid );
    663668                        fd_msg_free(msg);
    664669                }
     
    686691                                /* Mark the connection problem */
    687692                                peer->p_flags.pf_cnx_pb = 1;
     693                       
     694                                fd_hook_call(HOOK_PEER_CONNECT_FAILED, NULL, peer, "The connection was broken", NULL);
    688695                               
    689696                                /* Destroy the connection, restart the timer to a new connection attempt */
     
    721728                CHECK_FCT_DO( fd_cnx_getremoteeps(peer->p_cnxctx, &peer->p_hdr.info.pi_endpoints), /* ignore the error */);
    722729               
    723                 /* We do not support local endpoints change currently, but it could be added here if needed (refresh fd_g_config->cnf_endpoints)*/
    724                
    725                 if (TRACE_BOOL(ANNOYING)) {
    726                         TRACE_DEBUG(ANNOYING, "New remote endpoint(s):" );
    727                         fd_ep_dump(6, &peer->p_hdr.info.pi_endpoints);
     730                /* We do not support local endpoints change currently, but it could be added here if needed (refresh fd_g_config->cnf_endpoints) */
     731                {
     732                        char * buf = NULL;
     733                        size_t len = 0;
     734                        LOG_D("New remote endpoint(s): %s",  fd_ep_dump(&buf, &len, NULL, 6, &peer->p_hdr.info.pi_endpoints) ?: "error");
     735                        free(buf);
    728736                }
    729737               
     
    746754                }
    747755                if (params->cer) {
    748                         fd_msg_log( FD_MSG_LOG_DROPPED, params->cer, "Internal error: this CER was not handled as expected." );
    749756                        CHECK_FCT_DO( fd_msg_free(params->cer), );
    750757                        params->cer = NULL;
     
    767774                        case STATE_WAITCNXACK_ELEC:
    768775                        case STATE_WAITCNXACK:
     776                                LOG_D("%s: Connection established", peer->p_hdr.info.pi_diamid);
    769777                                fd_p_ce_handle_newcnx(peer, cnx);
    770778                                break;
     
    795803                               
    796804                        case STATE_WAITCNXACK:
     805                                LOG_D("%s: Connection attempt failed", peer->p_hdr.info.pi_diamid);
    797806                                /* Go back to CLOSE */
    798807                                fd_psm_next_timeout(peer, 1, peer->p_hdr.info.config.pic_tctimer ?: fd_g_config->cnf_timer_tc);
     
    817826                               
    818827                        case STATE_CLOSED:
     828                                LOG_D("%s: Connecting...", peer->p_hdr.info.pi_diamid);
    819829                                CHECK_FCT_DO( fd_psm_change_state(peer, STATE_WAITCNXACK), goto psm_end );
    820830                                fd_psm_next_timeout(peer, 0, CNX_TIMEOUT);
     
    825835                                /* Mark the connection problem */
    826836                                peer->p_flags.pf_cnx_pb = 1;
    827                         case STATE_CLOSING:
    828837                        case STATE_WAITCNXACK:
    829838                        case STATE_WAITCEA:
     839                                fd_hook_call(HOOK_PEER_CONNECT_FAILED, NULL, peer, "Timeout while waiting for remote peer", NULL);
     840                        case STATE_CLOSING:
    830841                                /* Destroy the connection, restart the timer to a new connection attempt */
    831842                                fd_psm_next_timeout(peer, 1, peer->p_hdr.info.config.pic_tctimer ?: fd_g_config->cnf_timer_tc);
     
    863874       
    864875psm_end:
     876        LOG_N("%s: Going to ZOMBIE state (no more activity)", peer->p_hdr.info.pi_diamid);
    865877        fd_psm_cleanup(peer, 1);
    866878        TRACE_DEBUG(INFO, "'%s'\t-> STATE_ZOMBIE (terminated)\t'%s'",
  • libfdcore/p_psm.c

    r1103 r1104  
    336336                peer->p_psm_timer.tv_sec += random() % 4;
    337337                peer->p_psm_timer.tv_nsec+= random() % 1000000000L;
    338                 if (peer->p_psm_timer.tv_nsec > 1000000000L) {
     338                if (peer->p_psm_timer.tv_nsec >= 1000000000L) {
    339339                        peer->p_psm_timer.tv_nsec -= 1000000000L;
    340340                        peer->p_psm_timer.tv_sec ++;
  • libfdproto/fifo.c

    r1059 r1104  
    7070        int             highest;/* The highest count value for which h_cb has been called */
    7171        int             highest_ever; /* The max count value this queue has reached (for tweaking) */
     72       
     73        long long       total_items;   /* Cumulated number of items that went through this fifo (excluding current count), always increasing. */
     74        struct timespec total_time;    /* Cumulated time all items spent in this queue, including blocking time (always growing, use deltas for monitoring) */
     75        struct timespec blocking_time; /* Cumulated time threads trying to post new items were blocked (queue full). */
     76        struct timespec last_time;     /* For the last element retrieved from the queue, how long it take between posting (including blocking) and poping */
     77       
     78};
     79
     80struct fifo_item {
     81        struct fd_list   item;
     82        struct timespec  posted_on;
    7283};
    7384
     
    108119
    109120/* Dump the content of a queue */
    110 void fd_fifo_dump(int level, char * name, struct fifo * queue, void (*dump_item)(int level, void * item))
    111 {
    112         TRACE_ENTRY("%i %p %p %p", level, name, queue, dump_item);
    113        
    114         if (!TRACE_BOOL(level))
    115                 return;
    116        
    117         fd_log_debug("Dumping queue '%s' (%p):", name ?: "?", queue);
     121DECLARE_FD_DUMP_PROTOTYPE(fd_fifo_dump, char * name, struct fifo * queue, fd_fifo_dump_item_cb dump_item)
     122{
     123        FD_DUMP_HANDLE_OFFSET();
     124       
     125        if (name) {
     126                CHECK_MALLOC_DO( fd_dump_extend( FD_DUMP_STD_PARAMS, "'%s'(@%p): ", name, queue), return NULL);
     127        } else {
     128                CHECK_MALLOC_DO( fd_dump_extend( FD_DUMP_STD_PARAMS, "{fifo}(@%p): ", queue), return NULL);
     129        }
     130       
    118131        if (!CHECK_FIFO( queue )) {
    119                 fd_log_debug("  Queue invalid!");
    120                 if (queue)
    121                         fd_log_debug("  (%x != %x)", queue->eyec, FIFO_EYEC);
    122                 return;
     132                return fd_dump_extend(FD_DUMP_STD_PARAMS, "INVALID/NULL");
    123133        }
    124134       
    125135        CHECK_POSIX_DO(  pthread_mutex_lock( &queue->mtx ), /* continue */  );
    126         fd_log_debug("   %d elements in queue / %d threads waiting", queue->count, queue->thrs);
    127         fd_log_debug("   %d elements max / %d threads waiting to push", queue->max, queue->thrs_push);
    128         fd_log_debug("   thresholds: %d / %d (h:%d), cb: %p,%p (%p), highest: %d",
    129                         queue->high, queue->low, queue->highest,
    130                         queue->h_cb, queue->l_cb, queue->data,
    131                         queue->highest_ever);
     136        CHECK_MALLOC_DO( fd_dump_extend( FD_DUMP_STD_PARAMS, "items:%d,%d,%d threads:%d,%d stats:%lld/%ld.%06ld,%ld.%06ld,%ld.%06ld thresholds:%d,%d,%d,%p,%p,%p",
     137                                                queue->count, queue->highest_ever, queue->max,
     138                                                queue->thrs, queue->thrs_push,
     139                                                queue->total_items,(long)queue->total_time.tv_sec,(long)(queue->total_time.tv_nsec/1000),(long)queue->blocking_time.tv_sec,(long)(queue->blocking_time.tv_nsec/1000),(long)queue->last_time.tv_sec,(long)(queue->last_time.tv_nsec/1000),
     140                                                queue->high, queue->low, queue->highest, queue->h_cb, queue->l_cb, queue->data),
     141                         goto error);
    132142       
    133143        if (dump_item) {
     
    135145                int i = 0;
    136146                for (li = queue->list.next; li != &queue->list; li = li->next) {
    137                         fd_log_debug("  [%i] item %p in fifo %p:", i++, li->o, queue);
    138                         (*dump_item)(level, li->o);
     147                        struct fifo_item * fi = (struct fifo_item *)li;
     148                        CHECK_MALLOC_DO( fd_dump_extend( FD_DUMP_STD_PARAMS, "\n [#%i](@%p)@%ld.%06ld: ",
     149                                                i++, fi->item.o, (long)fi->posted_on.tv_sec,(long)(fi->posted_on.tv_nsec/1000)),
     150                                         goto error);
     151                        CHECK_MALLOC_DO( (*dump_item)(FD_DUMP_STD_PARAMS, fi->item.o), goto error);
    139152                }
    140153        }
    141154        CHECK_POSIX_DO(  pthread_mutex_unlock( &queue->mtx ), /* continue */  );
    142155       
     156        return *buf;
     157error:
     158        CHECK_POSIX_DO(  pthread_mutex_unlock( &queue->mtx ), /* continue */  );
     159        return NULL;
    143160}
    144161
     
    243260        old->eyec = FIFO_EYEC;
    244261       
     262        /* Merge the stats in the new queue */
     263        new->total_items += old->total_items;
     264        old->total_items = 0;
     265       
     266        new->total_time.tv_nsec += old->total_time.tv_nsec;
     267        new->total_time.tv_sec += old->total_time.tv_sec + (new->total_time.tv_nsec / 1000000000);
     268        new->total_time.tv_nsec %= 1000000000;
     269        old->total_time.tv_nsec = 0;
     270        old->total_time.tv_sec = 0;
     271       
     272        new->blocking_time.tv_nsec += old->blocking_time.tv_nsec;
     273        new->blocking_time.tv_sec += old->blocking_time.tv_sec + (new->blocking_time.tv_nsec / 1000000000);
     274        new->blocking_time.tv_nsec %= 1000000000;
     275        old->blocking_time.tv_nsec = 0;
     276        old->blocking_time.tv_sec = 0;
     277       
    245278        /* Unlock, we're done */
    246279        CHECK_POSIX(  pthread_mutex_unlock( &new->mtx )  );
     
    250283}
    251284
    252 /* Get the length of the queue */
    253 int fd_fifo_length ( struct fifo * queue, int * length )
    254 {
    255         TRACE_ENTRY( "%p %p", queue, length );
     285/* Get the information on the queue */
     286int fd_fifo_getstats( struct fifo * queue, int * current_count, int * limit_count, int * highest_count, long long * total_count,
     287                                           struct timespec * total, struct timespec * blocking, struct timespec * last)
     288{
     289        TRACE_ENTRY( "%p %p %p %p %p %p %p %p", queue, current_count, limit_count, highest_count, total_count, total, blocking, last);
    256290       
    257291        /* Check the parameters */
    258         CHECK_PARAMS( CHECK_FIFO( queue ) && length );
     292        CHECK_PARAMS( CHECK_FIFO( queue ) );
    259293       
    260294        /* lock the queue */
    261295        CHECK_POSIX(  pthread_mutex_lock( &queue->mtx )  );
    262296       
    263         /* Retrieve the count */
    264         *length = queue->count;
     297        if (current_count)
     298                *current_count = queue->count;
     299       
     300        if (limit_count)
     301                *limit_count = queue->max;
     302       
     303        if (highest_count)
     304                *highest_count = queue->highest_ever;
     305       
     306        if (total_count)
     307                *total_count = queue->total_items;
     308       
     309        if (total)
     310                memcpy(total, &queue->total_time, sizeof(struct timespec));
     311       
     312        if (blocking)
     313                memcpy(blocking, &queue->blocking_time, sizeof(struct timespec));
     314       
     315        if (last)
     316                memcpy(last, &queue->last_time, sizeof(struct timespec));
    265317       
    266318        /* Unlock */
     
    271323}
    272324
     325
    273326/* alternate version with no error checking */
    274 int fd_fifo_length_noerr ( struct fifo * queue )
     327int fd_fifo_length ( struct fifo * queue )
    275328{
    276329        if ( !CHECK_FIFO( queue ) )
     
    326379int fd_fifo_post_int ( struct fifo * queue, void ** item )
    327380{
    328         struct fd_list * new;
     381        struct fifo_item * new;
    329382        int call_cb = 0;
     383        struct timespec posted_on, queued_on;
    330384       
    331385        TRACE_ENTRY( "%p %p", queue, item );
     
    333387        /* Check the parameters */
    334388        CHECK_PARAMS( CHECK_FIFO( queue ) && item && *item );
     389       
     390        /* Get the timing of this call */
     391        CHECK_SYS(  clock_gettime(CLOCK_REALTIME, &posted_on)  );
    335392       
    336393        /* lock the queue */
     
    353410       
    354411        /* Create a new list item */
    355         CHECK_MALLOC_DO(  new = malloc (sizeof (struct fd_list)) , {
     412        CHECK_MALLOC_DO(  new = malloc (sizeof (struct fifo_item)) , {
    356413                        pthread_mutex_unlock( &queue->mtx );
     414                        return ENOMEM;
    357415                } );
    358416       
    359         fd_list_init(new, *item);
     417        fd_list_init(&new->item, *item);
    360418        *item = NULL;
    361419       
    362420        /* Add the new item at the end */
    363         fd_list_insert_before( &queue->list, new);
     421        fd_list_insert_before( &queue->list, &new->item);
    364422        queue->count++;
    365423        if (queue->highest_ever < queue->count)
     
    370428        }
    371429       
     430        /* store timing */
     431        memcpy(&new->posted_on, &posted_on, sizeof(struct timespec));
     432       
     433        /* update queue timing info "blocking time" */
     434        {
     435                long long blocked_ns;
     436                CHECK_SYS(  clock_gettime(CLOCK_REALTIME, &queued_on)  );
     437                blocked_ns = (queued_on.tv_sec - posted_on.tv_sec) * 1000000000;
     438                blocked_ns += (queued_on.tv_nsec - posted_on.tv_nsec);
     439                blocked_ns += queue->blocking_time.tv_nsec;
     440                queue->blocking_time.tv_sec += blocked_ns / 1000000000;
     441                queue->blocking_time.tv_nsec = blocked_ns % 1000000000;
     442        }
     443       
    372444        /* Signal if threads are asleep */
    373445        if (queue->thrs > 0) {
     
    394466{
    395467        void * ret = NULL;
    396         struct fd_list * li;
     468        struct fifo_item * fi;
     469        struct timespec now;
    397470       
    398471        ASSERT( ! FD_IS_LIST_EMPTY(&queue->list) );
    399472       
    400         fd_list_unlink(li = queue->list.next);
     473        fi = (struct fifo_item *)(queue->list.next);
     474        ret = fi->item.o;
     475        fd_list_unlink(&fi->item);
    401476        queue->count--;
    402         ret = li->o;
    403         free(li);
     477        queue->total_items++;
     478       
     479        /* Update the timings */
     480        CHECK_SYS_DO(  clock_gettime(CLOCK_REALTIME, &now), goto skip_timing  );
     481        {
     482                long long elapsed = (now.tv_sec - fi->posted_on.tv_sec) * 1000000000;
     483                elapsed += now.tv_nsec - fi->posted_on.tv_nsec;
     484               
     485                queue->last_time.tv_sec = elapsed / 1000000000;
     486                queue->last_time.tv_nsec = elapsed % 1000000000;
     487               
     488                elapsed += queue->total_time.tv_nsec;
     489                queue->total_time.tv_sec += elapsed / 1000000000;
     490                queue->total_time.tv_nsec = elapsed % 1000000000;
     491        }
     492skip_timing:   
     493        free(fi);
    404494       
    405495        if (queue->thrs_push) {
     
    509599                return EPIPE;
    510600        }
    511                
     601       
    512602        if (queue->count > 0) {
    513603                /* There are items in the queue, so pick the first one */
  • libfdproto/fifo.c

    r1103 r1104  
    579579static int fifo_tget ( struct fifo * queue, void ** item, int istimed, const struct timespec *abstime)
    580580{
    581         int timedout = 0;
    582581        int call_cb = 0;
     582        int ret = 0;
    583583       
    584584        /* Check the parameters */
     
    605605                call_cb = test_l_cb(queue);
    606606        } else {
    607                 int ret = 0;
    608607                /* We have to wait for a new item */
    609608                queue->thrs++ ;
     
    619618                        goto awaken;  /* test for spurious wake-ups */
    620619               
    621                 if (istimed && (ret == ETIMEDOUT)) {
    622                         timedout = 1;
    623                 } else {
    624                         /* Unexpected error condition (means we need to debug) */
    625                         ASSERT( ret == 0 /* never true */ );
    626                 }
     620                /* otherwise (ETIMEDOUT / other error) just continue */
    627621        }
    628622       
     
    635629       
    636630        /* Done */
    637         return timedout ? ETIMEDOUT : 0;
     631        return ret;
    638632}
    639633
Note: See TracChangeset for help on using the changeset viewer.