# HG changeset patch # User Sebastien Decugis # Date 1370852589 -28800 # Node ID 50bf33dc8fe0279747073003960c53670f7af80c # Parent f40de74bd1c7dcae558008ed5837916858100105 Limit number of incoming connections under processing to configurable value diff -r f40de74bd1c7 -r 50bf33dc8fe0 doc/freediameter.conf.sample --- a/doc/freediameter.conf.sample Mon Jun 10 14:27:15 2013 +0800 +++ b/doc/freediameter.conf.sample Mon Jun 10 16:23:09 2013 +0800 @@ -78,6 +78,16 @@ #ListenOn = "2001:200:903:2::202:1"; #ListenOn = "fe80::21c:5ff:fe98:7d62%eth0"; + +############################################################## +## Server configuration + +# How many Diameter peers are allowed to be connecting at the same time ? +# This parameter limits the number of incoming connections from the time +# the connection is accepted until the first CER is received. +# Default: 5 unidentified clients in paralel. +#ThreadsPerServer = 5; + ############################################################## ## TLS Configuration diff -r f40de74bd1c7 -r 50bf33dc8fe0 include/freeDiameter/libfdcore.h --- a/include/freeDiameter/libfdcore.h Mon Jun 10 14:27:15 2013 +0800 +++ b/include/freeDiameter/libfdcore.h Mon Jun 10 16:23:09 2013 +0800 @@ -131,6 +131,7 @@ uint16_t cnf_port_3436; /* Open an additional server port to listen to old TLS/SCTP clients (RFC3436, freeDiameter versions < 1.2.0) */ uint16_t cnf_sctp_str; /* default max number of streams for SCTP associations (def: 30) */ struct fd_list cnf_endpoints; /* the local endpoints to bind the server to. list of struct fd_endpoint. default is empty (bind all). After servers are started, this is the actual list of endpoints including port information. */ + int cnf_thr_srv; /* Number of threads per servers handling the connection state machines */ struct fd_list cnf_apps; /* Applications locally supported (except relay, see flags). Use fd_disp_app_support to add one. list of struct fd_app. */ uint16_t cnf_dispthr; /* Number of dispatch threads to create */ struct { diff -r f40de74bd1c7 -r 50bf33dc8fe0 libfdcore/config.c --- a/libfdcore/config.c Mon Jun 10 14:27:15 2013 +0800 +++ b/libfdcore/config.c Mon Jun 10 16:23:09 2013 +0800 @@ -58,6 +58,7 @@ fd_g_config->cnf_port = DIAMETER_PORT; fd_g_config->cnf_port_tls = DIAMETER_SECURE_PORT; fd_g_config->cnf_sctp_str = 30; + fd_g_config->cnf_thr_srv = 5; fd_g_config->cnf_dispthr = 4; fd_list_init(&fd_g_config->cnf_endpoints, NULL); fd_list_init(&fd_g_config->cnf_apps, NULL); @@ -97,7 +98,8 @@ CHECK_MALLOC_DO( fd_dump_extend( FD_DUMP_STD_PARAMS, " Local SCTP TLS port .... : %hu\n", fd_g_config->cnf_port_3436), return NULL); } CHECK_MALLOC_DO( fd_dump_extend( FD_DUMP_STD_PARAMS, " Number of SCTP streams . : %hu\n", fd_g_config->cnf_sctp_str), return NULL); - CHECK_MALLOC_DO( fd_dump_extend( FD_DUMP_STD_PARAMS, " Number of server threads : %hu\n", fd_g_config->cnf_dispthr), return NULL); + CHECK_MALLOC_DO( fd_dump_extend( FD_DUMP_STD_PARAMS, " Number of clients thr .. : %d\n", fd_g_config->cnf_thr_srv), return NULL); + CHECK_MALLOC_DO( fd_dump_extend( FD_DUMP_STD_PARAMS, " Number of app threads .. : %hu\n", fd_g_config->cnf_dispthr), return NULL); if (FD_IS_LIST_EMPTY(&fd_g_config->cnf_endpoints)) { CHECK_MALLOC_DO( fd_dump_extend( FD_DUMP_STD_PARAMS, " Local endpoints ........ : Default (use all available)\n"), return NULL); } else { diff -r f40de74bd1c7 -r 50bf33dc8fe0 libfdcore/fdd.l --- a/libfdcore/fdd.l Mon Jun 10 14:27:15 2013 +0800 +++ b/libfdcore/fdd.l Mon Jun 10 16:23:09 2013 +0800 @@ -255,6 +255,7 @@ (?i:"SCTP_streams") { return SCTPSTREAMS; } (?i:"AppServThreads") { return APPSERVTHREADS;} (?i:"ListenOn") { return LISTENON; } +(?i:"ThreadsPerServer") { return THRPERSRV; } (?i:"TcTimer") { return TCTIMER; } (?i:"TwTimer") { return TWTIMER; } (?i:"NoRelay") { return NORELAY; } diff -r f40de74bd1c7 -r 50bf33dc8fe0 libfdcore/fdd.y --- a/libfdcore/fdd.y Mon Jun 10 14:27:15 2013 +0800 +++ b/libfdcore/fdd.y Mon Jun 10 16:23:09 2013 +0800 @@ -108,6 +108,7 @@ %token SCTPSTREAMS %token APPSERVTHREADS %token LISTENON +%token THRPERSRV %token TCTIMER %token TWTIMER %token NORELAY @@ -136,6 +137,7 @@ | conffile sec3436 | conffile sctpstreams | conffile listenon + | conffile thrpersrv | conffile norelay | conffile appservthreads | conffile noip @@ -238,6 +240,14 @@ } ; +thrpersrv: THRPERSRV '=' INTEGER ';' + { + CHECK_PARAMS_DO( ($3 > 0), + { yyerror (&yylloc, conf, "Invalid value"); YYERROR; } ); + conf->cnf_thr_srv = $3; + } + ; + norelay: NORELAY ';' { conf->cnf_flags.no_fwd = 1; diff -r f40de74bd1c7 -r 50bf33dc8fe0 libfdcore/server.c --- a/libfdcore/server.c Mon Jun 10 14:27:15 2013 +0800 +++ b/libfdcore/server.c Mon Jun 10 16:23:09 2013 +0800 @@ -53,26 +53,20 @@ struct cnxctx * conn; /* server connection context (listening socket) */ int proto; /* IPPROTO_TCP or IPPROTO_SCTP */ - int secur; /* TLS is started immediatly after connection ? 0: no; 2: yes (TLS/TCP or TLS/SCTP) */ + int secur; /* TLS is started immediatly after connection ? 0: no; 1: RFU; 2: yes (TLS/TCP or TLS/SCTP) */ - pthread_t thr; /* The thread listening for new connections */ + pthread_t thr; /* The thread waiting for new connections (will store the data in the clients fifo) */ enum s_state state; /* state of the thread */ - struct fd_list clients; /* List of clients connected to this server, not yet identified */ - pthread_mutex_t clients_mtx; /* Mutex to protect the list of clients */ + struct fifo *pending; /* FIFO of struct cnxctx */ + struct pool_workers { + struct server * s; /* pointer to the parent server structure */ + int id; /* The worker id for logs */ + pthread_t worker; /* The thread */ + } *workers; /* array of cnf_thr_srv items */ }; -/* Client information (connecting peer for which we don't have the CER yet) */ -struct client { - struct fd_list chain; /* link in the server's list of clients */ - struct cnxctx *conn; /* Parameters of the connection */ - struct timespec ts; /* Deadline for receiving CER (after INCNX_TIMEOUT) */ - pthread_t thr; /* connection state machine */ -}; - - - /* Micro functions to read/change the status thread-safely */ static pthread_mutex_t s_lock = PTHREAD_MUTEX_INITIALIZER; static enum s_state get_status(struct server * s) @@ -91,11 +85,17 @@ } +/* dump one item of the server->pending fifo */ +static DECLARE_FD_DUMP_PROTOTYPE(dump_cnx, void * item) { + struct cnxctx * c = item; + CHECK_MALLOC_DO( fd_dump_extend( FD_DUMP_STD_PARAMS, " '%s'", fd_cnx_getid(c)), return NULL); + return *buf; +} /* Dump all servers information */ DECLARE_FD_DUMP_PROTOTYPE(fd_servers_dump, int details) { - struct fd_list * li, *cli; + struct fd_list * li; FD_DUMP_HANDLE_OFFSET(); @@ -112,13 +112,7 @@ ((st == TERMINATED) ? "Thread terminated" : "Thread status unknown"))), return NULL); /* Dump the client list of this server */ - CHECK_POSIX_DO( pthread_mutex_lock(&s->clients_mtx), ); - for (cli = s->clients.next; cli != &s->clients; cli = cli->next) { - struct client * c = (struct client *)cli; - char bufts[128]; - CHECK_MALLOC_DO( fd_dump_extend( FD_DUMP_STD_PARAMS, "\n {client}(@%p)'%s': to:%s", c, fd_cnx_getid(c->conn), fd_log_time(&c->ts, bufts, sizeof(bufts))), break); - } - CHECK_POSIX_DO( pthread_mutex_unlock(&s->clients_mtx), ); + CHECK_MALLOC_DO( fd_fifo_dump(FD_DUMP_STD_PARAMS, "pending connections", s->pending, dump_cnx), return NULL ); if (li->next != &FD_SERVERS) { CHECK_MALLOC_DO( fd_dump_extend( FD_DUMP_STD_PARAMS, "\n"), return NULL); @@ -133,72 +127,83 @@ } -/* The state machine to handle incoming connection before the remote peer is identified */ -static void * client_sm(void * arg) +/* The thread in the pool for handling new clients connecting to a server */ +static void * client_worker(void * arg) { - struct client * c = arg; - struct server * s = NULL; + struct pool_workers * pw = arg; + struct server * s = pw->s; + struct cnxctx * c = NULL; + int fatal = 0; + struct timespec ts; struct fd_cnx_rcvdata rcv_data; struct fd_msg_pmdl * pmdl = NULL; struct msg * msg = NULL; struct msg_hdr *hdr = NULL; struct fd_pei pei; - TRACE_ENTRY("%p", c); + TRACE_ENTRY("%p", arg); + + /* Set the thread name */ + { + char buf[48]; + snprintf(buf, sizeof(buf), "Worker#%d@%s", pw->id, fd_cnx_getid(s->conn)); + fd_log_threadname ( buf ); + } + + /* Loop until canceled / error */ +next_client: + LOG_A("Ready to process next incoming connection"); + + /* Get the next connection */ + CHECK_FCT_DO( fd_fifo_get( s->pending, &c ), { fatal = 1; goto cleanup; } ); + + /* Handshake if we are a secure server port, or start clear otherwise */ + if (s->secur) { + LOG_D("Starting handshake with %s", fd_cnx_getid(c)); + + int ret = fd_cnx_handshake(c, GNUTLS_SERVER, (s->secur == 1) ? ALGO_HANDSHAKE_DEFAULT : ALGO_HANDSHAKE_3436, NULL, NULL); + if (ret != 0) { + char buf[1024]; + snprintf(buf, sizeof(buf), "TLS handshake failed for connection '%s', connection closed.", fd_cnx_getid(c)); + + fd_hook_call(HOOK_PEER_CONNECT_FAILED, NULL, NULL, buf, NULL); + + goto cleanup; + } + } else { + CHECK_FCT_DO( fd_cnx_start_clear(c, 0), goto cleanup ); + } + + /* Set the timeout to receive the first message */ + CHECK_SYS_DO( clock_gettime(CLOCK_REALTIME, &ts), { fatal = 1; goto cleanup; } ); + ts.tv_sec += INCNX_TIMEOUT; memset(&rcv_data, 0, sizeof(rcv_data)); - CHECK_PARAMS_DO(c && c->conn && c->chain.head, goto fatal_error ); - - - s = c->chain.head->o; - - /* Name the current thread */ - fd_log_threadname ( fd_cnx_getid(c->conn) ); - - /* Handshake if we are a secure server port, or start clear otherwise */ - if (s->secur) { - int ret = fd_cnx_handshake(c->conn, GNUTLS_SERVER, (s->secur == 1) ? ALGO_HANDSHAKE_DEFAULT : ALGO_HANDSHAKE_3436, NULL, NULL); - if (ret != 0) { - char buf[1024]; - snprintf(buf, sizeof(buf), "TLS handshake failed for client '%s', connection aborted.", fd_cnx_getid(c->conn)); - - fd_hook_call(HOOK_PEER_CONNECT_FAILED, NULL, NULL, buf, NULL); - - goto cleanup; - } - } else { - CHECK_FCT_DO( fd_cnx_start_clear(c->conn, 0), goto cleanup ); - } - - /* Set the timeout to receive the first message */ - CHECK_SYS_DO( clock_gettime(CLOCK_REALTIME, &c->ts), goto fatal_error ); - c->ts.tv_sec += INCNX_TIMEOUT; - /* Receive the first Diameter message on the connection -- cleanup in case of timeout */ - CHECK_FCT_DO( fd_cnx_receive(c->conn, &c->ts, &rcv_data.buffer, &rcv_data.length), + CHECK_FCT_DO( fd_cnx_receive(c, &ts, &rcv_data.buffer, &rcv_data.length), { char buf[1024]; switch (__ret__) { case ETIMEDOUT: - snprintf(buf, sizeof(buf), "Client '%s' did not send CER within %ds, connection aborted.", fd_cnx_getid(c->conn), INCNX_TIMEOUT); + snprintf(buf, sizeof(buf), "Client '%s' did not send CER within %ds, connection aborted.", fd_cnx_getid(c), INCNX_TIMEOUT); fd_hook_call(HOOK_PEER_CONNECT_FAILED, NULL, NULL, buf, NULL); break; case ENOTCONN: - snprintf(buf, sizeof(buf), "Connection from '%s' in error before CER was received.", fd_cnx_getid(c->conn)); + snprintf(buf, sizeof(buf), "Connection from '%s' in error before CER was received.", fd_cnx_getid(c)); fd_hook_call(HOOK_PEER_CONNECT_FAILED, NULL, NULL, buf, NULL); break; default: - snprintf(buf, sizeof(buf), "Connection from '%s': unspecified error, connection aborted.", fd_cnx_getid(c->conn)); + snprintf(buf, sizeof(buf), "Connection from '%s': unspecified error, connection aborted.", fd_cnx_getid(c)); fd_hook_call(HOOK_PEER_CONNECT_FAILED, NULL, NULL, buf, NULL); } goto cleanup; } ); - TRACE_DEBUG(FULL, "Received %zdb from new client '%s'", rcv_data.length, fd_cnx_getid(c->conn)); + TRACE_DEBUG(FULL, "Received %zdb from new client '%s'", rcv_data.length, fd_cnx_getid(c)); pmdl = fd_msg_pmdl_get_inbuf(rcv_data.buffer, rcv_data.length); @@ -211,7 +216,7 @@ /* Log incoming message */ fd_hook_associate(msg, pmdl); - fd_hook_call(HOOK_MESSAGE_RECEIVED, msg, NULL, fd_cnx_getid(c->conn), fd_msg_pmdl_get(msg)); + fd_hook_call(HOOK_MESSAGE_RECEIVED, msg, NULL, fd_cnx_getid(c), fd_msg_pmdl_get(msg)); /* We expect a CER, it must parse with our dictionary and rules */ CHECK_FCT_DO( fd_msg_parse_rules( msg, fd_g_config->cnf_dict, &pei ), @@ -220,57 +225,52 @@ fd_hook_call(HOOK_MESSAGE_PARSING_ERROR, msg, NULL, pei.pei_message ?: pei.pei_errcode, fd_msg_pmdl_get(msg)); - snprintf(buf, sizeof(buf), "Error parsing CER from '%s', connection aborted.", fd_cnx_getid(c->conn)); + snprintf(buf, sizeof(buf), "Error parsing CER from '%s', connection aborted.", fd_cnx_getid(c)); fd_hook_call(HOOK_PEER_CONNECT_FAILED, NULL, NULL, buf, NULL); goto cleanup; } ); /* Now check we received a CER */ - CHECK_FCT_DO( fd_msg_hdr ( msg, &hdr ), goto fatal_error ); + CHECK_FCT_DO( fd_msg_hdr ( msg, &hdr ), { fatal = 1; goto cleanup; } ); CHECK_PARAMS_DO( (hdr->msg_appl == 0) && (hdr->msg_flags & CMD_FLAG_REQUEST) && (hdr->msg_code == CC_CAPABILITIES_EXCHANGE), { /* Parsing failed -- trace details */ char buf[1024]; - snprintf(buf, sizeof(buf), "Expected CER from '%s', received a different message, connection aborted.", fd_cnx_getid(c->conn)); + snprintf(buf, sizeof(buf), "Expected CER from '%s', received a different message, connection aborted.", fd_cnx_getid(c)); fd_hook_call(HOOK_PEER_CONNECT_FAILED, msg, NULL, buf, NULL); goto cleanup; } ); - /* Finally, pass the information to the peers module which will handle it next */ - pthread_cleanup_push((void *)fd_cnx_destroy, c->conn); + /* Finally, pass the information to the peers module which will handle it in a separate thread */ + pthread_cleanup_push((void *)fd_cnx_destroy, c); pthread_cleanup_push((void *)fd_msg_free, msg); - CHECK_FCT_DO( fd_peer_handle_newCER( &msg, &c->conn ), ); + CHECK_FCT_DO( fd_peer_handle_newCER( &msg, &c ), ); pthread_cleanup_pop(0); pthread_cleanup_pop(0); - - /* The end, we cleanup the client structure */ + cleanup: - /* Unlink the client structure */ - CHECK_POSIX_DO( pthread_mutex_lock(&s->clients_mtx), goto fatal_error ); - fd_list_unlink( &c->chain ); - CHECK_POSIX_DO( pthread_mutex_unlock(&s->clients_mtx), goto fatal_error ); - /* Cleanup the parsed message if any */ if (msg) { CHECK_FCT_DO( fd_msg_free(msg), /* continue */); + msg = NULL; } - /* Destroy the connection object if present */ - if (c->conn) - fd_cnx_destroy(c->conn); + /* Close the connection if needed */ + if (c != NULL) { + fd_cnx_destroy(c); + c = NULL; + } /* Cleanup the received buffer if any */ free(rcv_data.buffer); - /* Detach the thread, cleanup the client structure */ - pthread_detach(pthread_self()); - free(c); + + if (!fatal) + goto next_client; + + LOG_E("Worker thread exiting."); return NULL; - -fatal_error: /* This has effect to terminate the daemon */ - CHECK_FCT_DO(fd_event_send(fd_g_config->cnf_main_ev, FDEV_TERMINATE, 0, NULL), ); - return NULL; -} +} /* The thread managing a server */ static void * serv_th(void * arg) @@ -279,34 +279,24 @@ CHECK_PARAMS_DO(s, goto error); fd_log_threadname ( fd_cnx_getid(s->conn) ); + set_status(s, RUNNING); /* Accept incoming connections */ CHECK_FCT_DO( fd_cnx_serv_listen(s->conn), goto error ); do { - struct client * c = NULL; struct cnxctx * conn = NULL; /* Wait for a new client or cancel */ - CHECK_MALLOC_DO( conn = fd_cnx_serv_accept(s->conn), goto error ); - - /* Create a client structure */ - CHECK_MALLOC_DO( c = malloc(sizeof(struct client)), goto error ); - memset(c, 0, sizeof(struct client)); - fd_list_init(&c->chain, c); - c->conn = conn; + CHECK_MALLOC_DO( conn = fd_cnx_serv_accept(s->conn), break ); - /* Save the client in the list */ - CHECK_POSIX_DO( pthread_mutex_lock( &s->clients_mtx ), goto error ); - fd_list_insert_before(&s->clients, &c->chain); - CHECK_POSIX_DO( pthread_mutex_unlock( &s->clients_mtx ), goto error ); - - /* Start the client thread */ - CHECK_POSIX_DO( pthread_create( &c->thr, NULL, client_sm, c ), goto error ); + /* Store this connection in the fifo for processing by the worker pool. Will block when the fifo is full */ + pthread_cleanup_push((void *)fd_cnx_destroy, conn); + CHECK_FCT_DO( fd_fifo_post( s->pending, &conn ), break ); + pthread_cleanup_pop(0); } while (1); - error: if (s) set_status(s, TERMINATED); @@ -323,6 +313,7 @@ static struct server * new_serv( int proto, int secur ) { struct server * new; + int i; /* New server structure */ CHECK_MALLOC_DO( new = malloc(sizeof(struct server)), return NULL ); @@ -331,8 +322,16 @@ fd_list_init(&new->chain, new); new->proto = proto; new->secur = secur; - CHECK_POSIX_DO( pthread_mutex_init(&new->clients_mtx, NULL), return NULL ); - fd_list_init(&new->clients, new); + + CHECK_FCT_DO( fd_fifo_new(&new->pending, 5), return NULL); + CHECK_MALLOC_DO( new->workers = calloc( fd_g_config->cnf_thr_srv, sizeof(struct pool_workers) ), return NULL ); + + for (i = 0; i < fd_g_config->cnf_thr_srv; i++) { + /* Create the pool */ + new->workers[i].s = new; + new->workers[i].id = i; + CHECK_POSIX_DO( pthread_create( &new->workers[i].worker, NULL, client_worker, &new->workers[i]), return NULL ); + } return new; } @@ -478,9 +477,8 @@ /* Loop on all servers */ while (!FD_IS_LIST_EMPTY(&FD_SERVERS)) { struct server * s = (struct server *)(FD_SERVERS.next); - - /* Lock client list now */ - CHECK_FCT_DO( pthread_mutex_lock(&s->clients_mtx), /* continue anyway */); + int i; + struct cnxctx * c; /* cancel thread */ CHECK_FCT_DO( fd_thr_term(&s->thr), /* continue */); @@ -488,23 +486,18 @@ /* destroy server connection context */ fd_cnx_destroy(s->conn); - /* cancel and destroy all clients */ - while (!FD_IS_LIST_EMPTY(&s->clients)) { - struct client * c = (struct client *)(s->clients.next); - - /* Destroy client's thread */ - CHECK_FCT_DO( fd_thr_term(&c->thr), /* continue */); - - /* Destroy client's connection */ - fd_cnx_destroy(c->conn); - - /* Unlink and free the client */ - fd_list_unlink(&c->chain); - free(c); + /* cancel and destroy all worker threads */ + for (i = 0; i < fd_g_config->cnf_thr_srv; i++) { + /* Destroy worker thread */ + CHECK_FCT_DO( fd_thr_term(&s->workers[i].worker), /* continue */); } - /* Unlock & destroy */ - CHECK_FCT_DO( pthread_mutex_unlock(&s->clients_mtx), /* continue anyway */); - CHECK_FCT_DO( pthread_mutex_destroy(&s->clients_mtx), /* continue */); + free(s->workers); + + /* Close any pending connection */ + while ( fd_fifo_tryget( s->pending, &c ) == 0 ) { + fd_cnx_destroy(c); + } + CHECK_FCT_DO( fd_fifo_del(&s->pending), ); /* Now destroy the server object */ fd_list_unlink(&s->chain);