changeset 25:67ca08d5bc48

Completed connection context files
author Sebastien Decugis <sdecugis@nict.go.jp>
date Mon, 26 Oct 2009 16:00:49 +0900
parents bd83ce9328ed
children b4684b76c6ab
files extensions/dbg_monitor/monitor.c freeDiameter/CMakeLists.txt freeDiameter/cnxctx.c freeDiameter/cnxctx.h freeDiameter/fD.h freeDiameter/main.c freeDiameter/p_expiry.c freeDiameter/p_psm.c freeDiameter/sctp.c freeDiameter/sctps.c freeDiameter/server.c freeDiameter/tcp.c include/freeDiameter/freeDiameter.h include/freeDiameter/libfreeDiameter.h libfreeDiameter/fifo.c libfreeDiameter/lists.c
diffstat 16 files changed, 1677 insertions(+), 255 deletions(-) [+]
line wrap: on
line diff
--- a/extensions/dbg_monitor/monitor.c	Wed Oct 21 18:42:45 2009 +0900
+++ b/extensions/dbg_monitor/monitor.c	Mon Oct 26 16:00:49 2009 +0900
@@ -48,9 +48,9 @@
 /* Function called on receipt of SIGUSR1 */
 static void got_sig(int signal)
 {
-	CHECK_FCT_DO(fd_event_send(fd_g_config->cnf_main_ev, FDEV_DUMP_DICT, NULL), /* continue */);
-	CHECK_FCT_DO(fd_event_send(fd_g_config->cnf_main_ev, FDEV_DUMP_CONFIG, NULL), /* continue */);
-	CHECK_FCT_DO(fd_event_send(fd_g_config->cnf_main_ev, FDEV_DUMP_EXT, NULL), /* continue */);
+	CHECK_FCT_DO(fd_event_send(fd_g_config->cnf_main_ev, FDEV_DUMP_DICT, 0, NULL), /* continue */);
+	CHECK_FCT_DO(fd_event_send(fd_g_config->cnf_main_ev, FDEV_DUMP_CONFIG, 0, NULL), /* continue */);
+	CHECK_FCT_DO(fd_event_send(fd_g_config->cnf_main_ev, FDEV_DUMP_EXT, 0, NULL), /* continue */);
 }
 /* Thread to display periodical debug information */
 static pthread_t thr;
@@ -76,8 +76,9 @@
 		sleep(3600); /* 1 hour */
 		#endif /* DEBUG */
 		TRACE_DEBUG(NONE, "Monitor information");
-		CHECK_FCT_DO(fd_event_send(fd_g_config->cnf_main_ev, FDEV_DUMP_QUEUES, NULL), /* continue */);
-		CHECK_FCT_DO(fd_event_send(fd_g_config->cnf_main_ev, FDEV_DUMP_PEERS, NULL), /* continue */);
+		CHECK_FCT_DO(fd_event_send(fd_g_config->cnf_main_ev, FDEV_DUMP_QUEUES, 0, NULL), /* continue */);
+		CHECK_FCT_DO(fd_event_send(fd_g_config->cnf_main_ev, FDEV_DUMP_SERV, 0, NULL), /* continue */);
+		CHECK_FCT_DO(fd_event_send(fd_g_config->cnf_main_ev, FDEV_DUMP_PEERS, 0, NULL), /* continue */);
 		pthread_testcancel();
 	}
 	
--- a/freeDiameter/CMakeLists.txt	Wed Oct 21 18:42:45 2009 +0900
+++ b/freeDiameter/CMakeLists.txt	Mon Oct 26 16:00:49 2009 +0900
@@ -9,6 +9,7 @@
 # List of source files
 SET(FD_COMMON_SRC
 	fD.h
+	cnxctx.h
 	config.c
 	cnxctx.c
 	dispatch.c
@@ -25,7 +26,7 @@
 	)
 
 IF(NOT DISABLE_SCTP)
-	SET(FD_COMMON_SRC ${FD_COMMON_SRC} sctp.c)
+	SET(FD_COMMON_SRC ${FD_COMMON_SRC} sctp.c sctps.c)
 ENDIF(NOT DISABLE_SCTP)
 
 SET(FD_COMMON_GEN_SRC
--- a/freeDiameter/cnxctx.c	Wed Oct 21 18:42:45 2009 +0900
+++ b/freeDiameter/cnxctx.c	Mon Oct 26 16:00:49 2009 +0900
@@ -34,6 +34,12 @@
 *********************************************************************************************************/
 
 #include "fD.h"
+#include "cnxctx.h"
+
+/* The maximum size of Diameter message we accept to receive (<= 2^24) to avoid too big mallocs in case of trashed headers */
+#ifndef DIAMETER_MSG_SIZE_MAX
+#define DIAMETER_MSG_SIZE_MAX	65535	/* in bytes */
+#endif /* DIAMETER_MSG_SIZE_MAX */
 
 /* Connections contexts (cnxctx) in freeDiameter are wrappers around the sockets and TLS operations .
  * They are used to hide the details of the processing to the higher layers of the daemon.
@@ -56,7 +62,7 @@
  *    - otherwise to receive clear messages, call fd_cnx_start_clear. fd_cnx_handshake can be called later.
  *
  * 3) Usage
- *    - fd_cnx_receive, fd_cnx_send : exchange messages on this connection (send is synchronous, receive is not).
+ *    - fd_cnx_receive, fd_cnx_send : exchange messages on this connection (send is synchronous, receive is not, but blocking).
  *    - fd_cnx_recv_setaltfifo : when a message is received, the event is sent to an external fifo list. fd_cnx_receive does not work when the alt_fifo is set.
  *    - fd_cnx_getid : retrieve a descriptive string for the connection (for debug)
  *    - fd_cnx_getremoteid : identification of the remote peer (IP address or fqdn)
@@ -67,42 +73,10 @@
  *    - fd_cnx_destroy
  */
 
-/* The connection context structure */
-struct cnxctx {
-	char		cc_id[60];	/* The name of this connection */
-	char		cc_remid[60];	/* Id of remote peer */
 
-	int 		cc_socket;	/* The socket object of the connection -- <=0 if no socket is created */
-
-	int 		cc_proto;	/* IPPROTO_TCP or IPPROTO_SCTP */
-	int		cc_tls;		/* Is TLS already started ? */
-
-	struct fifo *	cc_events;	/* Events occuring on the connection */
-	pthread_t	cc_mgr;		/* manager thread for the connection */
-	struct fifo *	cc_incoming;	/* FIFO queue of messages received on the connection */
-	struct fifo *	cc_alt;		/* alternate fifo to send FDEVP_CNX_MSG_RECV events to. */
-
-	/* If cc_proto == SCTP */
-	struct	{
-		int		str_out;/* Out streams */
-		int		str_in;	/* In streams */
-		int		pairs;	/* max number of pairs ( = min(in, out)) */
-		int		next;	/* # of stream the next message will be sent to */
-	} 		cc_sctp_para;
-
-	/* If cc_tls == true */
-	struct {
-		int				 mode; 		/* GNUTLS_CLIENT / GNUTLS_SERVER */
-		gnutls_session_t 		 session;	/* Session object (stream #0 in case of SCTP) */
-	}		cc_tls_para;
-
-	/* If both conditions */
-	struct {
-		gnutls_session_t 		*res_sessions;	/* Sessions of other pairs of streams, resumed from the first */
-		/* Buffers, threads, ... */
-	}		cc_sctp_tls_para;
-};
-
+/*******************************************/
+/*     Creation of a connection object     */
+/*******************************************/
 
 /* Initialize a context structure */
 static struct cnxctx * fd_cnx_init(int full)
@@ -115,7 +89,6 @@
 	memset(conn, 0, sizeof(struct cnxctx));
 
 	if (full) {
-		CHECK_FCT_DO( fd_fifo_new ( &conn->cc_events ), return NULL );
 		CHECK_FCT_DO( fd_fifo_new ( &conn->cc_incoming ), return NULL );
 	}
 
@@ -351,7 +324,7 @@
 	return NULL;
 }
 
-/* Same for SCTP, accepts a list of remote addresses to connect to (see sctp_connectx) */
+/* Same for SCTP, accepts a list of remote addresses to connect to (see sctp_connectx for how they are used) */
 struct cnxctx * fd_cnx_cli_connect_sctp(int no_ip6, uint16_t port, struct fd_list * list)
 {
 #ifdef DISABLE_SCTP
@@ -381,6 +354,12 @@
 	else
 		cnx->cc_sctp_para.pairs = cnx->cc_sctp_para.str_in;
 	
+	if (TRACE_BOOL(INFO)) {
+		fd_log_debug("Connection established to server '");
+		sSA_DUMP_NODE_SERV( &primary, NI_NUMERICSERV);
+		fd_log_debug("' (SCTP:%d, %d/%d streams).\n", sock, cnx->cc_sctp_para.str_in, cnx->cc_sctp_para.str_out);
+	}
+	
 	/* Generate the names for the object */
 	{
 		char addrbuf[INET6_ADDRSTRLEN];
@@ -402,12 +381,6 @@
 			snprintf(cnx->cc_remid, sizeof(cnx->cc_remid), "[err:%s]", gai_strerror(rc));
 	}
 	
-	if (TRACE_BOOL(INFO)) {
-		fd_log_debug("Connection established to server '");
-		sSA_DUMP_NODE_SERV( &primary, NI_NUMERICSERV);
-		fd_log_debug("' (SCTP:%d).\n", sock);
-	}
-	
 	return cnx;
 
 error:
@@ -423,102 +396,6 @@
 	return conn->cc_id;
 }
 
