Mercurial > hg > freeDiameter
changeset 124:cc42d8607114
Completed cleanups of queues when the daemon is stopping
author | Sebastien Decugis <sdecugis@nict.go.jp> |
---|---|
date | Thu, 10 Dec 2009 14:15:04 +0900 |
parents | 960fa8048805 |
children | b424df830a82 |
files | freeDiameter/fD.h freeDiameter/queues.c freeDiameter/routing_dispatch.c include/freeDiameter/freeDiameter.h include/freeDiameter/libfreeDiameter.h libfreeDiameter/rt_data.c |
diffstat | 6 files changed, 846 insertions(+), 827 deletions(-) [+] |
line wrap: on
line diff
--- a/freeDiameter/fD.h Wed Dec 09 19:02:31 2009 +0900 +++ b/freeDiameter/fD.h Thu Dec 10 14:15:04 2009 +0900 @@ -99,8 +99,7 @@ extern struct fifo * fd_g_local; /* messages to be handled to local extensions */ /* Message queues */ int fd_queues_init(void); -int fd_queues_fini_rt(void); -int fd_queues_fini_disp(void); +int fd_queues_fini(struct fifo ** queue); /* Create all the dictionary objects defined in the Diameter base RFC. */ int fd_dict_base_protocol(struct dictionary * dict);
--- a/freeDiameter/queues.c Wed Dec 09 19:02:31 2009 +0900 +++ b/freeDiameter/queues.c Thu Dec 10 14:15:04 2009 +0900 @@ -50,30 +50,32 @@ return 0; } -/* Destroy the routing message queues */ -int fd_queues_fini_rt(void) +/* Destroy a queue after emptying it (and dumping the content) */ +int fd_queues_fini(struct fifo ** queue) { - TRACE_ENTRY(); + struct msg * msg; + int ret = 0; + TRACE_ENTRY("%p", queue); + + /* Note : the threads that post into this queue should already been stopped before this !!! */ + /* Empty all contents */ - TODO("Empty all contents (dump to log file ?)"); + while (1) { + /* Check if there is a message in the queue */ + ret = fd_fifo_tryget(*queue, &msg); + if (ret == EWOULDBLOCK) + break; + CHECK_FCT(ret); + + /* We got one! */ + fd_log_debug("The following message is lost because the daemon is stopping:\n"); + fd_msg_dump_walk(NONE, msg); + fd_msg_free(msg); + } - /* Now, delete the queues */ - CHECK_FCT( fd_fifo_del ( &fd_g_incoming ) ); - CHECK_FCT( fd_fifo_del ( &fd_g_outgoing ) ); + /* Now, delete the empty queue */ + CHECK_FCT( fd_fifo_del ( queue ) ); return 0; } - -/* Destroy the local message queue */ -int fd_queues_fini_disp(void) -{ - TRACE_ENTRY(); - - /* Empty all contents */ - TODO("Empty all contents (dump to log file ?)"); - - CHECK_FCT( fd_fifo_del ( &fd_g_local ) ); - - return 0; -}
--- a/freeDiameter/routing_dispatch.c Wed Dec 09 19:02:31 2009 +0900 +++ b/freeDiameter/routing_dispatch.c Thu Dec 10 14:15:04 2009 +0900 @@ -184,797 +184,10 @@ } /********************************************************************************/ -/* Helper functions */ -/********************************************************************************/ -/* Test if a User-Name AVP contains a Decorated NAI -- RFC4282, draft-ietf-dime-nai-routing-04 */ -static int is_decorated_NAI(union avp_value * un) -{ - int i; - TRACE_ENTRY("%p", un); - - /* If there was no User-Name, we return false */ - if (un == NULL) - return 0; - - /* Search if there is a '!' before any '@' -- do we need to check it contains a '.' ? */ - for (i = 0; i < un->os.len; i++) { - if ( un->os.data[i] == (unsigned char) '!' ) - return 1; - if ( un->os.data[i] == (unsigned char) '@' ) - break; - if ( un->os.data[i] == (unsigned char) '\\' ) - i++; /* next one was escaped */ - } - - return 0; -} - -/* Create new User-Name and Destination-Realm values */ -static int process_decorated_NAI(union avp_value * un, union avp_value * dr) -{ - int i, at_idx = 0, sep_idx = 0; - unsigned char * old_un; - TRACE_ENTRY("%p %p", un, dr); - CHECK_PARAMS(un && dr); - - /* Save the decorated User-Name, for example 'homerealm.example.net!user@otherrealm.example.net' */ - old_un = un->os.data; - - /* Search the positions of the first '!' and the '@' in the string */ - for (i = 0; i < un->os.len; i++) { - if ( (!sep_idx) && (old_un[i] == (unsigned char) '!') ) - sep_idx = i; - if ( old_un[i] == (unsigned char) '@' ) { - at_idx = i; - break; - } - if ( un->os.data[i] == (unsigned char) '\\' ) - i++; /* next one is escaped */ - } - - CHECK_PARAMS( 0 < sep_idx < at_idx < un->os.len); - - /* Create the new User-Name value */ - CHECK_MALLOC( un->os.data = malloc( at_idx ) ); - memcpy( un->os.data, old_un + sep_idx + 1, at_idx - sep_idx ); /* user@ */ - memcpy( un->os.data + at_idx - sep_idx, old_un, sep_idx ); /* homerealm.example.net */ - - /* Create the new Destination-Realm value */ - CHECK_MALLOC( dr->os.data = realloc(dr->os.data, sep_idx) ); - memcpy( dr->os.data, old_un, sep_idx ); - dr->os.len = sep_idx; - - TRACE_DEBUG(FULL, "Processed Decorated NAI '%.*s' into '%.*s' (%.*s)", - un->os.len, old_un, - at_idx, un->os.data, - dr->os.len, dr->os.data); - - un->os.len = at_idx; - free(old_un); - - return 0; -} - -/* Function to return an error to an incoming request */ -static int return_error(struct msg * msg, char * error_code, char * error_message, struct avp * failedavp) -{ - struct fd_peer * peer; - int is_loc = 0; - - /* Get the source of the message */ - { - char * id; - CHECK_FCT( fd_msg_source_get( msg, &id ) ); - - if (id == NULL) { - is_loc = 1; /* The message was issued locally */ - } else { - - /* Search the peer with this id */ - CHECK_FCT( fd_peer_getbyid( id, (void *)&peer ) ); - - if (!peer) { - TRACE_DEBUG(INFO, "Unable to send error '%s' to deleted peer '%s' in reply to:", error_code, id); - fd_msg_dump_walk(INFO, msg); - fd_msg_free(msg); - return 0; - } - } - } - - /* Create the error message */ - CHECK_FCT( fd_msg_new_answer_from_req ( fd_g_config->cnf_dict, &msg, MSGFL_ANSW_ERROR ) ); - - /* Set the error code */ - CHECK_FCT( fd_msg_rescode_set(msg, error_code, error_message, failedavp, 1 ) ); - - /* Send the answer */ - if (is_loc) { - CHECK_FCT( fd_fifo_post(fd_g_incoming, &msg) ); - } else { - CHECK_FCT( fd_out_send(&msg, NULL, peer) ); - } - - /* Done */ - return 0; -} - - -/********************************************************************************/ -/* Second part : the threads moving messages in the daemon */ +/* Some default OUT routing callbacks */ /********************************************************************************/ -/* Note: in the first version, we only create one thread of each kind. - We could improve the scalability by using the threshold feature of the queues - to create additional threads if a queue is filling up. - */ - -/* Control of the threads */ -enum thread_state { INITIAL = 0, RUNNING = 1, TERMINATED = 2 }; -static void cleanup_state(void * state_loc) -{ - if (state_loc) - *(enum thread_state *)state_loc = TERMINATED; -} -static pthread_mutex_t order_lock = PTHREAD_MUTEX_INITIALIZER; -static enum { RUN = 0, STOP = 1 } order_val = RUN;; - -/* The dispatching thread */ -static void * dispatch_thr(void * arg) -{ - TRACE_ENTRY("%p", arg); - - /* Set the thread name */ - { - char buf[48]; - snprintf(buf, sizeof(buf), "Dispatch %p", arg); - fd_log_threadname ( buf ); - } - - pthread_cleanup_push( cleanup_state, arg ); - - /* Mark the thread running */ - *(enum thread_state *)arg = RUNNING; - - do { - struct msg * msg; - struct msg_hdr * hdr; - int is_req = 0; - struct session * sess; - enum disp_action action; - const char * ec = NULL; - const char * em = NULL; - - /* Test the environment */ - { - int must_stop; - CHECK_POSIX_DO( pthread_mutex_lock(&order_lock), goto end ); /* we lock to flush the caches */ - must_stop = (order_val == STOP); - CHECK_POSIX_DO( pthread_mutex_unlock(&order_lock), goto end ); - if (must_stop) - goto end; - - pthread_testcancel(); - } - - /* Ok, we are allowed to run */ - - /* Get the next message from the queue */ - { - int ret; - CHECK_FCT_DO( ret = fd_fifo_get ( fd_g_local, &msg ), - { - if (ret == EPIPE) - /* The queue was destroyed */ - goto end; - goto fatal_error; - } ); - } - - if (TRACE_BOOL(FULL)) { - TRACE_DEBUG(FULL, "Picked next message"); - fd_msg_dump_one(ANNOYING, msg); - } - - /* Read the message header */ - CHECK_FCT_DO( fd_msg_hdr(msg, &hdr), goto fatal_error ); - is_req = hdr->msg_flags & CMD_FLAG_REQUEST; - - /* Note: if the message is for local delivery, we should test for duplicate - (draft-asveren-dime-dupcons-00). This may conflict with path validation decisions, no clear answer yet */ - - /* At this point, we need to understand the message content, so parse it */ - { - int ret; - CHECK_FCT_DO( ret = fd_msg_parse_or_error( &msg ), - { - /* in case of error, the message is already dump'd */ - if ((ret == EBADMSG) && (msg != NULL)) { - /* msg now contains the answer message to send back */ - CHECK_FCT_DO( fd_fifo_post(fd_g_outgoing, &msg), goto fatal_error ); - } - if (msg) { /* another error happen'd */ - TRACE_DEBUG(INFO, "An unexpected error occurred (%s), discarding a message:", strerror(ret)); - fd_msg_dump_walk(INFO, msg); - CHECK_FCT_DO( fd_msg_free(msg), /* continue */); - } - /* Go to the next message */ - continue; - } ); - } - - /* First, if the original request was registered with a callback and we receive the answer, call it. */ - if ( ! is_req ) { - struct msg * qry; - void (*anscb)(void *, struct msg **) = NULL; - void * data = NULL; - - /* Retrieve the corresponding query */ - CHECK_FCT_DO( fd_msg_answ_getq( msg, &qry ), goto fatal_error ); - - /* Retrieve any registered handler */ - CHECK_FCT_DO( fd_msg_anscb_get( qry, &anscb, &data ), goto fatal_error ); - - /* If a callback was registered, pass the message to it */ - if (anscb != NULL) { - - TRACE_DEBUG(FULL, "Calling callback registered when query was sent (%p, %p)", anscb, data); - (*anscb)(data, &msg); - - if (msg == NULL) { - /* Ok, the message is now handled, we can skip to the next one */ - continue; - } - } - } - - /* Retrieve the session of the message */ - CHECK_FCT_DO( fd_msg_sess_get(fd_g_config->cnf_dict, msg, &sess, NULL), goto fatal_error ); - - /* Now, call any callback registered for the message */ - CHECK_FCT_DO( fd_msg_dispatch ( &msg, sess, &action, &ec), goto fatal_error ); - - /* Now, act depending on msg and action and ec */ - if (!msg) - continue; - - switch ( action ) { - case DISP_ACT_CONT: - /* No callback has handled the message, let's reply with a generic error */ - em = "The message was not handled by any extension callback"; - ec = "DIAMETER_COMMAND_UNSUPPORTED"; - - case DISP_ACT_ERROR: - /* We have a problem with delivering the message */ - if (ec == NULL) { - ec = "DIAMETER_UNABLE_TO_COMPLY"; - } - - if (!is_req) { - TRACE_DEBUG(INFO, "Received an answer to a localy issued query, but no handler processed this answer!"); - fd_msg_dump_walk(INFO, msg); - fd_msg_free(msg); - msg = NULL; - break; - } - - /* Create an answer with the error code and message */ - CHECK_FCT_DO( fd_msg_new_answer_from_req ( fd_g_config->cnf_dict, &msg, 0 ), goto fatal_error ); - CHECK_FCT_DO( fd_msg_rescode_set(msg, (char *)ec, (char *)em, NULL, 1 ), goto fatal_error ); - - case DISP_ACT_SEND: - /* Now, send the message */ - CHECK_FCT_DO( fd_fifo_post(fd_g_outgoing, &msg), goto fatal_error ); - } - - /* We're done with this message */ - - } while (1); - -fatal_error: - TRACE_DEBUG(INFO, "An error occurred in dispatch module! Thread is terminating..."); - CHECK_FCT_DO(fd_event_send(fd_g_config->cnf_main_ev, FDEV_TERMINATE, 0, NULL), ); - -end: - /* Mark the thread as terminated */ - pthread_cleanup_pop(1); - return NULL; -} - - -/* The (routing-in) thread -- see description in freeDiameter.h */ -static void * routing_in_thr(void * arg) -{ - TRACE_ENTRY("%p", arg); - - /* Set the thread name */ - { - char buf[48]; - snprintf(buf, sizeof(buf), "Routing-IN %p", arg); - fd_log_threadname ( buf ); - } - - pthread_cleanup_push( cleanup_state, arg ); - - /* Mark the thread running */ - *(enum thread_state *)arg = RUNNING; - - /* Main thread loop */ - do { - struct msg * msg; - struct msg_hdr * hdr; - int is_req = 0; - int is_err = 0; - char * qry_src = NULL; - - /* Test if we were told to stop */ - { - int must_stop; - CHECK_POSIX_DO( pthread_mutex_lock(&order_lock), goto end ); /* we lock to flush the caches */ - must_stop = (order_val == STOP); - CHECK_POSIX_DO( pthread_mutex_unlock(&order_lock), goto end ); - if (must_stop) - goto end; - - pthread_testcancel(); - } - - /* Get the next message from the incoming queue */ - { - int ret; - CHECK_FCT_DO( ret = fd_fifo_get ( fd_g_incoming, &msg ), - { - if (ret == EPIPE) - /* The queue was destroyed */ - goto end; - goto fatal_error; - } ); - } - - if (TRACE_BOOL(FULL)) { - TRACE_DEBUG(FULL, "Picked next message"); - fd_msg_dump_one(ANNOYING, msg); - } - - /* Read the message header */ - CHECK_FCT_DO( fd_msg_hdr(msg, &hdr), goto fatal_error ); - is_req = hdr->msg_flags & CMD_FLAG_REQUEST; - is_err = hdr->msg_flags & CMD_FLAG_ERROR; - - /* Handle incorrect bits */ - if (is_req && is_err) { - CHECK_FCT_DO( return_error( msg, "DIAMETER_INVALID_HDR_BITS", "R & E bits were set", NULL), goto fatal_error ); - continue; - } - - /* If it is a request, we must analyze its content to decide what we do with it */ - if (is_req) { - struct avp * avp, *un = NULL; - union avp_value * un_val = NULL, *dr_val = NULL; - enum status { UNKNOWN, YES, NO }; - /* Are we Destination-Host? */ - enum status is_dest_host = UNKNOWN; - /* Are we Destination-Realm? */ - enum status is_dest_realm = UNKNOWN; - /* Do we support the application of the message? */ - enum status is_local_app = UNKNOWN; - - /* Check if we have local support for the message application */ - if ( (hdr->msg_appl == 0) || (hdr->msg_appl == AI_RELAY) ) { - TRACE_DEBUG(INFO, "Received a routable message with application id 0, returning DIAMETER_APPLICATION_UNSUPPORTED"); - CHECK_FCT_DO( return_error( msg, "DIAMETER_APPLICATION_UNSUPPORTED", "Routable message with application id 0 or relay", NULL), goto fatal_error ); - continue; - } else { - struct fd_app * app; - CHECK_FCT_DO( fd_app_check(&fd_g_config->cnf_apps, hdr->msg_appl, &app), goto fatal_error ); - is_local_app = (app ? YES : NO); - } - - /* Parse the message for Dest-Host and Dest-Realm */ - CHECK_FCT_DO( fd_msg_browse(msg, MSG_BRW_FIRST_CHILD, &avp, NULL), goto fatal_error ); - while (avp) { - struct avp_hdr * ahdr; - CHECK_FCT_DO( fd_msg_avp_hdr( avp, &ahdr ), goto fatal_error ); - - if (! (ahdr->avp_flags & AVP_FLAG_VENDOR)) { - switch (ahdr->avp_code) { - case AC_DESTINATION_HOST: - /* Parse this AVP */ - CHECK_FCT_DO( fd_msg_parse_dict ( avp, fd_g_config->cnf_dict, NULL ), goto fatal_error ); - ASSERT( ahdr->avp_value ); - /* Compare the Destination-Host AVP of the message with our identity */ - if (ahdr->avp_value->os.len != fd_g_config->cnf_diamid_len) { - is_dest_host = NO; - } else { - is_dest_host = (strncasecmp(fd_g_config->cnf_diamid, (char *)ahdr->avp_value->os.data, fd_g_config->cnf_diamid_len) - ? NO : YES); - } - break; - - case AC_DESTINATION_REALM: - /* Parse this AVP */ - CHECK_FCT_DO( fd_msg_parse_dict ( avp, fd_g_config->cnf_dict, NULL ), goto fatal_error ); - ASSERT( ahdr->avp_value ); - dr_val = ahdr->avp_value; - /* Compare the Destination-Realm AVP of the message with our identity */ - if (ahdr->avp_value->os.len != fd_g_config->cnf_diamrlm_len) { - is_dest_realm = NO; - } else { - is_dest_realm = (strncasecmp(fd_g_config->cnf_diamrlm, (char *)ahdr->avp_value->os.data, fd_g_config->cnf_diamrlm_len) - ? NO : YES); - } - break; - - case AC_USER_NAME: - /* Parse this AVP */ - CHECK_FCT_DO( fd_msg_parse_dict ( avp, fd_g_config->cnf_dict, NULL ), goto fatal_error ); - ASSERT( ahdr->avp_value ); - un = avp; - un_val = ahdr->avp_value; - break; - } - } - - if ((is_dest_host != UNKNOWN) && (is_dest_realm != UNKNOWN) && un) - break; - - /* Go to next AVP */ - CHECK_FCT_DO( fd_msg_browse(avp, MSG_BRW_NEXT, &avp, NULL), goto fatal_error ); - } - - /* OK, now decide what we do with the request */ - - /* Handle the missing routing AVPs first */ - if ( is_dest_realm == UNKNOWN ) { - CHECK_FCT_DO( return_error( msg, "DIAMETER_COMMAND_UNSUPPORTED", "Non-routable message not supported (invalid bit ? missing Destination-Realm ?)", NULL), goto fatal_error ); - continue; - } - - /* If we are listed as Destination-Host */ - if (is_dest_host == YES) { - if (is_local_app == YES) { - /* Ok, give the message to the dispatch thread */ - CHECK_FCT_DO(fd_fifo_post(fd_g_local, &msg), goto fatal_error ); - } else { - /* We don't support the application, reply an error */ - CHECK_FCT_DO( return_error( msg, "DIAMETER_APPLICATION_UNSUPPORTED", NULL, NULL), goto fatal_error ); - } - continue; - } - - /* If the message is explicitely for someone else */ - if ((is_dest_host == NO) || (is_dest_realm == NO)) { - if (fd_g_config->cnf_flags.no_fwd) { - CHECK_FCT_DO( return_error( msg, "DIAMETER_UNABLE_TO_DELIVER", "This peer is not an agent", NULL), goto fatal_error ); - continue; - } - } else { - /* Destination-Host was not set, and Destination-Realm is matching : we may handle or pass to a fellow peer */ - - /* test for decorated NAI (draft-ietf-dime-nai-routing-04 section 4.4) */ - if (is_decorated_NAI(un_val)) { - /* Handle the decorated NAI */ - CHECK_FCT_DO( process_decorated_NAI(un_val, dr_val), - { - /* If the process failed, we assume it is because of the AVP format */ - CHECK_FCT_DO( return_error( msg, "DIAMETER_INVALID_AVP_VALUE", "Failed to process decorated NAI", un), goto fatal_error ); - continue; - } ); - - /* We have transformed the AVP, now submit it again in the queue */ - CHECK_FCT_DO(fd_fifo_post(fd_g_incoming, &msg), goto fatal_error ); - continue; - } - - if (is_local_app == YES) { - /* Handle localy since we are able to */ - CHECK_FCT_DO(fd_fifo_post(fd_g_local, &msg), goto fatal_error ); - continue; - } - - if (fd_g_config->cnf_flags.no_fwd) { - /* We return an error */ - CHECK_FCT_DO( return_error( msg, "DIAMETER_APPLICATION_UNSUPPORTED", NULL, NULL), goto fatal_error ); - continue; - } - } - - /* From that point, for requests, we will call the registered callbacks, then forward to another peer */ - - } else { - /* The message is an answer */ - struct msg * qry; - - /* Retrieve the corresponding query and its origin */ - CHECK_FCT_DO( fd_msg_answ_getq( msg, &qry ), goto fatal_error ); - CHECK_FCT_DO( fd_msg_source_get( qry, &qry_src ), goto fatal_error ); - - if ((!qry_src) && (!is_err)) { - /* The message is a normal answer to a request issued localy, we do not call the callbacks chain on it. */ - CHECK_FCT_DO(fd_fifo_post(fd_g_local, &msg), goto fatal_error ); - continue; - } - - /* From that point, for answers, we will call the registered callbacks, then pass it to the dispatch module or forward it */ - } - - /* Call all registered callbacks for this message */ - { - struct fd_list * li; - - CHECK_FCT_DO( pthread_rwlock_rdlock( &rt_fwd_lock ), goto fatal_error ); - pthread_cleanup_push( fd_cleanup_rwlock, &rt_fwd_lock ); - - /* requests: dir = 1 & 2 => in order; answers = 3 & 2 => in reverse order */ - for ( li = (is_req ? rt_fwd_list.next : rt_fwd_list.prev) ; msg && (li != &rt_fwd_list) ; li = (is_req ? li->next : li->prev) ) { - struct rt_hdl * rh = (struct rt_hdl *)li; - - if (is_req && (rh->dir > RT_FWD_ALL)) - break; - if ((!is_req) && (rh->dir < RT_FWD_ALL)) - break; - - /* Ok, call this cb */ - TRACE_DEBUG(ANNOYING, "Calling next FWD callback on %p : %p", msg, rh->rt_fwd_cb); - CHECK_FCT_DO( (*rh->rt_fwd_cb)(rh->cbdata, &msg), - { - TRACE_DEBUG(INFO, "A FWD routing callback returned an error, message discarded."); - fd_msg_dump_walk(INFO, msg); - fd_msg_free(msg); - msg = NULL; - } ); - } - - pthread_cleanup_pop(0); - CHECK_FCT_DO( pthread_rwlock_unlock( &rt_fwd_lock ), goto fatal_error ); - - /* If a callback has handled the message, we stop now */ - if (!msg) - continue; - } - - /* Now handle the message to the next step: either forward to another peer, or for local delivery */ - if (is_req || qry_src) { - CHECK_FCT_DO(fd_fifo_post(fd_g_outgoing, &msg), goto fatal_error ); - } else { - CHECK_FCT_DO(fd_fifo_post(fd_g_local, &msg), goto fatal_error ); - } - - /* We're done with this message */ - } while (1); - -fatal_error: - TRACE_DEBUG(INFO, "An error occurred in routing module! IN thread is terminating..."); - CHECK_FCT_DO(fd_event_send(fd_g_config->cnf_main_ev, FDEV_TERMINATE, 0, NULL), ); - -end: - /* Mark the thread as terminated */ - pthread_cleanup_pop(1); - return NULL; -} - - -/* The (routing-out) thread -- see description in freeDiameter.h */ -static void * routing_out_thr(void * arg) -{ - struct rt_data * rtd = NULL; - TRACE_ENTRY("%p", arg); - - /* Set the thread name */ - { - char buf[48]; - snprintf(buf, sizeof(buf), "Routing OUT %p", arg); - fd_log_threadname ( buf ); - } - - pthread_cleanup_push( cleanup_state, arg ); - - /* Mark the thread running */ - *(enum thread_state *)arg = RUNNING; - - - /* Main thread loop */ - do { - struct msg * msg; - struct msg_hdr * hdr; - int is_req = 0; - struct fd_list * li, *candidates; - struct avp * avp; - struct rtd_candidate * c; - - /* If we loop'd with some undeleted routing data, destroy it */ - if (rtd != NULL) - fd_rtd_free(&rtd); - - /* Test if we were told to stop */ - { - int must_stop; - CHECK_POSIX_DO( pthread_mutex_lock(&order_lock), goto end ); /* we lock to flush the caches */ - must_stop = (order_val == STOP); - CHECK_POSIX_DO( pthread_mutex_unlock(&order_lock), goto end ); - if (must_stop) - goto end; - - pthread_testcancel(); - } - - /* Get the next message from the ougoing queue */ - { - int ret; - CHECK_FCT_DO( ret = fd_fifo_get ( fd_g_outgoing, &msg ), - { - if (ret == EPIPE) - /* The queue was destroyed */ - goto end; - goto fatal_error; - } ); - } - - if (TRACE_BOOL(FULL)) { - TRACE_DEBUG(FULL, "Picked next message"); - fd_msg_dump_one(ANNOYING, msg); - } - - /* Read the message header */ - CHECK_FCT_DO( fd_msg_hdr(msg, &hdr), goto fatal_error ); - is_req = hdr->msg_flags & CMD_FLAG_REQUEST; - - /* For answers, the routing is very easy */ - if ( ! is_req ) { - struct msg * qry; - char * qry_src = NULL; - struct msg_hdr * qry_hdr; - struct fd_peer * peer = NULL; - - /* Retrieve the corresponding query and its origin */ - CHECK_FCT_DO( fd_msg_answ_getq( msg, &qry ), goto fatal_error ); - CHECK_FCT_DO( fd_msg_source_get( qry, &qry_src ), goto fatal_error ); - - ASSERT( qry_src ); /* if it is NULL, the message should have been in the LOCAL queue! */ - - /* Find the peer corresponding to this name */ - CHECK_FCT_DO( fd_peer_getbyid( qry_src, (void *) &peer ), goto fatal_error ); - if ((!peer) || (peer->p_hdr.info.runtime.pir_state != STATE_OPEN)) { - TRACE_DEBUG(INFO, "Unable to forward answer message to peer '%s', deleted or not in OPEN state.", qry_src); - fd_msg_dump_walk(INFO, msg); - fd_msg_free(msg); - continue; - } - - /* We must restore the hop-by-hop id */ - CHECK_FCT_DO( fd_msg_hdr(qry, &qry_hdr), goto fatal_error ); - hdr->msg_hbhid = qry_hdr->msg_hbhid; - - /* Push the message into this peer */ - CHECK_FCT_DO( fd_out_send(&msg, NULL, peer), goto fatal_error ); - - /* We're done with this answer */ - continue; - } - - /* From that point, the message is a request */ - - /* Get the routing data out of the message if any (in case of re-transmit) */ - CHECK_FCT_DO( fd_msg_rt_get ( msg, &rtd ), goto fatal_error ); - - /* If there is no routing data already, let's create it */ - if (rtd == NULL) { - CHECK_FCT_DO( fd_rtd_init(&rtd), goto fatal_error ); - - /* Add all peers in OPEN state */ - CHECK_FCT_DO( pthread_rwlock_wrlock(&fd_g_activ_peers_rw), goto fatal_error ); - for (li = fd_g_activ_peers.next; li != &fd_g_activ_peers; li = li->next) { - struct fd_peer * p = (struct fd_peer *)li->o; - CHECK_FCT_DO( fd_rtd_candidate_add(rtd, p->p_hdr.info.pi_diamid), goto fatal_error); - } - CHECK_FCT_DO( pthread_rwlock_unlock(&fd_g_activ_peers_rw), goto fatal_error ); - - /* Now let's remove all peers from the Route-Records */ - CHECK_FCT_DO( fd_msg_browse(msg, MSG_BRW_FIRST_CHILD, &avp, NULL), goto fatal_error ); - while (avp) { - struct avp_hdr * ahdr; - CHECK_FCT_DO( fd_msg_avp_hdr( avp, &ahdr ), goto fatal_error ); - - if ((ahdr->avp_code == AC_ROUTE_RECORD) && (! (ahdr->avp_flags & AVP_FLAG_VENDOR)) ) { - /* Parse this AVP */ - CHECK_FCT_DO( fd_msg_parse_dict ( avp, fd_g_config->cnf_dict, NULL ), goto fatal_error ); - ASSERT( ahdr->avp_value ); - /* Remove this value from the list */ - fd_rtd_candidate_del(rtd, (char *)ahdr->avp_value->os.data, ahdr->avp_value->os.len); - } - - /* Go to next AVP */ - CHECK_FCT_DO( fd_msg_browse(avp, MSG_BRW_NEXT, &avp, NULL), goto fatal_error ); - } - } - - /* Note: we reset the scores and pass the message to the callbacks, maybe we could re-use the saved scores when we have received an error ? */ - - /* Ok, we have our list in rtd now, let's (re)initialize the scores */ - fd_rtd_candidate_extract(rtd, &candidates); - - /* Pass the list to registered callbacks (even if it is empty) */ - { - CHECK_FCT_DO( pthread_rwlock_rdlock( &rt_out_lock ), goto fatal_error ); - pthread_cleanup_push( fd_cleanup_rwlock, &rt_out_lock ); - - /* We call the cb by reverse priority order */ - for ( li = rt_out_list.prev ; li != &rt_out_list ; li = li->prev ) { - struct rt_hdl * rh = (struct rt_hdl *)li; - - TRACE_DEBUG(ANNOYING, "Calling next OUT callback on %p : %p (prio %d)", msg, rh->rt_out_cb, rh->prio); - CHECK_FCT_DO( (*rh->rt_out_cb)(rh->cbdata, msg, candidates), - { - TRACE_DEBUG(INFO, "An OUT routing callback returned an error ! Message discarded."); - fd_msg_dump_walk(INFO, msg); - fd_msg_free(msg); - msg = NULL; - break; - } ); - } - - pthread_cleanup_pop(0); - CHECK_FCT_DO( pthread_rwlock_unlock( &rt_out_lock ), goto fatal_error ); - - /* If an error occurred, skip to the next message */ - if (!msg) - continue; - } - - /* Order the candidate peers by score attributed by the callbacks */ - CHECK_FCT_DO( fd_rtd_candidate_reorder(candidates), goto fatal_error ); - - /* Save the routing information in the message */ - CHECK_FCT_DO( fd_msg_rt_associate ( msg, &rtd ), goto fatal_error ); - - /* Now try sending the message */ - for (li = candidates->prev; li != candidates; li = li->prev) { - struct fd_peer * peer; - - c = (struct rtd_candidate *) li; - - /* Stop when we have reached the end of valid candidates */ - if (c->score < 0) - break; - - /* Search for the peer */ - CHECK_FCT_DO( fd_peer_getbyid( c->diamid, (void *)&peer ), goto fatal_error ); - - if (peer && (peer->p_hdr.info.runtime.pir_state == STATE_OPEN)) { - /* Send to this one */ - CHECK_FCT_DO( fd_out_send(&msg, NULL, peer), continue ); - /* If the sending was successful */ - break; - } - } - - /* If the message has not been sent, return an error */ - if (msg) { - TRACE_DEBUG(INFO, "Could not send the following message, replying with UNABLE_TO_DELIVER"); - fd_msg_dump_walk(INFO, msg); - return_error( msg, "DIAMETER_UNABLE_TO_DELIVER", "No suitable candidate to route the message to", NULL); - } - - /* We're done with this message */ - - } while (1); - -fatal_error: - TRACE_DEBUG(INFO, "An error occurred in routing module! OUT thread is terminating..."); - CHECK_FCT_DO(fd_event_send(fd_g_config->cnf_main_ev, FDEV_TERMINATE, 0, NULL), ); - -end: - /* Mark the thread as terminated */ - pthread_cleanup_pop(1); - return NULL; -} - - -/********************************************************************************/ -/* Some default routing callbacks */ -/********************************************************************************/ - -/* First OUT callback: prevent sending to peers that do not support the message application */ +/* Prevent sending to peers that do not support the message application */ static int dont_send_if_no_common_app(void * cbdata, struct msg * msg, struct fd_list * candidates) { struct fd_list * li; @@ -1006,7 +219,7 @@ return 0; } -/* Second OUT callback: Detect if the Destination-Host and Destination-Realm match the peer */ +/* Detect if the Destination-Host and Destination-Realm match the peer */ static int score_destination_avp(void * cbdata, struct msg * msg, struct fd_list * candidates) { struct fd_list * li; @@ -1073,10 +286,790 @@ } /********************************************************************************/ +/* Helper functions */ +/********************************************************************************/ + +/* Find (first) '!' and '@' positions in a UTF-8 encoded string (User-Name AVP value) */ +static void nai_get_indexes(union avp_value * un, int * excl_idx, int * at_idx) +{ + int i; + + TRACE_ENTRY("%p %p %p", un, excl_idx, at_idx); + CHECK_PARAMS_DO( un && excl_idx, return ); + *excl_idx = 0; + + /* Search if there is a '!' before any '@' -- do we need to check it contains a '.' ? */ + for (i = 0; i < un->os.len; i++) { + /* The '!' marks the decorated NAI */ + if ( un->os.data[i] == (unsigned char) '!' ) { + if (!*excl_idx) + *excl_idx = i; + if (!at_idx) + return; + } + /* If we reach the realm part, we can stop */ + if ( un->os.data[i] == (unsigned char) '@' ) { + if (at_idx) + *at_idx = i; + break; + } + /* Skip escaped characters */ + if ( un->os.data[i] == (unsigned char) '\\' ) { + i++; + continue; + } + /* Skip UTF-8 characters spanning on several bytes */ + if ( (un->os.data[i] & 0xF8) == 0xF0 ) { /* 11110zzz */ + i += 3; + continue; + } + if ( (un->os.data[i] & 0xF0) == 0xE0 ) { /* 1110yyyy */ + i += 2; + continue; + } + if ( (un->os.data[i] & 0xE0) == 0xC0 ) { /* 110yyyxx */ + i += 1; + continue; + } + } + + return; +} + +/* Test if a User-Name AVP contains a Decorated NAI -- RFC4282, draft-ietf-dime-nai-routing-04 */ +static int is_decorated_NAI(union avp_value * un) +{ + int i; + TRACE_ENTRY("%p", un); + + /* If there was no User-Name, we return false */ + if (un == NULL) + return 0; + + nai_get_indexes(un, &i, NULL); + + return i; +} + +/* Create new User-Name and Destination-Realm values */ +static int process_decorated_NAI(union avp_value * un, union avp_value * dr) +{ + int i, at_idx = 0, sep_idx = 0; + unsigned char * old_un; + TRACE_ENTRY("%p %p", un, dr); + CHECK_PARAMS(un && dr); + + /* Save the decorated User-Name, for example 'homerealm.example.net!user@otherrealm.example.net' */ + old_un = un->os.data; + + /* Search the positions of the first '!' and the '@' in the string */ + nai_get_indexes(un, &sep_idx, &at_idx); + CHECK_PARAMS( 0 < sep_idx < at_idx < un->os.len); + + /* Create the new User-Name value */ + CHECK_MALLOC( un->os.data = malloc( at_idx ) ); + memcpy( un->os.data, old_un + sep_idx + 1, at_idx - sep_idx ); /* user@ */ + memcpy( un->os.data + at_idx - sep_idx, old_un, sep_idx ); /* homerealm.example.net */ + + /* Create the new Destination-Realm value */ + CHECK_MALLOC( dr->os.data = realloc(dr->os.data, sep_idx) ); + memcpy( dr->os.data, old_un, sep_idx ); + dr->os.len = sep_idx; + + TRACE_DEBUG(FULL, "Processed Decorated NAI : '%.*s' became '%.*s' (%.*s)", + un->os.len, old_un, + at_idx, un->os.data, + dr->os.len, dr->os.data); + + un->os.len = at_idx; + free(old_un); + + return 0; +} + +/* Function to return an error to an incoming request */ +static int return_error(struct msg ** pmsg, char * error_code, char * error_message, struct avp * failedavp) +{ + struct fd_peer * peer; + int is_loc = 0; + + /* Get the source of the message */ + { + char * id; + CHECK_FCT( fd_msg_source_get( *pmsg, &id ) ); + + if (id == NULL) { + is_loc = 1; /* The message was issued locally */ + } else { + + /* Search the peer with this id */ + CHECK_FCT( fd_peer_getbyid( id, (void *)&peer ) ); + + if (!peer) { + TRACE_DEBUG(INFO, "Unable to send error '%s' to deleted peer '%s' in reply to:", error_code, id); + fd_msg_dump_walk(INFO, *pmsg); + fd_msg_free(*pmsg); + *pmsg = NULL; + return 0; + } + } + } + + /* Create the error message */ + CHECK_FCT( fd_msg_new_answer_from_req ( fd_g_config->cnf_dict, pmsg, MSGFL_ANSW_ERROR ) ); + + /* Set the error code */ + CHECK_FCT( fd_msg_rescode_set(*pmsg, error_code, error_message, failedavp, 1 ) ); + + /* Send the answer */ + if (is_loc) { + CHECK_FCT( fd_fifo_post(fd_g_incoming, pmsg) ); + } else { + CHECK_FCT( fd_out_send(pmsg, NULL, peer) ); + } + + /* Done */ + return 0; +} + + +/****************************************************************************/ +/* Second part : threads moving messages in the daemon */ +/****************************************************************************/ + +/* These are the functions of each threads: dispatch & routing */ +/* The DISPATCH message processing */ +static int msg_dispatch(struct msg ** pmsg) +{ + struct msg_hdr * hdr; + int is_req = 0, ret; + struct session * sess; + enum disp_action action; + const char * ec = NULL; + const char * em = NULL; + + /* Read the message header */ + CHECK_FCT( fd_msg_hdr(*pmsg, &hdr) ); + is_req = hdr->msg_flags & CMD_FLAG_REQUEST; + + /* Note: if the message is for local delivery, we should test for duplicate + (draft-asveren-dime-dupcons-00). This may conflict with path validation decisions, no clear answer yet */ + + /* At this point, we need to understand the message content, so parse it */ + CHECK_FCT_DO( ret = fd_msg_parse_or_error( pmsg ), + { + /* in case of error, the message is already dump'd */ + if ((ret == EBADMSG) && (*pmsg != NULL)) { + /* msg now contains the answer message to send back */ + CHECK_FCT( fd_fifo_post(fd_g_outgoing, pmsg) ); + } + if (*pmsg) { /* another error happen'd */ + TRACE_DEBUG(INFO, "An unexpected error occurred (%s), discarding a message:", strerror(ret)); + fd_msg_dump_walk(INFO, *pmsg); + CHECK_FCT_DO( fd_msg_free(*pmsg), /* continue */); + *pmsg = NULL; + } + /* We're done with this one */ + return 0; + } ); + + /* First, if the original request was registered with a callback and we receive the answer, call it. */ + if ( ! is_req ) { + struct msg * qry; + void (*anscb)(void *, struct msg **) = NULL; + void * data = NULL; + + /* Retrieve the corresponding query */ + CHECK_FCT( fd_msg_answ_getq( *pmsg, &qry ) ); + + /* Retrieve any registered handler */ + CHECK_FCT( fd_msg_anscb_get( qry, &anscb, &data ) ); + + /* If a callback was registered, pass the message to it */ + if (anscb != NULL) { + + TRACE_DEBUG(FULL, "Calling callback registered when query was sent (%p, %p)", anscb, data); + (*anscb)(data, pmsg); + + /* If the message is processed, we're done */ + if (*pmsg == NULL) { + return 0; + } + } + } + + /* Retrieve the session of the message */ + CHECK_FCT( fd_msg_sess_get(fd_g_config->cnf_dict, *pmsg, &sess, NULL) ); + + /* Now, call any callback registered for the message */ + CHECK_FCT( fd_msg_dispatch ( pmsg, sess, &action, &ec) ); + + /* Now, act depending on msg and action and ec */ + if (*pmsg) + switch ( action ) { + case DISP_ACT_CONT: + /* No callback has handled the message, let's reply with a generic error */ + em = "The message was not handled by any extension callback"; + ec = "DIAMETER_COMMAND_UNSUPPORTED"; + + case DISP_ACT_ERROR: + /* We have a problem with delivering the message */ + if (ec == NULL) { + ec = "DIAMETER_UNABLE_TO_COMPLY"; + } + + if (!is_req) { + TRACE_DEBUG(INFO, "Received an answer to a localy issued query, but no handler processed this answer!"); + fd_msg_dump_walk(INFO, *pmsg); + fd_msg_free(*pmsg); + *pmsg = NULL; + break; + } + + /* Create an answer with the error code and message */ + CHECK_FCT( fd_msg_new_answer_from_req ( fd_g_config->cnf_dict, pmsg, 0 ) ); + CHECK_FCT( fd_msg_rescode_set(*pmsg, (char *)ec, (char *)em, NULL, 1 ) ); + + case DISP_ACT_SEND: + /* Now, send the message */ + CHECK_FCT( fd_fifo_post(fd_g_outgoing, pmsg) ); + } + + /* We're done with dispatching this message */ + return 0; +} + +/* The ROUTING-IN message processing */ +static int msg_rt_in(struct msg ** pmsg) +{ + struct msg_hdr * hdr; + int is_req = 0; + int is_err = 0; + char * qry_src = NULL; + + /* Read the message header */ + CHECK_FCT( fd_msg_hdr(*pmsg, &hdr) ); + is_req = hdr->msg_flags & CMD_FLAG_REQUEST; + is_err = hdr->msg_flags & CMD_FLAG_ERROR; + + /* Handle incorrect bits */ + if (is_req && is_err) { + CHECK_FCT( return_error( pmsg, "DIAMETER_INVALID_HDR_BITS", "R & E bits were set", NULL) ); + return 0; + } + + /* If it is a request, we must analyze its content to decide what we do with it */ + if (is_req) { + struct avp * avp, *un = NULL; + union avp_value * un_val = NULL, *dr_val = NULL; + enum status { UNKNOWN, YES, NO }; + /* Are we Destination-Host? */ + enum status is_dest_host = UNKNOWN; + /* Are we Destination-Realm? */ + enum status is_dest_realm = UNKNOWN; + /* Do we support the application of the message? */ + enum status is_local_app = UNKNOWN; + + /* Check if we have local support for the message application */ + if ( (hdr->msg_appl == 0) || (hdr->msg_appl == AI_RELAY) ) { + TRACE_DEBUG(INFO, "Received a routable message with application id 0, returning DIAMETER_APPLICATION_UNSUPPORTED"); + CHECK_FCT( return_error( pmsg, "DIAMETER_APPLICATION_UNSUPPORTED", "Routable message with application id 0 or relay", NULL) ); + return 0; + } else { + struct fd_app * app; + CHECK_FCT( fd_app_check(&fd_g_config->cnf_apps, hdr->msg_appl, &app) ); + is_local_app = (app ? YES : NO); + } + + /* Parse the message for Dest-Host and Dest-Realm */ + CHECK_FCT( fd_msg_browse(*pmsg, MSG_BRW_FIRST_CHILD, &avp, NULL) ); + while (avp) { + struct avp_hdr * ahdr; + struct fd_pei error_info; + int ret; + CHECK_FCT( fd_msg_avp_hdr( avp, &ahdr ) ); + + if (! (ahdr->avp_flags & AVP_FLAG_VENDOR)) { + switch (ahdr->avp_code) { + case AC_DESTINATION_HOST: + /* Parse this AVP */ + CHECK_FCT_DO( ret = fd_msg_parse_dict ( avp, fd_g_config->cnf_dict, &error_info ), + { + if (error_info.pei_errcode) { + CHECK_FCT( return_error( pmsg, error_info.pei_errcode, error_info.pei_message, error_info.pei_avp) ); + return 0; + } else { + return ret; + } + } ); + ASSERT( ahdr->avp_value ); + /* Compare the Destination-Host AVP of the message with our identity */ + if (ahdr->avp_value->os.len != fd_g_config->cnf_diamid_len) { + is_dest_host = NO; + } else { + is_dest_host = (strncasecmp(fd_g_config->cnf_diamid, (char *)ahdr->avp_value->os.data, fd_g_config->cnf_diamid_len) + ? NO : YES); + } + break; + + case AC_DESTINATION_REALM: + /* Parse this AVP */ + CHECK_FCT_DO( ret = fd_msg_parse_dict ( avp, fd_g_config->cnf_dict, &error_info ), + { + if (error_info.pei_errcode) { + CHECK_FCT( return_error( pmsg, error_info.pei_errcode, error_info.pei_message, error_info.pei_avp) ); + return 0; + } else { + return ret; + } + } ); + ASSERT( ahdr->avp_value ); + dr_val = ahdr->avp_value; + /* Compare the Destination-Realm AVP of the message with our identity */ + if (ahdr->avp_value->os.len != fd_g_config->cnf_diamrlm_len) { + is_dest_realm = NO; + } else { + is_dest_realm = (strncasecmp(fd_g_config->cnf_diamrlm, (char *)ahdr->avp_value->os.data, fd_g_config->cnf_diamrlm_len) + ? NO : YES); + } + break; + + case AC_USER_NAME: + /* Parse this AVP */ + CHECK_FCT_DO( ret = fd_msg_parse_dict ( avp, fd_g_config->cnf_dict, &error_info ), + { + if (error_info.pei_errcode) { + CHECK_FCT( return_error( pmsg, error_info.pei_errcode, error_info.pei_message, error_info.pei_avp) ); + return 0; + } else { + return ret; + } + } ); + ASSERT( ahdr->avp_value ); + un = avp; + un_val = ahdr->avp_value; + break; + } + } + + if ((is_dest_host != UNKNOWN) && (is_dest_realm != UNKNOWN) && un) + break; + + /* Go to next AVP */ + CHECK_FCT( fd_msg_browse(avp, MSG_BRW_NEXT, &avp, NULL) ); + } + + /* OK, now decide what we do with the request */ + + /* Handle the missing routing AVPs first */ + if ( is_dest_realm == UNKNOWN ) { + CHECK_FCT( return_error( pmsg, "DIAMETER_COMMAND_UNSUPPORTED", "Non-routable message not supported (invalid bit ? missing Destination-Realm ?)", NULL) ); + return 0; + } + + /* If we are listed as Destination-Host */ + if (is_dest_host == YES) { + if (is_local_app == YES) { + /* Ok, give the message to the dispatch thread */ + CHECK_FCT( fd_fifo_post(fd_g_local, pmsg) ); + } else { + /* We don't support the application, reply an error */ + CHECK_FCT( return_error( pmsg, "DIAMETER_APPLICATION_UNSUPPORTED", NULL, NULL) ); + } + return 0; + } + + /* If the message is explicitely for someone else */ + if ((is_dest_host == NO) || (is_dest_realm == NO)) { + if (fd_g_config->cnf_flags.no_fwd) { + CHECK_FCT( return_error( pmsg, "DIAMETER_UNABLE_TO_DELIVER", "This peer is not an agent", NULL) ); + return 0; + } + } else { + /* Destination-Host was not set, and Destination-Realm is matching : we may handle or pass to a fellow peer */ + + /* test for decorated NAI (draft-ietf-dime-nai-routing-04 section 4.4) */ + if (is_decorated_NAI(un_val)) { + /* Handle the decorated NAI */ + CHECK_FCT_DO( process_decorated_NAI(un_val, dr_val), + { + /* If the process failed, we assume it is because of the AVP format */ + CHECK_FCT( return_error( pmsg, "DIAMETER_INVALID_AVP_VALUE", "Failed to process decorated NAI", un) ); + return 0; + } ); + + /* We have transformed the AVP, now submit it again in the queue */ + CHECK_FCT(fd_fifo_post(fd_g_incoming, pmsg) ); + return 0; + } + + if (is_local_app == YES) { + /* Handle localy since we are able to */ + CHECK_FCT(fd_fifo_post(fd_g_local, pmsg) ); + return 0; + } + + if (fd_g_config->cnf_flags.no_fwd) { + /* We return an error */ + CHECK_FCT( return_error( pmsg, "DIAMETER_APPLICATION_UNSUPPORTED", NULL, NULL) ); + return 0; + } + } + + /* From that point, for requests, we will call the registered callbacks, then forward to another peer */ + + } else { + /* The message is an answer */ + struct msg * qry; + + /* Retrieve the corresponding query and its origin */ + CHECK_FCT( fd_msg_answ_getq( *pmsg, &qry ) ); + CHECK_FCT( fd_msg_source_get( qry, &qry_src ) ); + + if ((!qry_src) && (!is_err)) { + /* The message is a normal answer to a request issued localy, we do not call the callbacks chain on it. */ + CHECK_FCT(fd_fifo_post(fd_g_local, pmsg) ); + return 0; + } + + /* From that point, for answers, we will call the registered callbacks, then pass it to the dispatch module or forward it */ + } + + /* Call all registered callbacks for this message */ + { + struct fd_list * li; + + CHECK_FCT( pthread_rwlock_rdlock( &rt_fwd_lock ) ); + pthread_cleanup_push( fd_cleanup_rwlock, &rt_fwd_lock ); + + /* requests: dir = 1 & 2 => in order; answers = 3 & 2 => in reverse order */ + for ( li = (is_req ? rt_fwd_list.next : rt_fwd_list.prev) ; *pmsg && (li != &rt_fwd_list) ; li = (is_req ? li->next : li->prev) ) { + struct rt_hdl * rh = (struct rt_hdl *)li; + + if (is_req && (rh->dir > RT_FWD_ALL)) + break; + if ((!is_req) && (rh->dir < RT_FWD_ALL)) + break; + + /* Ok, call this cb */ + TRACE_DEBUG(ANNOYING, "Calling next FWD callback on %p : %p", *pmsg, rh->rt_fwd_cb); + CHECK_FCT_DO( (*rh->rt_fwd_cb)(rh->cbdata, pmsg), + { + TRACE_DEBUG(INFO, "A FWD routing callback returned an error, message discarded."); + fd_msg_dump_walk(INFO, *pmsg); + fd_msg_free(*pmsg); + *pmsg = NULL; + } ); + } + + pthread_cleanup_pop(0); + CHECK_FCT( pthread_rwlock_unlock( &rt_fwd_lock ) ); + + /* If a callback has handled the message, we stop now */ + if (!*pmsg) + return 0; + } + + /* Now pass the message to the next step: either forward to another peer, or dispatch to local extensions */ + if (is_req || qry_src) { + CHECK_FCT(fd_fifo_post(fd_g_outgoing, pmsg) ); + } else { + CHECK_FCT(fd_fifo_post(fd_g_local, pmsg) ); + } + + /* We're done with this message */ + return 0; +} + + +/* The ROUTING-OUT message processing */ +static int msg_rt_out(struct msg ** pmsg) +{ + struct rt_data * rtd = NULL; + struct msg_hdr * hdr; + int is_req = 0; + int ret; + struct fd_list * li, *candidates; + struct avp * avp; + struct rtd_candidate * c; + + /* Read the message header */ + CHECK_FCT( fd_msg_hdr(*pmsg, &hdr) ); + is_req = hdr->msg_flags & CMD_FLAG_REQUEST; + + /* For answers, the routing is very easy */ + if ( ! is_req ) { + struct msg * qry; + char * qry_src = NULL; + struct msg_hdr * qry_hdr; + struct fd_peer * peer = NULL; + + /* Retrieve the corresponding query and its origin */ + CHECK_FCT( fd_msg_answ_getq( *pmsg, &qry ) ); + CHECK_FCT( fd_msg_source_get( qry, &qry_src ) ); + + ASSERT( qry_src ); /* if it is NULL, the message should have been in the LOCAL queue! */ + + /* Find the peer corresponding to this name */ + CHECK_FCT( fd_peer_getbyid( qry_src, (void *) &peer ) ); + if ((!peer) || (peer->p_hdr.info.runtime.pir_state != STATE_OPEN)) { + TRACE_DEBUG(INFO, "Unable to forward answer message to peer '%s', deleted or not in OPEN state.", qry_src); + fd_msg_dump_walk(INFO, *pmsg); + fd_msg_free(*pmsg); + *pmsg = NULL; + return 0; + } + + /* We must restore the hop-by-hop id */ + CHECK_FCT( fd_msg_hdr(qry, &qry_hdr) ); + hdr->msg_hbhid = qry_hdr->msg_hbhid; + + /* Push the message into this peer */ + CHECK_FCT( fd_out_send(pmsg, NULL, peer) ); + + /* We're done with this answer */ + return 0; + } + + /* From that point, the message is a request */ + + /* Get the routing data out of the message if any (in case of re-transmit) */ + CHECK_FCT( fd_msg_rt_get ( *pmsg, &rtd ) ); + + /* If there is no routing data already, let's create it */ + if (rtd == NULL) { + CHECK_FCT( fd_rtd_init(&rtd) ); + + /* Add all peers in OPEN state */ + CHECK_FCT( pthread_rwlock_rdlock(&fd_g_activ_peers_rw) ); + for (li = fd_g_activ_peers.next; li != &fd_g_activ_peers; li = li->next) { + struct fd_peer * p = (struct fd_peer *)li->o; + CHECK_FCT_DO( ret = fd_rtd_candidate_add(rtd, p->p_hdr.info.pi_diamid), { CHECK_FCT_DO( pthread_rwlock_unlock(&fd_g_activ_peers_rw), ); return ret; } ); + } + CHECK_FCT( pthread_rwlock_unlock(&fd_g_activ_peers_rw) ); + + /* Now let's remove all peers from the Route-Records */ + CHECK_FCT( fd_msg_browse(*pmsg, MSG_BRW_FIRST_CHILD, &avp, NULL) ); + while (avp) { + struct avp_hdr * ahdr; + struct fd_pei error_info; + CHECK_FCT( fd_msg_avp_hdr( avp, &ahdr ) ); + + if ((ahdr->avp_code == AC_ROUTE_RECORD) && (! (ahdr->avp_flags & AVP_FLAG_VENDOR)) ) { + /* Parse this AVP */ + CHECK_FCT_DO( ret = fd_msg_parse_dict ( avp, fd_g_config->cnf_dict, &error_info ), + { + if (error_info.pei_errcode) { + CHECK_FCT( return_error( pmsg, error_info.pei_errcode, error_info.pei_message, error_info.pei_avp) ); + return 0; + } else { + return ret; + } + } ); + ASSERT( ahdr->avp_value ); + /* Remove this value from the list */ + fd_rtd_candidate_del(rtd, (char *)ahdr->avp_value->os.data, ahdr->avp_value->os.len); + } + + /* Go to next AVP */ + CHECK_FCT( fd_msg_browse(avp, MSG_BRW_NEXT, &avp, NULL) ); + } + } + + /* Note: we reset the scores and pass the message to the callbacks, maybe we could re-use the saved scores when we have received an error ? */ + + /* Ok, we have our list in rtd now, let's (re)initialize the scores */ + fd_rtd_candidate_extract(rtd, &candidates, FD_SCORE_INI); + + /* Pass the list to registered callbacks (even if it is empty) */ + { + CHECK_FCT( pthread_rwlock_rdlock( &rt_out_lock ) ); + pthread_cleanup_push( fd_cleanup_rwlock, &rt_out_lock ); + + /* We call the cb by reverse priority order */ + for ( li = rt_out_list.prev ; li != &rt_out_list ; li = li->prev ) { + struct rt_hdl * rh = (struct rt_hdl *)li; + + TRACE_DEBUG(ANNOYING, "Calling next OUT callback on %p : %p (prio %d)", *pmsg, rh->rt_out_cb, rh->prio); + CHECK_FCT_DO( ret = (*rh->rt_out_cb)(rh->cbdata, *pmsg, candidates), + { + TRACE_DEBUG(INFO, "An OUT routing callback returned an error (%s) ! Message discarded.", strerror(ret)); + fd_msg_dump_walk(INFO, *pmsg); + fd_msg_free(*pmsg); + *pmsg = NULL; + break; + } ); + } + + pthread_cleanup_pop(0); + CHECK_FCT( pthread_rwlock_unlock( &rt_out_lock ) ); + + /* If an error occurred, skip to the next message */ + if (! *pmsg) { + if (rtd) + fd_rtd_free(&rtd); + return 0; + } + } + + /* Order the candidate peers by score attributed by the callbacks */ + CHECK_FCT( fd_rtd_candidate_reorder(candidates) ); + + /* Save the routing information in the message */ + CHECK_FCT( fd_msg_rt_associate ( *pmsg, &rtd ) ); + + /* Now try sending the message */ + for (li = candidates->prev; li != candidates; li = li->prev) { + struct fd_peer * peer; + + c = (struct rtd_candidate *) li; + + /* Stop when we have reached the end of valid candidates */ + if (c->score < 0) + break; + + /* Search for the peer */ + CHECK_FCT( fd_peer_getbyid( c->diamid, (void *)&peer ) ); + + if (peer && (peer->p_hdr.info.runtime.pir_state == STATE_OPEN)) { + /* Send to this one */ + CHECK_FCT_DO( fd_out_send(pmsg, NULL, peer), continue ); + + /* If the sending was successful */ + break; + } + } + + /* If the message has not been sent, return an error */ + if (*pmsg) { + TRACE_DEBUG(INFO, "Could not send the following message, replying with UNABLE_TO_DELIVER"); + fd_msg_dump_walk(INFO, *pmsg); + return_error( pmsg, "DIAMETER_UNABLE_TO_DELIVER", "No suitable candidate to route the message to", NULL); + } + + /* We're done with this message */ + + return 0; +} + + +/********************************************************************************/ +/* Management of the threads */ +/********************************************************************************/ + +/* Note: in the first version, we only create one thread of each kind. + We could improve the scalability by using the threshold feature of the queues + to create additional threads if a queue is filling up, or at least giving a configurable + number of threads of each kind. + */ + +/* Control of the threads */ +static enum { RUN = 0, STOP = 1 } order_val = RUN; +static pthread_mutex_t order_lock = PTHREAD_MUTEX_INITIALIZER; + +/* Threads report their status */ +enum thread_state { INITIAL = 0, RUNNING = 1, TERMINATED = 2 }; +static void cleanup_state(void * state_loc) +{ + if (state_loc) + *(enum thread_state *)state_loc = TERMINATED; +} + +/* This is the common thread code (same for routing and dispatching) */ +static void * process_thr(void * arg, int (*action_cb)(struct msg ** pmsg), struct fifo * queue, char * action_name) +{ + TRACE_ENTRY("%p %p %p %p", arg, action_cb, queue, action_name); + + /* Set the thread name */ + { + char buf[48]; + snprintf(buf, sizeof(buf), "%s (%p)", action_name, arg); + fd_log_threadname ( buf ); + } + + /* The thread reports its status when canceled */ + CHECK_PARAMS_DO(arg, return NULL); + pthread_cleanup_push( cleanup_state, arg ); + + /* Mark the thread running */ + *(enum thread_state *)arg = RUNNING; + + do { + struct msg * msg; + + /* Test the current order */ + { + int must_stop; + CHECK_POSIX_DO( pthread_mutex_lock(&order_lock), goto end ); /* we lock to flush the caches */ + must_stop = (order_val == STOP); + CHECK_POSIX_DO( pthread_mutex_unlock(&order_lock), goto end ); + if (must_stop) + goto end; + + pthread_testcancel(); + } + + /* Ok, we are allowed to run */ + + /* Get the next message from the queue */ + { + int ret; + CHECK_FCT_DO( ret = fd_fifo_get ( queue, &msg ), + { + if (ret == EPIPE) + /* The queue was destroyed, we are probably exiting */ + goto end; + /* another error occurred */ + goto fatal_error; + } ); + } + + if (TRACE_BOOL(FULL)) { + TRACE_DEBUG(FULL, "Picked next message"); + fd_msg_dump_one(ANNOYING, msg); + } + + /* Now process the message */ + CHECK_FCT_DO( (*action_cb)(&msg), goto fatal_error); + + /* We're done with this message */ + + } while (1); + +fatal_error: + TRACE_DEBUG(INFO, "An unrecoverable error occurred, %s thread is terminating...", action_name); + CHECK_FCT_DO(fd_event_send(fd_g_config->cnf_main_ev, FDEV_TERMINATE, 0, NULL), ); + +end: + /* Mark the thread as terminated */ + pthread_cleanup_pop(1); + return NULL; +} + +/* The dispatch thread */ +static void * dispatch_thr(void * arg) +{ + return process_thr(arg, msg_dispatch, fd_g_local, "Dispatch"); +} + +/* The (routing-in) thread -- see description in freeDiameter.h */ +static void * routing_in_thr(void * arg) +{ + return process_thr(arg, msg_rt_in, fd_g_incoming, "Routing-IN"); +} + +/* The (routing-out) thread -- see description in freeDiameter.h */ +static void * routing_out_thr(void * arg) +{ + return process_thr(arg, msg_rt_out, fd_g_outgoing, "Routing-OUT"); +} + + +/********************************************************************************/ /* The functions for the other files */ /********************************************************************************/ -/* Later: TODO("Set thresholds on queues"); */ +/* Later: make this more dynamic */ static pthread_t dispatch = (pthread_t)NULL; static enum thread_state disp_state = INITIAL; @@ -1111,15 +1104,14 @@ return 0; } -/* Stop the thread after up to one second of wait */ -int fd_rtdisp_fini(void) +static void stop_thread_delayed(enum thread_state *st, pthread_t * thr, char * th_name) { - /* Destroy the local queue */ - CHECK_FCT_DO( fd_queues_fini_disp(), /* ignore */); + TRACE_ENTRY("%p %p", st, thr); + CHECK_PARAMS_DO(st && thr, return); /* Wait for a second for the thread to complete, by monitoring my_state */ - if (disp_state != TERMINATED) { - TRACE_DEBUG(INFO, "Waiting for the dispatch thread to have a chance to terminate"); + if (*st != TERMINATED) { + TRACE_DEBUG(INFO, "Waiting for the %s thread to have a chance to terminate", th_name); do { struct timespec ts, ts_final; @@ -1129,7 +1121,7 @@ ts_final.tv_nsec = ts.tv_nsec; while (TS_IS_INFERIOR( &ts, &ts_final )) { - if (disp_state == TERMINATED) + if (*st == TERMINATED) break; usleep(100000); @@ -1139,10 +1131,31 @@ } /* Now stop the thread and reclaim its resources */ - CHECK_FCT_DO( fd_thr_term(&dispatch ), /* continue */); + CHECK_FCT_DO( fd_thr_term(thr ), /* continue */); + +} + +/* Stop the thread after up to one second of wait */ +int fd_rtdisp_fini(void) +{ + /* Destroy the incoming queue */ + CHECK_FCT_DO( fd_queues_fini(&fd_g_incoming), /* ignore */); + + /* Stop the routing IN thread */ + stop_thread_delayed(&in_state, &rt_in, "IN routing"); + /* Destroy the outgoing queue */ + CHECK_FCT_DO( fd_queues_fini(&fd_g_outgoing), /* ignore */); - TODO("Add terminating the routing threads"); + /* Stop the routing OUT thread */ + stop_thread_delayed(&out_state, &rt_out, "OUT routing"); + + /* Destroy the local queue */ + CHECK_FCT_DO( fd_queues_fini(&fd_g_local), /* ignore */); + + /* Stop the Dispatch thread */ + stop_thread_delayed(&disp_state, &dispatch, "Dispatching"); + return 0; } @@ -1163,6 +1176,10 @@ } +/********************************************************************************/ +/* For extensiosn to register a new appl */ +/********************************************************************************/ + /* Add an application into the peer's supported apps */ int fd_disp_app_support ( struct dict_object * app, struct dict_object * vendor, int auth, int acct ) {
--- a/include/freeDiameter/freeDiameter.h Wed Dec 09 19:02:31 2009 +0900 +++ b/include/freeDiameter/freeDiameter.h Thu Dec 10 14:15:04 2009 +0900 @@ -561,6 +561,7 @@ enum fd_rt_out_score { FD_SCORE_NO_DELIVERY = -70, /* We should not send this message to this candidate */ + FD_SCORE_INI = -2, /* All candidates are initialized with this value */ FD_SCORE_LOAD_BALANCE = 1, /* Use this to differentiate between several peers with the same score */ FD_SCORE_DEFAULT = 5, /* The peer is a default route for all messages */ FD_SCORE_DEFAULT_REALM = 10, /* The peer is a default route for this realm */
--- a/include/freeDiameter/libfreeDiameter.h Wed Dec 09 19:02:31 2009 +0900 +++ b/include/freeDiameter/libfreeDiameter.h Thu Dec 10 14:15:04 2009 +0900 @@ -1632,7 +1632,7 @@ void fd_rtd_candidate_del(struct rt_data * rtd, char * peerid, size_t sz /* if !0, peerid does not need to be \0 terminated */); /* Extract the list of valid candidates, and initialize their scores to 0 */ -void fd_rtd_candidate_extract(struct rt_data * rtd, struct fd_list ** candidates); +void fd_rtd_candidate_extract(struct rt_data * rtd, struct fd_list ** candidates, int ini_score); /* If a peer returned a protocol error for this message, save it so that we don't try to send it there again */ int fd_rtd_error_add(struct rt_data * rtd, char * sentto, uint8_t * origin, size_t originsz, uint32_t rcode); @@ -2605,7 +2605,7 @@ * * PARAMETERS: * queue : The queue from which the element must be retrieved. - * msg : On return, the message is stored here. + * item : On return, the first element of the queue is stored here. * * DESCRIPTION: * This function is similar to fd_fifo_get, except that it will not block if
--- a/libfreeDiameter/rt_data.c Wed Dec 09 19:02:31 2009 +0900 +++ b/libfreeDiameter/rt_data.c Thu Dec 10 14:15:04 2009 +0900 @@ -226,7 +226,7 @@ } /* Extract the list of valid candidates, and initialize their scores to 0 */ -void fd_rtd_candidate_extract(struct rt_data * rtd, struct fd_list ** candidates) +void fd_rtd_candidate_extract(struct rt_data * rtd, struct fd_list ** candidates, int ini_score) { TRACE_ENTRY("%p %p", rtd, candidates); CHECK_PARAMS_DO( candidates, return ); @@ -235,11 +235,11 @@ *candidates = &rtd->candidates; if (rtd->extracted) { - /* Reset all scores to 0 */ + /* Reset all scores to INITIAL score */ struct fd_list * li; for (li = rtd->candidates.next; li != &rtd->candidates; li = li->next) { struct rtd_candidate * c = (struct rtd_candidate *) li; - c->score = 0; + c->score = ini_score; } }