changeset 246:13647ca6e0ad

Progress on the routing module
author Sebastien Decugis <sdecugis@nict.go.jp>
date Tue, 02 Dec 2008 16:20:09 +0900
parents c141c6a50f3d
children adbc3782ba69
files waaad/dict-base.c waaad/dict-hardcoded.h waaad/message.c waaad/message.h waaad/peer-internal.h waaad/peer-psm.c waaad/peer-struct.c waaad/peer.h waaad/routing.c
diffstat 9 files changed, 486 insertions(+), 30 deletions(-) [+]
line wrap: on
line diff
--- a/waaad/dict-base.c	Fri Nov 28 18:22:14 2008 +0900
+++ b/waaad/dict-base.c	Tue Dec 02 16:20:09 2008 +0900
@@ -1821,6 +1821,9 @@
 			*/
 			dict_avp_data_t data = { 
 					263, 					/* Code */
+					#if AC_SESSION_ID != 263
+					#error "AC_SESSION_ID definition mismatch"
+					#endif
 					0, 					/* Vendor */
 					"Session-Id", 				/* Name */
 					AVP_FLAG_VENDOR | AVP_FLAG_MANDATORY, 	/* Fixed flags */
--- a/waaad/dict-hardcoded.h	Fri Nov 28 18:22:14 2008 +0900
+++ b/waaad/dict-hardcoded.h	Tue Dec 02 16:20:09 2008 +0900
@@ -59,6 +59,7 @@
 #define AC_VENDOR_SPECIFIC_APPLICATION_ID 260
 #define AC_REDIRECT_HOST_USAGE		261
 #define AC_REDIRECT_MAX_CACHE_TIME	262
+#define AC_SESSION_ID 			263
 #define AC_ORIGIN_HOST			264
 #define AC_SUPPORTED_VENDOR_ID		265
 #define AC_VENDOR_ID			266
--- a/waaad/message.c	Fri Nov 28 18:22:14 2008 +0900
+++ b/waaad/message.c	Tue Dec 02 16:20:09 2008 +0900
@@ -1293,6 +1293,14 @@
 	return _mpd_do(_C(msg), 0);
 }
 
+/* Idem, for one AVP. */
+int msg_parse_dict_avp ( msg_avp_t * avp )
+{
+	TRACE_ENTRY("%p", avp);
+	
+	return _mpd_do(_C(avp), 1);
+}
+
 
 /***************************************************************************************************************/
 
@@ -2059,6 +2067,60 @@
 	return 0;
 }
 