-/* Start receving messages in clear (no TLS) on the connection */
-int fd_cnx_start_clear(struct cnxctx * conn)
-{
-
-	TODO("...");
-	return ENOTSUP;
-}
-
-/* TLS handshake a connection; no need to have called start_clear before. Reception is active if handhsake is successful */
-int fd_cnx_handshake(struct cnxctx * conn, int mode, char * priority)
-{
-	TRACE_ENTRY( "%p %d", conn, mode);
-	CHECK_PARAMS( conn && ( (mode == GNUTLS_CLIENT) || (mode == GNUTLS_SERVER) ) );
-
-	/* Save the mode */
-	conn->cc_tls_para.mode = mode;
-
-	/* Create the master session context */
-	CHECK_GNUTLS_DO( gnutls_init (&conn->cc_tls_para.session, mode), return ENOMEM );
-
-	/* Set the algorithm suite */
-	TODO("Use overwrite priority if non NULL");
-	CHECK_GNUTLS_DO( gnutls_priority_set( conn->cc_tls_para.session, fd_g_config->cnf_sec_data.prio_cache ), return EINVAL );
-
-	/* Set the credentials of this side of the connection */
-	CHECK_GNUTLS_DO( gnutls_credentials_set (conn->cc_tls_para.session, GNUTLS_CRD_CERTIFICATE, fd_g_config->cnf_sec_data.credentials), return EINVAL );
-
-	/* Request the remote credentials as well */
-	if (mode == GNUTLS_SERVER) {
-		gnutls_certificate_server_set_request (conn->cc_tls_para.session, GNUTLS_CERT_REQUIRE);
-	}
-
-	/* Set the socket info in the session */
-	gnutls_transport_set_ptr (conn->cc_tls_para.session, (gnutls_transport_ptr_t) conn->cc_socket);
-
-	/* Special case: multi-stream TLS is not natively managed in GNU TLS, we use a wrapper library */
-	if ((conn->cc_proto == IPPROTO_SCTP) && (conn->cc_sctp_para.pairs > 0)) {
-#ifndef DISABLE_SCTP
-		TODO("Initialize the SCTP TLS wrapper");
-		TODO("Set the lowat, push and pull functions");
-#else /* DISABLE_SCTP */
-		ASSERT(0);
-#endif /* DISABLE_SCTP */
-	}
-
-	/* Handshake master session */
-	{
-		int ret;
-		CHECK_GNUTLS_DO( ret = gnutls_handshake(conn->cc_tls_para.session),
-			{
-				if (TRACE_BOOL(INFO)) {
-					fd_log_debug("TLS Handshake failed on socket %d (%s) : %s\n", conn->cc_socket, conn->cc_id, gnutls_strerror(ret));
-				}
-				return EINVAL;
-			} );
-
-		/* Now verify the remote credentials are valid -- only simple test here */
-		CHECK_GNUTLS_DO( gnutls_certificate_verify_peers2 (conn->cc_tls_para.session, &ret), return EINVAL );
-		if (ret) {
-			if (TRACE_BOOL(INFO)) {
-				fd_log_debug("TLS: Remote certificate invalid on socket %d (%s) :\n", conn->cc_socket, conn->cc_id);
-				if (ret & GNUTLS_CERT_INVALID)
-					fd_log_debug(" - The certificate is not trusted (unknown CA?)\n");
-				if (ret & GNUTLS_CERT_REVOKED)
-					fd_log_debug(" - The certificate has been revoked.\n");
-				if (ret & GNUTLS_CERT_SIGNER_NOT_FOUND)
-					fd_log_debug(" - The certificate hasn't got a known issuer.\n");
-				if (ret & GNUTLS_CERT_SIGNER_NOT_CA)
-					fd_log_debug(" - The certificate signer is not a CA, or uses version 1, or 3 without basic constraints.\n");
-				if (ret & GNUTLS_CERT_INSECURE_ALGORITHM)
-					fd_log_debug(" - The certificate signature uses a weak algorithm.\n");
-			}
-			return EINVAL;
-		}
-	}
-
-	/* Other sessions in case of multi-stream SCTP are resumed from the master */
-	if ((conn->cc_proto == IPPROTO_SCTP) && (conn->cc_sctp_para.pairs > 0)) {
-#ifndef DISABLE_SCTP
-		TODO("Init and resume all additional sessions from the master one.");
-#endif /* DISABLE_SCTP */
-	}
-
-	TODO("Start the connection state machine thread");
-
-	return 0;
-}
-
-/* Retrieve TLS credentials of the remote peer, after handshake */
-int fd_cnx_getcred(struct cnxctx * conn, const gnutls_datum_t **cert_list, unsigned int *cert_list_size)
-{
-
-	TODO("...");
-	return ENOTSUP;
-}
-
 /* Get the list of endpoints (IP addresses) of the local and remote peers on this connection */
 int fd_cnx_getendpoints(struct cnxctx * conn, struct fd_list * local, struct fd_list * remote)
 {
@@ -550,7 +427,7 @@
 	
 	if (remote) {
 		/* Check we have a full connection object, not a listening socket (with no remote) */
-		CHECK_PARAMS( conn->cc_events );
+		CHECK_PARAMS( conn->cc_incoming );
 		
 		/* Retrieve the peer endpoint(s) of the connection */
 		switch (conn->cc_proto) {
@@ -586,35 +463,507 @@
 }
 
 
-/* Receive next message. if timeout is not NULL, wait only until timeout */
-int fd_cnx_receive(struct cnxctx * conn, struct timespec * timeout, unsigned char **buf, size_t * len)
+/**************************************/
+/*     Use of a connection object     */
+/**************************************/
+
+/* Receiver thread (TCP & noTLS) : incoming message is directly saved into the target queue */
+static void * rcvthr_notls_tcp(void * arg)
 {
+	struct cnxctx * conn = arg;
+	
+	TRACE_ENTRY("%p", arg);
+	
+	CHECK_PARAMS_DO(conn && (conn->cc_socket > 0), goto out);
+	ASSERT( conn->cc_proto == IPPROTO_TCP );
+	ASSERT( conn->cc_tls == 0 );
+	ASSERT( Target_Queue(conn) );
+	
+	/* Receive from a TCP connection: we have to rebuild the message boundaries */
+	do {
+		uint8_t header[4];
+		uint8_t * newmsg;
+		size_t  length;
+		ssize_t ret = 0;
+		size_t	received = 0;
 
-	TODO("...");
-	return ENOTSUP;
+		do {
+			ret = recv(conn->cc_socket, &header[received], sizeof(header) - received, 0);
+			if (ret <= 0) {
+				CHECK_SYS_DO(ret, /* continue */);
+				goto error; /* Stop the thread, the recipient of the event will cleanup */
+			}
+
+			received += ret;
+		} while (received < sizeof(header));
+
+		length = ((size_t)header[1] << 16) + ((size_t)header[2] << 8) + (size_t)header[3];
+
+		/* Check the received word is a valid begining of a Diameter message */
+		if ((header[0] != DIAMETER_VERSION)	/* defined in <libfreeDiameter.h> */
+		   || (length > DIAMETER_MSG_SIZE_MAX)) { /* to avoid too big mallocs */
+			/* The message is suspect */
+			TRACE_DEBUG(INFO, "Received suspect header [ver: %d, size: %g], assume disconnection", (int)header[0], length);
+			goto error; /* Stop the thread, the recipient of the event will cleanup */
+		}
+
+		/* Ok, now we can really receive the data */
+		CHECK_MALLOC_DO(  newmsg = malloc( length ), goto error );
+		memcpy(newmsg, header, sizeof(header));
+
+		while (received < length) {
+			pthread_cleanup_push(free, newmsg); /* In case we are canceled, clean the partialy built buffer */
+			ret = recv(conn->cc_socket, newmsg + received, length - received, 0);
+			pthread_cleanup_pop(0);
+
+			if (ret <= 0) {
+				CHECK_SYS_DO(ret, /* continue */);
+				free(newmsg);
+				goto error; /* Stop the thread, the recipient of the event will cleanup */
+			}
+			received += ret;
+		}
+		
+		/* We have received a complete message, send it */
+		CHECK_FCT_DO( fd_event_send( Target_Queue(conn), FDEVP_CNX_MSG_RECV, length, newmsg), /* continue or destroy everything? */);
+		
+	} while (conn->cc_loop);
+	
+out:
+	TRACE_DEBUG(FULL, "Thread terminated");	
+	return NULL;
+error:
+	CHECK_FCT_DO( fd_event_send( Target_Queue(conn), FDEVP_CNX_ERROR, 0, NULL), /* continue or destroy everything? */);
+	goto out;
 }
 
-/* Set / reset alternate FIFO list to send FDEVP_CNX_MSG_RECV to when message is received */
-int fd_cnx_recv_setaltfifo(struct cnxctx * conn, struct fifo * alt_fifo)
+#ifndef DISABLE_SCTP
+/* Receiver thread (SCTP & noTLS) : incoming message is directly saved into cc_incoming, no need to care for the stream ID */
+static void * rcvthr_notls_sctp(void * arg)
+{
+	struct cnxctx * conn = arg;
+	uint8_t * buf;
+	size_t    bufsz;
+	int	  event;
+	
+	TRACE_ENTRY("%p", arg);
+	
+	CHECK_PARAMS_DO(conn && (conn->cc_socket > 0), goto out);
+	ASSERT( conn->cc_proto == IPPROTO_SCTP );
+	ASSERT( conn->cc_tls == 0 );
+	ASSERT( Target_Queue(conn) );
+	
+	do {
+		CHECK_FCT_DO( fd_sctp_recvmeta(conn->cc_socket, NULL, &buf, &bufsz, &event), goto error );
+		if (event == FDEVP_CNX_ERROR) {
+			goto error;
+		}
+		
+		CHECK_FCT_DO( fd_event_send( Target_Queue(conn), event, bufsz, buf), goto error );
+		
+	} while (conn->cc_loop);
+	
+out:
+	TRACE_DEBUG(FULL, "Thread terminated");	
+	return NULL;
+error:
+	CHECK_FCT_DO( fd_event_send( Target_Queue(conn), FDEVP_CNX_ERROR, 0, NULL), /* continue or destroy everything? */);
+	goto out;
+}
+#endif /* DISABLE_SCTP */
+
+/* Returns 0 on error, received data size otherwise (always >= 0) */
+static ssize_t fd_tls_recv_handle_error(struct cnxctx * conn, gnutls_session_t session, void * data, size_t sz)
+{
+	ssize_t ret;
+again:	
+	CHECK_GNUTLS_DO( ret = gnutls_record_recv(session, data, sz),
+		{
+			switch (ret) {
+				case GNUTLS_E_REHANDSHAKE: 
+					CHECK_GNUTLS_DO( ret = gnutls_handshake(session),
+						{
+							if (TRACE_BOOL(INFO)) {
+								fd_log_debug("TLS re-handshake failed on socket %d (%s) : %s\n", conn->cc_socket, conn->cc_id, gnutls_strerror(ret));
+							}
+							ret = 0;
+							goto end;
+						} );
+
+				case GNUTLS_E_AGAIN:
+				case GNUTLS_E_INTERRUPTED:
+					goto again;
+
+				default:
+					TRACE_DEBUG(INFO, "This TLS error is not handled, assume unrecoverable error");
+					ret = 0;
+			}
+		} );
+end:	
+	return ret;
+}
+
+/* The function that receives TLS data and re-builds a Diameter message -- it exits only on error or cancelation */
+int fd_tls_rcvthr_core(struct cnxctx * conn, gnutls_session_t session)
 {
-	TRACE_ENTRY( "%p %p", conn, alt_fifo );
-	CHECK_PARAMS( conn );
+	/* No guaranty that GnuTLS preserves the message boundaries, so we re-build it as in TCP */
+	do {
+		uint8_t header[4];
+		uint8_t * newmsg;
+		size_t  length;
+		ssize_t ret = 0;
+		size_t	received = 0;
+
+		do {
+			ret = fd_tls_recv_handle_error(conn, conn->cc_tls_para.session, &header[received], sizeof(header) - received);
+			if (ret == 0) {
+				/* The connection is closed */
+				goto out;
+			}
+			received += ret;
+		} while (received < sizeof(header));
+
+		length = ((size_t)header[1] << 16) + ((size_t)header[2] << 8) + (size_t)header[3];
+
+		/* Check the received word is a valid beginning of a Diameter message */
+		if ((header[0] != DIAMETER_VERSION)	/* defined in <libfreeDiameter.h> */
+		   || (length > DIAMETER_MSG_SIZE_MAX)) { /* to avoid too big mallocs */
+			/* The message is suspect */
+			TRACE_DEBUG(INFO, "Received suspect header [ver: %d, size: %g], assume disconnection", (int)header[0], length);
+			goto out;
+		}
+
+		/* Ok, now we can really receive the data */
+		CHECK_MALLOC(  newmsg = malloc( length ) );
+		memcpy(newmsg, header, sizeof(header));
+
+		while (received < length) {
+			pthread_cleanup_push(free, newmsg); /* In case we are canceled, clean the partialy built buffer */
+			ret = fd_tls_recv_handle_error(conn, conn->cc_tls_para.session, newmsg + received, length - received);
+			pthread_cleanup_pop(0);
+
+			if (ret == 0) {
+				free(newmsg);
+				goto out; /* Stop the thread, the recipient of the event will cleanup */
+			}
+			received += ret;
+		}
+		
+		/* We have received a complete message, send it */
+		CHECK_FCT_DO( fd_event_send( Target_Queue(conn), FDEVP_CNX_MSG_RECV, length, newmsg), /* continue or destroy everything? */);
+		
+	} while (1);
+out:
+	return ENOTCONN;
+}
+
+/* Receiver thread (TLS & 1 stream SCTP or TCP) : gnutls directly handles the socket, save records into the target queue */
+static void * rcvthr_tls_single(void * arg)
+{
+	struct cnxctx * conn = arg;
+	
+	TRACE_ENTRY("%p", arg);
+	
+	CHECK_PARAMS_DO(conn && (conn->cc_socket > 0), goto error);
+	ASSERT( conn->cc_tls == 1 );
+	ASSERT( Target_Queue(conn) );
 	
-	/* Let's cross fingers that there is no race condition here... */
-	conn->cc_alt = alt_fifo;
+	CHECK_FCT_DO(fd_tls_rcvthr_core(conn, conn->cc_tls_para.session), /* continue */);
+error:
+	CHECK_FCT_DO( fd_event_send( Target_Queue(conn), FDEVP_CNX_ERROR, 0, NULL), /* continue or destroy everything? */);
+	TRACE_DEBUG(FULL, "Thread terminated");	
+	return NULL;
+}
+
+/* Start receving messages in clear (no TLS) on the connection */
+int fd_cnx_start_clear(struct cnxctx * conn, int loop)
+{
+	TRACE_ENTRY("%p %i", conn, loop);
+	
+	CHECK_PARAMS( conn && Target_Queue(conn) && (!conn->cc_tls) && (!conn->cc_loop));
+	
+	/* Save the loop request */
+	conn->cc_loop = loop;
+	
+	/* Release resources in case of a previous call was already made */
+	CHECK_FCT_DO( fd_thr_term(&conn->cc_rcvthr), /* continue */);
+	
+	switch (conn->cc_proto) {
+		case IPPROTO_TCP:
+			/* Start the tcp_notls thread */
+			CHECK_POSIX( pthread_create( &conn->cc_rcvthr, NULL, rcvthr_notls_tcp, conn ) );
+			break;
+#ifndef DISABLE_SCTP
+		case IPPROTO_SCTP:
+			/* Start the tcp_notls thread */
+			CHECK_POSIX( pthread_create( &conn->cc_rcvthr, NULL, rcvthr_notls_sctp, conn ) );
+			break;
+#endif /* DISABLE_SCTP */
+		default:
+			TRACE_DEBUG(INFO, "Unknown protocol: %d", conn->cc_proto);
+			return ENOTSUP;
+	}
+			
+	return 0;
+}
+
+/* Prepare a gnutls session object for handshake */
+int fd_tls_prepare(gnutls_session_t * session, int mode, char * priority)
+{
+	/* Create the master session context */
+	CHECK_GNUTLS_DO( gnutls_init (session, mode), return ENOMEM );
+
+	/* Set the algorithm suite */
+	if (priority) {
+		const char * errorpos;
+		CHECK_GNUTLS_DO( gnutls_priority_set_direct( *session, priority, &errorpos ), 
+			{ TRACE_DEBUG(INFO, "Error in priority string '%s' at position: '%s'\n", priority, errorpos); return EINVAL; } );
+	} else {
+		CHECK_GNUTLS_DO( gnutls_priority_set( *session, fd_g_config->cnf_sec_data.prio_cache ), return EINVAL );
+	}
+
+	/* Set the credentials of this side of the connection */
+	CHECK_GNUTLS_DO( gnutls_credentials_set (*session, GNUTLS_CRD_CERTIFICATE, fd_g_config->cnf_sec_data.credentials), return EINVAL );
+
+	/* Request the remote credentials as well */
+	if (mode == GNUTLS_SERVER) {
+		gnutls_certificate_server_set_request (*session, GNUTLS_CERT_REQUIRE);
+	}
 	
 	return 0;
 }
 
-/* Send a message */
+/* TLS handshake a connection; no need to have called start_clear before. Reception is active if handhsake is successful */
+int fd_cnx_handshake(struct cnxctx * conn, int mode, char * priority)
+{
+	TRACE_ENTRY( "%p %d", conn, mode);
+	CHECK_PARAMS( conn && (!conn->cc_tls) && ( (mode == GNUTLS_CLIENT) || (mode == GNUTLS_SERVER) ) && (!conn->cc_loop) );
+
+	/* Save the mode */
+	conn->cc_tls_para.mode = mode;
+	
+	/* Cancel receiving thread if any -- it should already be terminated anyway, we just release the resources */
+	CHECK_FCT_DO( fd_thr_term(&conn->cc_rcvthr), /* continue */);
+	
+	/* Once TLS handshake is done, we don't stop after the first message */
+	conn->cc_loop = 1;
+	
+	/* Prepare the master session credentials and priority */
+	CHECK_FCT( fd_tls_prepare(&conn->cc_tls_para.session, mode, priority) );
+
+	/* Special case: multi-stream TLS is not natively managed in GNU TLS, we use a wrapper library */
+	if (conn->cc_sctp_para.pairs > 1) {
+#ifdef DISABLE_SCTP
+		ASSERT(0);
+		CHECK_FCT( ENOTSUP );
+#else /* DISABLE_SCTP */
+		/* Initialize the wrapper, start the demux thread */
+		CHECK_FCT( fd_sctps_init(conn) );
+#endif /* DISABLE_SCTP */
+	} else {
+		/* Set the socket info in the session */
+		gnutls_transport_set_ptr (conn->cc_tls_para.session, (gnutls_transport_ptr_t) conn->cc_socket);
+	}
+
+	/* Handshake master session */
+	{
+		int ret;
+		CHECK_GNUTLS_DO( ret = gnutls_handshake(conn->cc_tls_para.session),
+			{
+				if (TRACE_BOOL(INFO)) {
+					fd_log_debug("TLS Handshake failed on socket %d (%s) : %s\n", conn->cc_socket, conn->cc_id, gnutls_strerror(ret));
+				}
+				return EINVAL;
+			} );
+
+		/* Now verify the remote credentials are valid -- only simple test here */
+		CHECK_GNUTLS_DO( gnutls_certificate_verify_peers2 (conn->cc_tls_para.session, &ret), return EINVAL );
+		if (ret) {
+			if (TRACE_BOOL(INFO)) {
+				fd_log_debug("TLS: Remote certificate invalid on socket %d (%s) :\n", conn->cc_socket, conn->cc_id);
+				if (ret & GNUTLS_CERT_INVALID)
+					fd_log_debug(" - The certificate is not trusted (unknown CA?)\n");
+				if (ret & GNUTLS_CERT_REVOKED)
+					fd_log_debug(" - The certificate has been revoked.\n");
+				if (ret & GNUTLS_CERT_SIGNER_NOT_FOUND)
+					fd_log_debug(" - The certificate hasn't got a known issuer.\n");
+				if (ret & GNUTLS_CERT_SIGNER_NOT_CA)
+					fd_log_debug(" - The certificate signer is not a CA, or uses version 1, or 3 without basic constraints.\n");
+				if (ret & GNUTLS_CERT_INSECURE_ALGORITHM)
+					fd_log_debug(" - The certificate signature uses a weak algorithm.\n");
+			}
+			return EINVAL;
+		}
+	}
+
+	/* Multi-stream TLS: handshake other streams as well */
+	if (conn->cc_sctp_para.pairs > 1) {
+#ifndef DISABLE_SCTP
+		/* Resume all additional sessions from the master one. */
+		CHECK_FCT(fd_sctps_handshake_others(conn, priority));
+		
+		/* Start decrypting the messages from all threads and queuing them in target queue */
+		CHECK_FCT(fd_sctps_startthreads(conn));
+#endif /* DISABLE_SCTP */
+	} else {
+		/* Start decrypting the data */
+		CHECK_POSIX( pthread_create( &conn->cc_rcvthr, NULL, rcvthr_tls_single, conn ) );
+	}
+
+	return 0;
+}
+
+/* Retrieve TLS credentials of the remote peer, after handshake */
+int fd_cnx_getcred(struct cnxctx * conn, const gnutls_datum_t **cert_list, unsigned int *cert_list_size)
+{
+	TRACE_ENTRY("%p %p %p", conn, cert_list, cert_list_size);
+	CHECK_PARAMS( conn && (conn->cc_tls) && cert_list && cert_list_size );
+	
+	/* This function only works for X.509 certificates. */
+	CHECK_PARAMS( gnutls_certificate_type_get (conn->cc_tls_para.session) == GNUTLS_CRT_X509 );
+	
+	*cert_list = gnutls_certificate_get_peers (conn->cc_tls_para.session, cert_list_size);
+	if (*cert_list == NULL) {
+		TRACE_DEBUG(INFO, "No certificate was provided by remote peer / an error occurred.");
+		return EINVAL;
+	}
+
+	TRACE_DEBUG( FULL, "Remote peer provided %d certificates.\n", *cert_list_size);
+	
+	return 0;
+}
+
+/* Receive next message. if timeout is not NULL, wait only until timeout. This function only pulls from a queue, mgr thread is filling that queue aynchrounously. */
+int fd_cnx_receive(struct cnxctx * conn, struct timespec * timeout, unsigned char **buf, size_t * len)
+{
+	int    ev;
+	size_t ev_sz;
+	void * ev_data;
+	
+	TRACE_ENTRY("%p %p %p %p", conn, timeout, buf, len);
+	CHECK_PARAMS(conn && (conn->cc_socket > 0) && buf && len);
+	CHECK_PARAMS(conn->cc_rcvthr != (pthread_t)NULL);
+	CHECK_PARAMS(conn->cc_alt == NULL);
+
+	/* Now, pull the first event */
+get_next:
+	if (timeout) {
+		CHECK_FCT( fd_event_timedget(conn->cc_incoming, timeout, FDEVP_PSM_TIMEOUT, &ev, &ev_sz, &ev_data) );
+	} else {
+		CHECK_FCT( fd_event_get(conn->cc_incoming, &ev, &ev_sz, &ev_data) );
+	}
+	
+	switch (ev) {
+		case FDEVP_CNX_MSG_RECV:
+			/* We got one */
+			*len = ev_sz;
+			*buf = ev_data;
+			return 0;
+			
+		case FDEVP_PSM_TIMEOUT:
+			TRACE_DEBUG(FULL, "Timeout event received");
+			return ETIMEDOUT;
+			
+		case FDEVP_CNX_EP_CHANGE:
+			/* We ignore this event */
+			goto get_next;
+			
+		case FDEVP_CNX_ERROR:
+			TRACE_DEBUG(FULL, "Received ERROR event on the connection");
+			return ENOTCONN;
+	}
+	
+	TRACE_DEBUG(INFO, "Received unexpected event %d (%s)", ev, fd_pev_str(ev));
+	return EINVAL;
+}
+
+/* Set an alternate FIFO list to send FDEVP_CNX_* events to */
+int fd_cnx_recv_setaltfifo(struct cnxctx * conn, struct fifo * alt_fifo)
+{
+	TRACE_ENTRY( "%p %p", conn, alt_fifo );
+	CHECK_PARAMS( conn && alt_fifo && conn->cc_incoming );
+	
+	/* The magic function does it all */
+	CHECK_FCT( fd_fifo_move( &conn->cc_incoming, alt_fifo, &conn->cc_alt ) );
+	
+	return 0;
+}
+
+/* Send function when no multi-stream is involved, or sending on stream #0 (send() always use stream 0)*/
+static int send_simple(struct cnxctx * conn, unsigned char * buf, size_t len)
+{
+	ssize_t ret;
+	size_t sent = 0;
+	TRACE_ENTRY("%p %p %g", conn, buf, len);
+	do {
+		if (conn->cc_tls) {
+			CHECK_GNUTLS_DO( ret = gnutls_record_send (conn->cc_tls_para.session, buf + sent, len - sent), return ENOTCONN );
+		} else {
+			CHECK_SYS( ret = send(conn->cc_socket, buf + sent, len - sent, 0) ); /* better to replace with sendmsg for atomic sending? */
+		}
+		sent += ret;
+	} while ( sent < len );
+	return 0;
+}
+
+/* Send a message -- this is synchronous -- and we assume it's never called by several threads at the same time, so we don't protect. */
 int fd_cnx_send(struct cnxctx * conn, unsigned char * buf, size_t len)
 {
+	TRACE_ENTRY("%p %p %g", conn, buf, len);
+	
+	CHECK_PARAMS(conn && (conn->cc_socket > 0) && buf && len);
 
-	TODO("...");
-	return ENOTSUP;
+	TRACE_DEBUG(FULL, "Sending %gb %sdata on connection %s", len, conn->cc_tls ? "TLS-protected ":"", conn->cc_id);
+	
+	switch (conn->cc_proto) {
+		case IPPROTO_TCP:
+			CHECK_FCT( send_simple(conn, buf, len) );
+			break;
+		
+#ifndef DISABLE_SCTP
+		case IPPROTO_SCTP: {
+			int multistr = 0;
+			
+			if ((conn->cc_sctp_para.str_out > 1) && ((! conn->cc_tls) || (conn->cc_sctp_para.pairs > 1)))  {
+				/* Update the id of the stream we will send this message on */
+				conn->cc_sctp_para.next += 1;
+				conn->cc_sctp_para.next %= (conn->cc_tls ? conn->cc_sctp_para.pairs : conn->cc_sctp_para.str_out);
+				multistr = 1;
+			}
+			
+			if ((!multistr) || (conn->cc_sctp_para.next == 0)) {
+				CHECK_FCT( send_simple(conn, buf, len) );
+			} else {
+				if (!conn->cc_tls) {
+					CHECK_FCT( fd_sctp_sendstr(conn->cc_socket, conn->cc_sctp_para.next, buf, len) );
+				} else {
+					/* push the record to the appropriate session */
+					ssize_t ret;
+					size_t sent = 0;
+					ASSERT(conn->cc_sctps_data.array != NULL);
+					do {
+						CHECK_GNUTLS_DO( ret = gnutls_record_send (conn->cc_sctps_data.array[conn->cc_sctp_para.next - 1].session, buf + sent, len - sent), { TODO("Handle error (re-handshake, etc.."); return ENOTCONN; } );
+						sent += ret;
+					} while ( sent < len );
+				}
+			}
+		}
+		break;
+#endif /* DISABLE_SCTP */
+	
+		default:
+			TRACE_DEBUG(INFO, "Unknwon protocol: %d", conn->cc_proto);
+			return ENOTSUP;	/* or EINVAL... */
+	}
+	
+	return 0;
 }
 
 
+/**************************************/
+/*     Destruction of connection      */
+/**************************************/
+
 /* Destroy a conn structure, and shutdown the socket */
 void fd_cnx_destroy(struct cnxctx * conn)
 {
@@ -622,22 +971,41 @@
 	
 	CHECK_PARAMS_DO(conn, return);
 
-	TODO("End TLS session(s) if started");
+	/* In case of TLS, stop receiver thread, then close properly the gnutls session */
+	if ((conn->cc_tls) && (conn->cc_sctp_para.pairs > 1)) {
+#ifndef DISABLE_SCTP
+		/* Multi-stream TLS: Stop all decipher threads, but not the demux thread */
+		fd_sctps_stopthreads(conn);
+#endif /* DISABLE_SCTP */
+	} else {
+		/* Stop the decoding thread */
+		CHECK_FCT_DO( fd_thr_term(&conn->cc_rcvthr), /* continue */ );
+	}
 	
-	TODO("Stop manager thread if running");
+	/* Terminate properly the TLS session(s) */
+	if (conn->cc_tls) {
+		/* Master session */
+		CHECK_GNUTLS_DO( gnutls_bye(conn->cc_tls_para.session, GNUTLS_SHUT_RDWR), /* Continue */ );
+		gnutls_deinit(conn->cc_tls_para.session);
+		
+#ifndef DISABLE_SCTP
+		if (conn->cc_sctp_para.pairs > 1) {
+			/* Multi-stream TLS: destroy the wrapper and stop the demux thread */
+			fd_sctps_destroy(conn);
+		}
+#endif /* DISABLE_SCTP */
+		
+	}
 	
 	/* Shut the connection down */
 	if (conn->cc_socket > 0) {
 		shutdown(conn->cc_socket, SHUT_RDWR);
 	}
 	
-	TODO("Empty FIFO queues");
-	
-	/* Destroy FIFO lists */
-	if (conn->cc_events)
-		CHECK_FCT_DO( fd_fifo_del ( &conn->cc_events ), /* continue */ );
-	if (conn->cc_incoming)
-		CHECK_FCT_DO( fd_fifo_del ( &conn->cc_incoming ), /* continue */ );
+	/* Empty and destroy FIFO list */
+	if (conn->cc_incoming) {
+		fd_event_destroy( &conn->cc_incoming, free );
+	}
 	
 	/* Free the object */
 	free(conn);
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/freeDiameter/cnxctx.h	Mon Oct 26 16:00:49 2009 +0900
@@ -0,0 +1,124 @@
+/*********************************************************************************************************
+* Software License Agreement (BSD License)                                                               *
+* Author: Sebastien Decugis <sdecugis@nict.go.jp>							 *
+*													 *
+* Copyright (c) 2009, WIDE Project and NICT								 *
+* All rights reserved.											 *
+* 													 *
+* Redistribution and use of this software in source and binary forms, with or without modification, are  *
+* permitted provided that the following conditions are met:						 *
+* 													 *
+* * Redistributions of source code must retain the above 						 *
+*   copyright notice, this list of conditions and the 							 *
+*   following disclaimer.										 *
+*    													 *
+* * Redistributions in binary form must reproduce the above 						 *
+*   copyright notice, this list of conditions and the 							 *
+*   following disclaimer in the documentation and/or other						 *
+*   materials provided with the distribution.								 *
+* 													 *
+* * Neither the name of the WIDE Project or NICT nor the 						 *
+*   names of its contributors may be used to endorse or 						 *
+*   promote products derived from this software without 						 *
+*   specific prior written permission of WIDE Project and 						 *
+*   NICT.												 *
+* 													 *
+* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED *
+* WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A *
+* PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR *
+* ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 	 *
+* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 	 *
+* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR *
+* TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF   *
+* ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.								 *
+*********************************************************************************************************/
+
+/* This file contains the definitions for internal use in the connection context files */
+
+#ifndef _CNXCTX_H
+#define _CNXCTX_H
+
+/* The connection context structure */
+struct cnxctx {
+	char		cc_id[60];	/* The name of this connection */
+	char		cc_remid[60];	/* Id of remote peer */
+
+	int 		cc_socket;	/* The socket object of the connection -- <=0 if no socket is created */
+
+	int 		cc_proto;	/* IPPROTO_TCP or IPPROTO_SCTP */
+	int		cc_tls;		/* Is TLS already started ? */
+
+	pthread_t	cc_rcvthr;	/* thread for receiving messages on the connection */
+	int		cc_loop;	/* tell the thread if it loops or stops after the first message is received */
+	
+	struct fifo *	cc_incoming;	/* FIFO queue of events received on the connection, FDEVP_CNX_* */
+	struct fifo *	cc_alt;		/* alternate fifo to send FDEVP_CNX_* events to. */
+	#define Target_Queue(cnx)	((cnx)->cc_alt ?: (cnx)->cc_incoming)
+
+	/* If cc_tls == true */
+	struct {
+		int				 mode; 		/* GNUTLS_CLIENT / GNUTLS_SERVER */
+		gnutls_session_t 		 session;	/* Session object (stream #0 in case of SCTP) */
+	}		cc_tls_para;
+
+	/* If cc_proto == SCTP */
+	struct	{
+		uint16_t str_out;	/* Out streams */
+		uint16_t str_in;	/* In streams */
+		uint16_t pairs;		/* max number of pairs ( = min(in, out)) */
+		uint16_t next;		/* # of stream the next message will be sent to */
+	} 		cc_sctp_para;
+
+	/* If both conditions */
+	struct {
+		struct sctps_ctx *array; /* an array of cc_sctp_para.pairs elements -- the #0 is special (session is outside)*/
+		struct sr_store	 *sess_store; /* Session data of the master session, to resume the children sessions */
+	} 		cc_sctps_data;
+};
+
+/* TLS */
+int fd_tls_rcvthr_core(struct cnxctx * conn, gnutls_session_t session);
+int fd_tls_prepare(gnutls_session_t * session, int mode, char * priority);
+
+/* TCP */
+int fd_tcp_create_bind_server( int * sock, sSA * sa, socklen_t salen );
+int fd_tcp_listen( int sock );
+int fd_tcp_client( int *sock, sSA * sa, socklen_t salen );
+int fd_tcp_get_local_ep(int sock, sSS * ss, socklen_t *sl);
+int fd_tcp_get_remote_ep(int sock, sSS * ss, socklen_t *sl);
+
+#ifndef DISABLE_SCTP
+/* SCTP */
+int fd_sctp_create_bind_server( int * sock, struct fd_list * list, uint16_t port );
+int fd_sctp_listen( int sock );
+int fd_sctp_client( int *sock, int no_ip6, uint16_t port, struct fd_list * list );
+int fd_sctp_get_local_ep(int sock, struct fd_list * list);
+int fd_sctp_get_remote_ep(int sock, struct fd_list * list);
+int fd_sctp_get_str_info( int sock, uint16_t *in, uint16_t *out, sSS *primary );
+int fd_sctp_sendstr(int sock, uint16_t strid, uint8_t * buf, size_t len);
+int fd_sctp_recvmeta(int sock, uint16_t * strid, uint8_t ** buf, size_t * len, int *event);
+
+/* TLS over SCTP (multi-stream) */
+struct sctps_ctx {
+	struct cnxctx 	*parent; 	/* for info such as socket, conn name, event list */
+	uint16_t	 strid;		/* Stream # of this session */
+	struct fifo	*raw_recv;	/* Raw data received on this stream, for demux */
+	struct {
+		uint8_t *buf;
+		size_t   bufsz;
+		size_t   offset;
+	} 		 partial;	/* If the pull function did not read the full content of first message in raw, it stores it here for next read call. */
+	pthread_t	 thr;		/* Thread to decrypt raw data in this pair of streams */
+	gnutls_session_t session;	/* TLS context using this pair of streams -- except if strid == 0, in that case session is outside the array */
+};
+
+int fd_sctps_init(struct cnxctx * conn);
+int fd_sctps_handshake_others(struct cnxctx * conn, char * priority);
+int fd_sctps_startthreads(struct cnxctx * conn);
+void fd_sctps_stopthreads(struct cnxctx * conn);
+void fd_sctps_destroy(struct cnxctx * conn);
+
+#endif /* DISABLE_SCTP */
+
+#endif /* _CNXCTX_H */
+
--- a/freeDiameter/fD.h	Wed Oct 21 18:42:45 2009 +0900
+++ b/freeDiameter/fD.h	Mon Oct 26 16:00:49 2009 +0900
@@ -151,13 +151,19 @@
 /* Events codespace for struct fd_peer->p_events */
 enum {
 	/* Dump all info about this peer in the debug log */
-	 FDEVP_DUMP_ALL = 2000
+	 FDEVP_DUMP_ALL = 1500
 	
 	/* request to terminate this peer : disconnect, requeue all messages */
 	,FDEVP_TERMINATE
 	
-	/* A connection object has received a message -- stored in event->data */
+	/* A connection object has received a message. */
 	,FDEVP_CNX_MSG_RECV
+			 
+	/* A connection object has encountered an error (disconnected). */
+	,FDEVP_CNX_ERROR
+	
+	/* Endpoints of a connection have been changed (multihomed SCTP). */
+	,FDEVP_CNX_EP_CHANGE
 	
 	/* A message was received in the peer */
 	,FDEVP_MSG_INCOMING
@@ -209,7 +215,7 @@
 struct cnxctx * fd_cnx_cli_connect_tcp(sSA * sa, socklen_t addrlen);
 struct cnxctx * fd_cnx_cli_connect_sctp(int no_ip6, uint16_t port, struct fd_list * list);
 char * fd_cnx_getid(struct cnxctx * conn);
-int fd_cnx_start_clear(struct cnxctx * conn);
+int fd_cnx_start_clear(struct cnxctx * conn, int loop);
 int fd_cnx_handshake(struct cnxctx * conn, int mode, char * priority);
 int fd_cnx_getcred(struct cnxctx * conn, const gnutls_datum_t **cert_list, unsigned int *cert_list_size);
 int fd_cnx_getendpoints(struct cnxctx * conn, struct fd_list * local, struct fd_list * remote);
@@ -219,24 +225,5 @@
 int fd_cnx_send(struct cnxctx * conn, unsigned char * buf, size_t len);
 void fd_cnx_destroy(struct cnxctx * conn);
 
-/* TCP */
-int fd_tcp_create_bind_server( int * sock, sSA * sa, socklen_t salen );
-int fd_tcp_listen( int sock );
-int fd_tcp_client( int *sock, sSA * sa, socklen_t salen );
-int fd_tcp_get_local_ep(int sock, sSS * ss, socklen_t *sl);
-int fd_tcp_get_remote_ep(int sock, sSS * ss, socklen_t *sl);
-
-/* SCTP */
-#ifndef DISABLE_SCTP
-int fd_sctp_create_bind_server( int * sock, struct fd_list * list, uint16_t port );
-int fd_sctp_listen( int sock );
-int fd_sctp_client( int *sock, int no_ip6, uint16_t port, struct fd_list * list );
-int fd_sctp_get_local_ep(int sock, struct fd_list * list);
-int fd_sctp_get_remote_ep(int sock, struct fd_list * list);
-int fd_sctp_get_str_info( int sock, int *in, int *out, sSS *primary );
-
-#endif /* DISABLE_SCTP */
-
-
 
 #endif /* _FD_H */
--- a/freeDiameter/main.c	Wed Oct 21 18:42:45 2009 +0900
+++ b/freeDiameter/main.c	Mon Oct 26 16:00:49 2009 +0900
@@ -116,8 +116,8 @@
 	/* Now, just wait for events */
 	TRACE_DEBUG(INFO, FD_PROJECT_BINARY " daemon initialized.");
 	while (1) {
-		int code;
-		CHECK_FCT_DO(  fd_event_get(fd_g_config->cnf_main_ev, &code, NULL),  break  );
+		int code; size_t sz; void * data;
+		CHECK_FCT_DO(  fd_event_get(fd_g_config->cnf_main_ev, &code, &sz, &data),  break  );
 		switch (code) {
 			case FDEV_DUMP_DICT:
 				fd_dict_dump(fd_g_config->cnf_dict);
@@ -338,7 +338,7 @@
 	CHECK_SYS_DO(  sigwait(&sig_main, &sig), TRACE_DEBUG(INFO, "Error in sigwait function") );
 	
 	TRACE_DEBUG(INFO, "Received signal %s (%d), exiting", SIGNALSTR(sig), sig);
-	CHECK_FCT_DO( fd_event_send(fd_g_config->cnf_main_ev, FDEV_TERMINATE, NULL), exit(2) );
+	CHECK_FCT_DO( fd_event_send(fd_g_config->cnf_main_ev, FDEV_TERMINATE, 0, NULL), exit(2) );
 	return NULL;
 }
 	
--- a/freeDiameter/p_expiry.c	Wed Oct 21 18:42:45 2009 +0900
+++ b/freeDiameter/p_expiry.c	Mon Oct 26 16:00:49 2009 +0900
@@ -86,7 +86,7 @@
 error:
 	TRACE_DEBUG(INFO, "An error occurred in peers module! GC thread is terminating...");
 	ASSERT(0);
-	CHECK_FCT_DO(fd_event_send(fd_g_config->cnf_main_ev, FDEV_TERMINATE, NULL), );
+	CHECK_FCT_DO(fd_event_send(fd_g_config->cnf_main_ev, FDEV_TERMINATE, 0, NULL), );
 	return NULL;
 }
 
@@ -131,7 +131,7 @@
 		
 		/* Now, the first peer in the list is expired; signal it */
 		fd_list_unlink( &first->p_expiry );
-		CHECK_FCT_DO( fd_event_send(first->p_events, FDEVP_TERMINATE, NULL), goto error );
+		CHECK_FCT_DO( fd_event_send(first->p_events, FDEVP_TERMINATE, 0, NULL), goto error );
 		
 	} while (1);
 	
@@ -139,7 +139,7 @@
 error:
 	TRACE_DEBUG(INFO, "An error occurred in peers module! Expiry thread is terminating...");
 	ASSERT(0);
-	CHECK_FCT_DO(fd_event_send(fd_g_config->cnf_main_ev, FDEV_TERMINATE, NULL), );
+	CHECK_FCT_DO(fd_event_send(fd_g_config->cnf_main_ev, FDEV_TERMINATE, 0, NULL), );
 	return NULL;
 }
 
--- a/freeDiameter/p_psm.c	Wed Oct 21 18:42:45 2009 +0900
+++ b/freeDiameter/p_psm.c	Mon Oct 26 16:00:49 2009 +0900
@@ -57,6 +57,8 @@
 		case_str(FDEVP_DUMP_ALL);
 		case_str(FDEVP_TERMINATE);
 		case_str(FDEVP_CNX_MSG_RECV);
+		case_str(FDEVP_CNX_ERROR);
+		case_str(FDEVP_CNX_EP_CHANGE);
 		case_str(FDEVP_MSG_INCOMING);
 		case_str(FDEVP_PSM_TIMEOUT);
 		
@@ -125,34 +127,13 @@
 #endif
 }
 
-/* Wait for the next event in the PSM, or timeout */
-static int psm_ev_timedget(struct fd_peer * peer, int *code, void ** data)
-{
-	struct fd_event * ev;
-	int ret = 0;
-	
-	TRACE_ENTRY("%p %p %p", peer, code, data);
-	
-	ret = fd_fifo_timedget(peer->p_events, &ev, &peer->p_psm_timer);
-	if (ret == ETIMEDOUT) {
-		*code = FDEVP_PSM_TIMEOUT;
-		*data = NULL;
-	} else {
-		CHECK_FCT( ret );
-		*code = ev->code;
-		*data = ev->data;
-		free(ev);
-	}
-	
-	return 0;
-}
-
 /* The state machine thread (controler) */
 static void * p_psm_th( void * arg )
 {
 	struct fd_peer * peer = (struct fd_peer *)arg;
 	int created_started = started;
 	int event;
+	size_t ev_sz;
 	void * ev_data;
 	
 	CHECK_PARAMS_DO( CHECK_PEER(peer), ASSERT(0) );
@@ -181,10 +162,10 @@
 	
 psm_loop:
 	/* Get next event */
-	CHECK_FCT_DO( psm_ev_timedget(peer, &event, &ev_data), goto psm_end );
-	TRACE_DEBUG(FULL, "'%s'\t<-- '%s'\t(%p)\t'%s'",
+	CHECK_FCT_DO( fd_event_timedget(peer->p_events, &peer->p_psm_timer, FDEVP_PSM_TIMEOUT, &event, &ev_sz, &ev_data), goto psm_end );
+	TRACE_DEBUG(FULL, "'%s'\t<-- '%s'\t(%p,%g)\t'%s'",
 			STATE_STR(peer->p_hdr.info.pi_state),
-			fd_pev_str(event), ev_data,
+			fd_pev_str(event), ev_data, ev_sz,
 			peer->p_hdr.info.pi_diamid);
 
 	/* Now, the action depends on the current state and the incoming event */
@@ -271,7 +252,7 @@
 	CHECK_PARAMS( CHECK_PEER(peer) );
 	
 	if (peer->p_hdr.info.pi_state != STATE_ZOMBIE) {
-		CHECK_FCT( fd_event_send(peer->p_events, FDEVP_TERMINATE, NULL) );
+		CHECK_FCT( fd_event_send(peer->p_events, FDEVP_TERMINATE, 0, NULL) );
 	} else {
 		TRACE_DEBUG(FULL, "Peer '%s' was already terminated", peer->p_hdr.info.pi_diamid);
 	}
--- a/freeDiameter/sctp.c	Wed Oct 21 18:42:45 2009 +0900
+++ b/freeDiameter/sctp.c	Mon Oct 26 16:00:49 2009 +0900
@@ -34,9 +34,16 @@
 *********************************************************************************************************/
 
 #include "fD.h"
+#include "cnxctx.h"
+
 #include <netinet/sctp.h>
 #include <sys/uio.h>
 
+/* Size of buffer to receive ancilliary data. May need to be enlarged if more sockopt are set... */
+#ifndef CMSG_BUF_LEN
+#define CMSG_BUF_LEN	1024
+#endif /* CMSG_BUF_LEN */
+
 /* Pre-binding socket options -- # streams read in config */
 static int fd_setsockopt_prebind(int sk)
 {
@@ -743,7 +750,7 @@
 }
 
 /* Retrieve streams information from a connected association -- optionaly provide the primary address */
-int fd_sctp_get_str_info( int sock, int *in, int *out, sSS *primary )
+int fd_sctp_get_str_info( int sock, uint16_t *in, uint16_t *out, sSS *primary )
 {
 	struct sctp_status status;
 	socklen_t sz = sizeof(status);
@@ -775,8 +782,8 @@
 	TRACE_DEBUG(FULL, "		 sstat_primary.spinfo_mtu     : %u" , status.sstat_primary.spinfo_mtu);
 	#endif /* DEBUG_SCTP */
 	
-	*in = (int)status.sstat_instrms;
-	*out = (int)status.sstat_outstrms;
+	*in = status.sstat_instrms;
+	*out = status.sstat_outstrms;
 	
 	if (primary)
 		memcpy(primary, &status.sstat_primary.spinfo_address, sizeof(sSS));
@@ -892,3 +899,218 @@
 	return 0;
 }
 
+/* Send a buffer over a specified stream */
+int fd_sctp_sendstr(int sock, uint16_t strid, uint8_t * buf, size_t len)
+{
+	struct msghdr mhdr;
+	struct iovec  iov;
+	struct {
+		struct cmsghdr 		hdr;
+		struct sctp_sndrcvinfo	sndrcv;
+	} anci;
+	ssize_t ret;
+	
+	TRACE_ENTRY("%d %hu %p %g", sock, strid, buf, len);
+	
+	memset(&mhdr, 0, sizeof(mhdr));
+	memset(&iov,  0, sizeof(iov));
+	memset(&anci, 0, sizeof(anci));
+	
+	/* IO Vector: message data */
+	iov.iov_base = buf;
+	iov.iov_len  = len;
+	
+	/* Anciliary data: specify SCTP stream */
+	anci.hdr.cmsg_len   = sizeof(anci);
+	anci.hdr.cmsg_level = IPPROTO_SCTP;
+	anci.hdr.cmsg_type  = SCTP_SNDRCV;
+	anci.sndrcv.sinfo_stream = strid;
+	/* note : we could store other data also, for example in .sinfo_ppid for remote peer or in .sinfo_context for errors. */
+	
+	/* We don't use mhdr.msg_name here; it could be used to specify an address different from the primary */
+	
+	mhdr.msg_iov    = &iov;
+	mhdr.msg_iovlen = 1;
+	
+	mhdr.msg_control    = &anci;
+	mhdr.msg_controllen = sizeof(anci);
+	
+	#ifdef DEBUG_SCTP
+	TRACE_DEBUG(FULL, "Sending %db data on stream %hu of socket %d", len, strid, sock);
+	#endif /* DEBUG_SCTP */
+	
+	CHECK_SYS( ret = sendmsg(sock, &mhdr, 0) );
+	ASSERT( ret == len ); /* There should not be partial delivery with sendmsg... */
+	
+	return 0;
+}
+
+/* Receive the next data from the socket, or next notification */
+int fd_sctp_recvmeta(int sock, uint16_t * strid, uint8_t ** buf, size_t * len, int *event)
+{
+	ssize_t 		 ret = 0;
+	struct msghdr 		 mhdr;
+	char   			 ancidata[ CMSG_BUF_LEN ];
+	struct iovec 		 iov;
+	uint8_t			*data = NULL;
+	size_t 			 bufsz = 0, datasize = 0;
+	size_t			 mempagesz = sysconf(_SC_PAGESIZE); /* We alloc buffer by memory pages for efficiency */
+	
+	TRACE_ENTRY("%d %p %p %p %p", sock, strid, buf, len, event);
+	CHECK_PARAMS( (sock > 0) && buf && len && event );
+	
+	/* Cleanup out parameters */
+	*buf = NULL;
+	*len = 0;
+	*event = 0;
+	
+	/* Prepare header for receiving message */
+	memset(&mhdr, 0, sizeof(mhdr));
+	mhdr.msg_iov    = &iov;
+	mhdr.msg_iovlen = 1;
+	mhdr.msg_control    = &ancidata;
+	mhdr.msg_controllen = sizeof(ancidata);
+	
+	/* We will loop while all data is not received. */
+incomplete:
+	if (datasize == bufsz) {
+		/* The buffer is full, enlarge it */
+		bufsz += mempagesz;
+		CHECK_MALLOC( data = realloc(data, bufsz) );
+	}
+	/* the new data will be received following the preceding */
+	memset(&iov,  0, sizeof(iov));
+	iov.iov_base = data + datasize ;
+	iov.iov_len  = bufsz - datasize;
+
+	/* Receive data from the socket */
+	pthread_cleanup_push(free, data);
+	ret = recvmsg(sock, &mhdr, 0);
+	pthread_cleanup_pop(0);
+	
+	/* Handle errors */
+	if (ret <= 0) { /* Socket is closed, or an error occurred */
+		CHECK_SYS_DO(ret, /* to log in case of error */);
+		free(data);
+		*event = FDEVP_CNX_ERROR;
+		return 0;
+	}
+	
+	/* Update the size of data we received */
+	datasize += ret;
+
+	/* SCTP provides an indication when we received a full record; loop if it is not the case */
+	if ( ! (mhdr.msg_flags & MSG_EOR) ) {
+		goto incomplete;
+	}
+	
+	/* Handle the case where the data received is a notification */
+	if (mhdr.msg_flags & MSG_NOTIFICATION) {
+		union sctp_notification * notif = (union sctp_notification *) data;
+		
+		switch (notif->sn_header.sn_type) {
+			
+			case SCTP_ASSOC_CHANGE:
+				#ifdef DEBUG_SCTP
+				TRACE_DEBUG(FULL, "Received SCTP_ASSOC_CHANGE notification");
+				TRACE_DEBUG(FULL, "    state : %hu", notif->sn_assoc_change.sac_state);
+				TRACE_DEBUG(FULL, "    error : %hu", notif->sn_assoc_change.sac_error);
+				TRACE_DEBUG(FULL, "    instr : %hu", notif->sn_assoc_change.sac_inbound_streams);
+				TRACE_DEBUG(FULL, "   outstr : %hu", notif->sn_assoc_change.sac_outbound_streams);
+				#endif /* DEBUG_SCTP */
+				
+				*event = FDEVP_CNX_EP_CHANGE;
+				break;
+	
+			case SCTP_PEER_ADDR_CHANGE:
+				#ifdef DEBUG_SCTP
+				TRACE_DEBUG(FULL, "Received SCTP_PEER_ADDR_CHANGE notification");
+				TRACE_DEBUG_sSA(FULL, "    intf_change : ", &(notif->sn_paddr_change.spc_aaddr), NI_NUMERICHOST | NI_NUMERICSERV, "" );
+				TRACE_DEBUG(FULL, "          state : %d", notif->sn_paddr_change.spc_state);
+				TRACE_DEBUG(FULL, "          error : %d", notif->sn_paddr_change.spc_error);
+				#endif /* DEBUG_SCTP */
+				
+				*event = FDEVP_CNX_EP_CHANGE;
+				break;
+	
+			case SCTP_SEND_FAILED:
+				#ifdef DEBUG_SCTP
+				TRACE_DEBUG(FULL, "Received SCTP_SEND_FAILED notification");
+				TRACE_DEBUG(FULL, "    len : %hu", notif->sn_send_failed.ssf_length);
+				TRACE_DEBUG(FULL, "    err : %d",  notif->sn_send_failed.ssf_error);
+				#endif /* DEBUG_SCTP */
+				
+				*event = FDEVP_CNX_ERROR;
+				break;
+			
+			case SCTP_REMOTE_ERROR:
+				#ifdef DEBUG_SCTP
+				TRACE_DEBUG(FULL, "Received SCTP_REMOTE_ERROR notification");
+				TRACE_DEBUG(FULL, "    err : %hu", ntohs(notif->sn_remote_error.sre_error));
+				TRACE_DEBUG(FULL, "    len : %hu", ntohs(notif->sn_remote_error.sre_length));
+				#endif /* DEBUG_SCTP */
+				
+				*event = FDEVP_CNX_ERROR;
+				break;
+	
+			case SCTP_SHUTDOWN_EVENT:
+				#ifdef DEBUG_SCTP
+				TRACE_DEBUG(FULL, "Received SCTP_SHUTDOWN_EVENT notification");
+				#endif /* DEBUG_SCTP */
+				
+				*event = FDEVP_CNX_ERROR;
+				break;
+			
+			default:	
+				TRACE_DEBUG(FULL, "Received unknown notification %d, assume error", notif->sn_header.sn_type);
+				*event = FDEVP_CNX_ERROR;
+		}
+		
+		free(data);
+		return 0;
+	}
+	
+	/* From this point, we have received a message */
+	*event = FDEVP_CNX_MSG_RECV;
+	*buf = data;
+	*len = datasize;
+	
+	if (strid) {
+		struct cmsghdr 		*hdr;
+		struct sctp_sndrcvinfo	*sndrcv;
+		
+		/* Handle the anciliary data */
+		for (hdr = CMSG_FIRSTHDR(&mhdr); hdr; hdr = CMSG_NXTHDR(&mhdr, hdr)) {
+
+			/* We deal only with anciliary data at SCTP level */
+			if (hdr->cmsg_level != IPPROTO_SCTP) {
+				TRACE_DEBUG(FULL, "Received some anciliary data at level %d, skipped", hdr->cmsg_level);
+				continue;
+			}
+			
+			/* Also only interested in SCTP_SNDRCV message for the moment */
+			if (hdr->cmsg_type != SCTP_SNDRCV) {
+				TRACE_DEBUG(FULL, "Anciliary block IPPROTO_SCTP / %d, skipped", hdr->cmsg_type);
+				continue;
+			}
+			
+			sndrcv = (struct sctp_sndrcvinfo *) CMSG_DATA(hdr);
+			#ifdef DEBUG_SCTP
+			TRACE_DEBUG(FULL, "Anciliary block IPPROTO_SCTP / SCTP_SNDRCV");
+			TRACE_DEBUG(FULL, "    sinfo_stream    : %hu", sndrcv->sinfo_stream);
+			TRACE_DEBUG(FULL, "    sinfo_ssn       : %hu", sndrcv->sinfo_ssn);
+			TRACE_DEBUG(FULL, "    sinfo_flags     : %hu", sndrcv->sinfo_flags);
+			/* TRACE_DEBUG(FULL, "    sinfo_pr_policy : %hu", sndrcv->sinfo_pr_policy); */
+			TRACE_DEBUG(FULL, "    sinfo_ppid      : %u" , sndrcv->sinfo_ppid);
+			TRACE_DEBUG(FULL, "    sinfo_context   : %u" , sndrcv->sinfo_context);
+			/* TRACE_DEBUG(FULL, "    sinfo_pr_value  : %u" , sndrcv->sinfo_pr_value); */
+			TRACE_DEBUG(FULL, "    sinfo_tsn       : %u" , sndrcv->sinfo_tsn);
+			TRACE_DEBUG(FULL, "    sinfo_cumtsn    : %u" , sndrcv->sinfo_cumtsn);
+			#endif /* DEBUG_SCTP */
+
+			*strid = sndrcv->sinfo_stream;
+		}
+	}
+	
+	return 0;
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/freeDiameter/sctps.c	Mon Oct 26 16:00:49 2009 +0900
@@ -0,0 +1,585 @@
+/*********************************************************************************************************
+* Software License Agreement (BSD License)                                                               *
+* Author: Sebastien Decugis <sdecugis@nict.go.jp>							 *
+*													 *
+* Copyright (c) 2009, WIDE Project and NICT								 *
+* All rights reserved.											 *
+* 													 *
+* Redistribution and use of this software in source and binary forms, with or without modification, are  *
+* permitted provided that the following conditions are met:						 *
+* 													 *
+* * Redistributions of source code must retain the above 						 *
+*   copyright notice, this list of conditions and the 							 *
+*   following disclaimer.										 *
+*    													 *
+* * Redistributions in binary form must reproduce the above 						 *
+*   copyright notice, this list of conditions and the 							 *
+*   following disclaimer in the documentation and/or other						 *
+*   materials provided with the distribution.								 *
+* 													 *
+* * Neither the name of the WIDE Project or NICT nor the 						 *
+*   names of its contributors may be used to endorse or 						 *
+*   promote products derived from this software without 						 *
+*   specific prior written permission of WIDE Project and 						 *
+*   NICT.												 *
+* 													 *
+* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED *
+* WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A *
+* PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR *
+* ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 	 *
+* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 	 *
+* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR *
+* TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF   *
+* ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.								 *
+*********************************************************************************************************/
+
+/* This file contains code for TLS over multi-stream SCTP wrapper implementation (GnuTLS does not support this) */
+/* See http://aaa.koganei.wide.ad.jp/blogs/index.php/waaad/2008/08/18/tls-over-sctp for history */
+
+#include "fD.h"
+#include "cnxctx.h"
+
+#include <netinet/sctp.h>
+#include <sys/uio.h>
+
+/*
+
+Architecture of this wrapper:
+ - we have several fifo queues (1 per stream pairs).
+ GnuTLS is configured to use custom push / pull functions:
+ - the pull function retrieves the data from the fifo queue corresponding to a stream #.
+ - the push function sends the data on a certain stream.
+ We also have a demux thread that reads the socket and store received data in the appropriate fifo
+ 
+ We have one gnutls_session per stream pair, and as many streams that read the gnutls records and save incoming data to the target queue.
+ 
+This complexity is required because we cannot read a socket for a given stream only; we can only get the next message and find its stream.
+*/
+
+
+
+/*************************************************************/
+/*                      threads                              */
+/*************************************************************/
+
+/* Demux received data and store in the appropriate fifo */
+static void * demuxer(void * arg)
+{
+	struct cnxctx * conn = arg;
+	uint8_t * buf;
+	size_t    bufsz;
+	int	  event;
+	uint16_t  strid;
+	
+	TRACE_ENTRY("%p", arg);
+	
+	CHECK_PARAMS_DO(conn && (conn->cc_socket > 0), goto out);
+	ASSERT( conn->cc_proto == IPPROTO_SCTP );
+	ASSERT( conn->cc_tls == 1 );
+	ASSERT( Target_Queue(conn) );
+	ASSERT( conn->cc_sctps_data.array );
+	
+	do {
+		CHECK_FCT_DO( fd_sctp_recvmeta(conn->cc_socket, &strid, &buf, &bufsz, &event), goto error );
+		switch (event) {
+			case FDEVP_CNX_MSG_RECV:
+				/* Demux this message in the appropriate fifo, another thread will pull, gnutls process, and send in target queue */
+				CHECK_FCT_DO(fd_event_send(conn->cc_sctps_data.array[strid].raw_recv, event, bufsz, buf), goto error );
+				break;
+				
+			case FDEVP_CNX_EP_CHANGE:
+				/* Send this event to the target queue */
+				CHECK_FCT_DO( fd_event_send( Target_Queue(conn), event, bufsz, buf), goto error );
+				break;
+			
+			case FDEVP_CNX_ERROR:
+			default:
+				goto error;
+		}
+		
+	} while (conn->cc_loop);
+	
+out:
+	TRACE_DEBUG(FULL, "Thread terminated");	
+	return NULL;
+error:
+	CHECK_FCT_DO( fd_event_send( Target_Queue(conn), FDEVP_CNX_ERROR, 0, NULL), /* continue or destroy everything? */);
+	goto out;
+}
+
+/* Decrypt the data received in this stream pair and store it in the target queue */
+static void * decipher(void * arg)
+{
+	struct sctps_ctx * ctx = arg;
+	struct cnxctx 	 *cnx;
+	
+	TRACE_ENTRY("%p", arg);
+	
+	CHECK_PARAMS_DO(ctx && ctx->raw_recv && ctx->parent, goto error);
+	cnx = ctx->parent;
+	ASSERT( Target_Queue(cnx) );
+	
+	CHECK_FCT_DO(fd_tls_rcvthr_core(cnx, ctx->strid ? ctx->session : cnx->cc_tls_para.session), /* continue */);
+error:
+	CHECK_FCT_DO( fd_event_send( Target_Queue(cnx), FDEVP_CNX_ERROR, 0, NULL), /* continue or destroy everything? */);
+	TRACE_DEBUG(FULL, "Thread terminated");	
+	return NULL;
+}
+
+/*************************************************************/
+/*                     push / pull                           */
+/*************************************************************/
+
+/* Send data over the connection, called by gnutls */
+static ssize_t sctps_push(gnutls_transport_ptr_t tr, const void * data, size_t len)
+{
+	struct sctps_ctx * ctx = (struct sctps_ctx *) tr;
+	
+	TRACE_ENTRY("%p %p %g", tr, data, len);
+	CHECK_PARAMS_DO( tr && data, { errno = EINVAL; return -1; } );
+	
+	CHECK_FCT_DO( fd_sctp_sendstr(ctx->parent->cc_socket, ctx->strid, (uint8_t *)data, len), /* errno is already set */ return -1 );
+	
+	return len;
+}
+
+/* Retrieve data received on a stream and already demultiplexed */
+static ssize_t sctps_pull(gnutls_transport_ptr_t tr, void * buf, size_t len)
+{
+	struct sctps_ctx * ctx = (struct sctps_ctx *) tr;
+	size_t pulled = 0;
+	int emptied;
+	
+	TRACE_ENTRY("%p %p %g", tr, buf, len);
+	CHECK_PARAMS_DO( tr && buf, { errno = EINVAL; return -1; } );
+	
+	/* If we don't have data available now, pull new message from the fifo -- this is blocking */
+	if (!ctx->partial.buf) {
+		int ev;
+		CHECK_FCT_DO( errno = fd_event_get(ctx->raw_recv, &ev, &ctx->partial.bufsz, (void *)&ctx->partial.buf), return -1 );
+		ASSERT( ev == FDEVP_CNX_MSG_RECV );
+	}
+		
+	pulled = ctx->partial.bufsz - ctx->partial.offset;
+	if (pulled <= len) {
+		emptied = 1;
+	} else {
+		/* limit to the capacity of destination buffer */
+		emptied = 0;
+		pulled = len;
+	}
+
+	/* Store the data in the destination buffer */
+	memcpy(buf, ctx->partial.buf + ctx->partial.offset, pulled);
+
+	/* Free the buffer if we read all its content, and reset the partial structure */
+	if (emptied) {
+		free(ctx->partial.buf);
+		memset(&ctx->partial, 0, sizeof(ctx->partial));
+	} else {
+		ctx->partial.offset += pulled;
+	}
+
+	/* We are done */
+	return pulled;
+}
+
+/* Set the parameters of a session to use the appropriate fifo and stream information */
+static void set_sess_transport(gnutls_session_t session, struct sctps_ctx *ctx)
+{
+	/* Set the transport pointer passed to push & pull callbacks */
+	gnutls_transport_set_ptr( session, (gnutls_transport_ptr_t) ctx );
+	
+	/* Reset the low water value, since we don't use sockets */
+	gnutls_transport_set_lowat( session, 0 );
+	
+	/* Set the push and pull callbacks */
+	gnutls_transport_set_pull_function(session, sctps_pull);
+	gnutls_transport_set_push_function(session, sctps_push);
+
+	return;
+}
+
+/*************************************************************/
+/*               Session resuming support                    */
+/*************************************************************/
+
+struct sr_store {
+	struct fd_list	 list;	/* list of sr_data, ordered by key.size then key.data */
+	pthread_rwlock_t lock;
+	struct cnxctx   *parent;
+};
+
+/* Saved master session data for resuming sessions */
+struct sr_data {
+	struct fd_list	chain;
+	gnutls_datum_t	key;
+	gnutls_datum_t 	data;
+};
+
+/* The level at which we debug session resuming */
+#define SR_LEVEL FULL
+
+/* Initialize the store area for a connection */
+static int store_init(struct cnxctx * conn)
+{
+	TRACE_ENTRY("%p", conn);
+	CHECK_PARAMS( conn && !conn->cc_sctps_data.sess_store );
+	
+	CHECK_MALLOC( conn->cc_sctps_data.sess_store = malloc(sizeof(struct sr_store)) );
+	memset(conn->cc_sctps_data.sess_store, 0, sizeof(struct sr_store));
+	
+	fd_list_init(&conn->cc_sctps_data.sess_store->list, NULL);
+	CHECK_POSIX( pthread_rwlock_init(&conn->cc_sctps_data.sess_store->lock, NULL) );
+	conn->cc_sctps_data.sess_store->parent = conn;
+	
+	return 0;
+}
+
+/* Destroy the store area for a connection, and all its content */
+static void store_destroy(struct cnxctx * conn)
+{
+	/* Del all list entries */
+	TRACE_ENTRY("%p", conn);
+	CHECK_PARAMS_DO( conn, return );
+	
+	if (!conn->cc_sctps_data.sess_store)
+		return;
+	
+	CHECK_POSIX_DO( pthread_rwlock_destroy(&conn->cc_sctps_data.sess_store->lock), /* continue */ );
+	
+	while (!FD_IS_LIST_EMPTY(&conn->cc_sctps_data.sess_store->list)) {
+		struct sr_data * sr = (struct sr_data *) conn->cc_sctps_data.sess_store->list.next;
+		fd_list_unlink( &sr->chain );
+		free(sr->key.data);
+		free(sr->data.data);
+		free(sr);
+	}
+	
+	free(conn->cc_sctps_data.sess_store);
+	conn->cc_sctps_data.sess_store = NULL;
+	return;
+}
+
+/* Search the position (or next if not found) of a key in a store */
+static struct fd_list * find_or_next(struct sr_store * sto, gnutls_datum_t key, int * match)
+{
+	struct fd_list * ret;
+	*match = 0;
+	
+	for (ret = sto->list.next; ret != &sto->list; ret = ret->next) {
+		int cmp = 0;
+		struct sr_data * sr = (struct sr_data *)ret;
+		
+		if ( key.size < sr->key.size )
+			break;
+		
+		if ( key.size > sr->key.size )
+			continue;
+		
+		/* Key sizes are equal */
+		cmp = memcmp( key.data, sr->key.data, key.size );
+		
+		if (cmp > 0)
+			continue;
+		
+		if (cmp == 0)
+			*match = 1;
+		
+		break;
+	}
+	
+	return ret;
+}
+
+/* Callbacks for the TLS server side of the connection, called during gnutls_handshake */
+static int sr_store (void *dbf, gnutls_datum_t key, gnutls_datum_t data)
+{
+	struct sr_store * sto = (struct sr_store *)dbf;
+	struct fd_list * li;
+	struct sr_data * sr;
+	int match = 0;
+	int ret = 0;
+	
+	CHECK_PARAMS_DO( sto && key.data && data.data, return -1 );
+	TRACE_DEBUG_BUFFER(SR_LEVEL, "Session store [key ", key.data, key.size < 16 ? key.size : 16, "]");
+	
+	CHECK_POSIX_DO( pthread_rwlock_wrlock(&sto->lock), return -1 );
+	
+	li = find_or_next(sto, key, &match);
+	if (match) {
+		sr = (struct sr_data *)li;
+		
+		/* Check the data is the same */
+		if ((data.size != sr->data.size) || memcmp(data.data, sr->data.data, data.size)) {
+			TRACE_DEBUG(INFO, "GnuTLS tried to store a session with same key and different data!");
+			ret = -1;
+		}
+		goto out;
+	}
+	
+	/* Create a new entry */
+	CHECK_MALLOC_DO( sr = malloc(sizeof(struct sr_data)), { ret = -1; goto out; } );
+	memset(sr, 0, sizeof(struct sr_data));
+
+	fd_list_init(&sr->chain, sr);
+
+	CHECK_MALLOC_DO( sr->key.data = malloc(key.size), { ret = -1; goto out; } );
+	sr->key.size = key.size;
+	memcpy(sr->key.data, key.data, key.size);
+
+	CHECK_MALLOC_DO( sr->data.data = malloc(data.size), { ret = -1; goto out; } );
+	sr->data.size = data.size;
+	memcpy(sr->data.data, data.data, data.size);
+	
+	/* Save this new entry in the list, we are done */
+	fd_list_insert_before(li, &sr->chain);
+
+out:	
+	CHECK_POSIX_DO( pthread_rwlock_unlock(&sto->lock), return -1 );
+	return ret;
+}
+
+static int sr_remove (void *dbf, gnutls_datum_t key)
+{
+	struct sr_store * sto = (struct sr_store *)dbf;
+	struct fd_list * li;
+	struct sr_data * sr;
+	int match = 0;
+	int ret = 0;
+	
+	CHECK_PARAMS_DO( sto && key.data, return -1 );
+	TRACE_DEBUG_BUFFER(SR_LEVEL, "Session delete [key ", key.data, key.size < 16 ? key.size : 16, "]");
+	
+	CHECK_POSIX_DO( pthread_rwlock_wrlock(&sto->lock), return -1 );
+	
+	li = find_or_next(sto, key, &match);
+	if (match) {
+		sr = (struct sr_data *)li;
+		
+		/* Destroy this data */
+		fd_list_unlink(li);
+		free(sr->key.data);
+		free(sr->data.data);
+		free(sr);
+	} else {
+		/* It was not found */
+		ret = -1;
+	}
+	
+	CHECK_POSIX_DO( pthread_rwlock_unlock(&sto->lock), return -1 );
+	return ret;
+}
+
+static gnutls_datum_t sr_fetch (void *dbf, gnutls_datum_t key)
+{
+	struct sr_store * sto = (struct sr_store *)dbf;
+	struct fd_list * li;
+	struct sr_data * sr;
+	int match = 0;
+	gnutls_datum_t res = { NULL, 0 };
+	gnutls_datum_t error = { NULL, 0 };
+
+	CHECK_PARAMS_DO( sto && key.data, return error );
+	TRACE_DEBUG_BUFFER(SR_LEVEL, "Session fetch [key ", key.data, key.size < 16 ? key.size : 16, "]");
+
+	CHECK_POSIX_DO( pthread_rwlock_rdlock(&sto->lock), return error );
+	
+	li = find_or_next(sto, key, &match);
+	if (match) {
+		sr = (struct sr_data *)li;
+		CHECK_MALLOC_DO(res.data = gnutls_malloc(sr->data.size), goto out );
+		res.size = sr->data.size;
+		memcpy(res.data, sr->data.data, res.size);
+	}
+out:	
+	CHECK_POSIX_DO( pthread_rwlock_unlock(&sto->lock), return error);
+	return res;
+}
+
+/* Set the session pointer in a session object */
+static void set_resume_callbacks(gnutls_session_t session, struct cnxctx * conn)
+{
+	TRACE_ENTRY("%p", conn);
+	
+	gnutls_db_set_retrieve_function(session, sr_fetch);
+	gnutls_db_set_remove_function  (session, sr_remove);
+	gnutls_db_set_store_function   (session, sr_store);
+	gnutls_db_set_ptr              (session, conn->cc_sctps_data.sess_store);
+	
+	return;
+}
+
+/* The handshake is made in parallel in several threads to speed up */
+static void * handshake_resume_th(void * arg)
+{
+	struct sctps_ctx * ctx = (struct sctps_ctx *) arg;
+	TRACE_ENTRY("%p", arg);
+	
+	TRACE_DEBUG(FULL, "Starting TLS resumed handshake on stream %hu", ctx->strid);
+	CHECK_GNUTLS_DO( gnutls_handshake( ctx->session ), return NULL);
+			
+	/* We can trace success of resuming handshake by using gnutls_session_is_resumed */
+			
+	/* Finish */
+	return arg;
+}
+
+
+/*************************************************************/
+/*                     Exported functions                    */
+/*************************************************************/
+
+/* Initialize the wrapper for the connection */
+int fd_sctps_init(struct cnxctx * conn)
+{
+	uint16_t i;
+	
+	TRACE_ENTRY("%p", conn);
+	CHECK_PARAMS( conn && (conn->cc_sctp_para.pairs > 1) && (!conn->cc_sctps_data.array) );
+	
+	/* First, alloc the array and initialize the non-TLS data */
+	CHECK_MALLOC( conn->cc_sctps_data.array = calloc(conn->cc_sctp_para.pairs, sizeof(struct sctps_ctx))  );
+	for (i = 0; i < conn->cc_sctp_para.pairs; i++) {
+		conn->cc_sctps_data.array[i].parent = conn;
+		conn->cc_sctps_data.array[i].strid  = i;
+		CHECK_FCT( fd_fifo_new(&conn->cc_sctps_data.array[i].raw_recv) );
+	}
+	
+	/* Set push/pull functions in the master session, using fifo in array[0] */
+	set_sess_transport(conn->cc_tls_para.session, &conn->cc_sctps_data.array[0]);
+	
+	/* For server side, we also initialize the resuming capabilities */
+	if (conn->cc_tls_para.mode == GNUTLS_SERVER) {
+		
+		/* Prepare the store for sessions data */
+		CHECK_FCT( store_init(conn) );
+		
+		/* Set the callbacks for resuming in the master session */
+		set_resume_callbacks(conn->cc_tls_para.session, conn);
+	}
+
+	/* Start the demux thread */
+	CHECK_POSIX( pthread_create( &conn->cc_rcvthr, NULL, demuxer, conn ) );
+	
+	return 0;
+}
+
+/* Handshake other streams, after full handshake on the master session */
+int fd_sctps_handshake_others(struct cnxctx * conn, char * priority)
+{
+	uint16_t i;
+	int errors = 0;
+	gnutls_datum_t 	master_data;
+	
+	TRACE_ENTRY("%p %p", conn, priority);
+	CHECK_PARAMS( conn && (conn->cc_sctp_para.pairs > 1) && conn->cc_sctps_data.array );
+
+	/* Server side: we set all the parameters, the resume callback will take care of resuming the session */
+	/* Client side: we duplicate the parameters of the master session, then set the transport pointer */
+	
+	/* For client side, retrieve the master session parameters */
+	if (conn->cc_tls_para.mode == GNUTLS_CLIENT) {
+		CHECK_GNUTLS_DO( gnutls_session_get_data2(conn->cc_tls_para.session, &master_data), return ENOMEM );
+	}
+	
+	/* Initialize the session objects and start the handshake in a separate thread */
+	for (i = 1; i < conn->cc_sctp_para.pairs; i++) {
+		/* Set credentials and priority */
+		CHECK_FCT( fd_tls_prepare(&conn->cc_sctps_data.array[i].session, conn->cc_tls_para.mode, priority) );
+		
+		/* For the client, copy data from master session; for the server, set session resuming pointers */
+		if (conn->cc_tls_para.mode == GNUTLS_CLIENT) {
+			CHECK_GNUTLS_DO( gnutls_session_set_data(conn->cc_sctps_data.array[i].session, master_data.data, master_data.size), return ENOMEM );
+		} else {
+			set_resume_callbacks(conn->cc_sctps_data.array[i].session, conn);
+		}
+		
+		/* Set transport parameters */
+		set_sess_transport(conn->cc_sctps_data.array[i].session, &conn->cc_sctps_data.array[i]);
+		
+		/* Start the handshake thread */
+		CHECK_POSIX( pthread_create( &conn->cc_sctps_data.array[i].thr, NULL, handshake_resume_th, &conn->cc_sctps_data.array[i] ) );
+	}
+	
+	/* We can now release the memory of master session data if any */
+	if (conn->cc_tls_para.mode == GNUTLS_CLIENT) {
+		gnutls_free(master_data.data);
+	}
+	
+	/* Now wait for all handshakes to finish */
+	for (i = 1; i < conn->cc_sctp_para.pairs; i++) {
+		void * ret;
+		CHECK_POSIX( pthread_join(conn->cc_sctps_data.array[i].thr, &ret) );
+		if (ret == NULL) {
+			errors++; /* Handshake failed on this stream */
+		}
+	}
+	
+	return errors ? ENOTCONN : 0;
+}
+
+/* Receive messages from all stream pairs */
+int fd_sctps_startthreads(struct cnxctx * conn)
+{
+	uint16_t i;
+	
+	TRACE_ENTRY("%p", conn);
+	CHECK_PARAMS( conn && conn->cc_sctps_data.array );
+	
+	for (i = 0; i < conn->cc_sctp_para.pairs; i++) {
+		
+		/* Start the decipher thread */
+		CHECK_POSIX( pthread_create( &conn->cc_sctps_data.array[i].thr, NULL, decipher, &conn->cc_sctps_data.array[i] ) );
+	}
+	return 0;
+}
+
+/* Stop all receiver threads */
+void fd_sctps_stopthreads(struct cnxctx * conn)
+{
+	uint16_t i;
+	
+	TRACE_ENTRY("%p", conn);
+	CHECK_PARAMS_DO( conn && conn->cc_sctps_data.array, return );
+	
+	for (i = 0; i < conn->cc_sctp_para.pairs; i++) {
+		CHECK_FCT_DO( fd_thr_term(&conn->cc_sctps_data.array[i].thr), /* continue */ );
+	}
+	return;
+}
+
+/* Destroy a wrapper context */
+void fd_sctps_destroy(struct cnxctx * conn)
+{
+	uint16_t i;
+	
+	CHECK_PARAMS_DO( conn && conn->cc_sctps_data.array, return );
+	
+	/* Terminate all receiving threads in case we did not do it yet */
+	fd_sctps_stopthreads(conn);
+	
+	/* End all TLS sessions -- maybe we should do it in parallel ? */
+	for (i = 0; i < conn->cc_sctp_para.pairs; i++) {
+		CHECK_GNUTLS_DO( gnutls_bye(conn->cc_sctps_data.array[i].session, GNUTLS_SHUT_RDWR), /* Continue */ );
+	}
+	
+	/* Now, stop the demux thread */
+	CHECK_FCT_DO( fd_thr_term(&conn->cc_rcvthr), /* continue */ );
+	
+	/* Free remaining data in the array */
+	for (i = 0; i < conn->cc_sctp_para.pairs; i++) {
+		fd_event_destroy( &conn->cc_sctps_data.array[i].raw_recv, free );
+		free(conn->cc_sctps_data.array[i].partial.buf);
+		gnutls_deinit(conn->cc_sctps_data.array[i].session);
+	}
+	
+	/* Free the array itself now */
+	free(conn->cc_sctps_data.array);
+	conn->cc_sctps_data.array = NULL;
+	
+	/* Delete the store of sessions */
+	store_destroy(conn);
+	
+	return ;
+}
--- a/freeDiameter/server.c	Wed Oct 21 18:42:45 2009 +0900
+++ b/freeDiameter/server.c	Mon Oct 26 16:00:49 2009 +0900
@@ -119,7 +119,7 @@
 			goto cleanup;
 		}
 	} else {
-		CHECK_FCT_DO( fd_cnx_start_clear(c->conn), goto cleanup );
+		CHECK_FCT_DO( fd_cnx_start_clear(c->conn, 0), goto cleanup );
 	}
 	
 	/* Set the timeout to receive the first message */
@@ -134,6 +134,7 @@
 	TODO("Search matching peer");
 	TODO("Send event to the peer");
 	
+	TODO("(later) handshake or start_clear(.., 1)");
 	/* The end */
 cleanup:
 	/* Unlink the client structure */
@@ -151,7 +152,7 @@
 	return NULL;
 	
 fatal_error:	/* This has effect to terminate the daemon */
-	CHECK_FCT_DO(fd_event_send(fd_g_config->cnf_main_ev, FDEV_TERMINATE, NULL), );
+	CHECK_FCT_DO(fd_event_send(fd_g_config->cnf_main_ev, FDEV_TERMINATE, 0, NULL), );
 	return NULL;
 }
 
@@ -197,7 +198,7 @@
 		s->status = 2;
 	/* Send error signal to the daemon */
 	TRACE_DEBUG(INFO, "An error occurred in server module! Thread is terminating...");
-	CHECK_FCT_DO(fd_event_send(fd_g_config->cnf_main_ev, FDEV_TERMINATE, NULL), );
+	CHECK_FCT_DO(fd_event_send(fd_g_config->cnf_main_ev, FDEV_TERMINATE, 0, NULL), );
 
 	return NULL;
 }
