changeset 1217:1e8267ad057c

Implemented early version of sctp_dtls.c, for debug
author Sebastien Decugis <sdecugis@freediameter.net>
date Tue, 18 Jun 2013 12:46:01 +0800
parents 76ac4bb75f0e
children 876cb3e4d738
files libfdcore/cnxctx.c libfdcore/cnxctx.h libfdcore/sctp3436.c libfdcore/sctp_dtls.c
diffstat 4 files changed, 530 insertions(+), 142 deletions(-) [+]
line wrap: on
line diff
--- a/libfdcore/cnxctx.c	Mon Jun 17 10:11:57 2013 +0800
+++ b/libfdcore/cnxctx.c	Tue Jun 18 12:46:01 2013 +0800
@@ -88,6 +88,7 @@
 
 	CHECK_MALLOC_DO( conn = malloc(sizeof(struct cnxctx)), return NULL );
 	memset(conn, 0, sizeof(struct cnxctx));
+	fd_list_init(&conn->cc_sctp_dtls_data.chunks, NULL);
 
 	if (full) {
 		CHECK_FCT_DO( fd_fifo_new ( &conn->cc_incoming, 5 ), return NULL );
@@ -946,7 +947,7 @@
 
 
 /* Returns 0 on error, received data size otherwise (always >= 0). This is not used for DTLS-protected associations. */
-static ssize_t fd_tls_recv_handle_error(struct cnxctx * conn, gnutls_session_t session, void * data, size_t sz)
+ssize_t fd_tls_recv_handle_error(struct cnxctx * conn, gnutls_session_t session, void * data, size_t sz)
 {
 	ssize_t ret;
 again:	
@@ -976,6 +977,14 @@
 					TRACE_DEBUG(FULL, "Got 0 size while reading the socket, probably connection closed...");
 					break;
 				
+				case GNUTLS_E_WARNING_ALERT_RECEIVED:
+					LOG_N("Received TLS WARNING ALERT: %s", gnutls_alert_get_name(gnutls_alert_get(session)) ?: "<unknown alert>");
+					goto again;
+					
+				case GNUTLS_E_FATAL_ALERT_RECEIVED:
+					LOG_E("Received TLS FATAL ALERT: %s", gnutls_alert_get_name(gnutls_alert_get(session)) ?: "<unknown alert>");
+					break;
+					
 				default:
 					if (gnutls_error_is_fatal (ret) == 0) {
 						LOG_N("Ignoring non-fatal GNU TLS error: %s", gnutls_strerror (ret));
@@ -995,7 +1004,7 @@
 }
 
 /* Wrapper around gnutls_record_send to handle some error codes. This is also used for DTLS-protected associations */
-static ssize_t fd_tls_send_handle_error(struct cnxctx * conn, gnutls_session_t session, void * data, size_t sz)
+ssize_t fd_tls_send_handle_error(struct cnxctx * conn, gnutls_session_t session, void * data, size_t sz)
 {
 	ssize_t ret;
 	struct timespec ts, now;
@@ -1043,13 +1052,7 @@
 
 
 /* The function that receives TLS data and re-builds a Diameter message -- it exits only on error or cancelation */
-/* 	   For the case of DTLS, since we are not using SCTP_UNORDERED, the messages over a single stream are ordered.
-	   Furthermore, as long as messages are shorter than the MTU [2^14 = 16384 bytes], they are delivered in a single
-	   record, as far as I understand. 
-	   For larger messages, however, it is possible that pieces of messages coming from different streams can get interleaved. 
-	   As a result, we do not use the following function for DTLS reception, because we use the sequence number to rebuild the
-	   messages. */
-int fd_tls_rcvthr_core(struct cnxctx * conn, gnutls_session_t session)
+int fd_tls_rcvthr_core(struct cnxctx * conn, gnutls_session_t session, int dtls)
 {
 	/* No guarantee that GnuTLS preserves the message boundaries, so we re-build it as in TCP. */
 	do {
@@ -1060,7 +1063,10 @@
 		size_t	received = 0;
 
 		do {
-			ret = fd_tls_recv_handle_error(conn, session, &header[received], sizeof(header) - received);
+			if (!dtls)
+				ret = fd_tls_recv_handle_error(conn, session, &header[received], sizeof(header) - received);
+			else
+				ret = fd_dtls_recv_handle_error(conn, session, &header[received], sizeof(header) - received);
 			if (ret <= 0) {
 				/* The connection is closed */
 				goto out;
@@ -1085,7 +1091,10 @@
 
 		while (received < rcv_data.length) {
 			pthread_cleanup_push(free_rcvdata, &rcv_data); /* In case we are canceled, clean the partialy built buffer */
-			ret = fd_tls_recv_handle_error(conn, session, rcv_data.buffer + received, rcv_data.length - received);
+			if (!dtls)
+				ret = fd_tls_recv_handle_error(conn, session, rcv_data.buffer + received, rcv_data.length - received);
+			else
+				ret = fd_dtls_recv_handle_error(conn, session, rcv_data.buffer + received, rcv_data.length - received);
 			pthread_cleanup_pop(0);
 
 			if (ret <= 0) {
@@ -1130,8 +1139,8 @@
 	ASSERT( fd_cnx_target_queue(conn) );
 
 	/* The next function only returns when there is an error on the socket */	
-	CHECK_FCT_DO(fd_tls_rcvthr_core(conn, conn->cc_tls_para.session), /* continue */);
-
+	CHECK_FCT_DO(fd_tls_rcvthr_core(conn, conn->cc_tls_para.session, 0), /* continue */);
+	
 	TRACE_DEBUG(FULL, "Thread terminated");	
 	return NULL;
 }
@@ -1935,8 +1944,7 @@
 				}
 			} else {
 				/* DTLS */
-				/* Multistream is handled at lower layer in the push/pull function */
-				CHECK_FCT( send_simple(conn, buf, len) );
+				CHECK_FCT( fd_sctp_dtls_send(conn, buf, len) );
 			}
 		}
 		break;
--- a/libfdcore/cnxctx.h	Mon Jun 17 10:11:57 2013 +0800
+++ b/libfdcore/cnxctx.h	Tue Jun 18 12:46:01 2013 +0800
@@ -76,11 +76,20 @@
 		uint16_t str_out;	/* Out streams */
 		uint16_t str_in;	/* In streams */
 		uint16_t pairs;		/* max number of pairs ( = min(in, out)) */
-		uint16_t next;		/* # of stream the next message will be sent to */
+		uint16_t next;		/* # of the stream the next message will be sent to */
 		int	 unordered;	/* boolean telling if use of streams > 0 is permitted */
 	} 		cc_sctp_para;
+	
+	/* For DTLS over SCTP */
+	struct {
+		/* This structure will be changed if we implement a different algorithm to reassemble the messages */
+		uint8_t		nextseq[8]; /* the next sequence number we expect to be delivered to upper layer. Can be overwriten if a new epoch is received. */
+		uint8_t		validseq[8]; /* last seq number that was actually decrypted, so we can trust this value. */
+		struct fd_list	chunks;	/* store the chunks with greater seq numbers received on SCTP socket for delayed delivery. */
+		size_t		offset;	/* how much data of the current chunk has already been passed to gnutls */
+	}		cc_sctp_dtls_data;
 
-	/* If both conditions */
+	/* For TLS over SCTP */
 	struct {
 		struct sctp3436_ctx *array; /* an array of cc_sctp_para.pairs elements -- the #0 is special (session is outside)*/
 		struct sr_store	 *sess_store; /* Session data of the master session, to resume the children sessions */
@@ -100,11 +109,12 @@
 void fd_cnx_s_setto(int sock);
 
 /* TLS */
-int fd_tls_rcvthr_core(struct cnxctx * conn, gnutls_session_t session);
+int fd_tls_rcvthr_core(struct cnxctx * conn, gnutls_session_t session, int dtls);
 int fd_tls_prepare(gnutls_session_t * session, int mode, int dtls, char * priority, void * alt_creds);
 #ifndef GNUTLS_VERSION_300
 int fd_tls_verify_credentials(gnutls_session_t session, struct cnxctx * conn, int verbose);
 #endif /* GNUTLS_VERSION_300 */
+ssize_t fd_tls_send_handle_error(struct cnxctx * conn, gnutls_session_t session, void * data, size_t sz);
 
 /* TCP */
 int fd_tcp_create_bind_server( int * sock, sSA * sa, socklen_t salen );
@@ -127,6 +137,8 @@
 /* DTLS over SCTP */
 int fd_sctp_dtls_prepare(gnutls_session_t session);
 int fd_sctp_dtls_settransport(gnutls_session_t session, struct cnxctx * conn);
+int fd_sctp_dtls_send(struct cnxctx * conn, unsigned char * buf, size_t len);
+ssize_t fd_dtls_recv_handle_error(struct cnxctx * conn, gnutls_session_t session, void * data, size_t sz);
 void * fd_sctp_dtls_rcvthr(void * arg);
 
 /* TLS over SCTP (multi-stream) */
--- a/libfdcore/sctp3436.c	Mon Jun 17 10:11:57 2013 +0800
+++ b/libfdcore/sctp3436.c	Tue Jun 18 12:46:01 2013 +0800
@@ -152,7 +152,7 @@
 	}
 	
 	/* The next function loops while there is no error */
-	CHECK_FCT_DO(fd_tls_rcvthr_core(cnx, ctx->strid ? ctx->session : cnx->cc_tls_para.session), /* continue */);
+	CHECK_FCT_DO(fd_tls_rcvthr_core(cnx, ctx->strid ? ctx->session : cnx->cc_tls_para.session, 0), /* continue */);
 error:
 	fd_cnx_markerror(cnx);
 	TRACE_DEBUG(FULL, "Thread terminated");	
--- a/libfdcore/sctp_dtls.c	Mon Jun 17 10:11:57 2013 +0800
+++ b/libfdcore/sctp_dtls.c	Tue Jun 18 12:46:01 2013 +0800
@@ -39,73 +39,426 @@
 #include "cnxctx.h"
 
 
-#define DTLS_TYPE_application_data	23
+/* In DTLS over SCTP, all the DTLS internal messages (handshake, etc) must be sent over stream 0 so that we are sure they are received in order.
+ Since we need to distinguish different DTLS payloads, we need some knowledge of DTLS protocol here. 
+ We will then chose the stream within our "push" function called by GNUTLS.
+ */
+#define DTLS_TYPE_OFFSET		0	/* The TYPE byte is the first in a DTLS packet */
+#define DTLS_TYPE_application_data	23	/* This is the value when the DTLS packet contains DATA (i.e. Diameter payload in our case) */
+#define DTLS_SEQ_OFFSET			3	/* The SEQUENCE bytes come after type and proto version */
+
+
+#define DTLS_SCTP_MTU	2^14 /* as per RFC 6083 */
+
+
+/* The DTLS MTU is limited to 2^14, but Diameter messages can be larger. It means we MUST handle Diameter messages reassembly here; and this is not simple.
+
+There are two ways to deal with this problem: 
+
+- first solution is to force ordering when parsing all the datagrams received (as SCTP guarantees we will receive them), 
+     so we are guaranteed to reconstruct the stream of data in the same order as it was sent, and we can process the received data the same way as TCP.
+  * pros: very robust, does not depend on how the remote side is sending the data (assuming they do not interleave chunks of diameter messages, we'd have no solution otherwise)
+  * cons: less efficient than the next solution, as on the receiving side we cannot parse new payloads until all the previous ones are received. 
+          It defeats some of the benefits of the partial ordering of SCTP.
+	  
+- second solution is to make sure the fragmented payloads are sent over the same stream (which are always ordered) and rebuild the messages per stream. 
+  * pros: enables to process complete messages received on other streams while waiting for some chunks (similar to non-DTLS situation, except that in that case SCTP handles the fragmentation)
+  * cons: we must be sure the sending side is actually sending pieces of a message on the same stream. And the processing on receiving side is more complex.
+  
+We'd have actually more solutions, for example storing the message hop-by-hop id in the snd_ppid field of SCTP header, but this would work only in front of freeDiameter.
+
+Here is an illustration of the two solutions: 
+we assume 3 streams S1,S2,S3 and 4 messages, message M1 of 2^14 + 2^13 (=24576) bytes and 3 messages M2,M3,M4 of 2^12 (=4096) bytes to send from peer A to peer B.
+
+Peer A calls fd_cnx_send() 4 times with the 4 messages M1,M2,M3,M4, which in turn calls gnutls_record_send(), which generates the chunks C1...C5 below:
+	C1: gnutls_record_send(M1) -> returns 2^14 since the complete record exceed the MTU.
+	C2: gnutls_record_send(M1+2^14) -> returns the remaining 2^13
+	C3: gnutls_record_send(M2)
+	C4: gnutls_record_send(M3)
+	C5: gnutls_record_send(M4)
+
+*** Solution 1)
+
+Implementing the first solution above, the chunks are sent as follows (assuming round-robin sending over the streams): 
+C1 over S1, 
+C2 over S2, 
+C3 over S3, 
+C4 over S1, 
+C5 over S2.
 
-#ifdef GNUTLS_VERSION_300
-/* Check if data is available for gnutls on a given context */
-static int sctp_dtls_pull_timeout(gnutls_transport_ptr_t tr, unsigned int ms)
+Given the size of the chunks, they might be delivered in the following order on the receiving side:
+C3
+C2
+C5
+C1
+C4
+
+This means we have to store C3, C2 and C5 until C1 is received, then we can process C1,C2,C3, 
+and again wait for C4 before processing C4 and C5, while C3, C4 and C5 are totally independent 
+and could be processed directly after being received.
+
+
+*** Solution 2)
+
+Here the partial ordering is enforced, so the sending side MUST send C1 and C2 over the same stream, e.g.:
+C1 over S1, 
+C2 over S1, 
+C3 over S2, 
+C4 over S3, 
+C5 over S1.
+
+On the receiving side, given the sizes of the message, we might receive the chunks in the following order:
+C3
+C4
+C1
+C2
+C5
+
+We can process C3 and C4 as soon as they are received, then C1 is stored (when decrypted we can see it is a partial chunk)
+until the remaining payload is received; however we can continue to process the data received over other streams without delay.
+
+
+*** What we do here.
+
+freeDiameter implements the Solution 2 on the sending side (no additional cost), via fd_sctp_dtls_send() below.
+
+On the receiving side, we implement Solution 1 at the moment (safe).  We do it at the lowest layer, before passing the data to GNUTLS. 
+This way, we can catch all sequence numbers easily. 
+Note however we have no way to handle cleanly the change of ephoch in case of cipher change (this is unclear in RFC6083 as well)
+
+We'll see later if it makes sense to implement solution 2. 
+How to decide if we can use it? one way could be to start doing solution 1, 
+and when a large record is received check if the chunks were received on the same stream or not.
+
+Implementation of solution 2 is difficult because we need to pass the stream information through GNU TLS and there is no easy way to do it.
+
+*/
+
+/* Retrieve the next data from the socket. Returns 0 if no payload data is available, >0 otherwise, and <0 in case of error */
+static int get_next_data_from_socket(struct cnxctx * conn, uint16_t *strid, uint8_t ** buf, size_t *len)
 {
-	struct cnxctx * conn = (struct cnxctx *)tr;
-	fd_set rfds;
-	struct timeval tv;
+	int got_data = 0;
+	int event;
+	CHECK_FCT_DO( fd_sctp_recvmeta(conn, strid, buf, len, &event), return -1 );
+	switch (event) {
+		case FDEVP_CNX_MSG_RECV:
+			got_data = 1;
+			LOG_A("Received DTLS data, type %02hhx, Seq %02hhx%02hhx%02hhx%02hhx%02hhx%02hhx%02hhx%02hhx, Stream %hu",
+				(*buf)[0],
+				(*buf)[3],(*buf)[4],(*buf)[5],(*buf)[6], (*buf)[7],(*buf)[8],(*buf)[9],(*buf)[10],
+				*strid);
+			break;
+
+		case FDEVP_CNX_EP_CHANGE:
+			/* Send this event to the target queue */
+			CHECK_FCT_DO( fd_event_send( fd_cnx_target_queue(conn), event, *len, *buf), return -1 );
+			break;
+
+		case FDEVP_CNX_SHUTDOWN:
+			/* Just ignore the notification for now, we will get another error later anyway */
+			break;
+
+		case FDEVP_CNX_ERROR:
+		default:
+			return -1;
+	}
+	
+	return got_data;
+}
+
+/***************************************************************************************************/
+/* Helper functions to reorder the received chunks by sequence number                              */
+/***************************************************************************************************/
+
+struct chunk {
+	struct fd_list	chain;	/* link in the ordered list of chunks */
+	uint8_t		seq[8];	/* epoch + sequence number */
+	uint8_t *	buffer; /* the data */
+	size_t		len;	/* length of the buffer */
+	uint16_t	stream; /* which stream the chunk was received on */
+	/* We could also add a timestamp here */
+};
 
-	FD_ZERO (&rfds);
-	FD_SET (conn->cc_socket, &rfds);
+/* Inserts new buffer received from the connection in the list of chunks */
+static int chunk_insert(struct cnxctx * conn, uint16_t streamid, uint8_t *buffer, size_t len)
+{
+	struct chunk * new;
+	struct fd_list * li;
+	uint8_t * newseq;
+	
+	/* Check the new sequence is >= what we processed in upper layer */
+	newseq = buffer + DTLS_SEQ_OFFSET;
+	if (memcmp(newseq, conn->cc_sctp_dtls_data.validseq, 8) < 0) {
+		LOG_E("Received DTLS packet with smaller sequence number than already processed, discarded. FFS.");
+		free(buffer);
+		return 0;
+	}
+	
+	/* Create a new chunk structure to store this chunk */
+	CHECK_MALLOC( new = malloc(sizeof(struct chunk)) );
+	memset(new, 0, sizeof(struct chunk));
+	fd_list_init(&new->chain, new);
+	memcpy(&new->seq, newseq, 8);
+	new->buffer = buffer;
+	new->len = len;
+	new->stream = streamid;
+	
+	/* Insert this new structure in the list attached to the connection */
+	for (li = conn->cc_sctp_dtls_data.chunks.prev; li != &conn->cc_sctp_dtls_data.chunks; li = li->prev) {
+		int cmp = memcmp(new->seq, ((struct chunk *)li->o)->seq, 8);
+		if (cmp < 0) continue;
+		if (cmp == 0) {
+			/* discard repeated seq */
+			LOG_E("Received DTLS packet with duplicate sequence number, discarded. FFS.");
+			free(buffer);
+			free(new);
+			return 0;
+		}
+		break;
+	}
+	/* special case: if we are already delivering partially the first chunk, we do insert only after this one */
+	if (conn->cc_sctp_dtls_data.offset && (li == &conn->cc_sctp_dtls_data.chunks))
+		li = li->next;
+	
+	fd_list_insert_after(li, &new->chain);
+	
+	return 0;
+	
+}
 
-	tv.tv_sec = 0;
-	tv.tv_usec = ms * 1000;
+/* Retrieve data from the list of chunks. Returns 0 if no data is ready for upper layer, the available length otherwise (up to upperlen) */
+static size_t chunk_retrieve(struct cnxctx * conn, void * upperbuf, size_t upperlen, int probeonly) 
+{
+	struct chunk * next;
+	int cmp,i;
+	size_t ret = 0;
+redo:	
+	if (FD_IS_LIST_EMPTY(&conn->cc_sctp_dtls_data.chunks)) {
+		return 0;
+	}
+	
+	next = conn->cc_sctp_dtls_data.chunks.next->o;
+	
+	/* If we are already delivering this chunk, just continue until complete */
+	if (conn->cc_sctp_dtls_data.offset != 0) {
+		if (probeonly)
+			return 1;
+		
+		ret = next->len - conn->cc_sctp_dtls_data.offset;
+		if (upperlen < ret)
+			ret = upperlen;
+		
+		memcpy(upperbuf, next->buffer + conn->cc_sctp_dtls_data.offset, ret);
+		conn->cc_sctp_dtls_data.offset += ret;
+		if (conn->cc_sctp_dtls_data.offset == next->len) {
+			/* we delivered the complete chunk, now we can remove it */
+			conn->cc_sctp_dtls_data.offset = 0;
+			free(next->buffer);
+			fd_list_unlink(&next->chain);
+			free(next);
+		}
+		return ret;
+	}
+	
+	cmp = memcmp(next->seq, conn->cc_sctp_dtls_data.nextseq, 8);
+	if (cmp < 0) {
+		cmp = memcmp(next->seq, conn->cc_sctp_dtls_data.validseq, 8);
+		if (cmp <= 0) {
+			/* This is old stuff or invalid stuff, discard */
+			LOG_E("Unqueued DTLS packet with old sequence number, discarding.");
+			free(next->buffer);
+			fd_list_unlink(&next->chain);
+			free(next);
+			goto redo;
+		}
+		/* If the first chunk in our list has a smaller seq number than what we already delivered, we pass it above (to prevent DoS) */
+		LOG_A("Unqueueing chunk with seq number %02hhx%02hhx%02hhx%02hhx%02hhx%02hhx%02hhx%02hhx", 
+			next->seq[0],next->seq[1],next->seq[2],next->seq[3],next->seq[4],next->seq[5],next->seq[6],next->seq[7]);
+		ret = next->len;
+		if (upperlen < ret) {
+			ret = upperlen;
+			memcpy(upperbuf, next->buffer, ret);
+			conn->cc_sctp_dtls_data.offset = ret;
+		} else {
+			memcpy(upperbuf, next->buffer, ret);
+			free(next->buffer);
+			fd_list_unlink(&next->chain);
+			free(next);
+		}
+		return ret;
+	}
+	if (cmp > 0) {
+		/* is this the first message of a new epoch ? */
+		uint8_t newepoch[8] = { 0, 0, 0, 0, 0, 0, 0, 0 };
+		if (next->seq[1] != 0xFF) {
+			newepoch[0] = next->seq[0]; newepoch[1] = next->seq[1] + 1;
+		} else if (next->seq[0] != 0xFF) {
+			newepoch[0] = next->seq[0] + 1; newepoch[1] = 0;
+		} else {
+			LOG_F("Epoch field wrapped, can this happen ???");
+			ASSERT(0); TODO("FFS");
+		}
+		
+		if (memcmp(newepoch, next->seq, 8) == 0) {
+			/* Bingo, this is the first message of the new epoch. We update our nextseq accordingly */
+			memcpy(conn->cc_sctp_dtls_data.nextseq, newepoch, 8);
+			conn->cc_sctp_dtls_data.nextseq[7] = 1;
+			
+			LOG_A("Unqueueing chunk with seq number %02hhx%02hhx%02hhx%02hhx%02hhx%02hhx%02hhx%02hhx", 
+				next->seq[0],next->seq[1],next->seq[2],next->seq[3],next->seq[4],next->seq[5],next->seq[6],next->seq[7]);
+			ret = next->len;
+			if (upperlen < ret) {
+				ret = upperlen;
+				memcpy(upperbuf, next->buffer, ret);
+				conn->cc_sctp_dtls_data.offset = ret;
+			} else {
+				memcpy(upperbuf, next->buffer, ret);
+				free(next->buffer);
+				fd_list_unlink(&next->chain);
+				free(next);
+			}
+			return ret;
+		}
+			
+		/* otherwise, we don't return this data */
+		return 0;
+	}
+	
+	/* next is the next chunk expected on this connection */
+	
+	/* We increment the next seq */
+	for (i = 7; i>=3; i--) {
+		if (conn->cc_sctp_dtls_data.nextseq[i] == 0xFF) {
+			conn->cc_sctp_dtls_data.nextseq[i] = 0;
+		} else {
+			conn->cc_sctp_dtls_data.nextseq[i] ++;
+			break;
+		}
+	}
+	if (i==2) {
+		LOG_F("Sequence_number field wrapped, can this happen ???");
+		ASSERT(0); TODO("FFS");
+	}
+	
+	/* And we deliver this to upper layer */
+	LOG_A("Unqueueing chunk with seq number %02hhx%02hhx%02hhx%02hhx%02hhx%02hhx%02hhx%02hhx", 
+		next->seq[0],next->seq[1],next->seq[2],next->seq[3],next->seq[4],next->seq[5],next->seq[6],next->seq[7]);
+	ret = next->len;
+	if (upperlen < ret) {
+		ret = upperlen;
+		memcpy(upperbuf, next->buffer, ret);
+		conn->cc_sctp_dtls_data.offset = ret;
+	} else {
+		memcpy(upperbuf, next->buffer, ret);
+		free(next->buffer);
+		fd_list_unlink(&next->chain);
+		free(next);
+	}
+	return ret;
+}
 
-	while(tv.tv_usec >= 1000000)
-	{
-		tv.tv_usec -= 1000000;
-		tv.tv_sec++;
-	}
+/* returns positive value if data is available for upper layer, 0 if the time is elapsed */
+static int chunk_select(struct cnxctx * conn, unsigned int ms)
+{
+	fd_set rfds;
+	struct timespec absend, inter;
+	int ret;
+	uint8_t * buf;
+	size_t 	  len;
+	uint16_t  strid;
+
+	/* absolute time we will timeout */
+	CHECK_SYS_DO(  clock_gettime(CLOCK_REALTIME, &absend), return -1 );
+	absend.tv_sec += ((ms + (absend.tv_nsec / 1000000L)) / 1000);
+	absend.tv_nsec = ( ms * 1000000L + absend.tv_nsec ) % 1000000000L;
+	
+	do {
+		/* Check if we have available data in the list of chunks */
+		if (chunk_retrieve(conn, NULL, 0, 1) > 0)
+			return 1;
+	
+		/* otherwise we need to retrieve more data from the socket, so we select */
 
-	return select (conn->cc_socket + 1, &rfds, NULL, NULL, &tv);
+		FD_ZERO (&rfds);
+		FD_SET (conn->cc_socket, &rfds);
+		
+		/* We wait until absend only */
+		CHECK_SYS_DO(  clock_gettime(CLOCK_REALTIME, &inter), return -1 );
+		if (inter.tv_nsec <= absend.tv_nsec) {
+			if (inter.tv_sec > absend.tv_sec) {
+				inter.tv_sec = 0; inter.tv_nsec = 0;
+			} else {
+				inter.tv_sec = absend.tv_sec - inter.tv_sec;
+				inter.tv_nsec = absend.tv_nsec - inter.tv_nsec;
+			}
+		} else {
+			if (inter.tv_sec >= absend.tv_sec) {
+				inter.tv_sec = 0; inter.tv_nsec = 0;
+			} else {
+				inter.tv_sec = absend.tv_sec - inter.tv_sec - 1;
+				inter.tv_nsec = 1000000000L - inter.tv_nsec + absend.tv_nsec;
+			}
+		}
+
+		/* Now, wait for new data on the socket */
+		ret = pselect (conn->cc_socket + 1, &rfds, NULL, NULL, &inter, NULL);
+		if (ret <= 0)
+			break; /* no data was received, we can return */
+		
+		/* We got data, get it and insert in the list of chunks  */
+		ret = get_next_data_from_socket(conn, &strid, &buf, &len);
+		if (ret < 0)
+			break;
+		if (ret == 0)
+			continue;
+		
+		CHECK_FCT_DO( chunk_insert(conn, strid, buf, len), return -1 );
+		/* and loop */
+	} while (1);
+	
+	return ret;
 }
-#endif /* GNUTLS_VERSION_300 */
 
-/* Send data over the connection, called by gnutls */
+/***************************************************************************************************/
+/* Functions "under" GNU TLS                                                                       */
+/***************************************************************************************************/
+
+/* Send data over the connection, called by gnutls. This function checks the type of DTLS packet and sends
+all non-application data over stream 0 (to enforce ordering) and application data over the stream set by
+upper layer in conn->cc_sctp_para.next */
 static ssize_t sctp_dtls_pushv(gnutls_transport_ptr_t tr, const giovec_t * iov, int iovcnt)
 {
 	struct cnxctx * conn = (struct cnxctx *)tr;
-	ssize_t ret;
+	uint16_t stream = 0;
+	
 	
 	TRACE_ENTRY("%p %p %d", tr, iov, iovcnt);
 	CHECK_PARAMS_DO( tr && iov, { errno = EINVAL; return -1; } );
 	
-	/* If no unordered delivery is allowed, send over stream 0 always */
-	if (conn->cc_sctp_para.unordered == 0) {
-		ret = fd_sctp_sendstrv(conn, 0, (const struct iovec *)iov, iovcnt);
-	}
-	/* Otherwise, we need to check the type of record. */
-	else {
-		if ((iovcnt > 0) 
-		&& (iov->iov_len > 0) && 
-		(*((uint8_t *)iov->iov_base) == DTLS_TYPE_application_data)) {
-			/* Data is sent over different streams */
-			if (conn->cc_sctp_para.str_out > 32) {
-				TODO("Limiting to 32 streams. Remove this limit when anti-replay is disabled");
-				conn->cc_sctp_para.str_out = 32;
-			}
-			if (conn->cc_sctp_para.str_out > 1) {
-				conn->cc_sctp_para.next += 1;
-				conn->cc_sctp_para.next %= conn->cc_sctp_para.str_out;
-			} else {
-				conn->cc_sctp_para.next = 0;
-			}
-			ret = fd_sctp_sendstrv(conn, conn->cc_sctp_para.next, (const struct iovec *)iov, iovcnt);
-	
-		} else {
-		/* other TLS messages are always sent over stream 0 */
-			ret = fd_sctp_sendstrv(conn, 0, (const struct iovec *)iov, iovcnt);
-		}
+	if ((conn->cc_sctp_para.unordered != 0) 
+	&& (iovcnt > 0) 
+	&& (iov->iov_len > 0) 
+	&& (((uint8_t *)iov->iov_base)[DTLS_TYPE_OFFSET] == DTLS_TYPE_application_data)) {
+		/* Data is sent over different streams, if allowed */
+		stream = conn->cc_sctp_para.next;
 	}
 	
-	return ret;
+	if ((iovcnt > 0) && (iov->iov_len > 10)) {
+		LOG_A("Sending DTLS data, type %02hhx, Seq %02hhx%02hhx%02hhx%02hhx%02hhx%02hhx%02hhx%02hhx, Stream %hu",
+			((uint8_t *)iov->iov_base)[0],
+			((uint8_t *)iov->iov_base)[3],((uint8_t *)iov->iov_base)[4],((uint8_t *)iov->iov_base)[5],((uint8_t *)iov->iov_base)[6],
+			((uint8_t *)iov->iov_base)[7],((uint8_t *)iov->iov_base)[8],((uint8_t *)iov->iov_base)[9],((uint8_t *)iov->iov_base)[10],
+			stream);	
+	} else {
+		LOG_A("Sending DTLS data, {iovcnt=%d, iov->iov_len=%zd}, Stream %hu",
+			iovcnt, ((iovcnt>0) ? iov->iov_len : 0), stream);	
+	}
+	
+	return fd_sctp_sendstrv(conn, stream, (const struct iovec *)iov, iovcnt);
 }
 
 #ifndef GNUTLS_VERSION_212
+/* compatibility wrapper for older GNUTLS that does not support the vector_push */
 static ssize_t sctp_dtls_push(gnutls_transport_ptr_t tr, const void * data, size_t len)
 {
 	giovec_t iov;
@@ -115,15 +468,48 @@
 }
 #endif /*  GNUTLS_VERSION_212 */
 
-/* Retrieve data received on any stream */
-static ssize_t sctp_dtls_pull(gnutls_transport_ptr_t tr, void * buf, size_t len)
+#ifdef GNUTLS_VERSION_300
+/* Check if data is available for gnutls on a given connection.  */
+static int sctp_dtls_pull_timeout(gnutls_transport_ptr_t tr, unsigned int ms)
+{
+	struct cnxctx * conn = (struct cnxctx *)tr;
+	return chunk_select(conn, ms);
+}
+#endif /* GNUTLS_VERSION_300 */
+
+
+/* This function returns only ordered data to the upper layer */
+static ssize_t sctp_dtls_pull(gnutls_transport_ptr_t tr, void * gnutlsbuf, size_t gnutlslen)
 {
 	struct cnxctx * conn = (struct cnxctx *)tr;
+	ssize_t ret = 0;
+	
+	while ( (ret = chunk_retrieve(conn,gnutlsbuf,gnutlslen,0)) == 0) {
+		
+		/* No partial data, read the next SCTP record */
+		int       stop = 0;
+		uint8_t * buf;
+		size_t 	  len;
+		uint16_t  strid;
+		do {
+			stop = get_next_data_from_socket(conn, &strid, &buf, &len);
+			if (stop < 0)
+				goto out;
+		} while (!stop);
+		
+		CHECK_FCT_DO( chunk_insert(conn, strid, buf, len), goto out );
+	}
+	
+out:
+	return ret;
 
-	/* If needed we can use fd_sctp_recvmeta to retrieve more information here */
-	return fd_cnx_s_recv(conn, buf, len);
 }
 
+
+/***************************************************************************************************/
+/* Functions "above" GNU TLS                                                                       */
+/***************************************************************************************************/
+
 /* Set the parameters of a session to use the cnxctx object */
 #ifndef GNUTLS_VERSION_300
 GCC_DIAG_OFF("-Wdeprecated-declarations")
@@ -150,7 +536,7 @@
 	GNUTLS_TRACE( gnutls_transport_set_vec_push_function(session, sctp_dtls_pushv) );
 #endif /* GNUTLS_VERSION_212 */
 
-	return;
+	return 0;
 }
 #ifndef GNUTLS_VERSION_300
 GCC_DIAG_ON("-Wdeprecated-declarations")
@@ -163,26 +549,28 @@
 int fd_sctp_dtls_prepare(gnutls_session_t session)
 {
 	/* We do not use cookies at the moment. Not sure it is useful or not */
-	TODO("Cookie exchange?");
+	/* TODO("Cookie exchange?"); */
 	/* gnutls_dtls_prestate_set (session, &prestate); */
 
-	gnutls_dtls_set_mtu(session, 2^14 /* as per RFC 6083 */);
+	gnutls_dtls_set_mtu(session, DTLS_SCTP_MTU);
 
 	gnutls_dtls_set_timeouts(session, 70000, 60000); /* Set retrans > total so that there is no retransmission, since SCTP is reliable */
 
 #ifdef GNUTLS_VERSION_320
 	TODO("Disable replay protection");
+	TODO("Register hook on the Finish message to change SCTP_AUTH active key on the socket");
 #endif /* GNUTLS_VERSION_320 */
 	
+	return 0;
 	
 }
 
-
-static ssize_t fd_dtls_recv_handle_error(struct cnxctx * conn, gnutls_session_t session, void * data, size_t sz, uint8_t seq[8])
+/* the following function is actually almost same as fd_tls_recv_handle_error at the moment, since all handling is done under gnutls */
+ssize_t fd_dtls_recv_handle_error(struct cnxctx * conn, gnutls_session_t session, void * data, size_t sz)
 {
 	ssize_t ret;
 again:	
-	CHECK_GNUTLS_DO( ret = gnutls_record_recv_seq(session, data, sz, seq), 
+	CHECK_GNUTLS_DO( ret = gnutls_record_recv_seq(session, data, sz, conn->cc_sctp_dtls_data.validseq), 
 		{
 			switch (ret) {
 				case GNUTLS_E_REHANDSHAKE: 
@@ -208,6 +596,17 @@
 					TRACE_DEBUG(FULL, "Got 0 size while reading the socket, probably connection closed...");
 					break;
 				
+				case GNUTLS_E_WARNING_ALERT_RECEIVED:
+					LOG_N("Received TLS WARNING ALERT: %s", gnutls_alert_get_name(gnutls_alert_get(session)) ?: "<unknown alert>");
+					if (!fd_cnx_teststate(conn, CC_STATUS_CLOSING))
+						goto again;
+					TRACE_DEBUG(FULL, "Connection is closing, so abord gnutls_record_recv now.");
+					break;
+					
+				case GNUTLS_E_FATAL_ALERT_RECEIVED:
+					LOG_E("Received TLS FATAL ALERT: %s", gnutls_alert_get_name(gnutls_alert_get(session)) ?: "<unknown alert>");
+					break;
+					
 				default:
 					if (gnutls_error_is_fatal (ret) == 0) {
 						LOG_N("Ignoring non-fatal GNU TLS error: %s", gnutls_strerror (ret));
@@ -226,8 +625,7 @@
 	return ret;
 }
 
-
-/* Receiver thread that reassemble the decrypted messages (when size is > 2<<14) for upper layer */
+/* Receiver thread that reassemble the decrypted messages (when size is > 2<<14) for upper layer. Very similar to fd_tls_rcvthr_core in this version */
 void * fd_sctp_dtls_rcvthr(void * arg) {
 
 	struct cnxctx * conn = arg;
@@ -244,72 +642,42 @@
 	
 	ASSERT( fd_cnx_teststate(conn, CC_STATUS_TLS) );
 	ASSERT( fd_cnx_target_queue(conn) );
-
+	
 	/* The next function only returns when there is an error on the socket */	
-	do {
-	
-		TODO("Reassemble the packets based on their sequence number & stream");
-		
-		
-#if 0	
-		
-		uint8_t header[4];
-		struct fd_cnx_rcvdata rcv_data;
-		struct fd_msg_pmdl *pmdl=NULL;
-		ssize_t ret = 0;
-		size_t	received = 0;
-		uint8_t seq[8];
-
-		do {
-			ret = fd_dtls_recv_handle_error(conn, session, &header[received], sizeof(header) - received, seq);
-			if (ret <= 0) {
-				/* The connection is closed */
-				goto out;
-			}
-			received += ret;
-		} while (received < sizeof(header));
-
-		rcv_data.length = ((size_t)header[1] << 16) + ((size_t)header[2] << 8) + (size_t)header[3];
+	CHECK_FCT_DO(fd_tls_rcvthr_core(conn, conn->cc_tls_para.session, 1), /* continue */);
 
-		/* Check the received word is a valid beginning of a Diameter message */
-		if ((header[0] != DIAMETER_VERSION)	/* defined in <libfreeDiameter.h> */
-		   || (rcv_data.length > DIAMETER_MSG_SIZE_MAX)) { /* to avoid too big mallocs */
-			/* The message is suspect */
-			LOG_E( "Received suspect header [ver: %d, size: %zd] from '%s', assume disconnection", (int)header[0], rcv_data.length, conn->cc_remid);
-			fd_cnx_markerror(conn);
-			goto out;
-		}
-
-		/* Ok, now we can really receive the data */
-		CHECK_MALLOC(  rcv_data.buffer = fd_cnx_alloc_msg_buffer( rcv_data.length, &pmdl ) );
-		memcpy(rcv_data.buffer, header, sizeof(header));
-
-		while (received < rcv_data.length) {
-			pthread_cleanup_push(free_rcvdata, &rcv_data); /* In case we are canceled, clean the partialy built buffer */
-			ret = fd_tls_recv_handle_error(conn, session, rcv_data.buffer + received, rcv_data.length - received);
-			pthread_cleanup_pop(0);
-
-			if (ret <= 0) {
-				free_rcvdata(&rcv_data);
-				goto out;
-			}
-			received += ret;
-		}
-		
-		fd_hook_call(HOOK_DATA_RECEIVED, NULL, NULL, &rcv_data, pmdl);
-		
-		/* We have received a complete message, pass it to the daemon */
-		CHECK_FCT_DO( ret = fd_event_send( fd_cnx_target_queue(conn), FDEVP_CNX_MSG_RECV, rcv_data.length, rcv_data.buffer), 
-			{ 
-				free_rcvdata(&rcv_data);
-				CHECK_FCT_DO(fd_event_send(fd_g_config->cnf_main_ev, FDEV_TERMINATE, 0, NULL), );
-				return ret; 
-			} );
-#endif // 0	
-				
-	} while (1);
-	
-out:
 	TRACE_DEBUG(FULL, "Thread terminated");	
 	return NULL;
 }			
+
+/* Send a new Diameter message over the association */
+int fd_sctp_dtls_send(struct cnxctx * conn, unsigned char * buf, size_t len)
+{
+	ssize_t ret;
+	size_t sent = 0;
+	TRACE_ENTRY("%p %p %zd", conn, buf, len);
+	
+	CHECK_PARAMS(conn);
+	
+	/* First, decide which stream this data will be sent to */
+	if (conn->cc_sctp_para.str_out > 32) {
+		TODO("Limiting to 32 streams. Remove this limit when anti-replay is disabled");
+		conn->cc_sctp_para.str_out = 32;
+	}
+	if (conn->cc_sctp_para.str_out > 1) {
+		conn->cc_sctp_para.next += 1;
+		conn->cc_sctp_para.next %= conn->cc_sctp_para.str_out;
+	} else {
+		conn->cc_sctp_para.next = 0;
+	}
+	
+	/* Now send the data over this stream. Do it in a loop in case the length is larger than the MTU */
+	do {
+		CHECK_GNUTLS_DO( ret = fd_tls_send_handle_error(conn, conn->cc_tls_para.session, buf + sent, len - sent),  );
+		if (ret <= 0)
+			return ENOTCONN;
+		
+		sent += ret;
+	} while ( sent < len );
+	return 0;
+}
"Welcome to our mercurial repository"