diff libfdcore/server.c @ 1214:76ac4bb75f0e

Merged with latest proposed version
author Sebastien Decugis <sdecugis@freediameter.net>
date Mon, 17 Jun 2013 10:11:57 +0800
parents 9ff57791a5ab
children
line wrap: on
line diff
--- a/libfdcore/server.c	Tue Jun 11 18:13:29 2013 +0800
+++ b/libfdcore/server.c	Mon Jun 17 10:11:57 2013 +0800
@@ -55,24 +55,18 @@
 	int 		proto;		/* IPPROTO_TCP or IPPROTO_SCTP */
 	int 		secur;		/* TLS is started immediatly after connection ? 0: no; 1: yes (TLS/TCP or DTLS/SCTP); 2: yes (TLS/TCP or TLS/SCTP) */
 	
-	pthread_t	thr;		/* The thread listening for new connections */
+	pthread_t	thr;		/* The thread waiting for new connections (will store the data in the clients fifo) */
 	enum s_state	state;		/* state of the thread */
 	
-	struct fd_list	clients;	/* List of clients connected to this server, not yet identified */
-	pthread_mutex_t	clients_mtx;	/* Mutex to protect the list of clients */
+	struct fifo	*pending;	/* FIFO of struct cnxctx */
+	struct pool_workers {
+		struct server * s;	/* pointer to the parent server structure */
+		int		id;	/* The worker id for logs */
+		pthread_t	worker; /* The thread */
+	}		*workers;	/* array of cnf_thr_srv items */
 };
 
 
-/* Client information (connecting peer for which we don't have the CER yet) */
-struct client {
-	struct fd_list	 chain;	/* link in the server's list of clients */
-	struct cnxctx	*conn;	/* Parameters of the connection */
-	struct timespec	 ts;	/* Deadline for receiving CER (after INCNX_TIMEOUT) */
-	pthread_t	 thr; 	/* connection state machine */
-};
-
-
-
 /* Micro functions to read/change the status thread-safely */
 static pthread_mutex_t s_lock = PTHREAD_MUTEX_INITIALIZER;
 static enum s_state get_status(struct server * s)
@@ -91,11 +85,17 @@
 }
 	
 
+/* dump one item of the server->pending fifo */
+static DECLARE_FD_DUMP_PROTOTYPE(dump_cnx, void * item) {
+	struct cnxctx * c = item;
+	CHECK_MALLOC_DO( fd_dump_extend( FD_DUMP_STD_PARAMS, " '%s'", fd_cnx_getid(c)), return NULL);
+	return *buf;
+}
 
 /* Dump all servers information */
 DECLARE_FD_DUMP_PROTOTYPE(fd_servers_dump, int details)
 {
-	struct fd_list * li, *cli;
+	struct fd_list * li;
 	
 	FD_DUMP_HANDLE_OFFSET();
 	
@@ -112,13 +112,7 @@
 					((st == TERMINATED) ? "Thread terminated" :
 								  "Thread status unknown"))), return NULL);
 			/* Dump the client list of this server */
-			CHECK_POSIX_DO( pthread_mutex_lock(&s->clients_mtx), );
-			for (cli = s->clients.next; cli != &s->clients; cli = cli->next) {
-				struct client * c = (struct client *)cli;
-				char bufts[128];
-				CHECK_MALLOC_DO( fd_dump_extend( FD_DUMP_STD_PARAMS, "\n  {client}(@%p)'%s': to:%s", c, fd_cnx_getid(c->conn), fd_log_time(&c->ts, bufts, sizeof(bufts))), break);
-			}
-			CHECK_POSIX_DO( pthread_mutex_unlock(&s->clients_mtx), );
+			CHECK_MALLOC_DO( fd_fifo_dump(FD_DUMP_STD_PARAMS, "pending connections", s->pending, dump_cnx), return NULL );
 
 			if (li->next != &FD_SERVERS) {
 				CHECK_MALLOC_DO( fd_dump_extend( FD_DUMP_STD_PARAMS, "\n"), return NULL);
@@ -133,72 +127,83 @@
 }
 
 
