changeset 229:965f5971dc23

Broadcast CEA over all streams to avoid possible race condition
author Sebastien Decugis <sdecugis@nict.go.jp>
date Tue, 02 Mar 2010 15:55:26 +0900
parents dcb58243e91f
children 5b17534180f1
files freeDiameter/cnxctx.c freeDiameter/fD.h freeDiameter/p_ce.c freeDiameter/p_dp.c freeDiameter/p_dw.c freeDiameter/p_out.c freeDiameter/p_psm.c freeDiameter/routing_dispatch.c
diffstat 8 files changed, 88 insertions(+), 56 deletions(-) [+]
line wrap: on
line diff
--- a/freeDiameter/cnxctx.c	Tue Mar 02 14:58:19 2010 +0900
+++ b/freeDiameter/cnxctx.c	Tue Mar 02 15:55:26 2010 +0900
@@ -1367,9 +1367,9 @@
 }
 
 /* Send a message -- this is synchronous -- and we assume it's never called by several threads at the same time, so we don't protect. */
-int fd_cnx_send(struct cnxctx * conn, unsigned char * buf, size_t len, int ordered)
+int fd_cnx_send(struct cnxctx * conn, unsigned char * buf, size_t len, uint32_t flags)
 {
-	TRACE_ENTRY("%p %p %zd %i", conn, buf, len, ordered);
+	TRACE_ENTRY("%p %p %zd %x", conn, buf, len, flags);
 	
 	CHECK_PARAMS(conn && (conn->cc_socket > 0) && (! (conn->cc_status & CC_STATUS_ERROR)) && buf && len);
 
@@ -1382,32 +1382,64 @@
 		
 #ifndef DISABLE_SCTP
 		case IPPROTO_SCTP: {
-			int multistr = 0;
-			
-			if ((!ordered) && (conn->cc_sctp_para.str_out > 1) && ((! (conn->cc_status & CC_STATUS_TLS)) || (conn->cc_sctp_para.pairs > 1)))  {
-				/* Update the id of the stream we will send this message on */
-				conn->cc_sctp_para.next += 1;
-				conn->cc_sctp_para.next %= ((conn->cc_status & CC_STATUS_TLS) ? conn->cc_sctp_para.pairs : conn->cc_sctp_para.str_out);
-				multistr = 1;
+			if (flags & FD_CNX_BROADCAST) {
+				/* Send the buffer over all other streams */
+				uint16_t str;
+				if (conn->cc_status & CC_STATUS_TLS) {
+					for ( str=1; str < conn->cc_sctp_para.pairs; str++) {
+						ssize_t ret;
+						size_t sent = 0;
+						do {
+							CHECK_GNUTLS_DO( ret = fd_tls_send_handle_error(conn, conn->cc_sctps_data.array[str].session, buf + sent, len - sent), );
+							if (ret <= 0)
+								return ENOTCONN;
+
+							sent += ret;
+						} while ( sent < len );
+					}
+				} else {
+					for ( str=1; str < conn->cc_sctp_para.str_out; str++) {
+						CHECK_FCT_DO( fd_sctp_sendstr(conn->cc_socket, str, buf, len, &conn->cc_status), { fd_cnx_markerror(conn); return ENOTCONN; } );
+					}
+				}
+				
+				/* Set the ORDERED flag also so that it is sent over stream 0 as well */
+				flags &= FD_CNX_ORDERED;
 			}
 			
-			if ((!multistr) || (conn->cc_sctp_para.next == 0)) {
+			if (flags & FD_CNX_ORDERED) {
+				/* We send over stream #0 */
 				CHECK_FCT( send_simple(conn, buf, len) );
 			} else {
-				if (!(conn->cc_status & CC_STATUS_TLS)) {
-					CHECK_FCT_DO( fd_sctp_sendstr(conn->cc_socket, conn->cc_sctp_para.next, buf, len, &conn->cc_status), { fd_cnx_markerror(conn); return ENOTCONN; } );
+				/* Default case : no flag specified */
+			
+				int another_str = 0; /* do we send over stream #0 ? */
+				
+				if ((conn->cc_sctp_para.str_out > 1) && ((! (conn->cc_status & CC_STATUS_TLS)) || (conn->cc_sctp_para.pairs > 1)))  {
+					/* Update the id of the stream we will send this message over */
+					conn->cc_sctp_para.next += 1;
+					conn->cc_sctp_para.next %= ((conn->cc_status & CC_STATUS_TLS) ? conn->cc_sctp_para.pairs : conn->cc_sctp_para.str_out);
+					another_str = (conn->cc_sctp_para.next ? 1 : 0);
+				}
+
+				if ( ! another_str ) {
+					CHECK_FCT( send_simple(conn, buf, len) );
 				} else {
-					/* push the record to the appropriate session */
-					ssize_t ret;
-					size_t sent = 0;
-					ASSERT(conn->cc_sctps_data.array != NULL);
-					do {
-						CHECK_GNUTLS_DO( ret = fd_tls_send_handle_error(conn, conn->cc_sctps_data.array[conn->cc_sctp_para.next].session, buf + sent, len - sent), );
-						if (ret <= 0)
-							return ENOTCONN;
-						
-						sent += ret;
-					} while ( sent < len );
+					if (!(conn->cc_status & CC_STATUS_TLS)) {
+						CHECK_FCT_DO( fd_sctp_sendstr(conn->cc_socket, conn->cc_sctp_para.next, buf, len, &conn->cc_status), { fd_cnx_markerror(conn); return ENOTCONN; } );
+					} else {
+						/* push the record to the appropriate session */
+						ssize_t ret;
+						size_t sent = 0;
+						ASSERT(conn->cc_sctps_data.array != NULL);
+						do {
+							CHECK_GNUTLS_DO( ret = fd_tls_send_handle_error(conn, conn->cc_sctps_data.array[conn->cc_sctp_para.next].session, buf + sent, len - sent), );
+							if (ret <= 0)
+								return ENOTCONN;
+
+							sent += ret;
+						} while ( sent < len );
+					}
 				}
 			}
 		}
--- a/freeDiameter/fD.h	Tue Mar 02 14:58:19 2010 +0900
+++ b/freeDiameter/fD.h	Tue Mar 02 15:55:26 2010 +0900
@@ -239,7 +239,7 @@
 }
 const char * fd_pev_str(int event);
 
-/* The data structure for FDEVP_CNX_INCOMING events */
+/* The data structure for FDEVP_CNX_INCOMING event */
 struct cnx_incoming {
 	struct msg	* cer;		/* the CER message received on this connection */
 	struct cnxctx	* cnx;		/* The connection context */
@@ -273,7 +273,7 @@
 void fd_psm_cleanup(struct fd_peer * peer, int terminate);
 
 /* Peer out */
-int fd_out_send(struct msg ** msg, struct cnxctx * cnx, struct fd_peer * peer);
+int fd_out_send(struct msg ** msg, struct cnxctx * cnx, struct fd_peer * peer, uint32_t flags);
 int fd_out_start(struct fd_peer * peer);
 int fd_out_stop(struct fd_peer * peer);
 
@@ -326,8 +326,11 @@
 char *          fd_cnx_getremoteid(struct cnxctx * conn);
 int             fd_cnx_receive(struct cnxctx * conn, struct timespec * timeout, unsigned char **buf, size_t * len);
 int             fd_cnx_recv_setaltfifo(struct cnxctx * conn, struct fifo * alt_fifo); /* send FDEVP_CNX_MSG_RECV event to the fifo list */
-int             fd_cnx_send(struct cnxctx * conn, unsigned char * buf, size_t len, int ordered);
+int             fd_cnx_send(struct cnxctx * conn, unsigned char * buf, size_t len, uint32_t flags);
 void            fd_cnx_destroy(struct cnxctx * conn);
 
+/* Flags for the fd_cnx_send function : */
+#define FD_CNX_ORDERED		(1 << 0)	/* All messages sent with this flag set will be delivered in the same order. No guarantee on other messages */
+#define FD_CNX_BROADCAST	(1 << 1)	/* The message is sent over all stream pairs, in case of SCTP. No effect on TCP */
 
 #endif /* _FD_H */
--- a/freeDiameter/p_ce.c	Tue Mar 02 14:58:19 2010 +0900
+++ b/freeDiameter/p_ce.c	Tue Mar 02 15:55:26 2010 +0900
@@ -587,7 +587,7 @@
 	/* Create and send the CEA with appropriate error code */
 	CHECK_FCT_DO( fd_msg_new_answer_from_req ( fd_g_config->cnf_dict, cer, MSGFL_ANSW_ERROR ), goto destroy );
 	CHECK_FCT_DO( fd_msg_rescode_set(*cer, rescode, errormsg, NULL, 1 ), goto destroy );
-	CHECK_FCT_DO( fd_out_send(cer, recv_cnx, NULL), goto destroy );
+	CHECK_FCT_DO( fd_out_send(cer, recv_cnx, NULL, FD_CNX_ORDERED), goto destroy );
 	
 	/* And now destroy this connection */
 destroy:
@@ -605,7 +605,7 @@
 	
 	/* Send CER on the new connection */
 	CHECK_FCT( create_CER(peer, initiator, &cer) );
-	CHECK_FCT( fd_out_send(&cer, initiator, peer) );
+	CHECK_FCT( fd_out_send(&cer, initiator, peer, FD_CNX_ORDERED) );
 	
 	/* Are we doing an election ? */
 	if (peer->p_hdr.info.runtime.pir_state == STATE_WAITCNXACK_ELEC) {
@@ -652,7 +652,7 @@
 		CHECK_FCT( fd_msg_rescode_set(*msg, "DIAMETER_COMMAND_UNSUPPORTED", "No CER allowed in current state", NULL, 1 ) );
 
 		/* msg now contains an answer message to send back */
-		CHECK_FCT_DO( fd_out_send(msg, NULL, peer), /* In case of error the message has already been dumped */ );
+		CHECK_FCT_DO( fd_out_send(msg, NULL, peer, FD_CNX_ORDERED), /* In case of error the message has already been dumped */ );
 	}
 	
 	/* If the state is not WAITCEA, just discard the message */
@@ -812,8 +812,7 @@
 	CHECK_FCT( fd_msg_new_answer_from_req ( fd_g_config->cnf_dict, &msg, 0 ) );
 	CHECK_FCT( fd_msg_rescode_set(msg, "DIAMETER_SUCCESS", NULL, NULL, 0 ) );
 	CHECK_FCT( add_CE_info(msg, peer->p_cnxctx, isi & PI_SEC_TLS_OLD, isi & PI_SEC_NONE) );
-	CHECK_FCT( fd_out_send(&msg, peer->p_cnxctx, peer) );
-	TODO("In case of SCTP, broadcast the CEA over all streams so that further messages cannot be delivered before the CEA?");
+	CHECK_FCT( fd_out_send(&msg, peer->p_cnxctx, peer, FD_CNX_BROADCAST) ); /* Broadcast in order to avoid further messages sent over a different stream be delivered first... */
 	
 	/* Handshake if needed */
 	if (isi & PI_SEC_TLS_OLD) {
@@ -866,7 +865,7 @@
 		CHECK_FCT( fd_msg_rescode_set(msg, ec, NULL, NULL, 1 ) );
 
 		/* msg now contains an answer message to send back */
-		CHECK_FCT_DO( fd_out_send(&msg, peer->p_cnxctx, peer), /* In case of error the message has already been dumped */ );
+		CHECK_FCT_DO( fd_out_send(&msg, peer->p_cnxctx, peer, FD_CNX_ORDERED), /* In case of error the message has already been dumped */ );
 	}
 	
 cleanup:
--- a/freeDiameter/p_dp.c	Tue Mar 02 14:58:19 2010 +0900
+++ b/freeDiameter/p_dp.c	Tue Mar 02 15:55:26 2010 +0900
@@ -86,12 +86,12 @@
 				CHECK_FCT( fd_dict_search( fd_g_config->cnf_dict, DICT_ENUMVAL, ENUMVAL_BY_STRUCT, &er, &dictobj, 0 )  );
 				if (dictobj) {
 					CHECK_FCT( fd_dict_getval( dictobj, &er.search ) );
-					fd_log_debug("Peer '%s' sent a DPR with cause: %s\n", peer->p_hdr.info.pi_diamid, er.search.enum_name);
+					TRACE_DEBUG(INFO, "Peer '%s' sent a DPR with cause: %s\n", peer->p_hdr.info.pi_diamid, er.search.enum_name);
 				} else {
-					fd_log_debug("Peer '%s' sent a DPR with unknown cause: %u\n", peer->p_hdr.info.pi_diamid, peer->p_hdr.info.runtime.pir_lastDC);
+					TRACE_DEBUG(INFO, "Peer '%s' sent a DPR with unknown cause: %u\n", peer->p_hdr.info.pi_diamid, peer->p_hdr.info.runtime.pir_lastDC);
 				}
 			} else {
-				fd_log_debug("Peer '%s' sent a DPR without Disconnect-Cause AVP\n", peer->p_hdr.info.pi_diamid);
+				TRACE_DEBUG(INFO, "Peer '%s' sent a DPR without Disconnect-Cause AVP\n", peer->p_hdr.info.pi_diamid);
 			}
 		}
 		
@@ -103,7 +103,7 @@
 		CHECK_FCT( fd_psm_change_state(peer, STATE_CLOSING) );
 		
 		/* Now send the DPA */
-		CHECK_FCT( fd_out_send( msg, NULL, peer) );
+		CHECK_FCT( fd_out_send( msg, NULL, peer, FD_CNX_ORDERED) );
 		
 		/* Move to CLOSED state */
 		fd_psm_cleanup(peer, 0);
@@ -114,7 +114,7 @@
 	} else {
 		/* We received a DPA */
 		if (peer->p_hdr.info.runtime.pir_state != STATE_CLOSING) {
-			TRACE_DEBUG(INFO, "Ignore DPA received in state %s", STATE_STR(peer->p_hdr.info.runtime.pir_state));
+			TRACE_DEBUG(INFO, "Ignoring DPA received in state %s", STATE_STR(peer->p_hdr.info.runtime.pir_state));
 		}
 			
 		/* In theory, we should control the Result-Code AVP. But since we will not go back to OPEN state here anyway, let's skip it */
@@ -167,7 +167,7 @@
 	fd_psm_next_timeout(peer, 0, DPR_TIMEOUT);
 	
 	/* Now send the DPR message */
-	CHECK_FCT_DO( fd_out_send(&msg, NULL, peer), /* ignore since we are on timeout anyway */ );
+	CHECK_FCT_DO( fd_out_send(&msg, NULL, peer, FD_CNX_ORDERED), /* ignore since we are on timeout anyway */ );
 	
 	return 0;
 }
--- a/freeDiameter/p_dw.c	Tue Mar 02 14:58:19 2010 +0900
+++ b/freeDiameter/p_dw.c	Tue Mar 02 15:55:26 2010 +0900
@@ -75,7 +75,7 @@
 	CHECK_FCT( fd_msg_add_origin ( msg, 1 ) );
 	
 	/* Now send this message */
-	CHECK_FCT( fd_out_send(&msg, NULL, peer) );
+	CHECK_FCT( fd_out_send(&msg, NULL, peer, FD_CNX_ORDERED) );
 	
 	/* And mark the pending DW */
 	peer->p_flags.pf_dw_pending = 1;
@@ -98,7 +98,7 @@
 		CHECK_FCT( fd_msg_new_answer_from_req ( fd_g_config->cnf_dict, msg, 0 ) );
 		CHECK_FCT( fd_msg_rescode_set( *msg, "DIAMETER_SUCCESS", NULL, NULL, 0 ) );
 		CHECK_FCT( fd_msg_add_origin ( *msg, 1 ) );
-		CHECK_FCT( fd_out_send( msg, peer->p_cnxctx, peer) );
+		CHECK_FCT( fd_out_send( msg, peer->p_cnxctx, peer, FD_CNX_ORDERED) );
 		
 	} else {
 		/* Just discard the DWA */
--- a/freeDiameter/p_out.c	Tue Mar 02 14:58:19 2010 +0900
+++ b/freeDiameter/p_out.c	Tue Mar 02 15:55:26 2010 +0900
@@ -36,16 +36,16 @@
 #include "fD.h"
 
 /* Alloc a new hbh for requests, bufferize the message and send on the connection, save in sentreq if provided */
-static int do_send(struct msg ** msg, struct cnxctx * cnx, uint32_t * hbh, struct sr_list * srl)
+static int do_send(struct msg ** msg, uint32_t flags, struct cnxctx * cnx, uint32_t * hbh, struct sr_list * srl)
 {
 	struct msg_hdr * hdr;
-	int msg_is_a_req, msg_is_appl;
+	int msg_is_a_req;
 	uint8_t * buf;
 	size_t sz;
 	int ret;
 	uint32_t bkp_hbh = 0;
 	
-	TRACE_ENTRY("%p %p %p %p", msg, cnx, hbh, srl);
+	TRACE_ENTRY("%p %x %p %p %p", msg, flags, cnx, hbh, srl);
 	
 	/* Retrieve the message header */
 	CHECK_FCT( fd_msg_hdr(*msg, &hdr) );
@@ -59,8 +59,6 @@
 		*hbh = hdr->msg_hbhid + 1;
 	}
 	
-	msg_is_appl = fd_msg_is_routable(*msg);
-	
 	/* Log the message */
 	if (TRACE_BOOL(FULL)) {
 		CHECK_FCT_DO(  fd_msg_update_length(*msg), /* continue */  );
@@ -78,7 +76,7 @@
 	}
 	
 	/* Send the message */
-	CHECK_FCT_DO( ret = fd_cnx_send(cnx, buf, sz, !msg_is_appl), { free(buf); return ret; } );
+	CHECK_FCT_DO( ret = fd_cnx_send(cnx, buf, sz, flags), { free(buf); return ret; } );
 	pthread_cleanup_pop(1);
 	
 	/* Free remaining messages (i.e. answers) */
@@ -121,7 +119,7 @@
 		pthread_cleanup_push(cleanup_requeue, msg);
 		
 		/* Send the message, log any error */
-		CHECK_FCT_DO( do_send(&msg, peer->p_cnxctx, &peer->p_hbh, &peer->p_sr),
+		CHECK_FCT_DO( do_send(&msg, 0, peer->p_cnxctx, &peer->p_hbh, &peer->p_sr),
 			{
 				fd_log_debug("An error occurred while sending this message, it is lost:\n");
 				fd_msg_dump_walk(NONE, msg);
@@ -138,10 +136,10 @@
 	return NULL;
 }
 
-/* Wrapper to sending a message either by out thread (peer in OPEN state) or directly; cnx or peer must be provided */
-int fd_out_send(struct msg ** msg, struct cnxctx * cnx, struct fd_peer * peer)
+/* Wrapper to sending a message either by out thread (peer in OPEN state) or directly; cnx or peer must be provided. Flags are valid only for direct sending, not through thread (unused) */
+int fd_out_send(struct msg ** msg, struct cnxctx * cnx, struct fd_peer * peer, uint32_t flags)
 {
-	TRACE_ENTRY("%p %p %p", msg, cnx, peer);
+	TRACE_ENTRY("%p %p %p %x", msg, cnx, peer, flags);
 	CHECK_PARAMS( msg && *msg && (cnx || (peer && peer->p_cnxctx)));
 	
 	if (peer && (peer->p_hdr.info.runtime.pir_state == STATE_OPEN)) {
@@ -159,7 +157,7 @@
 			cnx = peer->p_cnxctx;
 
 		/* Do send the message */
-		CHECK_FCT_DO( do_send(msg, cnx, hbh, peer ? &peer->p_sr : NULL),
+		CHECK_FCT_DO( do_send(msg, flags, cnx, hbh, peer ? &peer->p_sr : NULL),
 			{
 				fd_log_debug("An error occurred while sending this message, it is lost:\n");
 				fd_msg_dump_walk(NONE, *msg);
--- a/freeDiameter/p_psm.c	Tue Mar 02 14:58:19 2010 +0900
+++ b/freeDiameter/p_psm.c	Tue Mar 02 15:55:26 2010 +0900
@@ -458,7 +458,7 @@
 			} else {
 				if (msg) {
 					/* Send the error back to the peer */
-					CHECK_FCT_DO( fd_out_send(&msg, NULL, peer), /* In case of error the message has already been dumped */ );
+					CHECK_FCT_DO( fd_out_send(&msg, NULL, peer, FD_CNX_ORDERED), /* In case of error the message has already been dumped */ );
 					if (msg) {
 						CHECK_FCT_DO( fd_msg_free(msg), goto psm_end);
 					}
@@ -499,7 +499,7 @@
 						CHECK_FCT_DO( fd_msg_rescode_set(msg, "DIAMETER_INVALID_HDR_BITS", NULL, NULL, 1 ), break );
 
 						/* Send the answer */
-						CHECK_FCT_DO( fd_out_send(&msg, peer->p_cnxctx, peer), break );
+						CHECK_FCT_DO( fd_out_send(&msg, peer->p_cnxctx, peer, FD_CNX_ORDERED), break );
 					} while (0);
 				} else {
 					/* We did ASK for it ??? */
--- a/freeDiameter/routing_dispatch.c	Tue Mar 02 14:58:19 2010 +0900
+++ b/freeDiameter/routing_dispatch.c	Tue Mar 02 15:55:26 2010 +0900
@@ -425,7 +425,7 @@
 	if (is_loc) {
 		CHECK_FCT( fd_fifo_post(fd_g_incoming, pmsg) );
 	} else {
-		CHECK_FCT( fd_out_send(pmsg, NULL, peer) );
+		CHECK_FCT( fd_out_send(pmsg, NULL, peer, 0) );
 	}
 	
 	/* Done */
@@ -825,7 +825,7 @@
 		hdr->msg_hbhid = qry_hdr->msg_hbhid;
 
 		/* Push the message into this peer */
-		CHECK_FCT( fd_out_send(pmsg, NULL, peer) );
+		CHECK_FCT( fd_out_send(pmsg, NULL, peer, 0) );
 
 		/* We're done with this answer */
 		return 0;
@@ -933,7 +933,7 @@
 
 		if (peer && (peer->p_hdr.info.runtime.pir_state == STATE_OPEN)) {
 			/* Send to this one */
-			CHECK_FCT_DO( fd_out_send(pmsg, NULL, peer), continue );
+			CHECK_FCT_DO( fd_out_send(pmsg, NULL, peer, 0), continue );
 			
 			/* If the sending was successful */
 			break;
"Welcome to our mercurial repository"