# HG changeset patch # User Sebastien Decugis # Date 1371435117 -28800 # Node ID 76ac4bb75f0ef130e52e1e23788320ab02dbeda2 # Parent e1ced4db7f6747f542c1b27d7e589249edde20b1# Parent 98478a8aabb17a9bd2dbb50eef7ff96cbed0e9c5 Merged with latest proposed version diff -r e1ced4db7f67 -r 76ac4bb75f0e .hgtags diff -r e1ced4db7f67 -r 76ac4bb75f0e contrib/debian/changelog --- a/contrib/debian/changelog Tue Jun 11 18:13:29 2013 +0800 +++ b/contrib/debian/changelog Mon Jun 17 10:11:57 2013 +0800 @@ -7,13 +7,14 @@ * New extension: rt_load_balance (load balancer based on pending queue size). * New extension: rt_busypeers. See doc/rt_busypeers.conf.sample. * New extension: dbg_msg_timings. Measures timing of message operations. - * New extension: dbg_msg_dumps. Use to visualize all received and sent messages. + * New extension: dbg_msg_dumps. Use to control hooks display. * New API (fd_hook_*) for extensions to control messages logging & profiling * New API (fd_stats_*) for extensions to monitor framework state (e.g. SNMP implem) * API changes: all the fd_*_dump functions now return malloc'd strings instead of logging directly. * Updated dbg_monitoring extension to use the new API * New script to generate dictionary extensions from org file (see contrib/tools) - * New compilation option: WORKAROUND_ACCEPT_INVALID_VSAI to improve interoperability. + * New compilation option: WORKAROUND_ACCEPT_INVALID_VSAI to improve compatibility + with invalid Vendor-Specific-Application-Id AVPs received from some equipments (e.g. Cisco). * New compilation option: DISABLE_PEER_EXPIRY for use in test environments. * Extensions are now also searched in LD_LIBRARY_PATH. * Copy Proxy-Info AVP automatically in new answers. @@ -27,8 +28,10 @@ * Allow running without TLS configuration. * Upgraded SCTP code to comply with RFC 6458 * Using default secure Diameter port number 5658 as per RFC 6733 + * Updated TLS code for performance improvements with new GNU TLS. + * Fix interlocking problem when large number of requests were failed over. - -- Sebastien Decugis Mon, 03 Jun 2013 14:20:05 +0800 + -- Sebastien Decugis Fri, 14 Jun 2013 17:36:07 +0800 freediameter (1.1.6) UNRELEASED; urgency=low diff -r e1ced4db7f67 -r 76ac4bb75f0e doc/freediameter.conf.sample --- a/doc/freediameter.conf.sample Tue Jun 11 18:13:29 2013 +0800 +++ b/doc/freediameter.conf.sample Mon Jun 17 10:11:57 2013 +0800 @@ -30,10 +30,11 @@ # The port this peer is listening on for incoming TLS-protected connections (TCP and SCTP). # See TLS_old_method for more information about TLS flavours. +# Note: we use TLS/SCTP instead of DTLS/SCTP at the moment. This will change in future version of freeDiameter. # Default: 5658. Use 0 to disable. #SecPort = 5658; -# freeDiameter 1.2.0 introduces the support of DTLS over SCTP (RFC6083) instead of TLS over SCTP (RFC3436), +# freeDiameter now supports DTLS over SCTP (RFC6083) instead of TLS over SCTP (RFC3436), # as specified in RFC6733. If you need compatibility with older implementation that use TLS over SCTP, you # can open an additional SCTP server port using TLS/SCTP by specifying the following parameter. # Note that no TCP server is started on the following port. @@ -84,6 +85,16 @@ #ListenOn = "2001:200:903:2::202:1"; #ListenOn = "fe80::21c:5ff:fe98:7d62%eth0"; + +############################################################## +## Server configuration + +# How many Diameter peers are allowed to be connecting at the same time ? +# This parameter limits the number of incoming connections from the time +# the connection is accepted until the first CER is received. +# Default: 5 unidentified clients in paralel. +#ThreadsPerServer = 5; + ############################################################## ## TLS Configuration @@ -199,6 +210,15 @@ # test_* : dummy extensions that are useful only in testing environments. +# The dbg_msg_dump.fdx extension allows you to tweak the way freeDiameter displays some +# information about some events. This extension does not actually use a configuration file +# but receives directly a parameter in the string passed to the extension. Here are some examples: +# LoadExtension = "dbg_msg_dump.fdx" : "0x1111"; # Removes all default hooks, very quiet even in case of errors. +# LoadExtension = "dbg_msg_dump.fdx" : "0x2222"; # Display all events with few details. +# LoadExtension = "dbg_msg_dump.fdx" : "0x80"; # Dump complete information about sent and received messages. +# See the top of extensions/dbg_msg/dump/dbg_msg.dump.c file for more details on the value. + + ############################################################## ## Peers configuration @@ -217,8 +237,8 @@ #ConnectPeer = "diameterid" [ { parameter1; parameter2; ...} ] ; # Parameters that can be specified in the peer's parameter list: # No_TCP; No_SCTP; No_IP; No_IPv6; Prefer_TCP; TLS_old_method; -# No_TLS; # assume transparent security instead of TLS -# SctpSec3436; # Use TLS/SCTP instead of DTLS/SCTP to protect SCTP associations with this peer. +# No_TLS; # assume transparent security instead of TLS. +# SctpSec3436; # Use TLS/SCTP instead of DTLS/SCTP to protect SCTP associations with this peer (not recommended). # Port = 5658; # The port to connect to # TcTimer = 30; # TwTimer = 30; diff -r e1ced4db7f67 -r 76ac4bb75f0e doc/rt_default.conf.sample --- a/doc/rt_default.conf.sample Tue Jun 11 18:13:29 2013 +0800 +++ b/doc/rt_default.conf.sample Mon Jun 17 10:11:57 2013 +0800 @@ -20,7 +20,7 @@ # discovery extension). # # The default forwarding behavior of freeDiameter is: -# - if the message contains a Destination-Host AVP, and this the designated peer is an eligible candidate, send to this peer. +# - if the message contains a Destination-Host AVP, and the designated peer is an eligible candidate, send to this peer. # - if a peer does not support the message application or Relay application, give it a penalty for this message # (it means that unless overwritten by an extension, the message will not be sent to that peer) # - if one of the eligible peer advertised a realm matching the message's Destination-Realm, send to this peer. diff -r e1ced4db7f67 -r 76ac4bb75f0e doc/test_app.conf.sample --- a/doc/test_app.conf.sample Tue Jun 11 18:13:29 2013 +0800 +++ b/doc/test_app.conf.sample Mon Jun 17 10:11:57 2013 +0800 @@ -1,6 +1,6 @@ ####################### # This file contains the description of configuration and general information about the -# "App_test" extension. +# "test_app" extension. # This extension provides a simple way to send a predefined message over the Diameter Network. # It may be used to test the Routing or other base mechanisms from the Diameter network. @@ -63,5 +63,5 @@ # user-name = "user@server.foreign.net"; # The signal that triggers sending the test message -# Note: Symbolic names are now recognized, you must use integers +# Note: Symbolic names are not recognized, you must use integers # signal = 10; diff -r e1ced4db7f67 -r 76ac4bb75f0e extensions/app_radgw/rgw_clients.c --- a/extensions/app_radgw/rgw_clients.c Tue Jun 11 18:13:29 2013 +0800 +++ b/extensions/app_radgw/rgw_clients.c Mon Jun 17 10:11:57 2013 +0800 @@ -203,7 +203,7 @@ } /* If we reach this part, some fatal error was encountered */ - CHECK_FCT_DO(fd_event_send(fd_g_config->cnf_main_ev, FDEV_TERMINATE, 0, NULL), ); + CHECK_FCT_DO(fd_core_shutdown(), ); TRACE_DEBUG(FULL, "Thread terminated"); return NULL; } diff -r e1ced4db7f67 -r 76ac4bb75f0e extensions/dbg_msg_dumps/dbg_msg_dumps.c --- a/extensions/dbg_msg_dumps/dbg_msg_dumps.c Tue Jun 11 18:13:29 2013 +0800 +++ b/extensions/dbg_msg_dumps/dbg_msg_dumps.c Mon Jun 17 10:11:57 2013 +0800 @@ -36,36 +36,296 @@ /* This extension uses the hooks mechanism to display the full content of received and sent messages, for learning & debugging purpose. Do NOT use this extension in production environment because it will slow down all operation. */ + +/* You can add a configuration parameter on the LoadExtension line, e.g. +LoadExtension="dbg_msg_dump.fdx":"0x149"; +The value is an hexadecimal value with the following bits meaning: */ +#define HK_ERRORS_QUIET 0x0001 /* errors are not dumped -- removes the default handling as well */ +#define HK_ERRORS_COMPACT 0x0002 /* errors in compact mode */ +#define HK_ERRORS_FULL 0x0004 /* errors in full mode (1 line with all the data) */ +#define HK_ERRORS_TREE 0x0008 /* errors in treeview mode (message split over multiple lines) */ + +#define HK_SNDRCV_QUIET 0x0010 /* send+rcv are not dumped -- removes the default handling as well */ +#define HK_SNDRCV_COMPACT 0x0020 /* send+rcv in compact mode */ +#define HK_SNDRCV_FULL 0x0040 /* send+rcv in full mode */ +#define HK_SNDRCV_TREE 0x0080 /* send+rcv in tree mode */ + +#define HK_ROUTING_QUIET 0x0100 /* routing decisions are not dumped -- removes the default handling as well */ +#define HK_ROUTING_COMPACT 0x0200 /* routing decisions in compact mode */ + +#define HK_PEERS_QUIET 0x1000 /* peers connections events are not dumped -- removes the default handling as well */ +#define HK_PEERS_COMPACT 0x2000 /* peers connections events in compact mode */ +/* +Default value is HK_ERRORS_DETAIL + HK_SNDRCV_DETAIL + HK_PEERS_COMPACT +*/ #include -static struct fd_hook_hdl *md_hdl = NULL; +static struct fd_hook_hdl *md_hdl[4] = {NULL,NULL,NULL,NULL}; +static uint32_t dump_level = HK_ERRORS_TREE | HK_SNDRCV_TREE | HK_PEERS_COMPACT; /* default */ +static char * buf = NULL; +static size_t len; +static pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER; /* The callback called when messages are received and sent */ -static void md_hook_cb(enum fd_hook_type type, struct msg * msg, struct peer_hdr * peer, void * other, struct fd_hook_permsgdata *pmd, void * regdata) +static void md_hook_cb_tree(enum fd_hook_type type, struct msg * msg, struct peer_hdr * peer, void * other, struct fd_hook_permsgdata *pmd, void * regdata) { - char * buf = NULL; - size_t len; + char * peer_name = peer ? peer->info.pi_diamid : ""; + + CHECK_POSIX_DO( pthread_mutex_lock(&mtx), ); + + if (msg) { + CHECK_MALLOC_DO( fd_msg_dump_treeview(&buf, &len, NULL, msg, fd_g_config->cnf_dict, (type == HOOK_MESSAGE_PARSING_ERROR) ? 0 : 1, 1), + { LOG_E("Error while dumping a message"); pthread_mutex_unlock(&mtx); return; } ); + } + + switch (type) { +/* errors */ + case HOOK_MESSAGE_FAILOVER: + LOG_E("FAILOVER from '%s':", peer_name); + LOG_SPLIT(FD_LOG_ERROR, " ", buf, NULL); + break; + case HOOK_MESSAGE_PARSING_ERROR: + if (msg) { + DiamId_t id = NULL; + if (!fd_msg_source_get( msg, &id, NULL )) + id = (DiamId_t)""; + if (!id) + id = (DiamId_t)""; + LOG_E("PARSING ERROR: '%s' from '%s': ", (char *)other, (char *)id); + LOG_SPLIT(FD_LOG_ERROR, " ", buf, NULL); + } else { + struct fd_cnx_rcvdata *rcv_data = other; + CHECK_MALLOC_DO(fd_dump_extend_hexdump(&buf, &len, NULL, rcv_data->buffer, rcv_data->length, 0, 0), break); + LOG_E("PARSING ERROR: %zdB msg from '%s': %s", rcv_data->length, peer_name, buf); + } + break; + case HOOK_MESSAGE_ROUTING_ERROR: + LOG_E("ROUTING ERROR '%s' for: ", (char *)other); + LOG_SPLIT(FD_LOG_ERROR, " ", buf, NULL); + break; + case HOOK_MESSAGE_DROPPED: + LOG_E("DROPPED '%s'", (char *)other); + LOG_SPLIT(FD_LOG_ERROR, " ", buf, NULL); + break; + +/* send receive */ + case HOOK_MESSAGE_RECEIVED: + LOG_N("RCV from '%s':", peer_name); + LOG_SPLIT(FD_LOG_NOTICE, " ", buf, NULL); + break; + case HOOK_MESSAGE_SENT: + LOG_N("SND to '%s':", peer_name); + LOG_SPLIT(FD_LOG_NOTICE, " ", buf, NULL); + break; + +/* The following are not received in this hook */ + case HOOK_MESSAGE_LOCAL: + case HOOK_MESSAGE_ROUTING_FORWARD: + case HOOK_MESSAGE_ROUTING_LOCAL: + + case HOOK_PEER_CONNECT_FAILED: + case HOOK_PEER_CONNECT_SUCCESS: + + case HOOK_DATA_RECEIVED: + break; + } + + CHECK_POSIX_DO( pthread_mutex_unlock(&mtx), ); +} + +static void md_hook_cb_full(enum fd_hook_type type, struct msg * msg, struct peer_hdr * peer, void * other, struct fd_hook_permsgdata *pmd, void * regdata) +{ + char * peer_name = peer ? peer->info.pi_diamid : ""; + + CHECK_POSIX_DO( pthread_mutex_lock(&mtx), ); + + if (msg) { + CHECK_MALLOC_DO( fd_msg_dump_full(&buf, &len, NULL, msg, fd_g_config->cnf_dict, (type == HOOK_MESSAGE_PARSING_ERROR) ? 0 : 1, 1), + { LOG_E("Error while dumping a message"); pthread_mutex_unlock(&mtx); return; } ); + } + + switch (type) { +/* errors */ + case HOOK_MESSAGE_FAILOVER: + LOG_E("FAILOVER from '%s': %s", peer_name, buf); + break; + case HOOK_MESSAGE_PARSING_ERROR: + if (msg) { + DiamId_t id = NULL; + if (!fd_msg_source_get( msg, &id, NULL )) + id = (DiamId_t)""; + if (!id) + id = (DiamId_t)""; + LOG_E("PARSING ERROR: '%s' from '%s': %s", (char *)other, (char *)id, buf); + } else { + struct fd_cnx_rcvdata *rcv_data = other; + CHECK_MALLOC_DO(fd_dump_extend_hexdump(&buf, &len, NULL, rcv_data->buffer, rcv_data->length, 0, 0), break); + LOG_E("PARSING ERROR: %zdB msg from '%s': %s", rcv_data->length, peer_name, buf); + } + break; + case HOOK_MESSAGE_ROUTING_ERROR: + LOG_E("ROUTING ERROR '%s' for: %s", (char *)other, buf); + break; + case HOOK_MESSAGE_DROPPED: + LOG_E("DROPPED '%s' %s", (char *)other, buf); + break; - CHECK_MALLOC_DO( fd_msg_dump_treeview(&buf, &len, NULL, msg, fd_g_config->cnf_dict, 1, 1), - { LOG_E("Error while dumping a message"); return; } ); +/* send receive */ + case HOOK_MESSAGE_RECEIVED: + LOG_N("RCV from '%s': %s", peer_name, buf); + break; + case HOOK_MESSAGE_SENT: + LOG_N("SND to '%s': %s", peer_name, buf); + break; + +/* The following are not received in this hook */ + case HOOK_MESSAGE_LOCAL: + case HOOK_MESSAGE_ROUTING_FORWARD: + case HOOK_MESSAGE_ROUTING_LOCAL: + + case HOOK_PEER_CONNECT_FAILED: + case HOOK_PEER_CONNECT_SUCCESS: + + case HOOK_DATA_RECEIVED: + break; + } + + CHECK_POSIX_DO( pthread_mutex_unlock(&mtx), ); +} + +static void md_hook_cb_compact(enum fd_hook_type type, struct msg * msg, struct peer_hdr * peer, void * other, struct fd_hook_permsgdata *pmd, void * regdata) +{ + char * peer_name = peer ? peer->info.pi_diamid : ""; + + CHECK_POSIX_DO( pthread_mutex_lock(&mtx), ); + + if (msg) { + CHECK_MALLOC_DO( fd_msg_dump_summary(&buf, &len, NULL, msg, fd_g_config->cnf_dict, 0, 0), + { LOG_E("Error while dumping a message"); pthread_mutex_unlock(&mtx); return; } ); + } - LOG_N("%s %s:", - (type == HOOK_MESSAGE_RECEIVED) ? "RCV FROM" : "SENT TO", - peer ? peer->info.pi_diamid:""); - LOG_SPLIT( FD_LOG_NOTICE, " ", buf ?:"", NULL); + switch (type) { +/* errors */ + case HOOK_MESSAGE_FAILOVER: + LOG_E("FAILOVER from '%s': %s", peer_name, buf); + break; + case HOOK_MESSAGE_PARSING_ERROR: + if (msg) { + DiamId_t id = NULL; + if (!fd_msg_source_get( msg, &id, NULL )) + id = (DiamId_t)""; + if (!id) + id = (DiamId_t)""; + LOG_E("PARSING ERROR: '%s' from '%s': %s", (char *)other, (char *)id, buf); + } else { + struct fd_cnx_rcvdata *rcv_data = other; + CHECK_MALLOC_DO(fd_dump_extend_hexdump(&buf, &len, NULL, rcv_data->buffer, rcv_data->length, 0, 0), break); + LOG_E("PARSING ERROR: %zdB msg from '%s': %s", rcv_data->length, peer_name, buf); + } + break; + case HOOK_MESSAGE_ROUTING_ERROR: + LOG_E("ROUTING ERROR '%s' for: %s", (char *)other, buf); + break; + case HOOK_MESSAGE_DROPPED: + LOG_E("DROPPED '%s' %s", (char *)other, buf); + break; + +/* send receive */ + case HOOK_MESSAGE_RECEIVED: + LOG_N("RCV from '%s': %s", peer_name, buf); + break; + case HOOK_MESSAGE_SENT: + LOG_N("SND to '%s': %s", peer_name, buf); + break; + +/* routing */ + case HOOK_MESSAGE_LOCAL: + LOG_N("ISSUED: %s", buf); + break; + case HOOK_MESSAGE_ROUTING_FORWARD: + LOG_N("FORWARDING: %s", buf); + break; + case HOOK_MESSAGE_ROUTING_LOCAL: + LOG_N("DISPATCHING: %s", buf); + break; + +/* peers */ + case HOOK_PEER_CONNECT_FAILED: + LOG_N("CONNECT FAILED to %s: %s", peer_name, (char *)other); + break; + case HOOK_PEER_CONNECT_SUCCESS: { + char protobuf[40]; + if (peer) { + CHECK_FCT_DO(fd_peer_cnx_proto_info(peer, protobuf, sizeof(protobuf)), break ); + } else { + protobuf[0] = '-'; + protobuf[1] = '\0'; + } + LOG_N("CONNECTED TO '%s' (%s): %s", peer_name, protobuf, buf); + } + break; +/* Not handled */ + case HOOK_DATA_RECEIVED: + break; + } + + CHECK_POSIX_DO( pthread_mutex_unlock(&mtx), ); +} - free(buf); +static void md_hook_cb_quiet(enum fd_hook_type type, struct msg * msg, struct peer_hdr * peer, void * other, struct fd_hook_permsgdata *pmd, void * regdata) +{ } /* Entry point */ static int md_main(char * conffile) { + uint32_t mask_errors, mask_sndrcv, mask_routing, mask_peers; + uint32_t mask_quiet, mask_compact, mask_full, mask_tree; TRACE_ENTRY("%p", conffile); - CHECK_FCT( fd_hook_register( HOOK_MASK( HOOK_MESSAGE_RECEIVED, HOOK_MESSAGE_SENT ), - md_hook_cb, NULL, NULL, &md_hdl) ); + if (conffile != NULL) { + char * endp; + dump_level = (uint8_t)strtoul(conffile, &endp, 16); + CHECK_PARAMS_DO( *endp == '\0', { + LOG_E("Configuration parameter must be in the form \"0xNNNN\""); + return EINVAL; }); + } + + mask_errors = HOOK_MASK( HOOK_MESSAGE_FAILOVER, HOOK_MESSAGE_PARSING_ERROR, HOOK_MESSAGE_ROUTING_ERROR, HOOK_MESSAGE_DROPPED ); + mask_sndrcv = HOOK_MASK( HOOK_MESSAGE_RECEIVED, HOOK_MESSAGE_SENT ); + mask_routing= HOOK_MASK( HOOK_MESSAGE_LOCAL, HOOK_MESSAGE_ROUTING_FORWARD, HOOK_MESSAGE_ROUTING_LOCAL ); + mask_peers = HOOK_MASK( HOOK_PEER_CONNECT_FAILED, HOOK_PEER_CONNECT_SUCCESS ); + + mask_quiet = (dump_level & HK_ERRORS_QUIET) ? mask_errors : 0; + mask_quiet |= (dump_level & HK_SNDRCV_QUIET) ? mask_sndrcv : 0; + mask_quiet |= (dump_level & HK_ROUTING_QUIET) ? mask_routing : 0; + mask_quiet |= (dump_level & HK_PEERS_QUIET) ? mask_peers : 0; + mask_compact = (dump_level & HK_ERRORS_COMPACT) ? mask_errors : 0; + mask_compact |= (dump_level & HK_SNDRCV_COMPACT) ? mask_sndrcv : 0; + mask_compact |= (dump_level & HK_ROUTING_COMPACT) ? mask_routing : 0; + mask_compact |= (dump_level & HK_PEERS_COMPACT) ? mask_peers : 0; + + mask_full = (dump_level & HK_ERRORS_FULL) ? mask_errors : 0; + mask_full |= (dump_level & HK_SNDRCV_FULL) ? mask_sndrcv : 0; + + mask_tree = (dump_level & HK_ERRORS_TREE) ? mask_errors : 0; + mask_tree |= (dump_level & HK_SNDRCV_TREE) ? mask_sndrcv : 0; + + if (mask_quiet) { + CHECK_FCT( fd_hook_register( mask_quiet, md_hook_cb_quiet, NULL, NULL, &md_hdl[0]) ); + } + if (mask_compact) { + CHECK_FCT( fd_hook_register( mask_compact, md_hook_cb_compact, NULL, NULL, &md_hdl[1]) ); + } + if (mask_full) { + CHECK_FCT( fd_hook_register( mask_full, md_hook_cb_full, NULL, NULL, &md_hdl[2]) ); + } + if (mask_tree) { + CHECK_FCT( fd_hook_register( mask_tree, md_hook_cb_tree, NULL, NULL, &md_hdl[3]) ); + } + return 0; } @@ -73,7 +333,10 @@ void fd_ext_fini(void) { TRACE_ENTRY(); - CHECK_FCT_DO( fd_hook_unregister( md_hdl ), ); + if (md_hdl[0]) { CHECK_FCT_DO( fd_hook_unregister( md_hdl[0] ), ); } + if (md_hdl[1]) { CHECK_FCT_DO( fd_hook_unregister( md_hdl[1] ), ); } + if (md_hdl[2]) { CHECK_FCT_DO( fd_hook_unregister( md_hdl[2] ), ); } + if (md_hdl[2]) { CHECK_FCT_DO( fd_hook_unregister( md_hdl[3] ), ); } return ; } diff -r e1ced4db7f67 -r 76ac4bb75f0e extensions/test_app/ta_bench.c --- a/extensions/test_app/ta_bench.c Tue Jun 11 18:13:29 2013 +0800 +++ b/extensions/test_app/ta_bench.c Mon Jun 17 10:11:57 2013 +0800 @@ -198,6 +198,7 @@ static void ta_bench_start() { struct timespec end_time, now; struct ta_stats start, end; + int nsec = 0; /* Save the initial stats */ CHECK_POSIX_DO( pthread_mutex_lock(&ta_conf->stats_lock), ); @@ -205,16 +206,19 @@ CHECK_POSIX_DO( pthread_mutex_unlock(&ta_conf->stats_lock), ); /* We will run for ta_conf->bench_duration seconds */ + LOG_N("Starting benchmark client, %ds", ta_conf->bench_duration); CHECK_SYS_DO( clock_gettime(CLOCK_REALTIME, &end_time), ); end_time.tv_sec += ta_conf->bench_duration; /* Now loop until timeout is reached */ do { /* Do not create more that NB_CONCURRENT_MESSAGES in paralel */ - int ret = sem_wait(&ta_sem); + int ret = sem_timedwait(&ta_sem, &end_time); if (ret == -1) { ret = errno; - CHECK_POSIX_DO(ret, ); /* Just to log it */ + if (ret != ETIMEDOUT) { + CHECK_POSIX_DO(ret, ); /* Just to log it */ + } break; } @@ -228,32 +232,37 @@ ta_bench_test_message(); } while (1); - /* Save the stats now */ - CHECK_POSIX_DO( pthread_mutex_lock(&ta_conf->stats_lock), ); - CHECK_SYS_DO( clock_gettime(CLOCK_REALTIME, &now), ); /* Re-read the time because we might have spent some time wiating for the mutex */ - memcpy(&end, &ta_conf->stats, sizeof(struct ta_stats)); - CHECK_POSIX_DO( pthread_mutex_unlock(&ta_conf->stats_lock), ); - - /* Now, display the statistics */ - fd_log_debug( "------- app_test Benchmark result ---------"); - if (now.tv_nsec >= end_time.tv_nsec) { - fd_log_debug( " Executing for: %d.%06ld sec", - (int)(now.tv_sec + ta_conf->bench_duration - end_time.tv_sec), - (long)(now.tv_nsec - end_time.tv_nsec) / 1000); - } else { - fd_log_debug( " Executing for: %d.%06ld sec", - (int)(now.tv_sec + ta_conf->bench_duration - 1 - end_time.tv_sec), - (long)(now.tv_nsec + 1000000000 - end_time.tv_nsec) / 1000); - } - fd_log_debug( " %llu messages sent", end.nb_sent - start.nb_sent); - fd_log_debug( " %llu error(s) received", end.nb_errs - start.nb_errs); - fd_log_debug( " %llu answer(s) received", end.nb_recv - start.nb_recv); - fd_log_debug( " Overall:"); - fd_log_debug( " fastest: %ld.%06ld sec.", end.shortest / 1000000, end.shortest % 1000000); - fd_log_debug( " slowest: %ld.%06ld sec.", end.longest / 1000000, end.longest % 1000000); - fd_log_debug( " Average: %ld.%06ld sec.", end.avg / 1000000, end.avg % 1000000); - fd_log_debug( " Throughput: %llu messages / sec", (end.nb_recv - start.nb_recv) / (( now.tv_sec + ta_conf->bench_duration - end_time.tv_sec ) + ((now.tv_nsec - end_time.tv_nsec) / 1000000000))); - fd_log_debug( "-------------------------------------"); + do { + CHECK_POSIX_DO( pthread_mutex_lock(&ta_conf->stats_lock), ); + CHECK_SYS_DO( clock_gettime(CLOCK_REALTIME, &now), ); /* Re-read the time because we might have spent some time wiating for the mutex */ + memcpy(&end, &ta_conf->stats, sizeof(struct ta_stats)); + CHECK_POSIX_DO( pthread_mutex_unlock(&ta_conf->stats_lock), ); + + /* Now, display the statistics */ + LOG_N( "------- app_test Benchmark results, end sending +%ds ---------", nsec); + if (now.tv_nsec >= end_time.tv_nsec) { + LOG_N( " Executing for: %d.%06ld sec", + (int)(now.tv_sec + ta_conf->bench_duration - end_time.tv_sec), + (long)(now.tv_nsec - end_time.tv_nsec) / 1000); + } else { + LOG_N( " Executing for: %d.%06ld sec", + (int)(now.tv_sec + ta_conf->bench_duration - 1 - end_time.tv_sec), + (long)(now.tv_nsec + 1000000000 - end_time.tv_nsec) / 1000); + } + LOG_N( " %llu messages sent", end.nb_sent - start.nb_sent); + LOG_N( " %llu error(s) received", end.nb_errs - start.nb_errs); + LOG_N( " %llu answer(s) received", end.nb_recv - start.nb_recv); + LOG_N( " Overall:"); + LOG_N( " fastest: %ld.%06ld sec.", end.shortest / 1000000, end.shortest % 1000000); + LOG_N( " slowest: %ld.%06ld sec.", end.longest / 1000000, end.longest % 1000000); + LOG_N( " Average: %ld.%06ld sec.", end.avg / 1000000, end.avg % 1000000); + LOG_N( " Throughput: %llu messages / sec", (end.nb_recv - start.nb_recv) / (( now.tv_sec + ta_conf->bench_duration - end_time.tv_sec ) + ((now.tv_nsec - end_time.tv_nsec) / 1000000000))); + LOG_N( "-------------------------------------"); + + nsec ++; + sleep(1); + } while ( (end.nb_sent - start.nb_sent) > (end.nb_errs - start.nb_errs) + (end.nb_recv - start.nb_recv) ); + LOG_N( "--------------- Test Complete --------------"); } diff -r e1ced4db7f67 -r 76ac4bb75f0e extensions/test_app/test_app.c --- a/extensions/test_app/test_app.c Tue Jun 11 18:13:29 2013 +0800 +++ b/extensions/test_app/test_app.c Mon Jun 17 10:11:57 2013 +0800 @@ -95,7 +95,7 @@ /* Now, loop until canceled */ while (1) { /* Display statistics every XX seconds */ - sleep(ta_conf->bench_duration * 3); + sleep(ta_conf->bench_duration + 3); /* Now, get the current stats */ CHECK_POSIX_DO( pthread_mutex_lock(&ta_conf->stats_lock), ); @@ -135,6 +135,22 @@ return NULL; /* never called */ } +static struct fd_hook_hdl * hookhdl[2] = { NULL, NULL }; +static void ta_hook_cb_silent(enum fd_hook_type type, struct msg * msg, struct peer_hdr * peer, void * other, struct fd_hook_permsgdata *pmd, void * regdata) { +} +static void ta_hook_cb_oneline(enum fd_hook_type type, struct msg * msg, struct peer_hdr * peer, void * other, struct fd_hook_permsgdata *pmd, void * regdata) { + char * buf = NULL; + size_t len; + + CHECK_MALLOC_DO( fd_msg_dump_summary(&buf, &len, NULL, msg, NULL, 0, 0), + { LOG_E("Error while dumping a message"); return; } ); + + LOG_N("{%d} %s: %s", type, (char *)other ?:"", buf ?:""); + + free(buf); +} + + /* entry point */ static int ta_entry(char * conffile) { @@ -171,6 +187,15 @@ /* Advertise the support for the test application in the peer */ CHECK_FCT( fd_disp_app_support ( ta_appli, ta_vendor, 1, 0 ) ); + if (ta_conf->mode & MODE_BENCH) { + /* Register an empty hook to disable the default handling */ + CHECK_FCT( fd_hook_register( HOOK_MASK( HOOK_DATA_RECEIVED, HOOK_MESSAGE_RECEIVED, HOOK_MESSAGE_LOCAL, HOOK_MESSAGE_SENT, HOOK_MESSAGE_ROUTING_FORWARD, HOOK_MESSAGE_ROUTING_LOCAL ), + ta_hook_cb_silent, NULL, NULL, &hookhdl[0]) ); + CHECK_FCT( fd_hook_register( HOOK_MASK( HOOK_MESSAGE_ROUTING_ERROR, HOOK_MESSAGE_DROPPED ), + ta_hook_cb_oneline, NULL, NULL, &hookhdl[1]) ); + + } + /* Start the statistics thread */ CHECK_POSIX( pthread_create(&ta_stats_th, NULL, ta_stats, NULL) ); @@ -184,6 +209,10 @@ ta_cli_fini(); if (ta_conf->mode & MODE_SERV) ta_serv_fini(); + if (hookhdl[0]) + fd_hook_unregister( hookhdl[0] ); + if (hookhdl[1]) + fd_hook_unregister( hookhdl[1] ); CHECK_FCT_DO( fd_thr_term(&ta_stats_th), ); CHECK_POSIX_DO( pthread_mutex_destroy(&ta_conf->stats_lock), ); } diff -r e1ced4db7f67 -r 76ac4bb75f0e extensions/test_netemul/tne_process.c --- a/extensions/test_netemul/tne_process.c Tue Jun 11 18:13:29 2013 +0800 +++ b/extensions/test_netemul/tne_process.c Mon Jun 17 10:11:57 2013 +0800 @@ -280,7 +280,7 @@ error: TRACE_DEBUG(INFO, "A fatal error occurred in test_netemul/process thread!"); ASSERT(0); - CHECK_FCT_DO(fd_event_send(fd_g_config->cnf_main_ev, FDEV_TERMINATE, 0, NULL), ); + CHECK_FCT_DO(fd_core_shutdown(), ); return NULL; } diff -r e1ced4db7f67 -r 76ac4bb75f0e include/freeDiameter/libfdcore.h --- a/include/freeDiameter/libfdcore.h Tue Jun 11 18:13:29 2013 +0800 +++ b/include/freeDiameter/libfdcore.h Mon Jun 17 10:11:57 2013 +0800 @@ -135,6 +135,7 @@ uint16_t cnf_port_3436; /* Open an additional server port to listen to old TLS/SCTP clients (RFC3436, freeDiameter versions < 1.2.0) */ uint16_t cnf_sctp_str; /* default max number of streams for SCTP associations (def: 30) */ struct fd_list cnf_endpoints; /* the local endpoints to bind the server to. list of struct fd_endpoint. default is empty (bind all). After servers are started, this is the actual list of endpoints including port information. */ + int cnf_thr_srv; /* Number of threads per servers handling the connection state machines */ struct fd_list cnf_apps; /* Applications locally supported (except relay, see flags). Use fd_disp_app_support to add one. list of struct fd_app. */ uint16_t cnf_dispthr; /* Number of dispatch threads to create */ struct { @@ -815,8 +816,8 @@ /* Daemon's codespace: 1000->1999 (1500->1999 defined in fdcore-internal.h) */ enum { - FDEV_TERMINATE = 1000 /* request to terminate */ - ,FDEV_TRIGGER /* Trigger available for extensions. size is sizeof(int), data is int * */ + FDEV_TERMINATE_INT= 1000 /* request to terminate. DO NOT USE. Use fd_core_shutdown() instead. */ + ,FDEV_TRIGGER /* Trigger available for extensions. size is sizeof(int), data is int * */ }; int fd_event_send(struct fifo *queue, int code, size_t datasz, void * data); @@ -1049,7 +1050,7 @@ - {permsgdata} is always NULL for this hook. */ -#define HOOK_PEER_LAST HOOK_PEER_CONNECT_SUCCESS +#define HOOK_LAST HOOK_PEER_CONNECT_SUCCESS }; diff -r e1ced4db7f67 -r 76ac4bb75f0e include/freeDiameter/libfdproto.h --- a/include/freeDiameter/libfdproto.h Tue Jun 11 18:13:29 2013 +0800 +++ b/include/freeDiameter/libfdproto.h Mon Jun 17 10:11:57 2013 +0800 @@ -519,7 +519,7 @@ } } /* the following macro must be replaced with LOG_E or LOG_F */ -# define TRACE_ERROR fd_log_error +# define TRACE_ERROR LOG_E /* The following macros are missing the faillevel information, which indicates at what log level the error case should be displayed. */ @@ -3121,6 +3121,10 @@ #define fd_fifo_post(queue, item) \ fd_fifo_post_int((queue), (void *)(item)) +/* Similar function but does not block. It can cause the number of items in the queue to exceed the maximum set. Do not use for normal operation, +only for failure recovery for example. */ +int fd_fifo_post_noblock( struct fifo * queue, void ** item ); + /* * FUNCTION: fd_fifo_get * diff -r e1ced4db7f67 -r 76ac4bb75f0e libfdcore/cnxctx.c --- a/libfdcore/cnxctx.c Tue Jun 11 18:13:29 2013 +0800 +++ b/libfdcore/cnxctx.c Mon Jun 17 10:11:57 2013 +0800 @@ -292,11 +292,13 @@ fd_sa_sdump_numeric(sa_buf, sa); + LOG_D("Connecting to TCP %s...", sa_buf); + /* Create the socket and connect, which can take some time and/or fail */ { int ret = fd_tcp_client( &sock, sa, addrlen ); if (ret != 0) { - LOG_A("TCP connection to %s failed: %s", sa_buf, strerror(ret)); + LOG_D("TCP connection to %s failed: %s", sa_buf, strerror(ret)); return NULL; } } @@ -347,10 +349,12 @@ fd_sa_sdump_numeric(sa_buf, &((struct fd_endpoint *)(list->next))->sa); + LOG_D("Connecting to SCTP %s:%hu...", sa_buf, port); + { int ret = fd_sctp_client( &sock, no_ip6, port, list ); if (ret != 0) { - LOG_A("SCTP connection to [%s,...] failed: %s", sa_buf, strerror(ret)); + LOG_D("SCTP connection to [%s,...] failed: %s", sa_buf, strerror(ret)); return NULL; } } @@ -610,7 +614,7 @@ fatal: /* An unrecoverable error occurred, stop the daemon */ ASSERT(0); - CHECK_FCT_DO(fd_event_send(fd_g_config->cnf_main_ev, FDEV_TERMINATE, 0, NULL), ); + CHECK_FCT_DO(fd_core_shutdown(), ); } /* Set the timeout option on the socket */ @@ -620,9 +624,9 @@ /* Set a timeout on the socket so that in any case we are not stuck waiting for something */ memset(&tv, 0, sizeof(tv)); - tv.tv_sec = 3; /* allow 3 seconds timeout for TLS session cleanup */ - CHECK_SYS_DO( setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)), /* best effort only */ ); - CHECK_SYS_DO( setsockopt(sock, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv)), /* Also timeout for sending, to avoid waiting forever */ ); + tv.tv_usec = 100000L; /* 100ms, to react quickly to head-of-the-line blocking. */ + CHECK_SYS_DO( setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)), ); + CHECK_SYS_DO( setsockopt(sock, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv)), ); } @@ -670,23 +674,30 @@ return ret; } -/* Send, for older GNUTLS */ -#ifndef GNUTLS_VERSION_212 -static ssize_t fd_cnx_s_send(struct cnxctx * conn, const void *buffer, size_t length) +/* Send */ +static ssize_t fd_cnx_s_sendv(struct cnxctx * conn, const struct iovec * iov, int iovcnt) { ssize_t ret = 0; - int timedout = 0; + struct timespec ts, now; + CHECK_SYS_DO( clock_gettime(CLOCK_REALTIME, &ts), return -1 ); again: - ret = send(conn->cc_socket, buffer, length, 0); + ret = writev(conn->cc_socket, iov, iovcnt); /* Handle special case of timeout */ if ((ret < 0) && ((errno == EAGAIN) || (errno == EINTR))) { + ret = -errno; pthread_testcancel(); - if (! fd_cnx_teststate(conn, CC_STATUS_CLOSING )) + + /* Check how much time we were blocked for this sending. */ + CHECK_SYS_DO( clock_gettime(CLOCK_REALTIME, &now), return -1 ); + if ( ((now.tv_sec - ts.tv_sec) * 1000 + ((now.tv_nsec - ts.tv_nsec) / 1000000L)) > MAX_HOTL_BLOCKING_TIME) { + LOG_D("Unable to send any data for %dms, closing the connection", MAX_HOTL_BLOCKING_TIME); + } else if (! fd_cnx_teststate(conn, CC_STATUS_CLOSING )) { goto again; /* don't care, just ignore */ - if (!timedout) { - timedout ++; /* allow for one timeout while closing */ - goto again; } + + /* propagate the error */ + errno = -ret; + ret = -1; CHECK_SYS_DO(ret, /* continue */); } @@ -696,33 +707,17 @@ return ret; } -#endif /* GNUTLS_VERSION_212 */ -/* Send */ -static ssize_t fd_cnx_s_sendv(struct cnxctx * conn, const struct iovec * iov, int iovcnt) +/* Send, for older GNUTLS */ +#ifndef GNUTLS_VERSION_212 +static ssize_t fd_cnx_s_send(struct cnxctx * conn, const void *buffer, size_t length) { - ssize_t ret = 0; - int timedout = 0; -again: - ret = writev(conn->cc_socket, iov, iovcnt); - /* Handle special case of timeout */ - if ((ret < 0) && ((errno == EAGAIN) || (errno == EINTR))) { - pthread_testcancel(); - if (! fd_cnx_teststate(conn, CC_STATUS_CLOSING )) - goto again; /* don't care, just ignore */ - if (!timedout) { - timedout ++; /* allow for one timeout while closing */ - goto again; - } - CHECK_SYS_DO(ret, /* continue */); - } - - /* Mark the error */ - if (ret <= 0) - fd_cnx_markerror(conn); - - return ret; + struct iovec iov; + iov.iov_base = (void *)buffer; + iov.iov_len = length; + return fd_cnx_s_sendv(conn, &iov, 1); } +#endif /* GNUTLS_VERSION_212 */ #define ALIGNOF(t) ((char *)(&((struct { char c; t _h; } *)0)->_h) - (char *)0) /* Could use __alignof__(t) on some systems but this is more portable probably */ #define PMDL_PADDED(len) ( ((len) + ALIGNOF(struct fd_msg_pmdl) - 1) & ~(ALIGNOF(struct fd_msg_pmdl) - 1) ) @@ -807,6 +802,9 @@ } received += ret; + + if (header[0] != DIAMETER_VERSION) + break; /* No need to wait for 4 bytes in this case */ } while (received < sizeof(header)); rcv_data.length = ((size_t)header[1] << 16) + ((size_t)header[2] << 8) + (size_t)header[3]; @@ -842,8 +840,7 @@ CHECK_FCT_DO( fd_event_send( fd_cnx_target_queue(conn), FDEVP_CNX_MSG_RECV, rcv_data.length, rcv_data.buffer), { free_rcvdata(&rcv_data); - CHECK_FCT_DO(fd_event_send(fd_g_config->cnf_main_ev, FDEV_TERMINATE, 0, NULL), ); - return NULL; + goto fatal; } ); } while (conn->cc_loop); @@ -854,7 +851,7 @@ fatal: /* An unrecoverable error occurred, stop the daemon */ - CHECK_FCT_DO(fd_event_send(fd_g_config->cnf_main_ev, FDEV_TERMINATE, 0, NULL), ); + CHECK_FCT_DO(fd_core_shutdown(), ); goto out; } @@ -907,7 +904,7 @@ fatal: /* An unrecoverable error occurred, stop the daemon */ - CHECK_FCT_DO(fd_event_send(fd_g_config->cnf_main_ev, FDEV_TERMINATE, 0, NULL), ); + CHECK_FCT_DO(fd_core_shutdown(), ); goto out; } #endif /* DISABLE_SCTP */ @@ -1001,9 +998,12 @@ static ssize_t fd_tls_send_handle_error(struct cnxctx * conn, gnutls_session_t session, void * data, size_t sz) { ssize_t ret; + struct timespec ts, now; + CHECK_SYS_DO( clock_gettime(CLOCK_REALTIME, &ts), return -1 ); again: CHECK_GNUTLS_DO( ret = gnutls_record_send(session, data, sz), { + pthread_testcancel(); switch (ret) { case GNUTLS_E_REHANDSHAKE: if (!fd_cnx_teststate(conn, CC_STATUS_CLOSING)) { @@ -1018,9 +1018,12 @@ case GNUTLS_E_AGAIN: case GNUTLS_E_INTERRUPTED: - if (!fd_cnx_teststate(conn, CC_STATUS_CLOSING)) + CHECK_SYS_DO( clock_gettime(CLOCK_REALTIME, &now), return -1 ); + if ( ((now.tv_sec - ts.tv_sec) * 1000 + ((now.tv_nsec - ts.tv_nsec) / 1000000L)) > MAX_HOTL_BLOCKING_TIME) { + LOG_D("Unable to send any data for %dms, closing the connection", MAX_HOTL_BLOCKING_TIME); + } else if (! fd_cnx_teststate(conn, CC_STATUS_CLOSING )) { goto again; - TRACE_DEBUG(INFO, "Connection is closing, so abord gnutls_record_send now."); + } break; default: @@ -1098,7 +1101,7 @@ CHECK_FCT_DO( ret = fd_event_send( fd_cnx_target_queue(conn), FDEVP_CNX_MSG_RECV, rcv_data.length, rcv_data.buffer), { free_rcvdata(&rcv_data); - CHECK_FCT_DO(fd_event_send(fd_g_config->cnf_main_ev, FDEV_TERMINATE, 0, NULL), ); + CHECK_FCT_DO(fd_core_shutdown(), ); return ret; } ); @@ -1633,9 +1636,11 @@ return 0; } +#ifndef DISABLE_SCTP static int fd_cnx_uses_dtls(struct cnxctx * conn) { return fd_cnx_may_dtls(conn) && (fd_cnx_teststate(conn, CC_STATUS_TLS)); } +#endif /* DISABLE_SCTP */ /* TLS handshake a connection; no need to have called start_clear before. Reception is active if handhsake is successful */ int fd_cnx_handshake(struct cnxctx * conn, int mode, int algo, char * priority, void * alt_creds) diff -r e1ced4db7f67 -r 76ac4bb75f0e libfdcore/cnxctx.h --- a/libfdcore/cnxctx.h Tue Jun 11 18:13:29 2013 +0800 +++ b/libfdcore/cnxctx.h Mon Jun 17 10:11:57 2013 +0800 @@ -38,6 +38,9 @@ #ifndef _CNXCTX_H #define _CNXCTX_H +/* Maximum time we allow a connection to be blocked because of head-of-the-line buffers. After this delay, connection is considered in error. */ +#define MAX_HOTL_BLOCKING_TIME 1000 /* ms */ + /* The connection context structure */ struct cnxctx { char cc_id[60]; /* The name of this connection. the first 5 chars are reserved for flags display (cc_state). */ diff -r e1ced4db7f67 -r 76ac4bb75f0e libfdcore/config.c --- a/libfdcore/config.c Tue Jun 11 18:13:29 2013 +0800 +++ b/libfdcore/config.c Mon Jun 17 10:11:57 2013 +0800 @@ -58,6 +58,7 @@ fd_g_config->cnf_port = DIAMETER_PORT; fd_g_config->cnf_port_tls = DIAMETER_SECURE_PORT; fd_g_config->cnf_sctp_str = 30; + fd_g_config->cnf_thr_srv = 5; fd_g_config->cnf_dispthr = 4; fd_list_init(&fd_g_config->cnf_endpoints, NULL); fd_list_init(&fd_g_config->cnf_apps, NULL); @@ -97,7 +98,8 @@ CHECK_MALLOC_DO( fd_dump_extend( FD_DUMP_STD_PARAMS, " Local SCTP TLS port .... : %hu\n", fd_g_config->cnf_port_3436), return NULL); } CHECK_MALLOC_DO( fd_dump_extend( FD_DUMP_STD_PARAMS, " Number of SCTP streams . : %hu\n", fd_g_config->cnf_sctp_str), return NULL); - CHECK_MALLOC_DO( fd_dump_extend( FD_DUMP_STD_PARAMS, " Number of server threads : %hu\n", fd_g_config->cnf_dispthr), return NULL); + CHECK_MALLOC_DO( fd_dump_extend( FD_DUMP_STD_PARAMS, " Number of clients thr .. : %d\n", fd_g_config->cnf_thr_srv), return NULL); + CHECK_MALLOC_DO( fd_dump_extend( FD_DUMP_STD_PARAMS, " Number of app threads .. : %hu\n", fd_g_config->cnf_dispthr), return NULL); if (FD_IS_LIST_EMPTY(&fd_g_config->cnf_endpoints)) { CHECK_MALLOC_DO( fd_dump_extend( FD_DUMP_STD_PARAMS, " Local endpoints ........ : Default (use all available)\n"), return NULL); } else { diff -r e1ced4db7f67 -r 76ac4bb75f0e libfdcore/core.c --- a/libfdcore/core.c Tue Jun 11 18:13:29 2013 +0800 +++ b/libfdcore/core.c Mon Jun 17 10:11:57 2013 +0800 @@ -148,7 +148,7 @@ } break; - case FDEV_TERMINATE: + case FDEV_TERMINATE_INT: goto end; default: @@ -326,6 +326,8 @@ { enum core_state cur_state = core_state_get(); + LOG_F("Initiating freeDiameter shutdown sequence (%d)", cur_state); + if (cur_state < CORE_RUNNING) { /* Calling application must make sure the initialization is not ongoing in a separate thread... */ if (pthread_mutex_lock(&core_lock) != 0) { @@ -338,7 +340,7 @@ pthread_mutex_unlock(&core_lock); } else if (cur_state == CORE_RUNNING) { core_state_set(CORE_SHUTDOWN); - CHECK_FCT( fd_event_send(fd_g_config->cnf_main_ev, FDEV_TERMINATE, 0, NULL) ); + CHECK_FCT( fd_event_send(fd_g_config->cnf_main_ev, FDEV_TERMINATE_INT, 0, NULL) ); } /* Other case, the framework is already shutting down */ diff -r e1ced4db7f67 -r 76ac4bb75f0e libfdcore/dict_base_proto.c --- a/libfdcore/dict_base_proto.c Tue Jun 11 18:13:29 2013 +0800 +++ b/libfdcore/dict_base_proto.c Mon Jun 17 10:11:57 2013 +0800 @@ -890,7 +890,14 @@ }; struct local_rules_definition rules[] = - { { "Vendor-Id", RULE_REQUIRED, -1, 1 } + { +#ifndef WORKAROUND_ACCEPT_INVALID_VSAI + /* ABNF from RFC6733 */ + { "Vendor-Id", RULE_REQUIRED, -1, 1 } +#else /* WORKAROUND_ACCEPT_INVALID_VSAI */ + /* ABNF from RFC3588 (including erratum, because original text is nonsense) */ + { "Vendor-Id", RULE_REQUIRED, -1, -1} +#endif /* WORKAROUND_ACCEPT_INVALID_VSAI */ ,{ "Auth-Application-Id", RULE_OPTIONAL, -1, 1 } ,{ "Acct-Application-Id", RULE_OPTIONAL, -1, 1 } }; diff -r e1ced4db7f67 -r 76ac4bb75f0e libfdcore/events.c --- a/libfdcore/events.c Tue Jun 11 18:13:29 2013 +0800 +++ b/libfdcore/events.c Mon Jun 17 10:11:57 2013 +0800 @@ -104,7 +104,7 @@ switch (event) { #define case_str( _val )\ case _val : return #_val - case_str(FDEV_TERMINATE); + case_str(FDEV_TERMINATE_INT); case_str(FDEV_TRIGGER); default: diff -r e1ced4db7f67 -r 76ac4bb75f0e libfdcore/fdcore-internal.h --- a/libfdcore/fdcore-internal.h Tue Jun 11 18:13:29 2013 +0800 +++ b/libfdcore/fdcore-internal.h Mon Jun 17 10:11:57 2013 +0800 @@ -183,6 +183,7 @@ /* Sent requests (for fallback), list of struct sentreq ordered by hbh */ struct sr_list p_sr; + struct fifo *p_tofailover; /* Pending received requests not yet answered (count only) */ long p_reqin_count; /* We use p_state_mtx to protect this value */ diff -r e1ced4db7f67 -r 76ac4bb75f0e libfdcore/fdd.l --- a/libfdcore/fdd.l Tue Jun 11 18:13:29 2013 +0800 +++ b/libfdcore/fdd.l Mon Jun 17 10:11:57 2013 +0800 @@ -255,6 +255,7 @@ (?i:"SCTP_streams") { return SCTPSTREAMS; } (?i:"AppServThreads") { return APPSERVTHREADS;} (?i:"ListenOn") { return LISTENON; } +(?i:"ThreadsPerServer") { return THRPERSRV; } (?i:"TcTimer") { return TCTIMER; } (?i:"TwTimer") { return TWTIMER; } (?i:"NoRelay") { return NORELAY; } diff -r e1ced4db7f67 -r 76ac4bb75f0e libfdcore/fdd.y --- a/libfdcore/fdd.y Tue Jun 11 18:13:29 2013 +0800 +++ b/libfdcore/fdd.y Mon Jun 17 10:11:57 2013 +0800 @@ -108,6 +108,7 @@ %token SCTPSTREAMS %token APPSERVTHREADS %token LISTENON +%token THRPERSRV %token TCTIMER %token TWTIMER %token NORELAY @@ -136,6 +137,7 @@ | conffile sec3436 | conffile sctpstreams | conffile listenon + | conffile thrpersrv | conffile norelay | conffile appservthreads | conffile noip @@ -238,6 +240,14 @@ } ; +thrpersrv: THRPERSRV '=' INTEGER ';' + { + CHECK_PARAMS_DO( ($3 > 0), + { yyerror (&yylloc, conf, "Invalid value"); YYERROR; } ); + conf->cnf_thr_srv = $3; + } + ; + norelay: NORELAY ';' { conf->cnf_flags.no_fwd = 1; diff -r e1ced4db7f67 -r 76ac4bb75f0e libfdcore/hooks.c --- a/libfdcore/hooks.c Tue Jun 11 18:13:29 2013 +0800 +++ b/libfdcore/hooks.c Mon Jun 17 10:11:57 2013 +0800 @@ -55,7 +55,7 @@ /* Now a hook registered by an extension */ struct fd_hook_hdl { - struct fd_list chain[HOOK_PEER_LAST+1]; + struct fd_list chain[HOOK_LAST+1]; void (*fd_hook_cb)(enum fd_hook_type type, struct msg * msg, struct peer_hdr * peer, void * other, struct fd_hook_permsgdata *pmd, void * regdata); void *regdata; struct fd_hook_data_hdl *data_hdl; @@ -65,13 +65,13 @@ struct { struct fd_list sentinel; pthread_rwlock_t rwlock; -} HS_array[HOOK_PEER_LAST+1]; +} HS_array[HOOK_LAST+1]; /* Initialize the array of sentinels for the hooks */ int fd_hooks_init(void) { int i; - for (i=0; i <= HOOK_PEER_LAST; i++) { + for (i=0; i <= HOOK_LAST; i++) { fd_list_init(&HS_array[i].sentinel, NULL); CHECK_POSIX( pthread_rwlock_init(&HS_array[i].rwlock, NULL) ); } @@ -128,7 +128,7 @@ newhdl->regdata = regdata; newhdl->data_hdl = data_hdl; - for (i=0; i <= HOOK_PEER_LAST; i++) { + for (i=0; i <= HOOK_LAST; i++) { fd_list_init(&newhdl->chain[i], newhdl); if (type_mask & (1<chain[i])) { CHECK_POSIX( pthread_rwlock_wrlock(&HS_array[i].rwlock) ); fd_list_unlink(&handler->chain[i]); @@ -271,7 +271,7 @@ va_start(ap, dummy); while ((next = va_arg(ap, int)) >= 0) { - if (next > HOOK_PEER_LAST) + if (next > HOOK_LAST) break; /* invalid parameter */ ret |= (1<"; - - if (!id) - id = (DiamId_t)""; - CHECK_MALLOC_DO(fd_msg_dump_treeview(&buf, &len, NULL, msg, NULL, 0, 1), break); - LOG_E("Routing error: '%s' for the following message:", (char *)other); LOG_SPLIT(FD_LOG_ERROR, " ", buf?:"", NULL); break; diff -r e1ced4db7f67 -r 76ac4bb75f0e libfdcore/p_ce.c --- a/libfdcore/p_ce.c Tue Jun 11 18:13:29 2013 +0800 +++ b/libfdcore/p_ce.c Mon Jun 17 10:11:57 2013 +0800 @@ -469,8 +469,10 @@ } switch (inhdr->avp_code) { case AC_VENDOR_ID: /* Vendor-Id */ +#ifndef WORKAROUND_ACCEPT_INVALID_VSAI if (vid != 0) - invalid++; /* We already had one such AVP */ + invalid++; /* We already had one such AVP. This is invalid according to RFC6733 but not RFC3588 (but there is an erratum) */ +#endif /* WORKAROUND_ACCEPT_INVALID_VSAI */ vid = inhdr->avp_value->u32; break; case AC_AUTH_APPLICATION_ID: /* Auth-Application-Id */ diff -r e1ced4db7f67 -r 76ac4bb75f0e libfdcore/p_cnx.c --- a/libfdcore/p_cnx.c Tue Jun 11 18:13:29 2013 +0800 +++ b/libfdcore/p_cnx.c Mon Jun 17 10:11:57 2013 +0800 @@ -77,6 +77,7 @@ uint16_t port_no; /* network order */ int dotls_immediate; + int count = 0; TRACE_ENTRY("%p", peer); @@ -120,6 +121,10 @@ AF_INET)); } + /* We don't use the alternate addresses that were sent by the remote peer */ + CHECK_FCT( fd_ep_clearflags(&peer->p_hdr.info.pi_endpoints, EP_FL_ADV) ); + + /* Now check we have at least one address to attempt */ if (FD_IS_LIST_EMPTY(&peer->p_hdr.info.pi_endpoints)) { TRACE_DEBUG(INFO, "No address %savailable to connect to peer '%s', aborting", @@ -183,6 +188,7 @@ } else { fd_list_insert_before(&peer->p_connparams, &new->chain); } + count++; } } @@ -205,9 +211,12 @@ } else { fd_list_insert_after(&peer->p_connparams, &new->chain); /* very first position */ } + count++; } #endif /* DISABLE_SCTP */ + LOG_D("Prepared %d sets of connection parameters to peer %s", count, peer->p_hdr.info.pi_diamid); + return 0; } @@ -282,7 +291,7 @@ /* Handshake if needed (secure port) */ if (nc->dotls) { CHECK_FCT_DO( fd_cnx_handshake(cnx, GNUTLS_CLIENT, - (peer->p_hdr.info.config.pic_flags.sctpsec == PI_SCTPSEC_3436) ? ALGO_HANDSHAKE_3436 : ALGO_HANDSHAKE_DEFAULT, + ALGO_HANDSHAKE_3436, peer->p_hdr.info.config.pic_priority, NULL), { /* Handshake failed ... */ @@ -313,7 +322,7 @@ fd_cnx_destroy(cnx); /* Generate a termination event */ - CHECK_FCT_DO(fd_event_send(fd_g_config->cnf_main_ev, FDEV_TERMINATE, 0, NULL), ); + CHECK_FCT_DO(fd_core_shutdown(), ); } return NULL; diff -r e1ced4db7f67 -r 76ac4bb75f0e libfdcore/p_expiry.c --- a/libfdcore/p_expiry.c Tue Jun 11 18:13:29 2013 +0800 +++ b/libfdcore/p_expiry.c Mon Jun 17 10:11:57 2013 +0800 @@ -86,7 +86,7 @@ error: TRACE_DEBUG(INFO, "An error occurred in peers module! GC thread is terminating..."); ASSERT(0); - CHECK_FCT_DO(fd_event_send(fd_g_config->cnf_main_ev, FDEV_TERMINATE, 0, NULL), ); + CHECK_FCT_DO(fd_core_shutdown(), ); return NULL; } @@ -138,7 +138,7 @@ pthread_cleanup_pop( 1 ); TRACE_DEBUG(INFO, "An error occurred in peers module! Expiry thread is terminating..."); - CHECK_FCT_DO(fd_event_send(fd_g_config->cnf_main_ev, FDEV_TERMINATE, 0, NULL), ); + CHECK_FCT_DO(fd_core_shutdown(), ); return NULL; } diff -r e1ced4db7f67 -r 76ac4bb75f0e libfdcore/p_out.c --- a/libfdcore/p_out.c Tue Jun 11 18:13:29 2013 +0800 +++ b/libfdcore/p_out.c Mon Jun 17 10:11:57 2013 +0800 @@ -74,8 +74,13 @@ /* Log the message */ fd_hook_call(HOOK_MESSAGE_SENT, cpy_for_logs_only, peer, NULL, fd_msg_pmdl_get(cpy_for_logs_only)); + pthread_cleanup_push((void *)fd_msg_free, *msg /* might be NULL, no problem */); + /* Send the message */ CHECK_FCT_DO( ret = fd_cnx_send(cnx, buf, sz), ); + + pthread_cleanup_pop(0); + out: ; pthread_cleanup_pop(1); @@ -92,20 +97,12 @@ return 0; } -static void cleanup_requeue(void * arg) -{ - struct msg *msg = arg; - CHECK_FCT_DO(fd_fifo_post(fd_g_outgoing, &msg), - { - fd_hook_call(HOOK_MESSAGE_DROPPED, msg, NULL, "An error occurred while attempting to requeue this message during cancellation of the sending function", fd_msg_pmdl_get(msg)); - CHECK_FCT_DO(fd_msg_free(msg), /* What can we do more? */); - } ); -} - /* The code of the "out" thread */ static void * out_thr(void * arg) { struct fd_peer * peer = arg; + int stop = 0; + struct msg * msg; ASSERT( CHECK_PEER(peer) ); /* Set the thread name */ @@ -116,16 +113,12 @@ } /* Loop until cancelation */ - while (1) { - struct msg * msg; + while (!stop) { int ret; /* Retrieve next message to send */ CHECK_FCT_DO( fd_fifo_get(peer->p_tosend, &msg), goto error ); - /* Now if we are cancelled, we requeue this message */ - pthread_cleanup_push(cleanup_requeue, msg); - /* Send the message, log any error */ CHECK_FCT_DO( ret = do_send(&msg, peer->p_cnxctx, &peer->p_hbh, peer), { @@ -135,12 +128,30 @@ fd_hook_call(HOOK_MESSAGE_DROPPED, msg, NULL, buf, fd_msg_pmdl_get(msg)); fd_msg_free(msg); } + stop = 1; } ); - /* Loop */ - pthread_cleanup_pop(0); } + /* If we're here it means there was an error on the socket. We need to continue to purge the fifo & until we are canceled */ + CHECK_FCT_DO( fd_event_send(peer->p_events, FDEVP_CNX_ERROR, 0, NULL), /* What do we do if it fails? */ ); + + /* Requeue all routable messages in the global "out" queue, until we are canceled once the PSM deals with the CNX_ERROR sent above */ + while ( fd_fifo_get(peer->p_tosend, &msg) == 0 ) { + if (fd_msg_is_routable(msg)) { + CHECK_FCT_DO(fd_fifo_post_noblock(peer->p_tofailover, (void *)&msg), + { + /* fallback: destroy the message */ + fd_hook_call(HOOK_MESSAGE_DROPPED, msg, NULL, "Internal error: unable to requeue this message during failover process", fd_msg_pmdl_get(msg)); + CHECK_FCT_DO(fd_msg_free(msg), /* What can we do more? */) + } ); + } else { + /* Just free it */ + /* fd_hook_call(HOOK_MESSAGE_DROPPED, m, NULL, "Non-routable message freed during handover", fd_msg_pmdl_get(m)); */ + CHECK_FCT_DO(fd_msg_free(msg), /* What can we do more? */) + } + } + error: /* It is not really a connection error, but the effect is the same, we are not able to send anymore message */ CHECK_FCT_DO( fd_event_send(peer->p_events, FDEVP_CNX_ERROR, 0, NULL), /* What do we do if it fails? */ ); diff -r e1ced4db7f67 -r 76ac4bb75f0e libfdcore/p_psm.c --- a/libfdcore/p_psm.c Tue Jun 11 18:13:29 2013 +0800 +++ b/libfdcore/p_psm.c Mon Jun 17 10:11:57 2013 +0800 @@ -813,7 +813,6 @@ break; case STATE_WAITCNXACK: - LOG_D("%s: Connection attempt failed", peer->p_hdr.info.pi_diamid); /* Go back to CLOSE */ fd_psm_next_timeout(peer, 1, peer->p_hdr.info.config.pic_tctimer ?: fd_g_config->cnf_timer_tc); goto psm_reset; diff -r e1ced4db7f67 -r 76ac4bb75f0e libfdcore/p_sr.c --- a/libfdcore/p_sr.c Tue Jun 11 18:13:29 2013 +0800 +++ b/libfdcore/p_sr.c Mon Jun 17 10:11:57 2013 +0800 @@ -37,9 +37,9 @@ /* Structure to store a sent request */ struct sentreq { - struct fd_list chain; /* the "o" field points directly to the hop-by-hop of the request (uint32_t *) */ + struct fd_list chain; /* the "o" field points directly to the (new) hop-by-hop of the request (uint32_t *) */ struct msg *req; /* A request that was sent and not yet answered. */ - uint32_t prevhbh;/* The value to set in the hbh header when the message is retrieved */ + uint32_t prevhbh;/* The value to set back in the hbh header when the message is retrieved */ struct fd_list expire; /* the list of expiring requests */ struct timespec added_on; /* the time the request was added */ }; @@ -65,10 +65,7 @@ struct fd_list * li; struct timespec now; - if (!TRACE_BOOL(ANNOYING)) - return; - - fd_log_debug("%sSentReq list @%p:", text, srlist); + LOG_D("%sSentReq list @%p:", text, srlist); CHECK_SYS_DO( clock_gettime(CLOCK_REALTIME, &now), ); @@ -76,7 +73,7 @@ struct sentreq * sr = (struct sentreq *)li; uint32_t * nexthbh = li->o; - fd_log_debug(" - Next req (hbh:%x): [since %ld.%06ld sec]", *nexthbh, + LOG_D(" - Next req (hbh:0x%x, prev:0x%x): [since %ld.%06ld sec]", *nexthbh, sr->prevhbh, (long)((now.tv_nsec >= sr->added_on.tv_nsec) ? (now.tv_sec - sr->added_on.tv_sec) : (now.tv_sec - sr->added_on.tv_sec - 1)), (long)((now.tv_nsec >= sr->added_on.tv_nsec) ? ((now.tv_nsec - sr->added_on.tv_nsec) / 1000) : ((now.tv_nsec - sr->added_on.tv_nsec + 1000000000) / 1000))); } @@ -224,8 +221,9 @@ CHECK_POSIX( pthread_mutex_lock(&srlist->mtx) ); next = find_or_next(&srlist->srs, *hbhloc, &match); if (match) { - TRACE_DEBUG(INFO, "A request with the same hop-by-hop Id was already sent: error"); + TRACE_DEBUG(INFO, "A request with the same hop-by-hop Id (0x%x) was already sent: error", *hbhloc); free(sr); + srl_dump("Current list of SR: ", &srlist->srs); CHECK_POSIX_DO( pthread_mutex_unlock(&srlist->mtx), /* ignore */ ); return EINVAL; } @@ -234,7 +232,6 @@ *req = NULL; fd_list_insert_before(next, &sr->chain); srlist->cnt++; - srl_dump("Saved new request, ", &srlist->srs); /* In case of request with a timeout, also store in the timeout list */ ts = fd_msg_anscb_gettimeout( sr->req ); @@ -279,10 +276,10 @@ /* Search the request in the list */ CHECK_POSIX( pthread_mutex_lock(&srlist->mtx) ); - srl_dump("Fetching a request, ", &srlist->srs); sr = (struct sentreq *)find_or_next(&srlist->srs, hbh, &match); if (!match) { TRACE_DEBUG(INFO, "There is no saved request with this hop-by-hop id (%x)", hbh); + srl_dump("Current list of SR: ", &srlist->srs); *req = NULL; } else { /* Restore hop-by-hop id */ @@ -326,7 +323,7 @@ fd_hook_call(HOOK_MESSAGE_FAILOVER, sr->req, (struct fd_peer *)srlist->srs.o, NULL, fd_msg_pmdl_get(sr->req)); /* Requeue for sending to another peer */ - CHECK_FCT_DO( ret = fd_fifo_post(fd_g_outgoing, &sr->req), + CHECK_FCT_DO( ret = fd_fifo_post_noblock(fd_g_outgoing, (void *)&sr->req), { char buf[256]; snprintf(buf, sizeof(buf), "Internal error: error while requeuing during failover: %s", strerror(ret)); diff -r e1ced4db7f67 -r 76ac4bb75f0e libfdcore/peers.c --- a/libfdcore/peers.c Tue Jun 11 18:13:29 2013 +0800 +++ b/libfdcore/peers.c Mon Jun 17 10:11:57 2013 +0800 @@ -77,6 +77,7 @@ fd_list_init(&p->p_actives, p); fd_list_init(&p->p_expiry, p); CHECK_FCT( fd_fifo_new(&p->p_tosend, 5) ); + CHECK_FCT( fd_fifo_new(&p->p_tofailover, 0) ); p->p_hbh = lrand48(); fd_list_init(&p->p_sr.srs, p); @@ -232,7 +233,7 @@ free(__li); \ } -/* Empty the lists of p_tosend and p_sentreq messages */ +/* Empty the lists of p_tosend, p_failover, and p_sentreq messages */ void fd_peer_failover_msg(struct fd_peer * peer) { struct msg *m; @@ -241,8 +242,26 @@ /* Requeue all messages in the "out" queue */ while ( fd_fifo_tryget(peer->p_tosend, &m) == 0 ) { + /* but only if they are routable */ + if (fd_msg_is_routable(m)) { + fd_hook_call(HOOK_MESSAGE_FAILOVER, m, peer, NULL, fd_msg_pmdl_get(m)); + CHECK_FCT_DO(fd_fifo_post_noblock(fd_g_outgoing, (void *)&m), + { + /* fallback: destroy the message */ + fd_hook_call(HOOK_MESSAGE_DROPPED, m, NULL, "Internal error: unable to requeue this message during failover process", fd_msg_pmdl_get(m)); + CHECK_FCT_DO(fd_msg_free(m), /* What can we do more? */) + } ); + } else { + /* Just free it */ + /* fd_hook_call(HOOK_MESSAGE_DROPPED, m, NULL, "Non-routable message freed during handover", fd_msg_pmdl_get(m)); */ + CHECK_FCT_DO(fd_msg_free(m), /* What can we do more? */) + } + } + + /* Requeue all messages in the "failover" queue */ + while ( fd_fifo_tryget(peer->p_tofailover, &m) == 0 ) { fd_hook_call(HOOK_MESSAGE_FAILOVER, m, peer, NULL, fd_msg_pmdl_get(m)); - CHECK_FCT_DO(fd_fifo_post(fd_g_outgoing, &m), + CHECK_FCT_DO(fd_fifo_post_noblock(fd_g_outgoing, (void *)&m), { /* fallback: destroy the message */ fd_hook_call(HOOK_MESSAGE_DROPPED, m, NULL, "Internal error: unable to requeue this message during failover process", fd_msg_pmdl_get(m)); @@ -327,6 +346,7 @@ fd_list_unlink(&p->p_actives); CHECK_FCT_DO( fd_fifo_del(&p->p_tosend), /* continue */ ); + CHECK_FCT_DO( fd_fifo_del(&p->p_tofailover), /* continue */ ); CHECK_POSIX_DO( pthread_mutex_destroy(&p->p_state_mtx), /* continue */); CHECK_POSIX_DO( pthread_mutex_destroy(&p->p_sr.mtx), /* continue */); CHECK_POSIX_DO( pthread_cond_destroy(&p->p_sr.cnd), /* continue */); diff -r e1ced4db7f67 -r 76ac4bb75f0e libfdcore/routing_dispatch.c --- a/libfdcore/routing_dispatch.c Tue Jun 11 18:13:29 2013 +0800 +++ b/libfdcore/routing_dispatch.c Mon Jun 17 10:11:57 2013 +0800 @@ -1114,7 +1114,7 @@ fatal_error: TRACE_DEBUG(INFO, "An unrecoverable error occurred, %s thread is terminating...", action_name); - CHECK_FCT_DO(fd_event_send(fd_g_config->cnf_main_ev, FDEV_TERMINATE, 0, NULL), ); + CHECK_FCT_DO(fd_core_shutdown(), ); end: ; /* noop so that we get rid of "label at end of compund statement" warning */ diff -r e1ced4db7f67 -r 76ac4bb75f0e libfdcore/sctp.c --- a/libfdcore/sctp.c Tue Jun 11 18:13:29 2013 +0800 +++ b/libfdcore/sctp.c Mon Jun 17 10:11:57 2013 +0800 @@ -250,7 +250,7 @@ fd_log_debug( "Def SCTP_NODELAY value : %s", nodelay ? "true" : "false"); } - nodelay = 1; /* We turn ON the Nagle algorithm (probably the default already), since we might have several messages to send through the same proxy (not the same session). */ + nodelay = 1; /* We turn ON to disable the Nagle algorithm, so that packets are sent ASAP. */ /* Set the option to the socket */ CHECK_SYS( setsockopt(sk, IPPROTO_SCTP, SCTP_NODELAY, &nodelay, sizeof(nodelay)) ); @@ -935,18 +935,9 @@ pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL); if (ret < 0) { - int lvl; - switch (ret = errno) { - case ECONNREFUSED: - - /* "Normal" errors */ - lvl = FULL; - break; - default: - lvl = INFO; - } + ret = errno; /* Some errors are expected, we log at different level */ - TRACE_DEBUG( lvl, "sctp_connectx returned an error: %s", strerror(ret)); + LOG_A("sctp_connectx returned an error: %s", strerror(ret)); goto out; } @@ -1092,10 +1083,11 @@ uint8_t anci[CMSG_SPACE(sizeof(struct sctp_sndinfo))]; #endif /* OLD_SCTP_SOCKET_API */ ssize_t ret; - int timedout = 0; + struct timespec ts, now; TRACE_ENTRY("%p %hu %p %d", conn, strid, iov, iovcnt); CHECK_PARAMS_DO(conn && iov && iovcnt, { errno = EINVAL; return -1; } ); + CHECK_SYS_DO( clock_gettime(CLOCK_REALTIME, &ts), return -1 ); memset(&mhdr, 0, sizeof(mhdr)); memset(&anci, 0, sizeof(anci)); @@ -1129,12 +1121,17 @@ /* Handle special case of timeout */ if ((ret < 0) && ((errno == EAGAIN) || (errno == EINTR))) { pthread_testcancel(); - if (! fd_cnx_teststate(conn, CC_STATUS_CLOSING )) + /* Check how much time we were blocked for this sending. */ + CHECK_SYS_DO( clock_gettime(CLOCK_REALTIME, &now), return -1 ); + if ( ((now.tv_sec - ts.tv_sec) * 1000 + ((now.tv_nsec - ts.tv_nsec) / 1000000L)) > MAX_HOTL_BLOCKING_TIME) { + LOG_D("Unable to send any data for %dms, closing the connection", MAX_HOTL_BLOCKING_TIME); + } else if (! fd_cnx_teststate(conn, CC_STATUS_CLOSING )) { goto again; /* don't care, just ignore */ - if (!timedout) { - timedout ++; /* allow for one timeout while closing */ - goto again; } + + /* propagate the error */ + errno = -ret; + ret = -1; } CHECK_SYS_DO( ret, ); /* for tracing error only */ diff -r e1ced4db7f67 -r 76ac4bb75f0e libfdcore/sctp3436.c --- a/libfdcore/sctp3436.c Tue Jun 11 18:13:29 2013 +0800 +++ b/libfdcore/sctp3436.c Mon Jun 17 10:11:57 2013 +0800 @@ -129,7 +129,7 @@ fatal: /* An unrecoverable error occurred, stop the daemon */ - CHECK_FCT_DO(fd_event_send(fd_g_config->cnf_main_ev, FDEV_TERMINATE, 0, NULL), ); + CHECK_FCT_DO(fd_core_shutdown(), ); goto out; } diff -r e1ced4db7f67 -r 76ac4bb75f0e libfdcore/server.c --- a/libfdcore/server.c Tue Jun 11 18:13:29 2013 +0800 +++ b/libfdcore/server.c Mon Jun 17 10:11:57 2013 +0800 @@ -55,24 +55,18 @@ int proto; /* IPPROTO_TCP or IPPROTO_SCTP */ int secur; /* TLS is started immediatly after connection ? 0: no; 1: yes (TLS/TCP or DTLS/SCTP); 2: yes (TLS/TCP or TLS/SCTP) */ - pthread_t thr; /* The thread listening for new connections */ + pthread_t thr; /* The thread waiting for new connections (will store the data in the clients fifo) */ enum s_state state; /* state of the thread */ - struct fd_list clients; /* List of clients connected to this server, not yet identified */ - pthread_mutex_t clients_mtx; /* Mutex to protect the list of clients */ + struct fifo *pending; /* FIFO of struct cnxctx */ + struct pool_workers { + struct server * s; /* pointer to the parent server structure */ + int id; /* The worker id for logs */ + pthread_t worker; /* The thread */ + } *workers; /* array of cnf_thr_srv items */ }; -/* Client information (connecting peer for which we don't have the CER yet) */ -struct client { - struct fd_list chain; /* link in the server's list of clients */ - struct cnxctx *conn; /* Parameters of the connection */ - struct timespec ts; /* Deadline for receiving CER (after INCNX_TIMEOUT) */ - pthread_t thr; /* connection state machine */ -}; - - - /* Micro functions to read/change the status thread-safely */ static pthread_mutex_t s_lock = PTHREAD_MUTEX_INITIALIZER; static enum s_state get_status(struct server * s) @@ -91,11 +85,17 @@ } +/* dump one item of the server->pending fifo */ +static DECLARE_FD_DUMP_PROTOTYPE(dump_cnx, void * item) { + struct cnxctx * c = item; + CHECK_MALLOC_DO( fd_dump_extend( FD_DUMP_STD_PARAMS, " '%s'", fd_cnx_getid(c)), return NULL); + return *buf; +} /* Dump all servers information */ DECLARE_FD_DUMP_PROTOTYPE(fd_servers_dump, int details) { - struct fd_list * li, *cli; + struct fd_list * li; FD_DUMP_HANDLE_OFFSET(); @@ -112,13 +112,7 @@ ((st == TERMINATED) ? "Thread terminated" : "Thread status unknown"))), return NULL); /* Dump the client list of this server */ - CHECK_POSIX_DO( pthread_mutex_lock(&s->clients_mtx), ); - for (cli = s->clients.next; cli != &s->clients; cli = cli->next) { - struct client * c = (struct client *)cli; - char bufts[128]; - CHECK_MALLOC_DO( fd_dump_extend( FD_DUMP_STD_PARAMS, "\n {client}(@%p)'%s': to:%s", c, fd_cnx_getid(c->conn), fd_log_time(&c->ts, bufts, sizeof(bufts))), break); - } - CHECK_POSIX_DO( pthread_mutex_unlock(&s->clients_mtx), ); + CHECK_MALLOC_DO( fd_fifo_dump(FD_DUMP_STD_PARAMS, "pending connections", s->pending, dump_cnx), return NULL ); if (li->next != &FD_SERVERS) { CHECK_MALLOC_DO( fd_dump_extend( FD_DUMP_STD_PARAMS, "\n"), return NULL); @@ -133,72 +127,83 @@ } -/* The state machine to handle incoming connection before the remote peer is identified */ -static void * client_sm(void * arg) +/* The thread in the pool for handling new clients connecting to a server */ +static void * client_worker(void * arg) { - struct client * c = arg; - struct server * s = NULL; + struct pool_workers * pw = arg; + struct server * s = pw->s; + struct cnxctx * c = NULL; + int fatal = 0; + struct timespec ts; struct fd_cnx_rcvdata rcv_data; struct fd_msg_pmdl * pmdl = NULL; struct msg * msg = NULL; struct msg_hdr *hdr = NULL; struct fd_pei pei; - TRACE_ENTRY("%p", c); + TRACE_ENTRY("%p", arg); + /* Set the thread name */ + { + char buf[48]; + snprintf(buf, sizeof(buf), "Worker#%d[%s%s]", pw->id, IPPROTO_NAME(s->proto), s->secur?", Sec" : ""); + fd_log_threadname ( buf ); + } + + /* Loop until canceled / error */ +next_client: + LOG_A("Ready to process next incoming connection"); + memset(&rcv_data, 0, sizeof(rcv_data)); - CHECK_PARAMS_DO(c && c->conn && c->chain.head, goto fatal_error ); - - - s = c->chain.head->o; - - /* Name the current thread */ - fd_log_threadname ( fd_cnx_getid(c->conn) ); - + /* Get the next connection */ + CHECK_FCT_DO( fd_fifo_get( s->pending, &c ), { fatal = 1; goto cleanup; } ); + /* Handshake if we are a secure server port, or start clear otherwise */ if (s->secur) { - int ret = fd_cnx_handshake(c->conn, GNUTLS_SERVER, (s->secur == 1) ? ALGO_HANDSHAKE_DEFAULT : ALGO_HANDSHAKE_3436, NULL, NULL); + LOG_D("Starting handshake with %s", fd_cnx_getid(c)); + + int ret = fd_cnx_handshake(c, GNUTLS_SERVER, (s->secur == 1) ? ALGO_HANDSHAKE_DEFAULT : ALGO_HANDSHAKE_3436, NULL, NULL); if (ret != 0) { char buf[1024]; - snprintf(buf, sizeof(buf), "TLS handshake failed for client '%s', connection aborted.", fd_cnx_getid(c->conn)); - + snprintf(buf, sizeof(buf), "TLS handshake failed for connection '%s', connection closed.", fd_cnx_getid(c)); + fd_hook_call(HOOK_PEER_CONNECT_FAILED, NULL, NULL, buf, NULL); - + goto cleanup; } } else { - CHECK_FCT_DO( fd_cnx_start_clear(c->conn, 0), goto cleanup ); + CHECK_FCT_DO( fd_cnx_start_clear(c, 0), goto cleanup ); } /* Set the timeout to receive the first message */ - CHECK_SYS_DO( clock_gettime(CLOCK_REALTIME, &c->ts), goto fatal_error ); - c->ts.tv_sec += INCNX_TIMEOUT; + CHECK_SYS_DO( clock_gettime(CLOCK_REALTIME, &ts), { fatal = 1; goto cleanup; } ); + ts.tv_sec += INCNX_TIMEOUT; /* Receive the first Diameter message on the connection -- cleanup in case of timeout */ - CHECK_FCT_DO( fd_cnx_receive(c->conn, &c->ts, &rcv_data.buffer, &rcv_data.length), + CHECK_FCT_DO( fd_cnx_receive(c, &ts, &rcv_data.buffer, &rcv_data.length), { char buf[1024]; switch (__ret__) { case ETIMEDOUT: - snprintf(buf, sizeof(buf), "Client '%s' did not send CER within %ds, connection aborted.", fd_cnx_getid(c->conn), INCNX_TIMEOUT); + snprintf(buf, sizeof(buf), "Client '%s' did not send CER within %ds, connection aborted.", fd_cnx_getid(c), INCNX_TIMEOUT); fd_hook_call(HOOK_PEER_CONNECT_FAILED, NULL, NULL, buf, NULL); break; case ENOTCONN: - snprintf(buf, sizeof(buf), "Connection from '%s' in error before CER was received.", fd_cnx_getid(c->conn)); + snprintf(buf, sizeof(buf), "Connection from '%s' in error before CER was received.", fd_cnx_getid(c)); fd_hook_call(HOOK_PEER_CONNECT_FAILED, NULL, NULL, buf, NULL); break; default: - snprintf(buf, sizeof(buf), "Connection from '%s': unspecified error, connection aborted.", fd_cnx_getid(c->conn)); + snprintf(buf, sizeof(buf), "Connection from '%s': unspecified error, connection aborted.", fd_cnx_getid(c)); fd_hook_call(HOOK_PEER_CONNECT_FAILED, NULL, NULL, buf, NULL); } goto cleanup; } ); - TRACE_DEBUG(FULL, "Received %zdb from new client '%s'", rcv_data.length, fd_cnx_getid(c->conn)); + TRACE_DEBUG(FULL, "Received %zdb from new client '%s'", rcv_data.length, fd_cnx_getid(c)); pmdl = fd_msg_pmdl_get_inbuf(rcv_data.buffer, rcv_data.length); @@ -211,7 +216,7 @@ /* Log incoming message */ fd_hook_associate(msg, pmdl); - fd_hook_call(HOOK_MESSAGE_RECEIVED, msg, NULL, fd_cnx_getid(c->conn), fd_msg_pmdl_get(msg)); + fd_hook_call(HOOK_MESSAGE_RECEIVED, msg, NULL, fd_cnx_getid(c), fd_msg_pmdl_get(msg)); /* We expect a CER, it must parse with our dictionary and rules */ CHECK_FCT_DO( fd_msg_parse_rules( msg, fd_g_config->cnf_dict, &pei ), @@ -220,57 +225,52 @@ fd_hook_call(HOOK_MESSAGE_PARSING_ERROR, msg, NULL, pei.pei_message ?: pei.pei_errcode, fd_msg_pmdl_get(msg)); - snprintf(buf, sizeof(buf), "Error parsing CER from '%s', connection aborted.", fd_cnx_getid(c->conn)); + snprintf(buf, sizeof(buf), "Error parsing CER from '%s', connection aborted.", fd_cnx_getid(c)); fd_hook_call(HOOK_PEER_CONNECT_FAILED, NULL, NULL, buf, NULL); goto cleanup; } ); /* Now check we received a CER */ - CHECK_FCT_DO( fd_msg_hdr ( msg, &hdr ), goto fatal_error ); + CHECK_FCT_DO( fd_msg_hdr ( msg, &hdr ), { fatal = 1; goto cleanup; } ); CHECK_PARAMS_DO( (hdr->msg_appl == 0) && (hdr->msg_flags & CMD_FLAG_REQUEST) && (hdr->msg_code == CC_CAPABILITIES_EXCHANGE), { /* Parsing failed -- trace details */ char buf[1024]; - snprintf(buf, sizeof(buf), "Expected CER from '%s', received a different message, connection aborted.", fd_cnx_getid(c->conn)); + snprintf(buf, sizeof(buf), "Expected CER from '%s', received a different message, connection aborted.", fd_cnx_getid(c)); fd_hook_call(HOOK_PEER_CONNECT_FAILED, msg, NULL, buf, NULL); goto cleanup; } ); - /* Finally, pass the information to the peers module which will handle it next */ - pthread_cleanup_push((void *)fd_cnx_destroy, c->conn); + /* Finally, pass the information to the peers module which will handle it in a separate thread */ + pthread_cleanup_push((void *)fd_cnx_destroy, c); pthread_cleanup_push((void *)fd_msg_free, msg); - CHECK_FCT_DO( fd_peer_handle_newCER( &msg, &c->conn ), ); + CHECK_FCT_DO( fd_peer_handle_newCER( &msg, &c ), ); pthread_cleanup_pop(0); pthread_cleanup_pop(0); - - /* The end, we cleanup the client structure */ + cleanup: - /* Unlink the client structure */ - CHECK_POSIX_DO( pthread_mutex_lock(&s->clients_mtx), goto fatal_error ); - fd_list_unlink( &c->chain ); - CHECK_POSIX_DO( pthread_mutex_unlock(&s->clients_mtx), goto fatal_error ); - /* Cleanup the parsed message if any */ if (msg) { CHECK_FCT_DO( fd_msg_free(msg), /* continue */); + msg = NULL; } - /* Destroy the connection object if present */ - if (c->conn) - fd_cnx_destroy(c->conn); + /* Close the connection if needed */ + if (c != NULL) { + fd_cnx_destroy(c); + c = NULL; + } /* Cleanup the received buffer if any */ free(rcv_data.buffer); - /* Detach the thread, cleanup the client structure */ - pthread_detach(pthread_self()); - free(c); + + if (!fatal) + goto next_client; + + LOG_E("Worker thread exiting."); return NULL; - -fatal_error: /* This has effect to terminate the daemon */ - CHECK_FCT_DO(fd_event_send(fd_g_config->cnf_main_ev, FDEV_TERMINATE, 0, NULL), ); - return NULL; -} +} /* The thread managing a server */ static void * serv_th(void * arg) @@ -279,41 +279,31 @@ CHECK_PARAMS_DO(s, goto error); fd_log_threadname ( fd_cnx_getid(s->conn) ); + set_status(s, RUNNING); /* Accept incoming connections */ CHECK_FCT_DO( fd_cnx_serv_listen(s->conn), goto error ); do { - struct client * c = NULL; struct cnxctx * conn = NULL; /* Wait for a new client or cancel */ - CHECK_MALLOC_DO( conn = fd_cnx_serv_accept(s->conn), goto error ); - - /* Create a client structure */ - CHECK_MALLOC_DO( c = malloc(sizeof(struct client)), goto error ); - memset(c, 0, sizeof(struct client)); - fd_list_init(&c->chain, c); - c->conn = conn; + CHECK_MALLOC_DO( conn = fd_cnx_serv_accept(s->conn), break ); - /* Save the client in the list */ - CHECK_POSIX_DO( pthread_mutex_lock( &s->clients_mtx ), goto error ); - fd_list_insert_before(&s->clients, &c->chain); - CHECK_POSIX_DO( pthread_mutex_unlock( &s->clients_mtx ), goto error ); - - /* Start the client thread */ - CHECK_POSIX_DO( pthread_create( &c->thr, NULL, client_sm, c ), goto error ); + /* Store this connection in the fifo for processing by the worker pool. Will block when the fifo is full */ + pthread_cleanup_push((void *)fd_cnx_destroy, conn); + CHECK_FCT_DO( fd_fifo_post( s->pending, &conn ), break ); + pthread_cleanup_pop(0); } while (1); - error: if (s) set_status(s, TERMINATED); /* Send error signal to the core */ LOG_F( "An error occurred in server module! Thread is terminating..."); - CHECK_FCT_DO(fd_event_send(fd_g_config->cnf_main_ev, FDEV_TERMINATE, 0, NULL), ); + CHECK_FCT_DO(fd_core_shutdown(), ); return NULL; } @@ -323,6 +313,7 @@ static struct server * new_serv( int proto, int secur ) { struct server * new; + int i; /* New server structure */ CHECK_MALLOC_DO( new = malloc(sizeof(struct server)), return NULL ); @@ -331,8 +322,16 @@ fd_list_init(&new->chain, new); new->proto = proto; new->secur = secur; - CHECK_POSIX_DO( pthread_mutex_init(&new->clients_mtx, NULL), return NULL ); - fd_list_init(&new->clients, new); + + CHECK_FCT_DO( fd_fifo_new(&new->pending, 5), return NULL); + CHECK_MALLOC_DO( new->workers = calloc( fd_g_config->cnf_thr_srv, sizeof(struct pool_workers) ), return NULL ); + + for (i = 0; i < fd_g_config->cnf_thr_srv; i++) { + /* Create the pool */ + new->workers[i].s = new; + new->workers[i].id = i; + CHECK_POSIX_DO( pthread_create( &new->workers[i].worker, NULL, client_worker, &new->workers[i]), return NULL ); + } return new; } @@ -478,9 +477,8 @@ /* Loop on all servers */ while (!FD_IS_LIST_EMPTY(&FD_SERVERS)) { struct server * s = (struct server *)(FD_SERVERS.next); - - /* Lock client list now */ - CHECK_FCT_DO( pthread_mutex_lock(&s->clients_mtx), /* continue anyway */); + int i; + struct cnxctx * c; /* cancel thread */ CHECK_FCT_DO( fd_thr_term(&s->thr), /* continue */); @@ -488,23 +486,18 @@ /* destroy server connection context */ fd_cnx_destroy(s->conn); - /* cancel and destroy all clients */ - while (!FD_IS_LIST_EMPTY(&s->clients)) { - struct client * c = (struct client *)(s->clients.next); - - /* Destroy client's thread */ - CHECK_FCT_DO( fd_thr_term(&c->thr), /* continue */); - - /* Destroy client's connection */ - fd_cnx_destroy(c->conn); - - /* Unlink and free the client */ - fd_list_unlink(&c->chain); - free(c); + /* cancel and destroy all worker threads */ + for (i = 0; i < fd_g_config->cnf_thr_srv; i++) { + /* Destroy worker thread */ + CHECK_FCT_DO( fd_thr_term(&s->workers[i].worker), /* continue */); } - /* Unlock & destroy */ - CHECK_FCT_DO( pthread_mutex_unlock(&s->clients_mtx), /* continue anyway */); - CHECK_FCT_DO( pthread_mutex_destroy(&s->clients_mtx), /* continue */); + free(s->workers); + + /* Close any pending connection */ + while ( fd_fifo_tryget( s->pending, &c ) == 0 ) { + fd_cnx_destroy(c); + } + CHECK_FCT_DO( fd_fifo_del(&s->pending), ); /* Now destroy the server object */ fd_list_unlink(&s->chain); diff -r e1ced4db7f67 -r 76ac4bb75f0e libfdcore/tcp.c --- a/libfdcore/tcp.c Tue Jun 11 18:13:29 2013 +0800 +++ b/libfdcore/tcp.c Mon Jun 17 10:11:57 2013 +0800 @@ -123,7 +123,6 @@ { int ret = 0; int s; - char * buf = NULL; size_t len = 0; TRACE_ENTRY("%p %p %d", sock, sa, salen); CHECK_PARAMS( sock && (*sock <= 0) && sa && salen ); @@ -137,27 +136,14 @@ /* Cleanup if we are cancelled */ pthread_cleanup_push(fd_cleanup_socket, &s); - LOG_D( "Attempting TCP connection to %s...", fd_sa_dump(&buf, &len, NULL, sa, NI_NUMERICHOST | NI_NUMERICSERV)?:"" ); - free(buf); - /* Try connecting to the remote address */ ret = connect(s, sa, salen); pthread_cleanup_pop(0); if (ret < 0) { - int lvl; - switch (ret = errno) { - case ECONNREFUSED: - - /* "Normal" errors */ - lvl = FULL; - break; - default: - lvl = INFO; - } - /* Some errors are expected, we log at different level */ - TRACE_DEBUG( lvl, "connect returned an error: %s", strerror(ret)); + ret = errno; + LOG_A( "connect returned an error: %s", strerror(ret)); CHECK_SYS_DO( close(s), /* continue */ ); *sock = -1; return ret; diff -r e1ced4db7f67 -r 76ac4bb75f0e libfdproto/fifo.c --- a/libfdproto/fifo.c Tue Jun 11 18:13:29 2013 +0800 +++ b/libfdproto/fifo.c Mon Jun 17 10:11:57 2013 +0800 @@ -376,24 +376,19 @@ /* Post a new item in the queue */ -int fd_fifo_post_int ( struct fifo * queue, void ** item ) +int fd_fifo_post_internal ( struct fifo * queue, void ** item, int skip_max ) { struct fifo_item * new; int call_cb = 0; struct timespec posted_on, queued_on; - TRACE_ENTRY( "%p %p", queue, item ); - - /* Check the parameters */ - CHECK_PARAMS( CHECK_FIFO( queue ) && item && *item ); - /* Get the timing of this call */ CHECK_SYS( clock_gettime(CLOCK_REALTIME, &posted_on) ); /* lock the queue */ CHECK_POSIX( pthread_mutex_lock( &queue->mtx ) ); - if (queue->max) { + if ((!skip_max) && (queue->max)) { while (queue->count >= queue->max) { int ret = 0; @@ -461,6 +456,30 @@ return 0; } +/* Post a new item in the queue */ +int fd_fifo_post_int ( struct fifo * queue, void ** item ) +{ + TRACE_ENTRY( "%p %p", queue, item ); + + /* Check the parameters */ + CHECK_PARAMS( CHECK_FIFO( queue ) && item && *item ); + + return fd_fifo_post_internal ( queue,item, 0 ); + +} + +/* Post a new item in the queue, not blocking */ +int fd_fifo_post_noblock ( struct fifo * queue, void ** item ) +{ + TRACE_ENTRY( "%p %p", queue, item ); + + /* Check the parameters */ + CHECK_PARAMS( CHECK_FIFO( queue ) && item && *item ); + + return fd_fifo_post_internal ( queue,item, 1 ); + +} + /* Pop the first item from the queue */ static void * mq_pop(struct fifo * queue) { diff -r e1ced4db7f67 -r 76ac4bb75f0e libfdproto/messages.c --- a/libfdproto/messages.c Tue Jun 11 18:13:29 2013 +0800 +++ b/libfdproto/messages.c Mon Jun 17 10:11:57 2013 +0800 @@ -706,6 +706,9 @@ { TRACE_ENTRY("%p", object); + if (object == NULL) + return 0; + if (CHECK_MSG(object)) { if (_M(object)->msg_query) { _M(_M(object)->msg_query)->msg_associated = 0;