Navigation


Changeset 1207:043b894b0511 in freeDiameter for libfdcore


Ignore:
Timestamp:
Jun 14, 2013, 6:30:42 PM (11 years ago)
Author:
Sebastien Decugis <sdecugis@freediameter.net>
Branch:
default
Phase:
public
Message:

Cleanups in failover situations to avoid deadlocks and corrupted message ids. Tested OK now.

Location:
libfdcore
Files:
4 edited

Legend:

Unmodified
Added
Removed
  • libfdcore/fdcore-internal.h

    r1186 r1207  
    184184        /* Sent requests (for fallback), list of struct sentreq ordered by hbh */
    185185        struct sr_list   p_sr;
     186        struct fifo     *p_tofailover;
    186187       
    187188        /* Pending received requests not yet answered (count only) */
  • libfdcore/p_out.c

    r1186 r1207  
    7575        fd_hook_call(HOOK_MESSAGE_SENT, cpy_for_logs_only, peer, NULL, fd_msg_pmdl_get(cpy_for_logs_only));
    7676       
     77        pthread_cleanup_push((void *)fd_msg_free, *msg /* might be NULL, no problem */);
     78       
    7779        /* Send the message */
    7880        CHECK_FCT_DO( ret = fd_cnx_send(cnx, buf, sz), );
     81       
     82        pthread_cleanup_pop(0);
     83       
    7984out:
    8085        ;       
     
    9398}
    9499
    95 static void cleanup_requeue(void * arg)
    96 {
    97         struct msg *msg = arg;
    98         CHECK_FCT_DO(fd_fifo_post(fd_g_outgoing, &msg),
    99                 {
    100                         fd_hook_call(HOOK_MESSAGE_DROPPED, msg, NULL, "An error occurred while attempting to requeue this message during cancellation of the sending function", fd_msg_pmdl_get(msg));
    101                         CHECK_FCT_DO(fd_msg_free(msg), /* What can we do more? */);
    102                 } );
    103 }
    104 
    105100/* The code of the "out" thread */
    106101static void * out_thr(void * arg)
    107102{
    108103        struct fd_peer * peer = arg;
     104        int stop = 0;
     105        struct msg * msg;
    109106        ASSERT( CHECK_PEER(peer) );
    110107       
     
    117114       
    118115        /* Loop until cancelation */
    119         while (1) {
    120                 struct msg * msg;
     116        while (!stop) {
    121117                int ret;
    122118               
    123119                /* Retrieve next message to send */
    124120                CHECK_FCT_DO( fd_fifo_get(peer->p_tosend, &msg), goto error );
    125                
    126                 /* Now if we are cancelled, we requeue this message */
    127                 pthread_cleanup_push(cleanup_requeue, msg);
    128121               
    129122                /* Send the message, log any error */
     
    136129                                        fd_msg_free(msg);
    137130                                }
     131                                stop = 1;
    138132                        } );
    139133                       
    140                 /* Loop */
    141                 pthread_cleanup_pop(0);
    142         }
    143        
     134        }
     135       
     136        /* If we're here it means there was an error on the socket. We need to continue to purge the fifo & until we are canceled */
     137        CHECK_FCT_DO( fd_event_send(peer->p_events, FDEVP_CNX_ERROR, 0, NULL), /* What do we do if it fails? */ );
     138       
     139        /* Requeue all routable messages in the global "out" queue, until we are canceled once the PSM deals with the CNX_ERROR sent above */
     140        while ( fd_fifo_get(peer->p_tosend, &msg) == 0 ) {
     141                if (fd_msg_is_routable(msg)) {
     142                        CHECK_FCT_DO(fd_fifo_post_noblock(peer->p_tofailover, (void *)&msg),
     143                                {
     144                                        /* fallback: destroy the message */
     145                                        fd_hook_call(HOOK_MESSAGE_DROPPED, msg, NULL, "Internal error: unable to requeue this message during failover process", fd_msg_pmdl_get(msg));
     146                                        CHECK_FCT_DO(fd_msg_free(msg), /* What can we do more? */)
     147                                } );
     148                } else {
     149                        /* Just free it */
     150                        /* fd_hook_call(HOOK_MESSAGE_DROPPED, m, NULL, "Non-routable message freed during handover", fd_msg_pmdl_get(m)); */
     151                        CHECK_FCT_DO(fd_msg_free(msg), /* What can we do more? */)
     152                }
     153        }
     154
    144155error:
    145156        /* It is not really a connection error, but the effect is the same, we are not able to send anymore message */
  • libfdcore/p_sr.c

    r1188 r1207  
    3838/* Structure to store a sent request */
    3939struct sentreq {
    40         struct fd_list  chain;  /* the "o" field points directly to the hop-by-hop of the request (uint32_t *)  */
     40        struct fd_list  chain;  /* the "o" field points directly to the (new) hop-by-hop of the request (uint32_t *)  */
    4141        struct msg      *req;   /* A request that was sent and not yet answered. */
    42         uint32_t        prevhbh;/* The value to set in the hbh header when the message is retrieved */
     42        uint32_t        prevhbh;/* The value to set back in the hbh header when the message is retrieved */
    4343        struct fd_list  expire; /* the list of expiring requests */
    4444        struct timespec added_on; /* the time the request was added */
     
    6666        struct timespec now;
    6767       
    68         if (!TRACE_BOOL(ANNOYING))
    69                 return;
    70        
    71         fd_log_debug("%sSentReq list @%p:", text, srlist);
     68        LOG_D("%sSentReq list @%p:", text, srlist);
    7269       
    7370        CHECK_SYS_DO( clock_gettime(CLOCK_REALTIME, &now), );
     
    7774                uint32_t * nexthbh = li->o;
    7875               
    79                 fd_log_debug(" - Next req (hbh:%x): [since %ld.%06ld sec]", *nexthbh,
     76                LOG_D(" - Next req (hbh:0x%x, prev:0x%x): [since %ld.%06ld sec]", *nexthbh, sr->prevhbh,
    8077                        (long)((now.tv_nsec >= sr->added_on.tv_nsec) ? (now.tv_sec - sr->added_on.tv_sec) : (now.tv_sec - sr->added_on.tv_sec - 1)),
    8178                        (long)((now.tv_nsec >= sr->added_on.tv_nsec) ? ((now.tv_nsec - sr->added_on.tv_nsec) / 1000) : ((now.tv_nsec - sr->added_on.tv_nsec + 1000000000) / 1000)));
     
    225222        next = find_or_next(&srlist->srs, *hbhloc, &match);
    226223        if (match) {
    227                 TRACE_DEBUG(INFO, "A request with the same hop-by-hop Id was already sent: error");
     224                TRACE_DEBUG(INFO, "A request with the same hop-by-hop Id (0x%x) was already sent: error", *hbhloc);
    228225                free(sr);
     226                srl_dump("Current list of SR: ", &srlist->srs);
    229227                CHECK_POSIX_DO( pthread_mutex_unlock(&srlist->mtx), /* ignore */ );
    230228                return EINVAL;
     
    235233        fd_list_insert_before(next, &sr->chain);
    236234        srlist->cnt++;
    237         srl_dump("Saved new request, ", &srlist->srs);
    238235       
    239236        /* In case of request with a timeout, also store in the timeout list */
     
    280277        /* Search the request in the list */
    281278        CHECK_POSIX( pthread_mutex_lock(&srlist->mtx) );
    282         srl_dump("Fetching a request, ", &srlist->srs);
    283279        sr = (struct sentreq *)find_or_next(&srlist->srs, hbh, &match);
    284280        if (!match) {
    285281                TRACE_DEBUG(INFO, "There is no saved request with this hop-by-hop id (%x)", hbh);
     282                srl_dump("Current list of SR: ", &srlist->srs);
    286283                *req = NULL;
    287284        } else {
  • libfdcore/peers.c

    r1201 r1207  
    7878        fd_list_init(&p->p_expiry, p);
    7979        CHECK_FCT( fd_fifo_new(&p->p_tosend, 5) );
     80        CHECK_FCT( fd_fifo_new(&p->p_tofailover, 0) );
    8081        p->p_hbh = lrand48();
    8182       
     
    233234        }
    234235
    235 /* Empty the lists of p_tosend and p_sentreq messages */
     236/* Empty the lists of p_tosend, p_failover, and p_sentreq messages */
    236237void fd_peer_failover_msg(struct fd_peer * peer)
    237238{
     
    258259        }
    259260       
     261        /* Requeue all messages in the "failover" queue */
     262        while ( fd_fifo_tryget(peer->p_tofailover, &m) == 0 ) {
     263                fd_hook_call(HOOK_MESSAGE_FAILOVER, m, peer, NULL, fd_msg_pmdl_get(m));
     264                CHECK_FCT_DO(fd_fifo_post_noblock(fd_g_outgoing, (void *)&m),
     265                        {
     266                                /* fallback: destroy the message */
     267                                fd_hook_call(HOOK_MESSAGE_DROPPED, m, NULL, "Internal error: unable to requeue this message during failover process", fd_msg_pmdl_get(m));
     268                                CHECK_FCT_DO(fd_msg_free(m), /* What can we do more? */)
     269                        } );
     270        }
     271       
    260272        /* Requeue all routable sent requests */
    261273        fd_p_sr_failover(&peer->p_sr);
     
    335347       
    336348        CHECK_FCT_DO( fd_fifo_del(&p->p_tosend), /* continue */ );
     349        CHECK_FCT_DO( fd_fifo_del(&p->p_tofailover), /* continue */ );
    337350        CHECK_POSIX_DO( pthread_mutex_destroy(&p->p_state_mtx), /* continue */);
    338351        CHECK_POSIX_DO( pthread_mutex_destroy(&p->p_sr.mtx), /* continue */);
Note: See TracChangeset for help on using the changeset viewer.