diff freeDiameter/p_psm.c @ 34:0e2b57789361

Backup for the WE, some warnings remaining
author Sebastien Decugis <sdecugis@nict.go.jp>
date Fri, 30 Oct 2009 17:23:06 +0900
parents e6fcdf12b9a0
children 6486e97f56ae
line wrap: on
line diff
--- a/freeDiameter/p_psm.c	Thu Oct 29 18:05:45 2009 +0900
+++ b/freeDiameter/p_psm.c	Fri Oct 30 17:23:06 2009 +0900
@@ -83,32 +83,64 @@
 /*                 Manage the list of active peers                      */
 /************************************************************************/
 
-
 /* Enter/leave OPEN state */
 static int enter_open_state(struct fd_peer * peer)
 {
+	struct fd_list * li;
+	CHECK_PARAMS( FD_IS_LIST_EMPTY(&peer->p_actives) );
+	
+	/* Callback registered by the credential validator (fd_peer_validate_register) */
+	if (peer->p_cb2) {
+		CHECK_FCT_DO( (*peer->p_cb2)(&peer->p_hdr.info),
+			{
+				TRACE_DEBUG(FULL, "Validation failed, moving to state CLOSING");
+				peer->p_hdr.info.pi_state = STATE_CLOSING;
+				fd_psm_terminate(peer);
+			} );
+		peer->p_cb2 = NULL;
+		return 0;
+	}
+	/* Insert in the active peers list */
 	CHECK_POSIX( pthread_rwlock_wrlock(&fd_g_activ_peers_rw) );
-	TODO(" insert in fd_g_activ_peers ");
+	for (li = fd_g_activ_peers.next; li != &fd_g_activ_peers; li = li->next) {
+		struct fd_peer * next_p = (struct fd_peer *)li->o;
+		int cmp = strcmp(peer->p_hdr.info.pi_diamid, next_p->p_hdr.info.pi_diamid);
+		if (cmp < 0)
+			break;
+	}
+	fd_list_insert_before(li, &peer->p_actives);
+	CHECK_POSIX( pthread_rwlock_unlock(&fd_g_activ_peers_rw) );
 	
-	CHECK_POSIX( pthread_rwlock_unlock(&fd_g_activ_peers_rw) );
+	/* Callback registered when the peer was added, by fd_peer_add */
+	if (peer->p_cb) {
+		TRACE_DEBUG(FULL, "Calling add callback for peer %s", peer->p_hdr.info.pi_diamid);
+		(*peer->p_cb)(&peer->p_hdr.info, peer->p_cb_data);
+		peer->p_cb = NULL;
+		peer->p_cb_data = NULL;
+	}
 	
 	/* Start the thread to handle outgoing messages */
 	CHECK_FCT( fd_out_start(peer) );
 	
-	return ENOTSUP;
+	return 0;
 }
 static int leave_open_state(struct fd_peer * peer)
 {
-	TODO("Remove from active list");
+	/* Remove from active peers list */
+	CHECK_POSIX( pthread_rwlock_wrlock(&fd_g_activ_peers_rw) );
+	fd_list_unlink( &peer->p_actives );
+	CHECK_POSIX( pthread_rwlock_unlock(&fd_g_activ_peers_rw) );
 	
 	/* Stop the "out" thread */
 	CHECK_FCT( fd_out_stop(peer) );
 	
-	TODO("Failover pending messages: requeue in global structures");
+	/* Failover the messages */
+	fd_peer_failover_msg(peer);
 	
-	return ENOTSUP;
+	return 0;
 }
 
+
 /************************************************************************/
 /*                      Helpers for state changes                       */
 /************************************************************************/
@@ -164,7 +196,7 @@
 	
 	peer->p_psm_timer.tv_sec += delay;
 	
-#if 0
+#ifdef SLOW_PSM
 	/* temporary for debug */
 	peer->p_psm_timer.tv_sec += 10;
 #endif
@@ -187,7 +219,7 @@
 static void * p_psm_th( void * arg )
 {
 	struct fd_peer * peer = (struct fd_peer *)arg;
-	int created_started = started;
+	int created_started = started ? 1 : 0;
 	int event;
 	size_t ev_sz;
 	void * ev_data;
@@ -213,7 +245,7 @@
 	if (peer->p_flags.pf_responder) {
 		psm_next_timeout(peer, 0, INCNX_TIMEOUT);
 	} else {
-		psm_next_timeout(peer, created_started ? 0 : 1, 0);
+		psm_next_timeout(peer, created_started, 0);
 	}
 	
 psm_loop:
@@ -237,15 +269,6 @@
 		goto psm_loop;
 	}
 
