changeset 1207:043b894b0511

Cleanups in failover situation to avoid deadlocks and corrupt messages ids. Tested OK now.
author Sebastien Decugis <sdecugis@freediameter.net>
date Fri, 14 Jun 2013 17:30:42 +0800
parents ef7c5e39badf
children 906b1e409076
files libfdcore/fdcore-internal.h libfdcore/p_out.c libfdcore/p_sr.c libfdcore/peers.c
diffstat 4 files changed, 50 insertions(+), 28 deletions(-) [+]
line wrap: on
line diff
--- a/libfdcore/fdcore-internal.h	Fri Jun 14 17:30:01 2013 +0800
+++ b/libfdcore/fdcore-internal.h	Fri Jun 14 17:30:42 2013 +0800
@@ -183,6 +183,7 @@
 	
 	/* Sent requests (for fallback), list of struct sentreq ordered by hbh */
 	struct sr_list	 p_sr;
+	struct fifo	*p_tofailover;
 	
 	/* Pending received requests not yet answered (count only) */
 	long		 p_reqin_count; /* We use p_state_mtx to protect this value */
--- a/libfdcore/p_out.c	Fri Jun 14 17:30:01 2013 +0800
+++ b/libfdcore/p_out.c	Fri Jun 14 17:30:42 2013 +0800
@@ -74,8 +74,13 @@
 	/* Log the message */
 	fd_hook_call(HOOK_MESSAGE_SENT, cpy_for_logs_only, peer, NULL, fd_msg_pmdl_get(cpy_for_logs_only));
 	
+	pthread_cleanup_push((void *)fd_msg_free, *msg /* might be NULL, no problem */);
+	
 	/* Send the message */
 	CHECK_FCT_DO( ret = fd_cnx_send(cnx, buf, sz), );
+	
+	pthread_cleanup_pop(0);
+	
 out:
 	;	
 	pthread_cleanup_pop(1);
@@ -92,20 +97,12 @@
 	return 0;
 }
 
-static void cleanup_requeue(void * arg)
-{
-	struct msg *msg = arg;
-	CHECK_FCT_DO(fd_fifo_post(fd_g_outgoing, &msg),
-		{
-			fd_hook_call(HOOK_MESSAGE_DROPPED, msg, NULL, "An error occurred while attempting to requeue this message during cancellation of the sending function", fd_msg_pmdl_get(msg));
-			CHECK_FCT_DO(fd_msg_free(msg), /* What can we do more? */);
-		} );
-}
-
 /* The code of the "out" thread */
 static void * out_thr(void * arg)
 {
 	struct fd_peer * peer = arg;
+	int stop = 0;
+	struct msg * msg;
 	ASSERT( CHECK_PEER(peer) );
 	
 	/* Set the thread name */
@@ -116,16 +113,12 @@
 	}
 	
 	/* Loop until cancelation */
-	while (1) {
-		struct msg * msg;
+	while (!stop) {
 		int ret;
 		
 		/* Retrieve next message to send */
 		CHECK_FCT_DO( fd_fifo_get(peer->p_tosend, &msg), goto error );
 		
-		/* Now if we are cancelled, we requeue this message */
-		pthread_cleanup_push(cleanup_requeue, msg);
-		
 		/* Send the message, log any error */
 		CHECK_FCT_DO( ret = do_send(&msg, peer->p_cnxctx, &peer->p_hbh, peer),
 			{
@@ -135,12 +128,30 @@
 					fd_hook_call(HOOK_MESSAGE_DROPPED, msg, NULL, buf, fd_msg_pmdl_get(msg));
 					fd_msg_free(msg);
 				}
+				stop = 1;
 			} );
 			
-		/* Loop */
-		pthread_cleanup_pop(0);
 	}
 	
