# HG changeset patch # User Sebastien Decugis # Date 1267512926 -32400 # Node ID 965f5971dc232e2587f390a3c989199d306fc019 # Parent dcb58243e91f8ea36275d05660df1d41fd97af2e Broadcast CEA over all streams to avoid possible race condition diff -r dcb58243e91f -r 965f5971dc23 freeDiameter/cnxctx.c --- a/freeDiameter/cnxctx.c Tue Mar 02 14:58:19 2010 +0900 +++ b/freeDiameter/cnxctx.c Tue Mar 02 15:55:26 2010 +0900 @@ -1367,9 +1367,9 @@ } /* Send a message -- this is synchronous -- and we assume it's never called by several threads at the same time, so we don't protect. */ -int fd_cnx_send(struct cnxctx * conn, unsigned char * buf, size_t len, int ordered) +int fd_cnx_send(struct cnxctx * conn, unsigned char * buf, size_t len, uint32_t flags) { - TRACE_ENTRY("%p %p %zd %i", conn, buf, len, ordered); + TRACE_ENTRY("%p %p %zd %x", conn, buf, len, flags); CHECK_PARAMS(conn && (conn->cc_socket > 0) && (! (conn->cc_status & CC_STATUS_ERROR)) && buf && len); @@ -1382,32 +1382,64 @@ #ifndef DISABLE_SCTP case IPPROTO_SCTP: { - int multistr = 0; - - if ((!ordered) && (conn->cc_sctp_para.str_out > 1) && ((! (conn->cc_status & CC_STATUS_TLS)) || (conn->cc_sctp_para.pairs > 1))) { - /* Update the id of the stream we will send this message on */ - conn->cc_sctp_para.next += 1; - conn->cc_sctp_para.next %= ((conn->cc_status & CC_STATUS_TLS) ? conn->cc_sctp_para.pairs : conn->cc_sctp_para.str_out); - multistr = 1; + if (flags & FD_CNX_BROADCAST) { + /* Send the buffer over all other streams */ + uint16_t str; + if (conn->cc_status & CC_STATUS_TLS) { + for ( str=1; str < conn->cc_sctp_para.pairs; str++) { + ssize_t ret; + size_t sent = 0; + do { + CHECK_GNUTLS_DO( ret = fd_tls_send_handle_error(conn, conn->cc_sctps_data.array[str].session, buf + sent, len - sent), ); + if (ret <= 0) + return ENOTCONN; + + sent += ret; + } while ( sent < len ); + } + } else { + for ( str=1; str < conn->cc_sctp_para.str_out; str++) { + CHECK_FCT_DO( fd_sctp_sendstr(conn->cc_socket, str, buf, len, &conn->cc_status), { fd_cnx_markerror(conn); return ENOTCONN; } ); + } + } + + /* Set the ORDERED flag also so that it is sent over stream 0 as well */ + flags &= FD_CNX_ORDERED; } - if ((!multistr) || (conn->cc_sctp_para.next == 0)) { + if (flags & FD_CNX_ORDERED) { + /* We send over stream #0 */ CHECK_FCT( send_simple(conn, buf, len) ); } else { - if (!(conn->cc_status & CC_STATUS_TLS)) { - CHECK_FCT_DO( fd_sctp_sendstr(conn->cc_socket, conn->cc_sctp_para.next, buf, len, &conn->cc_status), { fd_cnx_markerror(conn); return ENOTCONN; } ); + /* Default case : no flag specified */ + + int another_str = 0; /* do we send over stream #0 ? */ + + if ((conn->cc_sctp_para.str_out > 1) && ((! (conn->cc_status & CC_STATUS_TLS)) || (conn->cc_sctp_para.pairs > 1))) { + /* Update the id of the stream we will send this message over */ + conn->cc_sctp_para.next += 1; + conn->cc_sctp_para.next %= ((conn->cc_status & CC_STATUS_TLS) ? conn->cc_sctp_para.pairs : conn->cc_sctp_para.str_out); + another_str = (conn->cc_sctp_para.next ? 1 : 0); + } + + if ( ! another_str ) { + CHECK_FCT( send_simple(conn, buf, len) ); } else { - /* push the record to the appropriate session */ - ssize_t ret; - size_t sent = 0; - ASSERT(conn->cc_sctps_data.array != NULL); - do { - CHECK_GNUTLS_DO( ret = fd_tls_send_handle_error(conn, conn->cc_sctps_data.array[conn->cc_sctp_para.next].session, buf + sent, len - sent), ); - if (ret <= 0) - return ENOTCONN; - - sent += ret; - } while ( sent < len ); + if (!(conn->cc_status & CC_STATUS_TLS)) { + CHECK_FCT_DO( fd_sctp_sendstr(conn->cc_socket, conn->cc_sctp_para.next, buf, len, &conn->cc_status), { fd_cnx_markerror(conn); return ENOTCONN; } ); + } else { + /* push the record to the appropriate session */ + ssize_t ret; + size_t sent = 0; + ASSERT(conn->cc_sctps_data.array != NULL); + do { + CHECK_GNUTLS_DO( ret = fd_tls_send_handle_error(conn, conn->cc_sctps_data.array[conn->cc_sctp_para.next].session, buf + sent, len - sent), ); + if (ret <= 0) + return ENOTCONN; + + sent += ret; + } while ( sent < len ); + } } } } diff -r dcb58243e91f -r 965f5971dc23 freeDiameter/fD.h --- a/freeDiameter/fD.h Tue Mar 02 14:58:19 2010 +0900 +++ b/freeDiameter/fD.h Tue Mar 02 15:55:26 2010 +0900 @@ -239,7 +239,7 @@ } const char * fd_pev_str(int event); -/* The data structure for FDEVP_CNX_INCOMING events */ +/* The data structure for FDEVP_CNX_INCOMING event */ struct cnx_incoming { struct msg * cer; /* the CER message received on this connection */ struct cnxctx * cnx; /* The connection context */ @@ -273,7 +273,7 @@ void fd_psm_cleanup(struct fd_peer * peer, int terminate); /* Peer out */ -int fd_out_send(struct msg ** msg, struct cnxctx * cnx, struct fd_peer * peer); +int fd_out_send(struct msg ** msg, struct cnxctx * cnx, struct fd_peer * peer, uint32_t flags); int fd_out_start(struct fd_peer * peer); int fd_out_stop(struct fd_peer * peer); @@ -326,8 +326,11 @@ char * fd_cnx_getremoteid(struct cnxctx * conn); int fd_cnx_receive(struct cnxctx * conn, struct timespec * timeout, unsigned char **buf, size_t * len); int fd_cnx_recv_setaltfifo(struct cnxctx * conn, struct fifo * alt_fifo); /* send FDEVP_CNX_MSG_RECV event to the fifo list */ -int fd_cnx_send(struct cnxctx * conn, unsigned char * buf, size_t len, int ordered); +int fd_cnx_send(struct cnxctx * conn, unsigned char * buf, size_t len, uint32_t flags); void fd_cnx_destroy(struct cnxctx * conn); +/* Flags for the fd_cnx_send function : */ +#define FD_CNX_ORDERED (1 << 0) /* All messages sent with this flag set will be delivered in the same order. No guarantee on other messages */ +#define FD_CNX_BROADCAST (1 << 1) /* The message is sent over all stream pairs, in case of SCTP. No effect on TCP */ #endif /* _FD_H */ diff -r dcb58243e91f -r 965f5971dc23 freeDiameter/p_ce.c --- a/freeDiameter/p_ce.c Tue Mar 02 14:58:19 2010 +0900 +++ b/freeDiameter/p_ce.c Tue Mar 02 15:55:26 2010 +0900 @@ -587,7 +587,7 @@ /* Create and send the CEA with appropriate error code */ CHECK_FCT_DO( fd_msg_new_answer_from_req ( fd_g_config->cnf_dict, cer, MSGFL_ANSW_ERROR ), goto destroy ); CHECK_FCT_DO( fd_msg_rescode_set(*cer, rescode, errormsg, NULL, 1 ), goto destroy ); - CHECK_FCT_DO( fd_out_send(cer, recv_cnx, NULL), goto destroy ); + CHECK_FCT_DO( fd_out_send(cer, recv_cnx, NULL, FD_CNX_ORDERED), goto destroy ); /* And now destroy this connection */ destroy: @@ -605,7 +605,7 @@ /* Send CER on the new connection */ CHECK_FCT( create_CER(peer, initiator, &cer) ); - CHECK_FCT( fd_out_send(&cer, initiator, peer) ); + CHECK_FCT( fd_out_send(&cer, initiator, peer, FD_CNX_ORDERED) ); /* Are we doing an election ? */ if (peer->p_hdr.info.runtime.pir_state == STATE_WAITCNXACK_ELEC) { @@ -652,7 +652,7 @@ CHECK_FCT( fd_msg_rescode_set(*msg, "DIAMETER_COMMAND_UNSUPPORTED", "No CER allowed in current state", NULL, 1 ) ); /* msg now contains an answer message to send back */ - CHECK_FCT_DO( fd_out_send(msg, NULL, peer), /* In case of error the message has already been dumped */ ); + CHECK_FCT_DO( fd_out_send(msg, NULL, peer, FD_CNX_ORDERED), /* In case of error the message has already been dumped */ ); } /* If the state is not WAITCEA, just discard the message */ @@ -812,8 +812,7 @@ CHECK_FCT( fd_msg_new_answer_from_req ( fd_g_config->cnf_dict, &msg, 0 ) ); CHECK_FCT( fd_msg_rescode_set(msg, "DIAMETER_SUCCESS", NULL, NULL, 0 ) ); CHECK_FCT( add_CE_info(msg, peer->p_cnxctx, isi & PI_SEC_TLS_OLD, isi & PI_SEC_NONE) ); - CHECK_FCT( fd_out_send(&msg, peer->p_cnxctx, peer) ); - TODO("In case of SCTP, broadcast the CEA over all streams so that further messages cannot be delivered before the CEA?"); + CHECK_FCT( fd_out_send(&msg, peer->p_cnxctx, peer, FD_CNX_BROADCAST) ); /* Broadcast in order to avoid further messages sent over a different stream be delivered first... */ /* Handshake if needed */ if (isi & PI_SEC_TLS_OLD) { @@ -866,7 +865,7 @@ CHECK_FCT( fd_msg_rescode_set(msg, ec, NULL, NULL, 1 ) ); /* msg now contains an answer message to send back */ - CHECK_FCT_DO( fd_out_send(&msg, peer->p_cnxctx, peer), /* In case of error the message has already been dumped */ ); + CHECK_FCT_DO( fd_out_send(&msg, peer->p_cnxctx, peer, FD_CNX_ORDERED), /* In case of error the message has already been dumped */ ); } cleanup: diff -r dcb58243e91f -r 965f5971dc23 freeDiameter/p_dp.c --- a/freeDiameter/p_dp.c Tue Mar 02 14:58:19 2010 +0900 +++ b/freeDiameter/p_dp.c Tue Mar 02 15:55:26 2010 +0900 @@ -86,12 +86,12 @@ CHECK_FCT( fd_dict_search( fd_g_config->cnf_dict, DICT_ENUMVAL, ENUMVAL_BY_STRUCT, &er, &dictobj, 0 ) ); if (dictobj) { CHECK_FCT( fd_dict_getval( dictobj, &er.search ) ); - fd_log_debug("Peer '%s' sent a DPR with cause: %s\n", peer->p_hdr.info.pi_diamid, er.search.enum_name); + TRACE_DEBUG(INFO, "Peer '%s' sent a DPR with cause: %s\n", peer->p_hdr.info.pi_diamid, er.search.enum_name); } else { - fd_log_debug("Peer '%s' sent a DPR with unknown cause: %u\n", peer->p_hdr.info.pi_diamid, peer->p_hdr.info.runtime.pir_lastDC); + TRACE_DEBUG(INFO, "Peer '%s' sent a DPR with unknown cause: %u\n", peer->p_hdr.info.pi_diamid, peer->p_hdr.info.runtime.pir_lastDC); } } else { - fd_log_debug("Peer '%s' sent a DPR without Disconnect-Cause AVP\n", peer->p_hdr.info.pi_diamid); + TRACE_DEBUG(INFO, "Peer '%s' sent a DPR without Disconnect-Cause AVP\n", peer->p_hdr.info.pi_diamid); } } @@ -103,7 +103,7 @@ CHECK_FCT( fd_psm_change_state(peer, STATE_CLOSING) ); /* Now send the DPA */ - CHECK_FCT( fd_out_send( msg, NULL, peer) ); + CHECK_FCT( fd_out_send( msg, NULL, peer, FD_CNX_ORDERED) ); /* Move to CLOSED state */ fd_psm_cleanup(peer, 0); @@ -114,7 +114,7 @@ } else { /* We received a DPA */ if (peer->p_hdr.info.runtime.pir_state != STATE_CLOSING) { - TRACE_DEBUG(INFO, "Ignore DPA received in state %s", STATE_STR(peer->p_hdr.info.runtime.pir_state)); + TRACE_DEBUG(INFO, "Ignoring DPA received in state %s", STATE_STR(peer->p_hdr.info.runtime.pir_state)); } /* In theory, we should control the Result-Code AVP. But since we will not go back to OPEN state here anyway, let's skip it */ @@ -167,7 +167,7 @@ fd_psm_next_timeout(peer, 0, DPR_TIMEOUT); /* Now send the DPR message */ - CHECK_FCT_DO( fd_out_send(&msg, NULL, peer), /* ignore since we are on timeout anyway */ ); + CHECK_FCT_DO( fd_out_send(&msg, NULL, peer, FD_CNX_ORDERED), /* ignore since we are on timeout anyway */ ); return 0; } diff -r dcb58243e91f -r 965f5971dc23 freeDiameter/p_dw.c --- a/freeDiameter/p_dw.c Tue Mar 02 14:58:19 2010 +0900 +++ b/freeDiameter/p_dw.c Tue Mar 02 15:55:26 2010 +0900 @@ -75,7 +75,7 @@ CHECK_FCT( fd_msg_add_origin ( msg, 1 ) ); /* Now send this message */ - CHECK_FCT( fd_out_send(&msg, NULL, peer) ); + CHECK_FCT( fd_out_send(&msg, NULL, peer, FD_CNX_ORDERED) ); /* And mark the pending DW */ peer->p_flags.pf_dw_pending = 1; @@ -98,7 +98,7 @@ CHECK_FCT( fd_msg_new_answer_from_req ( fd_g_config->cnf_dict, msg, 0 ) ); CHECK_FCT( fd_msg_rescode_set( *msg, "DIAMETER_SUCCESS", NULL, NULL, 0 ) ); CHECK_FCT( fd_msg_add_origin ( *msg, 1 ) ); - CHECK_FCT( fd_out_send( msg, peer->p_cnxctx, peer) ); + CHECK_FCT( fd_out_send( msg, peer->p_cnxctx, peer, FD_CNX_ORDERED) ); } else { /* Just discard the DWA */ diff -r dcb58243e91f -r 965f5971dc23 freeDiameter/p_out.c --- a/freeDiameter/p_out.c Tue Mar 02 14:58:19 2010 +0900 +++ b/freeDiameter/p_out.c Tue Mar 02 15:55:26 2010 +0900 @@ -36,16 +36,16 @@ #include "fD.h" /* Alloc a new hbh for requests, bufferize the message and send on the connection, save in sentreq if provided */ -static int do_send(struct msg ** msg, struct cnxctx * cnx, uint32_t * hbh, struct sr_list * srl) +static int do_send(struct msg ** msg, uint32_t flags, struct cnxctx * cnx, uint32_t * hbh, struct sr_list * srl) { struct msg_hdr * hdr; - int msg_is_a_req, msg_is_appl; + int msg_is_a_req; uint8_t * buf; size_t sz; int ret; uint32_t bkp_hbh = 0; - TRACE_ENTRY("%p %p %p %p", msg, cnx, hbh, srl); + TRACE_ENTRY("%p %x %p %p %p", msg, flags, cnx, hbh, srl); /* Retrieve the message header */ CHECK_FCT( fd_msg_hdr(*msg, &hdr) ); @@ -59,8 +59,6 @@ *hbh = hdr->msg_hbhid + 1; } - msg_is_appl = fd_msg_is_routable(*msg); - /* Log the message */ if (TRACE_BOOL(FULL)) { CHECK_FCT_DO( fd_msg_update_length(*msg), /* continue */ ); @@ -78,7 +76,7 @@ } /* Send the message */ - CHECK_FCT_DO( ret = fd_cnx_send(cnx, buf, sz, !msg_is_appl), { free(buf); return ret; } ); + CHECK_FCT_DO( ret = fd_cnx_send(cnx, buf, sz, flags), { free(buf); return ret; } ); pthread_cleanup_pop(1); /* Free remaining messages (i.e. answers) */ @@ -121,7 +119,7 @@ pthread_cleanup_push(cleanup_requeue, msg); /* Send the message, log any error */ - CHECK_FCT_DO( do_send(&msg, peer->p_cnxctx, &peer->p_hbh, &peer->p_sr), + CHECK_FCT_DO( do_send(&msg, 0, peer->p_cnxctx, &peer->p_hbh, &peer->p_sr), { fd_log_debug("An error occurred while sending this message, it is lost:\n"); fd_msg_dump_walk(NONE, msg); @@ -138,10 +136,10 @@ return NULL; } -/* Wrapper to sending a message either by out thread (peer in OPEN state) or directly; cnx or peer must be provided */ -int fd_out_send(struct msg ** msg, struct cnxctx * cnx, struct fd_peer * peer) +/* Wrapper to sending a message either by out thread (peer in OPEN state) or directly; cnx or peer must be provided. Flags are valid only for direct sending, not through thread (unused) */ +int fd_out_send(struct msg ** msg, struct cnxctx * cnx, struct fd_peer * peer, uint32_t flags) { - TRACE_ENTRY("%p %p %p", msg, cnx, peer); + TRACE_ENTRY("%p %p %p %x", msg, cnx, peer, flags); CHECK_PARAMS( msg && *msg && (cnx || (peer && peer->p_cnxctx))); if (peer && (peer->p_hdr.info.runtime.pir_state == STATE_OPEN)) { @@ -159,7 +157,7 @@ cnx = peer->p_cnxctx; /* Do send the message */ - CHECK_FCT_DO( do_send(msg, cnx, hbh, peer ? &peer->p_sr : NULL), + CHECK_FCT_DO( do_send(msg, flags, cnx, hbh, peer ? &peer->p_sr : NULL), { fd_log_debug("An error occurred while sending this message, it is lost:\n"); fd_msg_dump_walk(NONE, *msg); diff -r dcb58243e91f -r 965f5971dc23 freeDiameter/p_psm.c --- a/freeDiameter/p_psm.c Tue Mar 02 14:58:19 2010 +0900 +++ b/freeDiameter/p_psm.c Tue Mar 02 15:55:26 2010 +0900 @@ -458,7 +458,7 @@ } else { if (msg) { /* Send the error back to the peer */ - CHECK_FCT_DO( fd_out_send(&msg, NULL, peer), /* In case of error the message has already been dumped */ ); + CHECK_FCT_DO( fd_out_send(&msg, NULL, peer, FD_CNX_ORDERED), /* In case of error the message has already been dumped */ ); if (msg) { CHECK_FCT_DO( fd_msg_free(msg), goto psm_end); } @@ -499,7 +499,7 @@ CHECK_FCT_DO( fd_msg_rescode_set(msg, "DIAMETER_INVALID_HDR_BITS", NULL, NULL, 1 ), break ); /* Send the answer */ - CHECK_FCT_DO( fd_out_send(&msg, peer->p_cnxctx, peer), break ); + CHECK_FCT_DO( fd_out_send(&msg, peer->p_cnxctx, peer, FD_CNX_ORDERED), break ); } while (0); } else { /* We did ASK for it ??? */ diff -r dcb58243e91f -r 965f5971dc23 freeDiameter/routing_dispatch.c --- a/freeDiameter/routing_dispatch.c Tue Mar 02 14:58:19 2010 +0900 +++ b/freeDiameter/routing_dispatch.c Tue Mar 02 15:55:26 2010 +0900 @@ -425,7 +425,7 @@ if (is_loc) { CHECK_FCT( fd_fifo_post(fd_g_incoming, pmsg) ); } else { - CHECK_FCT( fd_out_send(pmsg, NULL, peer) ); + CHECK_FCT( fd_out_send(pmsg, NULL, peer, 0) ); } /* Done */ @@ -825,7 +825,7 @@ hdr->msg_hbhid = qry_hdr->msg_hbhid; /* Push the message into this peer */ - CHECK_FCT( fd_out_send(pmsg, NULL, peer) ); + CHECK_FCT( fd_out_send(pmsg, NULL, peer, 0) ); /* We're done with this answer */ return 0; @@ -933,7 +933,7 @@ if (peer && (peer->p_hdr.info.runtime.pir_state == STATE_OPEN)) { /* Send to this one */ - CHECK_FCT_DO( fd_out_send(pmsg, NULL, peer), continue ); + CHECK_FCT_DO( fd_out_send(pmsg, NULL, peer, 0), continue ); /* If the sending was successful */ break;