Mercurial > hg > freeDiameter
diff libfdcore/p_out.c @ 1207:043b894b0511
Cleanups in failover situation to avoid deadlocks and corrupt messages ids. Tested OK now.
author | Sebastien Decugis <sdecugis@freediameter.net> |
---|---|
date | Fri, 14 Jun 2013 17:30:42 +0800 |
parents | 56c36d1007b4 |
children | 8f9684264fe0 |
line wrap: on
line diff
--- a/libfdcore/p_out.c Fri Jun 14 17:30:01 2013 +0800 +++ b/libfdcore/p_out.c Fri Jun 14 17:30:42 2013 +0800 @@ -74,8 +74,13 @@ /* Log the message */ fd_hook_call(HOOK_MESSAGE_SENT, cpy_for_logs_only, peer, NULL, fd_msg_pmdl_get(cpy_for_logs_only)); + pthread_cleanup_push((void *)fd_msg_free, *msg /* might be NULL, no problem */); + /* Send the message */ CHECK_FCT_DO( ret = fd_cnx_send(cnx, buf, sz), ); + + pthread_cleanup_pop(0); + out: ; pthread_cleanup_pop(1); @@ -92,20 +97,12 @@ return 0; } -static void cleanup_requeue(void * arg) -{ - struct msg *msg = arg; - CHECK_FCT_DO(fd_fifo_post(fd_g_outgoing, &msg), - { - fd_hook_call(HOOK_MESSAGE_DROPPED, msg, NULL, "An error occurred while attempting to requeue this message during cancellation of the sending function", fd_msg_pmdl_get(msg)); - CHECK_FCT_DO(fd_msg_free(msg), /* What can we do more? */); - } ); -} - /* The code of the "out" thread */ static void * out_thr(void * arg) { struct fd_peer * peer = arg; + int stop = 0; + struct msg * msg; ASSERT( CHECK_PEER(peer) ); /* Set the thread name */ @@ -116,16 +113,12 @@ } /* Loop until cancelation */ - while (1) { - struct msg * msg; + while (!stop) { int ret; /* Retrieve next message to send */ CHECK_FCT_DO( fd_fifo_get(peer->p_tosend, &msg), goto error ); - /* Now if we are cancelled, we requeue this message */ - pthread_cleanup_push(cleanup_requeue, msg); - /* Send the message, log any error */ CHECK_FCT_DO( ret = do_send(&msg, peer->p_cnxctx, &peer->p_hbh, peer), { @@ -135,12 +128,30 @@ fd_hook_call(HOOK_MESSAGE_DROPPED, msg, NULL, buf, fd_msg_pmdl_get(msg)); fd_msg_free(msg); } + stop = 1; } ); - /* Loop */ - pthread_cleanup_pop(0); } + /* If we're here it means there was an error on the socket. We need to continue to purge the fifo & until we are canceled */ + CHECK_FCT_DO( fd_event_send(peer->p_events, FDEVP_CNX_ERROR, 0, NULL), /* What do we do if it fails? */ ); + + /* Requeue all routable messages in the global "out" queue, until we are canceled once the PSM deals with the CNX_ERROR sent above */ + while ( fd_fifo_get(peer->p_tosend, &msg) == 0 ) { + if (fd_msg_is_routable(msg)) { + CHECK_FCT_DO(fd_fifo_post_noblock(peer->p_tofailover, (void *)&msg), + { + /* fallback: destroy the message */ + fd_hook_call(HOOK_MESSAGE_DROPPED, msg, NULL, "Internal error: unable to requeue this message during failover process", fd_msg_pmdl_get(msg)); + CHECK_FCT_DO(fd_msg_free(msg), /* What can we do more? */) + } ); + } else { + /* Just free it */ + /* fd_hook_call(HOOK_MESSAGE_DROPPED, m, NULL, "Non-routable message freed during handover", fd_msg_pmdl_get(m)); */ + CHECK_FCT_DO(fd_msg_free(msg), /* What can we do more? */) + } + } + error: /* It is not really a connection error, but the effect is the same, we are not able to send anymore message */ CHECK_FCT_DO( fd_event_send(peer->p_events, FDEVP_CNX_ERROR, 0, NULL), /* What do we do if it fails? */ );