+	/* If we're here it means there was an error on the socket. We need to continue to purge the fifo & until we are canceled */
+	CHECK_FCT_DO( fd_event_send(peer->p_events, FDEVP_CNX_ERROR, 0, NULL), /* What do we do if it fails? */ );
+	
+	/* Requeue all routable messages in the global "out" queue, until we are canceled once the PSM deals with the CNX_ERROR sent above */
+	while ( fd_fifo_get(peer->p_tosend, &msg) == 0 ) {
+		if (fd_msg_is_routable(msg)) {
+			CHECK_FCT_DO(fd_fifo_post_noblock(peer->p_tofailover, (void *)&msg), 
+				{
+					/* fallback: destroy the message */
+					fd_hook_call(HOOK_MESSAGE_DROPPED, msg, NULL, "Internal error: unable to requeue this message during failover process", fd_msg_pmdl_get(msg));
+					CHECK_FCT_DO(fd_msg_free(msg), /* What can we do more? */)
+				} );
+		} else {
+			/* Just free it */
+			/* fd_hook_call(HOOK_MESSAGE_DROPPED, m, NULL, "Non-routable message freed during handover", fd_msg_pmdl_get(m)); */
+			CHECK_FCT_DO(fd_msg_free(msg), /* What can we do more? */)
+		}
+	}
+
 error:
 	/* It is not really a connection error, but the effect is the same, we are not able to send anymore message */
 	CHECK_FCT_DO( fd_event_send(peer->p_events, FDEVP_CNX_ERROR, 0, NULL), /* What do we do if it fails? */ );
--- a/libfdcore/p_sr.c	Fri Jun 14 17:30:01 2013 +0800
+++ b/libfdcore/p_sr.c	Fri Jun 14 17:30:42 2013 +0800
@@ -37,9 +37,9 @@
 
 /* Structure to store a sent request */
 struct sentreq {
-	struct fd_list	chain; 	/* the "o" field points directly to the hop-by-hop of the request (uint32_t *)  */
+	struct fd_list	chain; 	/* the "o" field points directly to the (new) hop-by-hop of the request (uint32_t *)  */
 	struct msg	*req;	/* A request that was sent and not yet answered. */
-	uint32_t	prevhbh;/* The value to set in the hbh header when the message is retrieved */
+	uint32_t	prevhbh;/* The value to set back in the hbh header when the message is retrieved */
 	struct fd_list  expire; /* the list of expiring requests */
 	struct timespec added_on; /* the time the request was added */
 };
@@ -65,10 +65,7 @@
 	struct fd_list * li;
 	struct timespec now;
 	
-	if (!TRACE_BOOL(ANNOYING))
-		return;
-	
-	fd_log_debug("%sSentReq list @%p:", text, srlist);
+	LOG_D("%sSentReq list @%p:", text, srlist);
 	
 	CHECK_SYS_DO( clock_gettime(CLOCK_REALTIME, &now), );
 	
@@ -76,7 +73,7 @@
 		struct sentreq * sr = (struct sentreq *)li;
 		uint32_t * nexthbh = li->o;
 		
-		fd_log_debug(" - Next req (hbh:%x): [since %ld.%06ld sec]", *nexthbh, 
+		LOG_D(" - Next req (hbh:0x%x, prev:0x%x): [since %ld.%06ld sec]", *nexthbh, sr->prevhbh,
 			(long)((now.tv_nsec >= sr->added_on.tv_nsec) ? (now.tv_sec - sr->added_on.tv_sec) : (now.tv_sec - sr->added_on.tv_sec - 1)),
 			(long)((now.tv_nsec >= sr->added_on.tv_nsec) ? ((now.tv_nsec - sr->added_on.tv_nsec) / 1000) : ((now.tv_nsec - sr->added_on.tv_nsec + 1000000000) / 1000)));
 	}