--- a/freeDiameter/tcp.c	Wed Oct 21 18:42:45 2009 +0900
+++ b/freeDiameter/tcp.c	Mon Oct 26 16:00:49 2009 +0900
@@ -34,6 +34,8 @@
 *********************************************************************************************************/
 
 #include "fD.h"
+#include "cnxctx.h"
+
 #include <netinet/tcp.h>
 #include <netinet/ip6.h>
 #include <sys/socket.h>
--- a/include/freeDiameter/freeDiameter.h	Wed Oct 21 18:42:45 2009 +0900
+++ b/include/freeDiameter/freeDiameter.h	Mon Oct 26 16:00:49 2009 +0900
@@ -151,33 +151,13 @@
 /* Events */
 struct fd_event {
 	int	 code; /* codespace depends on the queue */
+	size_t 	 size;
 	void    *data;
 };
 
-static __inline__ int fd_event_send(struct fifo *queue, int code, void * data)
-{
-	struct fd_event * ev;
-	CHECK_MALLOC( ev = malloc(sizeof(struct fd_event)) );
-	ev->code = code;
-	ev->data = data;
-	CHECK_FCT( fd_fifo_post(queue, &ev) );
-	return 0;
-}
-static __inline__ int fd_event_get(struct fifo *queue, int *code, void ** data)
-{
-	struct fd_event * ev;
-	CHECK_FCT( fd_fifo_get(queue, &ev) );
-	if (code)
-		*code = ev->code;
-	if (data)
-		*data = ev->data;
-	free(ev);
-	return 0;
-}
-
-/* Events codespace for fd_g_config->cnf_main_ev */
+/* Daemon's codespace: 1000->1999 */
 enum {
-	 FDEV_TERMINATE = 1000	/* request to terminate */
+	 FDEV_TERMINATE	= 1000	/* request to terminate */
 	,FDEV_DUMP_DICT		/* Dump the content of the dictionary */
 	,FDEV_DUMP_EXT		/* Dump state of extensions */
 	,FDEV_DUMP_SERV		/* Dump the server socket status */
@@ -185,6 +165,65 @@
 	,FDEV_DUMP_CONFIG	/* Dump the configuration */
 	,FDEV_DUMP_PEERS	/* Dump the list of peers */
 };
