/*
 * view waaad/routing.c @ 411:7b3d4431610a
 *
 * Improved support for creating error messages, even when no dictionary definition is present
 * author Sebastien Decugis <sdecugis@nict.go.jp>
 * date Thu, 11 Jun 2009 18:08:17 +0900
 * parents e86dba02630a
 * children
 * line wrap: on
 * line source
 */

/*********************************************************************************************************
* Software License Agreement (BSD License)                                                               *
* Author: Sebastien Decugis <sdecugis@nict.go.jp>							 *
*													 *
* Copyright (c) 2009, WIDE Project and NICT								 *
* All rights reserved.											 *
* 													 *
* Redistribution and use of this software in source and binary forms, with or without modification, are  *
* permitted provided that the following conditions are met:						 *
* 													 *
* * Redistributions of source code must retain the above 						 *
*   copyright notice, this list of conditions and the 							 *
*   following disclaimer.										 *
*    													 *
* * Redistributions in binary form must reproduce the above 						 *
*   copyright notice, this list of conditions and the 							 *
*   following disclaimer in the documentation and/or other						 *
*   materials provided with the distribution.								 *
* 													 *
* * Neither the name of the WIDE Project or NICT nor the 						 *
*   names of its contributors may be used to endorse or 						 *
*   promote products derived from this software without 						 *
*   specific prior written permission of WIDE Project and 						 *
*   NICT.												 *
* 													 *
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED *
* WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A *
* PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR *
* ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 	 *
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 	 *
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR *
* TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF   *
* ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.								 *
*********************************************************************************************************/

/* Routing module.
 * 
 * See routing.h and routing-api.h for more information on the functions and types involved.
 */

#include "waaad-internal.h"

/* Structure to store a registered callback.
 * The same element type is chained in every handler list of this module (routing-out and routing-fwd);
 * the callback pointer is cast back to rt_out_cb_t or rt_fwd_cb_t at the time it is called.
 * "chain" must stay the first field: list elements are cast directly to _rt_hdl_t * when the lists are walked.
 */
typedef struct {
	uti_list_t	 chain;	/* the "o" field points to the "data" opaque data */
	void		*cb;	/* callback */
} _rt_hdl_t;

/* Definitions for the module variables */

/* out_lck is taken for writing when a routing-out handler is registered or unregistered, and for reading
 * by the OUT routing thread while it calls the handlers of out_nrm and out_late. */
static pthread_rwlock_t	 out_lck;
static uti_list_t	 out_nrm;	/* list of RT_OUT_NORMAL handlers. Elements are _rt_hdl_t. */
static uti_list_t	 out_late;	/* list of RT_OUT_LATE handlers */
	
/* fwd_lck plays the same role for the three routing-fwd lists, which are walked by the IN routing thread. */
static pthread_rwlock_t	 fwd_lck;
static uti_list_t	 fwd_req;	/* list of RT_FWD_REQ handlers. Elements are _rt_hdl_t. */
static uti_list_t	 fwd_ans;	/* list of RT_FWD_ANS handlers */
static uti_list_t	 fwd_all;	/* list of RT_FWD_ALL handlers */

/* Identifiers of the two routing threads; the (pthread_t) NULL value is tested by the stop functions
 * to know if there is a thread to terminate. */
static pthread_t	 in_th = (pthread_t) NULL;
static pthread_t	 out_th = (pthread_t) NULL;

/* Initialize the lists */
static void rt_handlers_init()
{
	uti_list_init( &out_nrm, NULL );
	uti_list_init( &out_late, NULL );
	uti_list_init( &fwd_req, NULL );
	uti_list_init( &fwd_ans, NULL );
	uti_list_init( &fwd_all, NULL );
}

/************************************************************************************************/

