Mercurial > hg > freeDiameter-dtls
diff libfdcore/server.c @ 1214:76ac4bb75f0e
Merged with latest proposed version
author | Sebastien Decugis <sdecugis@freediameter.net> |
---|---|
date | Mon, 17 Jun 2013 10:11:57 +0800 |
parents | 9ff57791a5ab |
children |
line wrap: on
line diff
--- a/libfdcore/server.c Tue Jun 11 18:13:29 2013 +0800 +++ b/libfdcore/server.c Mon Jun 17 10:11:57 2013 +0800 @@ -55,24 +55,18 @@ int proto; /* IPPROTO_TCP or IPPROTO_SCTP */ int secur; /* TLS is started immediatly after connection ? 0: no; 1: yes (TLS/TCP or DTLS/SCTP); 2: yes (TLS/TCP or TLS/SCTP) */ - pthread_t thr; /* The thread listening for new connections */ + pthread_t thr; /* The thread waiting for new connections (will store the data in the clients fifo) */ enum s_state state; /* state of the thread */ - struct fd_list clients; /* List of clients connected to this server, not yet identified */ - pthread_mutex_t clients_mtx; /* Mutex to protect the list of clients */ + struct fifo *pending; /* FIFO of struct cnxctx */ + struct pool_workers { + struct server * s; /* pointer to the parent server structure */ + int id; /* The worker id for logs */ + pthread_t worker; /* The thread */ + } *workers; /* array of cnf_thr_srv items */ }; -/* Client information (connecting peer for which we don't have the CER yet) */ -struct client { - struct fd_list chain; /* link in the server's list of clients */ - struct cnxctx *conn; /* Parameters of the connection */ - struct timespec ts; /* Deadline for receiving CER (after INCNX_TIMEOUT) */ - pthread_t thr; /* connection state machine */ -}; - - - /* Micro functions to read/change the status thread-safely */ static pthread_mutex_t s_lock = PTHREAD_MUTEX_INITIALIZER; static enum s_state get_status(struct server * s) @@ -91,11 +85,17 @@ } +/* dump one item of the server->pending fifo */ +static DECLARE_FD_DUMP_PROTOTYPE(dump_cnx, void * item) { + struct cnxctx * c = item; + CHECK_MALLOC_DO( fd_dump_extend( FD_DUMP_STD_PARAMS, " '%s'", fd_cnx_getid(c)), return NULL); + return *buf; +} /* Dump all servers information */ DECLARE_FD_DUMP_PROTOTYPE(fd_servers_dump, int details) { - struct fd_list * li, *cli; + struct fd_list * li; FD_DUMP_HANDLE_OFFSET(); @@ -112,13 +112,7 @@ ((st == TERMINATED) ? "Thread terminated" : "Thread status unknown"))), return NULL); /* Dump the client list of this server */ - CHECK_POSIX_DO( pthread_mutex_lock(&s->clients_mtx), ); - for (cli = s->clients.next; cli != &s->clients; cli = cli->next) { - struct client * c = (struct client *)cli; - char bufts[128]; - CHECK_MALLOC_DO( fd_dump_extend( FD_DUMP_STD_PARAMS, "\n {client}(@%p)'%s': to:%s", c, fd_cnx_getid(c->conn), fd_log_time(&c->ts, bufts, sizeof(bufts))), break); - } - CHECK_POSIX_DO( pthread_mutex_unlock(&s->clients_mtx), ); + CHECK_MALLOC_DO( fd_fifo_dump(FD_DUMP_STD_PARAMS, "pending connections", s->pending, dump_cnx), return NULL ); if (li->next != &FD_SERVERS) { CHECK_MALLOC_DO( fd_dump_extend( FD_DUMP_STD_PARAMS, "\n"), return NULL); @@ -133,72 +127,83 @@ } -/* The state machine to handle incoming connection before the remote peer is identified */ -static void * client_sm(void * arg) +/* The thread in the pool for handling new clients connecting to a server */ +static void * client_worker(void * arg) { - struct client * c = arg; - struct server * s = NULL; + struct pool_workers * pw = arg; + struct server * s = pw->s; + struct cnxctx * c = NULL; + int fatal = 0; + struct timespec ts; struct fd_cnx_rcvdata rcv_data; struct fd_msg_pmdl * pmdl = NULL; struct msg * msg = NULL; struct msg_hdr *hdr = NULL; struct fd_pei pei; - TRACE_ENTRY("%p", c); + TRACE_ENTRY("%p", arg); + /* Set the thread name */ + { + char buf[48]; + snprintf(buf, sizeof(buf), "Worker#%d[%s%s]", pw->id, IPPROTO_NAME(s->proto), s->secur?", Sec" : ""); + fd_log_threadname ( buf ); + } + + /* Loop until canceled / error */ +next_client: + LOG_A("Ready to process next incoming connection"); + memset(&rcv_data, 0, sizeof(rcv_data)); - CHECK_PARAMS_DO(c && c->conn && c->chain.head, goto fatal_error ); - - - s = c->chain.head->o; - - /* Name the current thread */ - fd_log_threadname ( fd_cnx_getid(c->conn) ); - + /* Get the next connection */ + CHECK_FCT_DO( fd_fifo_get( s->pending, &c ), { fatal = 1; goto cleanup; } ); + /* Handshake if we are a secure server port, or start clear otherwise */ if (s->secur) { - int ret = fd_cnx_handshake(c->conn, GNUTLS_SERVER, (s->secur == 1) ? ALGO_HANDSHAKE_DEFAULT : ALGO_HANDSHAKE_3436, NULL, NULL); + LOG_D("Starting handshake with %s", fd_cnx_getid(c)); + + int ret = fd_cnx_handshake(c, GNUTLS_SERVER, (s->secur == 1) ? ALGO_HANDSHAKE_DEFAULT : ALGO_HANDSHAKE_3436, NULL, NULL); if (ret != 0) { char buf[1024]; - snprintf(buf, sizeof(buf), "TLS handshake failed for client '%s', connection aborted.", fd_cnx_getid(c->conn)); - + snprintf(buf, sizeof(buf), "TLS handshake failed for connection '%s', connection closed.", fd_cnx_getid(c)); + fd_hook_call(HOOK_PEER_CONNECT_FAILED, NULL, NULL, buf, NULL); - + goto cleanup; } } else { - CHECK_FCT_DO( fd_cnx_start_clear(c->conn, 0), goto cleanup ); + CHECK_FCT_DO( fd_cnx_start_clear(c, 0), goto cleanup ); } /* Set the timeout to receive the first message */ - CHECK_SYS_DO( clock_gettime(CLOCK_REALTIME, &c->ts), goto fatal_error ); - c->ts.tv_sec += INCNX_TIMEOUT; + CHECK_SYS_DO( clock_gettime(CLOCK_REALTIME, &ts), { fatal = 1; goto cleanup; } ); + ts.tv_sec += INCNX_TIMEOUT; /* Receive the first Diameter message on the connection -- cleanup in case of timeout */ - CHECK_FCT_DO( fd_cnx_receive(c->conn, &c->ts, &rcv_data.buffer, &rcv_data.length), + CHECK_FCT_DO( fd_cnx_receive(c, &ts, &rcv_data.buffer, &rcv_data.length), { char buf[1024]; switch (__ret__) { case ETIMEDOUT: - snprintf(buf, sizeof(buf), "Client '%s' did not send CER within %ds, connection aborted.", fd_cnx_getid(c->conn), INCNX_TIMEOUT); + snprintf(buf, sizeof(buf), "Client '%s' did not send CER within %ds, connection aborted.", fd_cnx_getid(c), INCNX_TIMEOUT); fd_hook_call(HOOK_PEER_CONNECT_FAILED, NULL, NULL, buf, NULL); break; case ENOTCONN: - snprintf(buf, sizeof(buf), "Connection from '%s' in error before CER was received.", fd_cnx_getid(c->conn)); + snprintf(buf, sizeof(buf), "Connection from '%s' in error before CER was received.", fd_cnx_getid(c)); fd_hook_call(HOOK_PEER_CONNECT_FAILED, NULL, NULL, buf, NULL); break; default: - snprintf(buf, sizeof(buf), "Connection from '%s': unspecified error, connection aborted.", fd_cnx_getid(c->conn)); + snprintf(buf, sizeof(buf), "Connection from '%s': unspecified error, connection aborted.", fd_cnx_getid(c)); fd_hook_call(HOOK_PEER_CONNECT_FAILED, NULL, NULL, buf, NULL); } goto cleanup; } ); - TRACE_DEBUG(FULL, "Received %zdb from new client '%s'", rcv_data.length, fd_cnx_getid(c->conn)); + TRACE_DEBUG(FULL, "Received %zdb from new client '%s'", rcv_data.length, fd_cnx_getid(c)); pmdl = fd_msg_pmdl_get_inbuf(rcv_data.buffer, rcv_data.length); @@ -211,7 +216,7 @@ /* Log incoming message */ fd_hook_associate(msg, pmdl); - fd_hook_call(HOOK_MESSAGE_RECEIVED, msg, NULL, fd_cnx_getid(c->conn), fd_msg_pmdl_get(msg)); + fd_hook_call(HOOK_MESSAGE_RECEIVED, msg, NULL, fd_cnx_getid(c), fd_msg_pmdl_get(msg)); /* We expect a CER, it must parse with our dictionary and rules */ CHECK_FCT_DO( fd_msg_parse_rules( msg, fd_g_config->cnf_dict, &pei ), @@ -220,57 +225,52 @@ fd_hook_call(HOOK_MESSAGE_PARSING_ERROR, msg, NULL, pei.pei_message ?: pei.pei_errcode, fd_msg_pmdl_get(msg)); - snprintf(buf, sizeof(buf), "Error parsing CER from '%s', connection aborted.", fd_cnx_getid(c->conn)); + snprintf(buf, sizeof(buf), "Error parsing CER from '%s', connection aborted.", fd_cnx_getid(c)); fd_hook_call(HOOK_PEER_CONNECT_FAILED, NULL, NULL, buf, NULL); goto cleanup; } ); /* Now check we received a CER */ - CHECK_FCT_DO( fd_msg_hdr ( msg, &hdr ), goto fatal_error ); + CHECK_FCT_DO( fd_msg_hdr ( msg, &hdr ), { fatal = 1; goto cleanup; } ); CHECK_PARAMS_DO( (hdr->msg_appl == 0) && (hdr->msg_flags & CMD_FLAG_REQUEST) && (hdr->msg_code == CC_CAPABILITIES_EXCHANGE), { /* Parsing failed -- trace details */ char buf[1024]; - snprintf(buf, sizeof(buf), "Expected CER from '%s', received a different message, connection aborted.", fd_cnx_getid(c->conn)); + snprintf(buf, sizeof(buf), "Expected CER from '%s', received a different message, connection aborted.", fd_cnx_getid(c)); fd_hook_call(HOOK_PEER_CONNECT_FAILED, msg, NULL, buf, NULL); goto cleanup; } ); - /* Finally, pass the information to the peers module which will handle it next */ - pthread_cleanup_push((void *)fd_cnx_destroy, c->conn); + /* Finally, pass the information to the peers module which will handle it in a separate thread */ + pthread_cleanup_push((void *)fd_cnx_destroy, c); pthread_cleanup_push((void *)fd_msg_free, msg); - CHECK_FCT_DO( fd_peer_handle_newCER( &msg, &c->conn ), ); + CHECK_FCT_DO( fd_peer_handle_newCER( &msg, &c ), ); pthread_cleanup_pop(0); pthread_cleanup_pop(0); - - /* The end, we cleanup the client structure */ + cleanup: - /* Unlink the client structure */ - CHECK_POSIX_DO( pthread_mutex_lock(&s->clients_mtx), goto fatal_error ); - fd_list_unlink( &c->chain ); - CHECK_POSIX_DO( pthread_mutex_unlock(&s->clients_mtx), goto fatal_error ); - /* Cleanup the parsed message if any */ if (msg) { CHECK_FCT_DO( fd_msg_free(msg), /* continue */); + msg = NULL; } - /* Destroy the connection object if present */ - if (c->conn) - fd_cnx_destroy(c->conn); + /* Close the connection if needed */ + if (c != NULL) { + fd_cnx_destroy(c); + c = NULL; + } /* Cleanup the received buffer if any */ free(rcv_data.buffer); - /* Detach the thread, cleanup the client structure */ - pthread_detach(pthread_self()); - free(c); + + if (!fatal) + goto next_client; + + LOG_E("Worker thread exiting."); return NULL; - -fatal_error: /* This has effect to terminate the daemon */ - CHECK_FCT_DO(fd_event_send(fd_g_config->cnf_main_ev, FDEV_TERMINATE, 0, NULL), ); - return NULL; -} +} /* The thread managing a server */ static void * serv_th(void * arg) @@ -279,41 +279,31 @@ CHECK_PARAMS_DO(s, goto error); fd_log_threadname ( fd_cnx_getid(s->conn) ); + set_status(s, RUNNING); /* Accept incoming connections */ CHECK_FCT_DO( fd_cnx_serv_listen(s->conn), goto error ); do { - struct client * c = NULL; struct cnxctx * conn = NULL; /* Wait for a new client or cancel */ - CHECK_MALLOC_DO( conn = fd_cnx_serv_accept(s->conn), goto error ); - - /* Create a client structure */ - CHECK_MALLOC_DO( c = malloc(sizeof(struct client)), goto error ); - memset(c, 0, sizeof(struct client)); - fd_list_init(&c->chain, c); - c->conn = conn; + CHECK_MALLOC_DO( conn = fd_cnx_serv_accept(s->conn), break ); - /* Save the client in the list */ - CHECK_POSIX_DO( pthread_mutex_lock( &s->clients_mtx ), goto error ); - fd_list_insert_before(&s->clients, &c->chain); - CHECK_POSIX_DO( pthread_mutex_unlock( &s->clients_mtx ), goto error ); - - /* Start the client thread */ - CHECK_POSIX_DO( pthread_create( &c->thr, NULL, client_sm, c ), goto error ); + /* Store this connection in the fifo for processing by the worker pool. Will block when the fifo is full */ + pthread_cleanup_push((void *)fd_cnx_destroy, conn); + CHECK_FCT_DO( fd_fifo_post( s->pending, &conn ), break ); + pthread_cleanup_pop(0); } while (1); - error: if (s) set_status(s, TERMINATED); /* Send error signal to the core */ LOG_F( "An error occurred in server module! Thread is terminating..."); - CHECK_FCT_DO(fd_event_send(fd_g_config->cnf_main_ev, FDEV_TERMINATE, 0, NULL), ); + CHECK_FCT_DO(fd_core_shutdown(), ); return NULL; } @@ -323,6 +313,7 @@ static struct server * new_serv( int proto, int secur ) { struct server * new; + int i; /* New server structure */ CHECK_MALLOC_DO( new = malloc(sizeof(struct server)), return NULL ); @@ -331,8 +322,16 @@ fd_list_init(&new->chain, new); new->proto = proto; new->secur = secur; - CHECK_POSIX_DO( pthread_mutex_init(&new->clients_mtx, NULL), return NULL ); - fd_list_init(&new->clients, new); + + CHECK_FCT_DO( fd_fifo_new(&new->pending, 5), return NULL); + CHECK_MALLOC_DO( new->workers = calloc( fd_g_config->cnf_thr_srv, sizeof(struct pool_workers) ), return NULL ); + + for (i = 0; i < fd_g_config->cnf_thr_srv; i++) { + /* Create the pool */ + new->workers[i].s = new; + new->workers[i].id = i; + CHECK_POSIX_DO( pthread_create( &new->workers[i].worker, NULL, client_worker, &new->workers[i]), return NULL ); + } return new; } @@ -478,9 +477,8 @@ /* Loop on all servers */ while (!FD_IS_LIST_EMPTY(&FD_SERVERS)) { struct server * s = (struct server *)(FD_SERVERS.next); - - /* Lock client list now */ - CHECK_FCT_DO( pthread_mutex_lock(&s->clients_mtx), /* continue anyway */); + int i; + struct cnxctx * c; /* cancel thread */ CHECK_FCT_DO( fd_thr_term(&s->thr), /* continue */); @@ -488,23 +486,18 @@ /* destroy server connection context */ fd_cnx_destroy(s->conn); - /* cancel and destroy all clients */ - while (!FD_IS_LIST_EMPTY(&s->clients)) { - struct client * c = (struct client *)(s->clients.next); - - /* Destroy client's thread */ - CHECK_FCT_DO( fd_thr_term(&c->thr), /* continue */); - - /* Destroy client's connection */ - fd_cnx_destroy(c->conn); - - /* Unlink and free the client */ - fd_list_unlink(&c->chain); - free(c); + /* cancel and destroy all worker threads */ + for (i = 0; i < fd_g_config->cnf_thr_srv; i++) { + /* Destroy worker thread */ + CHECK_FCT_DO( fd_thr_term(&s->workers[i].worker), /* continue */); } - /* Unlock & destroy */ - CHECK_FCT_DO( pthread_mutex_unlock(&s->clients_mtx), /* continue anyway */); - CHECK_FCT_DO( pthread_mutex_destroy(&s->clients_mtx), /* continue */); + free(s->workers); + + /* Close any pending connection */ + while ( fd_fifo_tryget( s->pending, &c ) == 0 ) { + fd_cnx_destroy(c); + } + CHECK_FCT_DO( fd_fifo_del(&s->pending), ); /* Now destroy the server object */ fd_list_unlink(&s->chain);