changeset 1397:239ba25870d8

Allow parametrizing the number of threads for routing in/out. This is for high-load situations where freeDiameter was limited by the corresponding queues.
author Thomas Klausner <tk@giga.or.at>
date Fri, 15 Nov 2019 11:40:37 +0100
parents 188c82b6690b
children a52a1dbae99f
files doc/freediameter.conf.sample include/freeDiameter/libfdcore.h libfdcore/config.c libfdcore/core.c libfdcore/fdcore-internal.h libfdcore/fdd.l libfdcore/fdd.y libfdcore/queues.c libfdcore/routing_dispatch.c
diffstat 9 files changed, 150 insertions(+), 13 deletions(-) [+]
line wrap: on
line diff
--- a/doc/freediameter.conf.sample	Fri Nov 15 11:38:30 2019 +0100
+++ b/doc/freediameter.conf.sample	Fri Nov 15 11:40:37 2019 +0100
@@ -188,6 +188,14 @@
 # Default: 4
 #AppServThreads = 4;
 
+# Number of server threads that can handle incoming message routing at the same time.
+# Default: 1
+#RoutingInThreads = 1;
+
+# Number of server threads that can handle outgoing message routing at the same time.
+# Default: 1
+#RoutingOutThreads= 1;
+
 # Other applications are configured by loaded extensions.
 
 ##############################################################
--- a/include/freeDiameter/libfdcore.h	Fri Nov 15 11:38:30 2019 +0100
+++ b/include/freeDiameter/libfdcore.h	Fri Nov 15 11:40:37 2019 +0100
@@ -137,7 +137,12 @@
 	regex_t		 cnf_processing_peers_pattern_regex;	/* Regex pattern for identifying processing peers */
 	struct fd_list	 cnf_apps;	/* Applications locally supported (except relay, see flags). Use fd_disp_app_support to add one. list of struct fd_app. */
 	uint16_t	 cnf_dispthr;	/* Number of dispatch threads to create */
+	uint16_t     cnf_rtinthr;  /* Number of routing in threads to create */
+	uint16_t     cnf_rtoutthr;  /* Number of routing out threads to create */
 	uint16_t	 cnf_rr_in_answers;	/* include Route-Record AVP in answers */
