# HG changeset patch # User Sebastien Decugis # Date 1364225972 -3600 # Node ID 357c2f892d24424a991cc0216f1729878ad8299d # Parent e22434c661262a8916d743841ba34e3ee308ec2e Implement a new counter on pending answers to send back to a peer. Function fd_peer_get_load_pending updated to retrieve this counter as well. When a peer has answers pending, the connection is not immediately torn down upon DPR/DPA exchange, but a GRACE_TIMEOUT delay (default 1 sec) is granted. diff -r e22434c66126 -r 357c2f892d24 include/freeDiameter/libfdcore.h --- a/include/freeDiameter/libfdcore.h Mon Mar 25 14:35:55 2013 +0100 +++ b/include/freeDiameter/libfdcore.h Mon Mar 25 16:39:32 2013 +0100 @@ -389,6 +389,8 @@ * * PARAMETERS: * peer : The peer which load to read + * to_receive : (out) number of requests sent to this peer without matching answer yet. + * to_send : (out) number of requests received from this peer and not yet answered. * * DESCRIPTION: * Returns the current number of requests sent to this peer @@ -399,7 +401,7 @@ * 0 : The load parameter has been updated. 
(it should have a positive value always) * !0 : An error occurred */ -int fd_peer_get_load_pending(struct peer_hdr *peer, int * load); +int fd_peer_get_load_pending(struct peer_hdr *peer, long * to_receive, long * to_send); /* * FUNCTION: fd_peer_validate_register diff -r e22434c66126 -r 357c2f892d24 libfdcore/fdcore-internal.h --- a/libfdcore/fdcore-internal.h Mon Mar 25 14:35:55 2013 +0100 +++ b/libfdcore/fdcore-internal.h Mon Mar 25 16:39:32 2013 +0100 @@ -70,6 +70,11 @@ #define DPR_TIMEOUT 15 /* in seconds */ #endif /* DPR_TIMEOUT */ +/* Delay during which the connection is maintained open to allow exchanging remaining pending answers after DPR/DPA */ +#ifndef GRACE_TIMEOUT +#define GRACE_TIMEOUT 1 /* in seconds */ +#endif /* GRACE_TIMEOUT */ + /* The Vendor-Id to advertise in CER/CEA */ #ifndef MY_VENDOR_ID #define MY_VENDOR_ID 0 /* Reserved value to tell it must be ignored */ @@ -125,7 +130,7 @@ struct sr_list { struct fd_list srs; /* requests ordered by hop-by-hop id */ struct fd_list exp; /* requests that have a timeout set, ordered by timeout */ - int cnt; /* number of requests in the srs list */ + long cnt; /* number of requests in the srs list */ pthread_mutex_t mtx; /* mutex to protect these lists */ pthread_cond_t cnd; /* cond var used by the thread that handles timeouts */ pthread_t thr; /* the thread that handles timeouts (and calls the anscb) */ @@ -181,6 +186,9 @@ /* Sent requests (for fallback), list of struct sentreq ordered by hbh */ struct sr_list p_sr; + /* Pending received requests not yet answered (count only) */ + long p_reqin_count; /* We use p_state_mtx to protect this value */ + /* Data for transitional states before the peer is in OPEN state */ struct { struct cnxctx * p_receiver; /* Only used in case of election */ diff -r e22434c66126 -r 357c2f892d24 libfdcore/p_ce.c --- a/libfdcore/p_ce.c Mon Mar 25 14:35:55 2013 +0100 +++ b/libfdcore/p_ce.c Mon Mar 25 16:39:32 2013 +0100 @@ -918,6 +918,11 @@ isi = 0; } + /* Update the counter to match 
with the answer being sent */ + CHECK_POSIX( pthread_mutex_lock(&peer->p_state_mtx) ); + peer->p_reqin_count++; + CHECK_POSIX( pthread_mutex_unlock(&peer->p_state_mtx) ); + /* Reply a CEA */ CHECK_FCT( fd_msg_new_answer_from_req ( fd_g_config->cnf_dict, &msg, 0 ) ); CHECK_FCT( fd_msg_rescode_set(msg, "DIAMETER_SUCCESS", NULL, NULL, 0 ) ); diff -r e22434c66126 -r 357c2f892d24 libfdcore/p_dp.c --- a/libfdcore/p_dp.c Mon Mar 25 14:35:55 2013 +0100 +++ b/libfdcore/p_dp.c Mon Mar 25 16:39:32 2013 +0100 @@ -62,6 +62,7 @@ /* Handle a received message */ int fd_p_dp_handle(struct msg ** msg, int req, struct fd_peer * peer) { + long to_receive, to_send; TRACE_ENTRY("%p %d %p", msg, req, peer); if (req) { @@ -109,28 +110,34 @@ CHECK_FCT( fd_msg_new_answer_from_req ( fd_g_config->cnf_dict, msg, 0 ) ); CHECK_FCT( fd_msg_rescode_set( *msg, "DIAMETER_SUCCESS", NULL, NULL, 1 ) ); - /* Move to CLOSING state to failover outgoing messages (and avoid failing over the DPA...) */ - CHECK_FCT( fd_psm_change_state(peer, STATE_CLOSING) ); - - /* Now send the DPA */ - CHECK_FCT( fd_out_send( msg, NULL, peer, FD_CNX_ORDERED) ); + /* Do we have pending exchanges with this peer? 
*/ + CHECK_FCT( fd_peer_get_load_pending(&peer->p_hdr, &to_receive, &to_send) ); - if (fd_cnx_isMultichan(peer->p_cnxctx)) { - /* There is a possibililty that messages are still in the pipe coming here, so let's grace for 1 second */ - CHECK_FCT( fd_psm_change_state(peer, STATE_CLOSING_GRACE) ); - fd_psm_next_timeout(peer, 0, 1); + if ((to_receive == 0) && (to_send == 1 /* only the DPA */)) { + /* No pending exchange, move to CLOSING directly */ + CHECK_FCT( fd_psm_change_state(peer, STATE_CLOSING) ); + + /* Now send the DPA */ + CHECK_FCT( fd_out_send( msg, NULL, peer, FD_CNX_ORDERED) ); - } else { - /* Move to CLOSED state */ + /* and move to CLOSED */ fd_psm_cleanup(peer, 0); /* Reset the timer for next connection attempt -- we'll retry sooner or later depending on the disconnection cause */ fd_psm_next_timeout(peer, 1, fd_p_dp_newdelay(peer)); + } else { + /* We have pending exchanges, we move to CLOSING_GRACE which allows exchanges of answers but + not new requests */ + CHECK_FCT( fd_psm_change_state(peer, STATE_CLOSING_GRACE) ); + fd_psm_next_timeout(peer, 0, GRACE_TIMEOUT); + + /* Now send the DPA */ + CHECK_FCT( fd_out_send( msg, NULL, peer, FD_CNX_ORDERED) ); } } else { /* We received a DPA */ int curstate = fd_peer_getstate(peer); - if (curstate != STATE_CLOSING) { + if (curstate != STATE_CLOSING_GRACE) { TRACE_DEBUG(INFO, "Ignoring DPA received in state %s", STATE_STR(curstate)); } @@ -140,12 +147,16 @@ CHECK_FCT_DO( fd_msg_free( *msg ), /* continue */ ); *msg = NULL; - if (fd_cnx_isMultichan(peer->p_cnxctx)) { - CHECK_FCT( fd_psm_change_state(peer, STATE_CLOSING_GRACE) ); - fd_psm_next_timeout(peer, 0, 1); + /* Do we still have pending exchanges with this peer? 
*/ + CHECK_FCT( fd_peer_get_load_pending(&peer->p_hdr, &to_receive, &to_send) ); + if ((to_receive != 0) || (to_send != 0)) { + TRACE_DEBUG(INFO, "Received DPA but pending load: [%ld, %ld], giving grace delay before closing", to_receive, to_send); + fd_psm_next_timeout(peer, 0, GRACE_TIMEOUT); peer->p_flags.pf_localterm = 1; + } else { + /* otherwise, go to CLOSING state, the psm will handle terminating the connection */ + CHECK_FCT( fd_psm_change_state(peer, STATE_CLOSING) ); } - /* otherwise, return in CLOSING state, the psm will handle it */ } return 0; @@ -187,7 +198,7 @@ peer->p_hdr.info.runtime.pir_lastDC = val.u32; /* Update the peer state and timer */ - CHECK_FCT( fd_psm_change_state(peer, STATE_CLOSING) ); + CHECK_FCT( fd_psm_change_state(peer, STATE_CLOSING_GRACE) ); fd_psm_next_timeout(peer, 0, DPR_TIMEOUT); /* Now send the DPR message */ diff -r e22434c66126 -r 357c2f892d24 libfdcore/p_out.c --- a/libfdcore/p_out.c Mon Mar 25 14:35:55 2013 +0100 +++ b/libfdcore/p_out.c Mon Mar 25 16:39:32 2013 +0100 @@ -172,9 +172,21 @@ /* Wrapper to sending a message either by out thread (peer in OPEN state) or directly; cnx or peer must be provided. 
Flags are valid only for direct sending, not through thread (unused) */ int fd_out_send(struct msg ** msg, struct cnxctx * cnx, struct fd_peer * peer, uint32_t flags) { + struct msg_hdr * hdr; + TRACE_ENTRY("%p %p %p %x", msg, cnx, peer, flags); CHECK_PARAMS( msg && *msg && (cnx || (peer && peer->p_cnxctx))); + if (peer) { + CHECK_FCT( fd_msg_hdr(*msg, &hdr) ); + if (!(hdr->msg_flags & CMD_FLAG_REQUEST)) { + /* Update the count of pending answers to send */ + CHECK_POSIX( pthread_mutex_lock(&peer->p_state_mtx) ); + peer->p_reqin_count--; + CHECK_POSIX( pthread_mutex_unlock(&peer->p_state_mtx) ); + } + } + if (fd_peer_getstate(peer) == STATE_OPEN) { /* Normal case: just queue for the out thread to pick it up */ CHECK_FCT( fd_fifo_post(peer->p_tosend, msg) ); diff -r e22434c66126 -r 357c2f892d24 libfdcore/p_psm.c --- a/libfdcore/p_psm.c Mon Mar 25 14:35:55 2013 +0100 +++ b/libfdcore/p_psm.c Mon Mar 25 16:39:32 2013 +0100 @@ -44,9 +44,9 @@ The delivery of Diameter messages must not always be unordered: order is important at begining and end of a connection lifetime. It means we need agility to switch between "ordering enforced" and "ordering not enforced to counter -HotLB" modes of operation. +Head of the Line Blocking" modes of operation. -The connection state machine represented in RFC3588 (and rfc3588bis) is +The connection state machine represented in RFC3588 (and RFC6733) is incomplete, because it lacks the SUSPECT state and the 3 DWR/DWA exchanges (section 5.1) when the peer recovers from this state. Personnally I don't see the rationale for exchanging 3 messages (why 3?) @@ -90,12 +90,15 @@ DPA. Peer A receives the DPA before the application message. The application message is lost. -This situation is actually quite possible because DPR/DPA messages are +This situation is actually happening easily because DPR/DPA messages are very short, while application messages can be quite large. Therefore, they require much more time to deliver. 
I really cannot see a way to counter this effect by using the ordering of the messages, except by applying a timer (state STATE_CLOSING_GRACE). +This timer can also be useful when we detect that some messages have not +yet received an answer on this link, to give time to the application to +complete the ongoing exchange. However, this problem must be balanced with the fact that the message that is lost will be in many cases sent again as the failover mechanism @@ -196,7 +199,7 @@ return 0; } -static int leave_open_state(struct fd_peer * peer) +static int leave_open_state(struct fd_peer * peer, int skip_failover) { /* Remove from active peers list */ CHECK_POSIX( pthread_rwlock_wrlock(&fd_g_activ_peers_rw) ); @@ -207,7 +210,9 @@ CHECK_FCT( fd_out_stop(peer) ); /* Failover the messages */ - fd_peer_failover_msg(peer); + if (!skip_failover) { + fd_peer_failover_msg(peer); + } return 0; } @@ -287,9 +292,12 @@ CHECK_POSIX( pthread_mutex_unlock(&peer->p_state_mtx) ); if (old == STATE_OPEN) { - CHECK_FCT( leave_open_state(peer) ); + CHECK_FCT( leave_open_state(peer, new_state == STATE_CLOSING_GRACE) ); } - + if (old == STATE_CLOSING_GRACE) { + fd_peer_failover_msg(peer); + } + if (new_state == STATE_OPEN) { CHECK_FCT( enter_open_state(peer) ); } @@ -298,6 +306,9 @@ /* Purge event list */ fd_psm_events_free(peer); + /* Reset the counter of pending answers to send */ + peer->p_reqin_count = 0; + /* If the peer is not persistant, we destroy it */ if (peer->p_hdr.info.config.pic_flags.persist == PI_PRST_NONE) { CHECK_FCT( fd_event_send(peer->p_events, FDEVP_TERMINATE, 0, NULL) ); @@ -528,6 +539,11 @@ TS_DIFFERENCE( &delay, &reqsent, &rcvon ); fd_msg_log( FD_MSG_LOG_TIMING, msg, "Answer received in %d.%06.6d sec.", delay.tv_sec, delay.tv_nsec / 1000 ); } + } else { + /* Mark the incoming request so that we know we have pending answers for this peer */ + CHECK_POSIX_DO( pthread_mutex_lock(&peer->p_state_mtx), goto psm_end ); + peer->p_reqin_count++; + CHECK_POSIX_DO( 
pthread_mutex_unlock(&peer->p_state_mtx), goto psm_end ); } if (cur_state == STATE_OPEN_NEW) { diff -r e22434c66126 -r 357c2f892d24 libfdcore/peers.c --- a/libfdcore/peers.c Mon Mar 25 14:35:55 2013 +0100 +++ b/libfdcore/peers.c Mon Mar 25 16:39:32 2013 +0100 @@ -257,15 +257,22 @@ } /* Return the value of srlist->cnt */ -int fd_peer_get_load_pending(struct peer_hdr *peer, int * load) +int fd_peer_get_load_pending(struct peer_hdr *peer, long * to_receive, long * to_send) { struct fd_peer * p = (struct fd_peer *)peer; - TRACE_ENTRY("%p %p", peer, load); - CHECK_PARAMS(CHECK_PEER(peer) && load); + TRACE_ENTRY("%p %p %p", peer, to_receive, to_send); + CHECK_PARAMS(CHECK_PEER(peer)); - CHECK_POSIX( pthread_mutex_lock(&p->p_sr.mtx) ); - *load = p->p_sr.cnt; - CHECK_POSIX( pthread_mutex_unlock(&p->p_sr.mtx) ); + if (to_receive) { + CHECK_POSIX( pthread_mutex_lock(&p->p_sr.mtx) ); + *to_receive = p->p_sr.cnt; + CHECK_POSIX( pthread_mutex_unlock(&p->p_sr.mtx) ); + } + if (to_send) { + CHECK_POSIX( pthread_mutex_lock(&p->p_state_mtx) ); + *to_send = p->p_reqin_count; + CHECK_POSIX( pthread_mutex_unlock(&p->p_state_mtx) ); + } return 0; } @@ -410,7 +417,7 @@ return; } - snprintf(buf, sizeof(buf), "> %s\t%s\t[%dsr]", STATE_STR(fd_peer_getstate(peer)), peer->p_hdr.info.pi_diamid, peer->p_sr.cnt); + snprintf(buf, sizeof(buf), "> %s\t%s\t[%ldsr,%ldpa]", STATE_STR(fd_peer_getstate(peer)), peer->p_hdr.info.pi_diamid, peer->p_sr.cnt, peer->p_reqin_count); if (details > INFO) { snprintf(buf+strlen(buf), sizeof(buf)-strlen(buf), "\t(rlm:%s)", peer->p_hdr.info.runtime.pir_realm ?: ""); if (peer->p_hdr.info.runtime.pir_prodname)