Changeset 124:cc42d8607114 in freeDiameter
- Timestamp:
- Dec 10, 2009, 2:15:04 PM (14 years ago)
- Branch:
- default
- Phase:
- public
- Files:
-
- 6 edited
Legend:
- Unmodified
- Added
- Removed
-
freeDiameter/fD.h
r123 r124 100 100 /* Message queues */ 101 101 int fd_queues_init(void); 102 int fd_queues_fini_rt(void); 103 int fd_queues_fini_disp(void); 102 int fd_queues_fini(struct fifo ** queue); 104 103 105 104 /* Create all the dictionary objects defined in the Diameter base RFC. */ -
freeDiameter/queues.c
r123 r124 51 51 } 52 52 53 /* Destroy the routing message queues*/54 int fd_queues_fini _rt(void)53 /* Destroy a queue after emptying it (and dumping the content) */ 54 int fd_queues_fini(struct fifo ** queue) 55 55 { 56 TRACE_ENTRY(); 56 struct msg * msg; 57 int ret = 0; 57 58 59 TRACE_ENTRY("%p", queue); 60 61 /* Note : the threads that post into this queue should already been stopped before this !!! */ 62 58 63 /* Empty all contents */ 59 TODO("Empty all contents (dump to log file ?)"); 64 while (1) { 65 /* Check if there is a message in the queue */ 66 ret = fd_fifo_tryget(*queue, &msg); 67 if (ret == EWOULDBLOCK) 68 break; 69 CHECK_FCT(ret); 70 71 /* We got one! */ 72 fd_log_debug("The following message is lost because the daemon is stopping:\n"); 73 fd_msg_dump_walk(NONE, msg); 74 fd_msg_free(msg); 75 } 60 76 61 /* Now, delete the queues */ 62 CHECK_FCT( fd_fifo_del ( &fd_g_incoming ) ); 63 CHECK_FCT( fd_fifo_del ( &fd_g_outgoing ) ); 77 /* Now, delete the empty queue */ 78 CHECK_FCT( fd_fifo_del ( queue ) ); 64 79 65 80 return 0; 66 81 } 67 68 /* Destroy the local message queue */69 int fd_queues_fini_disp(void)70 {71 TRACE_ENTRY();72 73 /* Empty all contents */74 TODO("Empty all contents (dump to log file ?)");75 76 CHECK_FCT( fd_fifo_del ( &fd_g_local ) );77 78 return 0;79 } -
freeDiameter/routing_dispatch.c
r123 r124 185 185 186 186 /********************************************************************************/ 187 /* Helper functions*/187 /* Some default OUT routing callbacks */ 188 188 /********************************************************************************/ 189 /* Test if a User-Name AVP contains a Decorated NAI -- RFC4282, draft-ietf-dime-nai-routing-04 */ 190 static int is_decorated_NAI(union avp_value * un) 191 { 192 int i; 193 TRACE_ENTRY("%p", un); 194 195 /* If there was no User-Name, we return false */ 196 if (un == NULL) 197 return 0; 198 199 /* Search if there is a '!' before any '@' -- do we need to check it contains a '.' ? */ 200 for (i = 0; i < un->os.len; i++) { 201 if ( un->os.data[i] == (unsigned char) '!' ) 202 return 1; 203 if ( un->os.data[i] == (unsigned char) '@' ) 204 break; 205 if ( un->os.data[i] == (unsigned char) '\\' ) 206 i++; /* next one was escaped */ 207 } 208 209 return 0; 210 } 211 212 /* Create new User-Name and Destination-Realm values */ 213 static int process_decorated_NAI(union avp_value * un, union avp_value * dr) 214 { 215 int i, at_idx = 0, sep_idx = 0; 216 unsigned char * old_un; 217 TRACE_ENTRY("%p %p", un, dr); 218 CHECK_PARAMS(un && dr); 219 220 /* Save the decorated User-Name, for example 'homerealm.example.net!user@otherrealm.example.net' */ 221 old_un = un->os.data; 222 223 /* Search the positions of the first '!' and the '@' in the string */ 224 for (i = 0; i < un->os.len; i++) { 225 if ( (!sep_idx) && (old_un[i] == (unsigned char) '!') ) 226 sep_idx = i; 227 if ( old_un[i] == (unsigned char) '@' ) { 228 at_idx = i; 229 break; 230 } 231 if ( un->os.data[i] == (unsigned char) '\\' ) 232 i++; /* next one is escaped */ 233 } 234 235 CHECK_PARAMS( 0 < sep_idx < at_idx < un->os.len); 236 237 /* Create the new User-Name value */ 238 CHECK_MALLOC( un->os.data = malloc( at_idx ) ); 239 memcpy( un->os.data, old_un + sep_idx + 1, at_idx - sep_idx ); /* user@ */ 240 memcpy( un->os.data + at_idx - sep_idx, old_un, sep_idx ); /* homerealm.example.net */ 241 242 /* Create the new Destination-Realm value */ 243 CHECK_MALLOC( dr->os.data = realloc(dr->os.data, sep_idx) ); 244 memcpy( dr->os.data, old_un, sep_idx ); 245 dr->os.len = sep_idx; 246 247 TRACE_DEBUG(FULL, "Processed Decorated NAI '%.*s' into '%.*s' (%.*s)", 248 un->os.len, old_un, 249 at_idx, un->os.data, 250 dr->os.len, dr->os.data); 251 252 un->os.len = at_idx; 253 free(old_un); 254 255 return 0; 256 } 257 258 /* Function to return an error to an incoming request */ 259 static int return_error(struct msg * msg, char * error_code, char * error_message, struct avp * failedavp) 260 { 261 struct fd_peer * peer; 262 int is_loc = 0; 263 264 /* Get the source of the message */ 265 { 266 char * id; 267 CHECK_FCT( fd_msg_source_get( msg, &id ) ); 268 269 if (id == NULL) { 270 is_loc = 1; /* The message was issued locally */ 271 } else { 272 273 /* Search the peer with this id */ 274 CHECK_FCT( fd_peer_getbyid( id, (void *)&peer ) ); 275 276 if (!peer) { 277 TRACE_DEBUG(INFO, "Unable to send error '%s' to deleted peer '%s' in reply to:", error_code, id); 278 fd_msg_dump_walk(INFO, msg); 279 fd_msg_free(msg); 280 return 0; 281 } 282 } 283 } 284 285 /* Create the error message */ 286 CHECK_FCT( fd_msg_new_answer_from_req ( fd_g_config->cnf_dict, &msg, MSGFL_ANSW_ERROR ) ); 287 288 /* Set the error code */ 289 CHECK_FCT( fd_msg_rescode_set(msg, error_code, error_message, failedavp, 1 ) ); 290 291 /* Send the answer */ 292 if (is_loc) { 293 CHECK_FCT( fd_fifo_post(fd_g_incoming, &msg) ); 294 } else { 295 CHECK_FCT( fd_out_send(&msg, NULL, peer) ); 296 } 297 298 /* Done */ 299 return 0; 300 } 301 302 303 /********************************************************************************/ 304 /* Second part : the threads moving messages in the daemon */ 305 /********************************************************************************/ 306 307 /* Note: in the first version, we only create one thread of each kind. 308 We could improve the scalability by using the threshold feature of the queues 309 to create additional threads if a queue is filling up. 310 */ 311 312 /* Control of the threads */ 313 enum thread_state { INITIAL = 0, RUNNING = 1, TERMINATED = 2 }; 314 static void cleanup_state(void * state_loc) 315 { 316 if (state_loc) 317 *(enum thread_state *)state_loc = TERMINATED; 318 } 319 static pthread_mutex_t order_lock = PTHREAD_MUTEX_INITIALIZER; 320 static enum { RUN = 0, STOP = 1 } order_val = RUN;; 321 322 /* The dispatching thread */ 323 static void * dispatch_thr(void * arg) 324 { 325 TRACE_ENTRY("%p", arg); 326 327 /* Set the thread name */ 328 { 329 char buf[48]; 330 snprintf(buf, sizeof(buf), "Dispatch %p", arg); 331 fd_log_threadname ( buf ); 332 } 333 334 pthread_cleanup_push( cleanup_state, arg ); 335 336 /* Mark the thread running */ 337 *(enum thread_state *)arg = RUNNING; 338 339 do { 340 struct msg * msg; 341 struct msg_hdr * hdr; 342 int is_req = 0; 343 struct session * sess; 344 enum disp_action action; 345 const char * ec = NULL; 346 const char * em = NULL; 347 348 /* Test the environment */ 349 { 350 int must_stop; 351 CHECK_POSIX_DO( pthread_mutex_lock(&order_lock), goto end ); /* we lock to flush the caches */ 352 must_stop = (order_val == STOP); 353 CHECK_POSIX_DO( pthread_mutex_unlock(&order_lock), goto end ); 354 if (must_stop) 355 goto end; 356 357 pthread_testcancel(); 358 } 359 360 /* Ok, we are allowed to run */ 361 362 /* Get the next message from the queue */ 363 { 364 int ret; 365 CHECK_FCT_DO( ret = fd_fifo_get ( fd_g_local, &msg ), 366 { 367 if (ret == EPIPE) 368 /* The queue was destroyed */ 369 goto end; 370 goto fatal_error; 371 } ); 372 } 373 374 if (TRACE_BOOL(FULL)) { 375 TRACE_DEBUG(FULL, "Picked next message"); 376 fd_msg_dump_one(ANNOYING, msg); 377 } 378 379 /* Read the message header */ 380 CHECK_FCT_DO( fd_msg_hdr(msg, &hdr), goto fatal_error ); 381 is_req = hdr->msg_flags & CMD_FLAG_REQUEST; 382 383 /* Note: if the message is for local delivery, we should test for duplicate 384 (draft-asveren-dime-dupcons-00). This may conflict with path validation decisions, no clear answer yet */ 385 386 /* At this point, we need to understand the message content, so parse it */ 387 { 388 int ret; 389 CHECK_FCT_DO( ret = fd_msg_parse_or_error( &msg ), 390 { 391 /* in case of error, the message is already dump'd */ 392 if ((ret == EBADMSG) && (msg != NULL)) { 393 /* msg now contains the answer message to send back */ 394 CHECK_FCT_DO( fd_fifo_post(fd_g_outgoing, &msg), goto fatal_error ); 395 } 396 if (msg) { /* another error happen'd */ 397 TRACE_DEBUG(INFO, "An unexpected error occurred (%s), discarding a message:", strerror(ret)); 398 fd_msg_dump_walk(INFO, msg); 399 CHECK_FCT_DO( fd_msg_free(msg), /* continue */); 400 } 401 /* Go to the next message */ 402 continue; 403 } ); 404 } 405 406 /* First, if the original request was registered with a callback and we receive the answer, call it. */ 407 if ( ! is_req ) { 408 struct msg * qry; 409 void (*anscb)(void *, struct msg **) = NULL; 410 void * data = NULL; 411 412 /* Retrieve the corresponding query */ 413 CHECK_FCT_DO( fd_msg_answ_getq( msg, &qry ), goto fatal_error ); 414 415 /* Retrieve any registered handler */ 416 CHECK_FCT_DO( fd_msg_anscb_get( qry, &anscb, &data ), goto fatal_error ); 417 418 /* If a callback was registered, pass the message to it */ 419 if (anscb != NULL) { 420 421 TRACE_DEBUG(FULL, "Calling callback registered when query was sent (%p, %p)", anscb, data); 422 (*anscb)(data, &msg); 423 424 if (msg == NULL) { 425 /* Ok, the message is now handled, we can skip to the next one */ 426 continue; 427 } 428 } 429 } 430 431 /* Retrieve the session of the message */ 432 CHECK_FCT_DO( fd_msg_sess_get(fd_g_config->cnf_dict, msg, &sess, NULL), goto fatal_error ); 433 434 /* Now, call any callback registered for the message */ 435 CHECK_FCT_DO( fd_msg_dispatch ( &msg, sess, &action, &ec), goto fatal_error ); 436 437 /* Now, act depending on msg and action and ec */ 438 if (!msg) 439 continue; 440 441 switch ( action ) { 442 case DISP_ACT_CONT: 443 /* No callback has handled the message, let's reply with a generic error */ 444 em = "The message was not handled by any extension callback"; 445 ec = "DIAMETER_COMMAND_UNSUPPORTED"; 446 447 case DISP_ACT_ERROR: 448 /* We have a problem with delivering the message */ 449 if (ec == NULL) { 450 ec = "DIAMETER_UNABLE_TO_COMPLY"; 451 } 452 453 if (!is_req) { 454 TRACE_DEBUG(INFO, "Received an answer to a localy issued query, but no handler processed this answer!"); 455 fd_msg_dump_walk(INFO, msg); 456 fd_msg_free(msg); 457 msg = NULL; 458 break; 459 } 460 461 /* Create an answer with the error code and message */ 462 CHECK_FCT_DO( fd_msg_new_answer_from_req ( fd_g_config->cnf_dict, &msg, 0 ), goto fatal_error ); 463 CHECK_FCT_DO( fd_msg_rescode_set(msg, (char *)ec, (char *)em, NULL, 1 ), goto fatal_error ); 464 465 case DISP_ACT_SEND: 466 /* Now, send the message */ 467 CHECK_FCT_DO( fd_fifo_post(fd_g_outgoing, &msg), goto fatal_error ); 468 } 469 470 /* We're done with this message */ 471 472 } while (1); 473 474 fatal_error: 475 TRACE_DEBUG(INFO, "An error occurred in dispatch module! Thread is terminating..."); 476 CHECK_FCT_DO(fd_event_send(fd_g_config->cnf_main_ev, FDEV_TERMINATE, 0, NULL), ); 477 478 end: 479 /* Mark the thread as terminated */ 480 pthread_cleanup_pop(1); 481 return NULL; 482 } 483 484 485 /* The (routing-in) thread -- see description in freeDiameter.h */ 486 static void * routing_in_thr(void * arg) 487 { 488 TRACE_ENTRY("%p", arg); 489 490 /* Set the thread name */ 491 { 492 char buf[48]; 493 snprintf(buf, sizeof(buf), "Routing-IN %p", arg); 494 fd_log_threadname ( buf ); 495 } 496 497 pthread_cleanup_push( cleanup_state, arg ); 498 499 /* Mark the thread running */ 500 *(enum thread_state *)arg = RUNNING; 501 502 /* Main thread loop */ 503 do { 504 struct msg * msg; 505 struct msg_hdr * hdr; 506 int is_req = 0; 507 int is_err = 0; 508 char * qry_src = NULL; 509 510 /* Test if we were told to stop */ 511 { 512 int must_stop; 513 CHECK_POSIX_DO( pthread_mutex_lock(&order_lock), goto end ); /* we lock to flush the caches */ 514 must_stop = (order_val == STOP); 515 CHECK_POSIX_DO( pthread_mutex_unlock(&order_lock), goto end ); 516 if (must_stop) 517 goto end; 518 519 pthread_testcancel(); 520 } 521 522 /* Get the next message from the incoming queue */ 523 { 524 int ret; 525 CHECK_FCT_DO( ret = fd_fifo_get ( fd_g_incoming, &msg ), 526 { 527 if (ret == EPIPE) 528 /* The queue was destroyed */ 529 goto end; 530 goto fatal_error; 531 } ); 532 } 533 534 if (TRACE_BOOL(FULL)) { 535 TRACE_DEBUG(FULL, "Picked next message"); 536 fd_msg_dump_one(ANNOYING, msg); 537 } 538 539 /* Read the message header */ 540 CHECK_FCT_DO( fd_msg_hdr(msg, &hdr), goto fatal_error ); 541 is_req = hdr->msg_flags & CMD_FLAG_REQUEST; 542 is_err = hdr->msg_flags & CMD_FLAG_ERROR; 543 544 /* Handle incorrect bits */ 545 if (is_req && is_err) { 546 CHECK_FCT_DO( return_error( msg, "DIAMETER_INVALID_HDR_BITS", "R & E bits were set", NULL), goto fatal_error ); 547 continue; 548 } 549 550 /* If it is a request, we must analyze its content to decide what we do with it */ 551 if (is_req) { 552 struct avp * avp, *un = NULL; 553 union avp_value * un_val = NULL, *dr_val = NULL; 554 enum status { UNKNOWN, YES, NO }; 555 /* Are we Destination-Host? */ 556 enum status is_dest_host = UNKNOWN; 557 /* Are we Destination-Realm? */ 558 enum status is_dest_realm = UNKNOWN; 559 /* Do we support the application of the message? */ 560 enum status is_local_app = UNKNOWN; 561 562 /* Check if we have local support for the message application */ 563 if ( (hdr->msg_appl == 0) || (hdr->msg_appl == AI_RELAY) ) { 564 TRACE_DEBUG(INFO, "Received a routable message with application id 0, returning DIAMETER_APPLICATION_UNSUPPORTED"); 565 CHECK_FCT_DO( return_error( msg, "DIAMETER_APPLICATION_UNSUPPORTED", "Routable message with application id 0 or relay", NULL), goto fatal_error ); 566 continue; 567 } else { 568 struct fd_app * app; 569 CHECK_FCT_DO( fd_app_check(&fd_g_config->cnf_apps, hdr->msg_appl, &app), goto fatal_error ); 570 is_local_app = (app ? YES : NO); 571 } 572 573 /* Parse the message for Dest-Host and Dest-Realm */ 574 CHECK_FCT_DO( fd_msg_browse(msg, MSG_BRW_FIRST_CHILD, &avp, NULL), goto fatal_error ); 575 while (avp) { 576 struct avp_hdr * ahdr; 577 CHECK_FCT_DO( fd_msg_avp_hdr( avp, &ahdr ), goto fatal_error ); 578 579 if (! (ahdr->avp_flags & AVP_FLAG_VENDOR)) { 580 switch (ahdr->avp_code) { 581 case AC_DESTINATION_HOST: 582 /* Parse this AVP */ 583 CHECK_FCT_DO( fd_msg_parse_dict ( avp, fd_g_config->cnf_dict, NULL ), goto fatal_error ); 584 ASSERT( ahdr->avp_value ); 585 /* Compare the Destination-Host AVP of the message with our identity */ 586 if (ahdr->avp_value->os.len != fd_g_config->cnf_diamid_len) { 587 is_dest_host = NO; 588 } else { 589 is_dest_host = (strncasecmp(fd_g_config->cnf_diamid, (char *)ahdr->avp_value->os.data, fd_g_config->cnf_diamid_len) 590 ? NO : YES); 591 } 592 break; 593 594 case AC_DESTINATION_REALM: 595 /* Parse this AVP */ 596 CHECK_FCT_DO( fd_msg_parse_dict ( avp, fd_g_config->cnf_dict, NULL ), goto fatal_error ); 597 ASSERT( ahdr->avp_value ); 598 dr_val = ahdr->avp_value; 599 /* Compare the Destination-Realm AVP of the message with our identity */ 600 if (ahdr->avp_value->os.len != fd_g_config->cnf_diamrlm_len) { 601 is_dest_realm = NO; 602 } else { 603 is_dest_realm = (strncasecmp(fd_g_config->cnf_diamrlm, (char *)ahdr->avp_value->os.data, fd_g_config->cnf_diamrlm_len) 604 ? NO : YES); 605 } 606 break; 607 608 case AC_USER_NAME: 609 /* Parse this AVP */ 610 CHECK_FCT_DO( fd_msg_parse_dict ( avp, fd_g_config->cnf_dict, NULL ), goto fatal_error ); 611 ASSERT( ahdr->avp_value ); 612 un = avp; 613 un_val = ahdr->avp_value; 614 break; 615 } 616 } 617 618 if ((is_dest_host != UNKNOWN) && (is_dest_realm != UNKNOWN) && un) 619 break; 620 621 /* Go to next AVP */ 622 CHECK_FCT_DO( fd_msg_browse(avp, MSG_BRW_NEXT, &avp, NULL), goto fatal_error ); 623 } 624 625 /* OK, now decide what we do with the request */ 626 627 /* Handle the missing routing AVPs first */ 628 if ( is_dest_realm == UNKNOWN ) { 629 CHECK_FCT_DO( return_error( msg, "DIAMETER_COMMAND_UNSUPPORTED", "Non-routable message not supported (invalid bit ? missing Destination-Realm ?)", NULL), goto fatal_error ); 630 continue; 631 } 632 633 /* If we are listed as Destination-Host */ 634 if (is_dest_host == YES) { 635 if (is_local_app == YES) { 636 /* Ok, give the message to the dispatch thread */ 637 CHECK_FCT_DO(fd_fifo_post(fd_g_local, &msg), goto fatal_error ); 638 } else { 639 /* We don't support the application, reply an error */ 640 CHECK_FCT_DO( return_error( msg, "DIAMETER_APPLICATION_UNSUPPORTED", NULL, NULL), goto fatal_error ); 641 } 642 continue; 643 } 644 645 /* If the message is explicitely for someone else */ 646 if ((is_dest_host == NO) || (is_dest_realm == NO)) { 647 if (fd_g_config->cnf_flags.no_fwd) { 648 CHECK_FCT_DO( return_error( msg, "DIAMETER_UNABLE_TO_DELIVER", "This peer is not an agent", NULL), goto fatal_error ); 649 continue; 650 } 651 } else { 652 /* Destination-Host was not set, and Destination-Realm is matching : we may handle or pass to a fellow peer */ 653 654 /* test for decorated NAI (draft-ietf-dime-nai-routing-04 section 4.4) */ 655 if (is_decorated_NAI(un_val)) { 656 /* Handle the decorated NAI */ 657 CHECK_FCT_DO( process_decorated_NAI(un_val, dr_val), 658 { 659 /* If the process failed, we assume it is because of the AVP format */ 660 CHECK_FCT_DO( return_error( msg, "DIAMETER_INVALID_AVP_VALUE", "Failed to process decorated NAI", un), goto fatal_error ); 661 continue; 662 } ); 663 664 /* We have transformed the AVP, now submit it again in the queue */ 665 CHECK_FCT_DO(fd_fifo_post(fd_g_incoming, &msg), goto fatal_error ); 666 continue; 667 } 668 669 if (is_local_app == YES) { 670 /* Handle localy since we are able to */ 671 CHECK_FCT_DO(fd_fifo_post(fd_g_local, &msg), goto fatal_error ); 672 continue; 673 } 674 675 if (fd_g_config->cnf_flags.no_fwd) { 676 /* We return an error */ 677 CHECK_FCT_DO( return_error( msg, "DIAMETER_APPLICATION_UNSUPPORTED", NULL, NULL), goto fatal_error ); 678 continue; 679 } 680 } 681 682 /* From that point, for requests, we will call the registered callbacks, then forward to another peer */ 683 684 } else { 685 /* The message is an answer */ 686 struct msg * qry; 687 688 /* Retrieve the corresponding query and its origin */ 689 CHECK_FCT_DO( fd_msg_answ_getq( msg, &qry ), goto fatal_error ); 690 CHECK_FCT_DO( fd_msg_source_get( qry, &qry_src ), goto fatal_error ); 691 692 if ((!qry_src) && (!is_err)) { 693 /* The message is a normal answer to a request issued localy, we do not call the callbacks chain on it. */ 694 CHECK_FCT_DO(fd_fifo_post(fd_g_local, &msg), goto fatal_error ); 695 continue; 696 } 697 698 /* From that point, for answers, we will call the registered callbacks, then pass it to the dispatch module or forward it */ 699 } 700 701 /* Call all registered callbacks for this message */ 702 { 703 struct fd_list * li; 704 705 CHECK_FCT_DO( pthread_rwlock_rdlock( &rt_fwd_lock ), goto fatal_error ); 706 pthread_cleanup_push( fd_cleanup_rwlock, &rt_fwd_lock ); 707 708 /* requests: dir = 1 & 2 => in order; answers = 3 & 2 => in reverse order */ 709 for ( li = (is_req ? rt_fwd_list.next : rt_fwd_list.prev) ; msg && (li != &rt_fwd_list) ; li = (is_req ? li->next : li->prev) ) { 710 struct rt_hdl * rh = (struct rt_hdl *)li; 711 712 if (is_req && (rh->dir > RT_FWD_ALL)) 713 break; 714 if ((!is_req) && (rh->dir < RT_FWD_ALL)) 715 break; 716 717 /* Ok, call this cb */ 718 TRACE_DEBUG(ANNOYING, "Calling next FWD callback on %p : %p", msg, rh->rt_fwd_cb); 719 CHECK_FCT_DO( (*rh->rt_fwd_cb)(rh->cbdata, &msg), 720 { 721 TRACE_DEBUG(INFO, "A FWD routing callback returned an error, message discarded."); 722 fd_msg_dump_walk(INFO, msg); 723 fd_msg_free(msg); 724 msg = NULL; 725 } ); 726 } 727 728 pthread_cleanup_pop(0); 729 CHECK_FCT_DO( pthread_rwlock_unlock( &rt_fwd_lock ), goto fatal_error ); 730 731 /* If a callback has handled the message, we stop now */ 732 if (!msg) 733 continue; 734 } 735 736 /* Now handle the message to the next step: either forward to another peer, or for local delivery */ 737 if (is_req || qry_src) { 738 CHECK_FCT_DO(fd_fifo_post(fd_g_outgoing, &msg), goto fatal_error ); 739 } else { 740 CHECK_FCT_DO(fd_fifo_post(fd_g_local, &msg), goto fatal_error ); 741 } 742 743 /* We're done with this message */ 744 } while (1); 745 746 fatal_error: 747 TRACE_DEBUG(INFO, "An error occurred in routing module! IN thread is terminating..."); 748 CHECK_FCT_DO(fd_event_send(fd_g_config->cnf_main_ev, FDEV_TERMINATE, 0, NULL), ); 749 750 end: 751 /* Mark the thread as terminated */ 752 pthread_cleanup_pop(1); 753 return NULL; 754 } 755 756 757 /* The (routing-out) thread -- see description in freeDiameter.h */ 758 static void * routing_out_thr(void * arg) 759 { 760 struct rt_data * rtd = NULL; 761 TRACE_ENTRY("%p", arg); 762 763 /* Set the thread name */ 764 { 765 char buf[48]; 766 snprintf(buf, sizeof(buf), "Routing OUT %p", arg); 767 fd_log_threadname ( buf ); 768 } 769 770 pthread_cleanup_push( cleanup_state, arg ); 771 772 /* Mark the thread running */ 773 *(enum thread_state *)arg = RUNNING; 774 775 776 /* Main thread loop */ 777 do { 778 struct msg * msg; 779 struct msg_hdr * hdr; 780 int is_req = 0; 781 struct fd_list * li, *candidates; 782 struct avp * avp; 783 struct rtd_candidate * c; 784 785 /* If we loop'd with some undeleted routing data, destroy it */ 786 if (rtd != NULL) 787 fd_rtd_free(&rtd); 788 789 /* Test if we were told to stop */ 790 { 791 int must_stop; 792 CHECK_POSIX_DO( pthread_mutex_lock(&order_lock), goto end ); /* we lock to flush the caches */ 793 must_stop = (order_val == STOP); 794 CHECK_POSIX_DO( pthread_mutex_unlock(&order_lock), goto end ); 795 if (must_stop) 796 goto end; 797 798 pthread_testcancel(); 799 } 800 801 /* Get the next message from the ougoing queue */ 802 { 803 int ret; 804 CHECK_FCT_DO( ret = fd_fifo_get ( fd_g_outgoing, &msg ), 805 { 806 if (ret == EPIPE) 807 /* The queue was destroyed */ 808 goto end; 809 goto fatal_error; 810 } ); 811 } 812 813 if (TRACE_BOOL(FULL)) { 814 TRACE_DEBUG(FULL, "Picked next message"); 815 fd_msg_dump_one(ANNOYING, msg); 816 } 817 818 /* Read the message header */ 819 CHECK_FCT_DO( fd_msg_hdr(msg, &hdr), goto fatal_error ); 820 is_req = hdr->msg_flags & CMD_FLAG_REQUEST; 821 822 /* For answers, the routing is very easy */ 823 if ( ! is_req ) { 824 struct msg * qry; 825 char * qry_src = NULL; 826 struct msg_hdr * qry_hdr; 827 struct fd_peer * peer = NULL; 828 829 /* Retrieve the corresponding query and its origin */ 830 CHECK_FCT_DO( fd_msg_answ_getq( msg, &qry ), goto fatal_error ); 831 CHECK_FCT_DO( fd_msg_source_get( qry, &qry_src ), goto fatal_error ); 832 833 ASSERT( qry_src ); /* if it is NULL, the message should have been in the LOCAL queue! */ 834 835 /* Find the peer corresponding to this name */ 836 CHECK_FCT_DO( fd_peer_getbyid( qry_src, (void *) &peer ), goto fatal_error ); 837 if ((!peer) || (peer->p_hdr.info.runtime.pir_state != STATE_OPEN)) { 838 TRACE_DEBUG(INFO, "Unable to forward answer message to peer '%s', deleted or not in OPEN state.", qry_src); 839 fd_msg_dump_walk(INFO, msg); 840 fd_msg_free(msg); 841 continue; 842 } 843 844 /* We must restore the hop-by-hop id */ 845 CHECK_FCT_DO( fd_msg_hdr(qry, &qry_hdr), goto fatal_error ); 846 hdr->msg_hbhid = qry_hdr->msg_hbhid; 847 848 /* Push the message into this peer */ 849 CHECK_FCT_DO( fd_out_send(&msg, NULL, peer), goto fatal_error ); 850 851 /* We're done with this answer */ 852 continue; 853 } 854 855 /* From that point, the message is a request */ 856 857 /* Get the routing data out of the message if any (in case of re-transmit) */ 858 CHECK_FCT_DO( fd_msg_rt_get ( msg, &rtd ), goto fatal_error ); 859 860 /* If there is no routing data already, let's create it */ 861 if (rtd == NULL) { 862 CHECK_FCT_DO( fd_rtd_init(&rtd), goto fatal_error ); 863 864 /* Add all peers in OPEN state */ 865 CHECK_FCT_DO( pthread_rwlock_wrlock(&fd_g_activ_peers_rw), goto fatal_error ); 866 for (li = fd_g_activ_peers.next; li != &fd_g_activ_peers; li = li->next) { 867 struct fd_peer * p = (struct fd_peer *)li->o; 868 CHECK_FCT_DO( fd_rtd_candidate_add(rtd, p->p_hdr.info.pi_diamid), goto fatal_error); 869 } 870 CHECK_FCT_DO( pthread_rwlock_unlock(&fd_g_activ_peers_rw), goto fatal_error ); 871 872 /* Now let's remove all peers from the Route-Records */ 873 CHECK_FCT_DO( fd_msg_browse(msg, MSG_BRW_FIRST_CHILD, &avp, NULL), goto fatal_error ); 874 while (avp) { 875 struct avp_hdr * ahdr; 876 CHECK_FCT_DO( fd_msg_avp_hdr( avp, &ahdr ), goto fatal_error ); 877 878 if ((ahdr->avp_code == AC_ROUTE_RECORD) && (! (ahdr->avp_flags & AVP_FLAG_VENDOR)) ) { 879 /* Parse this AVP */ 880 CHECK_FCT_DO( fd_msg_parse_dict ( avp, fd_g_config->cnf_dict, NULL ), goto fatal_error ); 881 ASSERT( ahdr->avp_value ); 882 /* Remove this value from the list */ 883 fd_rtd_candidate_del(rtd, (char *)ahdr->avp_value->os.data, ahdr->avp_value->os.len); 884 } 885 886 /* Go to next AVP */ 887 CHECK_FCT_DO( fd_msg_browse(avp, MSG_BRW_NEXT, &avp, NULL), goto fatal_error ); 888 } 889 } 890 891 /* Note: we reset the scores and pass the message to the callbacks, maybe we could re-use the saved scores when we have received an error ? */ 892 893 /* Ok, we have our list in rtd now, let's (re)initialize the scores */ 894 fd_rtd_candidate_extract(rtd, &candidates); 895 896 /* Pass the list to registered callbacks (even if it is empty) */ 897 { 898 CHECK_FCT_DO( pthread_rwlock_rdlock( &rt_out_lock ), goto fatal_error ); 899 pthread_cleanup_push( fd_cleanup_rwlock, &rt_out_lock ); 900 901 /* We call the cb by reverse priority order */ 902 for ( li = rt_out_list.prev ; li != &rt_out_list ; li = li->prev ) { 903 struct rt_hdl * rh = (struct rt_hdl *)li; 904 905 TRACE_DEBUG(ANNOYING, "Calling next OUT callback on %p : %p (prio %d)", msg, rh->rt_out_cb, rh->prio); 906 CHECK_FCT_DO( (*rh->rt_out_cb)(rh->cbdata, msg, candidates), 907 { 908 TRACE_DEBUG(INFO, "An OUT routing callback returned an error ! Message discarded."); 909 fd_msg_dump_walk(INFO, msg); 910 fd_msg_free(msg); 911 msg = NULL; 912 break; 913 } ); 914 } 915 916 pthread_cleanup_pop(0); 917 CHECK_FCT_DO( pthread_rwlock_unlock( &rt_out_lock ), goto fatal_error ); 918 919 /* If an error occurred, skip to the next message */ 920 if (!msg) 921 continue; 922 } 923 924 /* Order the candidate peers by score attributed by the callbacks */ 925 CHECK_FCT_DO( fd_rtd_candidate_reorder(candidates), goto fatal_error ); 926 927 /* Save the routing information in the message */ 928 CHECK_FCT_DO( fd_msg_rt_associate ( msg, &rtd ), goto fatal_error ); 929 930 /* Now try sending the message */ 931 for (li = candidates->prev; li != candidates; li = li->prev) { 932 struct fd_peer * peer; 933 934 c = (struct rtd_candidate *) li; 935 936 /* Stop when we have reached the end of valid candidates */ 937 if (c->score < 0) 938 break; 939 940 /* Search for the peer */ 941 CHECK_FCT_DO( fd_peer_getbyid( c->diamid, (void *)&peer ), goto fatal_error ); 942 943 if (peer && (peer->p_hdr.info.runtime.pir_state == STATE_OPEN)) { 944 /* Send to this one */ 945 CHECK_FCT_DO( fd_out_send(&msg, NULL, peer), continue ); 946 /* If the sending was successful */ 947 break; 948 } 949 } 950 951 /* If the message has not been sent, return an error */ 952 if (msg) { 953 TRACE_DEBUG(INFO, "Could not send the following message, replying with UNABLE_TO_DELIVER"); 954 fd_msg_dump_walk(INFO, msg); 955 return_error( msg, "DIAMETER_UNABLE_TO_DELIVER", "No suitable candidate to route the message to", NULL); 956 } 957 958 /* We're done with this message */ 959 960 } while (1); 961 962 fatal_error: 963 TRACE_DEBUG(INFO, "An error occurred in routing module! OUT thread is terminating..."); 964 CHECK_FCT_DO(fd_event_send(fd_g_config->cnf_main_ev, FDEV_TERMINATE, 0, NULL), ); 965 966 end: 967 /* Mark the thread as terminated */ 968 pthread_cleanup_pop(1); 969 return NULL; 970 } 971 972 973 /********************************************************************************/ 974 /* Some default routing callbacks */ 975 /********************************************************************************/ 976 977 /* First OUT callback: prevent sending to peers that do not support the message application */ 189 190 /* Prevent sending to peers that do not support the message application */ 978 191 static int dont_send_if_no_common_app(void * cbdata, struct msg * msg, struct fd_list * candidates) 979 192 { … … 1007 220 } 1008 221 1009 /* Second OUT callback:Detect if the Destination-Host and Destination-Realm match the peer */222 /* Detect if the Destination-Host and Destination-Realm match the peer */ 1010 223 static int score_destination_avp(void * cbdata, struct msg * msg, struct fd_list * candidates) 1011 224 { … … 1074 287 1075 288 /********************************************************************************/ 289 /* Helper functions */ 290 /********************************************************************************/ 291 292 /* Find (first) '!' and '@' positions in a UTF-8 encoded string (User-Name AVP value) */ 293 static void nai_get_indexes(union avp_value * un, int * excl_idx, int * at_idx) 294 { 295 int i; 296 297 TRACE_ENTRY("%p %p %p", un, excl_idx, at_idx); 298 CHECK_PARAMS_DO( un && excl_idx, return ); 299 *excl_idx = 0; 300 301 /* Search if there is a '!' before any '@' -- do we need to check it contains a '.' ? */ 302 for (i = 0; i < un->os.len; i++) { 303 /* The '!' marks the decorated NAI */ 304 if ( un->os.data[i] == (unsigned char) '!' ) { 305 if (!*excl_idx) 306 *excl_idx = i; 307 if (!at_idx) 308 return; 309 } 310 /* If we reach the realm part, we can stop */ 311 if ( un->os.data[i] == (unsigned char) '@' ) { 312 if (at_idx) 313 *at_idx = i; 314 break; 315 } 316 /* Skip escaped characters */ 317 if ( un->os.data[i] == (unsigned char) '\\' ) { 318 i++; 319 continue; 320 } 321 /* Skip UTF-8 characters spanning on several bytes */ 322 if ( (un->os.data[i] & 0xF8) == 0xF0 ) { /* 11110zzz */ 323 i += 3; 324 continue; 325 } 326 if ( (un->os.data[i] & 0xF0) == 0xE0 ) { /* 1110yyyy */ 327 i += 2; 328 continue; 329 } 330 if ( (un->os.data[i] & 0xE0) == 0xC0 ) { /* 110yyyxx */ 331 i += 1; 332 continue; 333 } 334 } 335 336 return; 337 } 338 339 /* Test if a User-Name AVP contains a Decorated NAI -- RFC4282, draft-ietf-dime-nai-routing-04 */ 340 static int is_decorated_NAI(union avp_value * un) 341 { 342 int i; 343 TRACE_ENTRY("%p", un); 344 345 /* If there was no User-Name, we return false */ 346 if (un == NULL) 347 return 0; 348 349 nai_get_indexes(un, &i, NULL); 350 351 return i; 352 } 353 354 /* Create new User-Name and Destination-Realm values */ 355 static int process_decorated_NAI(union avp_value * un, union avp_value * dr) 356 { 357 int i, at_idx = 0, sep_idx = 0; 358 unsigned char * old_un; 359 TRACE_ENTRY("%p %p", un, dr); 360 CHECK_PARAMS(un && dr); 361 362 /* Save the decorated User-Name, for example 'homerealm.example.net!user@otherrealm.example.net' */ 363 old_un = un->os.data; 364 365 /* Search the positions of the first '!' and the '@' in the string */ 366 nai_get_indexes(un, &sep_idx, &at_idx); 367 CHECK_PARAMS( 0 < sep_idx < at_idx < un->os.len); 368 369 /* Create the new User-Name value */ 370 CHECK_MALLOC( un->os.data = malloc( at_idx ) ); 371 memcpy( un->os.data, old_un + sep_idx + 1, at_idx - sep_idx ); /* user@ */ 372 memcpy( un->os.data + at_idx - sep_idx, old_un, sep_idx ); /* homerealm.example.net */ 373 374 /* Create the new Destination-Realm value */ 375 CHECK_MALLOC( dr->os.data = realloc(dr->os.data, sep_idx) ); 376 memcpy( dr->os.data, old_un, sep_idx ); 377 dr->os.len = sep_idx; 378 379 TRACE_DEBUG(FULL, "Processed Decorated NAI : '%.*s' became '%.*s' (%.*s)", 380 un->os.len, old_un, 381 at_idx, un->os.data, 382 dr->os.len, dr->os.data); 383 384 un->os.len = at_idx; 385 free(old_un); 386 387 return 0; 388 } 389 390 /* Function to return an error to an incoming request */ 391 static int return_error(struct msg ** pmsg, char * error_code, char * error_message, struct avp * failedavp) 392 { 393 struct fd_peer * peer; 394 int is_loc = 0; 395 396 /* Get the source of the message */ 397 { 398 char * id; 399 CHECK_FCT( fd_msg_source_get( *pmsg, &id ) ); 400 401 if (id == NULL) { 402 is_loc = 1; /* The message was issued locally */ 403 } else { 404 405 /* Search the peer with this id */ 406 CHECK_FCT( fd_peer_getbyid( id, (void *)&peer ) ); 407 408 if (!peer) { 409 TRACE_DEBUG(INFO, "Unable to send error '%s' to deleted peer '%s' in reply to:", error_code, id); 410 fd_msg_dump_walk(INFO, *pmsg); 411 fd_msg_free(*pmsg); 412 *pmsg = NULL; 413 return 0; 414 } 415 } 416 } 417 418 /* Create the error message */ 419 CHECK_FCT( fd_msg_new_answer_from_req ( fd_g_config->cnf_dict, pmsg, MSGFL_ANSW_ERROR ) ); 420 421 /* Set the error code */ 422 CHECK_FCT( fd_msg_rescode_set(*pmsg, error_code, error_message, failedavp, 1 ) ); 423 424 /* Send the answer */ 425 if (is_loc) { 426 CHECK_FCT( fd_fifo_post(fd_g_incoming, pmsg) ); 427 } else { 428 CHECK_FCT( fd_out_send(pmsg, NULL, peer) ); 429 } 430 431 /* Done */ 432 return 0; 433 } 434 435 436 /****************************************************************************/ 437 /* Second part : threads moving messages in the daemon */ 438 /****************************************************************************/ 439 440 /* These are the functions of each threads: dispatch & routing */ 441 /* The DISPATCH message processing */ 442 static int msg_dispatch(struct msg ** pmsg) 443 { 444 struct msg_hdr * hdr; 445 int is_req = 0, ret; 446 struct session * sess; 447 enum disp_action action; 448 const char * ec = NULL; 449 const char * em = NULL; 450 451 /* Read the message header */ 452 CHECK_FCT( fd_msg_hdr(*pmsg, &hdr) ); 453 is_req = hdr->msg_flags & CMD_FLAG_REQUEST; 454 455 /* Note: if the message is for local delivery, we should test for duplicate 456 (draft-asveren-dime-dupcons-00). This may conflict with path validation decisions, no clear answer yet */ 457 458 /* At this point, we need to understand the message content, so parse it */ 459 CHECK_FCT_DO( ret = fd_msg_parse_or_error( pmsg ), 460 { 461 /* in case of error, the message is already dump'd */ 462 if ((ret == EBADMSG) && (*pmsg != NULL)) { 463 /* msg now contains the answer message to send back */ 464 CHECK_FCT( fd_fifo_post(fd_g_outgoing, pmsg) ); 465 } 466 if (*pmsg) { /* another error happen'd */ 467 TRACE_DEBUG(INFO, "An unexpected error occurred (%s), discarding a message:", strerror(ret)); 468 fd_msg_dump_walk(INFO, *pmsg); 469 CHECK_FCT_DO( fd_msg_free(*pmsg), /* continue */); 470 *pmsg = NULL; 471 } 472 /* We're done with this one */ 473 return 0; 474 } ); 475 476 /* First, if the original request was registered with a callback and we receive the answer, call it. */ 477 if ( ! is_req ) { 478 struct msg * qry; 479 void (*anscb)(void *, struct msg **) = NULL; 480 void * data = NULL; 481 482 /* Retrieve the corresponding query */ 483 CHECK_FCT( fd_msg_answ_getq( *pmsg, &qry ) ); 484 485 /* Retrieve any registered handler */ 486 CHECK_FCT( fd_msg_anscb_get( qry, &anscb, &data ) ); 487 488 /* If a callback was registered, pass the message to it */ 489 if (anscb != NULL) { 490 491 TRACE_DEBUG(FULL, "Calling callback registered when query was sent (%p, %p)", anscb, data); 492 (*anscb)(data, pmsg); 493 494 /* If the message is processed, we're done */ 495 if (*pmsg == NULL) { 496 return 0; 497 } 498 } 499 } 500 501 /* Retrieve the session of the message */ 502 CHECK_FCT( fd_msg_sess_get(fd_g_config->cnf_dict, *pmsg, &sess, NULL) ); 503 504 /* Now, call any callback registered for the message */ 505 CHECK_FCT( fd_msg_dispatch ( pmsg, sess, &action, &ec) ); 506 507 /* Now, act depending on msg and action and ec */ 508 if (*pmsg) 509 switch ( action ) { 510 case DISP_ACT_CONT: 511 /* No callback has handled the message, let's reply with a generic error */ 512 em = "The message was not handled by any extension callback"; 513 ec = "DIAMETER_COMMAND_UNSUPPORTED"; 514 515 case DISP_ACT_ERROR: 516 /* We have a problem with delivering the message */ 517 if (ec == NULL) { 518 ec = "DIAMETER_UNABLE_TO_COMPLY"; 519 } 520 521 if (!is_req) { 522 TRACE_DEBUG(INFO, "Received an answer to a localy issued query, but no handler processed this answer!"); 523 fd_msg_dump_walk(INFO, *pmsg); 524 fd_msg_free(*pmsg); 525 *pmsg = NULL; 526 break; 527 } 528 529 /* Create an answer with the error code and message */ 530 CHECK_FCT( fd_msg_new_answer_from_req ( fd_g_config->cnf_dict, pmsg, 0 ) ); 531 CHECK_FCT( fd_msg_rescode_set(*pmsg, (char *)ec, (char *)em, NULL, 1 ) ); 532 533 case DISP_ACT_SEND: 534 /* Now, send the message */ 535 CHECK_FCT( fd_fifo_post(fd_g_outgoing, pmsg) ); 536 } 537 538 /* We're done with dispatching this message */ 539 return 0; 540 } 541 542 /* The ROUTING-IN message processing */ 543 static int msg_rt_in(struct msg ** pmsg) 544 { 545 struct msg_hdr * hdr; 546 int is_req = 0; 547 int is_err = 0; 548 char * qry_src = NULL; 549 550 /* Read the message header */ 551 CHECK_FCT( fd_msg_hdr(*pmsg, &hdr) ); 552 is_req = hdr->msg_flags & CMD_FLAG_REQUEST; 553 is_err = hdr->msg_flags & CMD_FLAG_ERROR; 554 555 /* Handle incorrect bits */ 556 if (is_req && is_err) { 557 CHECK_FCT( return_error( pmsg, "DIAMETER_INVALID_HDR_BITS", "R & E bits were set", NULL) ); 558 return 0; 559 } 560 561 /* If it is a request, we must analyze its content to decide what we do with it */ 562 if (is_req) { 563 struct avp * avp, *un = NULL; 564 union avp_value * un_val = NULL, *dr_val = NULL; 565 enum status { UNKNOWN, YES, NO }; 566 /* Are we Destination-Host? */ 567 enum status is_dest_host = UNKNOWN; 568 /* Are we Destination-Realm? */ 569 enum status is_dest_realm = UNKNOWN; 570 /* Do we support the application of the message? */ 571 enum status is_local_app = UNKNOWN; 572 573 /* Check if we have local support for the message application */ 574 if ( (hdr->msg_appl == 0) || (hdr->msg_appl == AI_RELAY) ) { 575 TRACE_DEBUG(INFO, "Received a routable message with application id 0, returning DIAMETER_APPLICATION_UNSUPPORTED"); 576 CHECK_FCT( return_error( pmsg, "DIAMETER_APPLICATION_UNSUPPORTED", "Routable message with application id 0 or relay", NULL) ); 577 return 0; 578 } else { 579 struct fd_app * app; 580 CHECK_FCT( fd_app_check(&fd_g_config->cnf_apps, hdr->msg_appl, &app) ); 581 is_local_app = (app ? YES : NO); 582 } 583 584 /* Parse the message for Dest-Host and Dest-Realm */ 585 CHECK_FCT( fd_msg_browse(*pmsg, MSG_BRW_FIRST_CHILD, &avp, NULL) ); 586 while (avp) { 587 struct avp_hdr * ahdr; 588 struct fd_pei error_info; 589 int ret; 590 CHECK_FCT( fd_msg_avp_hdr( avp, &ahdr ) ); 591 592 if (! (ahdr->avp_flags & AVP_FLAG_VENDOR)) { 593 switch (ahdr->avp_code) { 594 case AC_DESTINATION_HOST: 595 /* Parse this AVP */ 596 CHECK_FCT_DO( ret = fd_msg_parse_dict ( avp, fd_g_config->cnf_dict, &error_info ), 597 { 598 if (error_info.pei_errcode) { 599 CHECK_FCT( return_error( pmsg, error_info.pei_errcode, error_info.pei_message, error_info.pei_avp) ); 600 return 0; 601 } else { 602 return ret; 603 } 604 } ); 605 ASSERT( ahdr->avp_value ); 606 /* Compare the Destination-Host AVP of the message with our identity */ 607 if (ahdr->avp_value->os.len != fd_g_config->cnf_diamid_len) { 608 is_dest_host = NO; 609 } else { 610 is_dest_host = (strncasecmp(fd_g_config->cnf_diamid, (char *)ahdr->avp_value->os.data, fd_g_config->cnf_diamid_len) 611 ? NO : YES); 612 } 613 break; 614 615 case AC_DESTINATION_REALM: 616 /* Parse this AVP */ 617 CHECK_FCT_DO( ret = fd_msg_parse_dict ( avp, fd_g_config->cnf_dict, &error_info ), 618 { 619 if (error_info.pei_errcode) { 620 CHECK_FCT( return_error( pmsg, error_info.pei_errcode, error_info.pei_message, error_info.pei_avp) ); 621 return 0; 622 } else { 623 return ret; 624 } 625 } ); 626 ASSERT( ahdr->avp_value ); 627 dr_val = ahdr->avp_value; 628 /* Compare the Destination-Realm AVP of the message with our identity */ 629 if (ahdr->avp_value->os.len != fd_g_config->cnf_diamrlm_len) { 630 is_dest_realm = NO; 631 } else { 632 is_dest_realm = (strncasecmp(fd_g_config->cnf_diamrlm, (char *)ahdr->avp_value->os.data, fd_g_config->cnf_diamrlm_len) 633 ? NO : YES); 634 } 635 break; 636 637 case AC_USER_NAME: 638 /* Parse this AVP */ 639 CHECK_FCT_DO( ret = fd_msg_parse_dict ( avp, fd_g_config->cnf_dict, &error_info ), 640 { 641 if (error_info.pei_errcode) { 642 CHECK_FCT( return_error( pmsg, error_info.pei_errcode, error_info.pei_message, error_info.pei_avp) ); 643 return 0; 644 } else { 645 return ret; 646 } 647 } ); 648 ASSERT( ahdr->avp_value ); 649 un = avp; 650 un_val = ahdr->avp_value; 651 break; 652 } 653 } 654 655 if ((is_dest_host != UNKNOWN) && (is_dest_realm != UNKNOWN) && un) 656 break; 657 658 /* Go to next AVP */ 659 CHECK_FCT( fd_msg_browse(avp, MSG_BRW_NEXT, &avp, NULL) ); 660 } 661 662 /* OK, now decide what we do with the request */ 663 664 /* Handle the missing routing AVPs first */ 665 if ( is_dest_realm == UNKNOWN ) { 666 CHECK_FCT( return_error( pmsg, "DIAMETER_COMMAND_UNSUPPORTED", "Non-routable message not supported (invalid bit ? missing Destination-Realm ?)", NULL) ); 667 return 0; 668 } 669 670 /* If we are listed as Destination-Host */ 671 if (is_dest_host == YES) { 672 if (is_local_app == YES) { 673 /* Ok, give the message to the dispatch thread */ 674 CHECK_FCT( fd_fifo_post(fd_g_local, pmsg) ); 675 } else { 676 /* We don't support the application, reply an error */ 677 CHECK_FCT( return_error( pmsg, "DIAMETER_APPLICATION_UNSUPPORTED", NULL, NULL) ); 678 } 679 return 0; 680 } 681 682 /* If the message is explicitely for someone else */ 683 if ((is_dest_host == NO) || (is_dest_realm == NO)) { 684 if (fd_g_config->cnf_flags.no_fwd) { 685 CHECK_FCT( return_error( pmsg, "DIAMETER_UNABLE_TO_DELIVER", "This peer is not an agent", NULL) ); 686 return 0; 687 } 688 } else { 689 /* Destination-Host was not set, and Destination-Realm is matching : we may handle or pass to a fellow peer */ 690 691 /* test for decorated NAI (draft-ietf-dime-nai-routing-04 section 4.4) */ 692 if (is_decorated_NAI(un_val)) { 693 /* Handle the decorated NAI */ 694 CHECK_FCT_DO( process_decorated_NAI(un_val, dr_val), 695 { 696 /* If the process failed, we assume it is because of the AVP format */ 697 CHECK_FCT( return_error( pmsg, "DIAMETER_INVALID_AVP_VALUE", "Failed to process decorated NAI", un) ); 698 return 0; 699 } ); 700 701 /* We have transformed the AVP, now submit it again in the queue */ 702 CHECK_FCT(fd_fifo_post(fd_g_incoming, pmsg) ); 703 return 0; 704 } 705 706 if (is_local_app == YES) { 707 /* Handle localy since we are able to */ 708 CHECK_FCT(fd_fifo_post(fd_g_local, pmsg) ); 709 return 0; 710 } 711 712 if (fd_g_config->cnf_flags.no_fwd) { 713 /* We return an error */ 714 CHECK_FCT( return_error( pmsg, "DIAMETER_APPLICATION_UNSUPPORTED", NULL, NULL) ); 715 return 0; 716 } 717 } 718 719 /* From that point, for requests, we will call the registered callbacks, then forward to another peer */ 720 721 } else { 722 /* The message is an answer */ 723 struct msg * qry; 724 725 /* Retrieve the corresponding query and its origin */ 726 CHECK_FCT( fd_msg_answ_getq( *pmsg, &qry ) ); 727 CHECK_FCT( fd_msg_source_get( qry, &qry_src ) ); 728 729 if ((!qry_src) && (!is_err)) { 730 /* The message is a normal answer to a request issued localy, we do not call the callbacks chain on it. */ 731 CHECK_FCT(fd_fifo_post(fd_g_local, pmsg) ); 732 return 0; 733 } 734 735 /* From that point, for answers, we will call the registered callbacks, then pass it to the dispatch module or forward it */ 736 } 737 738 /* Call all registered callbacks for this message */ 739 { 740 struct fd_list * li; 741 742 CHECK_FCT( pthread_rwlock_rdlock( &rt_fwd_lock ) ); 743 pthread_cleanup_push( fd_cleanup_rwlock, &rt_fwd_lock ); 744 745 /* requests: dir = 1 & 2 => in order; answers = 3 & 2 => in reverse order */ 746 for ( li = (is_req ? rt_fwd_list.next : rt_fwd_list.prev) ; *pmsg && (li != &rt_fwd_list) ; li = (is_req ? li->next : li->prev) ) { 747 struct rt_hdl * rh = (struct rt_hdl *)li; 748 749 if (is_req && (rh->dir > RT_FWD_ALL)) 750 break; 751 if ((!is_req) && (rh->dir < RT_FWD_ALL)) 752 break; 753 754 /* Ok, call this cb */ 755 TRACE_DEBUG(ANNOYING, "Calling next FWD callback on %p : %p", *pmsg, rh->rt_fwd_cb); 756 CHECK_FCT_DO( (*rh->rt_fwd_cb)(rh->cbdata, pmsg), 757 { 758 TRACE_DEBUG(INFO, "A FWD routing callback returned an error, message discarded."); 759 fd_msg_dump_walk(INFO, *pmsg); 760 fd_msg_free(*pmsg); 761 *pmsg = NULL; 762 } ); 763 } 764 765 pthread_cleanup_pop(0); 766 CHECK_FCT( pthread_rwlock_unlock( &rt_fwd_lock ) ); 767 768 /* If a callback has handled the message, we stop now */ 769 if (!*pmsg) 770 return 0; 771 } 772 773 /* Now pass the message to the next step: either forward to another peer, or dispatch to local extensions */ 774 if (is_req || qry_src) { 775 CHECK_FCT(fd_fifo_post(fd_g_outgoing, pmsg) ); 776 } else { 777 CHECK_FCT(fd_fifo_post(fd_g_local, pmsg) ); 778 } 779 780 /* We're done with this message */ 781 return 0; 782 } 783 784 785 /* The ROUTING-OUT message processing */ 786 static int msg_rt_out(struct msg ** pmsg) 787 { 788 struct rt_data * rtd = NULL; 789 struct msg_hdr * hdr; 790 int is_req = 0; 791 int ret; 792 struct fd_list * li, *candidates; 793 struct avp * avp; 794 struct rtd_candidate * c; 795 796 /* Read the message header */ 797 CHECK_FCT( fd_msg_hdr(*pmsg, &hdr) ); 798 is_req = hdr->msg_flags & CMD_FLAG_REQUEST; 799 800 /* For answers, the routing is very easy */ 801 if ( ! is_req ) { 802 struct msg * qry; 803 char * qry_src = NULL; 804 struct msg_hdr * qry_hdr; 805 struct fd_peer * peer = NULL; 806 807 /* Retrieve the corresponding query and its origin */ 808 CHECK_FCT( fd_msg_answ_getq( *pmsg, &qry ) ); 809 CHECK_FCT( fd_msg_source_get( qry, &qry_src ) ); 810 811 ASSERT( qry_src ); /* if it is NULL, the message should have been in the LOCAL queue! */ 812 813 /* Find the peer corresponding to this name */ 814 CHECK_FCT( fd_peer_getbyid( qry_src, (void *) &peer ) ); 815 if ((!peer) || (peer->p_hdr.info.runtime.pir_state != STATE_OPEN)) { 816 TRACE_DEBUG(INFO, "Unable to forward answer message to peer '%s', deleted or not in OPEN state.", qry_src); 817 fd_msg_dump_walk(INFO, *pmsg); 818 fd_msg_free(*pmsg); 819 *pmsg = NULL; 820 return 0; 821 } 822 823 /* We must restore the hop-by-hop id */ 824 CHECK_FCT( fd_msg_hdr(qry, &qry_hdr) ); 825 hdr->msg_hbhid = qry_hdr->msg_hbhid; 826 827 /* Push the message into this peer */ 828 CHECK_FCT( fd_out_send(pmsg, NULL, peer) ); 829 830 /* We're done with this answer */ 831 return 0; 832 } 833 834 /* From that point, the message is a request */ 835 836 /* Get the routing data out of the message if any (in case of re-transmit) */ 837 CHECK_FCT( fd_msg_rt_get ( *pmsg, &rtd ) ); 838 839 /* If there is no routing data already, let's create it */ 840 if (rtd == NULL) { 841 CHECK_FCT( fd_rtd_init(&rtd) ); 842 843 /* Add all peers in OPEN state */ 844 CHECK_FCT( pthread_rwlock_rdlock(&fd_g_activ_peers_rw) ); 845 for (li = fd_g_activ_peers.next; li != &fd_g_activ_peers; li = li->next) { 846 struct fd_peer * p = (struct fd_peer *)li->o; 847 CHECK_FCT_DO( ret = fd_rtd_candidate_add(rtd, p->p_hdr.info.pi_diamid), { CHECK_FCT_DO( pthread_rwlock_unlock(&fd_g_activ_peers_rw), ); return ret; } ); 848 } 849 CHECK_FCT( pthread_rwlock_unlock(&fd_g_activ_peers_rw) ); 850 851 /* Now let's remove all peers from the Route-Records */ 852 CHECK_FCT( fd_msg_browse(*pmsg, MSG_BRW_FIRST_CHILD, &avp, NULL) ); 853 while (avp) { 854 struct avp_hdr * ahdr; 855 struct fd_pei error_info; 856 CHECK_FCT( fd_msg_avp_hdr( avp, &ahdr ) ); 857 858 if ((ahdr->avp_code == AC_ROUTE_RECORD) && (! (ahdr->avp_flags & AVP_FLAG_VENDOR)) ) { 859 /* Parse this AVP */ 860 CHECK_FCT_DO( ret = fd_msg_parse_dict ( avp, fd_g_config->cnf_dict, &error_info ), 861 { 862 if (error_info.pei_errcode) { 863 CHECK_FCT( return_error( pmsg, error_info.pei_errcode, error_info.pei_message, error_info.pei_avp) ); 864 return 0; 865 } else { 866 return ret; 867 } 868 } ); 869 ASSERT( ahdr->avp_value ); 870 /* Remove this value from the list */ 871 fd_rtd_candidate_del(rtd, (char *)ahdr->avp_value->os.data, ahdr->avp_value->os.len); 872 } 873 874 /* Go to next AVP */ 875 CHECK_FCT( fd_msg_browse(avp, MSG_BRW_NEXT, &avp, NULL) ); 876 } 877 } 878 879 /* Note: we reset the scores and pass the message to the callbacks, maybe we could re-use the saved scores when we have received an error ? */ 880 881 /* Ok, we have our list in rtd now, let's (re)initialize the scores */ 882 fd_rtd_candidate_extract(rtd, &candidates, FD_SCORE_INI); 883 884 /* Pass the list to registered callbacks (even if it is empty) */ 885 { 886 CHECK_FCT( pthread_rwlock_rdlock( &rt_out_lock ) ); 887 pthread_cleanup_push( fd_cleanup_rwlock, &rt_out_lock ); 888 889 /* We call the cb by reverse priority order */ 890 for ( li = rt_out_list.prev ; li != &rt_out_list ; li = li->prev ) { 891 struct rt_hdl * rh = (struct rt_hdl *)li; 892 893 TRACE_DEBUG(ANNOYING, "Calling next OUT callback on %p : %p (prio %d)", *pmsg, rh->rt_out_cb, rh->prio); 894 CHECK_FCT_DO( ret = (*rh->rt_out_cb)(rh->cbdata, *pmsg, candidates), 895 { 896 TRACE_DEBUG(INFO, "An OUT routing callback returned an error (%s) ! Message discarded.", strerror(ret)); 897 fd_msg_dump_walk(INFO, *pmsg); 898 fd_msg_free(*pmsg); 899 *pmsg = NULL; 900 break; 901 } ); 902 } 903 904 pthread_cleanup_pop(0); 905 CHECK_FCT( pthread_rwlock_unlock( &rt_out_lock ) ); 906 907 /* If an error occurred, skip to the next message */ 908 if (! *pmsg) { 909 if (rtd) 910 fd_rtd_free(&rtd); 911 return 0; 912 } 913 } 914 915 /* Order the candidate peers by score attributed by the callbacks */ 916 CHECK_FCT( fd_rtd_candidate_reorder(candidates) ); 917 918 /* Save the routing information in the message */ 919 CHECK_FCT( fd_msg_rt_associate ( *pmsg, &rtd ) ); 920 921 /* Now try sending the message */ 922 for (li = candidates->prev; li != candidates; li = li->prev) { 923 struct fd_peer * peer; 924 925 c = (struct rtd_candidate *) li; 926 927 /* Stop when we have reached the end of valid candidates */ 928 if (c->score < 0) 929 break; 930 931 /* Search for the peer */ 932 CHECK_FCT( fd_peer_getbyid( c->diamid, (void *)&peer ) ); 933 934 if (peer && (peer->p_hdr.info.runtime.pir_state == STATE_OPEN)) { 935 /* Send to this one */ 936 CHECK_FCT_DO( fd_out_send(pmsg, NULL, peer), continue ); 937 938 /* If the sending was successful */ 939 break; 940 } 941 } 942 943 /* If the message has not been sent, return an error */ 944 if (*pmsg) { 945 TRACE_DEBUG(INFO, "Could not send the following message, replying with UNABLE_TO_DELIVER"); 946 fd_msg_dump_walk(INFO, *pmsg); 947 return_error( pmsg, "DIAMETER_UNABLE_TO_DELIVER", "No suitable candidate to route the message to", NULL); 948 } 949 950 /* We're done with this message */ 951 952 return 0; 953 } 954 955 956 /********************************************************************************/ 957 /* Management of the threads */ 958 /********************************************************************************/ 959 960 /* Note: in the first version, we only create one thread of each kind. 961 We could improve the scalability by using the threshold feature of the queues 962 to create additional threads if a queue is filling up, or at least giving a configurable 963 number of threads of each kind. 964 */ 965 966 /* Control of the threads */ 967 static enum { RUN = 0, STOP = 1 } order_val = RUN; 968 static pthread_mutex_t order_lock = PTHREAD_MUTEX_INITIALIZER; 969 970 /* Threads report their status */ 971 enum thread_state { INITIAL = 0, RUNNING = 1, TERMINATED = 2 }; 972 static void cleanup_state(void * state_loc) 973 { 974 if (state_loc) 975 *(enum thread_state *)state_loc = TERMINATED; 976 } 977 978 /* This is the common thread code (same for routing and dispatching) */ 979 static void * process_thr(void * arg, int (*action_cb)(struct msg ** pmsg), struct fifo * queue, char * action_name) 980 { 981 TRACE_ENTRY("%p %p %p %p", arg, action_cb, queue, action_name); 982 983 /* Set the thread name */ 984 { 985 char buf[48]; 986 snprintf(buf, sizeof(buf), "%s (%p)", action_name, arg); 987 fd_log_threadname ( buf ); 988 } 989 990 /* The thread reports its status when canceled */ 991 CHECK_PARAMS_DO(arg, return NULL); 992 pthread_cleanup_push( cleanup_state, arg ); 993 994 /* Mark the thread running */ 995 *(enum thread_state *)arg = RUNNING; 996 997 do { 998 struct msg * msg; 999 1000 /* Test the current order */ 1001 { 1002 int must_stop; 1003 CHECK_POSIX_DO( pthread_mutex_lock(&order_lock), goto end ); /* we lock to flush the caches */ 1004 must_stop = (order_val == STOP); 1005 CHECK_POSIX_DO( pthread_mutex_unlock(&order_lock), goto end ); 1006 if (must_stop) 1007 goto end; 1008 1009 pthread_testcancel(); 1010 } 1011 1012 /* Ok, we are allowed to run */ 1013 1014 /* Get the next message from the queue */ 1015 { 1016 int ret; 1017 CHECK_FCT_DO( ret = fd_fifo_get ( queue, &msg ), 1018 { 1019 if (ret == EPIPE) 1020 /* The queue was destroyed, we are probably exiting */ 1021 goto end; 1022 /* another error occurred */ 1023 goto fatal_error; 1024 } ); 1025 } 1026 1027 if (TRACE_BOOL(FULL)) { 1028 TRACE_DEBUG(FULL, "Picked next message"); 1029 fd_msg_dump_one(ANNOYING, msg); 1030 } 1031 1032 /* Now process the message */ 1033 CHECK_FCT_DO( (*action_cb)(&msg), goto fatal_error); 1034 1035 /* We're done with this message */ 1036 1037 } while (1); 1038 1039 fatal_error: 1040 TRACE_DEBUG(INFO, "An unrecoverable error occurred, %s thread is terminating...", action_name); 1041 CHECK_FCT_DO(fd_event_send(fd_g_config->cnf_main_ev, FDEV_TERMINATE, 0, NULL), ); 1042 1043 end: 1044 /* Mark the thread as terminated */ 1045 pthread_cleanup_pop(1); 1046 return NULL; 1047 } 1048 1049 /* The dispatch thread */ 1050 static void * dispatch_thr(void * arg) 1051 { 1052 return process_thr(arg, msg_dispatch, fd_g_local, "Dispatch"); 1053 } 1054 1055 /* The (routing-in) thread -- see description in freeDiameter.h */ 1056 static void * routing_in_thr(void * arg) 1057 { 1058 return process_thr(arg, msg_rt_in, fd_g_incoming, "Routing-IN"); 1059 } 1060 1061 /* The (routing-out) thread -- see description in freeDiameter.h */ 1062 static void * routing_out_thr(void * arg) 1063 { 1064 return process_thr(arg, msg_rt_out, fd_g_outgoing, "Routing-OUT"); 1065 } 1066 1067 1068 /********************************************************************************/ 1076 1069 /* The functions for the other files */ 1077 1070 /********************************************************************************/ 1078 1071 1079 /* Later: TODO("Set thresholds on queues");*/1072 /* Later: make this more dynamic */ 1080 1073 static pthread_t dispatch = (pthread_t)NULL; 1081 1074 static enum thread_state disp_state = INITIAL; … … 1112 1105 } 1113 1106 1114 /* Stop the thread after up to one second of wait */ 1115 int fd_rtdisp_fini(void) 1116 { 1117 /* Destroy the local queue */ 1118 CHECK_FCT_DO( fd_queues_fini_disp(), /* ignore */); 1107 static void stop_thread_delayed(enum thread_state *st, pthread_t * thr, char * th_name) 1108 { 1109 TRACE_ENTRY("%p %p", st, thr); 1110 CHECK_PARAMS_DO(st && thr, return); 1119 1111 1120 1112 /* Wait for a second for the thread to complete, by monitoring my_state */ 1121 if ( disp_state!= TERMINATED) {1122 TRACE_DEBUG(INFO, "Waiting for the dispatch thread to have a chance to terminate");1113 if (*st != TERMINATED) { 1114 TRACE_DEBUG(INFO, "Waiting for the %s thread to have a chance to terminate", th_name); 1123 1115 do { 1124 1116 struct timespec ts, ts_final; … … 1130 1122 1131 1123 while (TS_IS_INFERIOR( &ts, &ts_final )) { 1132 if ( disp_state== TERMINATED)1124 if (*st == TERMINATED) 1133 1125 break; 1134 1126 … … 1140 1132 1141 1133 /* Now stop the thread and reclaim its resources */ 1142 CHECK_FCT_DO( fd_thr_term(&dispatch ), /* continue */); 1143 1144 1145 TODO("Add terminating the routing threads"); 1134 CHECK_FCT_DO( fd_thr_term(thr ), /* continue */); 1135 1136 } 1137 1138 /* Stop the thread after up to one second of wait */ 1139 int fd_rtdisp_fini(void) 1140 { 1141 /* Destroy the incoming queue */ 1142 CHECK_FCT_DO( fd_queues_fini(&fd_g_incoming), /* ignore */); 1143 1144 /* Stop the routing IN thread */ 1145 stop_thread_delayed(&in_state, &rt_in, "IN routing"); 1146 1147 /* Destroy the outgoing queue */ 1148 CHECK_FCT_DO( fd_queues_fini(&fd_g_outgoing), /* ignore */); 1149 1150 /* Stop the routing OUT thread */ 1151 stop_thread_delayed(&out_state, &rt_out, "OUT routing"); 1152 1153 /* Destroy the local queue */ 1154 CHECK_FCT_DO( fd_queues_fini(&fd_g_local), /* ignore */); 1155 1156 /* Stop the Dispatch thread */ 1157 stop_thread_delayed(&disp_state, &dispatch, "Dispatching"); 1158 1146 1159 return 0; 1147 1160 } … … 1163 1176 } 1164 1177 1178 1179 /********************************************************************************/ 1180 /* For extensiosn to register a new appl */ 1181 /********************************************************************************/ 1165 1182 1166 1183 /* Add an application into the peer's supported apps */ -
include/freeDiameter/freeDiameter.h
r105 r124 562 562 enum fd_rt_out_score { 563 563 FD_SCORE_NO_DELIVERY = -70, /* We should not send this message to this candidate */ 564 FD_SCORE_INI = -2, /* All candidates are initialized with this value */ 564 565 FD_SCORE_LOAD_BALANCE = 1, /* Use this to differentiate between several peers with the same score */ 565 566 FD_SCORE_DEFAULT = 5, /* The peer is a default route for all messages */ -
include/freeDiameter/libfreeDiameter.h
r114 r124 1633 1633 1634 1634 /* Extract the list of valid candidates, and initialize their scores to 0 */ 1635 void fd_rtd_candidate_extract(struct rt_data * rtd, struct fd_list ** candidates );1635 void fd_rtd_candidate_extract(struct rt_data * rtd, struct fd_list ** candidates, int ini_score); 1636 1636 1637 1637 /* If a peer returned a protocol error for this message, save it so that we don't try to send it there again */ … … 2606 2606 * PARAMETERS: 2607 2607 * queue : The queue from which the element must be retrieved. 2608 * msg : On return, the message is stored here.2608 * item : On return, the first element of the queue is stored here. 2609 2609 * 2610 2610 * DESCRIPTION: -
libfreeDiameter/rt_data.c
r84 r124 227 227 228 228 /* Extract the list of valid candidates, and initialize their scores to 0 */ 229 void fd_rtd_candidate_extract(struct rt_data * rtd, struct fd_list ** candidates )229 void fd_rtd_candidate_extract(struct rt_data * rtd, struct fd_list ** candidates, int ini_score) 230 230 { 231 231 TRACE_ENTRY("%p %p", rtd, candidates); … … 236 236 237 237 if (rtd->extracted) { 238 /* Reset all scores to 0*/238 /* Reset all scores to INITIAL score */ 239 239 struct fd_list * li; 240 240 for (li = rtd->candidates.next; li != &rtd->candidates; li = li->next) { 241 241 struct rtd_candidate * c = (struct rtd_candidate *) li; 242 c->score = 0;242 c->score = ini_score; 243 243 } 244 244 }
Note: See TracChangeset
for help on using the changeset viewer.