Navigation


source: freeDiameter/libfdcore/p_out.c @ 706:4ffbc9f1e922

Last change on this file since 706:4ffbc9f1e922 was 706:4ffbc9f1e922, checked in by Sebastien Decugis <sdecugis@nict.go.jp>, 11 years ago

Large UNTESTED commit with the following changes:

  • Improved DiameterIdentity? handling (esp. interationalization issues), and improve efficiency of some string operations in peers, sessions, and dictionary modules (closes #7)
  • Cleanup in the session module to free only unreferenced sessions (#16)
  • Removed fd_cpu_flush_cache(), replaced by more robust alternatives.
  • Improved peer state machine algorithm to counter SCTP multistream race condition.
File size: 6.6 KB
Line 
1/*********************************************************************************************************
2* Software License Agreement (BSD License)                                                               *
3* Author: Sebastien Decugis <sdecugis@nict.go.jp>                                                        *
4*                                                                                                        *
5* Copyright (c) 2011, WIDE Project and NICT                                                              *
6* All rights reserved.                                                                                   *
7*                                                                                                        *
8* Redistribution and use of this software in source and binary forms, with or without modification, are  *
9* permitted provided that the following conditions are met:                                              *
10*                                                                                                        *
11* * Redistributions of source code must retain the above                                                 *
12*   copyright notice, this list of conditions and the                                                    *
13*   following disclaimer.                                                                                *
14*                                                                                                        *
15* * Redistributions in binary form must reproduce the above                                              *
16*   copyright notice, this list of conditions and the                                                    *
17*   following disclaimer in the documentation and/or other                                               *
18*   materials provided with the distribution.                                                            *
19*                                                                                                        *
20* * Neither the name of the WIDE Project or NICT nor the                                                 *
21*   names of its contributors may be used to endorse or                                                  *
22*   promote products derived from this software without                                                  *
23*   specific prior written permission of WIDE Project and                                                *
24*   NICT.                                                                                                *
25*                                                                                                        *
26* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED *
27* WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A *
28* PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR *
29* ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT     *
30* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS    *
31* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR *
32* TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF   *
33* ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.                                                             *
34*********************************************************************************************************/
35
36#include "fdcore-internal.h"
37
38/* Alloc a new hbh for requests, bufferize the message and send on the connection, save in sentreq if provided */
39static int do_send(struct msg ** msg, uint32_t flags, struct cnxctx * cnx, uint32_t * hbh, struct sr_list * srl)
40{
41        struct msg_hdr * hdr;
42        int msg_is_a_req;
43        uint8_t * buf;
44        size_t sz;
45        int ret;
46        uint32_t bkp_hbh = 0;
47       
48        TRACE_ENTRY("%p %x %p %p %p", msg, flags, cnx, hbh, srl);
49       
50        /* Retrieve the message header */
51        CHECK_FCT( fd_msg_hdr(*msg, &hdr) );
52       
53        msg_is_a_req = (hdr->msg_flags & CMD_FLAG_REQUEST);
54        if (msg_is_a_req) {
55                CHECK_PARAMS(hbh && srl);
56                /* Alloc the hop-by-hop id and increment the value for next message */
57                bkp_hbh = hdr->msg_hbhid;
58                hdr->msg_hbhid = *hbh;
59                *hbh = hdr->msg_hbhid + 1;
60        }
61       
62        /* Create the message buffer */
63        CHECK_FCT(fd_msg_bufferize( *msg, &buf, &sz ));
64        pthread_cleanup_push( free, buf );
65       
66        /* Log the message */
67        fd_msg_log( FD_MSG_LOG_SENT, *msg, "Sent to '%s'", fd_cnx_getid(cnx));
68       
69        /* Save a request before sending so that there is no race condition with the answer */
70        if (msg_is_a_req) {
71                CHECK_FCT_DO( ret = fd_p_sr_store(srl, msg, &hdr->msg_hbhid, bkp_hbh), goto out );
72        }
73       
74        /* Send the message */
75        CHECK_FCT_DO( ret = fd_cnx_send(cnx, buf, sz, flags), );
76out:
77        ;       
78        pthread_cleanup_pop(1);
79       
80        if (ret)
81                return ret;
82       
83        /* Free remaining messages (i.e. answers) */
84        if (*msg) {
85                CHECK_FCT( fd_msg_free(*msg) );
86                *msg = NULL;
87        }
88       
89        return 0;
90}
91
92static void cleanup_requeue(void * arg)
93{
94        struct msg *msg = arg;
95        CHECK_FCT_DO(fd_fifo_post(fd_g_outgoing, &msg),
96                {
97                        fd_msg_log( FD_MSG_LOG_DROPPED, msg, "An error occurred while attempting to requeue this message during cancellation of the sending function");
98                        CHECK_FCT_DO(fd_msg_free(msg), /* What can we do more? */);
99                } );
100}
101
102/* The code of the "out" thread */
103static void * out_thr(void * arg)
104{
105        struct fd_peer * peer = arg;
106        ASSERT( CHECK_PEER(peer) );
107       
108        /* Set the thread name */
109        {
110                char buf[48];
111                snprintf(buf, sizeof(buf), "OUT/%s", peer->p_hdr.info.pi_diamid);
112                fd_log_threadname ( buf );
113        }
114       
115        /* Loop until cancelation */
116        while (1) {
117                struct msg * msg;
118                int ret;
119               
120                /* Retrieve next message to send */
121                CHECK_FCT_DO( fd_fifo_get(peer->p_tosend, &msg), goto error );
122               
123                /* Now if we are cancelled, we requeue this message */
124                pthread_cleanup_push(cleanup_requeue, msg);
125               
126                /* Send the message, log any error */
127                CHECK_FCT_DO( ret = do_send(&msg, 0, peer->p_cnxctx, &peer->p_hbh, &peer->p_sr),
128                        {
129                                if (msg) {
130                                        fd_msg_log( FD_MSG_LOG_DROPPED, msg, "Internal error: Problem while sending (%s)\n", strerror(ret) );
131                                        fd_msg_free(msg);
132                                }
133                        } );
134                       
135                /* Loop */
136                pthread_cleanup_pop(0);
137        }
138       
139error:
140        /* It is not really a connection error, but the effect is the same, we are not able to send anymore message */
141        CHECK_FCT_DO( fd_event_send(peer->p_events, FDEVP_CNX_ERROR, 0, NULL), /* What do we do if it fails? */ );
142        return NULL;
143}
144
145/* Wrapper to sending a message either by out thread (peer in OPEN state) or directly; cnx or peer must be provided. Flags are valid only for direct sending, not through thread (unused) */
146int fd_out_send(struct msg ** msg, struct cnxctx * cnx, struct fd_peer * peer, uint32_t flags)
147{
148        TRACE_ENTRY("%p %p %p %x", msg, cnx, peer, flags);
149        CHECK_PARAMS( msg && *msg && (cnx || (peer && peer->p_cnxctx)));
150       
151        if (fd_peer_getstate(peer) == STATE_OPEN) {
152                /* Normal case: just queue for the out thread to pick it up */
153                CHECK_FCT( fd_fifo_post(peer->p_tosend, msg) );
154               
155        } else {
156                int ret;
157                uint32_t *hbh = NULL;
158               
159                /* In other cases, the thread is not running, so we handle the sending directly */
160                if (peer)
161                        hbh = &peer->p_hbh;
162
163                if (!cnx)
164                        cnx = peer->p_cnxctx;
165
166                /* Do send the message */
167                CHECK_FCT_DO( ret = do_send(msg, flags, cnx, hbh, peer ? &peer->p_sr : NULL),
168                        {
169                                if (msg) {
170                                        fd_msg_log( FD_MSG_LOG_DROPPED, *msg, "Internal error: Problem while sending (%s)\n", strerror(ret) );
171                                        fd_msg_free(*msg);
172                                        *msg = NULL;
173                                }
174                        } );
175        }
176       
177        return 0;
178}
179
180/* Start the "out" thread that picks messages in p_tosend and send them on p_cnxctx */
181int fd_out_start(struct fd_peer * peer)
182{
183        TRACE_ENTRY("%p", peer);
184        CHECK_PARAMS( CHECK_PEER(peer) && (peer->p_outthr == (pthread_t)NULL) );
185       
186        CHECK_POSIX( pthread_create(&peer->p_outthr, NULL, out_thr, peer) );
187       
188        return 0;
189}
190
191/* Stop that thread */
192int fd_out_stop(struct fd_peer * peer)
193{
194        TRACE_ENTRY("%p", peer);
195        CHECK_PARAMS( CHECK_PEER(peer) );
196       
197        CHECK_FCT( fd_thr_term(&peer->p_outthr) );
198       
199        return 0;
200}
201               
Note: See TracBrowser for help on using the repository browser.