+
+static __inline__ int fd_event_send(struct fifo *queue, int code, size_t datasz, void * data)
+{
+	struct fd_event * ev;
+	CHECK_MALLOC( ev = malloc(sizeof(struct fd_event)) );
+	ev->code = code;
+	ev->size = datasz;
+	ev->data = data;
+	CHECK_FCT( fd_fifo_post(queue, &ev) );
+	return 0;
+}
+static __inline__ int fd_event_get(struct fifo *queue, int *code, size_t *datasz, void ** data)
+{
+	struct fd_event * ev;
+	CHECK_FCT( fd_fifo_get(queue, &ev) );
+	if (code)
+		*code = ev->code;
+	if (datasz)
+		*datasz = ev->size;
+	if (data)
+		*data = ev->data;
+	free(ev);
+	return 0;
+}
+static __inline__ int fd_event_timedget(struct fifo *queue, struct timespec * timeout, int timeoutcode, int *code, size_t *datasz, void ** data)
+{
+	struct fd_event * ev;
+	int ret = 0;
+	ret = fd_fifo_timedget(queue, &ev, timeout);
+	if (ret == ETIMEDOUT) {
+		if (code)
+			*code = timeoutcode;
+		if (datasz)
+			*datasz = 0;
+		if (data)
+			*data = NULL;
+	} else {
+		CHECK_FCT( ret );
+		if (code)
+			*code = ev->code;
+		if (datasz)
+			*datasz = ev->size;
+		if (data)
+			*data = ev->data;
+		free(ev);
+	}
+	return 0;
+}
+static __inline__ void fd_event_destroy(struct fifo **queue, void (*free_cb)(void * data))
+{
+	struct fd_event * ev;
+	/* Purge all events, and free the associated data if any */
+	while (fd_fifo_tryget( *queue, &ev ) == 0) {
+		(*free_cb)(ev->data);
+		free(ev);
+	}
+	CHECK_FCT_DO( fd_fifo_del(queue), /* continue */ );
+	return ;
+}  
 const char * fd_ev_str(int event); /* defined in freeDiameter/main.c */
 
 
