diff libfdcore/p_psm.c @ 706:4ffbc9f1e922

Large UNTESTED commit with the following changes: * Improved DiameterIdentity handling (esp. interationalization issues), and improve efficiency of some string operations in peers, sessions, and dictionary modules (closes #7) * Cleanup in the session module to free only unreferenced sessions (#16) * Removed fd_cpu_flush_cache(), replaced by more robust alternatives. * Improved peer state machine algorithm to counter SCTP multistream race condition.
author Sebastien Decugis <sdecugis@nict.go.jp>
date Wed, 09 Feb 2011 15:26:58 +0900
parents 78b665400097
children 4a9f08d6b6ba
line wrap: on
line diff
--- a/libfdcore/p_psm.c	Mon Jan 31 17:22:21 2011 +0900
+++ b/libfdcore/p_psm.c	Wed Feb 09 15:26:58 2011 +0900
@@ -35,6 +35,73 @@
 
 #include "fdcore-internal.h"
 
+/*
+This file implement a Peer State Machine which is a mix of:
+ - the state machine described in rfc3588bis
+ - the state machine described in rfc3539#section-3.4
+ - the following observations.
+ 
+The delivery of Diameter messages must not always be unordered: order is important at
+begining and end of a connection lifetime. It means we need agility to
+switch between "ordering enforced" and "ordering not enforced to counter
+HotLB" modes of operation.
+
+The connection state machine represented in RFC3588 (and rfc3588bis) is
+incomplete, because it lacks the SUSPECT state and the 3 DWR/DWA
+exchanges (section 5.1) when the peer recovers from this state.
+Personnally I don't see the rationale for exchanging 3 messages (why 3?)
+but, if we require at least 1 DWR/DWA exchange to be always performed
+after the CER/CEA exchange (and initiated by the peer that sent the
+CEA), we have a simple way to deal with our ordering problem, as resumed
+bellow. Peers are: [i]nitiator, [r]esponder.
+ (1) [i] SCTP connection attempt.
+ (2) [r] accept the connection.
+ (3) [i,r] (if secure port) DTLS handshake, close on failure.
+ (4) [i] Send CER
+ (5) [r] Receive CER, send CEA using stream 0, flag "unordered" cleared.
+       [r] Immediately send a DWR after the CEA, also using stream 0,
+flag "unordered" cleared.
+       [r] Move to STATE_OPEN_NEW state -- equivalent to OPEN except
+that all messages are sent ordered at the moment.
+ (6) [i] receive CEA, move to OPEN state. All messages can be sent
+unordered in OPEN state.
+       [i] As per normal operation, reply with DWA to the DWR.
+ (7) [r] Upon reception of the DWA, move to OPEN state, messages can be
+sent unordered from this point.
+
+Note about (5) and (6): if the Diameter Identity received in CER or CEA
+does not match the credentials from the certificate presented during
+DTLS handshake, we may need to specify a path of clean disconnection
+(not blocking the remote peer waiting for something).
+
+This proposed mechanism removes the problem of application messages
+received before the CEA by the initiator. Note that if the "old" inband
+TLS handshake is used, this handshake plays the same synchronization
+role than the new DWR/DWA, which becomes useless.
+
+
+The other time where ordering is important is by the end of connection
+lifetime, when one peer is shutting down the link for some reason
+(reboot, overload, no activity, etc...). In case of unordered delivery,
+we may have:
+- peer A sends an application message followed by a DPR. Peer B receives
+the DPR first and tears down the connection. Application message is lost.
+- Peer B sends an application message, then receives a DPR and answers a
+DPA. Peer A receives the DPA before the application message. The
+application message is lost.
+
+This situation is actually quite possible because DPR/DPA messages are
+very short, while application messages can be quite large. Therefore,
+they require much more time to deliver.
+
+I really cannot see a way to counter this effect by using the ordering
+of the messages, except by applying a timer (state STATE_CLOSING_GRACE).
+
+However, this problem must be balanced with the fact that the message
+that is lost will be in many cases sent again as the failover mechanism
+specifies.
+*/
+
 /* The actual declaration of peer_state_str */
 DECLARE_STATE_STR();
 
@@ -100,11 +167,13 @@
 		peer->p_cb2 = NULL;
 		return 0;
 	}