-/* The state machine to handle incoming connection before the remote peer is identified */
-static void * client_sm(void * arg)
+/* The thread in the pool for handling new clients connecting to a server */
+static void * client_worker(void * arg)
 {
-	struct client * c = arg;
-	struct server * s = NULL;
+	struct pool_workers * pw = arg;
+	struct server * s = pw->s;
+	struct cnxctx * c = NULL;
+	int fatal = 0;
+	struct timespec ts;
 	struct fd_cnx_rcvdata rcv_data;
 	struct fd_msg_pmdl * pmdl = NULL;
 	struct msg    * msg = NULL;
 	struct msg_hdr *hdr = NULL;
 	struct fd_pei pei;
 	
-	TRACE_ENTRY("%p", c);
+	TRACE_ENTRY("%p", arg);
 	
+	/* Set the thread name */
+	{
+		char buf[48];
+		snprintf(buf, sizeof(buf), "Worker#%d[%s%s]", pw->id, IPPROTO_NAME(s->proto), s->secur?", Sec" : "");
+		fd_log_threadname ( buf );
+	}
+	
+	/* Loop until canceled / error */
+next_client:
+	LOG_A("Ready to process next incoming connection");
+
 	memset(&rcv_data, 0, sizeof(rcv_data));
 	
-	CHECK_PARAMS_DO(c && c->conn && c->chain.head, goto fatal_error );
-	
-	
-	s = c->chain.head->o;
-	
-	/* Name the current thread */
-	fd_log_threadname ( fd_cnx_getid(c->conn) );
-	
+	/* Get the next connection */
+	CHECK_FCT_DO( fd_fifo_get( s->pending, &c ), { fatal = 1; goto cleanup; } );
+
 	/* Handshake if we are a secure server port, or start clear otherwise */
 	if (s->secur) {
-		int ret = fd_cnx_handshake(c->conn, GNUTLS_SERVER, (s->secur == 1) ? ALGO_HANDSHAKE_DEFAULT : ALGO_HANDSHAKE_3436, NULL, NULL);
+		LOG_D("Starting handshake with %s", fd_cnx_getid(c));
+
+		int ret = fd_cnx_handshake(c, GNUTLS_SERVER, (s->secur == 1) ? ALGO_HANDSHAKE_DEFAULT : ALGO_HANDSHAKE_3436, NULL, NULL);
 		if (ret != 0) {
 			char buf[1024];
-			snprintf(buf, sizeof(buf), "TLS handshake failed for client '%s', connection aborted.", fd_cnx_getid(c->conn));
-			
+			snprintf(buf, sizeof(buf), "TLS handshake failed for connection '%s', connection closed.", fd_cnx_getid(c));
+
 			fd_hook_call(HOOK_PEER_CONNECT_FAILED, NULL, NULL, buf, NULL);
-			
+
 			goto cleanup;
 		}
 	} else {
-		CHECK_FCT_DO( fd_cnx_start_clear(c->conn, 0), goto cleanup );
+		CHECK_FCT_DO( fd_cnx_start_clear(c, 0), goto cleanup );
 	}
 	
 	/* Set the timeout to receive the first message */
-	CHECK_SYS_DO( clock_gettime(CLOCK_REALTIME, &c->ts), goto fatal_error );
-	c->ts.tv_sec += INCNX_TIMEOUT;
+	CHECK_SYS_DO( clock_gettime(CLOCK_REALTIME, &ts), { fatal = 1; goto cleanup; } );
+	ts.tv_sec += INCNX_TIMEOUT;
 	
 	/* Receive the first Diameter message on the connection -- cleanup in case of timeout */
-	CHECK_FCT_DO( fd_cnx_receive(c->conn, &c->ts, &rcv_data.buffer, &rcv_data.length), 
+	CHECK_FCT_DO( fd_cnx_receive(c, &ts, &rcv_data.buffer, &rcv_data.length), 
 		{
 			char buf[1024];
 			
 			switch (__ret__) {
 			case ETIMEDOUT:
-				snprintf(buf, sizeof(buf), "Client '%s' did not send CER within %ds, connection aborted.", fd_cnx_getid(c->conn), INCNX_TIMEOUT);
+				snprintf(buf, sizeof(buf), "Client '%s' did not send CER within %ds, connection aborted.", fd_cnx_getid(c), INCNX_TIMEOUT);
 				fd_hook_call(HOOK_PEER_CONNECT_FAILED, NULL, NULL, buf, NULL);
 				break;
 			
 			case ENOTCONN:
-				snprintf(buf, sizeof(buf), "Connection from '%s' in error before CER was received.", fd_cnx_getid(c->conn));
+				snprintf(buf, sizeof(buf), "Connection from '%s' in error before CER was received.", fd_cnx_getid(c));
 				fd_hook_call(HOOK_PEER_CONNECT_FAILED, NULL, NULL, buf, NULL);
 				break;
 			
 			default:
-				snprintf(buf, sizeof(buf), "Connection from '%s': unspecified error, connection aborted.", fd_cnx_getid(c->conn));
+				snprintf(buf, sizeof(buf), "Connection from '%s': unspecified error, connection aborted.", fd_cnx_getid(c));
 				fd_hook_call(HOOK_PEER_CONNECT_FAILED, NULL, NULL, buf, NULL);
 			}
 			goto cleanup;
 		} );
 	
-	TRACE_DEBUG(FULL, "Received %zdb from new client '%s'", rcv_data.length, fd_cnx_getid(c->conn));
+	TRACE_DEBUG(FULL, "Received %zdb from new client '%s'", rcv_data.length, fd_cnx_getid(c));
 	
 	pmdl = fd_msg_pmdl_get_inbuf(rcv_data.buffer, rcv_data.length);
 	
@@ -211,7 +216,7 @@
 	
 	/* Log incoming message */
 	fd_hook_associate(msg, pmdl);
-	fd_hook_call(HOOK_MESSAGE_RECEIVED, msg, NULL, fd_cnx_getid(c->conn), fd_msg_pmdl_get(msg));
+	fd_hook_call(HOOK_MESSAGE_RECEIVED, msg, NULL, fd_cnx_getid(c), fd_msg_pmdl_get(msg));
 	
 	/* We expect a CER, it must parse with our dictionary and rules */
 	CHECK_FCT_DO( fd_msg_parse_rules( msg, fd_g_config->cnf_dict, &pei ), 
@@ -220,57 +225,52 @@
 			
 			fd_hook_call(HOOK_MESSAGE_PARSING_ERROR, msg, NULL, pei.pei_message ?: pei.pei_errcode, fd_msg_pmdl_get(msg));
 			
-			snprintf(buf, sizeof(buf), "Error parsing CER from '%s', connection aborted.", fd_cnx_getid(c->conn));
+			snprintf(buf, sizeof(buf), "Error parsing CER from '%s', connection aborted.", fd_cnx_getid(c));
 			fd_hook_call(HOOK_PEER_CONNECT_FAILED, NULL, NULL, buf, NULL);
 			
 			goto cleanup;
 		} );
 	
 	/* Now check we received a CER */
-	CHECK_FCT_DO( fd_msg_hdr ( msg, &hdr ), goto fatal_error );
+	CHECK_FCT_DO( fd_msg_hdr ( msg, &hdr ), { fatal = 1; goto cleanup; }  );
 	CHECK_PARAMS_DO( (hdr->msg_appl == 0) && (hdr->msg_flags & CMD_FLAG_REQUEST) && (hdr->msg_code == CC_CAPABILITIES_EXCHANGE),
 		{ /* Parsing failed -- trace details */ 
 			char buf[1024];
-			snprintf(buf, sizeof(buf), "Expected CER from '%s', received a different message, connection aborted.", fd_cnx_getid(c->conn));
+			snprintf(buf, sizeof(buf), "Expected CER from '%s', received a different message, connection aborted.", fd_cnx_getid(c));
 			fd_hook_call(HOOK_PEER_CONNECT_FAILED, msg, NULL, buf, NULL);
 			goto cleanup;
 		} );
 	
-	/* Finally, pass the information to the peers module which will handle it next */
-	pthread_cleanup_push((void *)fd_cnx_destroy, c->conn);
+	/* Finally, pass the information to the peers module which will handle it in a separate thread */
+	pthread_cleanup_push((void *)fd_cnx_destroy, c);
 	pthread_cleanup_push((void *)fd_msg_free, msg);
-	CHECK_FCT_DO( fd_peer_handle_newCER( &msg, &c->conn ),  );
+	CHECK_FCT_DO( fd_peer_handle_newCER( &msg, &c ),  );
 	pthread_cleanup_pop(0);
 	pthread_cleanup_pop(0);
-	
-	/* The end, we cleanup the client structure */
+
 cleanup:
-	/* Unlink the client structure */
-	CHECK_POSIX_DO( pthread_mutex_lock(&s->clients_mtx), goto fatal_error );
-	fd_list_unlink( &c->chain );
-	CHECK_POSIX_DO( pthread_mutex_unlock(&s->clients_mtx), goto fatal_error );
-	
 	/* Cleanup the parsed message if any */
 	if (msg) {
 		CHECK_FCT_DO( fd_msg_free(msg), /* continue */);
+		msg = NULL;
 	}
 	
-	/* Destroy the connection object if present */
-	if (c->conn)
-		fd_cnx_destroy(c->conn);
+	/* Close the connection if needed */
+	if (c != NULL) {
+		fd_cnx_destroy(c);
+		c = NULL;
+	}
 	
 	/* Cleanup the received buffer if any */
 	free(rcv_data.buffer);
 	
-	/* Detach the thread, cleanup the client structure */
-	pthread_detach(pthread_self());
-	free(c);
+	
+	if (!fatal)
+		goto next_client;
+
+	LOG_E("Worker thread exiting.");
 	return NULL;
-	
-fatal_error:	/* This has effect to terminate the daemon */
-	CHECK_FCT_DO(fd_event_send(fd_g_config->cnf_main_ev, FDEV_TERMINATE, 0, NULL), );
-	return NULL;
-}
+}	
 
 /* The thread managing a server */
 static void * serv_th(void * arg)