+	int		 cnf_qin_limit;	/* limit for incoming queue*/
+	int		 cnf_qout_limit;	/* limit for outgoing queue */
+	int		 cnf_qlocal_limit;	/* limit for local queue */
 	struct {
 		unsigned no_fwd : 1;	/* the peer does not relay messages (0xffffff app id) */
 		unsigned no_ip4 : 1;	/* disable IP */
--- a/libfdcore/config.c	Fri Nov 15 11:38:30 2019 +0100
+++ b/libfdcore/config.c	Fri Nov 15 11:40:37 2019 +0100
@@ -61,6 +61,11 @@
 	fd_g_config->cnf_thr_srv  = 5;
 	fd_g_config->cnf_processing_peers_minimum = 0;
 	fd_g_config->cnf_dispthr  = 4;
+	fd_g_config->cnf_rtinthr = 1;
+	fd_g_config->cnf_rtoutthr = 1;
+	fd_g_config->cnf_qin_limit = 20;
+	fd_g_config->cnf_qout_limit = 30;
+	fd_g_config->cnf_qlocal_limit = 25;
 	fd_list_init(&fd_g_config->cnf_endpoints, NULL);
 	fd_list_init(&fd_g_config->cnf_apps, NULL);
 	#ifdef DISABLE_SCTP
@@ -103,6 +108,11 @@
 	CHECK_MALLOC_DO( fd_dump_extend( FD_DUMP_STD_PARAMS, "  Number of clients thr .. : %d\n", fd_g_config->cnf_thr_srv), return NULL);
 	CHECK_MALLOC_DO( fd_dump_extend( FD_DUMP_STD_PARAMS, "  Number of app threads .. : %hu\n", fd_g_config->cnf_dispthr), return NULL);
 	CHECK_MALLOC_DO( fd_dump_extend( FD_DUMP_STD_PARAMS, "  Minimal processing peers : %hu\n", fd_g_config->cnf_processing_peers_minimum), return NULL);
+	CHECK_MALLOC_DO( fd_dump_extend( FD_DUMP_STD_PARAMS, "  Number of rtin threads . : %hu\n", fd_g_config->cnf_rtinthr), return NULL);
+	CHECK_MALLOC_DO( fd_dump_extend( FD_DUMP_STD_PARAMS, "  Number of rtout threads  : %hu\n", fd_g_config->cnf_rtoutthr), return NULL);
+	CHECK_MALLOC_DO( fd_dump_extend( FD_DUMP_STD_PARAMS, "  Incoming queue limit     : %hu\n", fd_g_config->cnf_qin_limit), return NULL);
+	CHECK_MALLOC_DO( fd_dump_extend( FD_DUMP_STD_PARAMS, "  Outgoing queue limit     : %hu\n", fd_g_config->cnf_qout_limit), return NULL);
+	CHECK_MALLOC_DO( fd_dump_extend( FD_DUMP_STD_PARAMS, "  Local queue limit        : %hu\n", fd_g_config->cnf_qlocal_limit), return NULL);
 	if (FD_IS_LIST_EMPTY(&fd_g_config->cnf_endpoints)) {
 		CHECK_MALLOC_DO( fd_dump_extend( FD_DUMP_STD_PARAMS, "  Local endpoints ........ : Default (use all available)\n"), return NULL);
 	} else {
--- a/libfdcore/core.c	Fri Nov 15 11:38:30 2019 +0100
+++ b/libfdcore/core.c	Fri Nov 15 11:40:37 2019 +0100
@@ -315,6 +315,8 @@
 int fd_core_start(void)
 {
 	int ret;
+	CHECK_FCT( fd_queues_init_after_conf() );
+
 	CHECK_POSIX( pthread_mutex_lock(&core_lock) );
 	ret = fd_core_start_int();
 	CHECK_POSIX( pthread_mutex_unlock(&core_lock) );
--- a/libfdcore/fdcore-internal.h	Fri Nov 15 11:38:30 2019 +0100
+++ b/libfdcore/fdcore-internal.h	Fri Nov 15 11:40:37 2019 +0100
@@ -109,6 +109,7 @@
 extern struct fifo * fd_g_local; /* messages to be handled to local extensions */
 /* Message queues */
 int fd_queues_init(void);
+int fd_queues_init_after_conf(void);
 int fd_queues_fini(struct fifo ** queue);
 
 /* Trigged events */
--- a/libfdcore/fdd.l	Fri Nov 15 11:38:30 2019 +0100
+++ b/libfdcore/fdd.l	Fri Nov 15 11:40:37 2019 +0100
@@ -254,6 +254,11 @@
 (?i:"TLS_old_method")	{ return OLDTLS; }
 (?i:"SCTP_streams")	{ return SCTPSTREAMS; }
 (?i:"AppServThreads")	{ return APPSERVTHREADS; }
+(?i:"RoutingInThreads")	{ return ROUTINGINTHREADS; }
+(?i:"RoutingOutThreads")	{ return ROUTINGOUTTHREADS; }
+(?i:"IncomingQueueLimit")	{ return QINLIMIT; }
+(?i:"OutgoingQueueLimit")	{ return QOUTLIMIT; }
+(?i:"LocalQueueLimit")	{ return QLOCALLIMIT; }
 (?i:"ListenOn")		{ return LISTENON; }
 (?i:"ThreadsPerServer")	{ return THRPERSRV; }
 (?i:"ProcessingPeersPattern")	{ return PROCESSINGPEERSPATTERN; }
@@ -271,6 +276,9 @@
 (?i:"TLS_Prio")		{ return TLS_PRIO; }
 (?i:"TLS_DH_bits")	{ return TLS_DH_BITS; }
 (?i:"TLS_DH_file")	{ return TLS_DH_FILE; }
+(?i:"RouteRecordInAnswers")	{ return RR_IN_ANSWERS;	}
+(?i:"Never")		{ return NEVER;	}
+(?i:"Always")		{ return ALWAYS; }
 
 
 	/* Valid single characters for yyparse */
--- a/libfdcore/fdd.y	Fri Nov 15 11:38:30 2019 +0100
+++ b/libfdcore/fdd.y	Fri Nov 15 11:40:37 2019 +0100
@@ -107,6 +107,11 @@
 %token		NOTLS
 %token		SCTPSTREAMS
 %token		APPSERVTHREADS
+%token		ROUTINGINTHREADS
+%token		ROUTINGOUTTHREADS
+%token		QINLIMIT
+%token		QOUTLIMIT
+%token		QLOCALLIMIT
 %token		LISTENON
 %token		THRPERSRV
 %token		PROCESSINGPEERSPATTERN
@@ -147,6 +152,11 @@
 			| conffile processingpeersminimum
 			| conffile norelay
 			| conffile appservthreads
+			| conffile routinginthreads
+			| conffile routingoutthreads
+			| conffile qinlimit
+			| conffile qoutlimit
+			| conffile qlocallimit
 			| conffile noip
 			| conffile noip6
 			| conffile notcp
@@ -309,6 +319,46 @@
 			}
 			;
 
+routinginthreads:		ROUTINGINTHREADS '=' INTEGER ';'
+			{
+				CHECK_PARAMS_DO( ($3 > 0) && ($3 < 256),
+					{ yyerror (&yylloc, conf, "Invalid value"); YYERROR; } );
+				conf->cnf_rtinthr = (uint16_t)$3;
+			}
+			;
+
+routingoutthreads:		ROUTINGOUTTHREADS '=' INTEGER ';'
+			{
+				CHECK_PARAMS_DO( ($3 > 0) && ($3 < 256),
+					{ yyerror (&yylloc, conf, "Invalid value"); YYERROR; } );
+				conf->cnf_rtoutthr = (uint16_t)$3;
+			}
+			;
+
+qinlimit:		QINLIMIT '=' INTEGER ';'
+			{
+				CHECK_PARAMS_DO( ($3 >= 0),
+					{ yyerror (&yylloc, conf, "Invalid value"); YYERROR; } );
+				conf->cnf_qin_limit = $3;
+			}
+			;
+
+qoutlimit:		QOUTLIMIT '=' INTEGER ';'
+			{
+				CHECK_PARAMS_DO( ($3 >= 0),
+					{ yyerror (&yylloc, conf, "Invalid value"); YYERROR; } );
+				conf->cnf_qout_limit = $3;
+			}
+			;
+
+qlocallimit:		QLOCALLIMIT '=' INTEGER ';'
+			{
+				CHECK_PARAMS_DO( ($3 >= 0),
+					{ yyerror (&yylloc, conf, "Invalid value"); YYERROR; } );
+				conf->cnf_qlocal_limit = $3;
+			}
+			;
+
 noip:			NOIP ';'
 			{
 				if (got_peer_noipv6) { 
--- a/libfdcore/queues.c	Fri Nov 15 11:38:30 2019 +0100
+++ b/libfdcore/queues.c	Fri Nov 15 11:40:37 2019 +0100
@@ -44,9 +44,19 @@
 int fd_queues_init(void)
 {
 	TRACE_ENTRY();
-	CHECK_FCT( fd_fifo_new ( &fd_g_incoming, 20 ) );
-	CHECK_FCT( fd_fifo_new ( &fd_g_outgoing, 30 ) );
-	CHECK_FCT( fd_fifo_new ( &fd_g_local, 25 ) );
+	CHECK_FCT( fd_fifo_new ( &fd_g_incoming, fd_g_config->cnf_qin_limit ) );
+	CHECK_FCT( fd_fifo_new ( &fd_g_outgoing, fd_g_config->cnf_qout_limit ) );
+	CHECK_FCT( fd_fifo_new ( &fd_g_local,    fd_g_config->cnf_qlocal_limit ) );
+	return 0;
+}
+
+/* Resize according to values given in configuration file */
+int fd_queues_init_after_conf(void)
+{
+	TRACE_ENTRY();
+	CHECK_FCT( fd_fifo_set_max ( fd_g_incoming, fd_g_config->cnf_qin_limit ) );
+	CHECK_FCT( fd_fifo_set_max ( fd_g_outgoing, fd_g_config->cnf_qout_limit ) );
+	CHECK_FCT( fd_fifo_set_max ( fd_g_local,    fd_g_config->cnf_qlocal_limit ) );
 	return 0;
 }
 
--- a/libfdcore/routing_dispatch.c	Fri Nov 15 11:38:30 2019 +0100
+++ b/libfdcore/routing_dispatch.c	Fri Nov 15 11:40:37 2019 +0100
@@ -35,6 +35,13 @@
 
 #include "fdcore-internal.h"
 
+#ifdef linux
+/* This needs -D_USE_GNU, and since I have no idea what else that does, let's simply copy the declaration. */
+
+/* Set thread name visible in the kernel and its interfaces.  */
+extern int pthread_setname_np (pthread_t __target_thread, const char *__name);
+#endif
+
 /********************************************************************************/
 /*              First part : handling the extensions callbacks                  */
 /********************************************************************************/
@@ -1152,28 +1159,44 @@
 static pthread_t * dispatch = NULL;
 static enum thread_state * disp_state = NULL;
 
-/* Later: make this more dynamic */
-static pthread_t rt_out = (pthread_t)NULL;
-static enum thread_state out_state = NOTRUNNING;
+static pthread_t * rt_out = NULL;
+static enum thread_state * out_state = NULL;
 
-static pthread_t rt_in  = (pthread_t)NULL;
-static enum thread_state in_state = NOTRUNNING;
+static pthread_t * rt_in  = NULL;
+static enum thread_state * in_state = NULL;
 
 /* Initialize the routing and dispatch threads */
 int fd_rtdisp_init(void)
 {
 	int i;
 	
-	/* Prepare the array for dispatch */
+	/* Prepare the array for threads */
 	CHECK_MALLOC( disp_state = calloc(fd_g_config->cnf_dispthr, sizeof(enum thread_state)) );
 	CHECK_MALLOC( dispatch = calloc(fd_g_config->cnf_dispthr, sizeof(pthread_t)) );
+	CHECK_MALLOC( out_state = calloc(fd_g_config->cnf_rtoutthr, sizeof(enum thread_state)) );
+	CHECK_MALLOC( rt_out = calloc(fd_g_config->cnf_rtoutthr, sizeof(pthread_t)) );
+	CHECK_MALLOC( in_state = calloc(fd_g_config->cnf_rtinthr, sizeof(enum thread_state)) );
+	CHECK_MALLOC( rt_in = calloc(fd_g_config->cnf_rtinthr, sizeof(pthread_t)) );
 	
 	/* Create the threads */
 	for (i=0; i < fd_g_config->cnf_dispthr; i++) {
 		CHECK_POSIX( pthread_create( &dispatch[i], NULL, dispatch_thr, &disp_state[i] ) );
+#ifdef linux
+		pthread_setname_np(dispatch[i], "fd-dispatch");
+#endif
 	}
-	CHECK_POSIX( pthread_create( &rt_out, NULL, routing_out_thr, &out_state) );
-	CHECK_POSIX( pthread_create( &rt_in,  NULL, routing_in_thr,  &in_state) );
+	for (i=0; i < fd_g_config->cnf_rtoutthr; i++) {
+		CHECK_POSIX( pthread_create( &rt_out[i], NULL, routing_out_thr, &out_state[i] ) );
+#ifdef linux
+		pthread_setname_np(rt_out[i], "fd-routing-out");
+#endif
+	}
+	for (i=0; i < fd_g_config->cnf_rtinthr; i++) {
+		CHECK_POSIX( pthread_create( &rt_in[i], NULL, routing_in_thr, &in_state[i] ) );
+#ifdef linux
+		pthread_setname_np(rt_in[i], "fd-routing-in");
+#endif
+	}
 	
 	/* Later: TODO("Set the thresholds for the queues to create more threads as needed"); */
 	
@@ -1244,13 +1267,33 @@
 	CHECK_FCT_DO( fd_queues_fini(&fd_g_incoming), /* ignore */);
 	
 	/* Stop the routing IN thread */
-	stop_thread_delayed(&in_state, &rt_in, "IN routing");
+	if (rt_in != NULL) {
+		for (i=0; i < fd_g_config->cnf_rtinthr; i++) {
+			stop_thread_delayed(&in_state[i], &rt_in[i], "IN routing");
+		}
+		free(rt_in);
+		rt_in = NULL;
+	}
+	if (in_state != NULL) {
+		free(in_state);
+		in_state = NULL;
+	}
 	
 	/* Destroy the outgoing queue */
 	CHECK_FCT_DO( fd_queues_fini(&fd_g_outgoing), /* ignore */);
 	
 	/* Stop the routing OUT thread */
-	stop_thread_delayed(&out_state, &rt_out, "OUT routing");
+	if (rt_out != NULL) {
+		for (i=0; i < fd_g_config->cnf_rtinthr; i++) {
+			stop_thread_delayed(&out_state[i], &rt_out[i], "OUT routing");
+		}
+		free(rt_out);
+		rt_out = NULL;
+	}
+	if (out_state != NULL) {
+		free(out_state);
+		out_state = NULL;
+	}
 	
 	/* Destroy the local queue */
 	CHECK_FCT_DO( fd_queues_fini(&fd_g_local), /* ignore */);
"Welcome to our mercurial repository"