+	
 	/* Insert in the active peers list */
 	CHECK_POSIX( pthread_rwlock_wrlock(&fd_g_activ_peers_rw) );
 	for (li = fd_g_activ_peers.next; li != &fd_g_activ_peers; li = li->next) {
 		struct fd_peer * next_p = (struct fd_peer *)li->o;
-		int cmp = strcmp(peer->p_hdr.info.pi_diamid, next_p->p_hdr.info.pi_diamid);
+		int cmp = fd_os_cmp(peer->p_hdr.info.pi_diamid, peer->p_hdr.info.pi_diamidlen, 
+					next_p->p_hdr.info.pi_diamid, next_p->p_hdr.info.pi_diamidlen);
 		if (cmp < 0)
 			break;
 	}
@@ -114,7 +183,7 @@
 	/* Callback registered when the peer was added, by fd_peer_add */
 	if (peer->p_cb) {
 		TRACE_DEBUG(FULL, "Calling add callback for peer %s", peer->p_hdr.info.pi_diamid);
-		(*peer->p_cb)(&peer->p_hdr.info, peer->p_cb_data);
+		(*peer->p_cb)(&peer->p_hdr.info, peer->p_cb_data); /* TODO: do this in a separate detached thread? */
 		peer->p_cb = NULL;
 		peer->p_cb_data = NULL;
 	}
@@ -177,6 +246,23 @@
 	}
 }
 
+/* Read state */
+int fd_peer_get_state(struct peer_hdr *peer)
+{
+	int ret;
+	
+	struct fd_peer * p = (struct fd_peer *)peer;
+	
+	if (!CHECK_PEER(p))
+		return -1;
+	
+	CHECK_POSIX_DO( pthread_mutex_lock(&p->p_state_mtx), return -1 );
+	ret = p->p_state;
+	CHECK_POSIX_DO( pthread_mutex_unlock(&p->p_state_mtx), return -1 );
+	
+	return ret;
+}
+
 
 /* Change state */
 int fd_psm_change_state(struct fd_peer * peer, int new_state)
@@ -185,8 +271,8 @@
 	
 	TRACE_ENTRY("%p %d(%s)", peer, new_state, STATE_STR(new_state));
 	CHECK_PARAMS( CHECK_PEER(peer) );
-	fd_cpu_flush_cache();
-	old = peer->p_hdr.info.runtime.pir_state;
+	
+	old = fd_peer_getstate(peer);
 	if (old == new_state)
 		return 0;
 	
@@ -195,8 +281,10 @@
 			STATE_STR(new_state),
 			peer->p_hdr.info.pi_diamid);
 	
