comparison freeDiameter/p_out.c @ 229:965f5971dc23

Broadcast CEA over all streams to avoid possible race condition
author Sebastien Decugis <sdecugis@nict.go.jp>
date Tue, 02 Mar 2010 15:55:26 +0900
parents e1da03ba112f
children 5df55136361b
comparison
equal deleted inserted replaced
228:dcb58243e91f 229:965f5971dc23
34 *********************************************************************************************************/ 34 *********************************************************************************************************/
35 35
36 #include "fD.h" 36 #include "fD.h"
37 37
38 /* Alloc a new hbh for requests, bufferize the message and send on the connection, save in sentreq if provided */ 38 /* Alloc a new hbh for requests, bufferize the message and send on the connection, save in sentreq if provided */
39 static int do_send(struct msg ** msg, struct cnxctx * cnx, uint32_t * hbh, struct sr_list * srl) 39 static int do_send(struct msg ** msg, uint32_t flags, struct cnxctx * cnx, uint32_t * hbh, struct sr_list * srl)
40 { 40 {
41 struct msg_hdr * hdr; 41 struct msg_hdr * hdr;
42 int msg_is_a_req, msg_is_appl; 42 int msg_is_a_req;
43 uint8_t * buf; 43 uint8_t * buf;
44 size_t sz; 44 size_t sz;
45 int ret; 45 int ret;
46 uint32_t bkp_hbh = 0; 46 uint32_t bkp_hbh = 0;
47 47
48 TRACE_ENTRY("%p %p %p %p", msg, cnx, hbh, srl); 48 TRACE_ENTRY("%p %x %p %p %p", msg, flags, cnx, hbh, srl);
49 49
50 /* Retrieve the message header */ 50 /* Retrieve the message header */
51 CHECK_FCT( fd_msg_hdr(*msg, &hdr) ); 51 CHECK_FCT( fd_msg_hdr(*msg, &hdr) );
52 52
53 msg_is_a_req = (hdr->msg_flags & CMD_FLAG_REQUEST); 53 msg_is_a_req = (hdr->msg_flags & CMD_FLAG_REQUEST);
56 /* Alloc the hop-by-hop id and increment the value for next message */ 56 /* Alloc the hop-by-hop id and increment the value for next message */
57 bkp_hbh = hdr->msg_hbhid; 57 bkp_hbh = hdr->msg_hbhid;
58 hdr->msg_hbhid = *hbh; 58 hdr->msg_hbhid = *hbh;
59 *hbh = hdr->msg_hbhid + 1; 59 *hbh = hdr->msg_hbhid + 1;
60 } 60 }
61
62 msg_is_appl = fd_msg_is_routable(*msg);
63 61
64 /* Log the message */ 62 /* Log the message */
65 if (TRACE_BOOL(FULL)) { 63 if (TRACE_BOOL(FULL)) {
66 CHECK_FCT_DO( fd_msg_update_length(*msg), /* continue */ ); 64 CHECK_FCT_DO( fd_msg_update_length(*msg), /* continue */ );
67 TRACE_DEBUG(FULL, "Sending the following message on connection '%s':", fd_cnx_getid(cnx)); 65 TRACE_DEBUG(FULL, "Sending the following message on connection '%s':", fd_cnx_getid(cnx));
76 if (msg_is_a_req) { 74 if (msg_is_a_req) {
77 CHECK_FCT_DO( ret = fd_p_sr_store(srl, msg, &hdr->msg_hbhid, bkp_hbh), { free(buf); return ret; } ); 75 CHECK_FCT_DO( ret = fd_p_sr_store(srl, msg, &hdr->msg_hbhid, bkp_hbh), { free(buf); return ret; } );
78 } 76 }
79 77
80 /* Send the message */ 78 /* Send the message */
81 CHECK_FCT_DO( ret = fd_cnx_send(cnx, buf, sz, !msg_is_appl), { free(buf); return ret; } ); 79 CHECK_FCT_DO( ret = fd_cnx_send(cnx, buf, sz, flags), { free(buf); return ret; } );
82 pthread_cleanup_pop(1); 80 pthread_cleanup_pop(1);
83 81
84 /* Free remaining messages (i.e. answers) */ 82 /* Free remaining messages (i.e. answers) */
85 if (*msg) { 83 if (*msg) {
86 CHECK_FCT( fd_msg_free(*msg) ); 84 CHECK_FCT( fd_msg_free(*msg) );
119 117
120 /* Now if we are cancelled, we requeue this message */ 118 /* Now if we are cancelled, we requeue this message */
121 pthread_cleanup_push(cleanup_requeue, msg); 119 pthread_cleanup_push(cleanup_requeue, msg);
122 120
123 /* Send the message, log any error */ 121 /* Send the message, log any error */
124 CHECK_FCT_DO( do_send(&msg, peer->p_cnxctx, &peer->p_hbh, &peer->p_sr), 122 CHECK_FCT_DO( do_send(&msg, 0, peer->p_cnxctx, &peer->p_hbh, &peer->p_sr),
125 { 123 {
126 fd_log_debug("An error occurred while sending this message, it is lost:\n"); 124 fd_log_debug("An error occurred while sending this message, it is lost:\n");
127 fd_msg_dump_walk(NONE, msg); 125 fd_msg_dump_walk(NONE, msg);
128 fd_msg_free(msg); 126 fd_msg_free(msg);
129 } ); 127 } );
136 /* It is not really a connection error, but the effect is the same, we are not able to send anymore message */ 134 /* It is not really a connection error, but the effect is the same, we are not able to send anymore message */
137 CHECK_FCT_DO( fd_event_send(peer->p_events, FDEVP_CNX_ERROR, 0, NULL), /* What do we do if it fails? */ ); 135 CHECK_FCT_DO( fd_event_send(peer->p_events, FDEVP_CNX_ERROR, 0, NULL), /* What do we do if it fails? */ );
138 return NULL; 136 return NULL;
139 } 137 }
140 138
141 /* Wrapper to sending a message either by out thread (peer in OPEN state) or directly; cnx or peer must be provided */ 139 /* Wrapper to sending a message either by out thread (peer in OPEN state) or directly; cnx or peer must be provided. Flags are valid only for direct sending, not through thread (unused) */
142 int fd_out_send(struct msg ** msg, struct cnxctx * cnx, struct fd_peer * peer) 140 int fd_out_send(struct msg ** msg, struct cnxctx * cnx, struct fd_peer * peer, uint32_t flags)
143 { 141 {
144 TRACE_ENTRY("%p %p %p", msg, cnx, peer); 142 TRACE_ENTRY("%p %p %p %x", msg, cnx, peer, flags);
145 CHECK_PARAMS( msg && *msg && (cnx || (peer && peer->p_cnxctx))); 143 CHECK_PARAMS( msg && *msg && (cnx || (peer && peer->p_cnxctx)));
146 144
147 if (peer && (peer->p_hdr.info.runtime.pir_state == STATE_OPEN)) { 145 if (peer && (peer->p_hdr.info.runtime.pir_state == STATE_OPEN)) {
148 /* Normal case: just queue for the out thread to pick it up */ 146 /* Normal case: just queue for the out thread to pick it up */
149 CHECK_FCT( fd_fifo_post(peer->p_tosend, msg) ); 147 CHECK_FCT( fd_fifo_post(peer->p_tosend, msg) );
157 155
158 if (!cnx) 156 if (!cnx)
159 cnx = peer->p_cnxctx; 157 cnx = peer->p_cnxctx;
160 158
161 /* Do send the message */ 159 /* Do send the message */
162 CHECK_FCT_DO( do_send(msg, cnx, hbh, peer ? &peer->p_sr : NULL), 160 CHECK_FCT_DO( do_send(msg, flags, cnx, hbh, peer ? &peer->p_sr : NULL),
163 { 161 {
164 fd_log_debug("An error occurred while sending this message, it is lost:\n"); 162 fd_log_debug("An error occurred while sending this message, it is lost:\n");
165 fd_msg_dump_walk(NONE, *msg); 163 fd_msg_dump_walk(NONE, *msg);
166 fd_msg_free(*msg); 164 fd_msg_free(*msg);
167 *msg = NULL; 165 *msg = NULL;
"Welcome to our mercurial repository"