/* Is a request handled locally? */
static int handle_locally( msg_t * msg, application_id_t app, int * res )
{
	msg_avp_t * avp = NULL;
	int is_dest_host = 0; /* 0: undef, 1: yes, -1: no */
	int is_dest_realm = 0; /* 0: undef, 1: yes, -1: no */
	int is_supp_app = 0; /* 0: undef, 1: yes, -1: no */
	
	TRACE_ENTRY("%p", msg);
	
	/* Are we listed as Destination-Host? */
	/* Are we supporting the Destination-Realm? */
	/* Do we support locally the application of the message? */
	
	CHECK_FCT(  msg_browse(msg, MSG_BRW_FIRST_CHILD, &avp, NULL)  );

	while (avp && ! (is_dest_host && is_dest_realm)) {
		msg_avp_data_t * avpdata;

		CHECK_FCT(  msg_avp_data( avp, &avpdata )  );

		if (avpdata->avp_flags & AVP_FLAG_VENDOR) {
			goto next;
		}

		switch (avpdata->avp_code) {
			case AC_DESTINATION_HOST:
				CHECK_FCT( msg_parse_dict_avp ( avp ) );
				
				if (strncasecmp(g_pconf->diameter_identity, (char *)avpdata->avp_data->os.data, avpdata->avp_data->os.len)) {
					is_dest_host = -1;
				} else {
					if (avpdata->avp_data->os.len == strlen(g_pconf->diameter_identity))
						is_dest_host = 1;
					else
						is_dest_host = -1;
				}
				break;

			case AC_DESTINATION_REALM:
				CHECK_FCT( msg_parse_dict_avp ( avp ) );
				
				if (strncasecmp(g_pconf->diameter_realm, (char *)avpdata->avp_data->os.data, avpdata->avp_data->os.len)) {
					is_dest_realm = -1;
				} else {
					if (avpdata->avp_data->os.len == strlen(g_pconf->diameter_realm))
						is_dest_realm = 1;
					else
						is_dest_realm = -1;
				}
				break;

			default: /* Other AVP */
				/* skip */;
		}

next:			
		/* Go to next AVP */
		CHECK_FCT( msg_browse(avp, MSG_BRW_NEXT, &avp, NULL) );
	}
	
	if (app == 0) {
		is_supp_app = 1;
	} else {
		int i;
		
		is_supp_app = -1;
		
		for (i = 0; i < g_conf->supported_apps_nb; i++) {
			if (g_conf->supported_apps_list[i].a == app)
				is_supp_app = 1;
			if (g_conf->supported_apps_list[i].a >= app)
				break;
		}
	}
	
	/* We have the needed information, now make the decision: */
	
	/* We are explicitly target of the message */
	if (is_dest_host == 1) {
		*res = is_supp_app; /* will handle locally or return an error */
		goto end;
	}
	
	/* We are explicitly not target of the message */
	if ((is_dest_host == -1) || (is_dest_realm == -1)) {
		*res = 0; /* will forward the message */
		goto end;
	}
	
	/* From this point we are destination realm or it is not specified */
	if (is_supp_app == 1)
		*res = 1; /* will handle locally */
	else
		*res = 0; /* will forward */
		
end:
	return 0;
}

/* Try sending a message to an ordered list of peers.
 *
 *  msg  : location of the message to send. Set to NULL when the message has been sent, or when it
 *         has been freed after an error. Left unchanged if no peer of the list accepted the message.
 *  list : location of the list of candidates, best candidate first. The candidates that are tried are
 *         removed from the list and freed; what remains after a successful send is associated with the message.
 *
 * Returns 0 if the message was sent or if all peers were not connected, the error code otherwise.
 */
static int try_send_to_list( msg_t **msg, rt_dpl_t **list)
{
	TRACE_ENTRY("%p %p", msg, list);
	
	while (*list != NULL) {
		rt_dpl_t * head = *list;
		peer_t * dest = head->peer;
		int ret;

		/* Remove this candidate from the list, it will not be tried again */
		*list = head->next;
		free(head);

		/* Try sending the message to this peer */
		ret = peer_send( dest, *msg );

		/* This peer is not connected, try the next one */
		if (ret == ENOTCONN)
			continue;

		/* For any other error we give up and the message is destroyed */
		CHECK_FCT_DO( ret,
			{
				CHECK_FCT_DO(  msg_free(*msg, 1), /* nothing */ );
				*msg = NULL;
				return ret;
			} );

		/* Message has been sent successfully */

		/* Save the remaining list along the message, it will be freed automatically with message and may be used if network error is received */
		CHECK_FCT( msg_rt_associate( *msg, list ) );
		*msg = NULL;
		
		/* We are done. Stop here explicitly: the message must never be offered to another peer,
		   whatever msg_rt_associate left in *list. */
		break;
	}
	
	return 0;
}

/* Handling received messages with the E bit set (Protocol errors).
 * If error is handled successfully, *msg is set to NULL.
 * Otherwise, msg is unchanged and the routing module will forward the error as a normal answer.
 *
 *  msg : location of the received error message. On return, *msg is either:
 *         - unchanged: the error must be forwarded as a normal answer;
 *         - NULL: the query was re-sent to another peer and the error has been freed;
 *         - the original query (redirect indication, when handle_redirects is set): the error
 *           has been freed and the query must be routed again by the caller.
 *
 * Returns 0 on success. The CHECK_* macros are defined elsewhere; they are presumably
 * returning an error code from this function when the checked call fails -- TODO confirm.
 */