+/* Create an anwer to a query */
+int msg_new_answer_from_req ( msg_t ** msg )
+{
+	_msg_t *qry, *answ = NULL;
+	
+	TRACE_ENTRY("%p", msg);
+	
+	CHECK_PARAMS(  msg && CHECK_MSG(*msg)  );
+	
+	qry = (_msg_t *)(*msg);
+	CHECK_PARAMS(  qry->msg_public.msg_flags & CMD_FLAG_REQUEST  );
+	
+	/* Create the new message */
+	CHECK_MALLOC(  answ = (_msg_t *) malloc (sizeof(_msg_t))  );
+	
+	/* Initialize the fields */
+	init_msg(answ);
+	answ->msg_public.msg_version	= MSG_VERSION;
+	answ->msg_public.msg_flags	= qry->msg_public.msg_flags & ~CMD_FLAG_REQUEST;
+	answ->msg_public.msg_code	= qry->msg_public.msg_code;
+	answ->msg_public.msg_appl	= qry->msg_public.msg_appl;
+	answ->msg_public.msg_hbhid	= qry->msg_public.msg_hbhid;
+	answ->msg_public.msg_eteid	= qry->msg_public.msg_eteid;
+	
+	/* Associate the answer and the query */
+	CHECK_FCT(  msg_answ_associate( (msg_t *)answ, (msg_t *)qry )  );
+	*msg = (msg_t *)answ;
+	
+	/* If the first AVP of the query is a Session-Id, copy it to the answer */
+	{
+		msg_avp_t * avp = NULL;
+		msg_avp_t * navp = NULL;
+		msg_avp_data_t * avpdata = NULL;
+		
+		CHECK_FCT(  msg_browse(qry, MSG_BRW_FIRST_CHILD, &avp, NULL)  );
+		CHECK_FCT(  msg_avp_data( avp, &avpdata )  );
+
+		if (((avpdata->avp_flags & AVP_FLAG_VENDOR) == 0) && (avpdata->avp_code == AC_SESSION_ID)) {
+			CHECK_FCT( msg_parse_dict_avp(avp) );
+			ASSERT(avpdata->avp_data);
+			
+			CHECK_FCT( msg_avp_new( ((_msg_avp_t *)avp)->avp_model, 0, &navp ) );
+			
+			/* Set its value */
+			CHECK_FCT( msg_avp_setvalue( navp, avpdata->avp_data ) );
+
+			/* Add it to the message */
+			CHECK_FCT( msg_avp_add( *msg, MSG_BRW_FIRST_CHILD, navp ) );
+		}
+	}
+
+	return 0;
+}
+
 /* Add Result-Code and eventually Failed-AVP, Error-Message and Error-Reporting-Host AVPs */
 int msg_rescode_set( msg_t * msg, char * rescode, char * errormsg, msg_avp_t * optavp )
 {
--- a/waaad/message.h	Fri Nov 28 18:22:14 2008 +0900
+++ b/waaad/message.h	Tue Dec 02 16:20:09 2008 +0900
@@ -160,6 +160,23 @@
 int msg_parse_dict ( msg_t * msg );
 
 /*
+ * FUNCTION:	msg_parse_dict_avp
+ *
+ * PARAMETERS:
+ *  msg		: An AVP from the msg_t tree.
+ *
+ * DESCRIPTION: 
+ *   This function is similar to msg_parse_dict, but only for one AVP of a message.
+ *
+ * RETURN VALUE:
+ *  0      	: The message has been fully parsed as described.
+ *  EINVAL 	: the msg parameter is invalid for this operation.
+ *  ENOMEM	: Unable to allocate enough memory to complete the operation.
+ *  ENOTSUP	: No dictionary definition for the command or one of the mandatory AVP.
+ */
+int msg_parse_dict_avp ( msg_avp_t * avp );
+
+/*
  * FUNCTION:	msg_is_routable
  *
  * PARAMETERS:
@@ -176,6 +193,25 @@
 int msg_is_routable ( msg_t * msg );
 
 /*
+ * FUNCTION:	msg_new_answer_from_req
+ *
+ * PARAMETERS:
+ *  msg		: The location of the query on entry, and of answer on return.
+ *
+ * DESCRIPTION: 
+ *   This function creates the empty answer message for a request.
+ *  The header is set properly (R flag, ccode, appid, hbhid, eteid)
+ *  The Session-Id AVP is copied if present.
+ *  The calling code should usually call msg_rescode_set function on the answer.
+ *  Upon return, the original query may be retrieved by calling msg_answ_getq on the message.
+ *
+ * RETURN VALUE:
+ *  0      	: Operation complete.
+ *  !0      	: an error occurred.
+ */
+int msg_new_answer_from_req ( msg_t ** msg );
+
+/*
  * FUNCTION:	msg_rescode_set
  *
  * PARAMETERS:
@@ -253,4 +289,5 @@
 int msg_add_origin ( msg_t * msg ); /* Add Origin-Host, Origin-Realm, Origin-State-Id AVPS at the end of the message */
 
 
+
 #endif /* _MESSAGE_H */
--- a/waaad/peer-internal.h	Fri Nov 28 18:22:14 2008 +0900
+++ b/waaad/peer-internal.h	Tue Dec 02 16:20:09 2008 +0900
@@ -282,7 +282,6 @@
 /* The peers hash table. ALL peers are linked in the "all" hash list, by their p_global list */
 typedef struct {
 	uti_list_t	all;	/* Sentinel for the p_global list of peers which hash belongs to this sublist. */
-	uti_list_t	actives;/* idem for the p_active sublist */
 	pthread_mutex_t	lock;	/* to protect these lists */
 	int		count;	/* number of peers in this sublist */
 } _peer_hash_t;
@@ -291,11 +290,11 @@
 
 #define H_MASK( __hash) ((__hash) & (( 1 << _PEER_HASH_SIZE ) - 1))
 #define H_ALL(  _hash ) (&(_peer_hash[H_MASK(_hash)].all    ))
-#define H_ACT(  _hash ) (&(_peer_hash[H_MASK(_hash)].actives))
 #define H_LOCK( _hash ) (&(_peer_hash[H_MASK(_hash)].lock   ))
 #define H_COUNT(_hash ) (  _peer_hash[H_MASK(_hash)].count   )
 
-/* the others lists of peers, for expiration mechanism */
+/* the others lists of peers */
+extern uti_list_t	_peers_actives;	  /* peers in the OPEN state, linked by their p_active fields. */
 extern uti_list_t 	_peers_deleted;   /* deleted peers, threads are linked by their p_exp_list list. */
 extern uti_list_t 	_peers_expiring;  /* peers that will expire, linked by their p_exp_list list. */
 extern pthread_mutex_t  _peers_lists_mtx; /* mutex to protect these lists */
--- a/waaad/peer-psm.c	Fri Nov 28 18:22:14 2008 +0900
+++ b/waaad/peer-psm.c	Tue Dec 02 16:20:09 2008 +0900
@@ -1226,7 +1226,12 @@
 	
 	/* Handle the "active" peers list */
 	if (peer->p_state != STATE_OPEN) {
-		uti_list_unlink( &peer->p_active );
+		if (!IS_LIST_EMPTY(&peer->p_active)) {
+			CHECK_POSIX_DO(  pthread_mutex_lock( &_peers_lists_mtx ),  uti_event_send(WDE_ERROR)  );
+			TRACE_DEBUG(FULL, "Peer '%s' removed from the list of active peers", peer->p_diamid);
+			uti_list_unlink( &peer->p_active );
+			CHECK_POSIX_DO(  pthread_mutex_unlock( &_peers_lists_mtx ),  uti_event_send(WDE_ERROR)  );
+		}
 		return;
 	}
 	
@@ -1234,7 +1239,9 @@
 		return; /* we are already linked */
 	
 	/* Ok, we have to insert this peer into the active list */
-	for (prev = H_ACT(peer->p_hash); prev->next != H_ACT(peer->p_hash); prev = prev->next) {
+	CHECK_POSIX_DO(  pthread_mutex_lock( &_peers_lists_mtx ),  uti_event_send(WDE_ERROR)  );
+	
+	for (prev = &_peers_actives; prev->next != &_peers_actives; prev = prev->next) {
 		int cmp = 0;
 		_peer_t * cur = _P(prev->next->o);
 		
@@ -1258,6 +1265,8 @@
 	
 	uti_list_insert_after(prev, &peer->p_active);
 	
+	CHECK_POSIX_DO(  pthread_mutex_unlock( &_peers_lists_mtx ),  uti_event_send(WDE_ERROR)  );
+	
 	TRACE_DEBUG(FULL, "Peer '%s' inserted in the list of active peers", peer->p_diamid);
 	
 	return;
--- a/waaad/peer-struct.c	Fri Nov 28 18:22:14 2008 +0900
+++ b/waaad/peer-struct.c	Tue Dec 02 16:20:09 2008 +0900
@@ -45,17 +45,22 @@
 /* The hash table */
 _peer_hash_t 	_peer_hash[1 << _PEER_HASH_SIZE];
 
+/* The list of active peers */
+uti_list_t 	_peers_actives;
+
+
 /* Initialize the hash table */
 int _peer_struct_init()
 {
 	int i;
 	
+	uti_list_init(&_peers_actives, NULL);
+	
 	/* Initialize the hash table */
 	memset(_peer_hash, 0, sizeof(_peer_hash));
 	
 	for (i = 0; i < sizeof(_peer_hash) / sizeof(_peer_hash[0]); i++) {
 		uti_list_init(&_peer_hash[i].all, 	&_peer_hash[i]);
-		uti_list_init(&_peer_hash[i].actives, 	&_peer_hash[i]);
 		CHECK_POSIX(  pthread_mutex_init(&_peer_hash[i].lock, NULL)  );
 	}
 	
@@ -423,7 +428,7 @@
 }
 
 /* Find a structure from diamid and hash */
-peer_t * peer_getptr(char * diamid, uint32_t hash)
+peer_t * peer_struct_getptr(char * diamid, uint32_t hash)
 {
 	uti_list_t *li;
 	peer_t * res = NULL;
@@ -464,8 +469,103 @@
 	return NULL;
 }
 
-/* Create the list of OPEN peers */		
-int peer_list_open( rt_dpl_t ** list )
+/* Create the list of OPEN peers supporting this app or relay, without the rejected peers */		
+int peer_struct_list_open( application_id_t app, rt_dpl_t ** list, uti_list_t * rejects )
 {
-	return ENOTSUP;
+	uti_list_t * src = NULL;
+	
+	TRACE_ENTRY("%u %p %p", app, list, rejects);
+	
+	CHECK_PARAMS( list && rejects );
+	
+	*list = NULL;
+	
+	/* Lock the list of active peers. This ensures that the peers will not be modified while accessing their data */
+	CHECK_POSIX(  pthread_mutex_lock( &_peers_lists_mtx )  );
+	
+	/* We go through all the active peers */
+	for (src = _peers_actives.next; src != &_peers_actives; src = src->next) {
+		_peer_t * src_peer = _P(src->o);
+		rt_dpl_t * dst = NULL;
+		int match = 0;
+		
+		/* First check if this peer may be candidate */
+		if ((app != 0) && (src_peer->p_app_relay == 0)) {
+			int i;
+			/* check if the peer supports the required application */
+			if (src_peer->p_app_size == 0)
+				continue;
+			ASSERT(src_peer->p_app_list != NULL);
+			
+			/* The array is ordered (see _peer_struct_appl_add) */
+			for (i = 0; i < src_peer->p_app_size; i++)
+				if (src_peer->p_app_list[i].a >= app)
+					break;
+			/* Skip this peer if the corresponding application was NOT found */
+			if (src_peer->p_app_list[i].a != app)
+				continue;
+		}
+		
+		/* Now browse the (ordered) rejects list to check if the current peer is not in it */
+		while (!IS_LIST_EMPTY(rejects)) {
+			/* For the format of the rejects list, see create_list_rejects in routing.c */
+			rt_reject_t * next = (rt_reject_t *)(rejects->next);
+			
+			if (next->hash > src_peer->p_hash)
+				break;
+			
+			if (next->hash == src_peer->p_hash) {
+				int cmp = 0;
+				avp_value_t *next_av = (avp_value_t *)(next->chain.o);
+				
+				cmp = strncasecmp((char *)next_av->os.data, src_peer->p_diamid, next_av->os.len);
+				
+				if (cmp > 0)
+					break;
+				
+				if (cmp == 0) {
+					size_t l = strlen(src_peer->p_diamid);
+					
+					if (next_av->os.len > l)
+						break;
+					
+					if (next_av->os.len == l)
+						match = 1;
+				}
+			}
+			
+			/* At this point, the next element in rejects is either equal (match = 1) or inferior (match = 0) to src_peer, we can delete it. */
+			uti_list_unlink(&next->chain);
+			free(next);
+		}
+		
+		/* Skip the peer if it was found in the rejects list */
+		if (match)
+			continue;
+	
+		/* Ok, we have to copy this peer in the destination list */
+		CHECK_MALLOC_DO( dst = (rt_dpl_t *) malloc(sizeof(rt_dpl_t)),
+			{ 
+				CHECK_POSIX_DO(  pthread_mutex_unlock( &_peers_lists_mtx ), /* continue */  );
+				return ENOMEM;
+			}  );
+		
+		memset(dst, 0, sizeof(rt_dpl_t));
+		
+		dst->peer = (peer_t *)src_peer;
+		dst->next = *list;
+		*list = dst;
+	}
+	
+	/* Unlock the list of active peers, we're done */
+	CHECK_POSIX(  pthread_mutex_unlock( &_peers_lists_mtx )  );
+	
+	/* We can free any remaining entry in reject list */
+	while (!IS_LIST_EMPTY(rejects)) {
+		uti_list_t * next = rejects->next;
+		uti_list_unlink(next);
+		free(next);
+	}
+	
+	return 0;
 }
--- a/waaad/peer.h	Fri Nov 28 18:22:14 2008 +0900
+++ b/waaad/peer.h	Tue Dec 02 16:20:09 2008 +0900
@@ -50,6 +50,9 @@
 /* Include the definition of rt_dpl_t */
 #include <waaad/routing-api.h>
 
+/* Include the definition of uti_list_t */
+#include "utils.h"
+
 /* Functions called only in the daemon */
 
 /*
@@ -136,23 +139,33 @@
  * RETURN VALUE:
  *  Pointer to the peer structure or NULL if not found.
  */
-peer_t * peer_getptr(char * diamid, uint32_t hash);
+peer_t * peer_struct_getptr(char * diamid, uint32_t hash);
 
+/* List of peers that must NOT be in the rt_dpl_t list returned by next function */
+/* Note that the list is ordered by hash and AVP value */
+typedef struct {
+	uti_list_t	chain;	/* linking information. The "o" field points to the AVP data location. */
+	uint32_t	hash;	/* The hash value for this AVP data. */
+} rt_reject_t;
 
 /*
  * FUNCTION:	peer_list_open
  *
  * PARAMETERS:
- *  list  : location where the list must be stored.
+ *  app    : the application that must be supported (or 0 for all peers)
+ *  list   : location where the list must be stored.
+ *  rejects: list of diameter ids of the peers that must not be returned.
  *
  * DESCRIPTION:
- *  Create a master list for routing module with all the peers in OPEN state, at the time of calling.
+ *  Create a master list for routing module with all the peers in OPEN state (except rejected) 
+ *  that support the application app (or relay), at the time of calling.
+ *  Note that this function also frees the content of the rejects list as a side effect.
  *
  * RETURN VALUE:
  *   0 : List is created successfully.
  *  !0 : an error occurred.
  */
-int peer_list_open( rt_dpl_t ** list );
+int peer_struct_list_open( application_id_t app, rt_dpl_t ** list, uti_list_t * rejects );
 
 
 #endif /* ! _PEER_H */
--- a/waaad/routing.c	Fri Nov 28 18:22:14 2008 +0900
+++ b/waaad/routing.c	Tue Dec 02 16:20:09 2008 +0900
@@ -69,6 +69,8 @@
 	uti_list_init( &fwd_all, NULL );
 }
 
+/************************************************************************************************/
+
 /* Is a request handled locally? */
 static int handle_locally( msg_t * msg, int * res )
 {
@@ -99,6 +101,137 @@
 	return ENOTSUP;
 }
 
+/* Add an entry to the list of rejected peers */
+static int reject_add(uti_list_t *rejects, msg_avp_data_t * avpdata)
+{
+	uti_list_t *li;
+	rt_reject_t * new = NULL;
+	uint32_t hash;
+	
+	TRACE_ENTRY("%p %p", rejects, avpdata);
+	
+	if (avpdata->avp_data == NULL) {
+		/* Ignore if the data is not set */
+		TRACE_DEBUG(FULL, "AVP with unset value => ??");
+		ASSERT(0); /* To check if this really happens, and understand why... */
+		return EINVAL;
+	}
+	
+	/* Now compute the hash */
+	hash = uti_hash ( (char *)avpdata->avp_data->os.data, avpdata->avp_data->os.len );
+	
+	/* Find the location to insert in the list */
+	for (li = rejects->next; li != rejects; li = li->next) {
+		rt_reject_t * nxt = (rt_reject_t *)li;
+		size_t l_nxt = ((avp_value_t *)(li->o))->os.len;
+		size_t l_new = avpdata->avp_data->os.len;
+		int cmp = 0;
+		
+		if (nxt->hash < hash)
+			continue;
+		
+		if (nxt->hash > hash)
+			break;
+		
+		/* Same hash, compare the strings */
+		cmp = strncasecmp((char *)((avp_value_t *)(li->o))->os.data, (char *)avpdata->avp_data->os.data, l_nxt < l_new ? l_nxt : l_new );
+		
+		if (cmp < 0)
+			continue;
+		
+		if (cmp > 0)
+			break;
+		
+		/* Same radical, compare the lengths */
+		if (l_nxt < l_new)
+			continue;
+		
+		if (l_nxt > l_new)
+			break;
+		
+		/* Ok, we already have the same element, we can stop... */
+		return 0;
+	}
+	
+	/* We can create the new element and insert it before the "li" element */
+	CHECK_MALLOC( new = (rt_reject_t *) malloc( sizeof(rt_reject_t) ) );
+	uti_list_init(&new->chain, &avpdata->avp_data);
+	new->hash = hash;
+	uti_list_insert_before(li, &new->chain);
+	
+	return 0;
+}
+
+/* Create the list of rt_reject_t corresponding to the message. This contains Origin-host + all Route-Records diameter ids. */
+static int create_list_rejects(msg_t * msg, uti_list_t *rejects)
+{
+	msg_avp_t * avp = NULL;
+	
+	TRACE_ENTRY("%p %p", msg, rejects);
+	
+	CHECK_FCT(  msg_browse(msg, MSG_BRW_FIRST_CHILD, &avp, NULL)  );
+	
+	/* Now loop on all AVPs -- we will break when last AVP was parsed */
+	while (avp) {
+		msg_avp_data_t * avpdata;
+
+		CHECK_FCT(  msg_avp_data( avp, &avpdata )  );
+
+		if (avpdata->avp_flags & AVP_FLAG_VENDOR) {
+			goto next;
+		}
+
+		switch (avpdata->avp_code) {
+			case AC_ORIGIN_HOST: /* Origin-Host */
+			case AC_ROUTE_RECORD: /* Route-Record */
+				/* Dictionary-resolve this object, it is not done already for new messages */
+				CHECK_FCT( msg_parse_dict_avp(avp) );
+				/* And now add the diameter id to the list of rejected peers for forwarding */
+				CHECK_FCT( reject_add(rejects, avpdata) );
+				break;
+
+			default: /* Other AVP */
+				/* just skip */
+				;
+		}
+
+next:			
+		/* Go to next AVP */
+		CHECK_FCT( msg_browse(avp, MSG_BRW_NEXT, &avp, NULL) );
+	}
+	
+	/* We're done */
+	return 0;
+}
+
+/* Order the list of peers after outrt callbacks were called */
+static int order_rt_list(rt_dpl_t **list)
+{
+	rt_dpl_t sorted = { .next = NULL }; /* list of sorted elements */
+	
+	TRACE_ENTRY("%p", list);
+	CHECK_PARAMS( list );
+	
+	while (*list != NULL) {
+		rt_dpl_t * prev = &sorted;
+		
+		/* Get the first element from list */
+		rt_dpl_t * next = *list;
+		*list = next->next;
+		
+		/* Now insert this element in out list */
+		while ((prev->next != NULL) && (prev->next->score > next->score))
+			prev = prev->next;
+		
+		next->next = prev->next;
+		prev->next = next;
+	}
+	
+	*list = sorted.next;
+	
+	return 0;
+}
+
 /* Pass a message to all registered callbacks. The read lock must be taken already. */
 static int process_fwcb_list(uti_list_t * list, msg_t ** msg)
 {
@@ -108,7 +241,7 @@
 		int ret = 0;
 		_rt_hdl_t * hdl = (_rt_hdl_t *)li;
 		
-		TRACE_DEBUG(ANNOYING, "Calling next routing callback: %p for message %p", hdl->cb, *msg);
+		TRACE_DEBUG(ANNOYING, "Calling next FW routing callback: %p for message %p", hdl->cb, *msg);
 		
 		ret = (*(rt_fwd_cb_t)hdl->cb)(hdl->chain.o, *msg);
 		if (ret != 0) {
@@ -120,8 +253,22 @@
 	
 	return 0;
 }
+
+/* Pass a message to all registered callbacks. The read lock must be taken already. */
+static int process_outcb_list(uti_list_t * cblist, msg_t * msg, rt_dpl_t * list )
+{
+	uti_list_t * li;
 	
+	for (li = cblist->next; li != cblist; li = li->next) {
+		_rt_hdl_t * hdl = (_rt_hdl_t *)li;
+		TRACE_DEBUG(ANNOYING, "Calling next OUT routing callback: %p for message %p", hdl->cb, msg);
+		CHECK_FCT( (*(rt_out_cb_t)hdl->cb)(hdl->chain.o, msg, list)  );
+	}
 	
+	return 0;
+}
+
+/************************************************************************************************/
 
 /* Thread for incoming messages */
 static void * rt_in_th(void * arg)
@@ -216,6 +363,7 @@
 	/* Loop */
 	do {
 		rt_dpl_t * list = NULL;
+		uti_list_t rejects;
 		
 		pthread_testcancel();
 		
@@ -249,7 +397,7 @@
 			CHECK_FCT_DO(  msg_source_get( qry, &qry_src, &qry_src_h ), goto error  );
 			ASSERT(qry_src != NULL);
 			
-			peer = peer_getptr(qry_src, qry_src_h);
+			peer = peer_struct_getptr(qry_src, qry_src_h);
 			if (peer == NULL) {
 				/* Ok the peer does not exist anymore, just discard the message */
 				log_error("Unable to forward answer to peer '%d', the object does not exist anymore. Message dropped.\n", qry_src);
@@ -275,27 +423,111 @@
 		}
 		
 		/* Ok, we are forwarding a query if we are here */
+		uti_list_init(&rejects, NULL);
 		
-		/* Create a list with all the "open" peers */
-		CHECK_FCT_DO( peer_list_open( &list ), goto error  );
+		/* Create a list of the peers that are not candidates since they are already in the R-R */
+		CHECK_FCT_DO( create_list_rejects(msg, &rejects), goto error  );
+		
+		/* Create a list with all the "open" peers, without the peers from rejects */
+		CHECK_FCT_DO( peer_struct_list_open( hdr->msg_appl, &list, &rejects ), goto error  );
 		
-		/* -- en fait ca serait beaucoup plus efficace de creer la liste sans les R-R directement... -- */
-	
-		/* Remove the peers that are in the Route-Record AVPs of the message */
-	
-		/* 0 peer remaining? => UNABLE_TO_DELIVER */
+		if (list == NULL) {
+			/* 0 peer remaining? => UNABLE_TO_DELIVER */
+			log_normal("Unable to forward a request (application %u), sending an error.\n", hdr->msg_appl);
+			msg_dump_walk(msg);
+			
+			/* Create the answer message */
+			CHECK_FCT_DO( msg_new_answer_from_req(&msg), goto error  );
+			
+			/* Set the result code */
+			CHECK_FCT_DO( msg_rescode_set(msg, "DIAMETER_UNABLE_TO_DELIVER", NULL, NULL), goto error  );
+			
+			/* Requeue in the incoming queue */
+			CHECK_FCT_DO(  meq_post( g_meq_incoming, msg ),
+				{
+					CHECK_FCT_DO(  msg_free(msg, 1), /* nothing */ );
+					goto error;
+				} );
+			
+			/* Jump to next message */
+			continue;
+		}
 	
-		/* 1 peer remaining: send to this peer */
+		if (list->next != NULL) {
 		
-		/* otherwise call the rt_out_cb_t callbacks */
-		
-		/* order the list */
+			/* call the rt_out_cb_t callbacks */
+			CHECK_POSIX_DO( pthread_rwlock_rdlock(&out_lck), goto error );
+
+			pthread_cleanup_push((void (*)())pthread_rwlock_unlock, &out_lck);
+
+			CHECK_FCT_DO( process_outcb_list( &out_nrm, msg, list ), 
+				{
+					log_normal("An error occurred in one of the routing extension, message discarded.\n");
+					msg_dump_walk(msg);
+					msg_free(msg, 1);
+					CHECK_POSIX_DO( pthread_rwlock_unlock(&out_lck),  );
+					continue;
+				} );
+			CHECK_FCT_DO( process_outcb_list( &out_late, msg, list ), 
+				{
+					log_normal("An error occurred in one of the routing extension, message discarded.\n");
+					msg_dump_walk(msg);
+					msg_free(msg, 1);
+					CHECK_POSIX_DO( pthread_rwlock_unlock(&out_lck),  );
+					continue;
+				} );
+
+			pthread_cleanup_pop(0);
+
+			CHECK_POSIX_DO( pthread_rwlock_unlock(&out_lck), goto error );
+
+			/* order the list by priority */
+			CHECK_FCT_DO( order_rt_list(&list), goto error );
+		}
 		
-		/* send to the first peer */
+trynext:
+		/* Send to peers in the list until success */
+		if (list != NULL) {		
+			rt_dpl_t * head = list;
+			peer_t * dest = list->peer;
+			int ret;
+			
+			list = head->next;
+			free(head);
+			
+			ret = peer_send( dest, msg );
+			
+			if (ret == ENOTCONN)
+				goto trynext;
+			
+			CHECK_FCT_DO( ret,
+				{
+					CHECK_FCT_DO(  msg_free(msg, 1), /* nothing */ );
+					goto error;
+				} );
+			
+			/* Message has been sent successfully */
+			msg = NULL;
+		}
 		
-		TRACE_DEBUG(INFO, "TBD...");
-		msg_free(msg, 1);
+		/* Ok, now free the list */
+		while (list != NULL) {
+			rt_dpl_t * head = list;
+			list = head->next;
+			free(head);
+		}
 		
+		/* If the message was not sent */
+		if (msg) {
+			/* The message could not be sent, requeue for new try */
+			CHECK_FCT_DO(  meq_post( g_meq_outgoing, msg ),
+				{
+					CHECK_FCT_DO(  msg_free(msg, 1), /* nothing */ );
+					goto error;
+				} );
+		}
+		
+		/* Ok, go to next message */
 	} while (1);
 error:
 	/* An error occured */	
"Welcome to our mercurial repository"