Mercurial > hg > freeDiameter
diff libfdcore/p_psm.c @ 706:4ffbc9f1e922
Large UNTESTED commit with the following changes:
* Improved DiameterIdentity handling (esp. internationalization issues),
and improve efficiency of some string operations in peers, sessions,
and dictionary modules (closes #7)
* Cleanup in the session module to free only unreferenced sessions (#16)
* Removed fd_cpu_flush_cache(), replaced by more robust alternatives.
* Improved peer state machine algorithm to counter SCTP multistream race
condition.
author | Sebastien Decugis <sdecugis@nict.go.jp> |
---|---|
date | Wed, 09 Feb 2011 15:26:58 +0900 |
parents | 78b665400097 |
children | 4a9f08d6b6ba |
line wrap: on
line diff
--- a/libfdcore/p_psm.c Mon Jan 31 17:22:21 2011 +0900 +++ b/libfdcore/p_psm.c Wed Feb 09 15:26:58 2011 +0900 @@ -35,6 +35,73 @@ #include "fdcore-internal.h" +/* +This file implement a Peer State Machine which is a mix of: + - the state machine described in rfc3588bis + - the state machine described in rfc3539#section-3.4 + - the following observations. + +The delivery of Diameter messages must not always be unordered: order is important at +begining and end of a connection lifetime. It means we need agility to +switch between "ordering enforced" and "ordering not enforced to counter +HotLB" modes of operation. + +The connection state machine represented in RFC3588 (and rfc3588bis) is +incomplete, because it lacks the SUSPECT state and the 3 DWR/DWA +exchanges (section 5.1) when the peer recovers from this state. +Personnally I don't see the rationale for exchanging 3 messages (why 3?) +but, if we require at least 1 DWR/DWA exchange to be always performed +after the CER/CEA exchange (and initiated by the peer that sent the +CEA), we have a simple way to deal with our ordering problem, as resumed +bellow. Peers are: [i]nitiator, [r]esponder. + (1) [i] SCTP connection attempt. + (2) [r] accept the connection. + (3) [i,r] (if secure port) DTLS handshake, close on failure. + (4) [i] Send CER + (5) [r] Receive CER, send CEA using stream 0, flag "unordered" cleared. + [r] Immediately send a DWR after the CEA, also using stream 0, +flag "unordered" cleared. + [r] Move to STATE_OPEN_NEW state -- equivalent to OPEN except +that all messages are sent ordered at the moment. + (6) [i] receive CEA, move to OPEN state. All messages can be sent +unordered in OPEN state. + [i] As per normal operation, reply with DWA to the DWR. + (7) [r] Upon reception of the DWA, move to OPEN state, messages can be +sent unordered from this point. 
+ +Note about (5) and (6): if the Diameter Identity received in CER or CEA +does not match the credentials from the certificate presented during +DTLS handshake, we may need to specify a path of clean disconnection +(not blocking the remote peer waiting for something). + +This proposed mechanism removes the problem of application messages +received before the CEA by the initiator. Note that if the "old" inband +TLS handshake is used, this handshake plays the same synchronization +role than the new DWR/DWA, which becomes useless. + + +The other time where ordering is important is by the end of connection +lifetime, when one peer is shutting down the link for some reason +(reboot, overload, no activity, etc...). In case of unordered delivery, +we may have: +- peer A sends an application message followed by a DPR. Peer B receives +the DPR first and tears down the connection. Application message is lost. +- Peer B sends an application message, then receives a DPR and answers a +DPA. Peer A receives the DPA before the application message. The +application message is lost. + +This situation is actually quite possible because DPR/DPA messages are +very short, while application messages can be quite large. Therefore, +they require much more time to deliver. + +I really cannot see a way to counter this effect by using the ordering +of the messages, except by applying a timer (state STATE_CLOSING_GRACE). + +However, this problem must be balanced with the fact that the message +that is lost will be in many cases sent again as the failover mechanism +specifies. 
+*/ + /* The actual declaration of peer_state_str */ DECLARE_STATE_STR(); @@ -100,11 +167,13 @@ peer->p_cb2 = NULL; return 0; } + /* Insert in the active peers list */ CHECK_POSIX( pthread_rwlock_wrlock(&fd_g_activ_peers_rw) ); for (li = fd_g_activ_peers.next; li != &fd_g_activ_peers; li = li->next) { struct fd_peer * next_p = (struct fd_peer *)li->o; - int cmp = strcmp(peer->p_hdr.info.pi_diamid, next_p->p_hdr.info.pi_diamid); + int cmp = fd_os_cmp(peer->p_hdr.info.pi_diamid, peer->p_hdr.info.pi_diamidlen, + next_p->p_hdr.info.pi_diamid, next_p->p_hdr.info.pi_diamidlen); if (cmp < 0) break; } @@ -114,7 +183,7 @@ /* Callback registered when the peer was added, by fd_peer_add */ if (peer->p_cb) { TRACE_DEBUG(FULL, "Calling add callback for peer %s", peer->p_hdr.info.pi_diamid); - (*peer->p_cb)(&peer->p_hdr.info, peer->p_cb_data); + (*peer->p_cb)(&peer->p_hdr.info, peer->p_cb_data); /* TODO: do this in a separate detached thread? */ peer->p_cb = NULL; peer->p_cb_data = NULL; } @@ -177,6 +246,23 @@ } } +/* Read state */ +int fd_peer_get_state(struct peer_hdr *peer) +{ + int ret; + + struct fd_peer * p = (struct fd_peer *)peer; + + if (!CHECK_PEER(p)) + return -1; + + CHECK_POSIX_DO( pthread_mutex_lock(&p->p_state_mtx), return -1 ); + ret = p->p_state; + CHECK_POSIX_DO( pthread_mutex_unlock(&p->p_state_mtx), return -1 ); + + return ret; +} + /* Change state */ int fd_psm_change_state(struct fd_peer * peer, int new_state) @@ -185,8 +271,8 @@ TRACE_ENTRY("%p %d(%s)", peer, new_state, STATE_STR(new_state)); CHECK_PARAMS( CHECK_PEER(peer) ); - fd_cpu_flush_cache(); - old = peer->p_hdr.info.runtime.pir_state; + + old = fd_peer_getstate(peer); if (old == new_state) return 0; @@ -195,8 +281,10 @@ STATE_STR(new_state), peer->p_hdr.info.pi_diamid); - peer->p_hdr.info.runtime.pir_state = new_state; - fd_cpu_flush_cache(); + + CHECK_POSIX( pthread_mutex_lock(&peer->p_state_mtx) ); + peer->p_state = new_state; + CHECK_POSIX( pthread_mutex_unlock(&peer->p_state_mtx) ); if (old == 
STATE_OPEN) { CHECK_FCT( leave_open_state(peer) ); @@ -254,8 +342,7 @@ void fd_psm_cleanup(struct fd_peer * peer, int terminate) { /* Move to CLOSED state: failover messages, stop OUT thread, unlink peer from active list */ - fd_cpu_flush_cache(); - if (peer->p_hdr.info.runtime.pir_state != STATE_ZOMBIE) { + if (fd_peer_getstate(peer) != STATE_ZOMBIE) { CHECK_FCT_DO( fd_psm_change_state(peer, STATE_CLOSED), /* continue */ ); } @@ -284,8 +371,9 @@ { struct fd_peer * peer = (struct fd_peer *)arg; CHECK_PARAMS_DO( CHECK_PEER(peer), return ); - peer->p_hdr.info.runtime.pir_state = STATE_ZOMBIE; - fd_cpu_flush_cache(); + CHECK_POSIX_DO( pthread_mutex_lock(&peer->p_state_mtx), ); + peer->p_state = STATE_ZOMBIE; + CHECK_POSIX_DO( pthread_mutex_unlock(&peer->p_state_mtx), ); return; } @@ -297,6 +385,7 @@ int event; size_t ev_sz; void * ev_data; + int cur_state; CHECK_PARAMS_DO( CHECK_PEER(peer), ASSERT(0) ); @@ -305,12 +394,14 @@ /* Set the thread name */ { char buf[48]; - sprintf(buf, "PSM/%.*s", (int)sizeof(buf) - 5, peer->p_hdr.info.pi_diamid); + snprintf(buf, sizeof(buf), "PSM/%s", peer->p_hdr.info.pi_diamid); fd_log_threadname ( buf ); } /* The state machine starts in CLOSED state */ - peer->p_hdr.info.runtime.pir_state = STATE_CLOSED; + CHECK_POSIX_DO( pthread_mutex_lock(&peer->p_state_mtx), goto psm_end ); + peer->p_state = STATE_CLOSED; + CHECK_POSIX_DO( pthread_mutex_unlock(&peer->p_state_mtx), goto psm_end ); /* Wait that the PSM are authorized to start in the daemon */ CHECK_FCT_DO( fd_psm_waitstart(), goto psm_end ); @@ -325,23 +416,29 @@ psm_loop: /* Get next event */ TRACE_DEBUG(FULL, "'%s' in state '%s' waiting for next event.", - peer->p_hdr.info.pi_diamid, STATE_STR(peer->p_hdr.info.runtime.pir_state)); + peer->p_hdr.info.pi_diamid, STATE_STR(fd_peer_getstate(peer))); CHECK_FCT_DO( fd_event_timedget(peer->p_events, &peer->p_psm_timer, FDEVP_PSM_TIMEOUT, &event, &ev_sz, &ev_data), goto psm_end ); + + cur_state = fd_peer_getstate(peer); + if (cur_state == 
-1) + goto psm_end; + TRACE_DEBUG(FULL, "'%s'\t<-- '%s'\t(%p,%zd)\t'%s'", - STATE_STR(peer->p_hdr.info.runtime.pir_state), + STATE_STR(cur_state), fd_pev_str(event), ev_data, ev_sz, peer->p_hdr.info.pi_diamid); /* Now, the action depends on the current state and the incoming event */ /* The following states are impossible */ - ASSERT( peer->p_hdr.info.runtime.pir_state != STATE_NEW ); - ASSERT( peer->p_hdr.info.runtime.pir_state != STATE_ZOMBIE ); - ASSERT( peer->p_hdr.info.runtime.pir_state != STATE_OPEN_HANDSHAKE ); /* because it should exist only between two loops */ + ASSERT( cur_state != STATE_NEW ); + ASSERT( cur_state != STATE_ZOMBIE ); + ASSERT( cur_state != STATE_OPEN_HANDSHAKE ); /* because it should exist only between two loops */ /* Purge invalid events */ if (!CHECK_PEVENT(event)) { TRACE_DEBUG(INFO, "Invalid event received in PSM '%s' : %d", peer->p_hdr.info.pi_diamid, event); + ASSERT(0); /* we should investigate this situation */ goto psm_loop; } @@ -353,15 +450,17 @@ /* Requests to terminate the peer object */ if (event == FDEVP_TERMINATE) { - switch (peer->p_hdr.info.runtime.pir_state) { + switch (cur_state) { case STATE_OPEN: + case STATE_OPEN_NEW: case STATE_REOPEN: - /* We cannot just close the conenction, we have to send a DPR first */ + /* We cannot just close the connection, we have to send a DPR first */ CHECK_FCT_DO( fd_p_dp_initiate(peer, ev_data), goto psm_end ); goto psm_loop; /* case STATE_CLOSING: + case STATE_CLOSING_GRACE: case STATE_WAITCNXACK: case STATE_WAITCNXACK_ELEC: case STATE_WAITCEA: @@ -379,13 +478,6 @@ struct msg * msg = NULL; struct msg_hdr * hdr; - /* If the current state does not allow receiving messages, just drop it */ - if (peer->p_hdr.info.runtime.pir_state == STATE_CLOSED) { - TRACE_DEBUG(FULL, "Purging message in queue while in CLOSED state (%zdb)", ev_sz); - free(ev_data); - goto psm_loop; - } - /* Parse the received buffer */ CHECK_FCT_DO( fd_msg_parse_buffer( (void *)&ev_data, ev_sz, &msg), { @@ -395,8 +487,16 
@@ goto psm_loop; } ); + /* If the current state does not allow receiving messages, just drop it */ + if (cur_state == STATE_CLOSED) { + /* In such case, just discard the message */ + fd_msg_log( FD_MSG_LOG_DROPPED, msg, "Purged from peer '%s''s queue (CLOSED state).", peer->p_hdr.info.pi_diamid ); + fd_msg_free(msg); + goto psm_loop; + } + /* Log incoming message */ - fd_msg_log( FD_MSG_LOG_RECEIVED, msg, "Received %zdb from '%s'", ev_sz, peer->p_hdr.info.pi_diamid ); + fd_msg_log( FD_MSG_LOG_RECEIVED, msg, "Received %zdb from '%s' (%s)", ev_sz, peer->p_hdr.info.pi_diamid, STATE_STR(cur_state) ); /* Extract the header */ CHECK_FCT_DO( fd_msg_hdr(msg, &hdr), goto psm_end ); @@ -416,27 +516,34 @@ CHECK_FCT_DO( fd_msg_answ_associate( msg, req ), goto psm_end ); } + if (cur_state == STATE_OPEN_NEW) { + /* OK, we have received something, so the connection is supposedly now in OPEN state at the remote site */ + fd_psm_change_state(peer, STATE_OPEN ); + } + /* Now handle non-link-local messages */ if (fd_msg_is_routable(msg)) { - switch (peer->p_hdr.info.runtime.pir_state) { + switch (cur_state) { /* To maximize compatibility -- should not be a security issue here */ case STATE_REOPEN: case STATE_SUSPECT: case STATE_CLOSING: + case STATE_CLOSING_GRACE: TRACE_DEBUG(FULL, "Accepted a message while not in OPEN state... 
"); /* The standard situation : */ + case STATE_OPEN_NEW: case STATE_OPEN: /* We received a valid routable message, update the expiry timer */ CHECK_FCT_DO( fd_p_expi_update(peer), goto psm_end ); /* Set the message source and add the Route-Record */ - CHECK_FCT_DO( fd_msg_source_set( msg, peer->p_hdr.info.pi_diamid, 1, fd_g_config->cnf_dict ), goto psm_end); + CHECK_FCT_DO( fd_msg_source_set( msg, peer->p_hdr.info.pi_diamid, peer->p_hdr.info.pi_diamidlen, 1, fd_g_config->cnf_dict ), goto psm_end); /* Requeue to the global incoming queue */ CHECK_FCT_DO(fd_fifo_post(fd_g_incoming, &msg), goto psm_end ); /* Update the peer timer (only in OPEN state) */ - if ((peer->p_hdr.info.runtime.pir_state == STATE_OPEN) && (!peer->p_flags.pf_dw_pending)) { + if ((cur_state == STATE_OPEN) && (!peer->p_flags.pf_dw_pending)) { fd_psm_next_timeout(peer, 1, peer->p_hdr.info.config.pic_twtimer ?: fd_g_config->cnf_timer_tw); } break; @@ -448,7 +555,7 @@ case STATE_CLOSED: default: /* In such case, just discard the message */ - fd_msg_log( FD_MSG_LOG_DROPPED, msg, "Received from peer '%s' while connection was not in OPEN state.", peer->p_hdr.info.pi_diamid ); + fd_msg_log( FD_MSG_LOG_DROPPED, msg, "Received from peer '%s' while connection was not in state %s.", peer->p_hdr.info.pi_diamid, STATE_STR(cur_state) ); fd_msg_free(msg); } goto psm_loop; @@ -484,8 +591,9 @@ case CC_DISCONNECT_PEER: CHECK_FCT_DO( fd_p_dp_handle(&msg, (hdr->msg_flags & CMD_FLAG_REQUEST), peer), goto psm_reset ); - if (peer->p_hdr.info.runtime.pir_state == STATE_CLOSING) + if (fd_peer_getstate(peer) == STATE_CLOSING) goto psm_end; + break; case CC_DEVICE_WATCHDOG: @@ -493,7 +601,7 @@ break; default: - /* Unknown / unexpected / invalid message */ + /* Unknown / unexpected / invalid message -- but validated by our dictionary */ TRACE_DEBUG(INFO, "Invalid non-routable command received: %u.", hdr->msg_code); if (hdr->msg_flags & CMD_FLAG_REQUEST) { do { @@ -501,14 +609,14 @@ CHECK_FCT_DO( fd_msg_new_answer_from_req ( 
fd_g_config->cnf_dict, &msg, MSGFL_ANSW_ERROR ), break ); /* Set the error code */ - CHECK_FCT_DO( fd_msg_rescode_set(msg, "DIAMETER_INVALID_HDR_BITS", NULL, NULL, 1 ), break ); + CHECK_FCT_DO( fd_msg_rescode_set(msg, "DIAMETER_COMMAND_UNSUPPORTED", "Or maybe the P-bit or application Id are erroneous.", NULL, 1 ), break ); /* Send the answer */ CHECK_FCT_DO( fd_out_send(&msg, peer->p_cnxctx, peer, FD_CNX_ORDERED), break ); } while (0); } else { /* We did ASK for it ??? */ - fd_log_debug("Invalid PXY flag in answer header ?\n"); + TRACE_DEBUG(INFO, "Received answer with erroneous 'is_routable' result..."); } /* Cleanup the message if not done */ @@ -530,7 +638,7 @@ /* The connection object is broken */ if (event == FDEVP_CNX_ERROR) { - switch (peer->p_hdr.info.runtime.pir_state) { + switch (cur_state) { case STATE_WAITCNXACK_ELEC: /* Abort the initiating side */ fd_p_cnx_abort(peer, 0); @@ -540,6 +648,7 @@ case STATE_WAITCEA: case STATE_OPEN: + case STATE_OPEN_NEW: case STATE_REOPEN: case STATE_WAITCNXACK: case STATE_SUSPECT: @@ -557,6 +666,15 @@ /* We sent a DPR so we are terminating, do not wait for DPA */ goto psm_end; + case STATE_CLOSING_GRACE: + if (peer->p_flags.pf_localterm) /* initiated here */ + goto psm_end; + + fd_psm_cleanup(peer, 0); + + /* Reset the timer for next connection attempt */ + fd_psm_next_timeout(peer, 1, fd_p_dp_newdelay(peer)); + goto psm_loop; } goto psm_loop; } @@ -615,7 +733,7 @@ CHECK_POSIX_DO( pthread_join( peer->p_ini_thr, NULL), /* ignore, it is not a big deal */); peer->p_ini_thr = (pthread_t)NULL; - switch (peer->p_hdr.info.runtime.pir_state) { + switch (cur_state) { case STATE_WAITCNXACK_ELEC: case STATE_WAITCNXACK: fd_p_ce_handle_newcnx(peer, cnx); @@ -623,7 +741,7 @@ default: /* Just abort the attempt and continue */ - TRACE_DEBUG(FULL, "Connection attempt successful but current state is %s, closing...", STATE_STR(peer->p_hdr.info.runtime.pir_state)); + TRACE_DEBUG(FULL, "Connection attempt successful but current state is %s, 
closing... (too slow?)", STATE_STR(cur_state)); fd_cnx_destroy(cnx); } @@ -637,7 +755,7 @@ CHECK_POSIX_DO( pthread_join( peer->p_ini_thr, NULL), /* ignore, it is not a big deal */); peer->p_ini_thr = (pthread_t)NULL; - switch (peer->p_hdr.info.runtime.pir_state) { + switch (cur_state) { case STATE_WAITCNXACK_ELEC: /* Abort the initiating side */ fd_p_cnx_abort(peer, 0); @@ -652,7 +770,7 @@ default: /* Just ignore */ - TRACE_DEBUG(FULL, "Connection attempt failed but current state is %s, ignoring...", STATE_STR(peer->p_hdr.info.runtime.pir_state)); + TRACE_DEBUG(FULL, "Connection attempt failed but current state is %s, ignoring...", STATE_STR(cur_state)); } goto psm_loop; @@ -660,9 +778,10 @@ /* The timeout for the current state has been reached */ if (event == FDEVP_PSM_TIMEOUT) { - switch (peer->p_hdr.info.runtime.pir_state) { + switch (cur_state) { case STATE_OPEN: case STATE_REOPEN: + case STATE_OPEN_NEW: CHECK_FCT_DO( fd_p_dw_timeout(peer), goto psm_end ); goto psm_loop; @@ -675,7 +794,6 @@ case STATE_SUSPECT: /* Mark the connection problem */ peer->p_flags.pf_cnx_pb = 1; - case STATE_CLOSING: case STATE_WAITCNXACK: case STATE_WAITCEA: @@ -683,6 +801,16 @@ fd_psm_next_timeout(peer, 1, peer->p_hdr.info.config.pic_tctimer ?: fd_g_config->cnf_timer_tc); goto psm_reset; + case STATE_CLOSING_GRACE: + /* The grace period is completed, now close */ + if (peer->p_flags.pf_localterm) + goto psm_end; + + fd_psm_cleanup(peer, 0); + /* Reset the timer for next connection attempt */ + fd_psm_next_timeout(peer, 1, fd_p_dp_newdelay(peer)); + goto psm_loop; + case STATE_WAITCNXACK_ELEC: /* Abort the initiating side */ fd_p_cnx_abort(peer, 0); @@ -696,7 +824,7 @@ } /* Default action : the handling has not yet been implemented. 
[for debug only] */ - TRACE_DEBUG(INFO, "Missing handler in PSM for '%s'\t<-- '%s'", STATE_STR(peer->p_hdr.info.runtime.pir_state), fd_pev_str(event)); + TRACE_DEBUG(INFO, "Missing handler in PSM for '%s'\t<-- '%s'", STATE_STR(cur_state), fd_pev_str(event)); psm_reset: if (peer->p_flags.pf_delete) goto psm_end; @@ -706,10 +834,9 @@ psm_end: fd_psm_cleanup(peer, 1); TRACE_DEBUG(INFO, "'%s'\t-> STATE_ZOMBIE (terminated)\t'%s'", - STATE_STR(peer->p_hdr.info.runtime.pir_state), + STATE_STR(fd_peer_getstate(peer)), peer->p_hdr.info.pi_diamid); pthread_cleanup_pop(1); /* set STATE_ZOMBIE */ - fd_cpu_flush_cache(); peer->p_psm = (pthread_t)NULL; pthread_detach(pthread_self()); return NULL; @@ -725,7 +852,7 @@ TRACE_ENTRY("%p", peer); /* Check the peer and state are OK */ - CHECK_PARAMS( CHECK_PEER(peer) && (peer->p_hdr.info.runtime.pir_state == STATE_NEW) ); + CHECK_PARAMS( fd_peer_getstate(peer) == STATE_NEW ); /* Create the FIFO for events */ CHECK_FCT( fd_fifo_new(&peer->p_events) ); @@ -743,8 +870,7 @@ TRACE_ENTRY("%p", peer); CHECK_PARAMS( CHECK_PEER(peer) ); - fd_cpu_flush_cache(); - if (peer->p_hdr.info.runtime.pir_state != STATE_ZOMBIE) { + if (fd_peer_getstate(peer) != STATE_ZOMBIE) { CHECK_FCT( fd_event_send(peer->p_events, FDEVP_TERMINATE, 0, reason) ); } else { TRACE_DEBUG(FULL, "Peer '%s' was already terminated", peer->p_hdr.info.pi_diamid);