static int handle_error_msg(msg_t ** msg)
{
	msg_t * qry = NULL;
	msg_avp_t * avp = NULL;
#ifdef USE_MORE_INFO 
	/* The management of the errors may need to be improved based on this additional information, the code is here
	   if someone wants to use it, you just have to define this USE_MORE_INFO symbol.
	 */
	avp_value_t * qry_dst = NULL;
	avp_value_t * err_src = NULL;
	avp_value_t * err_erh = NULL;
#endif /* USE_MORE_INFO */
	avp_value_t * err_resco = NULL;
	
	TRACE_ENTRY("%p", msg);
	CHECK_PARAMS( msg );
	
	/* Retrieve the query that this error answers; we cannot do anything without it */
	CHECK_FCT(  msg_answ_getq( *msg, &qry )  );
	CHECK_PARAMS( qry );
	
#ifdef USE_MORE_INFO
	/* The error was sent by the destination-host peer of the query (if any)? */
	CHECK_FCT(  msg_browse(qry, MSG_BRW_FIRST_CHILD, &avp, NULL)  );
	while (avp) {
		msg_avp_data_t * avpdata;

		CHECK_FCT(  msg_avp_data( avp, &avpdata )  );

		if (((avpdata->avp_flags & AVP_FLAG_VENDOR) == 0) && (avpdata->avp_code == AC_DESTINATION_HOST)) {
			CHECK_FCT( msg_parse_dict_avp ( avp ) );	/* to interpret the AVP value and assign the avp_data ptr */
			qry_dst = avpdata->avp_data;
			break;
		}

		/* Go to next AVP */
		CHECK_FCT( msg_browse(avp, MSG_BRW_NEXT, &avp, NULL) );
	}
#endif /* USE_MORE_INFO */
	
	/* Now check which peer sent the error, and what error it is */
	CHECK_FCT(  msg_browse(*msg, MSG_BRW_FIRST_CHILD, &avp, NULL)  );
	while (avp) {
		msg_avp_data_t * avpdata;

		CHECK_FCT(  msg_avp_data( avp, &avpdata )  );

		/* Only the base protocol (non-vendor) AVPs are of interest here */
		if ((avpdata->avp_flags & AVP_FLAG_VENDOR) == 0) {
			switch (avpdata->avp_code) {
#ifdef USE_MORE_INFO
				case AC_ORIGIN_HOST:
					CHECK_FCT( msg_parse_dict_avp ( avp ) );	/* to interpret the AVP value and assign the avp_data ptr */
					err_src = avpdata->avp_data;
					break;

				case AC_ERROR_REPORTING_HOST:
					CHECK_FCT( msg_parse_dict_avp ( avp ) );
					err_erh = avpdata->avp_data;
					break;
#endif /* USE_MORE_INFO */
	
				case AC_RESULT_CODE:
					CHECK_FCT( msg_parse_dict_avp ( avp ) );
					err_resco = avpdata->avp_data;
					break;
			}
		}
#ifndef USE_MORE_INFO
		/* Without the additional information, the Result-Code is all we need: stop at the first one */
		if (err_resco)
			break;
#endif /* USE_MORE_INFO */

		/* Go to next AVP */
		CHECK_FCT( msg_browse(avp, MSG_BRW_NEXT, &avp, NULL) );
	}
	
	/* Make sure we have the result-code, otherwise simply forward the message */
	if (err_resco != NULL) {
		switch (err_resco->u32) {
			case ER_DIAMETER_REALM_NOT_SERVED:
			case ER_DIAMETER_TOO_BUSY:
				/* Try and find another peer to send the message to */
				{
					rt_dpl_t * list = NULL;
					/* Get the previously saved list of possible peers */
					CHECK_FCT( msg_rt_get( qry, &list ) );
					/* If this list is empty, we continue forwarding the error. We could also re-compute a new list (?) */
					if (list == NULL)
						break;

					/* On success, try_send_to_list sets qry to NULL; it is left unchanged if no peer was connected */
					CHECK_FCT(  try_send_to_list( &qry, &list)  );

					if (qry == NULL) {
						/* The message has been sent to another peer, we can discard the first received error */
						CHECK_FCT( msg_answ_detach( *msg)  );
						CHECK_FCT( msg_free(*msg, 1) );
						*msg = NULL;
					}
				}
				break;

			case ER_DIAMETER_REDIRECT_INDICATION:
				if (g_pconf->handle_redirects != 0) {
					/* In this case, the list of routing will be different, we try once again to send the query */
					/* The error is detached from the query and freed, then the query replaces it in *msg */
					CHECK_FCT( msg_answ_detach( *msg)  );
					CHECK_FCT( msg_free(*msg, 1) );
					*msg = qry;
				}
				break;

			/* default: just forward the error */
		}
	}
	/* Only a trace of the outcome; a non-NULL *msg is requeued by the caller */
	if (*msg) {
		TRACE_DEBUG(FULL, "The message will be forwarded following normal path." );
	} else {
		TRACE_DEBUG(FULL, "The message has been handled." );
	}
	return 0;
}

/* Add an entry to the list of rejected peers.
 *
 *  rejects : sentinel of the list, kept ordered by (hash, case-insensitive value, length).
 *  avpdata : data of the AVP holding the diameter id to add; its value must have been parsed.
 *
 * The new element only references avpdata->avp_data, the value is not copied.
 * Returns 0 on success (including when the id is already in the list), EINVAL if the AVP has no value.
 */
static int reject_add(uti_list_t *rejects, msg_avp_data_t * avpdata)
{
	uti_list_t *pos;
	rt_reject_t * entry = NULL;
	uint32_t hash;
	size_t new_len;
	
	TRACE_ENTRY("%p %p", rejects, avpdata);
	
	/* We cannot do anything with an AVP whose value is not set */
	if (avpdata->avp_data == NULL) {
		TRACE_DEBUG(FULL, "AVP with unset value => ??");
		ASSERT(0); /* To check if this really happens, and understand why... */
		return EINVAL;
	}
	
	new_len = avpdata->avp_data->os.len;
	hash = uti_hash ( (char *)avpdata->avp_data->os.data, avpdata->avp_data->os.len );
	
	/* Search the first element that must come after the new one */
	for (pos = rejects->next; pos != rejects; pos = pos->next) {
		rt_reject_t * cur = (rt_reject_t *)pos;
		avp_value_t * cur_val = (avp_value_t *)(pos->o);
		size_t cur_len = cur_val->os.len;
		int order;
		
		/* The hash is the primary key */
		if (cur->hash != hash) {
			if (cur->hash < hash)
				continue;
			break;
		}
		
		/* Same hash: compare the common part of the strings, then their lengths */
		order = strncasecmp((char *)cur_val->os.data, (char *)avpdata->avp_data->os.data, cur_len < new_len ? cur_len : new_len );
		if ((order == 0) && (cur_len != new_len))
			order = (cur_len < new_len) ? -1 : 1;
		
		if (order < 0)
			continue;
		
		if (order > 0)
			break;
		
		/* This id is already in the list, nothing to add */
		return 0;
	}
	
	/* Create the new element and insert it before the position we stopped at */
	CHECK_MALLOC( entry = (rt_reject_t *) malloc( sizeof(rt_reject_t) ) );
	uti_list_init(&entry->chain, avpdata->avp_data);
	entry->hash = hash;
	uti_list_insert_before(pos, &entry->chain);
	
	return 0;
}