--- a/include/freeDiameter/libfreeDiameter.h	Wed Oct 21 18:42:45 2009 +0900
+++ b/include/freeDiameter/libfreeDiameter.h	Mon Oct 26 16:00:49 2009 +0900
@@ -426,6 +426,26 @@
 	  || (((ts1)->tv_sec  == (ts2)->tv_sec ) && ((ts1)->tv_nsec < (ts2)->tv_nsec) ))
 
 
+/* Trace a binary buffer content */
+#define TRACE_DEBUG_BUFFER(level, prefix, buf, bufsz, suffix ) {								\
+	if ( TRACE_BOOL(level) ) {												\
+		int __i;													\
+		size_t __sz = (size_t)(bufsz);											\
+		uint8_t * __buf = (uint8_t *)(buf);										\
+		char * __thn = ((char *)pthread_getspecific(fd_log_thname) ?: "unnamed");					\
+		fd_log_debug("\t | tid:%-20s\t%s\tin %s@%s:%d\n"								\
+			  "\t%s|%*s" prefix ,  											\
+					__thn, fd_log_time(NULL, __buf, sizeof(__buf)), __PRETTY_FUNCTION__, __FILE__, __LINE__,\
+					(level < FULL)?"@":" ",level, ""); 							\
+		for (__i = 0; __i < __sz; __i++) {										\
+			fd_log_debug("%02.2hhx", __buf[__i]);									\
+		}														\
+		fd_log_debug(suffix "\n");											\
+	}															\
+}
+
+
+
 /*============================================================*/
 /*                          THREADS                           */
 /*============================================================*/