@@ -279,41 +279,31 @@
 	
 	CHECK_PARAMS_DO(s, goto error);
 	fd_log_threadname ( fd_cnx_getid(s->conn) );
+	
 	set_status(s, RUNNING);
 	
 	/* Accept incoming connections */
 	CHECK_FCT_DO( fd_cnx_serv_listen(s->conn), goto error );
 	
 	do {
-		struct client * c = NULL;
 		struct cnxctx * conn = NULL;
 		
 		/* Wait for a new client or cancel */
-		CHECK_MALLOC_DO( conn = fd_cnx_serv_accept(s->conn), goto error );
-		
-		/* Create a client structure */
-		CHECK_MALLOC_DO( c = malloc(sizeof(struct client)), goto error );
-		memset(c, 0, sizeof(struct client));
-		fd_list_init(&c->chain, c);
-		c->conn = conn;
+		CHECK_MALLOC_DO( conn = fd_cnx_serv_accept(s->conn), break );
 		
-		/* Save the client in the list */
-		CHECK_POSIX_DO( pthread_mutex_lock( &s->clients_mtx ), goto error );
-		fd_list_insert_before(&s->clients, &c->chain);
-		CHECK_POSIX_DO( pthread_mutex_unlock( &s->clients_mtx ), goto error );
-
-		/* Start the client thread */
-		CHECK_POSIX_DO( pthread_create( &c->thr, NULL, client_sm, c ), goto error );
+		/* Store this connection in the fifo for processing by the worker pool. Will block when the fifo is full */
+		pthread_cleanup_push((void *)fd_cnx_destroy, conn);
+		CHECK_FCT_DO( fd_fifo_post( s->pending, &conn ), break );
+		pthread_cleanup_pop(0);
 		
 	} while (1);
