diff libfdcore/cnxctx.c @ 706:4ffbc9f1e922

Large UNTESTED commit with the following changes: * Improved DiameterIdentity handling (esp. interationalization issues), and improve efficiency of some string operations in peers, sessions, and dictionary modules (closes #7) * Cleanup in the session module to free only unreferenced sessions (#16) * Removed fd_cpu_flush_cache(), replaced by more robust alternatives. * Improved peer state machine algorithm to counter SCTP multistream race condition.
author Sebastien Decugis <sdecugis@nict.go.jp>
date Wed, 09 Feb 2011 15:26:58 +0900
parents 2e94ef0515d7
children 19a9470de77a
line wrap: on
line diff
--- a/libfdcore/cnxctx.c	Mon Jan 31 17:22:21 2011 +0900
+++ b/libfdcore/cnxctx.c	Wed Feb 09 15:26:58 2011 +0900
@@ -50,8 +50,6 @@
  * They are always oriented on connections (TCP or SCTP), connectionless modes (UDP or SCTP) are not supported.
  */
 
-/* Note: this file could be moved to libfreeDiameter instead, but since it uses gnuTLS we prefer to keep it in the daemon */
-
 /* Lifetime of a cnxctx object:
  * 1) Creation
  *    a) a server socket:
@@ -157,8 +155,7 @@
 #ifdef DISABLE_SCTP
 	TRACE_DEBUG(INFO, "This function should never been called when SCTP is disabled...");
 	ASSERT(0);
-	CHECK_FCT_DO( ENOTSUP, );
-	return NULL;
+	CHECK_FCT_DO( ENOTSUP, return NULL);
 #else /* DISABLE_SCTP */
 	struct cnxctx * cnx = NULL;
 
@@ -248,18 +245,18 @@
 		char portbuf[10];
 		int  rc;
 		
-		/* Numeric values for debug */
 		rc = getnameinfo((sSA *)&ss, sSAlen(&ss), addrbuf, sizeof(addrbuf), portbuf, sizeof(portbuf), NI_NUMERICHOST | NI_NUMERICSERV);
 		if (rc) {
 			snprintf(addrbuf, sizeof(addrbuf), "[err:%s]", gai_strerror(rc));
 			portbuf[0] = '\0';
 		}
 		
-		snprintf(cli->cc_id, sizeof(cli->cc_id), "{%s} (%d) <- [%s]:%s (%d)", 
-				IPPROTO_NAME(cli->cc_proto), serv->cc_socket, 
-				addrbuf, portbuf, cli->cc_socket);
+		/* Numeric values for debug... */
+		snprintf(cli->cc_id, sizeof(cli->cc_id), "%s from [%s]:%s (%d<-%d)", 
+				IPPROTO_NAME(cli->cc_proto), addrbuf, portbuf, serv->cc_socket, cli->cc_socket);
 		
-		/* Name for log messages */
+		
+		/* ...Name for log messages */
 		rc = getnameinfo((sSA *)&ss, sSAlen(&ss), cli->cc_remid, sizeof(cli->cc_remid), NULL, 0, 0);
 		if (rc)
 			snprintf(cli->cc_remid, sizeof(cli->cc_remid), "[err:%s]", gai_strerror(rc));
@@ -333,16 +330,16 @@
 		char portbuf[10];
 		int  rc;
 		
-		/* Numeric values for debug */
+		/* Numeric values for debug... */
 		rc = getnameinfo(sa, addrlen, addrbuf, sizeof(addrbuf), portbuf, sizeof(portbuf), NI_NUMERICHOST | NI_NUMERICSERV);
 		if (rc) {
 			snprintf(addrbuf, sizeof(addrbuf), "[err:%s]", gai_strerror(rc));
 			portbuf[0] = '\0';
 		}
 		
-		snprintf(cnx->cc_id, sizeof(cnx->cc_id), "{TCP} -> [%s]:%s (%d)", addrbuf, portbuf, cnx->cc_socket);
+		snprintf(cnx->cc_id, sizeof(cnx->cc_id), "TCP to [%s]:%s (%d)", addrbuf, portbuf, cnx->cc_socket);
 		
-		/* Name for log messages */
+		/* ...Name for log messages */
 		rc = getnameinfo(sa, addrlen, cnx->cc_remid, sizeof(cnx->cc_remid), NULL, 0, 0);
 		if (rc)
 			snprintf(cnx->cc_remid, sizeof(cnx->cc_remid), "[err:%s]", gai_strerror(rc));
@@ -355,10 +352,9 @@
 struct cnxctx * fd_cnx_cli_connect_sctp(int no_ip6, uint16_t port, struct fd_list * list)
 {
 #ifdef DISABLE_SCTP
-	TRACE_DEBUG(INFO, "This function should never been called when SCTP is disabled...");
+	TRACE_DEBUG(INFO, "This function should never be called when SCTP is disabled...");
 	ASSERT(0);
-	CHECK_FCT_DO( ENOTSUP, );
-	return NULL;
+	CHECK_FCT_DO( ENOTSUP, return NULL);
 #else /* DISABLE_SCTP */
 	int sock = 0;
 	struct cnxctx * cnx = NULL;
@@ -415,16 +411,16 @@
 		char portbuf[10];
 		int  rc;
 		
-		/* Numeric values for debug */
+		/* Numeric values for debug... */
 		rc = getnameinfo((sSA *)&primary, sSAlen(&primary), addrbuf, sizeof(addrbuf), portbuf, sizeof(portbuf), NI_NUMERICHOST | NI_NUMERICSERV);
 		if (rc) {
 			snprintf(addrbuf, sizeof(addrbuf), "[err:%s]", gai_strerror(rc));
 			portbuf[0] = '\0';
 		}
 		
-		snprintf(cnx->cc_id, sizeof(cnx->cc_id), "{SCTP} -> [%s]:%s (%d)", addrbuf, portbuf, cnx->cc_socket);
+		snprintf(cnx->cc_id, sizeof(cnx->cc_id), "SCTP to [%s]:%s (%d)", addrbuf, portbuf, cnx->cc_socket);
 		
-		/* Name for log messages */
+		/* ...Name for log messages */
 		rc = getnameinfo((sSA *)&primary, sSAlen(&primary), cnx->cc_remid, sizeof(cnx->cc_remid), NULL, 0, 0);
 		if (rc)
 			snprintf(cnx->cc_remid, sizeof(cnx->cc_remid), "[err:%s]", gai_strerror(rc));
@@ -453,20 +449,63 @@
 }
 
 /* Set the hostname to check during handshake */
-void fd_cnx_sethostname(struct cnxctx * conn, char * hn)
+void fd_cnx_sethostname(struct cnxctx * conn, DiamId_t hn)
 {
 	CHECK_PARAMS_DO( conn, return );
 	conn->cc_tls_para.cn = hn;
 }
 
+/* We share a lock with many threads but we hold it only very short time so it is OK */
+static pthread_mutex_t state_lock = PTHREAD_MUTEX_INITIALIZER;
+uint32_t fd_cnx_getstate(struct cnxctx * conn)
+{
+	uint32_t st;
+	CHECK_POSIX_DO( pthread_mutex_lock(&state_lock), { ASSERT(0); } );
+	st = conn->cc_state;
+	CHECK_POSIX_DO( pthread_mutex_unlock(&state_lock), { ASSERT(0); } );
+	return st;
+}
+int  fd_cnx_teststate(struct cnxctx * conn, uint32_t flag)
+{
+	uint32_t st;
+	CHECK_POSIX_DO( pthread_mutex_lock(&state_lock), { ASSERT(0); } );
+	st = conn->cc_state;
+	CHECK_POSIX_DO( pthread_mutex_unlock(&state_lock), { ASSERT(0); } );
+	return st & flag;
+}
+void fd_cnx_addstate(struct cnxctx * conn, uint32_t orstate)
+{
+	CHECK_POSIX_DO( pthread_mutex_lock(&state_lock), { ASSERT(0); } );
+	conn->cc_state |= orstate;
+	CHECK_POSIX_DO( pthread_mutex_unlock(&state_lock), { ASSERT(0); } );
+}
+void fd_cnx_setstate(struct cnxctx * conn, uint32_t abstate)
+{
+	CHECK_POSIX_DO( pthread_mutex_lock(&state_lock), { ASSERT(0); } );
+	conn->cc_state = abstate;
+	CHECK_POSIX_DO( pthread_mutex_unlock(&state_lock), { ASSERT(0); } );
+}
+
+
 /* Return the TLS state of a connection */
 int fd_cnx_getTLS(struct cnxctx * conn)
 {
 	CHECK_PARAMS_DO( conn, return 0 );
-	fd_cpu_flush_cache();
-	return conn->cc_status & CC_STATUS_TLS;
+	return fd_cnx_teststate(conn, CC_STATUS_TLS);
 }
 
+/* Return true if the connection supports unordered delivery of messages */
+int fd_cnx_isMultichan(struct cnxctx * conn)
+{
+	CHECK_PARAMS_DO( conn, return 0 );
+	#ifdef DISABLE_SCTP
+	if (conn->cc_proto == IPPROTO_SCTP)
+		return (conn->cc_sctp_para.str_in > 1) || (conn->cc_sctp_para.str_out > 1);
+	#endif /* DISABLE_SCTP */
+	return 0;
+}
+
+
 /* Get the list of endpoints (IP addresses) of the local and remote peers on this connection */
 int fd_cnx_getremoteeps(struct cnxctx * conn, struct fd_list * eps)
 {
@@ -507,10 +546,11 @@
 	return conn->cc_remid;
 }
 
-/* Retrieve a list of all IP addresses of the local system from the kernel, using  */
+/* Retrieve a list of all IP addresses of the local system from the kernel, using getifaddrs */
 int fd_cnx_get_local_eps(struct fd_list * list)
 {
 	struct ifaddrs *iflist, *cur;
+	
 	CHECK_SYS(getifaddrs(&iflist));
 	
 	for (cur = iflist; cur != NULL; cur = cur->ifa_next) {
@@ -542,22 +582,21 @@
 	TRACE_ENTRY("%p", conn);
 	CHECK_PARAMS_DO( conn, goto fatal );
 	
-	TRACE_DEBUG(FULL, "Error flag set for socket %d (%s / %s)", conn->cc_socket, conn->cc_remid, conn->cc_id);
+	TRACE_DEBUG(FULL, "Error flag set for socket %d (%s, %s)", conn->cc_socket, conn->cc_id, conn->cc_remid);
 	
 	/* Mark the error */
-	fd_cpu_flush_cache();
-	conn->cc_status |= CC_STATUS_ERROR;
+	fd_cnx_addstate(conn, CC_STATUS_ERROR);
 	
 	/* Report the error if not reported yet, and not closing */
-	if ((!(conn->cc_status & CC_STATUS_CLOSING )) && (!(conn->cc_status & CC_STATUS_SIGNALED )))  {
+	if (!fd_cnx_teststate(conn, CC_STATUS_CLOSING | CC_STATUS_SIGNALED ))  {
 		TRACE_DEBUG(FULL, "Sending FDEVP_CNX_ERROR event");
-		CHECK_FCT_DO( fd_event_send( Target_Queue(conn), FDEVP_CNX_ERROR, 0, NULL), goto fatal);
-		conn->cc_status |= CC_STATUS_SIGNALED;
+		CHECK_FCT_DO( fd_event_send( fd_cnx_target_queue(conn), FDEVP_CNX_ERROR, 0, NULL), goto fatal);
+		fd_cnx_addstate(conn, CC_STATUS_SIGNALED);
 	}
-	fd_cpu_flush_cache();
 	return;
 fatal:
 	/* An unrecoverable error occurred, stop the daemon */
+	ASSERT(0);
 	CHECK_FCT_DO(fd_event_send(fd_g_config->cnf_main_ev, FDEV_TERMINATE, 0, NULL), );	
 }
 
@@ -582,8 +621,7 @@
 	ret = recv(conn->cc_socket, buffer, length, 0);
 	/* Handle special case of timeout */
 	if ((ret < 0) && (errno == EAGAIN)) {
-		fd_cpu_flush_cache();
-		if (! (conn->cc_status & CC_STATUS_CLOSING))
+		if (! fd_cnx_teststate(conn, CC_STATUS_CLOSING ))
 			goto again; /* don't care, just ignore */
 		if (!timedout) {
 			timedout ++; /* allow for one timeout while closing */
@@ -591,11 +629,11 @@
 		}
 	}
 	
-	CHECK_SYS_DO(ret, /* continue */);
-	
 	/* Mark the error */
-	if (ret <= 0)
+	if (ret <= 0) {
+		CHECK_SYS_DO(ret, /* continue, this is only used to log the error here */);
 		fd_cnx_markerror(conn);
+	}
 	
 	return ret;
 }
@@ -609,8 +647,7 @@
 	ret = send(conn->cc_socket, buffer, length, 0);
 	/* Handle special case of timeout */
 	if ((ret < 0) && (errno == EAGAIN)) {
-		fd_cpu_flush_cache();
-		if (! (conn->cc_status & CC_STATUS_CLOSING))
+		if (! fd_cnx_teststate(conn, CC_STATUS_CLOSING ))
 			goto again; /* don't care, just ignore */
 		if (!timedout) {
 			timedout ++; /* allow for one timeout while closing */
@@ -642,8 +679,8 @@
 	}
 	
 	ASSERT( conn->cc_proto == IPPROTO_TCP );
-	ASSERT( ! (conn->cc_status & CC_STATUS_TLS) );
-	ASSERT( Target_Queue(conn) );
+	ASSERT( ! fd_cnx_teststate(conn, CC_STATUS_TLS ) );
+	ASSERT( fd_cnx_target_queue(conn) );
 	
 	/* Receive from a TCP connection: we have to rebuild the message boundaries */
 	do {
@@ -665,7 +702,7 @@
 		length = ((size_t)header[1] << 16) + ((size_t)header[2] << 8) + (size_t)header[3];
 
 		/* Check the received word is a valid begining of a Diameter message */
-		if ((header[0] != DIAMETER_VERSION)	/* defined in <libfreeDiameter.h> */
+		if ((header[0] != DIAMETER_VERSION)	/* defined in <libfdproto.h> */
 		   || (length > DIAMETER_MSG_SIZE_MAX)) { /* to avoid too big mallocs */
 			/* The message is suspect */
 			TRACE_DEBUG(INFO, "Received suspect header [ver: %d, size: %zd], assume disconnection", (int)header[0], length);
@@ -690,8 +727,7 @@
 		}
 		
 		/* We have received a complete message, pass it to the daemon */
-		fd_cpu_flush_cache();
-		CHECK_FCT_DO( fd_event_send( Target_Queue(conn), FDEVP_CNX_MSG_RECV, length, newmsg), /* continue or destroy everything? */);
+		CHECK_FCT_DO( fd_event_send( fd_cnx_target_queue(conn), FDEVP_CNX_MSG_RECV, length, newmsg), /* continue or destroy everything? */);
 		
 	} while (conn->cc_loop);
 	
@@ -725,12 +761,11 @@
 	}
 	
 	ASSERT( conn->cc_proto == IPPROTO_SCTP );
-	ASSERT( ! (conn->cc_status & CC_STATUS_TLS) );
-	ASSERT( Target_Queue(conn) );
+	ASSERT( ! fd_cnx_teststate(conn, CC_STATUS_TLS ) );
+	ASSERT( fd_cnx_target_queue(conn) );
 	
 	do {
-		fd_cpu_flush_cache();
-		CHECK_FCT_DO( fd_sctp_recvmeta(conn->cc_socket, NULL, &buf, &bufsz, &event, &conn->cc_status), goto fatal );
+		CHECK_FCT_DO( fd_sctp_recvmeta(conn, NULL, &buf, &bufsz, &event), goto fatal );
 		if (event == FDEVP_CNX_ERROR) {
 			fd_cnx_markerror(conn);
 			goto out;
@@ -741,8 +776,7 @@
 			continue;
 		}
 		
-		fd_cpu_flush_cache();
-		CHECK_FCT_DO( fd_event_send( Target_Queue(conn), event, bufsz, buf), goto fatal );
+		CHECK_FCT_DO( fd_event_send( fd_cnx_target_queue(conn), event, bufsz, buf), goto fatal );
 		
 	} while (conn->cc_loop || (event != FDEVP_CNX_MSG_RECV));
 	
@@ -762,7 +796,7 @@
 {
 	TRACE_ENTRY("%p %i", conn, loop);
 	
-	CHECK_PARAMS( conn && Target_Queue(conn) && (!(conn->cc_status & CC_STATUS_TLS)) && (!conn->cc_loop));
+	CHECK_PARAMS( conn && fd_cnx_target_queue(conn) && (!fd_cnx_teststate(conn, CC_STATUS_TLS)) && (!conn->cc_loop));
 	
 	/* Release resources in case of a previous call was already made */
 	CHECK_FCT_DO( fd_thr_term(&conn->cc_rcvthr), /* continue */);
@@ -802,8 +836,7 @@
 		{
 			switch (ret) {
 				case GNUTLS_E_REHANDSHAKE: 
-					fd_cpu_flush_cache();
-					if (!(conn->cc_status & CC_STATUS_CLOSING))
+					if (!fd_cnx_teststate(conn, CC_STATUS_CLOSING))
 						CHECK_GNUTLS_DO( ret = gnutls_handshake(session),
 							{
 								if (TRACE_BOOL(INFO)) {
@@ -814,8 +847,7 @@
 
 				case GNUTLS_E_AGAIN:
 				case GNUTLS_E_INTERRUPTED:
-					fd_cpu_flush_cache();
-					if (!(conn->cc_status & CC_STATUS_CLOSING))
+					if (!fd_cnx_teststate(conn, CC_STATUS_CLOSING))
 						goto again;
 					TRACE_DEBUG(FULL, "Connection is closing, so abord gnutls_record_recv now.");
 					break;
@@ -848,8 +880,7 @@
 		{
 			switch (ret) {
 				case GNUTLS_E_REHANDSHAKE: 
-					fd_cpu_flush_cache();
-					if (!(conn->cc_status & CC_STATUS_CLOSING))
+					if (!fd_cnx_teststate(conn, CC_STATUS_CLOSING))
 						CHECK_GNUTLS_DO( ret = gnutls_handshake(session),
 							{
 								if (TRACE_BOOL(INFO)) {
@@ -860,8 +891,7 @@
 
 				case GNUTLS_E_AGAIN:
 				case GNUTLS_E_INTERRUPTED:
-					fd_cpu_flush_cache();
-					if (!(conn->cc_status & CC_STATUS_CLOSING))
+					if (!fd_cnx_teststate(conn, CC_STATUS_CLOSING))
 						goto again;
 					TRACE_DEBUG(INFO, "Connection is closing, so abord gnutls_record_send now.");
 					break;
@@ -926,8 +956,7 @@
 		}
 		
 		/* We have received a complete message, pass it to the daemon */
-		fd_cpu_flush_cache();
-		CHECK_FCT_DO( ret = fd_event_send( Target_Queue(conn), FDEVP_CNX_MSG_RECV, length, newmsg), 
+		CHECK_FCT_DO( ret = fd_event_send( fd_cnx_target_queue(conn), FDEVP_CNX_MSG_RECV, length, newmsg), 
 			{ 
 				free(newmsg); 
 				CHECK_FCT_DO(fd_event_send(fd_g_config->cnf_main_ev, FDEV_TERMINATE, 0, NULL), );
@@ -955,8 +984,8 @@
 		fd_log_threadname ( buf );
 	}
 	
-	ASSERT( conn->cc_status & CC_STATUS_TLS );
-	ASSERT( Target_Queue(conn) );
+	ASSERT( fd_cnx_teststate(conn, CC_STATUS_TLS) );
+	ASSERT( fd_cnx_target_queue(conn) );
 
 	/* The next function only returns when there is an error on the socket */	
 	CHECK_FCT_DO(fd_tls_rcvthr_core(conn, conn->cc_tls_para.session), /* continue */);
@@ -1214,7 +1243,7 @@
 int fd_cnx_handshake(struct cnxctx * conn, int mode, char * priority, void * alt_creds)
 {
 	TRACE_ENTRY( "%p %d %p %p", conn, mode, priority, alt_creds);
-	CHECK_PARAMS( conn && (!(conn->cc_status & CC_STATUS_TLS)) && ( (mode == GNUTLS_CLIENT) || (mode == GNUTLS_SERVER) ) && (!conn->cc_loop) );
+	CHECK_PARAMS( conn && (!fd_cnx_teststate(conn, CC_STATUS_TLS)) && ( (mode == GNUTLS_CLIENT) || (mode == GNUTLS_SERVER) ) && (!conn->cc_loop) );
 
 	/* Save the mode */
 	conn->cc_tls_para.mode = mode;
@@ -1247,9 +1276,8 @@
 	}
 
 	/* Mark the connection as protected from here, so that the gnutls credentials will be freed */
-	fd_cpu_flush_cache();
-	conn->cc_status |= CC_STATUS_TLS;
-
+	fd_cnx_addstate(conn, CC_STATUS_TLS);
+	
 	/* Handshake master session */
 	{
 		int ret;
@@ -1298,7 +1326,7 @@
 int fd_cnx_getcred(struct cnxctx * conn, const gnutls_datum_t **cert_list, unsigned int *cert_list_size)
 {
 	TRACE_ENTRY("%p %p %p", conn, cert_list, cert_list_size);
-	CHECK_PARAMS( conn && (conn->cc_status & CC_STATUS_TLS) && cert_list && cert_list_size );
+	CHECK_PARAMS( conn && fd_cnx_teststate(conn, CC_STATUS_TLS) && cert_list && cert_list_size );
 	
 	/* This function only works for X.509 certificates. */
 	CHECK_PARAMS( gnutls_certificate_type_get (conn->cc_tls_para.session) == GNUTLS_CRT_X509 );
@@ -1359,16 +1387,29 @@
 	return EINVAL;
 }
 
+/* Where the events are sent */
+struct fifo * fd_cnx_target_queue(struct cnxctx * conn)
+{
+	struct fifo *q;
+	CHECK_POSIX_DO( pthread_mutex_lock(&state_lock), { ASSERT(0); } );
+	q = conn->cc_alt ?: conn->cc_incoming;
+	CHECK_POSIX_DO( pthread_mutex_unlock(&state_lock), { ASSERT(0); } );
+	return q;
+}
+
 /* Set an alternate FIFO list to send FDEVP_CNX_* events to */
 int fd_cnx_recv_setaltfifo(struct cnxctx * conn, struct fifo * alt_fifo)
 {
+	int ret;
 	TRACE_ENTRY( "%p %p", conn, alt_fifo );
 	CHECK_PARAMS( conn && alt_fifo && conn->cc_incoming );
 	
 	/* The magic function does it all */
-	CHECK_FCT( fd_fifo_move( conn->cc_incoming, alt_fifo, &conn->cc_alt ) );
+	CHECK_POSIX_DO( pthread_mutex_lock(&state_lock), { ASSERT(0); } );
+	CHECK_FCT_DO( ret = fd_fifo_move( conn->cc_incoming, alt_fifo, &conn->cc_alt ), );
+	CHECK_POSIX_DO( pthread_mutex_unlock(&state_lock), { ASSERT(0); } );
 	
-	return 0;
+	return ret;
 }
 
 /* Send function when no multi-stream is involved, or sending on stream #0 (send() always use stream 0)*/
@@ -1378,8 +1419,7 @@
 	size_t sent = 0;
 	TRACE_ENTRY("%p %p %zd", conn, buf, len);
 	do {
-		fd_cpu_flush_cache();
-		if (conn->cc_status & CC_STATUS_TLS) {
+		if (fd_cnx_teststate(conn, CC_STATUS_TLS)) {
 			CHECK_GNUTLS_DO( ret = fd_tls_send_handle_error(conn, conn->cc_tls_para.session, buf + sent, len - sent),  );
 		} else {
 			/* Maybe better to replace this call with sendmsg for atomic sending? */
@@ -1393,14 +1433,14 @@
 	return 0;
 }
 
-/* Send a message -- this is synchronous -- and we assume it's never called by several threads at the same time, so we don't protect. */
+/* Send a message -- this is synchronous -- and we assume it's never called by several threads at the same time (on the same conn), so we don't protect. */
 int fd_cnx_send(struct cnxctx * conn, unsigned char * buf, size_t len, uint32_t flags)
 {
 	TRACE_ENTRY("%p %p %zd %x", conn, buf, len, flags);
 	
-	CHECK_PARAMS(conn && (conn->cc_socket > 0) && (! (conn->cc_status & CC_STATUS_ERROR)) && buf && len);
+	CHECK_PARAMS(conn && (conn->cc_socket > 0) && (! fd_cnx_teststate(conn, CC_STATUS_ERROR)) && buf && len);
 
-	TRACE_DEBUG(FULL, "Sending %zdb %sdata on connection %s", len, (conn->cc_status & CC_STATUS_TLS) ? "TLS-protected ":"", conn->cc_id);
+	TRACE_DEBUG(FULL, "Sending %zdb %sdata on connection %s", len, fd_cnx_teststate(conn, CC_STATUS_TLS) ? "TLS-protected ":"", conn->cc_id);
 	
 	switch (conn->cc_proto) {
 		case IPPROTO_TCP:
@@ -1409,32 +1449,6 @@
 		
 #ifndef DISABLE_SCTP
 		case IPPROTO_SCTP: {
-			if (flags & FD_CNX_BROADCAST) {
-				/* Send the buffer over all other streams */
-				uint16_t str;
-				fd_cpu_flush_cache();
-				if (conn->cc_status & CC_STATUS_TLS) {
-					for ( str=1; str < conn->cc_sctp_para.pairs; str++) {
-						ssize_t ret;
-						size_t sent = 0;
-						do {
-							CHECK_GNUTLS_DO( ret = fd_tls_send_handle_error(conn, conn->cc_sctps_data.array[str].session, buf + sent, len - sent), );
-							if (ret <= 0)
-								return ENOTCONN;
-
-							sent += ret;
-						} while ( sent < len );
-					}
-				} else {
-					for ( str=1; str < conn->cc_sctp_para.str_out; str++) {
-						CHECK_FCT_DO( fd_sctp_sendstr(conn->cc_socket, str, buf, len, &conn->cc_status), { fd_cnx_markerror(conn); return ENOTCONN; } );
-					}
-				}
-				
-				/* Set the ORDERED flag also so that it is sent over stream 0 as well */
-				flags &= FD_CNX_ORDERED;
-			}
-			
 			if (flags & FD_CNX_ORDERED) {
 				/* We send over stream #0 */
 				CHECK_FCT( send_simple(conn, buf, len) );
@@ -1443,18 +1457,18 @@
 			
 				int another_str = 0; /* do we send over stream #0 ? */
 				
-				if ((conn->cc_sctp_para.str_out > 1) && ((! (conn->cc_status & CC_STATUS_TLS)) || (conn->cc_sctp_para.pairs > 1)))  {
+				if ((conn->cc_sctp_para.str_out > 1) && ((!fd_cnx_teststate(conn, CC_STATUS_TLS)) || (conn->cc_sctp_para.pairs > 1)))  {
 					/* Update the id of the stream we will send this message over */
 					conn->cc_sctp_para.next += 1;
-					conn->cc_sctp_para.next %= ((conn->cc_status & CC_STATUS_TLS) ? conn->cc_sctp_para.pairs : conn->cc_sctp_para.str_out);
+					conn->cc_sctp_para.next %= (fd_cnx_teststate(conn, CC_STATUS_TLS) ? conn->cc_sctp_para.pairs : conn->cc_sctp_para.str_out);
 					another_str = (conn->cc_sctp_para.next ? 1 : 0);
 				}
 
 				if ( ! another_str ) {
 					CHECK_FCT( send_simple(conn, buf, len) );
 				} else {
-					if (!(conn->cc_status & CC_STATUS_TLS)) {
-						CHECK_FCT_DO( fd_sctp_sendstr(conn->cc_socket, conn->cc_sctp_para.next, buf, len, &conn->cc_status), { fd_cnx_markerror(conn); return ENOTCONN; } );
+					if (!fd_cnx_teststate(conn, CC_STATUS_TLS)) {
+						CHECK_FCT_DO( fd_sctp_sendstr(conn, conn->cc_sctp_para.next, buf, len), { fd_cnx_markerror(conn); return ENOTCONN; } );
 					} else {
 						/* push the record to the appropriate session */
 						ssize_t ret;
@@ -1495,24 +1509,23 @@
 	
 	CHECK_PARAMS_DO(conn, return);
 	
-	fd_cpu_flush_cache();
-	conn->cc_status |= CC_STATUS_CLOSING;
+	fd_cnx_addstate(conn, CC_STATUS_CLOSING);
 	
 	/* Initiate shutdown of the TLS session(s): call gnutls_bye(WR), then read until error */
-	if (conn->cc_status & CC_STATUS_TLS) {
+	if (fd_cnx_teststate(conn, CC_STATUS_TLS)) {
 #ifndef DISABLE_SCTP
 		if (conn->cc_sctp_para.pairs > 1) {
-			if (! (conn->cc_status & CC_STATUS_ERROR )) {
+			if (! fd_cnx_teststate(conn, CC_STATUS_ERROR )) {
 				/* Bye on master session */
 				CHECK_GNUTLS_DO( gnutls_bye(conn->cc_tls_para.session, GNUTLS_SHUT_WR), fd_cnx_markerror(conn) );
 			}
 
-			if (! (conn->cc_status & CC_STATUS_ERROR ) ) {
+			if (! fd_cnx_teststate(conn, CC_STATUS_ERROR ) ) {
 				/* and other stream pairs */
 				fd_sctps_bye(conn);
 			}
 
-			if (! (conn->cc_status & CC_STATUS_ERROR ) ) {
+			if (! fd_cnx_teststate(conn, CC_STATUS_ERROR ) ) {
 				/* Now wait for all decipher threads to terminate */
 				fd_sctps_waitthreadsterm(conn);
 			} else {
@@ -1532,13 +1545,13 @@
 
 		} else {
 #endif /* DISABLE_SCTP */
-		/* We are not using the sctps wrapper layer */
-			if (! (conn->cc_status & CC_STATUS_ERROR ) ) {
+		/* We are TLS, but not using the sctps wrapper layer */
+			if (! fd_cnx_teststate(conn, CC_STATUS_ERROR ) ) {
 				/* Master session */
 				CHECK_GNUTLS_DO( gnutls_bye(conn->cc_tls_para.session, GNUTLS_SHUT_WR), fd_cnx_markerror(conn) );
 			}
 
-			if (! (conn->cc_status & CC_STATUS_ERROR ) ) {
+			if (! fd_cnx_teststate(conn, CC_STATUS_ERROR ) ) {
 				/* In this case, just wait for thread rcvthr_tls_single to terminate */
 				if (conn->cc_rcvthr != (pthread_t)NULL) {
 					CHECK_POSIX_DO(  pthread_join(conn->cc_rcvthr, NULL), /* continue */  );
@@ -1554,7 +1567,6 @@
 				GNUTLS_TRACE( gnutls_deinit(conn->cc_tls_para.session) );
 				conn->cc_tls_para.session = NULL;
 			}
-		
 #ifndef DISABLE_SCTP
 		}
 #endif /* DISABLE_SCTP */
"Welcome to our mercurial repository"