@@ -508,6 +528,9 @@
 void fd_list_insert_after  ( struct fd_list * ref, struct fd_list * item );
 void fd_list_insert_before ( struct fd_list * ref, struct fd_list * item );
 
+/* Move a list at the end of another */
+void fd_list_move_end(struct fd_list * ref, struct fd_list * senti);
+
 /* Insert an item in an ordered list -- ordering function provided. If duplicate object found, EEXIST and it is returned in ref_duplicate */
 int fd_list_insert_ordered( struct fd_list * head, struct fd_list * item, int (*cmp_fct)(void *, void *), void ** ref_duplicate);
 
@@ -2384,6 +2407,23 @@
 int fd_fifo_del ( struct fifo  ** queue );
 
 /*
+ * FUNCTION:	fd_fifo_move
+ *
+ * PARAMETERS:
+ *  old		: Location of a FIFO that is to be emptied and deleted.
+ *  new		: A FIFO that will receive the old data.
+ *  loc_update	: if non NULL, a place to store the pointer to new FIFO atomically with the move.
+ *
+ * DESCRIPTION: 
+ *  Delete a queue and move its content to another one atomically.
+ *
+ * RETURN VALUE:
+ *  0		: The queue has been destroyed successfully.
+ *  EINVAL 	: A parameter is invalid.
+ */
+int fd_fifo_move ( struct fifo ** old, struct fifo * new, struct fifo ** loc_update );
+
+/*
  * FUNCTION:	fd_fifo_length
  *
  * PARAMETERS:
--- a/libfreeDiameter/fifo.c	Wed Oct 21 18:42:45 2009 +0900
+++ b/libfreeDiameter/fifo.c	Mon Oct 26 16:00:49 2009 +0900
@@ -149,15 +149,15 @@
 	
 	CHECK_POSIX(  pthread_mutex_lock( &q->mtx )  );
 	
-	/* Ok, now invalidate the queue */
-	q->eyec = 0xdead;
-	
 	if ((q->count != 0) || (q->data != NULL)) {
 		TRACE_DEBUG(INFO, "The queue cannot be destroyed (%d, %p)", q->count, q->data);
 		CHECK_POSIX_DO(  pthread_mutex_unlock( &q->mtx ), /* no fallback */  );
 		return EINVAL;
 	}
 	