-	
 error:	
 	if (s)
 		set_status(s, TERMINATED);
 
 	/* Send error signal to the core */
 	LOG_F( "An error occurred in server module! Thread is terminating...");
-	CHECK_FCT_DO(fd_event_send(fd_g_config->cnf_main_ev, FDEV_TERMINATE, 0, NULL), );
+	CHECK_FCT_DO(fd_core_shutdown(), );
 
 	return NULL;
 }
@@ -323,6 +313,7 @@
 static struct server * new_serv( int proto, int secur )
 {
 	struct server * new;
+	int i;
 	
 	/* New server structure */
 	CHECK_MALLOC_DO( new = malloc(sizeof(struct server)), return NULL );
@@ -331,8 +322,16 @@
 	fd_list_init(&new->chain, new);
 	new->proto = proto;
 	new->secur = secur;
-	CHECK_POSIX_DO( pthread_mutex_init(&new->clients_mtx, NULL), return NULL );
-	fd_list_init(&new->clients, new);
+	
+	CHECK_FCT_DO( fd_fifo_new(&new->pending, 5), return NULL);
+	CHECK_MALLOC_DO( new->workers = calloc( fd_g_config->cnf_thr_srv, sizeof(struct pool_workers) ), return NULL );
+	
+	for (i = 0; i < fd_g_config->cnf_thr_srv; i++) {
+		/* Create the pool */
+		new->workers[i].s = new;
+		new->workers[i].id = i;
+		CHECK_POSIX_DO( pthread_create( &new->workers[i].worker, NULL, client_worker, &new->workers[i]), return NULL );
+	}
 	
 	return new;
 }
