changeset 88:9e2db1647d6f

Completed routing module
author Sebastien Decugis <sdecugis@nict.go.jp>
date Mon, 07 Dec 2009 15:51:09 +0900
parents c1c0f8a45c67
children 3f8b437bcb66
files freeDiameter/routing.c include/freeDiameter/libfreeDiameter.h
diffstat 2 files changed, 145 insertions(+), 18 deletions(-) [+]
line wrap: on
line diff
--- a/freeDiameter/routing.c	Fri Dec 04 17:23:06 2009 +0900
+++ b/freeDiameter/routing.c	Mon Dec 07 15:51:09 2009 +0900
@@ -265,20 +265,26 @@
 static int return_error(struct msg * msg, char * error_code, char * error_message, struct avp * failedavp)
 {
 	struct fd_peer * peer;
+	int is_loc = 0;
 
 	/* Get the source of the message */
 	{
 		char * id;
 		CHECK_FCT( fd_msg_source_get( msg, &id ) );
 		
-		/* Search the peer with this id */
-		CHECK_FCT( fd_peer_getbyid( id, (void *)&peer ) );
+		if (id == NULL) {
+			is_loc = 1; /* The message was issued locally */
+		} else {
 		
-		if (!peer) {
-			TRACE_DEBUG(INFO, "Unable to send error '%s' to deleted peer '%s' in reply to:", error_code, id);
-			fd_msg_dump_walk(INFO, msg);
-			fd_msg_free(msg);
-			return 0;
+			/* Search the peer with this id */
+			CHECK_FCT( fd_peer_getbyid( id, (void *)&peer ) );
+
+			if (!peer) {
+				TRACE_DEBUG(INFO, "Unable to send error '%s' to deleted peer '%s' in reply to:", error_code, id);
+				fd_msg_dump_walk(INFO, msg);
+				fd_msg_free(msg);
+				return 0;
+			}
 		}
 	}
 	
@@ -289,7 +295,11 @@
 	CHECK_FCT( fd_msg_rescode_set(msg, error_code, error_message, failedavp, 1 ) );
 
 	/* Send the answer */
-	CHECK_FCT( fd_out_send(&msg, NULL, peer) );
+	if (is_loc) {
+		CHECK_FCT( fd_fifo_post(fd_g_incoming, &msg) );
+	} else {
+		CHECK_FCT( fd_out_send(&msg, NULL, peer) );
+	}
 	
 	/* Done */
 	return 0;
@@ -547,6 +557,7 @@
 static void * routing_out_thr(void * arg)
 {
 	TRACE_ENTRY("%p", arg);
+	struct rt_data * rtd = NULL;
 	
 	/* Set the thread name */
 	if (arg) {
@@ -562,6 +573,13 @@
 		struct msg * msg;
 		struct msg_hdr * hdr;
 		int is_req = 0;
+		struct fd_list * li, *candidates;
+		struct avp * avp;
+		struct rtd_candidate * c;
+		
+		/* If we loop'd with some undeleted routing data, destroy it */
+		if (rtd != NULL)
+			fd_rtd_free(&rtd);
 		
 		/* Test if we were told to stop */
 		pthread_testcancel();
@@ -582,6 +600,7 @@
 		if ( ! is_req ) {
 			struct msg * qry;
 			char * qry_src = NULL;
+			struct msg_hdr * qry_hdr;
 			struct fd_peer * peer = NULL;
 			
 			/* Retrieve the corresponding query and its origin */
@@ -599,6 +618,10 @@
 				continue;
 			}
 			
+			/* We must restore the hop-by-hop id */
+			CHECK_FCT_DO( fd_msg_hdr(qry, &qry_hdr), goto fatal_error );
+			hdr->msg_hbhid = qry_hdr->msg_hbhid;
+			
 			/* Push the message into this peer */
 			CHECK_FCT_DO( fd_out_send(&msg, NULL, peer), goto fatal_error );
 			
@@ -606,10 +629,111 @@
 			continue;
 		}
 		
-		/* The message is a request */
-		TODO("use struct rt_data and fd_msg_rt_get");
+		/* From that point, the message is a request */
+		
+		/* Get the routing data out of the message if any (in case of re-transmit) */
+		CHECK_FCT_DO( fd_msg_rt_get ( msg, &rtd ), goto fatal_error );
+		
+		/* If there is no routing data already, let's create it */
+		if (rtd == NULL) {
+			CHECK_FCT_DO( fd_rtd_init(&rtd), goto fatal_error );
+			
+			/* Add all peers in OPEN state */
+			CHECK_FCT_DO( pthread_rwlock_wrlock(&fd_g_activ_peers_rw), goto fatal_error );
+			for (li = fd_g_activ_peers.next; li != &fd_g_activ_peers; li = li->next) {
+				struct fd_peer * p = (struct fd_peer *)li->o;
+				CHECK_FCT_DO( fd_rtd_candidate_add(rtd, p->p_hdr.info.pi_diamid), goto fatal_error);
+			}
+			CHECK_FCT_DO( pthread_rwlock_unlock(&fd_g_activ_peers_rw), goto fatal_error );
+			
+			/* Now let's remove all peers from the Route-Records */
+			CHECK_FCT_DO(  fd_msg_browse(msg, MSG_BRW_FIRST_CHILD, &avp, NULL), goto fatal_error  );
+			while (avp) {
+				struct avp_hdr * ahdr;
+				CHECK_FCT_DO(  fd_msg_avp_hdr( avp, &ahdr ), goto fatal_error  );
+				
+				if ((ahdr->avp_code == AC_ROUTE_RECORD) && (! (ahdr->avp_flags & AVP_FLAG_VENDOR)) ) {
+					/* Parse this AVP */
+					CHECK_FCT_DO( fd_msg_parse_dict ( avp, fd_g_config->cnf_dict ), goto fatal_error );
+					ASSERT( ahdr->avp_value );
+					/* Remove this value from the list */
+					fd_rtd_candidate_del(rtd, (char *)ahdr->avp_value->os.data, ahdr->avp_value->os.len);
+				}
+				
+				/* Go to next AVP */
+				CHECK_FCT_DO(  fd_msg_browse(avp, MSG_BRW_NEXT, &avp, NULL), goto fatal_error  );
+			}
+		}
+		
+		/* Note: we reset the scores and pass the message to the callbacks, maybe we could re-use the saved scores when we have received an error ? */
+		
+		/* Ok, we have our list in rtd now, let's (re)initialize the scores */
+		fd_rtd_candidate_extract(rtd, &candidates);
+		
+		/* Pass the list to registered callbacks (even if it is empty) */
+		{
+			CHECK_FCT_DO( pthread_rwlock_rdlock( &rt_out_lock ), goto fatal_error );
+			pthread_cleanup_push( fd_cleanup_rwlock, &rt_out_lock );
+			
+			/* We call the cb by reverse priority order */
+			for (	li = rt_out_list.prev ; li != &rt_out_list ; li = li->prev ) {
+				struct rt_hdl * rh = (struct rt_hdl *)li;
+				
+				TRACE_DEBUG(ANNOYING, "Calling next OUT callback on %p : %p (prio %d)", msg, rh->rt_out_cb, rh->prio);
+				CHECK_FCT_DO( (*rh->rt_out_cb)(rh->cbdata, msg, candidates),
+					{
+						TRACE_DEBUG(INFO, "An OUT routing callback returned an error ! Message discarded.");
+						fd_msg_dump_walk(INFO, msg);
+						fd_msg_free(msg);
+						msg = NULL;
+						break;
+					} );
+			}
+			
+			pthread_cleanup_pop(0);
+			CHECK_FCT_DO( pthread_rwlock_unlock( &rt_out_lock ), goto fatal_error );
+			
+			/* If an error occurred, skip to the next message */
+			if (!msg)
+				continue;
+		}
+		
+		/* Order the candidate peers by score attributed by the callbacks */
+		CHECK_FCT_DO( fd_rtd_candidate_reorder(candidates), goto fatal_error );
+		
+		/* Save the routing information in the message */
+		CHECK_FCT_DO( fd_msg_rt_associate ( msg, &rtd ), goto fatal_error );
+		
+		/* Now try sending the message */
+		for (li = candidates->prev; li != candidates; li = li->prev) {
+			struct fd_peer * peer;
+			
+			c = (struct rtd_candidate *) li;
+			
+			/* Stop when we have reached the end of valid candidates */
+			if (c->score < 0)
+				break;
+			
+			/* Search for the peer */
+			CHECK_FCT_DO( fd_peer_getbyid( c->diamid, (void *)&peer ), goto fatal_error );
+			
+			if (peer && (peer->p_hdr.info.runtime.pir_state == STATE_OPEN)) {
+				/* Send to this one */
+				CHECK_FCT_DO( fd_out_send(&msg, NULL, peer), continue );
+				/* If the sending was successful */
+				break;
+			}
+		}
+		
+		/* If the message has not been sent, return an error */
+		if (msg) {
+			TRACE_DEBUG(INFO, "Could not send the following message, replying with UNABLE_TO_DELIVER");
+			fd_msg_dump_walk(INFO, msg);
+			return_error( msg, "DIAMETER_UNABLE_TO_DELIVER", "No suitable candidate to route the message to", NULL);
+		}
 		
 		/* We're done with this message */
+		
 	} while (1);
 	
 fatal_error:
@@ -618,22 +742,25 @@
 	return NULL;
 }
 
-
+static pthread_t rt_out = (pthread_t)NULL;
+static pthread_t rt_in  = (pthread_t)NULL;
 
 /* Initialize the routing module */
 int fd_rt_init(void)
 {
-	TODO("Start the routing threads");
+	CHECK_POSIX( pthread_create( &rt_out, NULL, routing_out_thr, NULL) );
+	CHECK_POSIX( pthread_create( &rt_in,  NULL, routing_in_thr,  NULL) );
 	
 	/* Later: TODO("Set the thresholds for the IN and OUT queues to create more routing threads as needed"); */
-	return ENOTSUP;
+	return 0;
 }
 
 /* Terminate the routing module */
 int fd_rt_fini(void)
 {
-	TODO("Stop the routing threads");
-	return ENOTSUP;
+	CHECK_FCT_DO( fd_thr_term(&rt_in ), /* continue */);
+	CHECK_FCT_DO( fd_thr_term(&rt_out), /* continue */);
+	return 0;
 }
 
 
--- a/include/freeDiameter/libfreeDiameter.h	Fri Dec 04 17:23:06 2009 +0900
+++ b/include/freeDiameter/libfreeDiameter.h	Mon Dec 07 15:51:09 2009 +0900
@@ -1631,12 +1631,12 @@
 /* Remove a peer from the candidates (if it is found) */
 void fd_rtd_candidate_del(struct rt_data * rtd, char * peerid, size_t sz /* if !0, peerid does not need to be \0 terminated */);
 
+/* Extract the list of valid candidates, and initialize their scores to 0 */
+void fd_rtd_candidate_extract(struct rt_data * rtd, struct fd_list ** candidates);
+
 /* If a peer returned a protocol error for this message, save it so that we don't try to send it there again */
 int  fd_rtd_error_add(struct rt_data * rtd, char * sentto, uint8_t * origin, size_t originsz, uint32_t rcode);
 
-/* Extract the list of valid candidates, and initialize their scores to 0 */
-void fd_rtd_candidate_extract(struct rt_data * rtd, struct fd_list ** candidates);
-
 /* The extracted list items have the following structure: */
 struct rtd_candidate {
 	struct fd_list	chain;	/* link in the list returned by the previous fct */
"Welcome to our mercurial repository"