+	/* Ok, now invalidate the queue */
+	q->eyec = 0xdead;
+	
 	while (q->thrs) {
 		CHECK_POSIX(  pthread_cond_signal(&q->cond)  );
 		CHECK_POSIX(  pthread_mutex_unlock( &q->mtx ));
@@ -182,6 +182,59 @@
 	return 0;
 }
 
+/* Move the content of old into new, and update loc_update atomically */
+int fd_fifo_move ( struct fifo ** old, struct fifo * new, struct fifo ** loc_update )
+{
+	struct fifo * q;
+	int loops = 0;
+	
+	TRACE_ENTRY("%p %p %p", old, new, loc_update);
+	CHECK_PARAMS( old && CHECK_FIFO( *old ) && CHECK_FIFO( new ));
+	
+	q = *old;
+	CHECK_PARAMS( ! q->data );
+	if (new->high) {
+		TODO("Implement support for thresholds in fd_fifo_move...");
+	}
+	
+	/* Update loc_update */
+	*old = NULL;
+	if (loc_update)
+		*loc_update = new;
+	
+	/* Lock the queues */
+	CHECK_POSIX(  pthread_mutex_lock( &q->mtx )  );
+	CHECK_POSIX(  pthread_mutex_lock( &new->mtx )  );
+	
+	/* Any waiting thread on the old queue returns an error */
+	q->eyec = 0xdead;
+	while (q->thrs) {
+		CHECK_POSIX(  pthread_cond_signal(&q->cond)  );
+		CHECK_POSIX(  pthread_mutex_unlock( &q->mtx ));
+		pthread_yield();
+		CHECK_POSIX(  pthread_mutex_lock( &q->mtx )  );
+		ASSERT( ++loops < 10 ); /* detect infinite loops */
+	}
+	
+	/* Move all data from old to new */
+	fd_list_move_end( &new->list, &q->list );
+	if (q->count && (!new->count)) {
+		CHECK_POSIX(  pthread_cond_signal(&new->cond)  );
+	}
+	new->count += q->count;
+	
+	/* Destroy old */
+	CHECK_POSIX(  pthread_mutex_unlock( &q->mtx )  );
+	CHECK_POSIX(  pthread_cond_destroy( &q->cond )  );
+	CHECK_POSIX(  pthread_mutex_destroy( &q->mtx )  );
+	free(q);
+	
+	/* Unlock new, we're done */
+	CHECK_POSIX(  pthread_mutex_unlock( &new->mtx )  );
+	
+	return 0;
+}
+
 /* Get the length of the queue */
 int fd_fifo_length ( struct fifo * queue, int * length )
 {
--- a/libfreeDiameter/lists.c	Wed Oct 21 18:42:45 2009 +0900
+++ b/libfreeDiameter/lists.c	Mon Oct 26 16:00:49 2009 +0900
@@ -71,6 +71,24 @@
 	list_insert_after(ref, item);
 }
 
+/* Move all elements of list senti at the end of list ref */
+void fd_list_move_end(struct fd_list * ref, struct fd_list * senti)
+{
+	ASSERT(ref->head == ref);
+	ASSERT(senti->head == senti);
+	
+	if (senti->next == senti)
+		return;
+	
+	senti->next->prev = ref->prev;
+	ref->prev->next   = senti->next;
+	senti->prev->next = ref;
+	ref->prev         = senti->prev;
+	senti->prev = senti;
+	senti->next = senti;
+	
+}
+
 /* insert before a reference, checks done */
 static void list_insert_before ( struct fd_list * ref, struct fd_list * item )
 {
"Welcome to our mercurial repository"