@@ -478,9 +477,8 @@
 	/* Loop on all servers */
 	while (!FD_IS_LIST_EMPTY(&FD_SERVERS)) {
 		struct server * s = (struct server *)(FD_SERVERS.next);
-		
-		/* Lock client list now */
-		CHECK_FCT_DO( pthread_mutex_lock(&s->clients_mtx), /* continue anyway */);
+		int i;
+		struct cnxctx * c;
 		
 		/* cancel thread */
 		CHECK_FCT_DO( fd_thr_term(&s->thr), /* continue */);
@@ -488,23 +486,18 @@
 		/* destroy server connection context */
 		fd_cnx_destroy(s->conn);
 		
-		/* cancel and destroy all clients */
-		while (!FD_IS_LIST_EMPTY(&s->clients)) {
-			struct client * c = (struct client *)(s->clients.next);
-			
-			/* Destroy client's thread */
-			CHECK_FCT_DO( fd_thr_term(&c->thr), /* continue */);
-			
-			/* Destroy client's connection */
-			fd_cnx_destroy(c->conn);
-			
-			/* Unlink and free the client */
-			fd_list_unlink(&c->chain);
-			free(c);
+		/* cancel and destroy all worker threads */
+		for (i = 0; i < fd_g_config->cnf_thr_srv; i++) {
+			/* Destroy worker thread */
+			CHECK_FCT_DO( fd_thr_term(&s->workers[i].worker), /* continue */);
 		}
-		/* Unlock & destroy */
-		CHECK_FCT_DO( pthread_mutex_unlock(&s->clients_mtx), /* continue anyway */);
-		CHECK_FCT_DO( pthread_mutex_destroy(&s->clients_mtx), /* continue */);
+		free(s->workers);
+		
+		/* Close any pending connection */
+		while ( fd_fifo_tryget( s->pending, &c ) == 0 ) {
+			fd_cnx_destroy(c);
+		}
+		CHECK_FCT_DO( fd_fifo_del(&s->pending), );
 		
 		/* Now destroy the server object */
 		fd_list_unlink(&s->chain);
"Welcome to our mercurial repository"