# HG changeset patch # User Sebastien Decugis # Date 1371202242 -28800 # Node ID 043b894b0511b6beb155576e9e2c509a21be8360 # Parent ef7c5e39badf53c260725e5b7b44922a3c3e7369 Cleanups in failover situation to avoid deadlocks and corrupt messages ids. Tested OK now. diff -r ef7c5e39badf -r 043b894b0511 libfdcore/fdcore-internal.h --- a/libfdcore/fdcore-internal.h Fri Jun 14 17:30:01 2013 +0800 +++ b/libfdcore/fdcore-internal.h Fri Jun 14 17:30:42 2013 +0800 @@ -183,6 +183,7 @@ /* Sent requests (for fallback), list of struct sentreq ordered by hbh */ struct sr_list p_sr; + struct fifo *p_tofailover; /* Pending received requests not yet answered (count only) */ long p_reqin_count; /* We use p_state_mtx to protect this value */ diff -r ef7c5e39badf -r 043b894b0511 libfdcore/p_out.c --- a/libfdcore/p_out.c Fri Jun 14 17:30:01 2013 +0800 +++ b/libfdcore/p_out.c Fri Jun 14 17:30:42 2013 +0800 @@ -74,8 +74,13 @@ /* Log the message */ fd_hook_call(HOOK_MESSAGE_SENT, cpy_for_logs_only, peer, NULL, fd_msg_pmdl_get(cpy_for_logs_only)); + pthread_cleanup_push((void *)fd_msg_free, *msg /* might be NULL, no problem */); + /* Send the message */ CHECK_FCT_DO( ret = fd_cnx_send(cnx, buf, sz), ); + + pthread_cleanup_pop(0); + out: ; pthread_cleanup_pop(1); @@ -92,20 +97,12 @@ return 0; } -static void cleanup_requeue(void * arg) -{ - struct msg *msg = arg; - CHECK_FCT_DO(fd_fifo_post(fd_g_outgoing, &msg), - { - fd_hook_call(HOOK_MESSAGE_DROPPED, msg, NULL, "An error occurred while attempting to requeue this message during cancellation of the sending function", fd_msg_pmdl_get(msg)); - CHECK_FCT_DO(fd_msg_free(msg), /* What can we do more? */); - } ); -} - /* The code of the "out" thread */ static void * out_thr(void * arg) { struct fd_peer * peer = arg; + int stop = 0; + struct msg * msg; ASSERT( CHECK_PEER(peer) ); /* Set the thread name */ @@ -116,16 +113,12 @@ } /* Loop until cancelation */ - while (1) { - struct msg * msg; + while (!stop) { int ret; /* Retrieve next message to send */ CHECK_FCT_DO( fd_fifo_get(peer->p_tosend, &msg), goto error ); - /* Now if we are cancelled, we requeue this message */ - pthread_cleanup_push(cleanup_requeue, msg); - /* Send the message, log any error */ CHECK_FCT_DO( ret = do_send(&msg, peer->p_cnxctx, &peer->p_hbh, peer), { @@ -135,12 +128,30 @@ fd_hook_call(HOOK_MESSAGE_DROPPED, msg, NULL, buf, fd_msg_pmdl_get(msg)); fd_msg_free(msg); } + stop = 1; } ); - /* Loop */ - pthread_cleanup_pop(0); } + /* If we're here it means there was an error on the socket. We need to continue to purge the fifo & until we are canceled */ + CHECK_FCT_DO( fd_event_send(peer->p_events, FDEVP_CNX_ERROR, 0, NULL), /* What do we do if it fails? */ ); + + /* Requeue all routable messages in the global "out" queue, until we are canceled once the PSM deals with the CNX_ERROR sent above */ + while ( fd_fifo_get(peer->p_tosend, &msg) == 0 ) { + if (fd_msg_is_routable(msg)) { + CHECK_FCT_DO(fd_fifo_post_noblock(peer->p_tofailover, (void *)&msg), + { + /* fallback: destroy the message */ + fd_hook_call(HOOK_MESSAGE_DROPPED, msg, NULL, "Internal error: unable to requeue this message during failover process", fd_msg_pmdl_get(msg)); + CHECK_FCT_DO(fd_msg_free(msg), /* What can we do more? */) + } ); + } else { + /* Just free it */ + /* fd_hook_call(HOOK_MESSAGE_DROPPED, m, NULL, "Non-routable message freed during handover", fd_msg_pmdl_get(m)); */ + CHECK_FCT_DO(fd_msg_free(msg), /* What can we do more? */) + } + } + error: /* It is not really a connection error, but the effect is the same, we are not able to send anymore message */ CHECK_FCT_DO( fd_event_send(peer->p_events, FDEVP_CNX_ERROR, 0, NULL), /* What do we do if it fails? */ ); diff -r ef7c5e39badf -r 043b894b0511 libfdcore/p_sr.c --- a/libfdcore/p_sr.c Fri Jun 14 17:30:01 2013 +0800 +++ b/libfdcore/p_sr.c Fri Jun 14 17:30:42 2013 +0800 @@ -37,9 +37,9 @@ /* Structure to store a sent request */ struct sentreq { - struct fd_list chain; /* the "o" field points directly to the hop-by-hop of the request (uint32_t *) */ + struct fd_list chain; /* the "o" field points directly to the (new) hop-by-hop of the request (uint32_t *) */ struct msg *req; /* A request that was sent and not yet answered. */ - uint32_t prevhbh;/* The value to set in the hbh header when the message is retrieved */ + uint32_t prevhbh;/* The value to set back in the hbh header when the message is retrieved */ struct fd_list expire; /* the list of expiring requests */ struct timespec added_on; /* the time the request was added */ }; @@ -65,10 +65,7 @@ struct fd_list * li; struct timespec now; - if (!TRACE_BOOL(ANNOYING)) - return; - - fd_log_debug("%sSentReq list @%p:", text, srlist); + LOG_D("%sSentReq list @%p:", text, srlist); CHECK_SYS_DO( clock_gettime(CLOCK_REALTIME, &now), ); @@ -76,7 +73,7 @@ struct sentreq * sr = (struct sentreq *)li; uint32_t * nexthbh = li->o; - fd_log_debug(" - Next req (hbh:%x): [since %ld.%06ld sec]", *nexthbh, + LOG_D(" - Next req (hbh:0x%x, prev:0x%x): [since %ld.%06ld sec]", *nexthbh, sr->prevhbh, (long)((now.tv_nsec >= sr->added_on.tv_nsec) ? (now.tv_sec - sr->added_on.tv_sec) : (now.tv_sec - sr->added_on.tv_sec - 1)), (long)((now.tv_nsec >= sr->added_on.tv_nsec) ? ((now.tv_nsec - sr->added_on.tv_nsec) / 1000) : ((now.tv_nsec - sr->added_on.tv_nsec + 1000000000) / 1000))); } @@ -224,8 +221,9 @@ CHECK_POSIX( pthread_mutex_lock(&srlist->mtx) ); next = find_or_next(&srlist->srs, *hbhloc, &match); if (match) { - TRACE_DEBUG(INFO, "A request with the same hop-by-hop Id was already sent: error"); + TRACE_DEBUG(INFO, "A request with the same hop-by-hop Id (0x%x) was already sent: error", *hbhloc); free(sr); + srl_dump("Current list of SR: ", &srlist->srs); CHECK_POSIX_DO( pthread_mutex_unlock(&srlist->mtx), /* ignore */ ); return EINVAL; } @@ -234,7 +232,6 @@ *req = NULL; fd_list_insert_before(next, &sr->chain); srlist->cnt++; - srl_dump("Saved new request, ", &srlist->srs); /* In case of request with a timeout, also store in the timeout list */ ts = fd_msg_anscb_gettimeout( sr->req ); @@ -279,10 +276,10 @@ /* Search the request in the list */ CHECK_POSIX( pthread_mutex_lock(&srlist->mtx) ); - srl_dump("Fetching a request, ", &srlist->srs); sr = (struct sentreq *)find_or_next(&srlist->srs, hbh, &match); if (!match) { TRACE_DEBUG(INFO, "There is no saved request with this hop-by-hop id (%x)", hbh); + srl_dump("Current list of SR: ", &srlist->srs); *req = NULL; } else { /* Restore hop-by-hop id */ diff -r ef7c5e39badf -r 043b894b0511 libfdcore/peers.c --- a/libfdcore/peers.c Fri Jun 14 17:30:01 2013 +0800 +++ b/libfdcore/peers.c Fri Jun 14 17:30:42 2013 +0800 @@ -77,6 +77,7 @@ fd_list_init(&p->p_actives, p); fd_list_init(&p->p_expiry, p); CHECK_FCT( fd_fifo_new(&p->p_tosend, 5) ); + CHECK_FCT( fd_fifo_new(&p->p_tofailover, 0) ); p->p_hbh = lrand48(); fd_list_init(&p->p_sr.srs, p); @@ -232,7 +233,7 @@ free(__li); \ } -/* Empty the lists of p_tosend and p_sentreq messages */ +/* Empty the lists of p_tosend, p_failover, and p_sentreq messages */ void fd_peer_failover_msg(struct fd_peer * peer) { struct msg *m; @@ -257,6 +258,17 @@ } } + /* Requeue all messages in the "failover" queue */ + while ( fd_fifo_tryget(peer->p_tofailover, &m) == 0 ) { + fd_hook_call(HOOK_MESSAGE_FAILOVER, m, peer, NULL, fd_msg_pmdl_get(m)); + CHECK_FCT_DO(fd_fifo_post_noblock(fd_g_outgoing, (void *)&m), + { + /* fallback: destroy the message */ + fd_hook_call(HOOK_MESSAGE_DROPPED, m, NULL, "Internal error: unable to requeue this message during failover process", fd_msg_pmdl_get(m)); + CHECK_FCT_DO(fd_msg_free(m), /* What can we do more? */) + } ); + } + /* Requeue all routable sent requests */ fd_p_sr_failover(&peer->p_sr); @@ -334,6 +346,7 @@ fd_list_unlink(&p->p_actives); CHECK_FCT_DO( fd_fifo_del(&p->p_tosend), /* continue */ ); + CHECK_FCT_DO( fd_fifo_del(&p->p_tofailover), /* continue */ ); CHECK_POSIX_DO( pthread_mutex_destroy(&p->p_state_mtx), /* continue */); CHECK_POSIX_DO( pthread_mutex_destroy(&p->p_sr.mtx), /* continue */); CHECK_POSIX_DO( pthread_cond_destroy(&p->p_sr.cnd), /* continue */);