comparison libfdcore/p_out.c @ 658:f198d16fa7f4

Initial commit for 1.1.0: * Restructuring: * libfreeDiameter: - renamed folder & binary into libfdproto - renamed libfD.h into fdproto-internal.h - removed signals management (replaced by triggers in libfdcore) * freeDiameter split into: - libfdcore (most contents) - renamed fD.h into fdcore-internal.h - added core.c for framework init/shutdown. - new triggers mechanism in events.c. - freeDiameterd (main, command line parsing, signals management) * tests: - now in top-level directory tests. * other changes: - fd_dict_new now returns 0 on duplicate identical entries. - fixes in dict_legacy_xml - fixes in some dictionaries - moved FD_DEFAULT_CONF_FILENAME definition to freeDiameter-host.h
author Sebastien Decugis <sdecugis@nict.go.jp>
date Fri, 14 Jan 2011 15:15:23 +0900
parents freeDiameter/p_out.c@7250e91f0662
children 2e94ef0515d7
comparison
equal deleted inserted replaced
656:5b05d85682f1 658:f198d16fa7f4
1 /*********************************************************************************************************
2 * Software License Agreement (BSD License) *
3 * Author: Sebastien Decugis <sdecugis@nict.go.jp> *
4 * *
5 * Copyright (c) 2010, WIDE Project and NICT *
6 * All rights reserved. *
7 * *
8 * Redistribution and use of this software in source and binary forms, with or without modification, are *
9 * permitted provided that the following conditions are met: *
10 * *
11 * * Redistributions of source code must retain the above *
12 * copyright notice, this list of conditions and the *
13 * following disclaimer. *
14 * *
15 * * Redistributions in binary form must reproduce the above *
16 * copyright notice, this list of conditions and the *
17 * following disclaimer in the documentation and/or other *
18 * materials provided with the distribution. *
19 * *
20 * * Neither the name of the WIDE Project or NICT nor the *
21 * names of its contributors may be used to endorse or *
22 * promote products derived from this software without *
23 * specific prior written permission of WIDE Project and *
24 * NICT. *
25 * *
26 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED *
27 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A *
28 * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR *
29 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT *
30 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS *
31 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR *
32 * TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF *
33 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. *
34 *********************************************************************************************************/
35
36 #include "fdcore-internal.h"
37
38 /* Alloc a new hbh for requests, bufferize the message and send on the connection, save in sentreq if provided */
39 static int do_send(struct msg ** msg, uint32_t flags, struct cnxctx * cnx, uint32_t * hbh, struct sr_list * srl)
40 {
41 struct msg_hdr * hdr;
42 int msg_is_a_req;
43 uint8_t * buf;
44 size_t sz;
45 int ret;
46 uint32_t bkp_hbh = 0;
47
48 TRACE_ENTRY("%p %x %p %p %p", msg, flags, cnx, hbh, srl);
49
50 /* Retrieve the message header */
51 CHECK_FCT( fd_msg_hdr(*msg, &hdr) );
52
53 msg_is_a_req = (hdr->msg_flags & CMD_FLAG_REQUEST);
54 if (msg_is_a_req) {
55 CHECK_PARAMS(hbh && srl);
56 /* Alloc the hop-by-hop id and increment the value for next message */
57 bkp_hbh = hdr->msg_hbhid;
58 hdr->msg_hbhid = *hbh;
59 *hbh = hdr->msg_hbhid + 1;
60 }
61
62 /* Log the message */
63 if (TRACE_BOOL(FULL)) {
64 CHECK_FCT_DO( fd_msg_update_length(*msg), /* continue */ );
65 TRACE_DEBUG(FULL, "Sending the following message on connection '%s':", fd_cnx_getid(cnx));
66 fd_msg_dump_walk(FULL, *msg);
67 }
68
69 /* Create the message buffer */
70 CHECK_FCT(fd_msg_bufferize( *msg, &buf, &sz ));
71 pthread_cleanup_push( free, buf );
72
73 /* Save a request before sending so that there is no race condition with the answer */
74 if (msg_is_a_req) {
75 CHECK_FCT_DO( ret = fd_p_sr_store(srl, msg, &hdr->msg_hbhid, bkp_hbh), { free(buf); return ret; } );
76 }
77
78 /* Send the message */
79 CHECK_FCT_DO( ret = fd_cnx_send(cnx, buf, sz, flags), { free(buf); return ret; } );
80 pthread_cleanup_pop(1);
81
82 /* Free remaining messages (i.e. answers) */
83 if (*msg) {
84 CHECK_FCT( fd_msg_free(*msg) );
85 *msg = NULL;
86 }
87
88 return 0;
89 }
90
91 static void cleanup_requeue(void * arg)
92 {
93 struct msg *msg = arg;
94 CHECK_FCT_DO(fd_fifo_post(fd_g_outgoing, &msg),
95 CHECK_FCT_DO(fd_msg_free(msg), /* What can we do more? */));
96 }
97
98 /* The code of the "out" thread */
99 static void * out_thr(void * arg)
100 {
101 struct fd_peer * peer = arg;
102 ASSERT( CHECK_PEER(peer) );
103
104 /* Set the thread name */
105 {
106 char buf[48];
107 sprintf(buf, "OUT/%.*s", (int)sizeof(buf) - 5, peer->p_hdr.info.pi_diamid);
108 fd_log_threadname ( buf );
109 }
110
111 /* Loop until cancelation */
112 while (1) {
113 struct msg * msg;
114
115 /* Retrieve next message to send */
116 CHECK_FCT_DO( fd_fifo_get(peer->p_tosend, &msg), goto error );
117
118 /* Now if we are cancelled, we requeue this message */
119 pthread_cleanup_push(cleanup_requeue, msg);
120
121 /* Send the message, log any error */
122 CHECK_FCT_DO( do_send(&msg, 0, peer->p_cnxctx, &peer->p_hbh, &peer->p_sr),
123 {
124 if (msg) {
125 fd_log_debug("An error occurred while sending this message, it was lost:\n");
126 fd_msg_dump_walk(NONE, msg);
127 fd_msg_free(msg);
128 }
129 } );
130
131 /* Loop */
132 pthread_cleanup_pop(0);
133 }
134
135 error:
136 /* It is not really a connection error, but the effect is the same, we are not able to send anymore message */
137 CHECK_FCT_DO( fd_event_send(peer->p_events, FDEVP_CNX_ERROR, 0, NULL), /* What do we do if it fails? */ );
138 return NULL;
139 }
140
141 /* Wrapper to sending a message either by out thread (peer in OPEN state) or directly; cnx or peer must be provided. Flags are valid only for direct sending, not through thread (unused) */
142 int fd_out_send(struct msg ** msg, struct cnxctx * cnx, struct fd_peer * peer, uint32_t flags)
143 {
144 TRACE_ENTRY("%p %p %p %x", msg, cnx, peer, flags);
145 CHECK_PARAMS( msg && *msg && (cnx || (peer && peer->p_cnxctx)));
146
147 fd_cpu_flush_cache();
148 if (peer && (peer->p_hdr.info.runtime.pir_state == STATE_OPEN)) {
149 /* Normal case: just queue for the out thread to pick it up */
150 CHECK_FCT( fd_fifo_post(peer->p_tosend, msg) );
151
152 } else {
153 uint32_t *hbh = NULL;
154
155 /* In other cases, the thread is not running, so we handle the sending directly */
156 if (peer)
157 hbh = &peer->p_hbh;
158
159 if (!cnx)
160 cnx = peer->p_cnxctx;
161
162 /* Do send the message */
163 CHECK_FCT_DO( do_send(msg, flags, cnx, hbh, peer ? &peer->p_sr : NULL),
164 {
165 if (msg) {
166 fd_log_debug("An error occurred while sending this message, it was lost:\n");
167 fd_msg_dump_walk(NONE, *msg);
168 fd_msg_free(*msg);
169 *msg = NULL;
170 }
171 } );
172 }
173
174 return 0;
175 }
176
177 /* Start the "out" thread that picks messages in p_tosend and send them on p_cnxctx */
178 int fd_out_start(struct fd_peer * peer)
179 {
180 TRACE_ENTRY("%p", peer);
181 CHECK_PARAMS( CHECK_PEER(peer) && (peer->p_outthr == (pthread_t)NULL) );
182
183 CHECK_POSIX( pthread_create(&peer->p_outthr, NULL, out_thr, peer) );
184
185 return 0;
186 }
187
188 /* Stop that thread */
189 int fd_out_stop(struct fd_peer * peer)
190 {
191 TRACE_ENTRY("%p", peer);
192 CHECK_PARAMS( CHECK_PEER(peer) );
193
194 CHECK_FCT( fd_thr_term(&peer->p_outthr) );
195
196 return 0;
197 }
198
"Welcome to our mercurial repository"