# HG changeset patch # User Thomas Klausner # Date 1573814437 -3600 # Node ID 239ba25870d88a58bcbe40683e509c1e0fe42947 # Parent 188c82b6690b64c4a29e35319a233652eabbe374 Allow parametrizing the number of threads for routing in/out. This is for high-load situations where freeDiameter was limited by the corresponding queues. diff -r 188c82b6690b -r 239ba25870d8 doc/freediameter.conf.sample --- a/doc/freediameter.conf.sample Fri Nov 15 11:38:30 2019 +0100 +++ b/doc/freediameter.conf.sample Fri Nov 15 11:40:37 2019 +0100 @@ -188,6 +188,14 @@ # Default: 4 #AppServThreads = 4; +# Number of server threads that can handle incoming message routing at the same time. +# Default: 1 +#RoutingInThreads = 1; + +# Number of server threads that can handle outgoing message routing at the same time. +# Default: 1 +#RoutingOutThreads= 1; + # Other applications are configured by loaded extensions. ############################################################## diff -r 188c82b6690b -r 239ba25870d8 include/freeDiameter/libfdcore.h --- a/include/freeDiameter/libfdcore.h Fri Nov 15 11:38:30 2019 +0100 +++ b/include/freeDiameter/libfdcore.h Fri Nov 15 11:40:37 2019 +0100 @@ -137,7 +137,12 @@ regex_t cnf_processing_peers_pattern_regex; /* Regex pattern for identifying processing peers */ struct fd_list cnf_apps; /* Applications locally supported (except relay, see flags). Use fd_disp_app_support to add one. list of struct fd_app. */ uint16_t cnf_dispthr; /* Number of dispatch threads to create */ + uint16_t cnf_rtinthr; /* Number of routing in threads to create */ + uint16_t cnf_rtoutthr; /* Number of routing out threads to create */ uint16_t cnf_rr_in_answers; /* include Route-Record AVP in answers */ + int cnf_qin_limit; /* limit for incoming queue*/ + int cnf_qout_limit; /* limit for outgoing queue */ + int cnf_qlocal_limit; /* limit for local queue */ struct { unsigned no_fwd : 1; /* the peer does not relay messages (0xffffff app id) */ unsigned no_ip4 : 1; /* disable IP */ diff -r 188c82b6690b -r 239ba25870d8 libfdcore/config.c --- a/libfdcore/config.c Fri Nov 15 11:38:30 2019 +0100 +++ b/libfdcore/config.c Fri Nov 15 11:40:37 2019 +0100 @@ -61,6 +61,11 @@ fd_g_config->cnf_thr_srv = 5; fd_g_config->cnf_processing_peers_minimum = 0; fd_g_config->cnf_dispthr = 4; + fd_g_config->cnf_rtinthr = 1; + fd_g_config->cnf_rtoutthr = 1; + fd_g_config->cnf_qin_limit = 20; + fd_g_config->cnf_qout_limit = 30; + fd_g_config->cnf_qlocal_limit = 25; fd_list_init(&fd_g_config->cnf_endpoints, NULL); fd_list_init(&fd_g_config->cnf_apps, NULL); #ifdef DISABLE_SCTP @@ -103,6 +108,11 @@ CHECK_MALLOC_DO( fd_dump_extend( FD_DUMP_STD_PARAMS, " Number of clients thr .. : %d\n", fd_g_config->cnf_thr_srv), return NULL); CHECK_MALLOC_DO( fd_dump_extend( FD_DUMP_STD_PARAMS, " Number of app threads .. : %hu\n", fd_g_config->cnf_dispthr), return NULL); CHECK_MALLOC_DO( fd_dump_extend( FD_DUMP_STD_PARAMS, " Minimal processing peers : %hu\n", fd_g_config->cnf_processing_peers_minimum), return NULL); + CHECK_MALLOC_DO( fd_dump_extend( FD_DUMP_STD_PARAMS, " Number of rtin threads . : %hu\n", fd_g_config->cnf_rtinthr), return NULL); + CHECK_MALLOC_DO( fd_dump_extend( FD_DUMP_STD_PARAMS, " Number of rtout threads : %hu\n", fd_g_config->cnf_rtoutthr), return NULL); + CHECK_MALLOC_DO( fd_dump_extend( FD_DUMP_STD_PARAMS, " Incoming queue limit : %hu\n", fd_g_config->cnf_qin_limit), return NULL); + CHECK_MALLOC_DO( fd_dump_extend( FD_DUMP_STD_PARAMS, " Outgoing queue limit : %hu\n", fd_g_config->cnf_qout_limit), return NULL); + CHECK_MALLOC_DO( fd_dump_extend( FD_DUMP_STD_PARAMS, " Local queue limit : %hu\n", fd_g_config->cnf_qlocal_limit), return NULL); if (FD_IS_LIST_EMPTY(&fd_g_config->cnf_endpoints)) { CHECK_MALLOC_DO( fd_dump_extend( FD_DUMP_STD_PARAMS, " Local endpoints ........ : Default (use all available)\n"), return NULL); } else { diff -r 188c82b6690b -r 239ba25870d8 libfdcore/core.c --- a/libfdcore/core.c Fri Nov 15 11:38:30 2019 +0100 +++ b/libfdcore/core.c Fri Nov 15 11:40:37 2019 +0100 @@ -315,6 +315,8 @@ int fd_core_start(void) { int ret; + CHECK_FCT( fd_queues_init_after_conf() ); + CHECK_POSIX( pthread_mutex_lock(&core_lock) ); ret = fd_core_start_int(); CHECK_POSIX( pthread_mutex_unlock(&core_lock) ); diff -r 188c82b6690b -r 239ba25870d8 libfdcore/fdcore-internal.h --- a/libfdcore/fdcore-internal.h Fri Nov 15 11:38:30 2019 +0100 +++ b/libfdcore/fdcore-internal.h Fri Nov 15 11:40:37 2019 +0100 @@ -109,6 +109,7 @@ extern struct fifo * fd_g_local; /* messages to be handled to local extensions */ /* Message queues */ int fd_queues_init(void); +int fd_queues_init_after_conf(void); int fd_queues_fini(struct fifo ** queue); /* Trigged events */ diff -r 188c82b6690b -r 239ba25870d8 libfdcore/fdd.l --- a/libfdcore/fdd.l Fri Nov 15 11:38:30 2019 +0100 +++ b/libfdcore/fdd.l Fri Nov 15 11:40:37 2019 +0100 @@ -254,6 +254,11 @@ (?i:"TLS_old_method") { return OLDTLS; } (?i:"SCTP_streams") { return SCTPSTREAMS; } (?i:"AppServThreads") { return APPSERVTHREADS; } +(?i:"RoutingInThreads") { return ROUTINGINTHREADS; } +(?i:"RoutingOutThreads") { return ROUTINGOUTTHREADS; } +(?i:"IncomingQueueLimit") { return QINLIMIT; } +(?i:"OutgoingQueueLimit") { return QOUTLIMIT; } +(?i:"LocalQueueLimit") { return QLOCALLIMIT; } (?i:"ListenOn") { return LISTENON; } (?i:"ThreadsPerServer") { return THRPERSRV; } (?i:"ProcessingPeersPattern") { return PROCESSINGPEERSPATTERN; } @@ -271,6 +276,9 @@ (?i:"TLS_Prio") { return TLS_PRIO; } (?i:"TLS_DH_bits") { return TLS_DH_BITS; } (?i:"TLS_DH_file") { return TLS_DH_FILE; } +(?i:"RouteRecordInAnswers") { return RR_IN_ANSWERS; } +(?i:"Never") { return NEVER; } +(?i:"Always") { return ALWAYS; } /* Valid single characters for yyparse */ diff -r 188c82b6690b -r 239ba25870d8 libfdcore/fdd.y --- a/libfdcore/fdd.y Fri Nov 15 11:38:30 2019 +0100 +++ b/libfdcore/fdd.y Fri Nov 15 11:40:37 2019 +0100 @@ -107,6 +107,11 @@ %token NOTLS %token SCTPSTREAMS %token APPSERVTHREADS +%token ROUTINGINTHREADS +%token ROUTINGOUTTHREADS +%token QINLIMIT +%token QOUTLIMIT +%token QLOCALLIMIT %token LISTENON %token THRPERSRV %token PROCESSINGPEERSPATTERN @@ -147,6 +152,11 @@ | conffile processingpeersminimum | conffile norelay | conffile appservthreads + | conffile routinginthreads + | conffile routingoutthreads + | conffile qinlimit + | conffile qoutlimit + | conffile qlocallimit | conffile noip | conffile noip6 | conffile notcp @@ -309,6 +319,46 @@ } ; +routinginthreads: ROUTINGINTHREADS '=' INTEGER ';' + { + CHECK_PARAMS_DO( ($3 > 0) && ($3 < 256), + { yyerror (&yylloc, conf, "Invalid value"); YYERROR; } ); + conf->cnf_rtinthr = (uint16_t)$3; + } + ; + +routingoutthreads: ROUTINGOUTTHREADS '=' INTEGER ';' + { + CHECK_PARAMS_DO( ($3 > 0) && ($3 < 256), + { yyerror (&yylloc, conf, "Invalid value"); YYERROR; } ); + conf->cnf_rtoutthr = (uint16_t)$3; + } + ; + +qinlimit: QINLIMIT '=' INTEGER ';' + { + CHECK_PARAMS_DO( ($3 >= 0), + { yyerror (&yylloc, conf, "Invalid value"); YYERROR; } ); + conf->cnf_qin_limit = $3; + } + ; + +qoutlimit: QOUTLIMIT '=' INTEGER ';' + { + CHECK_PARAMS_DO( ($3 >= 0), + { yyerror (&yylloc, conf, "Invalid value"); YYERROR; } ); + conf->cnf_qout_limit = $3; + } + ; + +qlocallimit: QLOCALLIMIT '=' INTEGER ';' + { + CHECK_PARAMS_DO( ($3 >= 0), + { yyerror (&yylloc, conf, "Invalid value"); YYERROR; } ); + conf->cnf_qlocal_limit = $3; + } + ; + noip: NOIP ';' { if (got_peer_noipv6) { diff -r 188c82b6690b -r 239ba25870d8 libfdcore/queues.c --- a/libfdcore/queues.c Fri Nov 15 11:38:30 2019 +0100 +++ b/libfdcore/queues.c Fri Nov 15 11:40:37 2019 +0100 @@ -44,9 +44,19 @@ int fd_queues_init(void) { TRACE_ENTRY(); - CHECK_FCT( fd_fifo_new ( &fd_g_incoming, 20 ) ); - CHECK_FCT( fd_fifo_new ( &fd_g_outgoing, 30 ) ); - CHECK_FCT( fd_fifo_new ( &fd_g_local, 25 ) ); + CHECK_FCT( fd_fifo_new ( &fd_g_incoming, fd_g_config->cnf_qin_limit ) ); + CHECK_FCT( fd_fifo_new ( &fd_g_outgoing, fd_g_config->cnf_qout_limit ) ); + CHECK_FCT( fd_fifo_new ( &fd_g_local, fd_g_config->cnf_qlocal_limit ) ); + return 0; +} + +/* Resize according to values given in configuration file */ +int fd_queues_init_after_conf(void) +{ + TRACE_ENTRY(); + CHECK_FCT( fd_fifo_set_max ( fd_g_incoming, fd_g_config->cnf_qin_limit ) ); + CHECK_FCT( fd_fifo_set_max ( fd_g_outgoing, fd_g_config->cnf_qout_limit ) ); + CHECK_FCT( fd_fifo_set_max ( fd_g_local, fd_g_config->cnf_qlocal_limit ) ); return 0; } diff -r 188c82b6690b -r 239ba25870d8 libfdcore/routing_dispatch.c --- a/libfdcore/routing_dispatch.c Fri Nov 15 11:38:30 2019 +0100 +++ b/libfdcore/routing_dispatch.c Fri Nov 15 11:40:37 2019 +0100 @@ -35,6 +35,13 @@ #include "fdcore-internal.h" +#ifdef linux +/* This needs -D_USE_GNU, and since I have no idea what else that does, let's simply copy the declaration. */ + +/* Set thread name visible in the kernel and its interfaces. */ +extern int pthread_setname_np (pthread_t __target_thread, const char *__name); +#endif + /********************************************************************************/ /* First part : handling the extensions callbacks */ /********************************************************************************/ @@ -1152,28 +1159,44 @@ static pthread_t * dispatch = NULL; static enum thread_state * disp_state = NULL; -/* Later: make this more dynamic */ -static pthread_t rt_out = (pthread_t)NULL; -static enum thread_state out_state = NOTRUNNING; +static pthread_t * rt_out = NULL; +static enum thread_state * out_state = NULL; -static pthread_t rt_in = (pthread_t)NULL; -static enum thread_state in_state = NOTRUNNING; +static pthread_t * rt_in = NULL; +static enum thread_state * in_state = NULL; /* Initialize the routing and dispatch threads */ int fd_rtdisp_init(void) { int i; - /* Prepare the array for dispatch */ + /* Prepare the array for threads */ CHECK_MALLOC( disp_state = calloc(fd_g_config->cnf_dispthr, sizeof(enum thread_state)) ); CHECK_MALLOC( dispatch = calloc(fd_g_config->cnf_dispthr, sizeof(pthread_t)) ); + CHECK_MALLOC( out_state = calloc(fd_g_config->cnf_rtoutthr, sizeof(enum thread_state)) ); + CHECK_MALLOC( rt_out = calloc(fd_g_config->cnf_rtoutthr, sizeof(pthread_t)) ); + CHECK_MALLOC( in_state = calloc(fd_g_config->cnf_rtinthr, sizeof(enum thread_state)) ); + CHECK_MALLOC( rt_in = calloc(fd_g_config->cnf_rtinthr, sizeof(pthread_t)) ); /* Create the threads */ for (i=0; i < fd_g_config->cnf_dispthr; i++) { CHECK_POSIX( pthread_create( &dispatch[i], NULL, dispatch_thr, &disp_state[i] ) ); +#ifdef linux + pthread_setname_np(dispatch[i], "fd-dispatch"); +#endif } - CHECK_POSIX( pthread_create( &rt_out, NULL, routing_out_thr, &out_state) ); - CHECK_POSIX( pthread_create( &rt_in, NULL, routing_in_thr, &in_state) ); + for (i=0; i < fd_g_config->cnf_rtoutthr; i++) { + CHECK_POSIX( pthread_create( &rt_out[i], NULL, routing_out_thr, &out_state[i] ) ); +#ifdef linux + pthread_setname_np(rt_out[i], "fd-routing-out"); +#endif + } + for (i=0; i < fd_g_config->cnf_rtinthr; i++) { + CHECK_POSIX( pthread_create( &rt_in[i], NULL, routing_in_thr, &in_state[i] ) ); +#ifdef linux + pthread_setname_np(rt_in[i], "fd-routing-in"); +#endif + } /* Later: TODO("Set the thresholds for the queues to create more threads as needed"); */ @@ -1244,13 +1267,33 @@ CHECK_FCT_DO( fd_queues_fini(&fd_g_incoming), /* ignore */); /* Stop the routing IN thread */ - stop_thread_delayed(&in_state, &rt_in, "IN routing"); + if (rt_in != NULL) { + for (i=0; i < fd_g_config->cnf_rtinthr; i++) { + stop_thread_delayed(&in_state[i], &rt_in[i], "IN routing"); + } + free(rt_in); + rt_in = NULL; + } + if (in_state != NULL) { + free(in_state); + in_state = NULL; + } /* Destroy the outgoing queue */ CHECK_FCT_DO( fd_queues_fini(&fd_g_outgoing), /* ignore */); /* Stop the routing OUT thread */ - stop_thread_delayed(&out_state, &rt_out, "OUT routing"); + if (rt_out != NULL) { + for (i=0; i < fd_g_config->cnf_rtinthr; i++) { + stop_thread_delayed(&out_state[i], &rt_out[i], "OUT routing"); + } + free(rt_out); + rt_out = NULL; + } + if (out_state != NULL) { + free(out_state); + out_state = NULL; + } /* Destroy the local queue */ CHECK_FCT_DO( fd_queues_fini(&fd_g_local), /* ignore */);