/* Create the list of rt_reject_t corresponding to the message.
 * The diameter ids found in the (non-vendor) Origin-Host and Route-Record AVPs at the root of
 * the message are added to the "rejects" list, which must have been initialized by the caller.
 * Returns 0 on success.
 */
static int create_list_rejects(msg_t * msg, uti_list_t *rejects)
{
	msg_avp_t * cur = NULL;
	
	TRACE_ENTRY("%p %p", msg, rejects);
	
	CHECK_FCT(  msg_browse(msg, MSG_BRW_FIRST_CHILD, &cur, NULL)  );
	
	/* Walk through all the AVPs; cur becomes NULL after the last one */
	while (cur) {
		msg_avp_data_t * info;

		CHECK_FCT(  msg_avp_data( cur, &info )  );

		if (((info->avp_flags & AVP_FLAG_VENDOR) == 0)
		    && ((info->avp_code == AC_ORIGIN_HOST) || (info->avp_code == AC_ROUTE_RECORD))) {
			/* The value may not be interpreted yet for a new message, resolve it with the dictionary */
			CHECK_FCT( msg_parse_dict_avp(cur) );
			/* This peer has already seen the message, it is not a candidate for forwarding */
			CHECK_FCT( reject_add(rejects, info) );
		}

		/* Go to next AVP */
		CHECK_FCT( msg_browse(cur, MSG_BRW_NEXT, &cur, NULL) );
	}
	
	return 0;
}

/* Order the list of peers after outrt callbacks were called.
 * On return, *list contains only the elements with a score > 0, highest score first.
 * The elements with a score <= 0 are freed.
 */
static int order_rt_list(rt_dpl_t **list)
{
	rt_dpl_t * ordered = NULL; /* head of the elements already sorted */
	
	TRACE_ENTRY("%p", list);
	CHECK_PARAMS( list );
	
	while (*list != NULL) {
		rt_dpl_t ** slot = &ordered;
		
		/* Detach the first remaining element */
		rt_dpl_t * item = *list;
		*list = item->next;
		
		/* This peer is not a candidate anymore */
		if (item->score <= 0) {
			free(item);
			continue;
		}
		
		/* Skip all the sorted elements with a strictly higher score */
		while ((*slot != NULL) && ((*slot)->score > item->score))
			slot = &(*slot)->next;
		
		/* And link the element at this position */
		item->next = *slot;
		*slot = item;
	}
	
	*list = ordered;
	
	return 0;
}

/* Pass a message to all the forward callbacks registered in a list. The read lock must be taken already.
 * When a callback returns a value different from 0, the message is freed, *msg is set to NULL,
 * and the remaining callbacks are not called. Always returns 0.
 */
static int process_fwcb_list(uti_list_t * list, msg_t ** msg)
{
	uti_list_t * pos = list->next;
	
	/* Stop at the end of the list, or as soon as there is no more message to process */
	while ((*msg != NULL) && (pos != list)) {
		_rt_hdl_t * handler = (_rt_hdl_t *)pos;
		int cb_res;
		
		TRACE_DEBUG(ANNOYING, "Calling next FW routing callback: %p for message %p", handler->cb, *msg);
		
		cb_res = (*(rt_fwd_cb_t)handler->cb)(handler->chain.o, *msg);
		if (cb_res != 0) {
			TRACE_DEBUG(FULL, "A forward handler returned %d != 0, message discarded.", cb_res);
			msg_free(*msg, 1);
			*msg = NULL;
		}
		
		pos = pos->next;
	}
	
	return 0;
}

/* Pass a message and its list of candidate peers to all the out callbacks registered in cblist.
 * The read lock must be taken already. The processing stops at the first callback that fails.
 */
static int process_outcb_list(uti_list_t * cblist, msg_t * msg, rt_dpl_t * list )
{
	uti_list_t * pos = cblist->next;
	
	while (pos != cblist) {
		_rt_hdl_t * handler = (_rt_hdl_t *)pos;
		
		TRACE_DEBUG(ANNOYING, "Calling next OUT routing callback: %p for message %p", handler->cb, msg);
		CHECK_FCT( (*(rt_out_cb_t)handler->cb)(handler->chain.o, msg, list)  );
		
		pos = pos->next;
	}
	
	return 0;
}

/************************************************************************************************/

