changeset 1010:357c2f892d24

Implement a new counter on pending answers to send back to a peer. Function fd_peer_get_load_pending updated to retrieve this counter as well. When a peer has answers pending, the connection is not immediately torn down upon DPR/DPA exchange, but a GRACE_TIMEOUT delay (default 1 sec) is granted.
author Sebastien Decugis <sdecugis@freediameter.net>
date Mon, 25 Mar 2013 16:39:32 +0100
parents e22434c66126
children aaf8743df5e7
files include/freeDiameter/libfdcore.h libfdcore/fdcore-internal.h libfdcore/p_ce.c libfdcore/p_dp.c libfdcore/p_out.c libfdcore/p_psm.c libfdcore/peers.c
diffstat 7 files changed, 94 insertions(+), 33 deletions(-) [+]
line wrap: on
line diff
--- a/include/freeDiameter/libfdcore.h	Mon Mar 25 14:35:55 2013 +0100
+++ b/include/freeDiameter/libfdcore.h	Mon Mar 25 16:39:32 2013 +0100
@@ -389,6 +389,8 @@
  *
  * PARAMETERS:
  *  peer	: The peer which load to read
+ *  to_receive  : (out) number of requests sent to this peer without matching answer yet.
+ *  to_send     : (out) number of requests received from this peer and not yet answered.
  *
  * DESCRIPTION: 
  *   Returns the current number of requests sent to this peer
@@ -399,7 +401,7 @@
  *  0  : The load parameter has been updated. (it should have a positive value always)
  * !0  : An error occurred
  */
-int fd_peer_get_load_pending(struct peer_hdr *peer, int * load);
+int fd_peer_get_load_pending(struct peer_hdr *peer, long * to_receive, long * to_send);
 
 /*
  * FUNCTION:	fd_peer_validate_register
--- a/libfdcore/fdcore-internal.h	Mon Mar 25 14:35:55 2013 +0100
+++ b/libfdcore/fdcore-internal.h	Mon Mar 25 16:39:32 2013 +0100
@@ -70,6 +70,11 @@
 #define DPR_TIMEOUT 	15	/* in seconds */
 #endif /* DPR_TIMEOUT */
 
+/* Delay during which the connection is kept open to allow exchanging remaining pending answers after DPR/DPA */
+#ifndef GRACE_TIMEOUT
+#define GRACE_TIMEOUT   1	/* in seconds */
+#endif /* GRACE_TIMEOUT */
+
 /* The Vendor-Id to advertise in CER/CEA */
 #ifndef MY_VENDOR_ID
 #define MY_VENDOR_ID	0 	/* Reserved value to tell it must be ignored */
