Navigation


Changeset 124:cc42d8607114 in freeDiameter


Ignore:
Timestamp:
Dec 10, 2009, 2:15:04 PM (14 years ago)
Author:
Sebastien Decugis <sdecugis@nict.go.jp>
Branch:
default
Phase:
public
Message:

Completed cleanups of queues when the daemon is stopping

Files:
6 edited

Legend:

Unmodified
Added
Removed
  • freeDiameter/fD.h

    r123 r124  
    100100/* Message queues */
    101101int fd_queues_init(void);
    102 int fd_queues_fini_rt(void);
    103 int fd_queues_fini_disp(void);
     102int fd_queues_fini(struct fifo ** queue);
    104103
    105104/* Create all the dictionary objects defined in the Diameter base RFC. */
  • freeDiameter/queues.c

    r123 r124  
    5151}
    5252
    53 /* Destroy the routing message queues */
    54 int fd_queues_fini_rt(void)
     53/* Destroy a queue after emptying it (and dumping the content) */
     54int fd_queues_fini(struct fifo ** queue)
    5555{
    56         TRACE_ENTRY();
     56        struct msg * msg;
     57        int ret = 0;
    5758       
     59        TRACE_ENTRY("%p", queue);
     60       
     61        /* Note : the threads that post into this queue should already been stopped before this !!! */
     62
    5863        /* Empty all contents */
    59         TODO("Empty all contents (dump to log file ?)");
     64        while (1) {
     65                /* Check if there is a message in the queue */
     66                ret = fd_fifo_tryget(*queue, &msg);
     67                if (ret == EWOULDBLOCK)
     68                        break;
     69                CHECK_FCT(ret);
     70               
     71                /* We got one! */
     72                fd_log_debug("The following message is lost because the daemon is stopping:\n");
     73                fd_msg_dump_walk(NONE, msg);
     74                fd_msg_free(msg);
     75        }
    6076       
    61         /* Now, delete the queues */
    62         CHECK_FCT( fd_fifo_del ( &fd_g_incoming ) );
    63         CHECK_FCT( fd_fifo_del ( &fd_g_outgoing ) );
     77        /* Now, delete the empty queue */
     78        CHECK_FCT( fd_fifo_del ( queue ) );
    6479       
    6580        return 0;
    6681}
    67 
    68 /* Destroy the local message queue */
    69 int fd_queues_fini_disp(void)
    70 {
    71         TRACE_ENTRY();
    72        
    73         /* Empty all contents */
    74         TODO("Empty all contents (dump to log file ?)");
    75        
    76         CHECK_FCT( fd_fifo_del ( &fd_g_local ) );
    77        
    78         return 0;
    79 }
  • freeDiameter/routing_dispatch.c

    r123 r124  
    185185
    186186/********************************************************************************/
    187 /*                        Helper functions                                      */
     187/*                      Some default OUT routing callbacks                      */
    188188/********************************************************************************/
    189 /* Test if a User-Name AVP contains a Decorated NAI -- RFC4282, draft-ietf-dime-nai-routing-04 */
    190 static int is_decorated_NAI(union avp_value * un)
    191 {
    192         int i;
    193         TRACE_ENTRY("%p", un);
    194        
    195         /* If there was no User-Name, we return false */
    196         if (un == NULL)
    197                 return 0;
    198        
    199         /* Search if there is a '!' before any '@' -- do we need to check it contains a '.' ? */
    200         for (i = 0; i < un->os.len; i++) {
    201                 if ( un->os.data[i] == (unsigned char) '!' )
    202                         return 1;
    203                 if ( un->os.data[i] == (unsigned char) '@' )
    204                         break;
    205                 if ( un->os.data[i] == (unsigned char) '\\' )
    206                         i++; /* next one was escaped */
    207         }
    208        
    209         return 0;
    210 }
    211 
    212 /* Create new User-Name and Destination-Realm values */
    213 static int process_decorated_NAI(union avp_value * un, union avp_value * dr)
    214 {
    215         int i, at_idx = 0, sep_idx = 0;
    216         unsigned char * old_un;
    217         TRACE_ENTRY("%p %p", un, dr);
    218         CHECK_PARAMS(un && dr);
    219        
    220         /* Save the decorated User-Name, for example 'homerealm.example.net!user@otherrealm.example.net' */
    221         old_un = un->os.data;
    222        
    223         /* Search the positions of the first '!' and the '@' in the string */
    224         for (i = 0; i < un->os.len; i++) {
    225                 if ( (!sep_idx) && (old_un[i] == (unsigned char) '!') )
    226                         sep_idx = i;
    227                 if ( old_un[i] == (unsigned char) '@' ) {
    228                         at_idx = i;
    229                         break;
    230                 }
    231                 if ( un->os.data[i] == (unsigned char) '\\' )
    232                         i++; /* next one is escaped */
    233         }
    234        
    235         CHECK_PARAMS( 0 < sep_idx < at_idx < un->os.len);
    236        
    237         /* Create the new User-Name value */
    238         CHECK_MALLOC( un->os.data = malloc( at_idx ) );
    239         memcpy( un->os.data, old_un + sep_idx + 1, at_idx - sep_idx ); /* user@ */
    240         memcpy( un->os.data + at_idx - sep_idx, old_un, sep_idx ); /* homerealm.example.net */
    241        
    242         /* Create the new Destination-Realm value */
    243         CHECK_MALLOC( dr->os.data = realloc(dr->os.data, sep_idx) );
    244         memcpy( dr->os.data, old_un, sep_idx );
    245         dr->os.len = sep_idx;
    246        
    247         TRACE_DEBUG(FULL, "Processed Decorated NAI '%.*s' into '%.*s' (%.*s)",
    248                                 un->os.len, old_un,
    249                                 at_idx, un->os.data,
    250                                 dr->os.len, dr->os.data);
    251        
    252         un->os.len = at_idx;
    253         free(old_un);
    254        
    255         return 0;
    256 }
    257 
    258 /* Function to return an error to an incoming request */
    259 static int return_error(struct msg * msg, char * error_code, char * error_message, struct avp * failedavp)
    260 {
    261         struct fd_peer * peer;
    262         int is_loc = 0;
    263 
    264         /* Get the source of the message */
    265         {
    266                 char * id;
    267                 CHECK_FCT( fd_msg_source_get( msg, &id ) );
    268                
    269                 if (id == NULL) {
    270                         is_loc = 1; /* The message was issued locally */
    271                 } else {
    272                
    273                         /* Search the peer with this id */
    274                         CHECK_FCT( fd_peer_getbyid( id, (void *)&peer ) );
    275 
    276                         if (!peer) {
    277                                 TRACE_DEBUG(INFO, "Unable to send error '%s' to deleted peer '%s' in reply to:", error_code, id);
    278                                 fd_msg_dump_walk(INFO, msg);
    279                                 fd_msg_free(msg);
    280                                 return 0;
    281                         }
    282                 }
    283         }
    284        
    285         /* Create the error message */
    286         CHECK_FCT( fd_msg_new_answer_from_req ( fd_g_config->cnf_dict, &msg, MSGFL_ANSW_ERROR ) );
    287 
    288         /* Set the error code */
    289         CHECK_FCT( fd_msg_rescode_set(msg, error_code, error_message, failedavp, 1 ) );
    290 
    291         /* Send the answer */
    292         if (is_loc) {
    293                 CHECK_FCT( fd_fifo_post(fd_g_incoming, &msg) );
    294         } else {
    295                 CHECK_FCT( fd_out_send(&msg, NULL, peer) );
    296         }
    297        
    298         /* Done */
    299         return 0;
    300 }
    301 
    302 
    303 /********************************************************************************/
    304 /*         Second part : the threads moving messages in the daemon              */
    305 /********************************************************************************/
    306 
    307 /* Note: in the first version, we only create one thread of each kind.
    308  We could improve the scalability by using the threshold feature of the queues
    309  to create additional threads if a queue is filling up.
    310  */
    311 
    312 /* Control of the threads */
    313 enum thread_state { INITIAL = 0, RUNNING = 1, TERMINATED = 2 };
    314 static void cleanup_state(void * state_loc)
    315 {
    316         if (state_loc)
    317                 *(enum thread_state *)state_loc = TERMINATED;
    318 }
    319 static pthread_mutex_t order_lock = PTHREAD_MUTEX_INITIALIZER;
    320 static enum { RUN = 0, STOP = 1 } order_val = RUN;;
    321 
    322 /* The dispatching thread */
    323 static void * dispatch_thr(void * arg)
    324 {
    325         TRACE_ENTRY("%p", arg);
    326        
    327         /* Set the thread name */
    328         {
    329                 char buf[48];
    330                 snprintf(buf, sizeof(buf), "Dispatch %p", arg);
    331                 fd_log_threadname ( buf );
    332         }
    333 
    334         pthread_cleanup_push( cleanup_state, arg );
    335        
    336         /* Mark the thread running */
    337         *(enum thread_state *)arg = RUNNING;
    338        
    339         do {
    340                 struct msg * msg;
    341                 struct msg_hdr * hdr;
    342                 int is_req = 0;
    343                 struct session * sess;
    344                 enum disp_action action;
    345                 const char * ec = NULL;
    346                 const char * em = NULL;
    347                
    348                 /* Test the environment */
    349                 {
    350                         int must_stop;
    351                         CHECK_POSIX_DO( pthread_mutex_lock(&order_lock), goto end ); /* we lock to flush the caches */
    352                         must_stop = (order_val == STOP);
    353                         CHECK_POSIX_DO( pthread_mutex_unlock(&order_lock), goto end );
    354                         if (must_stop)
    355                                 goto end;
    356                        
    357                         pthread_testcancel();
    358                 }
    359                
    360                 /* Ok, we are allowed to run */
    361                
    362                 /* Get the next message from the queue */
    363                 {
    364                         int ret;
    365                         CHECK_FCT_DO( ret = fd_fifo_get ( fd_g_local, &msg ),
    366                                 {
    367                                         if (ret == EPIPE)
    368                                                 /* The queue was destroyed */
    369                                                 goto end;
    370                                         goto fatal_error;
    371                                 } );
    372                 }
    373                
    374                 if (TRACE_BOOL(FULL)) {
    375                         TRACE_DEBUG(FULL, "Picked next message");
    376                         fd_msg_dump_one(ANNOYING, msg);
    377                 }
    378                
    379                 /* Read the message header */
    380                 CHECK_FCT_DO( fd_msg_hdr(msg, &hdr), goto fatal_error );
    381                 is_req = hdr->msg_flags & CMD_FLAG_REQUEST;
    382                
    383                 /* Note: if the message is for local delivery, we should test for duplicate
    384                   (draft-asveren-dime-dupcons-00). This may conflict with path validation decisions, no clear answer yet */
    385                
    386                 /* At this point, we need to understand the message content, so parse it */
    387                 {
    388                         int ret;
    389                         CHECK_FCT_DO( ret = fd_msg_parse_or_error( &msg ),
    390                                 {
    391                                         /* in case of error, the message is already dump'd */
    392                                         if ((ret == EBADMSG) && (msg != NULL)) {
    393                                                 /* msg now contains the answer message to send back */
    394                                                 CHECK_FCT_DO( fd_fifo_post(fd_g_outgoing, &msg), goto fatal_error );
    395                                         }
    396                                         if (msg) {      /* another error happen'd */
    397                                                 TRACE_DEBUG(INFO, "An unexpected error occurred (%s), discarding a message:", strerror(ret));
    398                                                 fd_msg_dump_walk(INFO, msg);
    399                                                 CHECK_FCT_DO( fd_msg_free(msg), /* continue */);
    400                                         }
    401                                         /* Go to the next message */
    402                                         continue;
    403                                 } );
    404                 }
    405                
    406                 /* First, if the original request was registered with a callback and we receive the answer, call it. */
    407                 if ( ! is_req ) {
    408                         struct msg * qry;
    409                         void (*anscb)(void *, struct msg **) = NULL;
    410                         void * data = NULL;
    411                        
    412                         /* Retrieve the corresponding query */
    413                         CHECK_FCT_DO( fd_msg_answ_getq( msg, &qry ), goto fatal_error );
    414                        
    415                         /* Retrieve any registered handler */
    416                         CHECK_FCT_DO( fd_msg_anscb_get( qry, &anscb, &data ), goto fatal_error );
    417                        
    418                         /* If a callback was registered, pass the message to it */
    419                         if (anscb != NULL) {
    420                                
    421                                 TRACE_DEBUG(FULL, "Calling callback registered when query was sent (%p, %p)", anscb, data);
    422                                 (*anscb)(data, &msg);
    423                                
    424                                 if (msg == NULL) {
    425                                         /* Ok, the message is now handled, we can skip to the next one */
    426                                         continue;
    427                                 }
    428                         }
    429                 }
    430                
    431                 /* Retrieve the session of the message */
    432                 CHECK_FCT_DO( fd_msg_sess_get(fd_g_config->cnf_dict, msg, &sess, NULL), goto fatal_error );
    433                
    434                 /* Now, call any callback registered for the message */
    435                 CHECK_FCT_DO( fd_msg_dispatch ( &msg, sess, &action, &ec), goto fatal_error );
    436                
    437                 /* Now, act depending on msg and action and ec */
    438                 if (!msg)
    439                         continue;
    440                
    441                 switch ( action ) {
    442                         case DISP_ACT_CONT:
    443                                 /* No callback has handled the message, let's reply with a generic error */
    444                                 em = "The message was not handled by any extension callback";
    445                                 ec = "DIAMETER_COMMAND_UNSUPPORTED";
    446                        
    447                         case DISP_ACT_ERROR:
    448                                 /* We have a problem with delivering the message */
    449                                 if (ec == NULL) {
    450                                         ec = "DIAMETER_UNABLE_TO_COMPLY";
    451                                 }
    452                                
    453                                 if (!is_req) {
    454                                         TRACE_DEBUG(INFO, "Received an answer to a localy issued query, but no handler processed this answer!");
    455                                         fd_msg_dump_walk(INFO, msg);
    456                                         fd_msg_free(msg);
    457                                         msg = NULL;
    458                                         break;
    459                                 }
    460                                
    461                                 /* Create an answer with the error code and message */
    462                                 CHECK_FCT_DO( fd_msg_new_answer_from_req ( fd_g_config->cnf_dict, &msg, 0 ), goto fatal_error );
    463                                 CHECK_FCT_DO( fd_msg_rescode_set(msg, (char *)ec, (char *)em, NULL, 1 ), goto fatal_error );
    464                                
    465                         case DISP_ACT_SEND:
    466                                 /* Now, send the message */
    467                                 CHECK_FCT_DO( fd_fifo_post(fd_g_outgoing, &msg), goto fatal_error );
    468                 }
    469                
    470                 /* We're done with this message */
    471        
    472         } while (1);
    473        
    474 fatal_error:
    475         TRACE_DEBUG(INFO, "An error occurred in dispatch module! Thread is terminating...");
    476         CHECK_FCT_DO(fd_event_send(fd_g_config->cnf_main_ev, FDEV_TERMINATE, 0, NULL), );
    477        
    478 end:   
    479         /* Mark the thread as terminated */
    480         pthread_cleanup_pop(1);
    481         return NULL;
    482 }
    483 
    484 
    485 /* The (routing-in) thread -- see description in freeDiameter.h */
    486 static void * routing_in_thr(void * arg)
    487 {
    488         TRACE_ENTRY("%p", arg);
    489        
    490         /* Set the thread name */
    491         {
    492                 char buf[48];
    493                 snprintf(buf, sizeof(buf), "Routing-IN %p", arg);
    494                 fd_log_threadname ( buf );
    495         }
    496 
    497         pthread_cleanup_push( cleanup_state, arg );
    498        
    499         /* Mark the thread running */
    500         *(enum thread_state *)arg = RUNNING;
    501        
    502         /* Main thread loop */
    503         do {
    504                 struct msg * msg;
    505                 struct msg_hdr * hdr;
    506                 int is_req = 0;
    507                 int is_err = 0;
    508                 char * qry_src = NULL;
    509                
    510                 /* Test if we were told to stop */
    511                 {
    512                         int must_stop;
    513                         CHECK_POSIX_DO( pthread_mutex_lock(&order_lock), goto end ); /* we lock to flush the caches */
    514                         must_stop = (order_val == STOP);
    515                         CHECK_POSIX_DO( pthread_mutex_unlock(&order_lock), goto end );
    516                         if (must_stop)
    517                                 goto end;
    518                        
    519                         pthread_testcancel();
    520                 }
    521                
    522                 /* Get the next message from the incoming queue */
    523                 {
    524                         int ret;
    525                         CHECK_FCT_DO( ret = fd_fifo_get ( fd_g_incoming, &msg ),
    526                                 {
    527                                         if (ret == EPIPE)
    528                                                 /* The queue was destroyed */
    529                                                 goto end;
    530                                         goto fatal_error;
    531                                 } );
    532                 }
    533                
    534                 if (TRACE_BOOL(FULL)) {
    535                         TRACE_DEBUG(FULL, "Picked next message");
    536                         fd_msg_dump_one(ANNOYING, msg);
    537                 }
    538                
    539                 /* Read the message header */
    540                 CHECK_FCT_DO( fd_msg_hdr(msg, &hdr), goto fatal_error );
    541                 is_req = hdr->msg_flags & CMD_FLAG_REQUEST;
    542                 is_err = hdr->msg_flags & CMD_FLAG_ERROR;
    543                
    544                 /* Handle incorrect bits */
    545                 if (is_req && is_err) {
    546                         CHECK_FCT_DO( return_error( msg, "DIAMETER_INVALID_HDR_BITS", "R & E bits were set", NULL), goto fatal_error );
    547                         continue;
    548                 }
    549                
    550                 /* If it is a request, we must analyze its content to decide what we do with it */
    551                 if (is_req) {
    552                         struct avp * avp, *un = NULL;
    553                         union avp_value * un_val = NULL, *dr_val = NULL;
    554                         enum status { UNKNOWN, YES, NO };
    555                         /* Are we Destination-Host? */
    556                         enum status is_dest_host = UNKNOWN;
    557                         /* Are we Destination-Realm? */
    558                         enum status is_dest_realm = UNKNOWN;
    559                         /* Do we support the application of the message? */
    560                         enum status is_local_app = UNKNOWN;
    561                        
    562                         /* Check if we have local support for the message application */
    563                         if ( (hdr->msg_appl == 0) || (hdr->msg_appl == AI_RELAY) ) {
    564                                 TRACE_DEBUG(INFO, "Received a routable message with application id 0, returning DIAMETER_APPLICATION_UNSUPPORTED");
    565                                 CHECK_FCT_DO( return_error( msg, "DIAMETER_APPLICATION_UNSUPPORTED", "Routable message with application id 0 or relay", NULL), goto fatal_error );
    566                                 continue;
    567                         } else {
    568                                 struct fd_app * app;
    569                                 CHECK_FCT_DO( fd_app_check(&fd_g_config->cnf_apps, hdr->msg_appl, &app), goto fatal_error );
    570                                 is_local_app = (app ? YES : NO);
    571                         }
    572                        
    573                         /* Parse the message for Dest-Host and Dest-Realm */
    574                         CHECK_FCT_DO(  fd_msg_browse(msg, MSG_BRW_FIRST_CHILD, &avp, NULL), goto fatal_error  );
    575                         while (avp) {
    576                                 struct avp_hdr * ahdr;
    577                                 CHECK_FCT_DO(  fd_msg_avp_hdr( avp, &ahdr ), goto fatal_error  );
    578                                
    579                                 if (! (ahdr->avp_flags & AVP_FLAG_VENDOR)) {
    580                                         switch (ahdr->avp_code) {
    581                                                 case AC_DESTINATION_HOST:
    582                                                         /* Parse this AVP */
    583                                                         CHECK_FCT_DO( fd_msg_parse_dict ( avp, fd_g_config->cnf_dict, NULL ), goto fatal_error );
    584                                                         ASSERT( ahdr->avp_value );
    585                                                         /* Compare the Destination-Host AVP of the message with our identity */
    586                                                         if (ahdr->avp_value->os.len != fd_g_config->cnf_diamid_len) {
    587                                                                 is_dest_host = NO;
    588                                                         } else {
    589                                                                 is_dest_host = (strncasecmp(fd_g_config->cnf_diamid, (char *)ahdr->avp_value->os.data, fd_g_config->cnf_diamid_len)
    590                                                                                         ? NO : YES);
    591                                                         }
    592                                                         break;
    593                                                        
    594                                                 case AC_DESTINATION_REALM:
    595                                                         /* Parse this AVP */
    596                                                         CHECK_FCT_DO( fd_msg_parse_dict ( avp, fd_g_config->cnf_dict, NULL ), goto fatal_error );
    597                                                         ASSERT( ahdr->avp_value );
    598                                                         dr_val = ahdr->avp_value;
    599                                                         /* Compare the Destination-Realm AVP of the message with our identity */
    600                                                         if (ahdr->avp_value->os.len != fd_g_config->cnf_diamrlm_len) {
    601                                                                 is_dest_realm = NO;
    602                                                         } else {
    603                                                                 is_dest_realm = (strncasecmp(fd_g_config->cnf_diamrlm, (char *)ahdr->avp_value->os.data, fd_g_config->cnf_diamrlm_len)
    604                                                                                         ? NO : YES);
    605                                                         }
    606                                                         break;
    607                                                
    608                                                 case AC_USER_NAME:
    609                                                         /* Parse this AVP */
    610                                                         CHECK_FCT_DO( fd_msg_parse_dict ( avp, fd_g_config->cnf_dict, NULL ), goto fatal_error );
    611                                                         ASSERT( ahdr->avp_value );
    612                                                         un = avp;
    613                                                         un_val = ahdr->avp_value;
    614                                                         break;
    615                                         }
    616                                 }
    617                                
    618                                 if ((is_dest_host != UNKNOWN) && (is_dest_realm != UNKNOWN) && un)
    619                                         break;
    620                                
    621                                 /* Go to next AVP */
    622                                 CHECK_FCT_DO(  fd_msg_browse(avp, MSG_BRW_NEXT, &avp, NULL), goto fatal_error  );
    623                         }
    624                        
    625                         /* OK, now decide what we do with the request */
    626                        
    627                         /* Handle the missing routing AVPs first */
    628                         if ( is_dest_realm == UNKNOWN ) {
    629                                 CHECK_FCT_DO( return_error( msg, "DIAMETER_COMMAND_UNSUPPORTED", "Non-routable message not supported (invalid bit ? missing Destination-Realm ?)", NULL), goto fatal_error );
    630                                 continue;
    631                         }
    632                        
    633                         /* If we are listed as Destination-Host */
    634                         if (is_dest_host == YES) {
    635                                 if (is_local_app == YES) {
    636                                         /* Ok, give the message to the dispatch thread */
    637                                         CHECK_FCT_DO(fd_fifo_post(fd_g_local, &msg), goto fatal_error );
    638                                 } else {
    639                                         /* We don't support the application, reply an error */
    640                                         CHECK_FCT_DO( return_error( msg, "DIAMETER_APPLICATION_UNSUPPORTED", NULL, NULL), goto fatal_error );
    641                                 }
    642                                 continue;
    643                         }
    644                        
    645                         /* If the message is explicitely for someone else */
    646                         if ((is_dest_host == NO) || (is_dest_realm == NO)) {
    647                                 if (fd_g_config->cnf_flags.no_fwd) {
    648                                         CHECK_FCT_DO( return_error( msg, "DIAMETER_UNABLE_TO_DELIVER", "This peer is not an agent", NULL), goto fatal_error );
    649                                         continue;
    650                                 }
    651                         } else {
    652                         /* Destination-Host was not set, and Destination-Realm is matching : we may handle or pass to a fellow peer */
    653                                
    654                                 /* test for decorated NAI  (draft-ietf-dime-nai-routing-04 section 4.4) */
    655                                 if (is_decorated_NAI(un_val)) {
    656                                         /* Handle the decorated NAI */
    657                                         CHECK_FCT_DO( process_decorated_NAI(un_val, dr_val),
    658                                                 {
    659                                                         /* If the process failed, we assume it is because of the AVP format */
    660                                                         CHECK_FCT_DO( return_error( msg, "DIAMETER_INVALID_AVP_VALUE", "Failed to process decorated NAI", un), goto fatal_error );
    661                                                         continue;
    662                                                 } );
    663                                        
    664                                         /* We have transformed the AVP, now submit it again in the queue */
    665                                         CHECK_FCT_DO(fd_fifo_post(fd_g_incoming, &msg), goto fatal_error );
    666                                         continue;
    667                                 }
    668  
    669                                 if (is_local_app == YES) {
    670                                         /* Handle localy since we are able to */
    671                                         CHECK_FCT_DO(fd_fifo_post(fd_g_local, &msg), goto fatal_error );
    672                                         continue;
    673                                 }
    674                                
    675                                 if (fd_g_config->cnf_flags.no_fwd) {
    676                                         /* We return an error */
    677                                         CHECK_FCT_DO( return_error( msg, "DIAMETER_APPLICATION_UNSUPPORTED", NULL, NULL), goto fatal_error );
    678                                         continue;
    679                                 }
    680                         }
    681                        
    682                         /* From that point, for requests, we will call the registered callbacks, then forward to another peer */
    683                        
    684                 } else {
    685                         /* The message is an answer */
    686                         struct msg * qry;
    687                        
    688                         /* Retrieve the corresponding query and its origin */
    689                         CHECK_FCT_DO( fd_msg_answ_getq( msg, &qry ), goto fatal_error );
    690                         CHECK_FCT_DO( fd_msg_source_get( qry, &qry_src ), goto fatal_error );
    691                        
    692                         if ((!qry_src) && (!is_err)) {
    693                                 /* The message is a normal answer to a request issued localy, we do not call the callbacks chain on it. */
    694                                 CHECK_FCT_DO(fd_fifo_post(fd_g_local, &msg), goto fatal_error );
    695                                 continue;
    696                         }
    697                        
    698                         /* From that point, for answers, we will call the registered callbacks, then pass it to the dispatch module or forward it */
    699                 }
    700                
    701                 /* Call all registered callbacks for this message */
    702                 {
    703                         struct fd_list * li;
    704                        
    705                         CHECK_FCT_DO( pthread_rwlock_rdlock( &rt_fwd_lock ), goto fatal_error );
    706                         pthread_cleanup_push( fd_cleanup_rwlock, &rt_fwd_lock );
    707                        
    708                         /* requests: dir = 1 & 2 => in order; answers = 3 & 2 => in reverse order */
    709                         for (   li = (is_req ? rt_fwd_list.next : rt_fwd_list.prev) ; msg && (li != &rt_fwd_list) ; li = (is_req ? li->next : li->prev) ) {
    710                                 struct rt_hdl * rh = (struct rt_hdl *)li;
    711                                
    712                                 if (is_req && (rh->dir > RT_FWD_ALL))
    713                                         break;
    714                                 if ((!is_req) && (rh->dir < RT_FWD_ALL))
    715                                         break;
    716                                
    717                                 /* Ok, call this cb */
    718                                 TRACE_DEBUG(ANNOYING, "Calling next FWD callback on %p : %p", msg, rh->rt_fwd_cb);
    719                                 CHECK_FCT_DO( (*rh->rt_fwd_cb)(rh->cbdata, &msg),
    720                                         {
    721                                                 TRACE_DEBUG(INFO, "A FWD routing callback returned an error, message discarded.");
    722                                                 fd_msg_dump_walk(INFO, msg);
    723                                                 fd_msg_free(msg);
    724                                                 msg = NULL;
    725                                         } );
    726                         }
    727                        
    728                         pthread_cleanup_pop(0);
    729                         CHECK_FCT_DO( pthread_rwlock_unlock( &rt_fwd_lock ), goto fatal_error );
    730                        
    731                         /* If a callback has handled the message, we stop now */
    732                         if (!msg)
    733                                 continue;
    734                 }
    735                
    736                 /* Now handle the message to the next step: either forward to another peer, or for local delivery */
    737                 if (is_req || qry_src) {
    738                         CHECK_FCT_DO(fd_fifo_post(fd_g_outgoing, &msg), goto fatal_error );
    739                 } else {
    740                         CHECK_FCT_DO(fd_fifo_post(fd_g_local, &msg), goto fatal_error );
    741                 }
    742                
    743                 /* We're done with this message */
    744         } while (1);
    745        
    746 fatal_error:
    747         TRACE_DEBUG(INFO, "An error occurred in routing module! IN thread is terminating...");
    748         CHECK_FCT_DO(fd_event_send(fd_g_config->cnf_main_ev, FDEV_TERMINATE, 0, NULL), );
    749        
    750 end:   
    751         /* Mark the thread as terminated */
    752         pthread_cleanup_pop(1);
    753         return NULL;
    754 }
    755 
    756 
    757 /* The (routing-out) thread -- see description in freeDiameter.h */
    758 static void * routing_out_thr(void * arg)
    759 {
    760         struct rt_data * rtd = NULL;
    761         TRACE_ENTRY("%p", arg);
    762        
    763         /* Set the thread name */
    764         {
    765                 char buf[48];
    766                 snprintf(buf, sizeof(buf), "Routing OUT %p", arg);
    767                 fd_log_threadname ( buf );
    768         }
    769 
    770         pthread_cleanup_push( cleanup_state, arg );
    771        
    772         /* Mark the thread running */
    773         *(enum thread_state *)arg = RUNNING;
    774        
    775        
    776         /* Main thread loop */
    777         do {
    778                 struct msg * msg;
    779                 struct msg_hdr * hdr;
    780                 int is_req = 0;
    781                 struct fd_list * li, *candidates;
    782                 struct avp * avp;
    783                 struct rtd_candidate * c;
    784                
    785                 /* If we loop'd with some undeleted routing data, destroy it */
    786                 if (rtd != NULL)
    787                         fd_rtd_free(&rtd);
    788                
    789                 /* Test if we were told to stop */
    790                 {
    791                         int must_stop;
    792                         CHECK_POSIX_DO( pthread_mutex_lock(&order_lock), goto end ); /* we lock to flush the caches */
    793                         must_stop = (order_val == STOP);
    794                         CHECK_POSIX_DO( pthread_mutex_unlock(&order_lock), goto end );
    795                         if (must_stop)
    796                                 goto end;
    797                        
    798                         pthread_testcancel();
    799                 }
    800                
    801                 /* Get the next message from the ougoing queue */
    802                 {
    803                         int ret;
    804                         CHECK_FCT_DO( ret = fd_fifo_get ( fd_g_outgoing, &msg ),
    805                                 {
    806                                         if (ret == EPIPE)
    807                                                 /* The queue was destroyed */
    808                                                 goto end;
    809                                         goto fatal_error;
    810                                 } );
    811                 }
    812                
    813                 if (TRACE_BOOL(FULL)) {
    814                         TRACE_DEBUG(FULL, "Picked next message");
    815                         fd_msg_dump_one(ANNOYING, msg);
    816                 }
    817                
    818                 /* Read the message header */
    819                 CHECK_FCT_DO( fd_msg_hdr(msg, &hdr), goto fatal_error );
    820                 is_req = hdr->msg_flags & CMD_FLAG_REQUEST;
    821                
    822                 /* For answers, the routing is very easy */
    823                 if ( ! is_req ) {
    824                         struct msg * qry;
    825                         char * qry_src = NULL;
    826                         struct msg_hdr * qry_hdr;
    827                         struct fd_peer * peer = NULL;
    828                        
    829                         /* Retrieve the corresponding query and its origin */
    830                         CHECK_FCT_DO( fd_msg_answ_getq( msg, &qry ), goto fatal_error );
    831                         CHECK_FCT_DO( fd_msg_source_get( qry, &qry_src ), goto fatal_error );
    832                        
    833                         ASSERT( qry_src ); /* if it is NULL, the message should have been in the LOCAL queue! */
    834                        
    835                         /* Find the peer corresponding to this name */
    836                         CHECK_FCT_DO( fd_peer_getbyid( qry_src, (void *) &peer ), goto fatal_error );
    837                         if ((!peer) || (peer->p_hdr.info.runtime.pir_state != STATE_OPEN)) {
    838                                 TRACE_DEBUG(INFO, "Unable to forward answer message to peer '%s', deleted or not in OPEN state.", qry_src);
    839                                 fd_msg_dump_walk(INFO, msg);
    840                                 fd_msg_free(msg);
    841                                 continue;
    842                         }
    843                        
    844                         /* We must restore the hop-by-hop id */
    845                         CHECK_FCT_DO( fd_msg_hdr(qry, &qry_hdr), goto fatal_error );
    846                         hdr->msg_hbhid = qry_hdr->msg_hbhid;
    847                        
    848                         /* Push the message into this peer */
    849                         CHECK_FCT_DO( fd_out_send(&msg, NULL, peer), goto fatal_error );
    850                        
    851                         /* We're done with this answer */
    852                         continue;
    853                 }
    854                
    855                 /* From that point, the message is a request */
    856                
    857                 /* Get the routing data out of the message if any (in case of re-transmit) */
    858                 CHECK_FCT_DO( fd_msg_rt_get ( msg, &rtd ), goto fatal_error );
    859                
    860                 /* If there is no routing data already, let's create it */
    861                 if (rtd == NULL) {
    862                         CHECK_FCT_DO( fd_rtd_init(&rtd), goto fatal_error );
    863                        
    864                         /* Add all peers in OPEN state */
    865                         CHECK_FCT_DO( pthread_rwlock_wrlock(&fd_g_activ_peers_rw), goto fatal_error );
    866                         for (li = fd_g_activ_peers.next; li != &fd_g_activ_peers; li = li->next) {
    867                                 struct fd_peer * p = (struct fd_peer *)li->o;
    868                                 CHECK_FCT_DO( fd_rtd_candidate_add(rtd, p->p_hdr.info.pi_diamid), goto fatal_error);
    869                         }
    870                         CHECK_FCT_DO( pthread_rwlock_unlock(&fd_g_activ_peers_rw), goto fatal_error );
    871                        
    872                         /* Now let's remove all peers from the Route-Records */
    873                         CHECK_FCT_DO(  fd_msg_browse(msg, MSG_BRW_FIRST_CHILD, &avp, NULL), goto fatal_error  );
    874                         while (avp) {
    875                                 struct avp_hdr * ahdr;
    876                                 CHECK_FCT_DO(  fd_msg_avp_hdr( avp, &ahdr ), goto fatal_error  );
    877                                
    878                                 if ((ahdr->avp_code == AC_ROUTE_RECORD) && (! (ahdr->avp_flags & AVP_FLAG_VENDOR)) ) {
    879                                         /* Parse this AVP */
    880                                         CHECK_FCT_DO( fd_msg_parse_dict ( avp, fd_g_config->cnf_dict, NULL ), goto fatal_error );
    881                                         ASSERT( ahdr->avp_value );
    882                                         /* Remove this value from the list */
    883                                         fd_rtd_candidate_del(rtd, (char *)ahdr->avp_value->os.data, ahdr->avp_value->os.len);
    884                                 }
    885                                
    886                                 /* Go to next AVP */
    887                                 CHECK_FCT_DO(  fd_msg_browse(avp, MSG_BRW_NEXT, &avp, NULL), goto fatal_error  );
    888                         }
    889                 }
    890                
    891                 /* Note: we reset the scores and pass the message to the callbacks, maybe we could re-use the saved scores when we have received an error ? */
    892                
    893                 /* Ok, we have our list in rtd now, let's (re)initialize the scores */
    894                 fd_rtd_candidate_extract(rtd, &candidates);
    895                
    896                 /* Pass the list to registered callbacks (even if it is empty) */
    897                 {
    898                         CHECK_FCT_DO( pthread_rwlock_rdlock( &rt_out_lock ), goto fatal_error );
    899                         pthread_cleanup_push( fd_cleanup_rwlock, &rt_out_lock );
    900                        
    901                         /* We call the cb by reverse priority order */
    902                         for (   li = rt_out_list.prev ; li != &rt_out_list ; li = li->prev ) {
    903                                 struct rt_hdl * rh = (struct rt_hdl *)li;
    904                                
    905                                 TRACE_DEBUG(ANNOYING, "Calling next OUT callback on %p : %p (prio %d)", msg, rh->rt_out_cb, rh->prio);
    906                                 CHECK_FCT_DO( (*rh->rt_out_cb)(rh->cbdata, msg, candidates),
    907                                         {
    908                                                 TRACE_DEBUG(INFO, "An OUT routing callback returned an error ! Message discarded.");
    909                                                 fd_msg_dump_walk(INFO, msg);
    910                                                 fd_msg_free(msg);
    911                                                 msg = NULL;
    912                                                 break;
    913                                         } );
    914                         }
    915                        
    916                         pthread_cleanup_pop(0);
    917                         CHECK_FCT_DO( pthread_rwlock_unlock( &rt_out_lock ), goto fatal_error );
    918                        
    919                         /* If an error occurred, skip to the next message */
    920                         if (!msg)
    921                                 continue;
    922                 }
    923                
    924                 /* Order the candidate peers by score attributed by the callbacks */
    925                 CHECK_FCT_DO( fd_rtd_candidate_reorder(candidates), goto fatal_error );
    926                
    927                 /* Save the routing information in the message */
    928                 CHECK_FCT_DO( fd_msg_rt_associate ( msg, &rtd ), goto fatal_error );
    929                
    930                 /* Now try sending the message */
    931                 for (li = candidates->prev; li != candidates; li = li->prev) {
    932                         struct fd_peer * peer;
    933                        
    934                         c = (struct rtd_candidate *) li;
    935                        
    936                         /* Stop when we have reached the end of valid candidates */
    937                         if (c->score < 0)
    938                                 break;
    939                        
    940                         /* Search for the peer */
    941                         CHECK_FCT_DO( fd_peer_getbyid( c->diamid, (void *)&peer ), goto fatal_error );
    942                        
    943                         if (peer && (peer->p_hdr.info.runtime.pir_state == STATE_OPEN)) {
    944                                 /* Send to this one */
    945                                 CHECK_FCT_DO( fd_out_send(&msg, NULL, peer), continue );
    946                                 /* If the sending was successful */
    947                                 break;
    948                         }
    949                 }
    950                
    951                 /* If the message has not been sent, return an error */
    952                 if (msg) {
    953                         TRACE_DEBUG(INFO, "Could not send the following message, replying with UNABLE_TO_DELIVER");
    954                         fd_msg_dump_walk(INFO, msg);
    955                         return_error( msg, "DIAMETER_UNABLE_TO_DELIVER", "No suitable candidate to route the message to", NULL);
    956                 }
    957                
    958                 /* We're done with this message */
    959                
    960         } while (1);
    961        
    962 fatal_error:
    963         TRACE_DEBUG(INFO, "An error occurred in routing module! OUT thread is terminating...");
    964         CHECK_FCT_DO(fd_event_send(fd_g_config->cnf_main_ev, FDEV_TERMINATE, 0, NULL), );
    965        
    966 end:   
    967         /* Mark the thread as terminated */
    968         pthread_cleanup_pop(1);
    969         return NULL;
    970 }
    971 
    972 
    973 /********************************************************************************/
    974 /*                      Some default routing callbacks                          */
    975 /********************************************************************************/
    976 
    977 /* First OUT callback: prevent sending to peers that do not support the message application */
     189
     190/* Prevent sending to peers that do not support the message application */
    978191static int dont_send_if_no_common_app(void * cbdata, struct msg * msg, struct fd_list * candidates)
    979192{
     
    1007220}
    1008221
    1009 /* Second OUT callback: Detect if the Destination-Host and Destination-Realm match the peer */
     222/* Detect if the Destination-Host and Destination-Realm match the peer */
    1010223static int score_destination_avp(void * cbdata, struct msg * msg, struct fd_list * candidates)
    1011224{
     
    1074287
    1075288/********************************************************************************/
     289/*                        Helper functions                                      */
     290/********************************************************************************/
     291
     292/* Find (first) '!' and '@' positions in a UTF-8 encoded string (User-Name AVP value) */
     293static void nai_get_indexes(union avp_value * un, int * excl_idx, int * at_idx)
     294{
     295        int i;
     296       
     297        TRACE_ENTRY("%p %p %p", un, excl_idx, at_idx);
     298        CHECK_PARAMS_DO( un && excl_idx, return );
     299        *excl_idx = 0;
     300       
     301        /* Search if there is a '!' before any '@' -- do we need to check it contains a '.' ? */
     302        for (i = 0; i < un->os.len; i++) {
     303                /* The '!' marks the decorated NAI */
     304                if ( un->os.data[i] == (unsigned char) '!' ) {
     305                        if (!*excl_idx)
     306                                *excl_idx = i;
     307                        if (!at_idx)
     308                                return;
     309                }
     310                /* If we reach the realm part, we can stop */
     311                if ( un->os.data[i] == (unsigned char) '@' ) {
     312                        if (at_idx)
     313                                *at_idx = i;
     314                        break;
     315                }
     316                /* Skip escaped characters */
     317                if ( un->os.data[i] == (unsigned char) '\\' ) {
     318                        i++;
     319                        continue;
     320                }
     321                /* Skip UTF-8 characters spanning on several bytes */
     322                if ( (un->os.data[i] & 0xF8) == 0xF0 ) { /* 11110zzz */
     323                        i += 3;
     324                        continue;
     325                }
     326                if ( (un->os.data[i] & 0xF0) == 0xE0 ) { /* 1110yyyy */
     327                        i += 2;
     328                        continue;
     329                }
     330                if ( (un->os.data[i] & 0xE0) == 0xC0 ) { /* 110yyyxx */
     331                        i += 1;
     332                        continue;
     333                }
     334        }
     335       
     336        return;
     337}       
     338
     339/* Test if a User-Name AVP contains a Decorated NAI -- RFC4282, draft-ietf-dime-nai-routing-04 */
     340static int is_decorated_NAI(union avp_value * un)
     341{
     342        int i;
     343        TRACE_ENTRY("%p", un);
     344       
     345        /* If there was no User-Name, we return false */
     346        if (un == NULL)
     347                return 0;
     348       
     349        nai_get_indexes(un, &i, NULL);
     350       
     351        return i;
     352}
     353
     354/* Create new User-Name and Destination-Realm values */
     355static int process_decorated_NAI(union avp_value * un, union avp_value * dr)
     356{
     357        int i, at_idx = 0, sep_idx = 0;
     358        unsigned char * old_un;
     359        TRACE_ENTRY("%p %p", un, dr);
     360        CHECK_PARAMS(un && dr);
     361       
     362        /* Save the decorated User-Name, for example 'homerealm.example.net!user@otherrealm.example.net' */
     363        old_un = un->os.data;
     364       
     365        /* Search the positions of the first '!' and the '@' in the string */
     366        nai_get_indexes(un, &sep_idx, &at_idx);
     367        CHECK_PARAMS( 0 < sep_idx < at_idx < un->os.len);
     368       
     369        /* Create the new User-Name value */
     370        CHECK_MALLOC( un->os.data = malloc( at_idx ) );
     371        memcpy( un->os.data, old_un + sep_idx + 1, at_idx - sep_idx ); /* user@ */
     372        memcpy( un->os.data + at_idx - sep_idx, old_un, sep_idx ); /* homerealm.example.net */
     373       
     374        /* Create the new Destination-Realm value */
     375        CHECK_MALLOC( dr->os.data = realloc(dr->os.data, sep_idx) );
     376        memcpy( dr->os.data, old_un, sep_idx );
     377        dr->os.len = sep_idx;
     378       
     379        TRACE_DEBUG(FULL, "Processed Decorated NAI : '%.*s' became '%.*s' (%.*s)",
     380                                un->os.len, old_un,
     381                                at_idx, un->os.data,
     382                                dr->os.len, dr->os.data);
     383       
     384        un->os.len = at_idx;
     385        free(old_un);
     386       
     387        return 0;
     388}
     389
     390/* Function to return an error to an incoming request */
     391static int return_error(struct msg ** pmsg, char * error_code, char * error_message, struct avp * failedavp)
     392{
     393        struct fd_peer * peer;
     394        int is_loc = 0;
     395
     396        /* Get the source of the message */
     397        {
     398                char * id;
     399                CHECK_FCT( fd_msg_source_get( *pmsg, &id ) );
     400               
     401                if (id == NULL) {
     402                        is_loc = 1; /* The message was issued locally */
     403                } else {
     404               
     405                        /* Search the peer with this id */
     406                        CHECK_FCT( fd_peer_getbyid( id, (void *)&peer ) );
     407
     408                        if (!peer) {
     409                                TRACE_DEBUG(INFO, "Unable to send error '%s' to deleted peer '%s' in reply to:", error_code, id);
     410                                fd_msg_dump_walk(INFO, *pmsg);
     411                                fd_msg_free(*pmsg);
     412                                *pmsg = NULL;
     413                                return 0;
     414                        }
     415                }
     416        }
     417       
     418        /* Create the error message */
     419        CHECK_FCT( fd_msg_new_answer_from_req ( fd_g_config->cnf_dict, pmsg, MSGFL_ANSW_ERROR ) );
     420
     421        /* Set the error code */
     422        CHECK_FCT( fd_msg_rescode_set(*pmsg, error_code, error_message, failedavp, 1 ) );
     423
     424        /* Send the answer */
     425        if (is_loc) {
     426                CHECK_FCT( fd_fifo_post(fd_g_incoming, pmsg) );
     427        } else {
     428                CHECK_FCT( fd_out_send(pmsg, NULL, peer) );
     429        }
     430       
     431        /* Done */
     432        return 0;
     433}
     434
     435
     436/****************************************************************************/
     437/*         Second part : threads moving messages in the daemon              */
     438/****************************************************************************/
     439
     440/* These are the functions of each threads: dispatch & routing */
     441/* The DISPATCH message processing */
     442static int msg_dispatch(struct msg ** pmsg)
     443{
     444        struct msg_hdr * hdr;
     445        int is_req = 0, ret;
     446        struct session * sess;
     447        enum disp_action action;
     448        const char * ec = NULL;
     449        const char * em = NULL;
     450
     451        /* Read the message header */
     452        CHECK_FCT( fd_msg_hdr(*pmsg, &hdr) );
     453        is_req = hdr->msg_flags & CMD_FLAG_REQUEST;
     454               
     455        /* Note: if the message is for local delivery, we should test for duplicate
     456          (draft-asveren-dime-dupcons-00). This may conflict with path validation decisions, no clear answer yet */
     457
     458        /* At this point, we need to understand the message content, so parse it */
     459        CHECK_FCT_DO( ret = fd_msg_parse_or_error( pmsg ),
     460                {
     461                        /* in case of error, the message is already dump'd */
     462                        if ((ret == EBADMSG) && (*pmsg != NULL)) {
     463                                /* msg now contains the answer message to send back */
     464                                CHECK_FCT( fd_fifo_post(fd_g_outgoing, pmsg) );
     465                        }
     466                        if (*pmsg) {    /* another error happen'd */
     467                                TRACE_DEBUG(INFO, "An unexpected error occurred (%s), discarding a message:", strerror(ret));
     468                                fd_msg_dump_walk(INFO, *pmsg);
     469                                CHECK_FCT_DO( fd_msg_free(*pmsg), /* continue */);
     470                                *pmsg = NULL;
     471                        }
     472                        /* We're done with this one */
     473                        return 0;
     474                } );
     475
     476        /* First, if the original request was registered with a callback and we receive the answer, call it. */
     477        if ( ! is_req ) {
     478                struct msg * qry;
     479                void (*anscb)(void *, struct msg **) = NULL;
     480                void * data = NULL;
     481
     482                /* Retrieve the corresponding query */
     483                CHECK_FCT( fd_msg_answ_getq( *pmsg, &qry ) );
     484
     485                /* Retrieve any registered handler */
     486                CHECK_FCT( fd_msg_anscb_get( qry, &anscb, &data ) );
     487
     488                /* If a callback was registered, pass the message to it */
     489                if (anscb != NULL) {
     490
     491                        TRACE_DEBUG(FULL, "Calling callback registered when query was sent (%p, %p)", anscb, data);
     492                        (*anscb)(data, pmsg);
     493                       
     494                        /* If the message is processed, we're done */
     495                        if (*pmsg == NULL) {
     496                                return 0;
     497                        }
     498                }
     499        }
     500       
     501        /* Retrieve the session of the message */
     502        CHECK_FCT( fd_msg_sess_get(fd_g_config->cnf_dict, *pmsg, &sess, NULL) );
     503
     504        /* Now, call any callback registered for the message */
     505        CHECK_FCT( fd_msg_dispatch ( pmsg, sess, &action, &ec) );
     506
     507        /* Now, act depending on msg and action and ec */
     508        if (*pmsg)
     509                switch ( action ) {
     510                        case DISP_ACT_CONT:
     511                                /* No callback has handled the message, let's reply with a generic error */
     512                                em = "The message was not handled by any extension callback";
     513                                ec = "DIAMETER_COMMAND_UNSUPPORTED";
     514                       
     515                        case DISP_ACT_ERROR:
     516                                /* We have a problem with delivering the message */
     517                                if (ec == NULL) {
     518                                        ec = "DIAMETER_UNABLE_TO_COMPLY";
     519                                }
     520                               
     521                                if (!is_req) {
     522                                        TRACE_DEBUG(INFO, "Received an answer to a localy issued query, but no handler processed this answer!");
     523                                        fd_msg_dump_walk(INFO, *pmsg);
     524                                        fd_msg_free(*pmsg);
     525                                        *pmsg = NULL;
     526                                        break;
     527                                }
     528                               
     529                                /* Create an answer with the error code and message */
     530                                CHECK_FCT( fd_msg_new_answer_from_req ( fd_g_config->cnf_dict, pmsg, 0 ) );
     531                                CHECK_FCT( fd_msg_rescode_set(*pmsg, (char *)ec, (char *)em, NULL, 1 ) );
     532                               
     533                        case DISP_ACT_SEND:
     534                                /* Now, send the message */
     535                                CHECK_FCT( fd_fifo_post(fd_g_outgoing, pmsg) );
     536                }
     537       
     538        /* We're done with dispatching this message */
     539        return 0;
     540}
     541
     542/* The ROUTING-IN message processing */
     543static int msg_rt_in(struct msg ** pmsg)
     544{
     545        struct msg_hdr * hdr;
     546        int is_req = 0;
     547        int is_err = 0;
     548        char * qry_src = NULL;
     549
     550        /* Read the message header */
     551        CHECK_FCT( fd_msg_hdr(*pmsg, &hdr) );
     552        is_req = hdr->msg_flags & CMD_FLAG_REQUEST;
     553        is_err = hdr->msg_flags & CMD_FLAG_ERROR;
     554
     555        /* Handle incorrect bits */
     556        if (is_req && is_err) {
     557                CHECK_FCT( return_error( pmsg, "DIAMETER_INVALID_HDR_BITS", "R & E bits were set", NULL) );
     558                return 0;
     559        }
     560       
     561        /* If it is a request, we must analyze its content to decide what we do with it */
     562        if (is_req) {
     563                struct avp * avp, *un = NULL;
     564                union avp_value * un_val = NULL, *dr_val = NULL;
     565                enum status { UNKNOWN, YES, NO };
     566                /* Are we Destination-Host? */
     567                enum status is_dest_host = UNKNOWN;
     568                /* Are we Destination-Realm? */
     569                enum status is_dest_realm = UNKNOWN;
     570                /* Do we support the application of the message? */
     571                enum status is_local_app = UNKNOWN;
     572
     573                /* Check if we have local support for the message application */
     574                if ( (hdr->msg_appl == 0) || (hdr->msg_appl == AI_RELAY) ) {
     575                        TRACE_DEBUG(INFO, "Received a routable message with application id 0, returning DIAMETER_APPLICATION_UNSUPPORTED");
     576                        CHECK_FCT( return_error( pmsg, "DIAMETER_APPLICATION_UNSUPPORTED", "Routable message with application id 0 or relay", NULL) );
     577                        return 0;
     578                } else {
     579                        struct fd_app * app;
     580                        CHECK_FCT( fd_app_check(&fd_g_config->cnf_apps, hdr->msg_appl, &app) );
     581                        is_local_app = (app ? YES : NO);
     582                }
     583
     584                /* Parse the message for Dest-Host and Dest-Realm */
     585                CHECK_FCT(  fd_msg_browse(*pmsg, MSG_BRW_FIRST_CHILD, &avp, NULL)  );
     586                while (avp) {
     587                        struct avp_hdr * ahdr;
     588                        struct fd_pei error_info;
     589                        int ret;
     590                        CHECK_FCT(  fd_msg_avp_hdr( avp, &ahdr )  );
     591
     592                        if (! (ahdr->avp_flags & AVP_FLAG_VENDOR)) {
     593                                switch (ahdr->avp_code) {
     594                                        case AC_DESTINATION_HOST:
     595                                                /* Parse this AVP */
     596                                                CHECK_FCT_DO( ret = fd_msg_parse_dict ( avp, fd_g_config->cnf_dict, &error_info ),
     597                                                        {
     598                                                                if (error_info.pei_errcode) {
     599                                                                        CHECK_FCT( return_error( pmsg, error_info.pei_errcode, error_info.pei_message, error_info.pei_avp) );
     600                                                                        return 0;
     601                                                                } else {
     602                                                                        return ret;
     603                                                                }
     604                                                        } );
     605                                                ASSERT( ahdr->avp_value );
     606                                                /* Compare the Destination-Host AVP of the message with our identity */
     607                                                if (ahdr->avp_value->os.len != fd_g_config->cnf_diamid_len) {
     608                                                        is_dest_host = NO;
     609                                                } else {
     610                                                        is_dest_host = (strncasecmp(fd_g_config->cnf_diamid, (char *)ahdr->avp_value->os.data, fd_g_config->cnf_diamid_len)
     611                                                                                ? NO : YES);
     612                                                }
     613                                                break;
     614
     615                                        case AC_DESTINATION_REALM:
     616                                                /* Parse this AVP */
     617                                                CHECK_FCT_DO( ret = fd_msg_parse_dict ( avp, fd_g_config->cnf_dict, &error_info ),
     618                                                        {
     619                                                                if (error_info.pei_errcode) {
     620                                                                        CHECK_FCT( return_error( pmsg, error_info.pei_errcode, error_info.pei_message, error_info.pei_avp) );
     621                                                                        return 0;
     622                                                                } else {
     623                                                                        return ret;
     624                                                                }
     625                                                        } );
     626                                                ASSERT( ahdr->avp_value );
     627                                                dr_val = ahdr->avp_value;
     628                                                /* Compare the Destination-Realm AVP of the message with our identity */
     629                                                if (ahdr->avp_value->os.len != fd_g_config->cnf_diamrlm_len) {
     630                                                        is_dest_realm = NO;
     631                                                } else {
     632                                                        is_dest_realm = (strncasecmp(fd_g_config->cnf_diamrlm, (char *)ahdr->avp_value->os.data, fd_g_config->cnf_diamrlm_len)
     633                                                                                ? NO : YES);
     634                                                }
     635                                                break;
     636
     637                                        case AC_USER_NAME:
     638                                                /* Parse this AVP */
     639                                                CHECK_FCT_DO( ret = fd_msg_parse_dict ( avp, fd_g_config->cnf_dict, &error_info ),
     640                                                        {
     641                                                                if (error_info.pei_errcode) {
     642                                                                        CHECK_FCT( return_error( pmsg, error_info.pei_errcode, error_info.pei_message, error_info.pei_avp) );
     643                                                                        return 0;
     644                                                                } else {
     645                                                                        return ret;
     646                                                                }
     647                                                        } );
     648                                                ASSERT( ahdr->avp_value );
     649                                                un = avp;
     650                                                un_val = ahdr->avp_value;
     651                                                break;
     652                                }
     653                        }
     654
     655                        if ((is_dest_host != UNKNOWN) && (is_dest_realm != UNKNOWN) && un)
     656                                break;
     657
     658                        /* Go to next AVP */
     659                        CHECK_FCT(  fd_msg_browse(avp, MSG_BRW_NEXT, &avp, NULL)  );
     660                }
     661
     662                /* OK, now decide what we do with the request */
     663
     664                /* Handle the missing routing AVPs first */
     665                if ( is_dest_realm == UNKNOWN ) {
     666                        CHECK_FCT( return_error( pmsg, "DIAMETER_COMMAND_UNSUPPORTED", "Non-routable message not supported (invalid bit ? missing Destination-Realm ?)", NULL) );
     667                        return 0;
     668                }
     669
     670                /* If we are listed as Destination-Host */
     671                if (is_dest_host == YES) {
     672                        if (is_local_app == YES) {
     673                                /* Ok, give the message to the dispatch thread */
     674                                CHECK_FCT( fd_fifo_post(fd_g_local, pmsg) );
     675                        } else {
     676                                /* We don't support the application, reply an error */
     677                                CHECK_FCT( return_error( pmsg, "DIAMETER_APPLICATION_UNSUPPORTED", NULL, NULL) );
     678                        }
     679                        return 0;
     680                }
     681
     682                /* If the message is explicitely for someone else */
     683                if ((is_dest_host == NO) || (is_dest_realm == NO)) {
     684                        if (fd_g_config->cnf_flags.no_fwd) {
     685                                CHECK_FCT( return_error( pmsg, "DIAMETER_UNABLE_TO_DELIVER", "This peer is not an agent", NULL) );
     686                                return 0;
     687                        }
     688                } else {
     689                /* Destination-Host was not set, and Destination-Realm is matching : we may handle or pass to a fellow peer */
     690
     691                        /* test for decorated NAI  (draft-ietf-dime-nai-routing-04 section 4.4) */
     692                        if (is_decorated_NAI(un_val)) {
     693                                /* Handle the decorated NAI */
     694                                CHECK_FCT_DO( process_decorated_NAI(un_val, dr_val),
     695                                        {
     696                                                /* If the process failed, we assume it is because of the AVP format */
     697                                                CHECK_FCT( return_error( pmsg, "DIAMETER_INVALID_AVP_VALUE", "Failed to process decorated NAI", un) );
     698                                                return 0;
     699                                        } );
     700
     701                                /* We have transformed the AVP, now submit it again in the queue */
     702                                CHECK_FCT(fd_fifo_post(fd_g_incoming, pmsg) );
     703                                return 0;
     704                        }
     705
     706                        if (is_local_app == YES) {
     707                                /* Handle localy since we are able to */
     708                                CHECK_FCT(fd_fifo_post(fd_g_local, pmsg) );
     709                                return 0;
     710                        }
     711
     712                        if (fd_g_config->cnf_flags.no_fwd) {
     713                                /* We return an error */
     714                                CHECK_FCT( return_error( pmsg, "DIAMETER_APPLICATION_UNSUPPORTED", NULL, NULL) );
     715                                return 0;
     716                        }
     717                }
     718
     719                /* From that point, for requests, we will call the registered callbacks, then forward to another peer */
     720
     721        } else {
     722                /* The message is an answer */
     723                struct msg * qry;
     724
     725                /* Retrieve the corresponding query and its origin */
     726                CHECK_FCT( fd_msg_answ_getq( *pmsg, &qry ) );
     727                CHECK_FCT( fd_msg_source_get( qry, &qry_src ) );
     728
     729                if ((!qry_src) && (!is_err)) {
     730                        /* The message is a normal answer to a request issued localy, we do not call the callbacks chain on it. */
     731                        CHECK_FCT(fd_fifo_post(fd_g_local, pmsg) );
     732                        return 0;
     733                }
     734
     735                /* From that point, for answers, we will call the registered callbacks, then pass it to the dispatch module or forward it */
     736        }
     737
     738        /* Call all registered callbacks for this message */
     739        {
     740                struct fd_list * li;
     741
     742                CHECK_FCT( pthread_rwlock_rdlock( &rt_fwd_lock ) );
     743                pthread_cleanup_push( fd_cleanup_rwlock, &rt_fwd_lock );
     744
     745                /* requests: dir = 1 & 2 => in order; answers = 3 & 2 => in reverse order */
     746                for (   li = (is_req ? rt_fwd_list.next : rt_fwd_list.prev) ; *pmsg && (li != &rt_fwd_list) ; li = (is_req ? li->next : li->prev) ) {
     747                        struct rt_hdl * rh = (struct rt_hdl *)li;
     748
     749                        if (is_req && (rh->dir > RT_FWD_ALL))
     750                                break;
     751                        if ((!is_req) && (rh->dir < RT_FWD_ALL))
     752                                break;
     753
     754                        /* Ok, call this cb */
     755                        TRACE_DEBUG(ANNOYING, "Calling next FWD callback on %p : %p", *pmsg, rh->rt_fwd_cb);
     756                        CHECK_FCT_DO( (*rh->rt_fwd_cb)(rh->cbdata, pmsg),
     757                                {
     758                                        TRACE_DEBUG(INFO, "A FWD routing callback returned an error, message discarded.");
     759                                        fd_msg_dump_walk(INFO, *pmsg);
     760                                        fd_msg_free(*pmsg);
     761                                        *pmsg = NULL;
     762                                } );
     763                }
     764
     765                pthread_cleanup_pop(0);
     766                CHECK_FCT( pthread_rwlock_unlock( &rt_fwd_lock ) );
     767
     768                /* If a callback has handled the message, we stop now */
     769                if (!*pmsg)
     770                        return 0;
     771        }
     772
     773        /* Now pass the message to the next step: either forward to another peer, or dispatch to local extensions */
     774        if (is_req || qry_src) {
     775                CHECK_FCT(fd_fifo_post(fd_g_outgoing, pmsg) );
     776        } else {
     777                CHECK_FCT(fd_fifo_post(fd_g_local, pmsg) );
     778        }
     779
     780        /* We're done with this message */
     781        return 0;
     782}
     783               
     784
     785/* The ROUTING-OUT message processing */
     786static int msg_rt_out(struct msg ** pmsg)
     787{
     788        struct rt_data * rtd = NULL;
     789        struct msg_hdr * hdr;
     790        int is_req = 0;
     791        int ret;
     792        struct fd_list * li, *candidates;
     793        struct avp * avp;
     794        struct rtd_candidate * c;
     795       
     796        /* Read the message header */
     797        CHECK_FCT( fd_msg_hdr(*pmsg, &hdr) );
     798        is_req = hdr->msg_flags & CMD_FLAG_REQUEST;
     799               
     800        /* For answers, the routing is very easy */
     801        if ( ! is_req ) {
     802                struct msg * qry;
     803                char * qry_src = NULL;
     804                struct msg_hdr * qry_hdr;
     805                struct fd_peer * peer = NULL;
     806
     807                /* Retrieve the corresponding query and its origin */
     808                CHECK_FCT( fd_msg_answ_getq( *pmsg, &qry ) );
     809                CHECK_FCT( fd_msg_source_get( qry, &qry_src ) );
     810
     811                ASSERT( qry_src ); /* if it is NULL, the message should have been in the LOCAL queue! */
     812
     813                /* Find the peer corresponding to this name */
     814                CHECK_FCT( fd_peer_getbyid( qry_src, (void *) &peer ) );
     815                if ((!peer) || (peer->p_hdr.info.runtime.pir_state != STATE_OPEN)) {
     816                        TRACE_DEBUG(INFO, "Unable to forward answer message to peer '%s', deleted or not in OPEN state.", qry_src);
     817                        fd_msg_dump_walk(INFO, *pmsg);
     818                        fd_msg_free(*pmsg);
     819                        *pmsg = NULL;
     820                        return 0;
     821                }
     822
     823                /* We must restore the hop-by-hop id */
     824                CHECK_FCT( fd_msg_hdr(qry, &qry_hdr) );
     825                hdr->msg_hbhid = qry_hdr->msg_hbhid;
     826
     827                /* Push the message into this peer */
     828                CHECK_FCT( fd_out_send(pmsg, NULL, peer) );
     829
     830                /* We're done with this answer */
     831                return 0;
     832        }
     833       
     834        /* From that point, the message is a request */
     835
     836        /* Get the routing data out of the message if any (in case of re-transmit) */
     837        CHECK_FCT( fd_msg_rt_get ( *pmsg, &rtd ) );
     838
     839        /* If there is no routing data already, let's create it */
     840        if (rtd == NULL) {
     841                CHECK_FCT( fd_rtd_init(&rtd) );
     842
     843                /* Add all peers in OPEN state */
     844                CHECK_FCT( pthread_rwlock_rdlock(&fd_g_activ_peers_rw) );
     845                for (li = fd_g_activ_peers.next; li != &fd_g_activ_peers; li = li->next) {
     846                        struct fd_peer * p = (struct fd_peer *)li->o;
     847                        CHECK_FCT_DO( ret = fd_rtd_candidate_add(rtd, p->p_hdr.info.pi_diamid), { CHECK_FCT_DO( pthread_rwlock_unlock(&fd_g_activ_peers_rw), ); return ret; } );
     848                }
     849                CHECK_FCT( pthread_rwlock_unlock(&fd_g_activ_peers_rw) );
     850
     851                /* Now let's remove all peers from the Route-Records */
     852                CHECK_FCT(  fd_msg_browse(*pmsg, MSG_BRW_FIRST_CHILD, &avp, NULL)  );
     853                while (avp) {
     854                        struct avp_hdr * ahdr;
     855                        struct fd_pei error_info;
     856                        CHECK_FCT(  fd_msg_avp_hdr( avp, &ahdr )  );
     857
     858                        if ((ahdr->avp_code == AC_ROUTE_RECORD) && (! (ahdr->avp_flags & AVP_FLAG_VENDOR)) ) {
     859                                /* Parse this AVP */
     860                                CHECK_FCT_DO( ret = fd_msg_parse_dict ( avp, fd_g_config->cnf_dict, &error_info ),
     861                                        {
     862                                                if (error_info.pei_errcode) {
     863                                                        CHECK_FCT( return_error( pmsg, error_info.pei_errcode, error_info.pei_message, error_info.pei_avp) );
     864                                                        return 0;
     865                                                } else {
     866                                                        return ret;
     867                                                }
     868                                        } );
     869                                ASSERT( ahdr->avp_value );
     870                                /* Remove this value from the list */
     871                                fd_rtd_candidate_del(rtd, (char *)ahdr->avp_value->os.data, ahdr->avp_value->os.len);
     872                        }
     873
     874                        /* Go to next AVP */
     875                        CHECK_FCT(  fd_msg_browse(avp, MSG_BRW_NEXT, &avp, NULL)  );
     876                }
     877        }
     878
     879        /* Note: we reset the scores and pass the message to the callbacks, maybe we could re-use the saved scores when we have received an error ? */
     880
     881        /* Ok, we have our list in rtd now, let's (re)initialize the scores */
     882        fd_rtd_candidate_extract(rtd, &candidates, FD_SCORE_INI);
     883
     884        /* Pass the list to registered callbacks (even if it is empty) */
     885        {
     886                CHECK_FCT( pthread_rwlock_rdlock( &rt_out_lock ) );
     887                pthread_cleanup_push( fd_cleanup_rwlock, &rt_out_lock );
     888
     889                /* We call the cb by reverse priority order */
     890                for (   li = rt_out_list.prev ; li != &rt_out_list ; li = li->prev ) {
     891                        struct rt_hdl * rh = (struct rt_hdl *)li;
     892
     893                        TRACE_DEBUG(ANNOYING, "Calling next OUT callback on %p : %p (prio %d)", *pmsg, rh->rt_out_cb, rh->prio);
     894                        CHECK_FCT_DO( ret = (*rh->rt_out_cb)(rh->cbdata, *pmsg, candidates),
     895                                {
     896                                        TRACE_DEBUG(INFO, "An OUT routing callback returned an error (%s) ! Message discarded.", strerror(ret));
     897                                        fd_msg_dump_walk(INFO, *pmsg);
     898                                        fd_msg_free(*pmsg);
     899                                        *pmsg = NULL;
     900                                        break;
     901                                } );
     902                }
     903
     904                pthread_cleanup_pop(0);
     905                CHECK_FCT( pthread_rwlock_unlock( &rt_out_lock ) );
     906
     907                /* If an error occurred, skip to the next message */
     908                if (! *pmsg) {
     909                        if (rtd)
     910                                fd_rtd_free(&rtd);
     911                        return 0;
     912                }
     913        }
     914       
     915        /* Order the candidate peers by score attributed by the callbacks */
     916        CHECK_FCT( fd_rtd_candidate_reorder(candidates) );
     917
     918        /* Save the routing information in the message */
     919        CHECK_FCT( fd_msg_rt_associate ( *pmsg, &rtd ) );
     920
     921        /* Now try sending the message */
     922        for (li = candidates->prev; li != candidates; li = li->prev) {
     923                struct fd_peer * peer;
     924
     925                c = (struct rtd_candidate *) li;
     926
     927                /* Stop when we have reached the end of valid candidates */
     928                if (c->score < 0)
     929                        break;
     930
     931                /* Search for the peer */
     932                CHECK_FCT( fd_peer_getbyid( c->diamid, (void *)&peer ) );
     933
     934                if (peer && (peer->p_hdr.info.runtime.pir_state == STATE_OPEN)) {
     935                        /* Send to this one */
     936                        CHECK_FCT_DO( fd_out_send(pmsg, NULL, peer), continue );
     937                       
     938                        /* If the sending was successful */
     939                        break;
     940                }
     941        }
     942
     943        /* If the message has not been sent, return an error */
     944        if (*pmsg) {
     945                TRACE_DEBUG(INFO, "Could not send the following message, replying with UNABLE_TO_DELIVER");
     946                fd_msg_dump_walk(INFO, *pmsg);
     947                return_error( pmsg, "DIAMETER_UNABLE_TO_DELIVER", "No suitable candidate to route the message to", NULL);
     948        }
     949
     950        /* We're done with this message */
     951       
     952        return 0;
     953}
     954
     955
     956/********************************************************************************/
     957/*                     Management of the threads                                */
     958/********************************************************************************/
     959
     960/* Note: in the first version, we only create one thread of each kind.
     961 We could improve the scalability by using the threshold feature of the queues
     962 to create additional threads if a queue is filling up, or at least giving a configurable
     963 number of threads of each kind.
     964 */
     965
     966/* Control of the threads */
     967static enum { RUN = 0, STOP = 1 } order_val = RUN;
     968static pthread_mutex_t order_lock = PTHREAD_MUTEX_INITIALIZER;
     969
     970/* Threads report their status */
     971enum thread_state { INITIAL = 0, RUNNING = 1, TERMINATED = 2 };
     972static void cleanup_state(void * state_loc)
     973{
     974        if (state_loc)
     975                *(enum thread_state *)state_loc = TERMINATED;
     976}
     977
     978/* This is the common thread code (same for routing and dispatching) */
     979static void * process_thr(void * arg, int (*action_cb)(struct msg ** pmsg), struct fifo * queue, char * action_name)
     980{
     981        TRACE_ENTRY("%p %p %p %p", arg, action_cb, queue, action_name);
     982       
     983        /* Set the thread name */
     984        {
     985                char buf[48];
     986                snprintf(buf, sizeof(buf), "%s (%p)", action_name, arg);
     987                fd_log_threadname ( buf );
     988        }
     989       
     990        /* The thread reports its status when canceled */
     991        CHECK_PARAMS_DO(arg, return NULL);
     992        pthread_cleanup_push( cleanup_state, arg );
     993       
     994        /* Mark the thread running */
     995        *(enum thread_state *)arg = RUNNING;
     996       
     997        do {
     998                struct msg * msg;
     999       
     1000                /* Test the current order */
     1001                {
     1002                        int must_stop;
     1003                        CHECK_POSIX_DO( pthread_mutex_lock(&order_lock), goto end ); /* we lock to flush the caches */
     1004                        must_stop = (order_val == STOP);
     1005                        CHECK_POSIX_DO( pthread_mutex_unlock(&order_lock), goto end );
     1006                        if (must_stop)
     1007                                goto end;
     1008                       
     1009                        pthread_testcancel();
     1010                }
     1011               
     1012                /* Ok, we are allowed to run */
     1013               
     1014                /* Get the next message from the queue */
     1015                {
     1016                        int ret;
     1017                        CHECK_FCT_DO( ret = fd_fifo_get ( queue, &msg ),
     1018                                {
     1019                                        if (ret == EPIPE)
     1020                                                /* The queue was destroyed, we are probably exiting */
     1021                                                goto end;
     1022                                        /* another error occurred */
     1023                                        goto fatal_error;
     1024                                } );
     1025                }
     1026               
     1027                if (TRACE_BOOL(FULL)) {
     1028                        TRACE_DEBUG(FULL, "Picked next message");
     1029                        fd_msg_dump_one(ANNOYING, msg);
     1030                }
     1031               
     1032                /* Now process the message */
     1033                CHECK_FCT_DO( (*action_cb)(&msg), goto fatal_error);
     1034
     1035                /* We're done with this message */
     1036       
     1037        } while (1);
     1038       
     1039fatal_error:
     1040        TRACE_DEBUG(INFO, "An unrecoverable error occurred, %s thread is terminating...", action_name);
     1041        CHECK_FCT_DO(fd_event_send(fd_g_config->cnf_main_ev, FDEV_TERMINATE, 0, NULL), );
     1042       
     1043end:   
     1044        /* Mark the thread as terminated */
     1045        pthread_cleanup_pop(1);
     1046        return NULL;
     1047}
     1048
     1049/* The dispatch thread */
     1050static void * dispatch_thr(void * arg)
     1051{
     1052        return process_thr(arg, msg_dispatch, fd_g_local, "Dispatch");
     1053}
     1054
     1055/* The (routing-in) thread -- see description in freeDiameter.h */
     1056static void * routing_in_thr(void * arg)
     1057{
     1058        return process_thr(arg, msg_rt_in, fd_g_incoming, "Routing-IN");
     1059}
     1060
     1061/* The (routing-out) thread -- see description in freeDiameter.h */
     1062static void * routing_out_thr(void * arg)
     1063{
     1064        return process_thr(arg, msg_rt_out, fd_g_outgoing, "Routing-OUT");
     1065}
     1066
     1067
     1068/********************************************************************************/
    10761069/*                     The functions for the other files                        */
    10771070/********************************************************************************/
    10781071
    1079 /* Later: TODO("Set thresholds on queues"); */
     1072/* Later: make this more dynamic */
    10801073static pthread_t dispatch = (pthread_t)NULL;
    10811074static enum thread_state disp_state = INITIAL;
     
    11121105}
    11131106
    1114 /* Stop the thread after up to one second of wait */
    1115 int fd_rtdisp_fini(void)
    1116 {
    1117         /* Destroy the local queue */
    1118         CHECK_FCT_DO( fd_queues_fini_disp(), /* ignore */);
     1107static void stop_thread_delayed(enum thread_state *st, pthread_t * thr, char * th_name)
     1108{
     1109        TRACE_ENTRY("%p %p", st, thr);
     1110        CHECK_PARAMS_DO(st && thr, return);
    11191111
    11201112        /* Wait for a second for the thread to complete, by monitoring my_state */
    1121         if (disp_state != TERMINATED) {
    1122                 TRACE_DEBUG(INFO, "Waiting for the dispatch thread to have a chance to terminate");
     1113        if (*st != TERMINATED) {
     1114                TRACE_DEBUG(INFO, "Waiting for the %s thread to have a chance to terminate", th_name);
    11231115                do {
    11241116                        struct timespec  ts, ts_final;
     
    11301122                       
    11311123                        while (TS_IS_INFERIOR( &ts, &ts_final )) {
    1132                                 if (disp_state == TERMINATED)
     1124                                if (*st == TERMINATED)
    11331125                                        break;
    11341126                               
     
    11401132
    11411133        /* Now stop the thread and reclaim its resources */
    1142         CHECK_FCT_DO( fd_thr_term(&dispatch ), /* continue */);
    1143        
    1144        
    1145         TODO("Add terminating the routing threads");
     1134        CHECK_FCT_DO( fd_thr_term(thr ), /* continue */);
     1135       
     1136}
     1137
     1138/* Stop the thread after up to one second of wait */
     1139int fd_rtdisp_fini(void)
     1140{
     1141        /* Destroy the incoming queue */
     1142        CHECK_FCT_DO( fd_queues_fini(&fd_g_incoming), /* ignore */);
     1143       
     1144        /* Stop the routing IN thread */
     1145        stop_thread_delayed(&in_state, &rt_in, "IN routing");
     1146       
     1147        /* Destroy the outgoing queue */
     1148        CHECK_FCT_DO( fd_queues_fini(&fd_g_outgoing), /* ignore */);
     1149       
     1150        /* Stop the routing OUT thread */
     1151        stop_thread_delayed(&out_state, &rt_out, "OUT routing");
     1152       
     1153        /* Destroy the local queue */
     1154        CHECK_FCT_DO( fd_queues_fini(&fd_g_local), /* ignore */);
     1155       
     1156        /* Stop the Dispatch thread */
     1157        stop_thread_delayed(&disp_state, &dispatch, "Dispatching");
     1158       
    11461159        return 0;
    11471160}
     
    11631176}
    11641177
     1178
     1179/********************************************************************************/
     1180/*                     For extensiosn to register a new appl                    */
     1181/********************************************************************************/
    11651182
    11661183/* Add an application into the peer's supported apps */
  • include/freeDiameter/freeDiameter.h

    r105 r124  
    562562enum fd_rt_out_score {
    563563        FD_SCORE_NO_DELIVERY     = -70, /* We should not send this message to this candidate */
     564        FD_SCORE_INI             =  -2, /* All candidates are initialized with this value */
    564565        FD_SCORE_LOAD_BALANCE    =   1, /* Use this to differentiate between several peers with the same score */
    565566        FD_SCORE_DEFAULT         =   5, /* The peer is a default route for all messages */
  • include/freeDiameter/libfreeDiameter.h

    r114 r124  
    16331633
    16341634/* Extract the list of valid candidates, and initialize their scores to 0 */
    1635 void fd_rtd_candidate_extract(struct rt_data * rtd, struct fd_list ** candidates);
     1635void fd_rtd_candidate_extract(struct rt_data * rtd, struct fd_list ** candidates, int ini_score);
    16361636
    16371637/* If a peer returned a protocol error for this message, save it so that we don't try to send it there again */
     
    26062606 * PARAMETERS:
    26072607 *  queue       : The queue from which the element must be retrieved.
    2608  *  msg         : On return, the message is stored here.
     2608 *  item        : On return, the first element of the queue is stored here.
    26092609 *
    26102610 * DESCRIPTION:
  • libfreeDiameter/rt_data.c

    r84 r124  
    227227
    228228/* Extract the list of valid candidates, and initialize their scores to 0 */
    229 void fd_rtd_candidate_extract(struct rt_data * rtd, struct fd_list ** candidates)
     229void fd_rtd_candidate_extract(struct rt_data * rtd, struct fd_list ** candidates, int ini_score)
    230230{
    231231        TRACE_ENTRY("%p %p", rtd, candidates);
     
    236236       
    237237        if (rtd->extracted) {
    238                 /* Reset all scores to 0 */
     238                /* Reset all scores to INITIAL score */
    239239                struct fd_list * li;
    240240                for (li = rtd->candidates.next; li != &rtd->candidates; li = li->next) {
    241241                        struct rtd_candidate * c = (struct rtd_candidate *) li;
    242                         c->score = 0;
     242                        c->score = ini_score;
    243243                }
    244244        }
Note: See TracChangeset for help on using the changeset viewer.