/* Thread for messages in INCOMING queue.
 *
 * For each message picked from g_meq_incoming, the thread:
 *  - decides if the message is for the local peer (requests: see handle_locally; answers: the
 *    corresponding query has no source);
 *  - answers with an error to the requests that can be neither handled locally nor relayed;
 *  - passes the message to the registered forward callbacks, when applicable;
 *  - handles the protocol errors (see handle_error_msg);
 *  - requeues what remains of the message in g_meq_local or g_meq_outgoing.
 *
 * On error, the WDE_ERROR event is sent and the thread terminates. The argument is not used.
 */
static void * rt_in_th(void * arg)
{
	THREAD_NAME( "IN Routing" );
	
	/* Loop */
	do {
		msg_t * msg = NULL;
		msg_data_t * hdr;
		int is_req = 0;
		int is_err = 0;
		int call_cb = 0;
		int for_local = 0; /* 1: must handle locally. -1: must but cannot */
		int cb_ret = 0; /* result of the forward callbacks processing */
		
		pthread_testcancel();
		
		/* pick next message in global incoming queue */
		CHECK_FCT_DO(  meq_get( g_meq_incoming, &msg ),  goto error );
		
		/* Retrieve some flag values */
		CHECK_FCT_DO(  msg_data(msg, &hdr), goto error  );
		is_req = hdr->msg_flags & CMD_FLAG_REQUEST;
		is_err = hdr->msg_flags & CMD_FLAG_ERROR;
		
		if (is_req) {
			CHECK_FCT_DO(  handle_locally( msg, hdr->msg_appl, &for_local ), goto error  );
			/* For requests, call the handlers only for messages to be forwarded */
			call_cb = ! for_local;
		} else {
			/* For answers */
			msg_t * qry = NULL;
			char * qry_src = NULL;
			
			/* Get corresponding query */
			CHECK_FCT_DO(  msg_answ_getq( msg, &qry ), goto error  );
			ASSERT(qry != NULL);
			if (qry == NULL)
				goto error;
			
			/* Was the query created locally or forwarded ? */
			CHECK_FCT_DO(  msg_source_get( qry, &qry_src, NULL ), goto error  );
			if (qry_src == NULL)
				for_local = 1;
			
			/* We have to call the callbacks for errors and forwarded answers */
			call_cb = (is_err || ! for_local);
		}
		
		if ( (for_local == -1) || ((for_local == 0) && (g_pconf->disable_relay != 0) ) ) {
			/* We can neither handle nor relay this message: answer with an error */
			if (for_local)
				log_normal("Received a message for local peer but application is not supported.\n");
			else
				log_normal("Relaying message is disabled, sending an error.\n");
			msg_dump_walk(FULL, msg);
			
			if (is_req) {
				/* Create the answer message */
				CHECK_FCT_DO( msg_new_answer_from_req(&msg, 1), goto error  );

				/* Set the result code */
				CHECK_FCT_DO( msg_rescode_set(msg, "DIAMETER_APPLICATION_UNSUPPORTED", NULL, NULL, 1), goto error  );

				/* Requeue in the incoming queue */
				/* NOTE(review): when relaying is disabled, this answer looks like it will reach the
				   "Abnormal" branch below on the next pass (for_local stays 0 for an answer whose
				   query has a source) and be dropped -- to be confirmed. */
				CHECK_FCT_DO(  meq_post( g_meq_incoming, msg ),
					{
						CHECK_FCT_DO(  msg_free(msg, 1), /* nothing */ );
						goto error;
					} );
			} else {
				TRACE_DEBUG(INFO, "Abnormal: must forward an answer when relaying is disabled...");
				ASSERT(0);
				CHECK_FCT_DO(  msg_free(msg, 1), /* nothing */ );
			}
				
			/* Jump to next message */
			continue;
		}
		
		if (call_cb) {
			/* Pass the message to all registered forward routing callbacks */
			
			CHECK_POSIX_DO( pthread_rwlock_rdlock(&fwd_lck), goto error );
			
			pthread_cleanup_push(cleanup_rwlock, &fwd_lck);

			/* We must not jump out of the push / pop block (the effect is undefined, and the lock
			   would remain taken), so the result is saved and checked after the lock is released. */
			cb_ret = process_fwcb_list( is_req ? &fwd_req : &fwd_ans, &msg );
			if (cb_ret == 0)
				cb_ret = process_fwcb_list( &fwd_all, &msg );
			
			pthread_cleanup_pop(0);
			
			CHECK_POSIX_DO( pthread_rwlock_unlock(&fwd_lck), goto error );
			
			CHECK_FCT_DO( cb_ret, goto error );
		}
		
		/* The callbacks may have discarded the message */
		if ((msg != NULL) && (is_err)) {
			CHECK_FCT_DO( handle_error_msg( &msg ), goto error );
		}
		
		if (msg != NULL) {
			/* Requeue the message for another thread to pick it */
			/* NOTE(review): after a redirect, handle_error_msg may have replaced msg with the
			   original query while for_local was computed for the answer -- confirm that the
			   selected queue is the expected one in that case. */
			CHECK_FCT_DO(  meq_post( for_local ? g_meq_local : g_meq_outgoing, msg ),
				{
					CHECK_FCT_DO(  msg_free(msg, 1), /* nothing */ );
					goto error;
				} );
		}
		
	} while (1);
error:
	/* An error occurred */	
	CHECK_FCT_DO(  uti_event_send(WDE_ERROR),  /* nothing */  );
	TRACE_DEBUG(INFO, "Thread terminated");
	
	return NULL;
}