-	/* Call the extension callback if needed */
-	if (peer->p_cb) {
-		/* Check if we must call it */
-			/*  */
-		/* OK */
-		TODO("Call CB");
-		TODO("Clear CB");
-	}
-
 	/* Handle the (easy) debug event now */
 	if (event == FDEVP_DUMP_ALL) {
 		fd_peer_dump(peer, ANNOYING);
@@ -276,8 +299,66 @@
 	
 	/* A message was received */
 	if (event == FDEVP_CNX_MSG_RECV) {
-		TODO("Parse the buffer into a message");
-		/* parse_and_get_local_ccode */
+		struct msg * msg = NULL;
+		struct msg_hdr * hdr;
+		
+		/* Parse the received buffer */
+		CHECK_FCT_DO( fd_msg_parse_buffer( (void *)&ev_data, ev_sz, &msg), 
+			{
+				fd_log_debug("Received invalid data from peer '%s', closing the connection\n", peer->p_hdr.info.pi_diamid);
+				CHECK_FCT_DO( fd_event_send(peer->p_events, FDEVP_CNX_ERROR, 0, NULL), goto psm_end );
+				goto psm_loop;
+			} );
+		
+		TRACE_DEBUG(FULL, "Received this message from '%s':", peer->p_hdr.info.pi_diamid);
+		fd_msg_dump_walk(FULL, msg);
+	
+		/* Extract the header */
+		CHECK_FCT_DO( fd_msg_hdr(msg, &hdr), goto psm_end );
+		
+		/* If it is an answer, associate with the request */
+		if (!(hdr->msg_flags & CMD_FLAG_REQUEST)) {
+			struct msg * req;
+			/* Search matching request (same hbhid) */
+			CHECK_FCT_DO( fd_p_sr_fetch(&peer->p_sr, hdr->msg_hbhid, &req), goto psm_end );
+			if (req == NULL) {
+				fd_log_debug("Received a Diameter answer message with no corresponding sent request, discarding...\n");
+				fd_msg_dump_walk(NONE, msg);
+				fd_msg_free(msg);
+				goto psm_loop;
+			}
+			
+			/* Associate */
+			CHECK_FCT_DO( fd_msg_answ_associate( msg, req ), goto psm_end );
+		}
+		
+		/* We received a valid message, update the expiry timer */
+		CHECK_FCT_DO( fd_p_expi_update(peer), goto psm_end );
+
+		/* Now handle non-link-local messages */
+		if (fd_msg_is_routable(msg)) {
+			/* If we are not in OPEN state, discard the message */
+			if (peer->p_hdr.info.pi_state != STATE_OPEN) {
+				fd_log_debug("Received a routable message while not in OPEN state from peer '%s', discarded.\n", peer->p_hdr.info.pi_diamid);
+				fd_msg_dump_walk(NONE, msg);
+				fd_msg_free(msg);
+			} else {
+				/* Set the message source and add the Route-Record */
+				CHECK_FCT_DO( fd_msg_source_set( msg, peer->p_hdr.info.pi_diamid, 1, fd_g_config->cnf_dict ), goto psm_end);
+
+				/* Requeue to the global incoming queue */
+				CHECK_FCT_DO(fd_fifo_post(fd_g_incoming, &msg), goto psm_end );
+				
+				/* Update the peer timer */
+				if (!peer->p_flags.pf_dw_pending) {
+					psm_next_timeout(peer, 1, peer->p_hdr.info.pi_twtimer ?: fd_g_config->cnf_timer_tw);
+				}
+			}
+			goto psm_loop;
+		}
+		
+		/* Link-local message: They must be understood by our dictionary */
+		
 		TODO("Check if it is a local message (CER, DWR, ...)");
 		TODO("If not, check we are in OPEN state");
 		TODO("Update expiry timer if needed");
@@ -318,6 +399,7 @@
 		switch (peer->p_hdr.info.pi_state) {
 			case STATE_CLOSED:
 				TODO("Handle the CER, validate the peer if needed (and set expiry), set the alt_fifo in the connection, reply a CEA, eventually handshake, move to OPEN or REOPEN state");
+				/* In case of error : DIAMETER_UNKNOWN_PEER */
 				break;
 				
 			case STATE_WAITCNXACK:
@@ -352,7 +434,7 @@
 	}
 	
 	goto psm_loop;
-	
+
 psm_end:
 	pthread_cleanup_pop(1); /* set STATE_ZOMBIE */
 	peer->p_psm = (pthread_t)NULL;
@@ -397,12 +479,25 @@
 void fd_psm_abord(struct fd_peer * peer )
 {
 	TRACE_ENTRY("%p", peer);
-	TODO("Cancel PSM thread");
-	TODO("Cancel OUT thread");
-	TODO("Cleanup the peer connection object");
-	TODO("Cleanup the message queues (requeue)");
-	TODO("Call p_cb with NULL parameter if needed");
+	
+	/* Cancel PSM thread */
+	CHECK_FCT_DO( fd_thr_term(&peer->p_psm), /* continue */ );
+	
+	/* Cancel the OUT thread */
+	CHECK_FCT_DO( fd_out_stop(peer), /* continue */ );
 	
+	/* Cleanup the connection */
+	if (peer->p_cnxctx) {
+		fd_cnx_destroy(peer->p_cnxctx);
+	}
+	
+	/* Failover the messages */
+	fd_peer_failover_msg(peer);
+	
+	/* Empty the events list, this might leak some memory, but we only do it on exit, so... */
+	fd_event_destroy(&peer->p_events, free);
+	
+	/* More cleanups are performed in fd_peer_free */
 	return;
 }
 
"Welcome to our mercurial repository"