diff freeDiameter/cnxctx.c @ 25:67ca08d5bc48

Completed connection context files
author Sebastien Decugis <sdecugis@nict.go.jp>
date Mon, 26 Oct 2009 16:00:49 +0900
parents bd83ce9328ed
children b4684b76c6ab
line wrap: on
line diff
--- a/freeDiameter/cnxctx.c	Wed Oct 21 18:42:45 2009 +0900
+++ b/freeDiameter/cnxctx.c	Mon Oct 26 16:00:49 2009 +0900
@@ -34,6 +34,12 @@
 *********************************************************************************************************/
 
 #include "fD.h"
+#include "cnxctx.h"
+
+/* The maximum size of Diameter message we accept to receive (<= 2^24) to avoid too big mallocs in case of trashed headers */
+#ifndef DIAMETER_MSG_SIZE_MAX
+#define DIAMETER_MSG_SIZE_MAX	65535	/* in bytes */
+#endif /* DIAMETER_MSG_SIZE_MAX */
 
 /* Connections contexts (cnxctx) in freeDiameter are wrappers around the sockets and TLS operations .
  * They are used to hide the details of the processing to the higher layers of the daemon.
@@ -56,7 +62,7 @@
  *    - otherwise to receive clear messages, call fd_cnx_start_clear. fd_cnx_handshake can be called later.
  *
  * 3) Usage
- *    - fd_cnx_receive, fd_cnx_send : exchange messages on this connection (send is synchronous, receive is not).
+ *    - fd_cnx_receive, fd_cnx_send : exchange messages on this connection (send is synchronous, receive is not, but blocking).
  *    - fd_cnx_recv_setaltfifo : when a message is received, the event is sent to an external fifo list. fd_cnx_receive does not work when the alt_fifo is set.
  *    - fd_cnx_getid : retrieve a descriptive string for the connection (for debug)
  *    - fd_cnx_getremoteid : identification of the remote peer (IP address or fqdn)
@@ -67,42 +73,10 @@
  *    - fd_cnx_destroy
  */
 
-/* The connection context structure */
-struct cnxctx {
-	char		cc_id[60];	/* The name of this connection */
-	char		cc_remid[60];	/* Id of remote peer */
 
-	int 		cc_socket;	/* The socket object of the connection -- <=0 if no socket is created */
-
-	int 		cc_proto;	/* IPPROTO_TCP or IPPROTO_SCTP */
-	int		cc_tls;		/* Is TLS already started ? */
-
-	struct fifo *	cc_events;	/* Events occuring on the connection */
-	pthread_t	cc_mgr;		/* manager thread for the connection */
-	struct fifo *	cc_incoming;	/* FIFO queue of messages received on the connection */
-	struct fifo *	cc_alt;		/* alternate fifo to send FDEVP_CNX_MSG_RECV events to. */
-
-	/* If cc_proto == SCTP */
-	struct	{
-		int		str_out;/* Out streams */
-		int		str_in;	/* In streams */
-		int		pairs;	/* max number of pairs ( = min(in, out)) */
-		int		next;	/* # of stream the next message will be sent to */
-	} 		cc_sctp_para;
-
-	/* If cc_tls == true */
-	struct {
-		int				 mode; 		/* GNUTLS_CLIENT / GNUTLS_SERVER */
-		gnutls_session_t 		 session;	/* Session object (stream #0 in case of SCTP) */
-	}		cc_tls_para;
-
-	/* If both conditions */
-	struct {
-		gnutls_session_t 		*res_sessions;	/* Sessions of other pairs of streams, resumed from the first */
-		/* Buffers, threads, ... */
-	}		cc_sctp_tls_para;
-};
-
+/*******************************************/
+/*     Creation of a connection object     */
+/*******************************************/
 
 /* Initialize a context structure */
 static struct cnxctx * fd_cnx_init(int full)
@@ -115,7 +89,6 @@
 	memset(conn, 0, sizeof(struct cnxctx));
 
 	if (full) {
-		CHECK_FCT_DO( fd_fifo_new ( &conn->cc_events ), return NULL );
 		CHECK_FCT_DO( fd_fifo_new ( &conn->cc_incoming ), return NULL );
 	}
 
@@ -351,7 +324,7 @@
 	return NULL;
 }
 