/* Thread for messages in OUTGOING.
 *
 * For each message picked from g_meq_outgoing:
 *  - an answer is sent to the peer that is the source of the corresponding query, after its
 *    hop-by-hop id is set to the one of this query;
 *  - for a request, the list of the open peers (minus those already in Origin-Host / Route-Record) is
 *    created, scored by the registered routing-out callbacks, ordered, then the request is sent to
 *    the first peer that accepts it. When no candidate remains, an error answer
 *    DIAMETER_UNABLE_TO_DELIVER is created and posted in g_meq_incoming.
 *
 * On error, the WDE_ERROR event is sent and the thread terminates. The argument is not used.
 */
static void * rt_out_th(void * arg)
{
	THREAD_NAME( "OUT Routing" );
	
	/* Loop */
	do {
		msg_t * msg = NULL;
		msg_data_t * hdr;
		rt_dpl_t * list = NULL;
		uti_list_t rejects;
		int cb_ret = 0; /* result of the routing-out callbacks processing */
		
		pthread_testcancel();
		
		/* pick next message in global outgoing queue */
		CHECK_FCT_DO(  meq_get( g_meq_outgoing, &msg ),  goto error );
		
		/* Retrieve some flag values */
		CHECK_FCT_DO(  msg_data(msg, &hdr), goto error  );
		
		/* Handle the case of answers, routing is very simple */
		if ((hdr->msg_flags & CMD_FLAG_REQUEST) == 0 ) {
			msg_t * qry = NULL;
			msg_data_t * qry_hdr;
			char * qry_src = NULL;
			uint32_t qry_src_h = 0;
			peer_t * peer = NULL;
			
			/* Get corresponding query */
			CHECK_FCT_DO(  msg_answ_getq( msg, &qry ), goto error  );
			ASSERT(qry != NULL);
			if (qry == NULL)
				goto error;
			
			/* Retrieve the source of the corresponding query */
			CHECK_FCT_DO(  msg_source_get( qry, &qry_src, &qry_src_h ), goto error  );
			ASSERT(qry_src != NULL);
			
			peer = peer_struct_getptr(qry_src, qry_src_h);
			if (peer == NULL) {
				/* Ok the peer does not exist anymore, just discard the message */
				/* qry_src is a string: it must be displayed with %s */
				log_error("Unable to forward answer to deleted peer '%s', message dropped.\n", qry_src);
				msg_dump_walk(INFO, msg);
				msg_dump_walk(INFO, qry);
				msg_free(msg,1);
				continue;
			}
			
			/* Set the hop-by-hop id of the answer to the same value as original received query */
			CHECK_FCT_DO(  msg_data(qry, &qry_hdr), goto error  );
			hdr->msg_hbhid = qry_hdr->msg_hbhid;
			
			/* Now send this answer to the peer */
			CHECK_FCT_DO( peer_send ( peer, msg ),
				{
					/* If an error occurred (probably, peer not in open state ) */
					log_error("An error occurred while forwarding answer to peer '%s', message dropped.\n", qry_src);
					msg_dump_walk(INFO, msg);
					msg_free(msg,1);
				}  );
			
			continue;
		}
		
		/* Ok, we are forwarding a query if we are here */
		uti_list_init(&rejects, NULL);
		
		/* Create a list of the peers that are not candidates since they are already in the R-R */
		/* NOTE(review): the elements added to "rejects" are allocated by reject_add and are not freed
		   in this function -- confirm whether peer_struct_list_open releases them. */
		CHECK_FCT_DO( create_list_rejects(msg, &rejects), goto error  );
		
		/* Create a list with all the "open" peers, without the peers from rejects */
		CHECK_FCT_DO( peer_struct_list_open( hdr->msg_appl, &list, &rejects ), goto error  );
		
		if (list == NULL) {
unable_deliver:
			/* 0 peer remaining? => UNABLE_TO_DELIVER */
			log_normal("Unable to forward a request (application %u), sending an error.\n", hdr->msg_appl);
			msg_dump_walk(FULL, msg);
			
			/* Create the answer message */
			CHECK_FCT_DO( msg_new_answer_from_req(&msg, 1), goto error  );
			
			/* Set the result code */
			CHECK_FCT_DO( msg_rescode_set(msg, "DIAMETER_UNABLE_TO_DELIVER", NULL, NULL, 1), goto error  );
			
			/* Requeue in the incoming queue */
			CHECK_FCT_DO(  meq_post( g_meq_incoming, msg ),
				{
					CHECK_FCT_DO(  msg_free(msg, 1), /* nothing */ );
					goto error;
				} );
			
			/* Jump to next message */
			continue;
		}
	
		/* call the rt_out_cb_t callbacks on the peers in the list */
		CHECK_POSIX_DO( pthread_rwlock_rdlock(&out_lck), goto error );
		pthread_cleanup_push(cleanup_rwlock, &out_lck);

		/* We must not "continue" or "goto" out of the push / pop block: the effect is undefined (the
		   macros may hide a loop of their own). The result is saved and handled after the pop. */
		cb_ret = process_outcb_list( &out_nrm, msg, list );
		if (cb_ret == 0)
			cb_ret = process_outcb_list( &out_late, msg, list );

		pthread_cleanup_pop(0);
		CHECK_POSIX_DO( pthread_rwlock_unlock(&out_lck), goto error );
		
		if (cb_ret != 0) {
			CHECK_FCT_DO( cb_ret, /* the message is discarded just below */ );
			log_normal("An error occurred in one of the routing extension, message discarded.\n");
			msg_dump_walk(INFO, msg);
			msg_free(msg, 1);
			
			/* The list of candidates is not associated with a message, we have to free it here */
			while (list != NULL) {
				rt_dpl_t * next = list->next;
				free(list);
				list = next;
			}
			
			/* Jump to next message */
			continue;
		}

		/* order the list by priority and remove <=0 scores */
		CHECK_FCT_DO( order_rt_list(&list), goto error );
		
		/* If no peer remained, then we must send an error */
		if (list == NULL)
			goto unable_deliver;
		
		/* Send the message to the first peer in the list that will accept it, if any */
		CHECK_FCT_DO( try_send_to_list( &msg, &list), goto error );
		
		/* If the message was not sent */
		if (msg) {
			/* The message could not be sent, requeue for new try */
			CHECK_FCT_DO(  meq_post( g_meq_outgoing, msg ),
				{
					CHECK_FCT_DO(  msg_free(msg, 1), /* nothing */ );
					goto error;
				} );
		}
		
		/* Ok, go to next message */
	} while (1);
error:
	/* An error occurred */	
	CHECK_FCT_DO(  uti_event_send(WDE_ERROR),  /* nothing */  );
	TRACE_DEBUG(INFO, "Thread terminated");
	
	return NULL;
}

