Mercurial > hg > waaad
changeset 246:13647ca6e0ad
Progress on the routing module
author | Sebastien Decugis <sdecugis@nict.go.jp> |
---|---|
date | Tue, 02 Dec 2008 16:20:09 +0900 |
parents | c141c6a50f3d |
children | adbc3782ba69 |
files | waaad/dict-base.c waaad/dict-hardcoded.h waaad/message.c waaad/message.h waaad/peer-internal.h waaad/peer-psm.c waaad/peer-struct.c waaad/peer.h waaad/routing.c |
diffstat | 9 files changed, 486 insertions(+), 30 deletions(-) [+] |
line wrap: on
line diff
--- a/waaad/dict-base.c Fri Nov 28 18:22:14 2008 +0900 +++ b/waaad/dict-base.c Tue Dec 02 16:20:09 2008 +0900 @@ -1821,6 +1821,9 @@ */ dict_avp_data_t data = { 263, /* Code */ + #if AC_SESSION_ID != 263 + #error "AC_SESSION_ID definition mismatch" + #endif 0, /* Vendor */ "Session-Id", /* Name */ AVP_FLAG_VENDOR | AVP_FLAG_MANDATORY, /* Fixed flags */
--- a/waaad/dict-hardcoded.h Fri Nov 28 18:22:14 2008 +0900 +++ b/waaad/dict-hardcoded.h Tue Dec 02 16:20:09 2008 +0900 @@ -59,6 +59,7 @@ #define AC_VENDOR_SPECIFIC_APPLICATION_ID 260 #define AC_REDIRECT_HOST_USAGE 261 #define AC_REDIRECT_MAX_CACHE_TIME 262 +#define AC_SESSION_ID 263 #define AC_ORIGIN_HOST 264 #define AC_SUPPORTED_VENDOR_ID 265 #define AC_VENDOR_ID 266
--- a/waaad/message.c Fri Nov 28 18:22:14 2008 +0900 +++ b/waaad/message.c Tue Dec 02 16:20:09 2008 +0900 @@ -1293,6 +1293,14 @@ return _mpd_do(_C(msg), 0); } +/* Idem, for one AVP. */ +int msg_parse_dict_avp ( msg_avp_t * avp ) +{ + TRACE_ENTRY("%p", avp); + + return _mpd_do(_C(avp), 1); +} + /***************************************************************************************************************/ @@ -2059,6 +2067,60 @@ return 0; } +/* Create an anwer to a query */ +int msg_new_answer_from_req ( msg_t ** msg ) +{ + _msg_t *qry, *answ = NULL; + + TRACE_ENTRY("%p", msg); + + CHECK_PARAMS( msg && CHECK_MSG(*msg) ); + + qry = (_msg_t *)(*msg); + CHECK_PARAMS( qry->msg_public.msg_flags & CMD_FLAG_REQUEST ); + + /* Create the new message */ + CHECK_MALLOC( answ = (_msg_t *) malloc (sizeof(_msg_t)) ); + + /* Initialize the fields */ + init_msg(answ); + answ->msg_public.msg_version = MSG_VERSION; + answ->msg_public.msg_flags = qry->msg_public.msg_flags & ~CMD_FLAG_REQUEST; + answ->msg_public.msg_code = qry->msg_public.msg_code; + answ->msg_public.msg_appl = qry->msg_public.msg_appl; + answ->msg_public.msg_hbhid = qry->msg_public.msg_hbhid; + answ->msg_public.msg_eteid = qry->msg_public.msg_eteid; + + /* Associate the answer and the query */ + CHECK_FCT( msg_answ_associate( (msg_t *)answ, (msg_t *)qry ) ); + *msg = (msg_t *)answ; + + /* If the first AVP of the query is a Session-Id, copy it to the answer */ + { + msg_avp_t * avp = NULL; + msg_avp_t * navp = NULL; + msg_avp_data_t * avpdata = NULL; + + CHECK_FCT( msg_browse(qry, MSG_BRW_FIRST_CHILD, &avp, NULL) ); + CHECK_FCT( msg_avp_data( avp, &avpdata ) ); + + if (((avpdata->avp_flags & AVP_FLAG_VENDOR) == 0) && (avpdata->avp_code == AC_SESSION_ID)) { + CHECK_FCT( msg_parse_dict_avp(avp) ); + ASSERT(avpdata->avp_data); + + CHECK_FCT( msg_avp_new( ((_msg_avp_t *)avp)->avp_model, 0, &navp ) ); + + /* Set its value */ + CHECK_FCT( msg_avp_setvalue( navp, avpdata->avp_data ) ); + + /* Add it to the message */ + CHECK_FCT( msg_avp_add( *msg, MSG_BRW_FIRST_CHILD, navp ) ); + } + } + + return 0; +} + /* Add Result-Code and eventually Failed-AVP, Error-Message and Error-Reporting-Host AVPs */ int msg_rescode_set( msg_t * msg, char * rescode, char * errormsg, msg_avp_t * optavp ) {
--- a/waaad/message.h Fri Nov 28 18:22:14 2008 +0900 +++ b/waaad/message.h Tue Dec 02 16:20:09 2008 +0900 @@ -160,6 +160,23 @@ int msg_parse_dict ( msg_t * msg ); /* + * FUNCTION: msg_parse_dict_avp + * + * PARAMETERS: + * msg : An AVP from the msg_t tree. + * + * DESCRIPTION: + * This function is similar to msg_parse_dict, but only for one AVP of a message. + * + * RETURN VALUE: + * 0 : The message has been fully parsed as described. + * EINVAL : the msg parameter is invalid for this operation. + * ENOMEM : Unable to allocate enough memory to complete the operation. + * ENOTSUP : No dictionary definition for the command or one of the mandatory AVP. + */ +int msg_parse_dict_avp ( msg_avp_t * avp ); + +/* * FUNCTION: msg_is_routable * * PARAMETERS: @@ -176,6 +193,25 @@ int msg_is_routable ( msg_t * msg ); /* + * FUNCTION: msg_new_answer_from_req + * + * PARAMETERS: + * msg : The location of the query on entry, and of answer on return. + * + * DESCRIPTION: + * This function creates the empty answer message for a request. + * The header is set properly (R flag, ccode, appid, hbhid, eteid) + * The Session-Id AVP is copied if present. + * The calling code should usually call msg_rescode_set function on the answer. + * Upon return, the original query may be retrieved by calling msg_answ_getq on the message. + * + * RETURN VALUE: + * 0 : Operation complete. + * !0 : an error occurred. + */ +int msg_new_answer_from_req ( msg_t ** msg ); + +/* * FUNCTION: msg_rescode_set * * PARAMETERS: @@ -253,4 +289,5 @@ int msg_add_origin ( msg_t * msg ); /* Add Origin-Host, Origin-Realm, Origin-State-Id AVPS at the end of the message */ + #endif /* _MESSAGE_H */
--- a/waaad/peer-internal.h Fri Nov 28 18:22:14 2008 +0900 +++ b/waaad/peer-internal.h Tue Dec 02 16:20:09 2008 +0900 @@ -282,7 +282,6 @@ /* The peers hash table. ALL peers are linked in the "all" hash list, by their p_global list */ typedef struct { uti_list_t all; /* Sentinel for the p_global list of peers which hash belongs to this sublist. */ - uti_list_t actives;/* idem for the p_active sublist */ pthread_mutex_t lock; /* to protect these lists */ int count; /* number of peers in this sublist */ } _peer_hash_t; @@ -291,11 +290,11 @@ #define H_MASK( __hash) ((__hash) & (( 1 << _PEER_HASH_SIZE ) - 1)) #define H_ALL( _hash ) (&(_peer_hash[H_MASK(_hash)].all )) -#define H_ACT( _hash ) (&(_peer_hash[H_MASK(_hash)].actives)) #define H_LOCK( _hash ) (&(_peer_hash[H_MASK(_hash)].lock )) #define H_COUNT(_hash ) ( _peer_hash[H_MASK(_hash)].count ) -/* the others lists of peers, for expiration mechanism */ +/* the others lists of peers */ +extern uti_list_t _peers_actives; /* peers in the OPEN state, linked by their p_active fields. */ extern uti_list_t _peers_deleted; /* deleted peers, threads are linked by their p_exp_list list. */ extern uti_list_t _peers_expiring; /* peers that will expire, linked by their p_exp_list list. */ extern pthread_mutex_t _peers_lists_mtx; /* mutex to protect these lists */
--- a/waaad/peer-psm.c Fri Nov 28 18:22:14 2008 +0900 +++ b/waaad/peer-psm.c Tue Dec 02 16:20:09 2008 +0900 @@ -1226,7 +1226,12 @@ /* Handle the "active" peers list */ if (peer->p_state != STATE_OPEN) { - uti_list_unlink( &peer->p_active ); + if (!IS_LIST_EMPTY(&peer->p_active)) { + CHECK_POSIX_DO( pthread_mutex_lock( &_peers_lists_mtx ), uti_event_send(WDE_ERROR) ); + TRACE_DEBUG(FULL, "Peer '%s' removed from the list of active peers", peer->p_diamid); + uti_list_unlink( &peer->p_active ); + CHECK_POSIX_DO( pthread_mutex_unlock( &_peers_lists_mtx ), uti_event_send(WDE_ERROR) ); + } return; } @@ -1234,7 +1239,9 @@ return; /* we are already linked */ /* Ok, we have to insert this peer into the active list */ - for (prev = H_ACT(peer->p_hash); prev->next != H_ACT(peer->p_hash); prev = prev->next) { + CHECK_POSIX_DO( pthread_mutex_lock( &_peers_lists_mtx ), uti_event_send(WDE_ERROR) ); + + for (prev = &_peers_actives; prev->next != &_peers_actives; prev = prev->next) { int cmp = 0; _peer_t * cur = _P(prev->next->o); @@ -1258,6 +1265,8 @@ uti_list_insert_after(prev, &peer->p_active); + CHECK_POSIX_DO( pthread_mutex_unlock( &_peers_lists_mtx ), uti_event_send(WDE_ERROR) ); + TRACE_DEBUG(FULL, "Peer '%s' inserted in the list of active peers", peer->p_diamid); return;
--- a/waaad/peer-struct.c Fri Nov 28 18:22:14 2008 +0900 +++ b/waaad/peer-struct.c Tue Dec 02 16:20:09 2008 +0900 @@ -45,17 +45,22 @@ /* The hash table */ _peer_hash_t _peer_hash[1 << _PEER_HASH_SIZE]; +/* The list of active peers */ +uti_list_t _peers_actives; + + /* Initialize the hash table */ int _peer_struct_init() { int i; + uti_list_init(&_peers_actives, NULL); + /* Initialize the hash table */ memset(_peer_hash, 0, sizeof(_peer_hash)); for (i = 0; i < sizeof(_peer_hash) / sizeof(_peer_hash[0]); i++) { uti_list_init(&_peer_hash[i].all, &_peer_hash[i]); - uti_list_init(&_peer_hash[i].actives, &_peer_hash[i]); CHECK_POSIX( pthread_mutex_init(&_peer_hash[i].lock, NULL) ); } @@ -423,7 +428,7 @@ } /* Find a structure from diamid and hash */ -peer_t * peer_getptr(char * diamid, uint32_t hash) +peer_t * peer_struct_getptr(char * diamid, uint32_t hash) { uti_list_t *li; peer_t * res = NULL; @@ -464,8 +469,103 @@ return NULL; } -/* Create the list of OPEN peers */ -int peer_list_open( rt_dpl_t ** list ) +/* Create the list of OPEN peers supporting this app or relay, without the rejected peers */ +int peer_struct_list_open( application_id_t app, rt_dpl_t ** list, uti_list_t * rejects ) { - return ENOTSUP; + uti_list_t * src = NULL; + + TRACE_ENTRY("%u %p %p", app, list, rejects); + + CHECK_PARAMS( list && rejects ); + + *list = NULL; + + /* Lock the list of active peers. This ensures that the peers will not be modified while accessing their data */ + CHECK_POSIX( pthread_mutex_lock( &_peers_lists_mtx ) ); + + /* We go through all the active peers */ + for (src = _peers_actives.next; src != &_peers_actives; src = src->next) { + _peer_t * src_peer = _P(src->o); + rt_dpl_t * dst = NULL; + int match = 0; + + /* First check if this peer may be candidate */ + if ((app != 0) && (src_peer->p_app_relay == 0)) { + int i; + /* check if the peer supports the required application */ + if (src_peer->p_app_size == 0) + continue; + ASSERT(src_peer->p_app_list != NULL); + + /* The array is ordered (see _peer_struct_appl_add) */ + for (i = 0; i < src_peer->p_app_size; i++) + if (src_peer->p_app_list[i].a >= app) + break; + /* Skip this peer if the corresponding application was NOT found */ + if (src_peer->p_app_list[i].a != app) + continue; + } + + /* Now browse the (ordered) rejects list to check if the current peer is not in it */ + while (!IS_LIST_EMPTY(rejects)) { + /* For the format of the rejects list, see create_list_rejects in routing.c */ + rt_reject_t * next = (rt_reject_t *)(rejects->next); + + if (next->hash > src_peer->p_hash) + break; + + if (next->hash == src_peer->p_hash) { + int cmp = 0; + avp_value_t *next_av = (avp_value_t *)(next->chain.o); + + cmp = strncasecmp((char *)next_av->os.data, src_peer->p_diamid, next_av->os.len); + + if (cmp > 0) + break; + + if (cmp == 0) { + size_t l = strlen(src_peer->p_diamid); + + if (next_av->os.len > l) + break; + + if (next_av->os.len == l) + match = 1; + } + } + + /* At this point, the next element in rejects is either equal (match = 1) or inferior (match = 0) to src_peer, we can delete it. */ + uti_list_unlink(&next->chain); + free(next); + } + + /* Skip the peer if it was found in the rejects list */ + if (match) + continue; + + /* Ok, we have to copy this peer in the destination list */ + CHECK_MALLOC_DO( dst = (rt_dpl_t *) malloc(sizeof(rt_dpl_t)), + { + CHECK_POSIX_DO( pthread_mutex_unlock( &_peers_lists_mtx ), /* continue */ ); + return ENOMEM; + } ); + + memset(dst, 0, sizeof(rt_dpl_t)); + + dst->peer = (peer_t *)src_peer; + dst->next = *list; + *list = dst; + } + + /* Unlock the list of active peers, we're done */ + CHECK_POSIX( pthread_mutex_unlock( &_peers_lists_mtx ) ); + + /* We can free any remaining entry in reject list */ + while (!IS_LIST_EMPTY(rejects)) { + uti_list_t * next = rejects->next; + uti_list_unlink(next); + free(next); + } + + return 0; }
--- a/waaad/peer.h Fri Nov 28 18:22:14 2008 +0900 +++ b/waaad/peer.h Tue Dec 02 16:20:09 2008 +0900 @@ -50,6 +50,9 @@ /* Include the definition of rt_dpl_t */ #include <waaad/routing-api.h> +/* Include the definition of uti_list_t */ +#include "utils.h" + /* Functions called only in the daemon */ /* @@ -136,23 +139,33 @@ * RETURN VALUE: * Pointer to the peer structure or NULL if not found. */ -peer_t * peer_getptr(char * diamid, uint32_t hash); +peer_t * peer_struct_getptr(char * diamid, uint32_t hash); +/* List of peers that must NOT be in the rt_dpl_t list returned by next function */ +/* Note that the list is ordered by hash and AVP value */ +typedef struct { + uti_list_t chain; /* linking information. The "o" field points to the AVP data location. */ + uint32_t hash; /* The hash value for this AVP data. */ +} rt_reject_t; /* * FUNCTION: peer_list_open * * PARAMETERS: - * list : location where the list must be stored. + * app : the application that must be supported (or 0 for all peers) + * list : location where the list must be stored. + * rejects: list of diameter ids of the peers that must not be returned. * * DESCRIPTION: - * Create a master list for routing module with all the peers in OPEN state, at the time of calling. + * Create a master list for routing module with all the peers in OPEN state (except rejected) + * that support the application app (or relay), at the time of calling. + * Note that this function also frees the content of the rejects list as a side effect. * * RETURN VALUE: * 0 : List is created successfully. * !0 : an error occurred. */ -int peer_list_open( rt_dpl_t ** list ); +int peer_struct_list_open( application_id_t app, rt_dpl_t ** list, uti_list_t * rejects ); #endif /* ! _PEER_H */
--- a/waaad/routing.c Fri Nov 28 18:22:14 2008 +0900 +++ b/waaad/routing.c Tue Dec 02 16:20:09 2008 +0900 @@ -69,6 +69,8 @@ uti_list_init( &fwd_all, NULL ); } +/************************************************************************************************/ + /* Is a request handled locally? */ static int handle_locally( msg_t * msg, int * res ) { @@ -99,6 +101,137 @@ return ENOTSUP; } +/* Add an entry to the list of rejected peers */ +static int reject_add(uti_list_t *rejects, msg_avp_data_t * avpdata) +{ + uti_list_t *li; + rt_reject_t * new = NULL; + uint32_t hash; + + TRACE_ENTRY("%p %p", rejects, avpdata); + + if (avpdata->avp_data == NULL) { + /* Ignore if the data is not set */ + TRACE_DEBUG(FULL, "AVP with unset value => ??"); + ASSERT(0); /* To check if this really happens, and understand why... */ + return EINVAL; + } + + /* Now compute the hash */ + hash = uti_hash ( (char *)avpdata->avp_data->os.data, avpdata->avp_data->os.len ); + + /* Find the location to insert in the list */ + for (li = rejects->next; li != rejects; li = li->next) { + rt_reject_t * nxt = (rt_reject_t *)li; + size_t l_nxt = ((avp_value_t *)(li->o))->os.len; + size_t l_new = avpdata->avp_data->os.len; + int cmp = 0; + + if (nxt->hash < hash) + continue; + + if (nxt->hash > hash) + break; + + /* Same hash, compare the strings */ + cmp = strncasecmp((char *)((avp_value_t *)(li->o))->os.data, (char *)avpdata->avp_data->os.data, l_nxt < l_new ? l_nxt : l_new ); + + if (cmp < 0) + continue; + + if (cmp > 0) + break; + + /* Same radical, compare the lengths */ + if (l_nxt < l_new) + continue; + + if (l_nxt > l_new) + break; + + /* Ok, we already have the same element, we can stop... */ + return 0; + } + + /* We can create the new element and insert it before the "li" element */ + CHECK_MALLOC( new = (rt_reject_t *) malloc( sizeof(rt_reject_t) ) ); + uti_list_init(&new->chain, &avpdata->avp_data); + new->hash = hash; + uti_list_insert_before(li, &new->chain); + + return 0; +} + +/* Create the list of rt_reject_t corresponding to the message. This contains Origin-host + all Route-Records diameter ids. */ +static int create_list_rejects(msg_t * msg, uti_list_t *rejects) +{ + msg_avp_t * avp = NULL; + + TRACE_ENTRY("%p %p", msg, rejects); + + CHECK_FCT( msg_browse(msg, MSG_BRW_FIRST_CHILD, &avp, NULL) ); + + /* Now loop on all AVPs -- we will break when last AVP was parsed */ + while (avp) { + msg_avp_data_t * avpdata; + + CHECK_FCT( msg_avp_data( avp, &avpdata ) ); + + if (avpdata->avp_flags & AVP_FLAG_VENDOR) { + goto next; + } + + switch (avpdata->avp_code) { + case AC_ORIGIN_HOST: /* Origin-Host */ + case AC_ROUTE_RECORD: /* Route-Record */ + /* Dictionary-resolve this object, it is not done already for new messages */ + CHECK_FCT( msg_parse_dict_avp(avp) ); + /* And now add the diameter id to the list of rejected peers for forwarding */ + CHECK_FCT( reject_add(rejects, avpdata) ); + break; + + default: /* Other AVP */ + /* just skip */ + ; + } + +next: + /* Go to next AVP */ + CHECK_FCT( msg_browse(avp, MSG_BRW_NEXT, &avp, NULL) ); + } + + /* We're done */ + return 0; +} + +/* Order the list of peers after outrt callbacks were called */ +static int order_rt_list(rt_dpl_t **list) +{ + rt_dpl_t sorted = { .next = NULL }; /* list of sorted elements */ + + TRACE_ENTRY("%p", list); + CHECK_PARAMS( list ); + + while (*list != NULL) { + rt_dpl_t * prev = &sorted; + + /* Get the first element from list */ + rt_dpl_t * next = *list; + *list = next->next; + + /* Now insert this element in out list */ + while ((prev->next != NULL) && (prev->next->score > next->score)) + prev = prev->next; + + next->next = prev->next; + prev->next = next; + } + + *list = sorted.next; + + return 0; +} + /* Pass a message to all registered callbacks. The read lock must be taken already. */ static int process_fwcb_list(uti_list_t * list, msg_t ** msg) { @@ -108,7 +241,7 @@ int ret = 0; _rt_hdl_t * hdl = (_rt_hdl_t *)li; - TRACE_DEBUG(ANNOYING, "Calling next routing callback: %p for message %p", hdl->cb, *msg); + TRACE_DEBUG(ANNOYING, "Calling next FW routing callback: %p for message %p", hdl->cb, *msg); ret = (*(rt_fwd_cb_t)hdl->cb)(hdl->chain.o, *msg); if (ret != 0) { @@ -120,8 +253,22 @@ return 0; } + +/* Pass a message to all registered callbacks. The read lock must be taken already. */ +static int process_outcb_list(uti_list_t * cblist, msg_t * msg, rt_dpl_t * list ) +{ + uti_list_t * li; + for (li = cblist->next; li != cblist; li = li->next) { + _rt_hdl_t * hdl = (_rt_hdl_t *)li; + TRACE_DEBUG(ANNOYING, "Calling next OUT routing callback: %p for message %p", hdl->cb, msg); + CHECK_FCT( (*(rt_out_cb_t)hdl->cb)(hdl->chain.o, msg, list) ); + } + return 0; +} + +/************************************************************************************************/ /* Thread for incoming messages */ static void * rt_in_th(void * arg) @@ -216,6 +363,7 @@ /* Loop */ do { rt_dpl_t * list = NULL; + uti_list_t rejects; pthread_testcancel(); @@ -249,7 +397,7 @@ CHECK_FCT_DO( msg_source_get( qry, &qry_src, &qry_src_h ), goto error ); ASSERT(qry_src != NULL); - peer = peer_getptr(qry_src, qry_src_h); + peer = peer_struct_getptr(qry_src, qry_src_h); if (peer == NULL) { /* Ok the peer does not exist anymore, just discard the message */ log_error("Unable to forward answer to peer '%d', the object does not exist anymore. Message dropped.\n", qry_src); @@ -275,27 +423,111 @@ } /* Ok, we are forwarding a query if we are here */ + uti_list_init(&rejects, NULL); - /* Create a list with all the "open" peers */ - CHECK_FCT_DO( peer_list_open( &list ), goto error ); + /* Create a list of the peers that are not candidates since they are already in the R-R */ + CHECK_FCT_DO( create_list_rejects(msg, &rejects), goto error ); + + /* Create a list with all the "open" peers, without the peers from rejects */ + CHECK_FCT_DO( peer_struct_list_open( hdr->msg_appl, &list, &rejects ), goto error ); - /* -- en fait ca serait beaucoup plus efficace de creer la liste sans les R-R directement... -- */ - - /* Remove the peers that are in the Route-Record AVPs of the message */ - - /* 0 peer remaining? => UNABLE_TO_DELIVER */ + if (list == NULL) { + /* 0 peer remaining? => UNABLE_TO_DELIVER */ + log_normal("Unable to forward a request (application %u), sending an error.\n", hdr->msg_appl); + msg_dump_walk(msg); + + /* Create the answer message */ + CHECK_FCT_DO( msg_new_answer_from_req(&msg), goto error ); + + /* Set the result code */ + CHECK_FCT_DO( msg_rescode_set(msg, "DIAMETER_UNABLE_TO_DELIVER", NULL, NULL), goto error ); + + /* Requeue in the incoming queue */ + CHECK_FCT_DO( meq_post( g_meq_incoming, msg ), + { + CHECK_FCT_DO( msg_free(msg, 1), /* nothing */ ); + goto error; + } ); + + /* Jump to next message */ + continue; + } - /* 1 peer remaining: send to this peer */ + if (list->next != NULL) { - /* otherwise call the rt_out_cb_t callbacks */ - - /* order the list */ + /* call the rt_out_cb_t callbacks */ + CHECK_POSIX_DO( pthread_rwlock_rdlock(&out_lck), goto error ); + + pthread_cleanup_push((void (*)())pthread_rwlock_unlock, &out_lck); + + CHECK_FCT_DO( process_outcb_list( &out_nrm, msg, list ), + { + log_normal("An error occurred in one of the routing extension, message discarded.\n"); + msg_dump_walk(msg); + msg_free(msg, 1); + CHECK_POSIX_DO( pthread_rwlock_unlock(&out_lck), ); + continue; + } ); + CHECK_FCT_DO( process_outcb_list( &out_late, msg, list ), + { + log_normal("An error occurred in one of the routing extension, message discarded.\n"); + msg_dump_walk(msg); + msg_free(msg, 1); + CHECK_POSIX_DO( pthread_rwlock_unlock(&out_lck), ); + continue; + } ); + + pthread_cleanup_pop(0); + + CHECK_POSIX_DO( pthread_rwlock_unlock(&out_lck), goto error ); + + /* order the list by priority */ + CHECK_FCT_DO( order_rt_list(&list), goto error ); + } - /* send to the first peer */ +trynext: + /* Send to peers in the list until success */ + if (list != NULL) { + rt_dpl_t * head = list; + peer_t * dest = list->peer; + int ret; + + list = head->next; + free(head); + + ret = peer_send( dest, msg ); + + if (ret == ENOTCONN) + goto trynext; + + CHECK_FCT_DO( ret, + { + CHECK_FCT_DO( msg_free(msg, 1), /* nothing */ ); + goto error; + } ); + + /* Message has been sent successfully */ + msg = NULL; + } - TRACE_DEBUG(INFO, "TBD..."); - msg_free(msg, 1); + /* Ok, now free the list */ + while (list != NULL) { + rt_dpl_t * head = list; + list = head->next; + free(head); + } + /* If the message was not sent */ + if (msg) { + /* The message could not be sent, requeue for new try */ + CHECK_FCT_DO( meq_post( g_meq_outgoing, msg ), + { + CHECK_FCT_DO( msg_free(msg, 1), /* nothing */ ); + goto error; + } ); + } + + /* Ok, go to next message */ } while (1); error: /* An error occured */