-/* Same for SCTP, accepts a list of remote addresses to connect to (see sctp_connectx) */
+/* Same for SCTP, accepts a list of remote addresses to connect to (see sctp_connectx for how they are used) */
 struct cnxctx * fd_cnx_cli_connect_sctp(int no_ip6, uint16_t port, struct fd_list * list)
 {
 #ifdef DISABLE_SCTP
@@ -381,6 +354,12 @@
 	else
 		cnx->cc_sctp_para.pairs = cnx->cc_sctp_para.str_in;
 	
+	if (TRACE_BOOL(INFO)) {
+		fd_log_debug("Connection established to server '");
+		sSA_DUMP_NODE_SERV( &primary, NI_NUMERICSERV);
+		fd_log_debug("' (SCTP:%d, %d/%d streams).\n", sock, cnx->cc_sctp_para.str_in, cnx->cc_sctp_para.str_out);
+	}
+	
 	/* Generate the names for the object */
 	{
 		char addrbuf[INET6_ADDRSTRLEN];
@@ -402,12 +381,6 @@
 			snprintf(cnx->cc_remid, sizeof(cnx->cc_remid), "[err:%s]", gai_strerror(rc));
 	}
 	
-	if (TRACE_BOOL(INFO)) {
-		fd_log_debug("Connection established to server '");
-		sSA_DUMP_NODE_SERV( &primary, NI_NUMERICSERV);
-		fd_log_debug("' (SCTP:%d).\n", sock);
-	}
-	
 	return cnx;
 
 error:
@@ -423,102 +396,6 @@
 	return conn->cc_id;
 }
 