@@ -224,8 +221,9 @@
 	CHECK_POSIX( pthread_mutex_lock(&srlist->mtx) );
 	next = find_or_next(&srlist->srs, *hbhloc, &match);
 	if (match) {
-		TRACE_DEBUG(INFO, "A request with the same hop-by-hop Id was already sent: error");
+		TRACE_DEBUG(INFO, "A request with the same hop-by-hop Id (0x%x) was already sent: error", *hbhloc);
 		free(sr);
+		srl_dump("Current list of SR: ", &srlist->srs);
 		CHECK_POSIX_DO( pthread_mutex_unlock(&srlist->mtx), /* ignore */ );
 		return EINVAL;
 	}
@@ -234,7 +232,6 @@
 	*req = NULL;
 	fd_list_insert_before(next, &sr->chain);
 	srlist->cnt++;
-	srl_dump("Saved new request, ", &srlist->srs);
 	
 	/* In case of request with a timeout, also store in the timeout list */
 	ts = fd_msg_anscb_gettimeout( sr->req );
@@ -279,10 +276,10 @@
 	
 	/* Search the request in the list */
 	CHECK_POSIX( pthread_mutex_lock(&srlist->mtx) );
-	srl_dump("Fetching a request, ", &srlist->srs);
 	sr = (struct sentreq *)find_or_next(&srlist->srs, hbh, &match);
 	if (!match) {
 		TRACE_DEBUG(INFO, "There is no saved request with this hop-by-hop id (%x)", hbh);
+		srl_dump("Current list of SR: ", &srlist->srs);
 		*req = NULL;
 	} else {
 		/* Restore hop-by-hop id */
--- a/libfdcore/peers.c	Fri Jun 14 17:30:01 2013 +0800
+++ b/libfdcore/peers.c	Fri Jun 14 17:30:42 2013 +0800
@@ -77,6 +77,7 @@
 	fd_list_init(&p->p_actives, p);
 	fd_list_init(&p->p_expiry, p);
 	CHECK_FCT( fd_fifo_new(&p->p_tosend, 5) );
+	CHECK_FCT( fd_fifo_new(&p->p_tofailover, 0) );
 	p->p_hbh = lrand48();
 	
 	fd_list_init(&p->p_sr.srs, p);
@@ -232,7 +233,7 @@
 		free(__li);						\
 	}
 
-/* Empty the lists of p_tosend and p_sentreq messages */
+/* Empty the lists of p_tosend, p_failover, and p_sentreq messages */
 void fd_peer_failover_msg(struct fd_peer * peer)
 {
 	struct msg *m;
@@ -257,6 +258,17 @@
 		}
 	}
 	
+	/* Requeue all messages in the "failover" queue */
+	while ( fd_fifo_tryget(peer->p_tofailover, &m) == 0 ) {
+		fd_hook_call(HOOK_MESSAGE_FAILOVER, m, peer, NULL, fd_msg_pmdl_get(m));
+		CHECK_FCT_DO(fd_fifo_post_noblock(fd_g_outgoing, (void *)&m), 
+			{
+				/* fallback: destroy the message */
+				fd_hook_call(HOOK_MESSAGE_DROPPED, m, NULL, "Internal error: unable to requeue this message during failover process", fd_msg_pmdl_get(m));
+				CHECK_FCT_DO(fd_msg_free(m), /* What can we do more? */)
+			} );
+	}
+	
 	/* Requeue all routable sent requests */
 	fd_p_sr_failover(&peer->p_sr);
 	
@@ -334,6 +346,7 @@
 	fd_list_unlink(&p->p_actives);
 	
 	CHECK_FCT_DO( fd_fifo_del(&p->p_tosend), /* continue */ );
+	CHECK_FCT_DO( fd_fifo_del(&p->p_tofailover), /* continue */ );
 	CHECK_POSIX_DO( pthread_mutex_destroy(&p->p_state_mtx), /* continue */);
 	CHECK_POSIX_DO( pthread_mutex_destroy(&p->p_sr.mtx), /* continue */);
 	CHECK_POSIX_DO( pthread_cond_destroy(&p->p_sr.cnd), /* continue */);
"Welcome to our mercurial repository"