-	peer->p_hdr.info.runtime.pir_state = new_state;
-	fd_cpu_flush_cache();
+	
+	CHECK_POSIX( pthread_mutex_lock(&peer->p_state_mtx) );
+	peer->p_state = new_state;
+	CHECK_POSIX( pthread_mutex_unlock(&peer->p_state_mtx) );
 	
 	if (old == STATE_OPEN) {
 		CHECK_FCT( leave_open_state(peer) );
@@ -254,8 +342,7 @@
 void fd_psm_cleanup(struct fd_peer * peer, int terminate)
 {
 	/* Move to CLOSED state: failover messages, stop OUT thread, unlink peer from active list */
-	fd_cpu_flush_cache();
-	if (peer->p_hdr.info.runtime.pir_state != STATE_ZOMBIE) {
+	if (fd_peer_getstate(peer) != STATE_ZOMBIE) {
 		CHECK_FCT_DO( fd_psm_change_state(peer, STATE_CLOSED), /* continue */ );
 	}
 	
@@ -284,8 +371,9 @@
 {
 	struct fd_peer * peer = (struct fd_peer *)arg;
 	CHECK_PARAMS_DO( CHECK_PEER(peer), return );
-	peer->p_hdr.info.runtime.pir_state = STATE_ZOMBIE;
-	fd_cpu_flush_cache();
+	CHECK_POSIX_DO( pthread_mutex_lock(&peer->p_state_mtx), );
+	peer->p_state = STATE_ZOMBIE;
+	CHECK_POSIX_DO( pthread_mutex_unlock(&peer->p_state_mtx), );
 	return;
 }
 
@@ -297,6 +385,7 @@
 	int event;
 	size_t ev_sz;
 	void * ev_data;
+	int cur_state;
 	
 	CHECK_PARAMS_DO( CHECK_PEER(peer), ASSERT(0) );
 	
@@ -305,12 +394,14 @@
 	/* Set the thread name */
 	{
 		char buf[48];
-		sprintf(buf, "PSM/%.*s", (int)sizeof(buf) - 5, peer->p_hdr.info.pi_diamid);
+		snprintf(buf, sizeof(buf), "PSM/%s", peer->p_hdr.info.pi_diamid);
 		fd_log_threadname ( buf );
 	}
 	
 	/* The state machine starts in CLOSED state */
-	peer->p_hdr.info.runtime.pir_state = STATE_CLOSED;
+	CHECK_POSIX_DO( pthread_mutex_lock(&peer->p_state_mtx), goto psm_end );
+	peer->p_state = STATE_CLOSED;
+	CHECK_POSIX_DO( pthread_mutex_unlock(&peer->p_state_mtx), goto psm_end );
 
 	/* Wait that the PSM are authorized to start in the daemon */
 	CHECK_FCT_DO( fd_psm_waitstart(), goto psm_end );
@@ -325,23 +416,29 @@
 psm_loop:
 	/* Get next event */
 	TRACE_DEBUG(FULL, "'%s' in state '%s' waiting for next event.",
-			peer->p_hdr.info.pi_diamid, STATE_STR(peer->p_hdr.info.runtime.pir_state));
+			peer->p_hdr.info.pi_diamid, STATE_STR(fd_peer_getstate(peer)));
 	CHECK_FCT_DO( fd_event_timedget(peer->p_events, &peer->p_psm_timer, FDEVP_PSM_TIMEOUT, &event, &ev_sz, &ev_data), goto psm_end );
+	
+	cur_state = fd_peer_getstate(peer);
+	if (cur_state == -1)
+		goto psm_end;
+	
 	TRACE_DEBUG(FULL, "'%s'\t<-- '%s'\t(%p,%zd)\t'%s'",
-			STATE_STR(peer->p_hdr.info.runtime.pir_state),
+			STATE_STR(cur_state),
 			fd_pev_str(event), ev_data, ev_sz,
 			peer->p_hdr.info.pi_diamid);
 
 	/* Now, the action depends on the current state and the incoming event */
 
 	/* The following states are impossible */
-	ASSERT( peer->p_hdr.info.runtime.pir_state != STATE_NEW );
-	ASSERT( peer->p_hdr.info.runtime.pir_state != STATE_ZOMBIE );
-	ASSERT( peer->p_hdr.info.runtime.pir_state != STATE_OPEN_HANDSHAKE ); /* because it should exist only between two loops */
+	ASSERT( cur_state != STATE_NEW );
+	ASSERT( cur_state != STATE_ZOMBIE );
+	ASSERT( cur_state != STATE_OPEN_HANDSHAKE ); /* because it should exist only between two loops */
 
 	/* Purge invalid events */
 	if (!CHECK_PEVENT(event)) {
 		TRACE_DEBUG(INFO, "Invalid event received in PSM '%s' : %d", peer->p_hdr.info.pi_diamid, event);
+		ASSERT(0); /* we should investigate this situation */
 		goto psm_loop;
 	}
 
@@ -353,15 +450,17 @@
 
 	/* Requests to terminate the peer object */
 	if (event == FDEVP_TERMINATE) {
-		switch (peer->p_hdr.info.runtime.pir_state) {
+		switch (cur_state) {
 			case STATE_OPEN:
+			case STATE_OPEN_NEW:
 			case STATE_REOPEN:
-				/* We cannot just close the conenction, we have to send a DPR first */
+				/* We cannot just close the connection, we have to send a DPR first */
 				CHECK_FCT_DO( fd_p_dp_initiate(peer, ev_data), goto psm_end );
 				goto psm_loop;
 			
 			/*	
 			case STATE_CLOSING:
+			case STATE_CLOSING_GRACE:
 			case STATE_WAITCNXACK:
 			case STATE_WAITCNXACK_ELEC:
 			case STATE_WAITCEA:
@@ -379,13 +478,6 @@
 		struct msg * msg = NULL;
 		struct msg_hdr * hdr;
 		
-		/* If the current state does not allow receiving messages, just drop it */
-		if (peer->p_hdr.info.runtime.pir_state == STATE_CLOSED) {
-			TRACE_DEBUG(FULL, "Purging message in queue while in CLOSED state (%zdb)", ev_sz);
-			free(ev_data);
-			goto psm_loop;
-		}
-		
 		/* Parse the received buffer */
 		CHECK_FCT_DO( fd_msg_parse_buffer( (void *)&ev_data, ev_sz, &msg), 
 			{
@@ -395,8 +487,16 @@
 				goto psm_loop;
 			} );
 		
+		/* If the current state does not allow receiving messages, just drop it */
+		if (cur_state == STATE_CLOSED) {
+			/* In such case, just discard the message */
+			fd_msg_log( FD_MSG_LOG_DROPPED, msg, "Purged from peer '%s''s queue (CLOSED state).", peer->p_hdr.info.pi_diamid );
+			fd_msg_free(msg);
+			goto psm_loop;
+		}
+		
 		/* Log incoming message */
-		fd_msg_log( FD_MSG_LOG_RECEIVED, msg, "Received %zdb from '%s'", ev_sz, peer->p_hdr.info.pi_diamid );
+		fd_msg_log( FD_MSG_LOG_RECEIVED, msg, "Received %zdb from '%s' (%s)", ev_sz, peer->p_hdr.info.pi_diamid, STATE_STR(cur_state) );
 	
 		/* Extract the header */
 		CHECK_FCT_DO( fd_msg_hdr(msg, &hdr), goto psm_end );
@@ -416,27 +516,34 @@
 			CHECK_FCT_DO( fd_msg_answ_associate( msg, req ), goto psm_end );
 		}
 		
+		if (cur_state == STATE_OPEN_NEW) {
+			/* OK, we have received something, so the connection is supposedly now in OPEN state at the remote site */
+			fd_psm_change_state(peer, STATE_OPEN );
+		}
+		
 		/* Now handle non-link-local messages */
 		if (fd_msg_is_routable(msg)) {
-			switch (peer->p_hdr.info.runtime.pir_state) {
+			switch (cur_state) {
 				/* To maximize compatibility -- should not be a security issue here */
 				case STATE_REOPEN:
 				case STATE_SUSPECT:
 				case STATE_CLOSING:
+				case STATE_CLOSING_GRACE:
 					TRACE_DEBUG(FULL, "Accepted a message while not in OPEN state... ");
 				/* The standard situation : */
+				case STATE_OPEN_NEW:
 				case STATE_OPEN:
 					/* We received a valid routable message, update the expiry timer */
 					CHECK_FCT_DO( fd_p_expi_update(peer), goto psm_end );
 
 					/* Set the message source and add the Route-Record */
-					CHECK_FCT_DO( fd_msg_source_set( msg, peer->p_hdr.info.pi_diamid, 1, fd_g_config->cnf_dict ), goto psm_end);
+					CHECK_FCT_DO( fd_msg_source_set( msg, peer->p_hdr.info.pi_diamid, peer->p_hdr.info.pi_diamidlen, 1, fd_g_config->cnf_dict ), goto psm_end);
 
 					/* Requeue to the global incoming queue */
 					CHECK_FCT_DO(fd_fifo_post(fd_g_incoming, &msg), goto psm_end );
 
 					/* Update the peer timer (only in OPEN state) */
-					if ((peer->p_hdr.info.runtime.pir_state == STATE_OPEN) && (!peer->p_flags.pf_dw_pending)) {
+					if ((cur_state == STATE_OPEN) && (!peer->p_flags.pf_dw_pending)) {
 						fd_psm_next_timeout(peer, 1, peer->p_hdr.info.config.pic_twtimer ?: fd_g_config->cnf_timer_tw);
 					}
 					break;
@@ -448,7 +555,7 @@
 				case STATE_CLOSED:
 				default:
 					/* In such case, just discard the message */
-					fd_msg_log( FD_MSG_LOG_DROPPED, msg, "Received from peer '%s' while connection was not in OPEN state.", peer->p_hdr.info.pi_diamid );
+					fd_msg_log( FD_MSG_LOG_DROPPED, msg, "Received from peer '%s' while connection was not in state %s.", peer->p_hdr.info.pi_diamid, STATE_STR(cur_state) );
 					fd_msg_free(msg);
 			}
 			goto psm_loop;
@@ -484,8 +591,9 @@
 			
 			case CC_DISCONNECT_PEER:
 				CHECK_FCT_DO( fd_p_dp_handle(&msg, (hdr->msg_flags & CMD_FLAG_REQUEST), peer), goto psm_reset );
-				if (peer->p_hdr.info.runtime.pir_state == STATE_CLOSING)
+				if (fd_peer_getstate(peer) == STATE_CLOSING)
 					goto psm_end;
+
 				break;
 			
 			case CC_DEVICE_WATCHDOG:
@@ -493,7 +601,7 @@
 				break;
 			
 			default:
-				/* Unknown / unexpected / invalid message */
+				/* Unknown / unexpected / invalid message -- but validated by our dictionary */
 				TRACE_DEBUG(INFO, "Invalid non-routable command received: %u.", hdr->msg_code);
 				if (hdr->msg_flags & CMD_FLAG_REQUEST) {
 					do {
@@ -501,14 +609,14 @@
 						CHECK_FCT_DO( fd_msg_new_answer_from_req ( fd_g_config->cnf_dict, &msg, MSGFL_ANSW_ERROR ), break );
 
 						/* Set the error code */
-						CHECK_FCT_DO( fd_msg_rescode_set(msg, "DIAMETER_INVALID_HDR_BITS", NULL, NULL, 1 ), break );
+						CHECK_FCT_DO( fd_msg_rescode_set(msg, "DIAMETER_COMMAND_UNSUPPORTED", "Or maybe the P-bit or application Id are erroneous.", NULL, 1 ), break );
 
 						/* Send the answer */
 						CHECK_FCT_DO( fd_out_send(&msg, peer->p_cnxctx, peer, FD_CNX_ORDERED), break );
 					} while (0);
 				} else {
 					/* We did ASK for it ??? */
-					fd_log_debug("Invalid PXY flag in answer header ?\n");
+					TRACE_DEBUG(INFO, "Received answer with erroneous 'is_routable' result...");
 				}
 				
 				/* Cleanup the message if not done */
@@ -530,7 +638,7 @@
 	
 	/* The connection object is broken */
 	if (event == FDEVP_CNX_ERROR) {
-		switch (peer->p_hdr.info.runtime.pir_state) {
+		switch (cur_state) {
 			case STATE_WAITCNXACK_ELEC:
 				/* Abort the initiating side */
 				fd_p_cnx_abort(peer, 0);
@@ -540,6 +648,7 @@
 			
 			case STATE_WAITCEA:
 			case STATE_OPEN:
+			case STATE_OPEN_NEW:
 			case STATE_REOPEN:
 			case STATE_WAITCNXACK:
 			case STATE_SUSPECT:
@@ -557,6 +666,15 @@
 				/* We sent a DPR so we are terminating, do not wait for DPA */
 				goto psm_end;
 				
+			case STATE_CLOSING_GRACE:
+				if (peer->p_flags.pf_localterm) /* initiated here */
+					goto psm_end;
+				
+				fd_psm_cleanup(peer, 0);
+				
+				/* Reset the timer for next connection attempt */
+				fd_psm_next_timeout(peer, 1, fd_p_dp_newdelay(peer));
+				goto psm_loop;
 		}
 		goto psm_loop;
 	}
@@ -615,7 +733,7 @@
 		CHECK_POSIX_DO( pthread_join( peer->p_ini_thr, NULL), /* ignore, it is not a big deal */);
 		peer->p_ini_thr = (pthread_t)NULL;
 		
-		switch (peer->p_hdr.info.runtime.pir_state) {
+		switch (cur_state) {
 			case STATE_WAITCNXACK_ELEC:
 			case STATE_WAITCNXACK:
 				fd_p_ce_handle_newcnx(peer, cnx);
@@ -623,7 +741,7 @@
 				
 			default:
 				/* Just abort the attempt and continue */
-				TRACE_DEBUG(FULL, "Connection attempt successful but current state is %s, closing...", STATE_STR(peer->p_hdr.info.runtime.pir_state));
+				TRACE_DEBUG(FULL, "Connection attempt successful but current state is %s, closing... (too slow?)", STATE_STR(cur_state));
 				fd_cnx_destroy(cnx);
 		}
 		
@@ -637,7 +755,7 @@
 		CHECK_POSIX_DO( pthread_join( peer->p_ini_thr, NULL), /* ignore, it is not a big deal */);
 		peer->p_ini_thr = (pthread_t)NULL;
 		
-		switch (peer->p_hdr.info.runtime.pir_state) {
+		switch (cur_state) {
 			case STATE_WAITCNXACK_ELEC:
 				/* Abort the initiating side */
 				fd_p_cnx_abort(peer, 0);
@@ -652,7 +770,7 @@
 				
 			default:
 				/* Just ignore */
-				TRACE_DEBUG(FULL, "Connection attempt failed but current state is %s, ignoring...", STATE_STR(peer->p_hdr.info.runtime.pir_state));
+				TRACE_DEBUG(FULL, "Connection attempt failed but current state is %s, ignoring...", STATE_STR(cur_state));
 		}
 		
 		goto psm_loop;
@@ -660,9 +778,10 @@
 	
 	/* The timeout for the current state has been reached */
 	if (event == FDEVP_PSM_TIMEOUT) {
-		switch (peer->p_hdr.info.runtime.pir_state) {
+		switch (cur_state) {
 			case STATE_OPEN:
 			case STATE_REOPEN:
+			case STATE_OPEN_NEW:
 				CHECK_FCT_DO( fd_p_dw_timeout(peer), goto psm_end );
 				goto psm_loop;
 				
@@ -675,7 +794,6 @@
 			case STATE_SUSPECT:
 				/* Mark the connection problem */
 				peer->p_flags.pf_cnx_pb = 1;
-				
 			case STATE_CLOSING:
 			case STATE_WAITCNXACK:
 			case STATE_WAITCEA:
@@ -683,6 +801,16 @@
 				fd_psm_next_timeout(peer, 1, peer->p_hdr.info.config.pic_tctimer ?: fd_g_config->cnf_timer_tc);
 				goto psm_reset;
 				
+			case STATE_CLOSING_GRACE:
+				/* The grace period is completed, now close */
+				if (peer->p_flags.pf_localterm)
+					goto psm_end;
+				
+				fd_psm_cleanup(peer, 0);
+				/* Reset the timer for next connection attempt */
+				fd_psm_next_timeout(peer, 1, fd_p_dp_newdelay(peer));
+				goto psm_loop;
+				
 			case STATE_WAITCNXACK_ELEC:
 				/* Abort the initiating side */
 				fd_p_cnx_abort(peer, 0);
@@ -696,7 +824,7 @@
 	}
 	
 	/* Default action : the handling has not yet been implemented. [for debug only] */
-	TRACE_DEBUG(INFO, "Missing handler in PSM for '%s'\t<-- '%s'", STATE_STR(peer->p_hdr.info.runtime.pir_state), fd_pev_str(event));
+	TRACE_DEBUG(INFO, "Missing handler in PSM for '%s'\t<-- '%s'", STATE_STR(cur_state), fd_pev_str(event));
 psm_reset:
 	if (peer->p_flags.pf_delete)
 		goto psm_end;
@@ -706,10 +834,9 @@
 psm_end:
 	fd_psm_cleanup(peer, 1);
 	TRACE_DEBUG(INFO, "'%s'\t-> STATE_ZOMBIE (terminated)\t'%s'",
-			STATE_STR(peer->p_hdr.info.runtime.pir_state),
+			STATE_STR(fd_peer_getstate(peer)),
 			peer->p_hdr.info.pi_diamid);
 	pthread_cleanup_pop(1); /* set STATE_ZOMBIE */
-	fd_cpu_flush_cache();
 	peer->p_psm = (pthread_t)NULL;
 	pthread_detach(pthread_self());
 	return NULL;
@@ -725,7 +852,7 @@
 	TRACE_ENTRY("%p", peer);
 	
 	/* Check the peer and state are OK */
-	CHECK_PARAMS( CHECK_PEER(peer) && (peer->p_hdr.info.runtime.pir_state == STATE_NEW) );
+	CHECK_PARAMS( fd_peer_getstate(peer) == STATE_NEW );
 	
 	/* Create the FIFO for events */
 	CHECK_FCT( fd_fifo_new(&peer->p_events) );
@@ -743,8 +870,7 @@
 	TRACE_ENTRY("%p", peer);
 	CHECK_PARAMS( CHECK_PEER(peer) );
 	
-	fd_cpu_flush_cache();
-	if (peer->p_hdr.info.runtime.pir_state != STATE_ZOMBIE) {
+	if (fd_peer_getstate(peer) != STATE_ZOMBIE) {
 		CHECK_FCT( fd_event_send(peer->p_events, FDEVP_TERMINATE, 0, reason) );
 	} else {
 		TRACE_DEBUG(FULL, "Peer '%s' was already terminated", peer->p_hdr.info.pi_diamid);
"Welcome to our mercurial repository"