/* Create the thread that processes the incoming queue; its identifier is saved in in_th */
static int rt_in_start()
{
	TRACE_ENTRY( "" );
	
	/* Default attributes, no argument passed to the thread */
	CHECK_POSIX(  pthread_create( &in_th, NULL, rt_in_th, NULL )  );
	
	return 0;
}
/* Terminate the thread of the incoming queue, if it was started */
static int rt_in_stop()
{
	TRACE_ENTRY( "" );
	
	/* Nothing to do when no thread identifier was saved */
	if (in_th == (pthread_t)NULL)
		return 0;
	
	return _thread_term(&in_th);
}
/* Create the thread that processes the outgoing queue; its identifier is saved in out_th */
static int rt_out_start()
{
	TRACE_ENTRY( "" );
	
	/* Default attributes, no argument passed to the thread */
	CHECK_POSIX(  pthread_create( &out_th, NULL, rt_out_th, NULL )  );
	
	return 0;
}
/* Terminate the thread of the outgoing queue, if it was started */
static int rt_out_stop()
{
	TRACE_ENTRY( "" );
	
	/* Nothing to do when no thread identifier was saved */
	if (out_th == (pthread_t)NULL)
		return 0;
	
	return _thread_term(&out_th);
}


/************************************************************************************************/

/* Initialize the module: create the locks, initialize the handler lists, and start both routing threads.
 * Returns 0 on success; the first step that fails interrupts the initialization.
 */
int rt_init ( void )
{
	TRACE_ENTRY( "" );
	
	/* One lock for the routing-out lists, one for the routing-fwd lists */
	CHECK_POSIX( pthread_rwlock_init( &out_lck, NULL ) );
	CHECK_POSIX( pthread_rwlock_init( &fwd_lck, NULL ) );
	
	/* The lists must be ready before the threads can walk them */
	rt_handlers_init();
	
	/* Now start processing the queues */
	CHECK_FCT( rt_in_start() );
	CHECK_FCT( rt_out_start() );
	
	return 0;
}

/* End of the module */
int rt_fini ( void )
{
	TRACE_ENTRY( "" );
	CHECK_FCT( rt_in_stop() );
	CHECK_FCT( rt_out_stop() );
	
	/* Destroy all registered handlers */
	while (!IS_LIST_EMPTY(&out_nrm)) {
		CHECK_FCT_DO( rt_out_unregister ( (rt_out_hdl_t *) out_nrm.next ), break );
	}
	while (!IS_LIST_EMPTY(&out_late)) {
		CHECK_FCT_DO( rt_out_unregister ( (rt_out_hdl_t *) out_late.next ), break );
	}
	while (!IS_LIST_EMPTY(&fwd_req)) {
		CHECK_FCT_DO( rt_fwd_unregister ( (rt_fwd_hdl_t *) fwd_req.next ), break );
	}
	while (!IS_LIST_EMPTY(&fwd_ans)) {
		CHECK_FCT_DO( rt_fwd_unregister ( (rt_fwd_hdl_t *) fwd_ans.next ), break );
	}
	while (!IS_LIST_EMPTY(&fwd_all)) {
		CHECK_FCT_DO( rt_fwd_unregister ( (rt_fwd_hdl_t *) fwd_all.next ), break );
	}
			
	CHECK_POSIX_DO( pthread_rwlock_destroy( &out_lck ), /* continue */ );
	CHECK_POSIX_DO( pthread_rwlock_destroy( &fwd_lck ), /* continue */ );
	return 0;
}