@@ -125,7 +130,7 @@
 struct sr_list {
 	struct fd_list 	srs; /* requests ordered by hop-by-hop id */
 	struct fd_list  exp; /* requests that have a timeout set, ordered by timeout */
-	int             cnt; /* number of requests in the srs list */
+	long            cnt; /* number of requests in the srs list */
 	pthread_mutex_t	mtx; /* mutex to protect these lists */
 	pthread_cond_t  cnd; /* cond var used by the thread that handles timeouts */
 	pthread_t       thr; /* the thread that handles timeouts (and calls the anscb) */
@@ -181,6 +186,9 @@
 	/* Sent requests (for fallback), list of struct sentreq ordered by hbh */
 	struct sr_list	 p_sr;
 	
+	/* Pending received requests not yet answered (count only) */
+	long		 p_reqin_count; /* We use p_state_mtx to protect this value */
+	
 	/* Data for transitional states before the peer is in OPEN state */
 	struct {
 		struct cnxctx * p_receiver;	/* Only used in case of election */
--- a/libfdcore/p_ce.c	Mon Mar 25 14:35:55 2013 +0100
+++ b/libfdcore/p_ce.c	Mon Mar 25 16:39:32 2013 +0100
@@ -918,6 +918,11 @@
 			isi = 0;
 	}
 	
+	/* Update the counter to account for the answer (CEA) about to be sent */
+	CHECK_POSIX( pthread_mutex_lock(&peer->p_state_mtx) );
+	peer->p_reqin_count++;
+	CHECK_POSIX( pthread_mutex_unlock(&peer->p_state_mtx) );
+
 	/* Reply a CEA */
 	CHECK_FCT( fd_msg_new_answer_from_req ( fd_g_config->cnf_dict, &msg, 0 ) );
 	CHECK_FCT( fd_msg_rescode_set(msg, "DIAMETER_SUCCESS", NULL, NULL, 0 ) );
--- a/libfdcore/p_dp.c	Mon Mar 25 14:35:55 2013 +0100
+++ b/libfdcore/p_dp.c	Mon Mar 25 16:39:32 2013 +0100
@@ -62,6 +62,7 @@
 /* Handle a received message */
 int fd_p_dp_handle(struct msg ** msg, int req, struct fd_peer * peer)
 {
+	long to_receive, to_send;
 	TRACE_ENTRY("%p %d %p", msg, req, peer);
 	
 	if (req) {
@@ -109,28 +110,34 @@
 		CHECK_FCT( fd_msg_new_answer_from_req ( fd_g_config->cnf_dict, msg, 0 ) );
 		CHECK_FCT( fd_msg_rescode_set( *msg, "DIAMETER_SUCCESS", NULL, NULL, 1 ) );
 		
-		/* Move to CLOSING state to failover outgoing messages (and avoid failing over the DPA...) */
-		CHECK_FCT( fd_psm_change_state(peer, STATE_CLOSING) );
-		
-		/* Now send the DPA */
-		CHECK_FCT( fd_out_send( msg, NULL, peer, FD_CNX_ORDERED) );
+		/* Do we have pending exchanges with this peer? */
+		CHECK_FCT( fd_peer_get_load_pending(&peer->p_hdr, &to_receive, &to_send) );
 		
-		if (fd_cnx_isMultichan(peer->p_cnxctx)) {
-			/* There is a possibililty that messages are still in the pipe coming here, so let's grace for 1 second */
-			CHECK_FCT( fd_psm_change_state(peer, STATE_CLOSING_GRACE) );
-			fd_psm_next_timeout(peer, 0, 1);
+		if ((to_receive == 0) && (to_send == 1 /* only the DPA */)) {
+			/* No pending exchange, move to CLOSING directly */
+			CHECK_FCT( fd_psm_change_state(peer, STATE_CLOSING) );
+		
+			/* Now send the DPA */
+			CHECK_FCT( fd_out_send( msg, NULL, peer, FD_CNX_ORDERED) );
 			
-		} else {
-			/* Move to CLOSED state */
+			/* and move to CLOSED */
 			fd_psm_cleanup(peer, 0);
 
 			/* Reset the timer for next connection attempt -- we'll retry sooner or later depending on the disconnection cause */
 			fd_psm_next_timeout(peer, 1, fd_p_dp_newdelay(peer));
+		} else {
+			/* We have pending exchanges, we move to CLOSING_GRACE which allows exchanges of answers but
+			not new requests */
+			CHECK_FCT( fd_psm_change_state(peer, STATE_CLOSING_GRACE) );
+			fd_psm_next_timeout(peer, 0, GRACE_TIMEOUT);
+			
+			/* Now send the DPA */
+			CHECK_FCT( fd_out_send( msg, NULL, peer, FD_CNX_ORDERED) );
 		}
 	} else {
 		/* We received a DPA */
 		int curstate = fd_peer_getstate(peer);
-		if (curstate != STATE_CLOSING) {
+		if (curstate != STATE_CLOSING_GRACE) {
 			TRACE_DEBUG(INFO, "Ignoring DPA received in state %s", STATE_STR(curstate));
 		}
 			
@@ -140,12 +147,16 @@
 		CHECK_FCT_DO( fd_msg_free( *msg ), /* continue */ );
 		*msg = NULL;
 		
-		if (fd_cnx_isMultichan(peer->p_cnxctx)) {
-			CHECK_FCT( fd_psm_change_state(peer, STATE_CLOSING_GRACE) );
-			fd_psm_next_timeout(peer, 0, 1);
+		/* Do we still have pending exchanges with this peer? */
+		CHECK_FCT( fd_peer_get_load_pending(&peer->p_hdr, &to_receive, &to_send) );
+		if ((to_receive != 0) || (to_send != 0)) {
+			TRACE_DEBUG(INFO, "Received DPA but pending load: [%ld, %ld], giving grace delay before closing", to_receive, to_send);
+			fd_psm_next_timeout(peer, 0, GRACE_TIMEOUT);
 			peer->p_flags.pf_localterm = 1;
+		} else {
+			/* otherwise, go to CLOSING state, the psm will handle terminating the connection */
+			CHECK_FCT( fd_psm_change_state(peer, STATE_CLOSING) );
 		}
-		/* otherwise, return in CLOSING state, the psm will handle it */
 	}
 	
 	return 0;
@@ -187,7 +198,7 @@
 	peer->p_hdr.info.runtime.pir_lastDC = val.u32;
 	
 	/* Update the peer state and timer */
-	CHECK_FCT( fd_psm_change_state(peer, STATE_CLOSING) );
+	CHECK_FCT( fd_psm_change_state(peer, STATE_CLOSING_GRACE) );
 	fd_psm_next_timeout(peer, 0, DPR_TIMEOUT);
 	
 	/* Now send the DPR message */
--- a/libfdcore/p_out.c	Mon Mar 25 14:35:55 2013 +0100
+++ b/libfdcore/p_out.c	Mon Mar 25 16:39:32 2013 +0100
@@ -172,9 +172,21 @@
 /* Wrapper to sending a message either by out thread (peer in OPEN state) or directly; cnx or peer must be provided. Flags are valid only for direct sending, not through thread (unused) */
 int fd_out_send(struct msg ** msg, struct cnxctx * cnx, struct fd_peer * peer, uint32_t flags)
 {
+	struct msg_hdr * hdr;
+	
 	TRACE_ENTRY("%p %p %p %x", msg, cnx, peer, flags);
 	CHECK_PARAMS( msg && *msg && (cnx || (peer && peer->p_cnxctx)));
 	
+	if (peer) {
+		CHECK_FCT( fd_msg_hdr(*msg, &hdr) );
+		if (!(hdr->msg_flags & CMD_FLAG_REQUEST)) {
+			/* Update the count of pending answers to send */
+			CHECK_POSIX( pthread_mutex_lock(&peer->p_state_mtx) );
+			peer->p_reqin_count--;
+			CHECK_POSIX( pthread_mutex_unlock(&peer->p_state_mtx) );			
+		}
+	}
+	
 	if (fd_peer_getstate(peer) == STATE_OPEN) {
 		/* Normal case: just queue for the out thread to pick it up */
 		CHECK_FCT( fd_fifo_post(peer->p_tosend, msg) );
--- a/libfdcore/p_psm.c	Mon Mar 25 14:35:55 2013 +0100
+++ b/libfdcore/p_psm.c	Mon Mar 25 16:39:32 2013 +0100
@@ -44,9 +44,9 @@
 The delivery of Diameter messages must not always be unordered: order is important at
 begining and end of a connection lifetime. It means we need agility to
 switch between "ordering enforced" and "ordering not enforced to counter
-HotLB" modes of operation.
+Head of the Line Blocking" modes of operation.
 
-The connection state machine represented in RFC3588 (and rfc3588bis) is
+The connection state machine represented in RFC3588 (and RFC6733) is
 incomplete, because it lacks the SUSPECT state and the 3 DWR/DWA
 exchanges (section 5.1) when the peer recovers from this state.
 Personnally I don't see the rationale for exchanging 3 messages (why 3?)
@@ -90,12 +90,15 @@
 DPA. Peer A receives the DPA before the application message. The
 application message is lost.
 
-This situation is actually quite possible because DPR/DPA messages are
+This situation actually happens easily because DPR/DPA messages are
 very short, while application messages can be quite large. Therefore,
 they require much more time to deliver.
 
 I really cannot see a way to counter this effect by using the ordering
 of the messages, except by applying a timer (state STATE_CLOSING_GRACE).
+This timer can also be useful when we detect that some messages have not 
+yet received an answer on this link, to give the application time to 
+complete the ongoing exchange.
 
 However, this problem must be balanced with the fact that the message
 that is lost will be in many cases sent again as the failover mechanism
@@ -196,7 +199,7 @@
 	
 	return 0;
 }
-static int leave_open_state(struct fd_peer * peer)
+static int leave_open_state(struct fd_peer * peer, int skip_failover)
 {
 	/* Remove from active peers list */
 	CHECK_POSIX( pthread_rwlock_wrlock(&fd_g_activ_peers_rw) );
@@ -207,7 +210,9 @@
 	CHECK_FCT( fd_out_stop(peer) );
 	
 	/* Failover the messages */
-	fd_peer_failover_msg(peer);
+	if (!skip_failover) {
+		fd_peer_failover_msg(peer);
+	}
 	
 	return 0;
 }
@@ -287,9 +292,12 @@
 	CHECK_POSIX( pthread_mutex_unlock(&peer->p_state_mtx) );
 	
 	if (old == STATE_OPEN) {
-		CHECK_FCT( leave_open_state(peer) );
+		CHECK_FCT( leave_open_state(peer, new_state == STATE_CLOSING_GRACE) );
 	}
-	
+	if (old == STATE_CLOSING_GRACE) {
+		fd_peer_failover_msg(peer);
+	}
+
 	if (new_state == STATE_OPEN) {
 		CHECK_FCT( enter_open_state(peer) );
 	}
@@ -298,6 +306,9 @@
 		/* Purge event list */
 		fd_psm_events_free(peer);
 		
+		/* Reset the counter of pending answers to send */
+		peer->p_reqin_count = 0;
+		
 		/* If the peer is not persistant, we destroy it */
 		if (peer->p_hdr.info.config.pic_flags.persist == PI_PRST_NONE) {
 			CHECK_FCT( fd_event_send(peer->p_events, FDEVP_TERMINATE, 0, NULL) );
@@ -528,6 +539,11 @@
 				TS_DIFFERENCE( &delay, &reqsent, &rcvon );
 				fd_msg_log( FD_MSG_LOG_TIMING, msg, "Answer received in %d.%06.6d sec.", delay.tv_sec, delay.tv_nsec / 1000 );
 			}
+		} else {
+			/* Mark the incoming request so that we know we have pending answers for this peer */
+			CHECK_POSIX_DO( pthread_mutex_lock(&peer->p_state_mtx), goto psm_end  );
+			peer->p_reqin_count++;
+			CHECK_POSIX_DO( pthread_mutex_unlock(&peer->p_state_mtx), goto psm_end  );
 		}
 		
 		if (cur_state == STATE_OPEN_NEW) {
--- a/libfdcore/peers.c	Mon Mar 25 14:35:55 2013 +0100
+++ b/libfdcore/peers.c	Mon Mar 25 16:39:32 2013 +0100
@@ -257,15 +257,22 @@
 }
 
 /* Return the value of srlist->cnt */
-int fd_peer_get_load_pending(struct peer_hdr *peer, int * load)
+int fd_peer_get_load_pending(struct peer_hdr *peer, long * to_receive, long * to_send)
 {
 	struct fd_peer * p = (struct fd_peer *)peer;
-	TRACE_ENTRY("%p %p", peer, load);
-	CHECK_PARAMS(CHECK_PEER(peer) && load);
+	TRACE_ENTRY("%p %p %p", peer, to_receive, to_send);
+	CHECK_PARAMS(CHECK_PEER(peer));
 	
-	CHECK_POSIX( pthread_mutex_lock(&p->p_sr.mtx) );
-	*load = p->p_sr.cnt;
-	CHECK_POSIX( pthread_mutex_unlock(&p->p_sr.mtx) );
+	if (to_receive) {
+		CHECK_POSIX( pthread_mutex_lock(&p->p_sr.mtx) );
+		*to_receive = p->p_sr.cnt;
+		CHECK_POSIX( pthread_mutex_unlock(&p->p_sr.mtx) );
+	}
+	if (to_send) {
+		CHECK_POSIX( pthread_mutex_lock(&p->p_state_mtx) );
+		*to_send = p->p_reqin_count;
+		CHECK_POSIX( pthread_mutex_unlock(&p->p_state_mtx) );
+	}
 	
 	return 0;
 }
@@ -410,7 +417,7 @@
 		return;
 	}
 
-	snprintf(buf, sizeof(buf), ">  %s\t%s\t[%dsr]", STATE_STR(fd_peer_getstate(peer)), peer->p_hdr.info.pi_diamid, peer->p_sr.cnt);
+	snprintf(buf, sizeof(buf), ">  %s\t%s\t[%ldsr,%ldpa]", STATE_STR(fd_peer_getstate(peer)), peer->p_hdr.info.pi_diamid, peer->p_sr.cnt, peer->p_reqin_count);
 	if (details > INFO) {
 		snprintf(buf+strlen(buf), sizeof(buf)-strlen(buf), "\t(rlm:%s)", peer->p_hdr.info.runtime.pir_realm ?: "<unknown>");
 		if (peer->p_hdr.info.runtime.pir_prodname)
"Welcome to our mercurial repository"