Navigation


source: freeDiameter/libfdcore/routing_dispatch.c

Last change on this file was 1554:566bb46cc73f, checked in by Sebastien Decugis <sdecugis@freediameter.net>, 5 months ago

Updated copyright information

File size: 46.1 KB
Line 
1/*********************************************************************************************************
2* Software License Agreement (BSD License)                                                               *
3* Author: Sebastien Decugis <sdecugis@freediameter.net>                                                  *
4*                                                                                                        *
5* Copyright (c) 2019, WIDE Project and NICT                                                              *
6* All rights reserved.                                                                                   *
7*                                                                                                        *
8* Redistribution and use of this software in source and binary forms, with or without modification, are  *
9* permitted provided that the following conditions are met:                                              *
10*                                                                                                        *
11* * Redistributions of source code must retain the above                                                 *
12*   copyright notice, this list of conditions and the                                                    *
13*   following disclaimer.                                                                                *
14*                                                                                                        *
15* * Redistributions in binary form must reproduce the above                                              *
16*   copyright notice, this list of conditions and the                                                    *
17*   following disclaimer in the documentation and/or other                                               *
18*   materials provided with the distribution.                                                            *
19*                                                                                                        *
20* * Neither the name of the WIDE Project or NICT nor the                                                 *
21*   names of its contributors may be used to endorse or                                                  *
22*   promote products derived from this software without                                                  *
23*   specific prior written permission of WIDE Project and                                                *
24*   NICT.                                                                                                *
25*                                                                                                        *
26* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED *
27* WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A *
28* PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR *
29* ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT     *
30* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS    *
31* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR *
32* TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF   *
33* ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.                                                             *
34*********************************************************************************************************/
35
36#include "fdcore-internal.h"
37
38#ifdef linux
39/* This needs -D_USE_GNU, and since I have no idea what else that does, let's simply copy the declaration. */
40
41/* Set thread name visible in the kernel and its interfaces.  */
42extern int pthread_setname_np (pthread_t __target_thread, const char *__name);
43#endif
44
45/********************************************************************************/
46/*              First part : handling the extensions callbacks                  */
47/********************************************************************************/
48
49/* Lists of the callbacks, and locks to protect them */
50static pthread_rwlock_t rt_fwd_lock = PTHREAD_RWLOCK_INITIALIZER;
51static struct fd_list   rt_fwd_list = FD_LIST_INITIALIZER_O(rt_fwd_list, &rt_fwd_lock);
52
53static pthread_rwlock_t rt_out_lock = PTHREAD_RWLOCK_INITIALIZER;
54static struct fd_list   rt_out_list = FD_LIST_INITIALIZER_O(rt_out_list, &rt_out_lock);
55
56/* Items in the lists are the same */
57struct rt_hdl {
58        struct fd_list  chain;  /* link in the rt_fwd_list or rt_out_list */
59        void *          cbdata; /* the registered data */
60        union {
61                int     order;  /* This value is used to sort the list */
62                int     dir;    /* It is the direction for FWD handlers */
63                int     prio;   /* and the priority for OUT handlers */
64        };
65        union {
66                int (*rt_fwd_cb)(void * cbdata, struct msg ** msg);
67                int (*rt_out_cb)(void * cbdata, struct msg ** msg, struct fd_list * candidates);
68        };
69};     
70
71/* Add a new entry in the list */
72static int add_ordered(struct rt_hdl * new, struct fd_list * list)
73{
74        /* The list is ordered by prio parameter */
75        struct fd_list * li;
76       
77        CHECK_POSIX( pthread_rwlock_wrlock(list->o) );
78       
79        for (li = list->next; li != list; li = li->next) {
80                struct rt_hdl * h = (struct rt_hdl *) li;
81                if (new->order <= h->order)
82                        break;
83        }
84       
85        fd_list_insert_before(li, &new->chain);
86       
87        CHECK_POSIX( pthread_rwlock_unlock(list->o) );
88       
89        return 0;
90}
91
92/* Register a new FWD callback */
93int fd_rt_fwd_register ( int (*rt_fwd_cb)(void * cbdata, struct msg ** msg), void * cbdata, enum fd_rt_fwd_dir dir, struct fd_rt_fwd_hdl ** handler )
94{
95        struct rt_hdl * new;
96       
97        TRACE_ENTRY("%p %p %d %p", rt_fwd_cb, cbdata, dir, handler);
98        CHECK_PARAMS( rt_fwd_cb );
99        CHECK_PARAMS( (dir >= RT_FWD_REQ) && ( dir <= RT_FWD_ANS) );
100       
101        /* Create a new container */
102        CHECK_MALLOC(new = malloc(sizeof(struct rt_hdl)));
103        memset(new, 0, sizeof(struct rt_hdl));
104       
105        /* Write the content */
106        fd_list_init(&new->chain, NULL);
107        new->cbdata     = cbdata;
108        new->dir        = dir;
109        new->rt_fwd_cb  = rt_fwd_cb;
110       
111        /* Save this in the list */
112        CHECK_FCT( add_ordered(new, &rt_fwd_list) );
113       
114        /* Give it back to the extension if needed */
115        if (handler)
116                *handler = (void *)new;
117       
118        return 0;
119}
120
121/* Remove it */
122int fd_rt_fwd_unregister ( struct fd_rt_fwd_hdl * handler, void ** cbdata )
123{
124        struct rt_hdl * del;
125        TRACE_ENTRY( "%p %p", handler, cbdata);
126        CHECK_PARAMS( handler );
127       
128        del = (struct rt_hdl *)handler;
129        CHECK_PARAMS( del->chain.head == &rt_fwd_list );
130       
131        /* Unlink */
132        CHECK_POSIX( pthread_rwlock_wrlock(&rt_fwd_lock) );
133        fd_list_unlink(&del->chain);
134        CHECK_POSIX( pthread_rwlock_unlock(&rt_fwd_lock) );
135       
136        if (cbdata)
137                *cbdata = del->cbdata;
138       
139        free(del);
140        return 0;
141}
142
143/* Register a new OUT callback */
144int fd_rt_out_register ( int (*rt_out_cb)(void * cbdata, struct msg ** pmsg, struct fd_list * candidates), void * cbdata, int priority, struct fd_rt_out_hdl ** handler )
145{
146        struct rt_hdl * new;
147       
148        TRACE_ENTRY("%p %p %d %p", rt_out_cb, cbdata, priority, handler);
149        CHECK_PARAMS( rt_out_cb );
150       
151        /* Create a new container */
152        CHECK_MALLOC(new = malloc(sizeof(struct rt_hdl)));
153        memset(new, 0, sizeof(struct rt_hdl));
154       
155        /* Write the content */
156        fd_list_init(&new->chain, NULL);
157        new->cbdata     = cbdata;
158        new->prio       = priority;
159        new->rt_out_cb  = rt_out_cb;
160       
161        /* Save this in the list */
162        CHECK_FCT( add_ordered(new, &rt_out_list) );
163       
164        /* Give it back to the extension if needed */
165        if (handler)
166                *handler = (void *)new;
167       
168        return 0;
169}
170
171/* Remove it */
172int fd_rt_out_unregister ( struct fd_rt_out_hdl * handler, void ** cbdata )
173{
174        struct rt_hdl * del;
175        TRACE_ENTRY( "%p %p", handler, cbdata);
176        CHECK_PARAMS( handler );
177       
178        del = (struct rt_hdl *)handler;
179        CHECK_PARAMS( del->chain.head == &rt_out_list );
180       
181        /* Unlink */
182        CHECK_POSIX( pthread_rwlock_wrlock(&rt_out_lock) );
183        fd_list_unlink(&del->chain);
184        CHECK_POSIX( pthread_rwlock_unlock(&rt_out_lock) );
185       
186        if (cbdata)
187                *cbdata = del->cbdata;
188       
189        free(del);
190        return 0;
191}
192
193/********************************************************************************/
194/*                      Some default OUT routing callbacks                      */
195/********************************************************************************/
196
197/* Prevent sending to peers that do not support the message application */
198static int dont_send_if_no_common_app(void * cbdata, struct msg ** pmsg, struct fd_list * candidates)
199{
200        struct msg * msg = *pmsg;
201        struct fd_list * li;
202        struct msg_hdr * hdr;
203       
204        TRACE_ENTRY("%p %p %p", cbdata, msg, candidates);
205        CHECK_PARAMS(msg && candidates);
206       
207        CHECK_FCT( fd_msg_hdr(msg, &hdr) );
208       
209        /* For Base Diameter Protocol, every peer is supposed to support it, so skip */
210        if (hdr->msg_appl == 0)
211                return 0;
212       
213        /* Otherwise, check that the peers support the application */
214        for (li = candidates->next; li != candidates; li = li->next) {
215                struct rtd_candidate *c = (struct rtd_candidate *) li;
216                struct fd_peer * peer;
217                struct fd_app *found;
218                CHECK_FCT( fd_peer_getbyid( c->diamid, c->diamidlen, 0, (void *)&peer ) );
219                if (peer && !peer->p_hdr.info.runtime.pir_relay) {
220                        /* Check if the remote peer advertised the message's appli */
221                        CHECK_FCT( fd_app_check(&peer->p_hdr.info.runtime.pir_apps, hdr->msg_appl, &found) );
222                        if (!found)
223                                c->score += FD_SCORE_NO_DELIVERY;
224                }
225        }
226
227        return 0;
228}
229
230/* Detect if the Destination-Host and Destination-Realm match the peer */
231static int score_destination_avp(void * cbdata, struct msg ** pmsg, struct fd_list * candidates)
232{
233        struct msg * msg = *pmsg;
234        struct fd_list * li;
235        struct avp * avp;
236        union avp_value *dh = NULL, *dr = NULL;
237       
238        TRACE_ENTRY("%p %p %p", cbdata, msg, candidates);
239        CHECK_PARAMS(msg && candidates);
240       
241        /* Search the Destination-Host and Destination-Realm AVPs -- we could also use fd_msg_search_avp here, but this one is slightly more efficient */
242        CHECK_FCT(  fd_msg_browse(msg, MSG_BRW_FIRST_CHILD, &avp, NULL) );
243        while (avp) {
244                struct avp_hdr * ahdr;
245                CHECK_FCT(  fd_msg_avp_hdr( avp, &ahdr ) );
246
247                if (! (ahdr->avp_flags & AVP_FLAG_VENDOR)) {
248                        switch (ahdr->avp_code) {
249                                case AC_DESTINATION_HOST:
250                                        /* Parse this AVP */
251                                        CHECK_FCT( fd_msg_parse_dict ( avp, fd_g_config->cnf_dict, NULL ) );
252                                        ASSERT( ahdr->avp_value );
253                                        dh = ahdr->avp_value;
254                                        break;
255
256                                case AC_DESTINATION_REALM:
257                                        /* Parse this AVP */
258                                        CHECK_FCT( fd_msg_parse_dict ( avp, fd_g_config->cnf_dict, NULL ) );
259                                        ASSERT( ahdr->avp_value );
260                                        dr = ahdr->avp_value;
261                                        break;
262                        }
263                }
264
265                if (dh && dr)
266                        break;
267
268                /* Go to next AVP */
269                CHECK_FCT(  fd_msg_browse(avp, MSG_BRW_NEXT, &avp, NULL) );
270        }
271       
272        /* Now, check each candidate against these AVP values */
273        for (li = candidates->next; li != candidates; li = li->next) {
274                struct rtd_candidate *c = (struct rtd_candidate *) li;
275               
276            #if 0 /* this is actually useless since the sending process will also ensure that the peer is still available */
277                struct fd_peer * peer;
278                /* Since the candidates list comes from the peers list, we do not have any issue with upper/lower case to find the peer object */
279                CHECK_FCT( fd_peer_getbyid( c->diamid, c->diamidlen, 0, (void *)&peer ) );
280                if (!peer)
281                        continue; /* it has been deleted since the candidate list was generated; avoid sending to this one in that case. */
282            #endif /* 0 */
283               
284                /* In the AVPs, the value comes from the network, so let's be case permissive */
285                if (dh && !fd_os_almostcasesrch(dh->os.data, dh->os.len, c->diamid, c->diamidlen, NULL) ) {
286                        /* The candidate is the Destination-Host */
287                        c->score += FD_SCORE_FINALDEST;
288                } else {
289                        if (dr && !fd_os_almostcasesrch(dr->os.data, dr->os.len, c->realm, c->realmlen, NULL) ) {
290                                /* The candidate's realm matchs the Destination-Realm */
291                                c->score += FD_SCORE_REALM;
292                        }
293                }
294        }
295
296        return 0;
297}
298
299/********************************************************************************/
300/*                        Helper functions                                      */
301/********************************************************************************/
302
303/* Find (first) '!' and '@' positions in a UTF-8 encoded string (User-Name AVP value) */
304static void nai_get_indexes(union avp_value * un, int * excl_idx, int * at_idx)
305{
306        int i;
307       
308        TRACE_ENTRY("%p %p %p", un, excl_idx, at_idx);
309        CHECK_PARAMS_DO( un && excl_idx && at_idx, return );
310       
311        *excl_idx = 0;
312        *at_idx = 0;
313       
314        /* Search if there is a '!' before any '@' -- do we need to check it contains a '.' ? */
315        for (i = 0; i < un->os.len; i++) {
316                /* The '!' marks the decorated NAI */
317                if ( un->os.data[i] == (unsigned char) '!' ) {
318                        if (!*excl_idx)
319                                *excl_idx = i;
320                        continue;
321                }
322                /* If we reach the realm part, we can stop */
323                if ( un->os.data[i] == (unsigned char) '@' ) {
324                        *at_idx = i;
325                        break;
326                }
327                /* Stop if we find a \0 in the middle */
328                if ( un->os.data[i] == 0 ) {
329                        return;
330                }
331                /* Skip escaped characters */
332                if ( un->os.data[i] == (unsigned char) '\\' ) {
333                        i++;
334                        continue;
335                }
336        }
337       
338        return;
339}       
340
341/* Test if a User-Name AVP contains a Decorated NAI -- RFC4282, RFC5729 */
342/* Create new User-Name and Destination-Realm values */
343static int process_decorated_NAI(int * was_nai, union avp_value * un, union avp_value * dr)
344{
345        int at_idx, sep_idx;
346        unsigned char * old_un;
347        TRACE_ENTRY("%p %p %p", was_nai, un, dr);
348        CHECK_PARAMS(was_nai && un && dr);
349       
350        /* Save the decorated User-Name, for example 'homerealm.example.net!user@otherrealm.example.net' */
351        old_un = un->os.data;
352       
353        /* Search the positions of the first '!' and the '@' in the string */
354        nai_get_indexes(un, &sep_idx, &at_idx);
355        if ((!sep_idx) || (sep_idx > at_idx) || !fd_os_is_valid_DiameterIdentity(old_un, sep_idx /* this is the new realm part */)) {
356                *was_nai = 0;
357                return 0;
358        }
359       
360        *was_nai = 1;
361       
362        /* Create the new User-Name value */
363        CHECK_MALLOC( un->os.data = malloc( at_idx ) );
364        memcpy( un->os.data, old_un + sep_idx + 1, at_idx - sep_idx ); /* user@ */
365        memcpy( un->os.data + at_idx - sep_idx, old_un, sep_idx ); /* homerealm.example.net */
366       
367        /* Create the new Destination-Realm value */
368        CHECK_MALLOC( dr->os.data = realloc(dr->os.data, sep_idx) );
369        memcpy( dr->os.data, old_un, sep_idx );
370        dr->os.len = sep_idx;
371       
372        TRACE_DEBUG(FULL, "Processed Decorated NAI : '%.*s' became '%.*s' (%.*s)",
373                                (int)un->os.len, old_un,
374                                (int)at_idx, un->os.data,
375                                (int)dr->os.len, dr->os.data);
376       
377        un->os.len = at_idx;
378        free(old_un);
379       
380        return 0;
381}
382
383
384/* Function to return an error to an incoming request */
385static int return_error(struct msg ** pmsg, char * error_code, char * error_message, struct avp * failedavp)
386{
387        struct fd_peer * peer;
388        int is_loc = 0;
389
390        /* Get the source of the message */
391        {
392                DiamId_t id;
393                size_t   idlen;
394                CHECK_FCT( fd_msg_source_get( *pmsg, &id, &idlen ) );
395               
396                if (id == NULL) {
397                        is_loc = 1; /* The message was issued locally */
398                } else {
399               
400                        /* Search the peer with this id */
401                        CHECK_FCT( fd_peer_getbyid( id, idlen, 0, (void *)&peer ) );
402
403                        if (!peer) {
404                                char buf[256];
405                                snprintf(buf, sizeof(buf), "Unable to send error '%s' to deleted peer '%s' in reply to this message.", error_code, id);
406                                fd_hook_call(HOOK_MESSAGE_DROPPED, *pmsg, NULL, buf, fd_msg_pmdl_get(*pmsg));
407                                fd_msg_free(*pmsg);
408                                *pmsg = NULL;
409                                return 0;
410                        }
411                }
412        }
413       
414        /* Create the error message */
415        CHECK_FCT( fd_msg_new_answer_from_req ( fd_g_config->cnf_dict, pmsg, MSGFL_ANSW_ERROR ) );
416
417        /* Set the error code */
418        CHECK_FCT( fd_msg_rescode_set(*pmsg, error_code, error_message, failedavp, 1 ) );
419
420        /* Send the answer */
421        if (is_loc) {
422                CHECK_FCT( fd_fifo_post(fd_g_incoming, pmsg) );
423        } else {
424                CHECK_FCT( fd_out_send(pmsg, NULL, peer, 1) );
425        }
426       
427        /* Done */
428        return 0;
429}
430
431
432/****************************************************************************/
433/*         Second part : threads moving messages in the daemon              */
434/****************************************************************************/
435
436/* The DISPATCH message processing */
437static int msg_dispatch(struct msg * msg)
438{
439        struct msg_hdr * hdr;
440        int is_req = 0;
441        struct session * sess;
442        enum disp_action action;
443        char * ec = NULL;
444        char * em = NULL;
445        struct msg *msgptr = msg, *error = NULL;
446
447        /* Read the message header */
448        CHECK_FCT( fd_msg_hdr(msg, &hdr) );
449        is_req = hdr->msg_flags & CMD_FLAG_REQUEST;
450       
451        /* Note: if the message is for local delivery, we should test for duplicate
452          (draft-asveren-dime-dupcons-00). This may conflict with path validation decisions, no clear answer yet */
453
454        /* At this point, we need to understand the message content, so parse it */
455        CHECK_FCT_DO( fd_msg_parse_or_error( &msgptr, &error ),
456                {
457                        int rescue = 0;
458                        if (__ret__ != EBADMSG) {
459                                fd_hook_call(HOOK_MESSAGE_DROPPED, msgptr, NULL, "Error while parsing received answer", fd_msg_pmdl_get(msgptr));
460                                fd_msg_free(msgptr);
461                        } else {
462                                if (!msgptr) {
463                                        fd_hook_call(HOOK_MESSAGE_PARSING_ERROR2, error, NULL, NULL, fd_msg_pmdl_get(error));
464                                        /* error now contains the answer message to send back */
465                                        CHECK_FCT( fd_fifo_post(fd_g_outgoing, &error) );
466                                } else if (!error) {
467                                        /* We have received an invalid answer to our query */
468                                        fd_hook_call(HOOK_MESSAGE_DROPPED, msgptr, NULL, "Received answer failed the dictionary / rules parsing", fd_msg_pmdl_get(msgptr));
469                                        fd_msg_free(msgptr);
470                                } else {
471                                        /* We will pass the invalid received error to the application */
472                                        rescue = 1;
473                                }
474                        }
475                        if (!rescue)
476                                return 0; /* We are done with this message, go to the next */
477                } );
478
479        /* First, if the original request was registered with a callback and we receive the answer, call it. */
480        if ( ! is_req ) {
481                struct msg * qry;
482                void (*anscb)(void *, struct msg **) = NULL;
483                void * data = NULL;
484
485                /* Retrieve the corresponding query */
486                CHECK_FCT( fd_msg_answ_getq( msgptr, &qry ) );
487
488                /* Retrieve any registered handler */
489                CHECK_FCT( fd_msg_anscb_get( qry, &anscb, NULL, &data ) );
490
491                /* If a callback was registered, pass the message to it */
492                if (anscb != NULL) {
493
494                        TRACE_DEBUG(FULL, "Calling callback registered when query was sent (%p, %p)", anscb, data);
495                        (*anscb)(data, &msgptr);
496                       
497                        /* If the message is processed, we're done */
498                        if (msgptr == NULL) {
499                                return 0;
500                        }
501                       
502                        /* otherwise continue the dispatching --hoping that the anscb callback did not mess with our message :) */
503                }
504        }
505       
506        /* Retrieve the session of the message */
507        CHECK_FCT( fd_msg_sess_get(fd_g_config->cnf_dict, msgptr, &sess, NULL) );
508
509        /* Now, call any callback registered for the message */
510        CHECK_FCT( fd_msg_dispatch ( &msgptr, sess, &action, &ec, &em, &error) );
511
512        /* Now, act depending on msg and action and ec */
513        if (msgptr) {
514                switch ( action ) {
515                        case DISP_ACT_CONT:
516                                /* No callback has handled the message, let's reply with a generic error or relay it */
517                                if (!fd_g_config->cnf_flags.no_fwd) {
518                                        /* requeue to fd_g_outgoing */
519                                        fd_hook_call(HOOK_MESSAGE_ROUTING_FORWARD, msgptr, NULL, NULL, fd_msg_pmdl_get(msgptr));
520                                        CHECK_FCT( fd_fifo_post(fd_g_outgoing, &msgptr) );
521                                        break;
522                                }
523                                /* We don't relay => reply error */
524                                em = "The message was not handled by any extension callback";
525                                ec = "DIAMETER_COMMAND_UNSUPPORTED";
526                                /* and continue as if an error occurred... */
527                        case DISP_ACT_ERROR:
528                                /* We have a problem with delivering the message */
529                                if (ec == NULL) {
530                                        ec = "DIAMETER_UNABLE_TO_COMPLY";
531                                }
532                               
533                                if (!is_req) {
534                                        fd_hook_call(HOOK_MESSAGE_DROPPED, msgptr, NULL, "Internal error: Answer received to locally issued request, but not handled by any handler.", fd_msg_pmdl_get(msgptr));
535                                        fd_msg_free(msgptr);
536                                        break;
537                                }
538                               
539                                /* Create an answer with the error code and message */
540                                CHECK_FCT( fd_msg_new_answer_from_req ( fd_g_config->cnf_dict, &msgptr, 0 ) );
541                                CHECK_FCT( fd_msg_rescode_set(msgptr, ec, em, NULL, 1 ) );
542                               
543                        case DISP_ACT_SEND:
544                                /* Now, send the message */
545                                CHECK_FCT( fd_fifo_post(fd_g_outgoing, &msgptr) );
546                }
547        } else if (em) {
548                fd_hook_call(HOOK_MESSAGE_DROPPED, error, NULL, em, fd_msg_pmdl_get(error));
549                fd_msg_free(error);
550        }
551       
552        /* We're done with dispatching this message */
553        return 0;
554}
555
556/* The ROUTING-IN message processing */
557static int msg_rt_in(struct msg * msg)
558{
559        struct msg_hdr * hdr;
560        int is_req = 0;
561        int is_err = 0;
562        DiamId_t qry_src = NULL;
563        struct msg *msgptr = msg;
564
565        /* Read the message header */
566        CHECK_FCT( fd_msg_hdr(msg, &hdr) );
567        is_req = hdr->msg_flags & CMD_FLAG_REQUEST;
568        is_err = hdr->msg_flags & CMD_FLAG_ERROR;
569
570        /* Handle incorrect bits */
571        if (is_req && is_err) {
572                fd_hook_call(HOOK_MESSAGE_PARSING_ERROR, msgptr, NULL, "R & E bits were set", fd_msg_pmdl_get(msgptr));
573                CHECK_FCT( return_error( &msgptr, "DIAMETER_INVALID_HDR_BITS", "R & E bits were set", NULL) );
574                return 0;
575        }
576       
577        /* If it is a request, we must analyze its content to decide what we do with it */
578        if (is_req) {
579                struct avp * avp, *un = NULL;
580                union avp_value * un_val = NULL, *dr_val = NULL;
581                enum status { UNKNOWN, YES, NO };
582                /* Are we Destination-Host? */
583                enum status is_dest_host = UNKNOWN;
584                /* Are we Destination-Realm? */
585                enum status is_dest_realm = UNKNOWN;
586                /* Do we support the application of the message? */
587                enum status is_local_app = UNKNOWN;
588
589                /* Check if we have local support for the message application */
590                if ( (hdr->msg_appl == 0) || (hdr->msg_appl == AI_RELAY) ) {
591                        fd_hook_call(HOOK_MESSAGE_PARSING_ERROR, msgptr, NULL, "Received a routable message with application id 0 or " _stringize(AI_RELAY) " (relay)", fd_msg_pmdl_get(msgptr));
592                        CHECK_FCT( return_error( &msgptr, "DIAMETER_APPLICATION_UNSUPPORTED", "Routable message with application id 0 or relay", NULL) );
593                        return 0;
594                } else {
595                        struct fd_app * app;
596                        CHECK_FCT( fd_app_check(&fd_g_config->cnf_apps, hdr->msg_appl, &app) );
597                        is_local_app = (app ? YES : NO);
598                }
599
600                /* Parse the message for Dest-Host, Dest-Realm, and Route-Record */
601                CHECK_FCT(  fd_msg_browse(msgptr, MSG_BRW_FIRST_CHILD, &avp, NULL)  );
602                while (avp) {
603                        struct avp_hdr * ahdr;
604                        struct fd_pei error_info;
605                        int ret;
606                       
607                        memset(&error_info, 0, sizeof(struct fd_pei)); 
608                       
609                        CHECK_FCT(  fd_msg_avp_hdr( avp, &ahdr )  );
610
611                        if (! (ahdr->avp_flags & AVP_FLAG_VENDOR)) {
612                                switch (ahdr->avp_code) {
613                                        case AC_DESTINATION_HOST:
614                                                /* Parse this AVP */
615                                                CHECK_FCT_DO( ret = fd_msg_parse_dict ( avp, fd_g_config->cnf_dict, &error_info ),
616                                                        {
617                                                                if (error_info.pei_errcode) {
618                                                                        fd_hook_call(HOOK_MESSAGE_PARSING_ERROR, msgptr, NULL, error_info.pei_message ?: error_info.pei_errcode, fd_msg_pmdl_get(msgptr));
619                                                                        CHECK_FCT( return_error( &msgptr, error_info.pei_errcode, error_info.pei_message, error_info.pei_avp) );
620                                                                        if (error_info.pei_avp_free) { fd_msg_free(error_info.pei_avp); }
621                                                                        return 0;
622                                                                } else {
623                                                                        fd_hook_call(HOOK_MESSAGE_PARSING_ERROR, msgptr, NULL, "Unspecified error while parsing Destination-Host AVP", fd_msg_pmdl_get(msgptr));
624                                                                        return ret;
625                                                                }
626                                                        } );
627                                                ASSERT( ahdr->avp_value );
628                                                /* Compare the Destination-Host AVP of the message with our identity */
629                                                if (!fd_os_almostcasesrch(ahdr->avp_value->os.data, ahdr->avp_value->os.len, fd_g_config->cnf_diamid, fd_g_config->cnf_diamid_len, NULL)) {
630                                                        is_dest_host = YES;
631                                                } else {
632                                                        is_dest_host = NO;
633                                                }
634                                                break;
635
636                                        case AC_DESTINATION_REALM:
637                                                /* Parse this AVP */
638                                                CHECK_FCT_DO( ret = fd_msg_parse_dict ( avp, fd_g_config->cnf_dict, &error_info ),
639                                                        {
640                                                                if (error_info.pei_errcode) {
641                                                                        fd_hook_call(HOOK_MESSAGE_PARSING_ERROR, msgptr, NULL, error_info.pei_message ?: error_info.pei_errcode, fd_msg_pmdl_get(msgptr));
642                                                                        CHECK_FCT( return_error( &msgptr, error_info.pei_errcode, error_info.pei_message, error_info.pei_avp) );
643                                                                        if (error_info.pei_avp_free) { fd_msg_free(error_info.pei_avp); }
644                                                                        return 0;
645                                                                } else {
646                                                                        fd_hook_call(HOOK_MESSAGE_PARSING_ERROR, msgptr, NULL, "Unspecified error while parsing Destination-Realm AVP", fd_msg_pmdl_get(msgptr));
647                                                                        return ret;
648                                                                }
649                                                        } );
650                                                ASSERT( ahdr->avp_value );
651                                                dr_val = ahdr->avp_value;
652                                                /* Compare the Destination-Realm AVP of the message with our identity */
653                                                if (!fd_os_almostcasesrch(dr_val->os.data, dr_val->os.len, fd_g_config->cnf_diamrlm, fd_g_config->cnf_diamrlm_len, NULL)) {
654                                                        is_dest_realm = YES;
655                                                } else {
656                                                        is_dest_realm = NO;
657                                                }
658                                                break;
659
660                                        /* we also use User-Name for decorated NAI */
661                                        case AC_USER_NAME:
662                                                /* Parse this AVP */
663                                                CHECK_FCT_DO( ret = fd_msg_parse_dict ( avp, fd_g_config->cnf_dict, &error_info ),
664                                                        {
665                                                                if (error_info.pei_errcode) {
666                                                                        fd_hook_call(HOOK_MESSAGE_PARSING_ERROR, msgptr, NULL, error_info.pei_message ?: error_info.pei_errcode, fd_msg_pmdl_get(msgptr));
667                                                                        CHECK_FCT( return_error( &msgptr, error_info.pei_errcode, error_info.pei_message, error_info.pei_avp) );
668                                                                        if (error_info.pei_avp_free) { fd_msg_free(error_info.pei_avp); }
669                                                                        return 0;
670                                                                } else {
671                                                                        fd_hook_call(HOOK_MESSAGE_PARSING_ERROR, msgptr, NULL, "Unspecified error while parsing User-Name AVP", fd_msg_pmdl_get(msgptr));
672                                                                        return ret;
673                                                                }
674                                                        } );
675                                                ASSERT( ahdr->avp_value );
676                                                un = avp;
677                                                un_val = ahdr->avp_value;
678                                                break;
679                                               
680                                        case AC_ROUTE_RECORD:
681                                                /* Parse this AVP */
682                                                CHECK_FCT_DO( ret = fd_msg_parse_dict ( avp, fd_g_config->cnf_dict, &error_info ),
683                                                        {
684                                                                if (error_info.pei_errcode) {
685                                                                        fd_hook_call(HOOK_MESSAGE_PARSING_ERROR, msgptr, NULL, error_info.pei_message ?: error_info.pei_errcode, fd_msg_pmdl_get(msgptr));
686                                                                        CHECK_FCT( return_error( &msgptr, error_info.pei_errcode, error_info.pei_message, error_info.pei_avp) );
687                                                                        if (error_info.pei_avp_free) { fd_msg_free(error_info.pei_avp); }
688                                                                        return 0;
689                                                                } else {
690                                                                        fd_hook_call(HOOK_MESSAGE_PARSING_ERROR, msgptr, NULL, "Unspecified error while parsing Route-Record AVP", fd_msg_pmdl_get(msgptr));
691                                                                        return ret;
692                                                                }
693                                                        } );
694                                                ASSERT( ahdr->avp_value );
695                                                /* Is this our own name ? */
696                                                if (!fd_os_almostcasesrch(ahdr->avp_value->os.data, ahdr->avp_value->os.len, fd_g_config->cnf_diamid, fd_g_config->cnf_diamid_len, NULL)) {
697                                                        /* Yes: then we must return DIAMETER_LOOP_DETECTED according to Diameter RFC */
698                                                        char * error = "DIAMETER_LOOP_DETECTED";
699                                                        fd_hook_call(HOOK_MESSAGE_PARSING_ERROR, msgptr, NULL, error, fd_msg_pmdl_get(msgptr));
700                                                        CHECK_FCT( return_error( &msgptr, error, NULL, NULL) );
701                                                        return 0;
702                                                }
703                                                break;
704                                               
705                                       
706                                }
707                        }
708
709                        /* Stop when we found all 3 AVPs -- they are supposed to be at the beginning of the message, so this should be fast */
710                        if ((is_dest_host != UNKNOWN) && (is_dest_realm != UNKNOWN) && un)
711                                break;
712
713                        /* Go to next AVP */
714                        CHECK_FCT(  fd_msg_browse(avp, MSG_BRW_NEXT, &avp, NULL)  );
715                }
716
717                /* OK, now decide what we do with the request */
718
719                /* Handle the missing routing AVPs first */
720                if ( is_dest_realm == UNKNOWN ) {
721                        fd_hook_call(HOOK_MESSAGE_PARSING_ERROR, msgptr, NULL, "Non-routable message not supported (invalid bit ? missing Destination-Realm ?)", fd_msg_pmdl_get(msgptr));
722                        CHECK_FCT( return_error( &msgptr, "DIAMETER_COMMAND_UNSUPPORTED", "Non-routable message not supported (invalid bit ? missing Destination-Realm ?)", NULL) );
723                        return 0;
724                }
725
726                /* If we are listed as Destination-Host */
727                if (is_dest_host == YES) {
728                        if (is_local_app == YES) {
729                                /* Ok, give the message to the dispatch thread */
730                                fd_hook_call(HOOK_MESSAGE_ROUTING_LOCAL, msgptr, NULL, NULL, fd_msg_pmdl_get(msgptr));
731                                CHECK_FCT( fd_fifo_post(fd_g_local, &msgptr) );
732                        } else {
733                                /* We don't support the application, reply an error */
734                                fd_hook_call(HOOK_MESSAGE_PARSING_ERROR, msgptr, NULL, "Application unsupported", fd_msg_pmdl_get(msgptr));
735                                CHECK_FCT( return_error( &msgptr, "DIAMETER_APPLICATION_UNSUPPORTED", NULL, NULL) );
736                        }
737                        return 0;
738                }
739
740                /* If the message is explicitely for someone else */
741                if ((is_dest_host == NO) || (is_dest_realm == NO)) {
742                        if (fd_g_config->cnf_flags.no_fwd) {
743                                fd_hook_call(HOOK_MESSAGE_ROUTING_ERROR, msgptr, NULL, "Message for another realm/host", fd_msg_pmdl_get(msgptr));
744                                CHECK_FCT( return_error( &msgptr, "DIAMETER_UNABLE_TO_DELIVER", "I am not a Diameter agent", NULL) );
745                                return 0;
746                        }
747                } else {
748                /* Destination-Host was not set, and Destination-Realm is matching : we may handle or pass to a fellow peer */
749                        int is_nai = 0;
750
751                        /* test for decorated NAI  (RFC5729 section 4.4) */
752                        /* Handle the decorated NAI */
753                        if (un_val) {
754                                CHECK_FCT_DO( process_decorated_NAI(&is_nai, un_val, dr_val),
755                                        {
756                                                /* If the process failed, we assume it is because of the AVP format */
757                                                fd_hook_call(HOOK_MESSAGE_PARSING_ERROR, msgptr, NULL, "Failed to process decorated NAI", fd_msg_pmdl_get(msgptr));
758                                                CHECK_FCT( return_error( &msgptr, "DIAMETER_INVALID_AVP_VALUE", "Failed to process decorated NAI", un) );
759                                                return 0;
760                                        } );
761                        }
762                               
763                        if (is_nai) {
764                                /* We have transformed the AVP, now submit it again in the queue */
765                                CHECK_FCT(fd_fifo_post(fd_g_incoming, &msgptr) );
766                                return 0;
767                        }
768
769                        if (is_local_app == YES) {
770                                /* Handle localy since we are able to */
771                                fd_hook_call(HOOK_MESSAGE_ROUTING_LOCAL, msgptr, NULL, NULL, fd_msg_pmdl_get(msgptr));
772                                CHECK_FCT(fd_fifo_post(fd_g_local, &msgptr) );
773                                return 0;
774                        }
775
776                        if (fd_g_config->cnf_flags.no_fwd) {
777                                /* We return an error */
778                                fd_hook_call(HOOK_MESSAGE_ROUTING_ERROR, msgptr, NULL, "Application unsupported", fd_msg_pmdl_get(msgptr));
779                                CHECK_FCT( return_error( &msgptr, "DIAMETER_APPLICATION_UNSUPPORTED", NULL, NULL) );
780                                return 0;
781                        }
782                }
783
784                /* From that point, for requests, we will call the registered callbacks, then forward to another peer */
785
786        } else {
787                /* The message is an answer */
788                struct msg * qry;
789
790                /* Retrieve the corresponding query and its origin */
791                CHECK_FCT( fd_msg_answ_getq( msgptr, &qry ) );
792                CHECK_FCT( fd_msg_source_get( qry, &qry_src, NULL ) );
793
794                if ((!qry_src) && (!is_err)) {
795                        /* The message is a normal answer to a request issued localy, we do not call the callbacks chain on it. */
796                        fd_hook_call(HOOK_MESSAGE_ROUTING_LOCAL, msgptr, NULL, NULL, fd_msg_pmdl_get(msgptr));
797                        CHECK_FCT(fd_fifo_post(fd_g_local, &msgptr) );
798                        return 0;
799                }
800               
801                /* From that point, for answers, we will call the registered callbacks, then pass it to the dispatch module or forward it */
802        }
803
804        /* Call all registered callbacks for this message */
805        {
806                struct fd_list * li;
807
808                CHECK_FCT( pthread_rwlock_rdlock( &rt_fwd_lock ) );
809                pthread_cleanup_push( fd_cleanup_rwlock, &rt_fwd_lock );
810
811                /* requests: dir = 1 & 2 => in order; answers = 3 & 2 => in reverse order */
812                for (   li = (is_req ? rt_fwd_list.next : rt_fwd_list.prev) ; msgptr && (li != &rt_fwd_list) ; li = (is_req ? li->next : li->prev) ) {
813                        struct rt_hdl * rh = (struct rt_hdl *)li;
814                        int ret;
815
816                        if (is_req && (rh->dir > RT_FWD_ALL))
817                                break;
818                        if ((!is_req) && (rh->dir < RT_FWD_ALL))
819                                break;
820
821                        /* Ok, call this cb */
822                        TRACE_DEBUG(ANNOYING, "Calling next FWD callback on %p : %p", msgptr, rh->rt_fwd_cb);
823                        CHECK_FCT_DO( ret = (*rh->rt_fwd_cb)(rh->cbdata, &msgptr),
824                                {
825                                        char buf[256];
826                                        snprintf(buf, sizeof(buf), "A FWD routing callback returned an error: %s", strerror(ret));
827                                        fd_hook_call(HOOK_MESSAGE_ROUTING_ERROR, msgptr, NULL, buf, fd_msg_pmdl_get(msgptr));
828                                        fd_hook_call(HOOK_MESSAGE_DROPPED, msgptr, NULL, buf, fd_msg_pmdl_get(msgptr));
829                                        fd_msg_free(msgptr);
830                                        msgptr = NULL;
831                                        break;
832                                } );
833                }
834
835                pthread_cleanup_pop(0);
836                CHECK_FCT( pthread_rwlock_unlock( &rt_fwd_lock ) );
837
838                /* If a callback has handled the message, we stop now */
839                if (!msgptr)
840                        return 0;
841        }
842
843        /* Now pass the message to the next step: either forward to another peer, or dispatch to local extensions */
844        if (is_req || qry_src) {
845                fd_hook_call(HOOK_MESSAGE_ROUTING_FORWARD, msgptr, NULL, NULL, fd_msg_pmdl_get(msgptr));
846                CHECK_FCT(fd_fifo_post(fd_g_outgoing, &msgptr) );
847        } else {
848                fd_hook_call(HOOK_MESSAGE_ROUTING_LOCAL, msgptr, NULL, NULL, fd_msg_pmdl_get(msgptr));
849                CHECK_FCT(fd_fifo_post(fd_g_local, &msgptr) );
850        }
851
852        /* We're done with this message */
853        return 0;
854}
855               
856
857/* The ROUTING-OUT message processing */
858static int msg_rt_out(struct msg * msg)
859{
860        struct rt_data * rtd = NULL;
861        struct msg_hdr * hdr;
862        int is_req = 0;
863        int ret;
864        struct fd_list * li, *candidates;
865        struct avp * avp;
866        struct rtd_candidate * c;
867        struct msg *msgptr = msg;
868        DiamId_t qry_src = NULL;
869        size_t qry_src_len = 0;
870       
871        /* Read the message header */
872        CHECK_FCT( fd_msg_hdr(msgptr, &hdr) );
873        is_req = hdr->msg_flags & CMD_FLAG_REQUEST;
874       
875        /* For answers, the routing is very easy */
876        if ( ! is_req ) {
877                struct msg * qry;
878                struct msg_hdr * qry_hdr;
879                struct fd_peer * peer = NULL;
880
881                /* Retrieve the corresponding query and its origin */
882                CHECK_FCT( fd_msg_answ_getq( msgptr, &qry ) );
883                CHECK_FCT( fd_msg_source_get( qry, &qry_src, &qry_src_len ) );
884
885                ASSERT( qry_src ); /* if it is NULL, the message should have been in the LOCAL queue! */
886
887                /* Find the peer corresponding to this name */
888                CHECK_FCT( fd_peer_getbyid( qry_src, qry_src_len, 0, (void *) &peer ) );
889                if (fd_peer_getstate(peer) != STATE_OPEN && fd_peer_getstate(peer) != STATE_CLOSING_GRACE) {
890                        char buf[128];
891                        snprintf(buf, sizeof(buf), "Unable to forward answer to deleted / closed peer '%s'.", qry_src);
892                        fd_hook_call(HOOK_MESSAGE_ROUTING_ERROR, msgptr, NULL, buf, fd_msg_pmdl_get(msgptr));
893                        fd_hook_call(HOOK_MESSAGE_DROPPED, msgptr, NULL, buf, fd_msg_pmdl_get(msgptr));
894                        fd_msg_free(msgptr);
895                        return 0;
896                }
897
898                /* We must restore the hop-by-hop id */
899                CHECK_FCT( fd_msg_hdr(qry, &qry_hdr) );
900                hdr->msg_hbhid = qry_hdr->msg_hbhid;
901
902                /* Push the message into this peer */
903                CHECK_FCT( fd_out_send(&msgptr, NULL, peer, 1) );
904
905                /* We're done with this answer */
906                return 0;
907        }
908       
909        /* From that point, the message is a request */
910        CHECK_FCT( fd_msg_source_get( msgptr, &qry_src, &qry_src_len ) );
911        /* if qry_src != NULL, this message is relayed, otherwise it is locally issued */
912
913        /* Get the routing data out of the message if any (in case of re-transmit) */
914        CHECK_FCT( fd_msg_rt_get ( msgptr, &rtd ) );
915
916        /* If there is no routing data already, let's create it */
917        if (rtd == NULL) {
918                CHECK_FCT( fd_rtd_init(&rtd) );
919
920                /* Add all peers currently in OPEN state */
921                CHECK_FCT( pthread_rwlock_rdlock(&fd_g_activ_peers_rw) );
922                for (li = fd_g_activ_peers.next; li != &fd_g_activ_peers; li = li->next) {
923                        struct fd_peer * p = (struct fd_peer *)li->o;
924                        CHECK_FCT_DO( ret = fd_rtd_candidate_add(rtd, 
925                                                        p->p_hdr.info.pi_diamid, 
926                                                        p->p_hdr.info.pi_diamidlen, 
927                                                        p->p_hdr.info.runtime.pir_realm,
928                                                        p->p_hdr.info.runtime.pir_realmlen), 
929                                { CHECK_FCT_DO( pthread_rwlock_unlock(&fd_g_activ_peers_rw), ); return ret; } );
930                }
931                CHECK_FCT( pthread_rwlock_unlock(&fd_g_activ_peers_rw) );
932
933                /* Now let's remove all peers from the Route-Records */
934                CHECK_FCT(  fd_msg_browse(msgptr, MSG_BRW_FIRST_CHILD, &avp, NULL)  );
935                while (avp) {
936                        struct avp_hdr * ahdr;
937                        struct fd_pei error_info;
938                        CHECK_FCT(  fd_msg_avp_hdr( avp, &ahdr )  );
939
940                        if ((ahdr->avp_code == AC_ROUTE_RECORD) && (! (ahdr->avp_flags & AVP_FLAG_VENDOR)) ) {
941                                /* Parse this AVP */
942                                CHECK_FCT_DO( ret = fd_msg_parse_dict ( avp, fd_g_config->cnf_dict, &error_info ),
943                                        {
944                                                if (error_info.pei_errcode) {
945                                                        CHECK_FCT( return_error( &msgptr, error_info.pei_errcode, error_info.pei_message, error_info.pei_avp) );
946                                                        if (error_info.pei_avp_free) { fd_msg_free(error_info.pei_avp); }
947                                                        return 0;
948                                                } else {
949                                                        return ret;
950                                                }
951                                        } );
952                                ASSERT( ahdr->avp_value );
953                                /* Remove this value from the list. We don't need to pay special attention to the contents here. */
954                                fd_rtd_candidate_del(rtd, ahdr->avp_value->os.data, ahdr->avp_value->os.len);
955                        }
956
957                        /* Go to next AVP */
958                        CHECK_FCT(  fd_msg_browse(avp, MSG_BRW_NEXT, &avp, NULL)  );
959                }
960               
961                /* Save the routing information in the message */
962                CHECK_FCT( fd_msg_rt_associate ( msgptr, rtd ) );
963        }
964
965        /* Note: we reset the scores and pass the message to the callbacks, maybe we could re-use the saved scores when we have received an error ? -- TODO */
966
967        /* Ok, we have our list in rtd now, let's (re)initialize the scores */
968        fd_rtd_candidate_extract(rtd, &candidates, FD_SCORE_INI);
969
970        /* Pass the list to registered callbacks (even if it is empty list) */
971        {
972                CHECK_FCT( pthread_rwlock_rdlock( &rt_out_lock ) );
973                pthread_cleanup_push( fd_cleanup_rwlock, &rt_out_lock );
974
975                /* We call the cb by reverse priority order */
976                for (   li = rt_out_list.prev ; (msgptr != NULL) && (li != &rt_out_list) ; li = li->prev ) {
977                        struct rt_hdl * rh = (struct rt_hdl *)li;
978
979                        TRACE_DEBUG(ANNOYING, "Calling next OUT callback on %p : %p (prio %d)", msgptr, rh->rt_out_cb, rh->prio);
980                        CHECK_FCT_DO( ret = (*rh->rt_out_cb)(rh->cbdata, &msgptr, candidates),
981                                {
982                                        char buf[256];
983                                        snprintf(buf, sizeof(buf), "An OUT routing callback returned an error: %s", strerror(ret));
984                                        fd_hook_call(HOOK_MESSAGE_ROUTING_ERROR, msgptr, NULL, buf, fd_msg_pmdl_get(msgptr));
985                                        fd_hook_call(HOOK_MESSAGE_DROPPED, msgptr, NULL, buf, fd_msg_pmdl_get(msgptr));
986                                        fd_msg_free(msgptr);
987                                        msgptr = NULL;
988                                } );
989                }
990
991                pthread_cleanup_pop(0);
992                CHECK_FCT( pthread_rwlock_unlock( &rt_out_lock ) );
993
994                /* If an error occurred or the callback disposed of the message, go to next message */
995                if (! msgptr) {
996                        return 0;
997                }
998        }
999       
1000        /* Order the candidate peers by score attributed by the callbacks */
1001        CHECK_FCT( fd_rtd_candidate_reorder(candidates) );
1002
1003        /* Now try sending the message */
1004        for (li = candidates->prev; li != candidates; li = li->prev) {
1005                struct fd_peer * peer;
1006
1007                c = (struct rtd_candidate *) li;
1008
1009                /* Stop when we have reached the end of valid candidates */
1010                if (c->score < 0)
1011                        break;
1012
1013                /* Search for the peer */
1014                CHECK_FCT( fd_peer_getbyid( c->diamid, c->diamidlen, 0, (void *)&peer ) );
1015
1016                if (fd_peer_getstate(peer) == STATE_OPEN) {
1017                        /* Send to this one */
1018                        CHECK_FCT_DO( fd_out_send(&msgptr, NULL, peer, 1), continue );
1019                       
1020                        /* If the sending was successful */
1021                        break;
1022                }
1023        }
1024
1025        /* If the message has not been sent, return an error */
1026        if (msgptr) {
1027                fd_hook_call(HOOK_MESSAGE_ROUTING_ERROR, msgptr, NULL, "No remaining suitable candidate to route the message to", fd_msg_pmdl_get(msgptr));
1028                return_error( &msgptr, "DIAMETER_UNABLE_TO_DELIVER", "No suitable candidate to route the message to", NULL);
1029        }
1030
1031        /* We're done with this message */
1032       
1033        return 0;
1034}
1035
1036
1037/********************************************************************************/
1038/*                     Management of the threads                                */
1039/********************************************************************************/
1040
1041/* Note: in the first version, we only create one thread of each kind.
1042 We could improve the scalability by using the threshold feature of the queues
1043 to create additional threads if a queue is filling up, or at least giving a configurable
1044 number of threads of each kind.
1045 */
1046
1047/* Control of the threads */
1048static enum { RUN = 0, STOP = 1 } order_val = RUN;
1049static pthread_mutex_t order_state_lock = PTHREAD_MUTEX_INITIALIZER;
1050
1051/* Threads report their status */
1052enum thread_state { NOTRUNNING = 0, RUNNING = 1 };
1053static void cleanup_state(void * state_loc)
1054{
1055        CHECK_POSIX_DO( pthread_mutex_lock(&order_state_lock), );
1056        *(enum thread_state *)state_loc = NOTRUNNING;
1057        CHECK_POSIX_DO( pthread_mutex_unlock(&order_state_lock), );
1058}
1059
1060/* This is the common thread code (same for routing and dispatching) */
1061static void * process_thr(void * arg, int (*action_cb)(struct msg * msg), struct fifo * queue, char * action_name)
1062{
1063        TRACE_ENTRY("%p %p %p %p", arg, action_cb, queue, action_name);
1064       
1065        /* Set the thread name */
1066        {
1067                char buf[48];
1068                snprintf(buf, sizeof(buf), "%s (%p)", action_name, arg);
1069                fd_log_threadname ( buf );
1070        }
1071       
1072        /* The thread reports its status when canceled */
1073        CHECK_PARAMS_DO(arg, return NULL);
1074        pthread_cleanup_push( cleanup_state, arg );
1075       
1076        /* Mark the thread running */
1077        CHECK_POSIX_DO( pthread_mutex_lock(&order_state_lock), );
1078        *(enum thread_state *)arg = RUNNING;
1079        CHECK_POSIX_DO( pthread_mutex_unlock(&order_state_lock), );
1080       
1081        do {
1082                struct msg * msg;
1083       
1084                /* Get the next message from the queue */
1085                {
1086                        int ret;
1087                        struct timespec ts;
1088                       
1089                        CHECK_SYS_DO( clock_gettime(CLOCK_REALTIME, &ts), goto fatal_error );
1090                        ts.tv_sec += 1;
1091                       
1092                        ret = fd_fifo_timedget ( queue, &msg, &ts );
1093                        if (ret == ETIMEDOUT) {
1094                                /* Test the current order */
1095                                {
1096                                        int must_stop;
1097                                        CHECK_POSIX_DO( pthread_mutex_lock(&order_state_lock), { ASSERT(0); } ); /* we lock to flush the caches */
1098                                        must_stop = (order_val == STOP);
1099                                        CHECK_POSIX_DO( pthread_mutex_unlock(&order_state_lock), { ASSERT(0); } );
1100                                        if (must_stop)
1101                                                goto end;
1102
1103                                        pthread_testcancel();
1104                                }
1105                                /* Ok, we are allowed to continue */
1106                                continue;
1107                        }
1108                        if (ret == EPIPE)
1109                                /* The queue was destroyed, we are probably exiting */
1110                                goto end;
1111                       
1112                        /* check if another error occurred */
1113                        CHECK_FCT_DO( ret, goto fatal_error );
1114                }
1115               
1116                LOG_A("%s: Picked next message", action_name);
1117
1118                /* Now process the message */
1119                CHECK_FCT_DO( (*action_cb)(msg), goto fatal_error);
1120
1121                /* We're done with this message */
1122       
1123        } while (1);
1124       
1125fatal_error:
1126        TRACE_DEBUG(INFO, "An unrecoverable error occurred, %s thread is terminating...", action_name);
1127        CHECK_FCT_DO(fd_core_shutdown(), );
1128       
1129end:   
1130        ; /* noop so that we get rid of "label at end of compund statement" warning */
1131        /* Mark the thread as terminated */
1132        pthread_cleanup_pop(1);
1133        return NULL;
1134}
1135
1136/* The dispatch thread */
1137static void * dispatch_thr(void * arg)
1138{
1139        return process_thr(arg, msg_dispatch, fd_g_local, "Dispatch");
1140}
1141
1142/* The (routing-in) thread -- see description in freeDiameter.h */
1143static void * routing_in_thr(void * arg)
1144{
1145        return process_thr(arg, msg_rt_in, fd_g_incoming, "Routing-IN");
1146}
1147
1148/* The (routing-out) thread -- see description in freeDiameter.h */
1149static void * routing_out_thr(void * arg)
1150{
1151        return process_thr(arg, msg_rt_out, fd_g_outgoing, "Routing-OUT");
1152}
1153
1154
1155/********************************************************************************/
1156/*                     The functions for the other files                        */
1157/********************************************************************************/
1158
1159static pthread_t * dispatch = NULL;
1160static enum thread_state * disp_state = NULL;
1161
1162static pthread_t * rt_out = NULL;
1163static enum thread_state * out_state = NULL;
1164
1165static pthread_t * rt_in  = NULL;
1166static enum thread_state * in_state = NULL;
1167
1168/* Initialize the routing and dispatch threads */
1169int fd_rtdisp_init(void)
1170{
1171        int i;
1172       
1173        /* Prepare the array for threads */
1174        CHECK_MALLOC( disp_state = calloc(fd_g_config->cnf_dispthr, sizeof(enum thread_state)) );
1175        CHECK_MALLOC( dispatch = calloc(fd_g_config->cnf_dispthr, sizeof(pthread_t)) );
1176        CHECK_MALLOC( out_state = calloc(fd_g_config->cnf_rtoutthr, sizeof(enum thread_state)) );
1177        CHECK_MALLOC( rt_out = calloc(fd_g_config->cnf_rtoutthr, sizeof(pthread_t)) );
1178        CHECK_MALLOC( in_state = calloc(fd_g_config->cnf_rtinthr, sizeof(enum thread_state)) );
1179        CHECK_MALLOC( rt_in = calloc(fd_g_config->cnf_rtinthr, sizeof(pthread_t)) );
1180       
1181        /* Create the threads */
1182        for (i=0; i < fd_g_config->cnf_dispthr; i++) {
1183                CHECK_POSIX( pthread_create( &dispatch[i], NULL, dispatch_thr, &disp_state[i] ) );
1184#ifdef linux
1185                pthread_setname_np(dispatch[i], "fd-dispatch");
1186#endif
1187        }
1188        for (i=0; i < fd_g_config->cnf_rtoutthr; i++) {
1189                CHECK_POSIX( pthread_create( &rt_out[i], NULL, routing_out_thr, &out_state[i] ) );
1190#ifdef linux
1191                pthread_setname_np(rt_out[i], "fd-routing-out");
1192#endif
1193        }
1194        for (i=0; i < fd_g_config->cnf_rtinthr; i++) {
1195                CHECK_POSIX( pthread_create( &rt_in[i], NULL, routing_in_thr, &in_state[i] ) );
1196#ifdef linux
1197                pthread_setname_np(rt_in[i], "fd-routing-in");
1198#endif
1199        }
1200       
1201        /* Later: TODO("Set the thresholds for the queues to create more threads as needed"); */
1202       
1203        /* Register the built-in callbacks */
1204        CHECK_FCT( fd_rt_out_register( dont_send_if_no_common_app, NULL, 10, NULL ) );
1205        CHECK_FCT( fd_rt_out_register( score_destination_avp, NULL, 10, NULL ) );
1206       
1207        return 0;
1208}
1209
1210/* Ask the thread to terminate after next iteration */
1211int fd_rtdisp_cleanstop(void)
1212{
1213        CHECK_POSIX_DO( pthread_mutex_lock(&order_state_lock), );
1214        order_val = STOP;
1215        CHECK_POSIX_DO( pthread_mutex_unlock(&order_state_lock), );
1216
1217        return 0;
1218}
1219
1220static void stop_thread_delayed(enum thread_state *st, pthread_t * thr, char * th_name)
1221{
1222        TRACE_ENTRY("%p %p", st, thr);
1223        CHECK_PARAMS_DO(st && thr, return);
1224        int terminated;
1225       
1226        CHECK_POSIX_DO( pthread_mutex_lock(&order_state_lock), );
1227        terminated = (*st == NOTRUNNING);
1228        CHECK_POSIX_DO( pthread_mutex_unlock(&order_state_lock), );
1229       
1230
1231        /* Wait for a second for the thread to complete, by monitoring my_state */
1232        if (!terminated) {
1233                TRACE_DEBUG(INFO, "Waiting for the %s thread to have a chance to terminate", th_name);
1234                do {
1235                        struct timespec  ts, ts_final;
1236
1237                        CHECK_SYS_DO( clock_gettime(CLOCK_REALTIME, &ts), break );
1238                       
1239                        ts_final.tv_sec = ts.tv_sec + 1;
1240                        ts_final.tv_nsec = ts.tv_nsec;
1241                       
1242                        while (TS_IS_INFERIOR( &ts, &ts_final )) {
1243                       
1244                                CHECK_POSIX_DO( pthread_mutex_lock(&order_state_lock), );
1245                                terminated = (*st == NOTRUNNING);
1246                                CHECK_POSIX_DO( pthread_mutex_unlock(&order_state_lock), );
1247                                if (terminated)
1248                                        break;
1249                               
1250                                usleep(100000);
1251                                CHECK_SYS_DO( clock_gettime(CLOCK_REALTIME, &ts), break );
1252                        }
1253                } while (0);
1254        }
1255
1256        /* Now stop the thread and reclaim its resources */
1257        CHECK_FCT_DO( fd_thr_term(thr ), /* continue */);
1258       
1259}
1260
1261/* Stop the thread after up to one second of wait */
1262int fd_rtdisp_fini(void)
1263{
1264        int i;
1265       
1266        /* Destroy the incoming queue */
1267        CHECK_FCT_DO( fd_queues_fini(&fd_g_incoming), /* ignore */);
1268       
1269        /* Stop the routing IN thread */
1270        if (rt_in != NULL) {
1271                for (i=0; i < fd_g_config->cnf_rtinthr; i++) {
1272                        stop_thread_delayed(&in_state[i], &rt_in[i], "IN routing");
1273                }
1274                free(rt_in);
1275                rt_in = NULL;
1276        }
1277        if (in_state != NULL) {
1278                free(in_state);
1279                in_state = NULL;
1280        }
1281       
1282        /* Destroy the outgoing queue */
1283        CHECK_FCT_DO( fd_queues_fini(&fd_g_outgoing), /* ignore */);
1284       
1285        /* Stop the routing OUT thread */
1286        if (rt_out != NULL) {
1287                for (i=0; i < fd_g_config->cnf_rtinthr; i++) {
1288                        stop_thread_delayed(&out_state[i], &rt_out[i], "OUT routing");
1289                }
1290                free(rt_out);
1291                rt_out = NULL;
1292        }
1293        if (out_state != NULL) {
1294                free(out_state);
1295                out_state = NULL;
1296        }
1297       
1298        /* Destroy the local queue */
1299        CHECK_FCT_DO( fd_queues_fini(&fd_g_local), /* ignore */);
1300       
1301        /* Stop the Dispatch threads */
1302        if (dispatch != NULL) {
1303                for (i=0; i < fd_g_config->cnf_dispthr; i++) {
1304                        stop_thread_delayed(&disp_state[i], &dispatch[i], "Dispatching");
1305                }
1306                free(dispatch);
1307                dispatch = NULL;
1308        }
1309        if (disp_state != NULL) {
1310                free(disp_state);
1311                disp_state = NULL;
1312        }
1313       
1314        return 0;
1315}
1316
1317/* Cleanup handlers */
1318int fd_rtdisp_cleanup(void)
1319{
1320        /* Cleanup all remaining handlers */
1321        while (!FD_IS_LIST_EMPTY(&rt_fwd_list)) {
1322                CHECK_FCT_DO( fd_rt_fwd_unregister ( (void *)rt_fwd_list.next, NULL ), /* continue */ );
1323        }
1324        while (!FD_IS_LIST_EMPTY(&rt_out_list)) {
1325                CHECK_FCT_DO( fd_rt_out_unregister ( (void *)rt_out_list.next, NULL ), /* continue */ );
1326        }
1327       
1328        fd_disp_unregister_all(); /* destroy remaining handlers */
1329
1330        return 0;
1331}
1332
1333
1334/********************************************************************************/
1335/*                     For extensions to register a new appl                    */
1336/********************************************************************************/
1337
1338/* Add an application into the peer's supported apps */
1339int fd_disp_app_support ( struct dict_object * app, struct dict_object * vendor, int auth, int acct )
1340{
1341        application_id_t aid = 0;
1342        vendor_id_t      vid = 0;
1343       
1344        TRACE_ENTRY("%p %p %d %d", app, vendor, auth, acct);
1345        CHECK_PARAMS( app && (auth || acct) );
1346       
1347        {
1348                enum dict_object_type type = 0;
1349                struct dict_application_data data;
1350                CHECK_FCT( fd_dict_gettype(app, &type) );
1351                CHECK_PARAMS( type == DICT_APPLICATION );
1352                CHECK_FCT( fd_dict_getval(app, &data) );
1353                aid = data.application_id;
1354        }
1355
1356        if (vendor) {
1357                enum dict_object_type type = 0;
1358                struct dict_vendor_data data;
1359                CHECK_FCT( fd_dict_gettype(vendor, &type) );
1360                CHECK_PARAMS( type == DICT_VENDOR );
1361                CHECK_FCT( fd_dict_getval(vendor, &data) );
1362                vid = data.vendor_id;
1363        }
1364       
1365        return fd_app_merge(&fd_g_config->cnf_apps, aid, vid, auth, acct);
1366}
1367
1368
1369
Note: See TracBrowser for help on using the repository browser.