-/* Start receving messages in clear (no TLS) on the connection */
-int fd_cnx_start_clear(struct cnxctx * conn)
-{
-
-	TODO("...");
-	return ENOTSUP;
-}
-
-/* TLS handshake a connection; no need to have called start_clear before. Reception is active if handhsake is successful */
-int fd_cnx_handshake(struct cnxctx * conn, int mode, char * priority)
-{
-	TRACE_ENTRY( "%p %d", conn, mode);
-	CHECK_PARAMS( conn && ( (mode == GNUTLS_CLIENT) || (mode == GNUTLS_SERVER) ) );
-
-	/* Save the mode */
-	conn->cc_tls_para.mode = mode;
-
-	/* Create the master session context */
-	CHECK_GNUTLS_DO( gnutls_init (&conn->cc_tls_para.session, mode), return ENOMEM );
-
-	/* Set the algorithm suite */
-	TODO("Use overwrite priority if non NULL");
-	CHECK_GNUTLS_DO( gnutls_priority_set( conn->cc_tls_para.session, fd_g_config->cnf_sec_data.prio_cache ), return EINVAL );
-
-	/* Set the credentials of this side of the connection */
-	CHECK_GNUTLS_DO( gnutls_credentials_set (conn->cc_tls_para.session, GNUTLS_CRD_CERTIFICATE, fd_g_config->cnf_sec_data.credentials), return EINVAL );
-
-	/* Request the remote credentials as well */
-	if (mode == GNUTLS_SERVER) {
-		gnutls_certificate_server_set_request (conn->cc_tls_para.session, GNUTLS_CERT_REQUIRE);
-	}
-
-	/* Set the socket info in the session */
-	gnutls_transport_set_ptr (conn->cc_tls_para.session, (gnutls_transport_ptr_t) conn->cc_socket);
-
-	/* Special case: multi-stream TLS is not natively managed in GNU TLS, we use a wrapper library */
-	if ((conn->cc_proto == IPPROTO_SCTP) && (conn->cc_sctp_para.pairs > 0)) {
-#ifndef DISABLE_SCTP
-		TODO("Initialize the SCTP TLS wrapper");
-		TODO("Set the lowat, push and pull functions");
-#else /* DISABLE_SCTP */
-		ASSERT(0);
-#endif /* DISABLE_SCTP */
-	}
-
-	/* Handshake master session */
-	{
-		int ret;
-		CHECK_GNUTLS_DO( ret = gnutls_handshake(conn->cc_tls_para.session),
-			{
-				if (TRACE_BOOL(INFO)) {
-					fd_log_debug("TLS Handshake failed on socket %d (%s) : %s\n", conn->cc_socket, conn->cc_id, gnutls_strerror(ret));
-				}
-				return EINVAL;
-			} );
-
-		/* Now verify the remote credentials are valid -- only simple test here */
-		CHECK_GNUTLS_DO( gnutls_certificate_verify_peers2 (conn->cc_tls_para.session, &ret), return EINVAL );
-		if (ret) {
-			if (TRACE_BOOL(INFO)) {
-				fd_log_debug("TLS: Remote certificate invalid on socket %d (%s) :\n", conn->cc_socket, conn->cc_id);
-				if (ret & GNUTLS_CERT_INVALID)
-					fd_log_debug(" - The certificate is not trusted (unknown CA?)\n");
-				if (ret & GNUTLS_CERT_REVOKED)
-					fd_log_debug(" - The certificate has been revoked.\n");
-				if (ret & GNUTLS_CERT_SIGNER_NOT_FOUND)
-					fd_log_debug(" - The certificate hasn't got a known issuer.\n");
-				if (ret & GNUTLS_CERT_SIGNER_NOT_CA)
-					fd_log_debug(" - The certificate signer is not a CA, or uses version 1, or 3 without basic constraints.\n");
-				if (ret & GNUTLS_CERT_INSECURE_ALGORITHM)
-					fd_log_debug(" - The certificate signature uses a weak algorithm.\n");
-			}
-			return EINVAL;
-		}
-	}
-
-	/* Other sessions in case of multi-stream SCTP are resumed from the master */
-	if ((conn->cc_proto == IPPROTO_SCTP) && (conn->cc_sctp_para.pairs > 0)) {
-#ifndef DISABLE_SCTP
-		TODO("Init and resume all additional sessions from the master one.");
-#endif /* DISABLE_SCTP */
-	}
-
-	TODO("Start the connection state machine thread");
-
-	return 0;
-}
-
-/* Retrieve TLS credentials of the remote peer, after handshake */
-int fd_cnx_getcred(struct cnxctx * conn, const gnutls_datum_t **cert_list, unsigned int *cert_list_size)
-{
-
-	TODO("...");
-	return ENOTSUP;
-}
-
 /* Get the list of endpoints (IP addresses) of the local and remote peers on this connection */
 int fd_cnx_getendpoints(struct cnxctx * conn, struct fd_list * local, struct fd_list * remote)
 {
@@ -550,7 +427,7 @@
 	
 	if (remote) {
 		/* Check we have a full connection object, not a listening socket (with no remote) */
-		CHECK_PARAMS( conn->cc_events );
+		CHECK_PARAMS( conn->cc_incoming );
 		
 		/* Retrieve the peer endpoint(s) of the connection */
 		switch (conn->cc_proto) {
@@ -586,35 +463,507 @@
 }
 
 
-/* Receive next message. if timeout is not NULL, wait only until timeout */
-int fd_cnx_receive(struct cnxctx * conn, struct timespec * timeout, unsigned char **buf, size_t * len)
+/**************************************/
+/*     Use of a connection object     */
+/**************************************/
+
+/* Receiver thread (TCP & noTLS) : incoming message is directly saved into the target queue */
+static void * rcvthr_notls_tcp(void * arg)
 {
+	struct cnxctx * conn = arg;
+	
+	TRACE_ENTRY("%p", arg);
+	
+	CHECK_PARAMS_DO(conn && (conn->cc_socket > 0), goto out);
+	ASSERT( conn->cc_proto == IPPROTO_TCP );
+	ASSERT( conn->cc_tls == 0 );
+	ASSERT( Target_Queue(conn) );
+	
+	/* Receive from a TCP connection: we have to rebuild the message boundaries */
+	do {
+		uint8_t header[4];
+		uint8_t * newmsg;
+		size_t  length;
+		ssize_t ret = 0;
+		size_t	received = 0;
 
-	TODO("...");
-	return ENOTSUP;
+		do {
+			ret = recv(conn->cc_socket, &header[received], sizeof(header) - received, 0);
+			if (ret <= 0) {
+				CHECK_SYS_DO(ret, /* continue */);
+				goto error; /* Stop the thread, the recipient of the event will cleanup */
+			}
+
+			received += ret;
+		} while (received < sizeof(header));
+
+		length = ((size_t)header[1] << 16) + ((size_t)header[2] << 8) + (size_t)header[3];
+
+		/* Check the received word is a valid begining of a Diameter message */
+		if ((header[0] != DIAMETER_VERSION)	/* defined in <libfreeDiameter.h> */
+		   || (length > DIAMETER_MSG_SIZE_MAX)) { /* to avoid too big mallocs */
+			/* The message is suspect */
+			TRACE_DEBUG(INFO, "Received suspect header [ver: %d, size: %g], assume disconnection", (int)header[0], length);
+			goto error; /* Stop the thread, the recipient of the event will cleanup */
+		}
+
+		/* Ok, now we can really receive the data */
+		CHECK_MALLOC_DO(  newmsg = malloc( length ), goto error );
+		memcpy(newmsg, header, sizeof(header));
+
+		while (received < length) {
+			pthread_cleanup_push(free, newmsg); /* In case we are canceled, clean the partialy built buffer */
+			ret = recv(conn->cc_socket, newmsg + received, length - received, 0);
+			pthread_cleanup_pop(0);
+
+			if (ret <= 0) {
+				CHECK_SYS_DO(ret, /* continue */);
+				free(newmsg);
+				goto error; /* Stop the thread, the recipient of the event will cleanup */
+			}
+			received += ret;
+		}
+		
+		/* We have received a complete message, send it */
+		CHECK_FCT_DO( fd_event_send( Target_Queue(conn), FDEVP_CNX_MSG_RECV, length, newmsg), /* continue or destroy everything? */);
+		
+	} while (conn->cc_loop);
+	
+out:
+	TRACE_DEBUG(FULL, "Thread terminated");	
+	return NULL;
+error:
+	CHECK_FCT_DO( fd_event_send( Target_Queue(conn), FDEVP_CNX_ERROR, 0, NULL), /* continue or destroy everything? */);
+	goto out;
 }
 
-/* Set / reset alternate FIFO list to send FDEVP_CNX_MSG_RECV to when message is received */
-int fd_cnx_recv_setaltfifo(struct cnxctx * conn, struct fifo * alt_fifo)
+#ifndef DISABLE_SCTP
+/* Receiver thread (SCTP & noTLS) : incoming message is directly saved into cc_incoming, no need to care for the stream ID */
+static void * rcvthr_notls_sctp(void * arg)
+{
+	struct cnxctx * conn = arg;
+	uint8_t * buf;
+	size_t    bufsz;
+	int	  event;
+	
+	TRACE_ENTRY("%p", arg);
+	
+	CHECK_PARAMS_DO(conn && (conn->cc_socket > 0), goto out);
+	ASSERT( conn->cc_proto == IPPROTO_SCTP );
+	ASSERT( conn->cc_tls == 0 );
+	ASSERT( Target_Queue(conn) );
+	
+	do {
+		CHECK_FCT_DO( fd_sctp_recvmeta(conn->cc_socket, NULL, &buf, &bufsz, &event), goto error );
+		if (event == FDEVP_CNX_ERROR) {
+			goto error;
+		}
+		
+		CHECK_FCT_DO( fd_event_send( Target_Queue(conn), event, bufsz, buf), goto error );
+		
+	} while (conn->cc_loop);
+	
+out:
+	TRACE_DEBUG(FULL, "Thread terminated");	
+	return NULL;
+error:
+	CHECK_FCT_DO( fd_event_send( Target_Queue(conn), FDEVP_CNX_ERROR, 0, NULL), /* continue or destroy everything? */);
+	goto out;
+}
+#endif /* DISABLE_SCTP */
+
+/* Returns 0 on error, received data size otherwise (always >= 0) */
+static ssize_t fd_tls_recv_handle_error(struct cnxctx * conn, gnutls_session_t session, void * data, size_t sz)
+{
+	ssize_t ret;
+again:	
+	CHECK_GNUTLS_DO( ret = gnutls_record_recv(session, data, sz),
+		{
+			switch (ret) {
+				case GNUTLS_E_REHANDSHAKE: 
+					CHECK_GNUTLS_DO( ret = gnutls_handshake(session),
+						{
+							if (TRACE_BOOL(INFO)) {
+								fd_log_debug("TLS re-handshake failed on socket %d (%s) : %s\n", conn->cc_socket, conn->cc_id, gnutls_strerror(ret));
+							}
+							ret = 0;
+							goto end;
+						} );
+
+				case GNUTLS_E_AGAIN:
+				case GNUTLS_E_INTERRUPTED:
+					goto again;
+
+				default:
+					TRACE_DEBUG(INFO, "This TLS error is not handled, assume unrecoverable error");
+					ret = 0;
+			}
+		} );
+end:	
+	return ret;
+}
+
+/* The function that receives TLS data and re-builds a Diameter message -- it exits only on error or cancelation */
+int fd_tls_rcvthr_core(struct cnxctx * conn, gnutls_session_t session)
 {
-	TRACE_ENTRY( "%p %p", conn, alt_fifo );
-	CHECK_PARAMS( conn );
+	/* No guaranty that GnuTLS preserves the message boundaries, so we re-build it as in TCP */
+	do {
+		uint8_t header[4];
+		uint8_t * newmsg;
+		size_t  length;
+		ssize_t ret = 0;
+		size_t	received = 0;
+
+		do {
+			ret = fd_tls_recv_handle_error(conn, conn->cc_tls_para.session, &header[received], sizeof(header) - received);
+			if (ret == 0) {
+				/* The connection is closed */
+				goto out;
+			}
+			received += ret;
+		} while (received < sizeof(header));
+
+		length = ((size_t)header[1] << 16) + ((size_t)header[2] << 8) + (size_t)header[3];
+
+		/* Check the received word is a valid beginning of a Diameter message */
+		if ((header[0] != DIAMETER_VERSION)	/* defined in <libfreeDiameter.h> */
+		   || (length > DIAMETER_MSG_SIZE_MAX)) { /* to avoid too big mallocs */
+			/* The message is suspect */
+			TRACE_DEBUG(INFO, "Received suspect header [ver: %d, size: %g], assume disconnection", (int)header[0], length);
+			goto out;
+		}
+
+		/* Ok, now we can really receive the data */
+		CHECK_MALLOC(  newmsg = malloc( length ) );
+		memcpy(newmsg, header, sizeof(header));
+
+		while (received < length) {
+			pthread_cleanup_push(free, newmsg); /* In case we are canceled, clean the partialy built buffer */
+			ret = fd_tls_recv_handle_error(conn, conn->cc_tls_para.session, newmsg + received, length - received);
+			pthread_cleanup_pop(0);
+
+			if (ret == 0) {
+				free(newmsg);
+				goto out; /* Stop the thread, the recipient of the event will cleanup */
+			}
+			received += ret;
+		}
+		
+		/* We have received a complete message, send it */
+		CHECK_FCT_DO( fd_event_send( Target_Queue(conn), FDEVP_CNX_MSG_RECV, length, newmsg), /* continue or destroy everything? */);
+		
+	} while (1);
+out:
+	return ENOTCONN;
+}
+
+/* Receiver thread (TLS & 1 stream SCTP or TCP) : gnutls directly handles the socket, save records into the target queue */
+static void * rcvthr_tls_single(void * arg)
+{
+	struct cnxctx * conn = arg;
+	
+	TRACE_ENTRY("%p", arg);
+	
+	CHECK_PARAMS_DO(conn && (conn->cc_socket > 0), goto error);
+	ASSERT( conn->cc_tls == 1 );
+	ASSERT( Target_Queue(conn) );
 	
-	/* Let's cross fingers that there is no race condition here... */
-	conn->cc_alt = alt_fifo;
+	CHECK_FCT_DO(fd_tls_rcvthr_core(conn, conn->cc_tls_para.session), /* continue */);
+error:
+	CHECK_FCT_DO( fd_event_send( Target_Queue(conn), FDEVP_CNX_ERROR, 0, NULL), /* continue or destroy everything? */);
+	TRACE_DEBUG(FULL, "Thread terminated");	
+	return NULL;
+}
+
+/* Start receving messages in clear (no TLS) on the connection */
+int fd_cnx_start_clear(struct cnxctx * conn, int loop)
+{
+	TRACE_ENTRY("%p %i", conn, loop);
+	
+	CHECK_PARAMS( conn && Target_Queue(conn) && (!conn->cc_tls) && (!conn->cc_loop));
+	
+	/* Save the loop request */
+	conn->cc_loop = loop;
+	
+	/* Release resources in case of a previous call was already made */
+	CHECK_FCT_DO( fd_thr_term(&conn->cc_rcvthr), /* continue */);
+	
+	switch (conn->cc_proto) {
+		case IPPROTO_TCP:
+			/* Start the tcp_notls thread */
+			CHECK_POSIX( pthread_create( &conn->cc_rcvthr, NULL, rcvthr_notls_tcp, conn ) );
+			break;
+#ifndef DISABLE_SCTP
+		case IPPROTO_SCTP:
+			/* Start the tcp_notls thread */
+			CHECK_POSIX( pthread_create( &conn->cc_rcvthr, NULL, rcvthr_notls_sctp, conn ) );
+			break;
+#endif /* DISABLE_SCTP */
+		default:
+			TRACE_DEBUG(INFO, "Unknown protocol: %d", conn->cc_proto);
+			return ENOTSUP;
+	}
+			
+	return 0;
+}
+
+/* Prepare a gnutls session object for handshake */
+int fd_tls_prepare(gnutls_session_t * session, int mode, char * priority)
+{
+	/* Create the master session context */
+	CHECK_GNUTLS_DO( gnutls_init (session, mode), return ENOMEM );
+
+	/* Set the algorithm suite */
+	if (priority) {
+		const char * errorpos;
+		CHECK_GNUTLS_DO( gnutls_priority_set_direct( *session, priority, &errorpos ), 
+			{ TRACE_DEBUG(INFO, "Error in priority string '%s' at position: '%s'\n", priority, errorpos); return EINVAL; } );
+	} else {
+		CHECK_GNUTLS_DO( gnutls_priority_set( *session, fd_g_config->cnf_sec_data.prio_cache ), return EINVAL );
+	}
+
+	/* Set the credentials of this side of the connection */
+	CHECK_GNUTLS_DO( gnutls_credentials_set (*session, GNUTLS_CRD_CERTIFICATE, fd_g_config->cnf_sec_data.credentials), return EINVAL );
+
+	/* Request the remote credentials as well */
+	if (mode == GNUTLS_SERVER) {
+		gnutls_certificate_server_set_request (*session, GNUTLS_CERT_REQUIRE);
+	}
 	
 	return 0;
 }
 
-/* Send a message */
+/* TLS handshake a connection; no need to have called start_clear before. Reception is active if handhsake is successful */
+int fd_cnx_handshake(struct cnxctx * conn, int mode, char * priority)
+{
+	TRACE_ENTRY( "%p %d", conn, mode);
+	CHECK_PARAMS( conn && (!conn->cc_tls) && ( (mode == GNUTLS_CLIENT) || (mode == GNUTLS_SERVER) ) && (!conn->cc_loop) );
+
+	/* Save the mode */
+	conn->cc_tls_para.mode = mode;
+	
+	/* Cancel receiving thread if any -- it should already be terminated anyway, we just release the resources */
+	CHECK_FCT_DO( fd_thr_term(&conn->cc_rcvthr), /* continue */);
+	
+	/* Once TLS handshake is done, we don't stop after the first message */
+	conn->cc_loop = 1;
+	
+	/* Prepare the master session credentials and priority */
+	CHECK_FCT( fd_tls_prepare(&conn->cc_tls_para.session, mode, priority) );
+
+	/* Special case: multi-stream TLS is not natively managed in GNU TLS, we use a wrapper library */
+	if (conn->cc_sctp_para.pairs > 1) {
+#ifdef DISABLE_SCTP
+		ASSERT(0);
+		CHECK_FCT( ENOTSUP );
+#else /* DISABLE_SCTP */
+		/* Initialize the wrapper, start the demux thread */
+		CHECK_FCT( fd_sctps_init(conn) );
+#endif /* DISABLE_SCTP */
+	} else {
+		/* Set the socket info in the session */
+		gnutls_transport_set_ptr (conn->cc_tls_para.session, (gnutls_transport_ptr_t) conn->cc_socket);
+	}
+
+	/* Handshake master session */
+	{
+		int ret;
+		CHECK_GNUTLS_DO( ret = gnutls_handshake(conn->cc_tls_para.session),
+			{
+				if (TRACE_BOOL(INFO)) {
+					fd_log_debug("TLS Handshake failed on socket %d (%s) : %s\n", conn->cc_socket, conn->cc_id, gnutls_strerror(ret));
+				}
+				return EINVAL;
+			} );
+
+		/* Now verify the remote credentials are valid -- only simple test here */
+		CHECK_GNUTLS_DO( gnutls_certificate_verify_peers2 (conn->cc_tls_para.session, &ret), return EINVAL );
+		if (ret) {
+			if (TRACE_BOOL(INFO)) {
+				fd_log_debug("TLS: Remote certificate invalid on socket %d (%s) :\n", conn->cc_socket, conn->cc_id);
+				if (ret & GNUTLS_CERT_INVALID)
+					fd_log_debug(" - The certificate is not trusted (unknown CA?)\n");
+				if (ret & GNUTLS_CERT_REVOKED)
+					fd_log_debug(" - The certificate has been revoked.\n");
+				if (ret & GNUTLS_CERT_SIGNER_NOT_FOUND)
+					fd_log_debug(" - The certificate hasn't got a known issuer.\n");
+				if (ret & GNUTLS_CERT_SIGNER_NOT_CA)
+					fd_log_debug(" - The certificate signer is not a CA, or uses version 1, or 3 without basic constraints.\n");
+				if (ret & GNUTLS_CERT_INSECURE_ALGORITHM)
+					fd_log_debug(" - The certificate signature uses a weak algorithm.\n");
+			}
+			return EINVAL;
+		}
+	}
+
+	/* Multi-stream TLS: handshake other streams as well */
+	if (conn->cc_sctp_para.pairs > 1) {
+#ifndef DISABLE_SCTP
+		/* Resume all additional sessions from the master one. */
+		CHECK_FCT(fd_sctps_handshake_others(conn, priority));
+		
+		/* Start decrypting the messages from all threads and queuing them in target queue */
+		CHECK_FCT(fd_sctps_startthreads(conn));
+#endif /* DISABLE_SCTP */
+	} else {
+		/* Start decrypting the data */
+		CHECK_POSIX( pthread_create( &conn->cc_rcvthr, NULL, rcvthr_tls_single, conn ) );
+	}
+
+	return 0;
+}
+
+/* Retrieve TLS credentials of the remote peer, after handshake */
+int fd_cnx_getcred(struct cnxctx * conn, const gnutls_datum_t **cert_list, unsigned int *cert_list_size)
+{
+	TRACE_ENTRY("%p %p %p", conn, cert_list, cert_list_size);
+	CHECK_PARAMS( conn && (conn->cc_tls) && cert_list && cert_list_size );
+	
+	/* This function only works for X.509 certificates. */
+	CHECK_PARAMS( gnutls_certificate_type_get (conn->cc_tls_para.session) == GNUTLS_CRT_X509 );
+	
+	*cert_list = gnutls_certificate_get_peers (conn->cc_tls_para.session, cert_list_size);
+	if (*cert_list == NULL) {
+		TRACE_DEBUG(INFO, "No certificate was provided by remote peer / an error occurred.");
+		return EINVAL;
+	}
+
+	TRACE_DEBUG( FULL, "Remote peer provided %d certificates.\n", *cert_list_size);
+	
+	return 0;
+}
+
+/* Receive next message. if timeout is not NULL, wait only until timeout. This function only pulls from a queue, mgr thread is filling that queue aynchrounously. */
+int fd_cnx_receive(struct cnxctx * conn, struct timespec * timeout, unsigned char **buf, size_t * len)
+{
+	int    ev;
+	size_t ev_sz;
+	void * ev_data;
+	
+	TRACE_ENTRY("%p %p %p %p", conn, timeout, buf, len);
+	CHECK_PARAMS(conn && (conn->cc_socket > 0) && buf && len);
+	CHECK_PARAMS(conn->cc_rcvthr != (pthread_t)NULL);
+	CHECK_PARAMS(conn->cc_alt == NULL);
+
+	/* Now, pull the first event */
+get_next:
+	if (timeout) {
+		CHECK_FCT( fd_event_timedget(conn->cc_incoming, timeout, FDEVP_PSM_TIMEOUT, &ev, &ev_sz, &ev_data) );
+	} else {
+		CHECK_FCT( fd_event_get(conn->cc_incoming, &ev, &ev_sz, &ev_data) );
+	}
+	
+	switch (ev) {
+		case FDEVP_CNX_MSG_RECV:
+			/* We got one */
+			*len = ev_sz;
+			*buf = ev_data;
+			return 0;
+			
+		case FDEVP_PSM_TIMEOUT:
+			TRACE_DEBUG(FULL, "Timeout event received");
+			return ETIMEDOUT;
+			
+		case FDEVP_CNX_EP_CHANGE:
+			/* We ignore this event */
+			goto get_next;
+			
+		case FDEVP_CNX_ERROR:
+			TRACE_DEBUG(FULL, "Received ERROR event on the connection");
+			return ENOTCONN;
+	}
+	
+	TRACE_DEBUG(INFO, "Received unexpected event %d (%s)", ev, fd_pev_str(ev));
+	return EINVAL;
+}
+
+/* Set an alternate FIFO list to send FDEVP_CNX_* events to */
+int fd_cnx_recv_setaltfifo(struct cnxctx * conn, struct fifo * alt_fifo)
+{
+	TRACE_ENTRY( "%p %p", conn, alt_fifo );
+	CHECK_PARAMS( conn && alt_fifo && conn->cc_incoming );
+	
+	/* The magic function does it all */
+	CHECK_FCT( fd_fifo_move( &conn->cc_incoming, alt_fifo, &conn->cc_alt ) );
+	
+	return 0;
+}
+
+/* Send function when no multi-stream is involved, or sending on stream #0 (send() always use stream 0)*/
+static int send_simple(struct cnxctx * conn, unsigned char * buf, size_t len)
+{
+	ssize_t ret;
+	size_t sent = 0;
+	TRACE_ENTRY("%p %p %g", conn, buf, len);
+	do {
+		if (conn->cc_tls) {
+			CHECK_GNUTLS_DO( ret = gnutls_record_send (conn->cc_tls_para.session, buf + sent, len - sent), return ENOTCONN );
+		} else {
+			CHECK_SYS( ret = send(conn->cc_socket, buf + sent, len - sent, 0) ); /* better to replace with sendmsg for atomic sending? */
+		}
+		sent += ret;
+	} while ( sent < len );
+	return 0;
+}
+
+/* Send a message -- this is synchronous -- and we assume it's never called by several threads at the same time, so we don't protect. */
 int fd_cnx_send(struct cnxctx * conn, unsigned char * buf, size_t len)
 {
+	TRACE_ENTRY("%p %p %g", conn, buf, len);
+	
+	CHECK_PARAMS(conn && (conn->cc_socket > 0) && buf && len);
 
-	TODO("...");
-	return ENOTSUP;
+	TRACE_DEBUG(FULL, "Sending %gb %sdata on connection %s", len, conn->cc_tls ? "TLS-protected ":"", conn->cc_id);
+	
+	switch (conn->cc_proto) {
+		case IPPROTO_TCP:
+			CHECK_FCT( send_simple(conn, buf, len) );
+			break;
+		
+#ifndef DISABLE_SCTP
+		case IPPROTO_SCTP: {
+			int multistr = 0;
+			
+			if ((conn->cc_sctp_para.str_out > 1) && ((! conn->cc_tls) || (conn->cc_sctp_para.pairs > 1)))  {
+				/* Update the id of the stream we will send this message on */
+				conn->cc_sctp_para.next += 1;
+				conn->cc_sctp_para.next %= (conn->cc_tls ? conn->cc_sctp_para.pairs : conn->cc_sctp_para.str_out);
+				multistr = 1;
+			}
+			
+			if ((!multistr) || (conn->cc_sctp_para.next == 0)) {
+				CHECK_FCT( send_simple(conn, buf, len) );
+			} else {
+				if (!conn->cc_tls) {
+					CHECK_FCT( fd_sctp_sendstr(conn->cc_socket, conn->cc_sctp_para.next, buf, len) );
+				} else {
+					/* push the record to the appropriate session */
+					ssize_t ret;
+					size_t sent = 0;
+					ASSERT(conn->cc_sctps_data.array != NULL);
+					do {
+						CHECK_GNUTLS_DO( ret = gnutls_record_send (conn->cc_sctps_data.array[conn->cc_sctp_para.next - 1].session, buf + sent, len - sent), { TODO("Handle error (re-handshake, etc.."); return ENOTCONN; } );
+						sent += ret;
+					} while ( sent < len );
+				}
+			}
+		}
+		break;
+#endif /* DISABLE_SCTP */
+	
+		default:
+			TRACE_DEBUG(INFO, "Unknwon protocol: %d", conn->cc_proto);
+			return ENOTSUP;	/* or EINVAL... */
+	}
+	
+	return 0;
 }
 
 
+/**************************************/
+/*     Destruction of connection      */
+/**************************************/
+
 /* Destroy a conn structure, and shutdown the socket */
 void fd_cnx_destroy(struct cnxctx * conn)
 {
@@ -622,22 +971,41 @@
 	
 	CHECK_PARAMS_DO(conn, return);
 
-	TODO("End TLS session(s) if started");
+	/* In case of TLS, stop receiver thread, then close properly the gnutls session */
+	if ((conn->cc_tls) && (conn->cc_sctp_para.pairs > 1)) {
+#ifndef DISABLE_SCTP
+		/* Multi-stream TLS: Stop all decipher threads, but not the demux thread */
+		fd_sctps_stopthreads(conn);
+#endif /* DISABLE_SCTP */
+	} else {
+		/* Stop the decoding thread */
+		CHECK_FCT_DO( fd_thr_term(&conn->cc_rcvthr), /* continue */ );
+	}
 	
-	TODO("Stop manager thread if running");
+	/* Terminate properly the TLS session(s) */
+	if (conn->cc_tls) {
+		/* Master session */
+		CHECK_GNUTLS_DO( gnutls_bye(conn->cc_tls_para.session, GNUTLS_SHUT_RDWR), /* Continue */ );
+		gnutls_deinit(conn->cc_tls_para.session);
+		
+#ifndef DISABLE_SCTP
+		if (conn->cc_sctp_para.pairs > 1) {
+			/* Multi-stream TLS: destroy the wrapper and stop the demux thread */
+			fd_sctps_destroy(conn);
+		}
+#endif /* DISABLE_SCTP */
+		
+	}
 	
 	/* Shut the connection down */
 	if (conn->cc_socket > 0) {
 		shutdown(conn->cc_socket, SHUT_RDWR);
 	}
 	
-	TODO("Empty FIFO queues");
-	
-	/* Destroy FIFO lists */
-	if (conn->cc_events)
-		CHECK_FCT_DO( fd_fifo_del ( &conn->cc_events ), /* continue */ );
-	if (conn->cc_incoming)
-		CHECK_FCT_DO( fd_fifo_del ( &conn->cc_incoming ), /* continue */ );
+	/* Empty and destroy FIFO list */
+	if (conn->cc_incoming) {
+		fd_event_destroy( &conn->cc_incoming, free );
+	}
 	
 	/* Free the object */
 	free(conn);
"Welcome to our mercurial repository"