/* Register a callback in one of the lists.
 *
 *  cb      : the callback to register.
 *  data    : opaque data saved with the callback, passed back to it on each call.
 *  handler : upon success, receives the new element; it is needed to unregister the callback.
 *  lock    : the lock protecting the list, taken for writing during the insertion.
 *  sem     : sentinel of the list. The new element is inserted before it.
 *
 * Returns 0 on success, an error code otherwise.
 */
static int _rt_register( void * cb, void * data, void **handler, pthread_rwlock_t * lock, uti_list_t * sem)
{
	_rt_hdl_t * new;
	int ret;
	
	CHECK_MALLOC( new = (_rt_hdl_t *) malloc(sizeof(_rt_hdl_t)) );
	
	uti_list_init( &new->chain, data );
	new->cb = cb;
	
	/* If the lock cannot be taken, the element is not inserted and nobody references it: free it */
	ret = pthread_rwlock_wrlock(lock);
	CHECK_POSIX_DO( ret,
		{
			free(new);
			return ret;
		} );
	
	uti_list_insert_before( sem, &new->chain );
	CHECK_POSIX( pthread_rwlock_unlock(lock) );
	
	*handler = new;
	return 0;
}

/* Register a callback for routing-out.
 *
 *  cb      : the callback to register.
 *  data    : opaque data passed back to the callback.
 *  pos     : RT_OUT_NORMAL or RT_OUT_LATE, selects the list in which the callback is registered.
 *  handler : upon success, receives the handler to pass to rt_out_unregister.
 *
 * Returns 0 on success; the parameters are rejected if cb or handler is NULL, or if pos is unknown.
 */
int rt_out_register ( rt_out_cb_t cb, void * data, rt_out_pos_t pos, rt_out_hdl_t ** handler )
{
	uti_list_t * target = NULL;
	
	TRACE_ENTRY( "%p %p %d %p", cb, data, pos, handler );
	
	CHECK_PARAMS( cb && handler );
	
	if (pos == RT_OUT_NORMAL)
		target = &out_nrm;
	else if (pos == RT_OUT_LATE)
		target = &out_late;
	
	/* For any other position, target is still NULL here and the check fails */
	CHECK_PARAMS( target );
	
	return _rt_register( cb, data, handler, &out_lck, target );
}

/* Remove a routing-out callback.
 *
 *  handler : the handler received from rt_out_register. It is freed and must not be used afterwards.
 *
 * Returns 0 on success; a NULL handler is rejected by the parameters check.
 */
int rt_out_unregister ( rt_out_hdl_t * handler )
{
	_rt_hdl_t * h = (_rt_hdl_t *) handler;
	
	TRACE_ENTRY( "%p", handler );
	
	/* As for the registration, refuse an invalid parameter instead of dereferencing it */
	CHECK_PARAMS( handler );
	
	/* The element is removed from its list under the write lock, so the routing thread never sees it half-unlinked */
	CHECK_POSIX( pthread_rwlock_wrlock(&out_lck) );
	
	uti_list_unlink( &h->chain );
	
	CHECK_POSIX( pthread_rwlock_unlock(&out_lck) );
	
	free(h);
	
	return 0;
}

/* Register a callback for routing-fwd.
 *
 *  cb      : the callback to register.
 *  data    : opaque data passed back to the callback.
 *  pos     : RT_FWD_REQ, RT_FWD_ANS or RT_FWD_ALL, selects the list in which the callback is registered.
 *  handler : upon success, receives the handler to pass to rt_fwd_unregister.
 *
 * Returns 0 on success; the parameters are rejected if cb or handler is NULL, or if pos is unknown.
 */
int rt_fwd_register ( rt_fwd_cb_t cb, void * data, rt_fwd_pos_t pos, rt_fwd_hdl_t ** handler )
{
	uti_list_t * target = NULL;
	
	TRACE_ENTRY( "%p %p %d %p", cb, data, pos, handler );
	
	CHECK_PARAMS( cb && handler );
	
	if (pos == RT_FWD_REQ)
		target = &fwd_req;
	else if (pos == RT_FWD_ANS)
		target = &fwd_ans;
	else if (pos == RT_FWD_ALL)
		target = &fwd_all;
	
	/* For any other position, target is still NULL here and the check fails */
	CHECK_PARAMS( target );
	
	return _rt_register( cb, data, handler, &fwd_lck, target );
}

/* Remove a routing-fwd callback.
 *
 *  handler : the handler received from rt_fwd_register. It is freed and must not be used afterwards.
 *
 * Returns 0 on success; a NULL handler is rejected by the parameters check.
 */
int rt_fwd_unregister ( rt_fwd_hdl_t * handler )
{
	_rt_hdl_t * h = (_rt_hdl_t *) handler;
	
	TRACE_ENTRY( "%p", handler );
	
	/* As for the registration, refuse an invalid parameter instead of dereferencing it */
	CHECK_PARAMS( handler );
	
	/* The element is removed from its list under the write lock, so the routing thread never sees it half-unlinked */
	CHECK_POSIX( pthread_rwlock_wrlock(&fwd_lck) );
	
	uti_list_unlink( &h->chain );
	
	CHECK_POSIX( pthread_rwlock_unlock(&fwd_lck) );
	
	free(h);
	
	return 0;
}


/* "Welcome to our mercurial repository" */