Navigation


source: freeDiameter/libfdcore/p_sr.c @ 691:78b665400097

Last change on this file since 691:78b665400097 was 691:78b665400097, checked in by Sebastien Decugis <sdecugis@nict.go.jp>, 11 years ago

Cleanup all pthread_cleanup_push / pop pairs so that pop is always called after push, or ASSERT(0) is some grave errors

File size: 11.5 KB
Line 
1/*********************************************************************************************************
2* Software License Agreement (BSD License)                                                               *
3* Author: Sebastien Decugis <sdecugis@nict.go.jp>                                                        *
4*                                                                                                        *
5* Copyright (c) 2011, WIDE Project and NICT                                                              *
6* All rights reserved.                                                                                   *
7*                                                                                                        *
8* Redistribution and use of this software in source and binary forms, with or without modification, are  *
9* permitted provided that the following conditions are met:                                              *
10*                                                                                                        *
11* * Redistributions of source code must retain the above                                                 *
12*   copyright notice, this list of conditions and the                                                    *
13*   following disclaimer.                                                                                *
14*                                                                                                        *
15* * Redistributions in binary form must reproduce the above                                              *
16*   copyright notice, this list of conditions and the                                                    *
17*   following disclaimer in the documentation and/or other                                               *
18*   materials provided with the distribution.                                                            *
19*                                                                                                        *
20* * Neither the name of the WIDE Project or NICT nor the                                                 *
21*   names of its contributors may be used to endorse or                                                  *
22*   promote products derived from this software without                                                  *
23*   specific prior written permission of WIDE Project and                                                *
24*   NICT.                                                                                                *
25*                                                                                                        *
26* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED *
27* WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A *
28* PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR *
29* ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT     *
30* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS    *
31* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR *
32* TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF   *
33* ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.                                                             *
34*********************************************************************************************************/
35
36#include "fdcore-internal.h"
37
38#ifndef SR_DEBUG_LVL
39#define SR_DEBUG_LVL ANNOYING
40#endif /* SR_DEBUG_LVL */
41
42/* Structure to store a sent request */
43struct sentreq {
44        struct fd_list  chain;  /* the "o" field points directly to the hop-by-hop of the request (uint32_t *)  */
45        struct msg      *req;   /* A request that was sent and not yet answered. */
46        uint32_t        prevhbh;/* The value to set in the hbh header when the message is retrieved */
47        struct fd_list  expire; /* the list of expiring requests */
48        struct timespec added_on; /* the time the request was added */
49};
50
51/* Find an element in the hbh list, or the following one */
52static struct fd_list * find_or_next(struct fd_list * srlist, uint32_t hbh, int * match)
53{
54        struct fd_list * li;
55        *match = 0;
56        for (li = srlist->next; li != srlist; li = li->next) {
57                uint32_t * nexthbh = li->o;
58                if (*nexthbh < hbh)
59                        continue;
60                if (*nexthbh == hbh)
61                        *match = 1;
62                break;
63        }
64        return li;
65}
66
67static void srl_dump(const char * text, struct fd_list * srlist)
68{
69        struct fd_list * li;
70        struct timespec now;
71        if (!TRACE_BOOL(SR_DEBUG_LVL))
72                return;
73        CHECK_SYS_DO( clock_gettime(CLOCK_REALTIME, &now), );
74        fd_log_debug("%sSentReq list @%p:\n", text, srlist);
75        for (li = srlist->next; li != srlist; li = li->next) {
76                struct sentreq * sr = (struct sentreq *)li;
77                uint32_t * nexthbh = li->o;
78                fd_log_debug(" - Next req (%x): [since %ld.%06ld sec]\n", *nexthbh, 
79                        (now.tv_nsec >= sr->added_on.tv_nsec) ? (now.tv_sec - sr->added_on.tv_sec) : (now.tv_sec - sr->added_on.tv_sec - 1),
80                        (now.tv_nsec >= sr->added_on.tv_nsec) ? (now.tv_nsec - sr->added_on.tv_nsec) / 1000 : (now.tv_nsec - sr->added_on.tv_nsec + 1000000000) / 1000);
81                fd_msg_dump_one(SR_DEBUG_LVL + 1, sr->req);
82        }
83}
84
85/* (detached) thread that calls the anscb on expired messages.
86  We do it in a separate thread to avoid blocking the reception of new messages during this time */
87static void * call_anscb_expire(void * arg) {
88        struct msg * expired_req = arg;
89       
90        void (*anscb)(void *, struct msg **);
91        void * data;
92       
93        TRACE_ENTRY("%p", arg);
94        CHECK_PARAMS_DO( arg, return NULL );
95       
96        /* Set the thread name */
97        fd_log_threadname ( "Expired req cb." );
98       
99        /* Log */
100        TRACE_DEBUG(INFO, "The expiration timer for a request has been reached, abording this attempt now & calling cb...");
101       
102        /* Retrieve callback in the message */
103        CHECK_FCT_DO( fd_msg_anscb_get( expired_req, &anscb, &data ), return NULL);
104        ASSERT(anscb);
105
106        /* Call it */
107        (*anscb)(data, &expired_req);
108       
109        /* If the callback did not dispose of the message, do it now */
110        if (expired_req) {
111                fd_msg_log(FD_MSG_LOG_DROPPED, expired_req, "Expiration period completed without an answer, and the expiry callback did not dispose of the message.");
112                CHECK_FCT_DO( fd_msg_free(expired_req), /* ignore */ );
113        }
114       
115        /* Finish */
116        return NULL;
117}
118
119/* thread that handles messages expiring. The thread is started / cancelled only when needed */
120static void * sr_expiry_th(void * arg) {
121        struct sr_list * srlist = arg;
122        struct msg * expired_req;
123        pthread_attr_t detached;
124       
125        TRACE_ENTRY("%p", arg);
126        CHECK_PARAMS_DO( arg, return NULL );
127       
128        /* Set the thread name */
129        {
130                char buf[48];
131                sprintf(buf, "ReqExp/%.*s", (int)sizeof(buf) - 8, ((struct fd_peer *)(srlist->exp.o))->p_hdr.info.pi_diamid);
132                fd_log_threadname ( buf );
133        }
134       
135        CHECK_POSIX_DO( pthread_attr_init(&detached), return NULL );
136        CHECK_POSIX_DO( pthread_attr_setdetachstate(&detached, PTHREAD_CREATE_DETACHED), return NULL );
137       
138        CHECK_POSIX_DO( pthread_mutex_lock(&srlist->mtx),  return NULL );
139        pthread_cleanup_push( fd_cleanup_mutex, &srlist->mtx );
140       
141        do {
142                struct timespec now, *t;
143                struct sentreq * first;
144                pthread_t th;
145               
146                /* Check if there are expiring requests available */
147                if (FD_IS_LIST_EMPTY(&srlist->exp)) {
148                        /* Just wait for a change or cancelation */
149                        CHECK_POSIX_DO( pthread_cond_wait( &srlist->cnd, &srlist->mtx ), goto error );
150                        /* Restart the loop on wakeup */
151                        continue;
152                }
153               
154                /* Get the pointer to the request that expires first */
155                first = (struct sentreq *)(srlist->exp.next->o);
156                t = fd_msg_anscb_gettimeout( first->req );
157                ASSERT(t);
158               
159                /* Get the current time */
160                CHECK_SYS_DO(  clock_gettime(CLOCK_REALTIME, &now),  goto error  );
161
162                /* If first request is not expired, we just wait until it happens */
163                if ( TS_IS_INFERIOR( &now, t ) ) {
164                       
165                        CHECK_POSIX_DO2(  pthread_cond_timedwait( &srlist->cnd, &srlist->mtx, t ), 
166                                        ETIMEDOUT, /* ETIMEDOUT is a normal return value, continue */,
167                                        /* on other error, */ goto error );
168       
169                        /* on wakeup, loop */
170                        continue;
171                }
172               
173                /* Now, the first request in the list is expired; remove it and call the anscb for it in a new thread */
174                fd_list_unlink(&first->chain);
175                fd_list_unlink(&first->expire);
176                expired_req = first->req;
177                free(first);
178               
179                CHECK_POSIX_DO( pthread_create( &th, &detached, call_anscb_expire, expired_req ), goto error );
180
181                /* loop */
182        } while (1);
183error: 
184        ; /* pthread_cleanup_pop sometimes expands as "} ..." and the label beofre this cause some compilers to complain... */
185        pthread_cleanup_pop( 1 );
186        ASSERT(0); /* we have encountered a problem, maybe time to signal the framework to terminate? */
187        return NULL;
188}
189
190
191/* Store a new sent request */
192int fd_p_sr_store(struct sr_list * srlist, struct msg **req, uint32_t *hbhloc, uint32_t hbh_restore)
193{
194        struct sentreq * sr;
195        struct fd_list * next;
196        int match;
197        struct timespec * ts;
198       
199        TRACE_ENTRY("%p %p %p %x", srlist, req, hbhloc, hbh_restore);
200        CHECK_PARAMS(srlist && req && *req && hbhloc);
201       
202        CHECK_MALLOC( sr = malloc(sizeof(struct sentreq)) );
203        memset(sr, 0, sizeof(struct sentreq));
204        fd_list_init(&sr->chain, hbhloc);
205        sr->req = *req;
206        sr->prevhbh = hbh_restore;
207        fd_list_init(&sr->expire, sr);
208        CHECK_SYS( clock_gettime(CLOCK_REALTIME, &sr->added_on) );
209       
210        /* Search the place in the list */
211        CHECK_POSIX( pthread_mutex_lock(&srlist->mtx) );
212        next = find_or_next(&srlist->srs, *hbhloc, &match);
213        if (match) {
214                TRACE_DEBUG(INFO, "A request with the same hop-by-hop Id was already sent: error");
215                free(sr);
216                CHECK_POSIX_DO( pthread_mutex_unlock(&srlist->mtx), /* ignore */ );
217                return EINVAL;
218        }
219       
220        /* Save in the list */
221        *req = NULL;
222        fd_list_insert_before(next, &sr->chain);
223        srl_dump("Saved new request, ", &srlist->srs);
224       
225        /* In case of request with a timeout, also store in the timeout list */
226        ts = fd_msg_anscb_gettimeout( sr->req );
227        if (ts) {
228                struct fd_list * li;
229                struct timespec * t;
230               
231                /* browse srlist->exp from the end */
232                for (li = srlist->exp.prev; li != &srlist->exp; li = li->prev) {
233                        struct sentreq * s = (struct sentreq *)(li->o);
234                        t = fd_msg_anscb_gettimeout( s->req );
235                        ASSERT( t ); /* sanity */
236                        if (TS_IS_INFERIOR(t, ts))
237                                break;
238                }
239               
240                fd_list_insert_after(li, &sr->expire);
241       
242                /* if the thread does not exist yet, create it */
243                if (srlist->thr == (pthread_t)NULL) {
244                        CHECK_POSIX_DO( pthread_create(&srlist->thr, NULL, sr_expiry_th, srlist), /* continue anyway */);
245                } else {
246                        /* or, if added in first position, signal the condvar to update the sleep time of the thread */
247                        if (li == &srlist->exp) {
248                                CHECK_POSIX_DO( pthread_cond_signal(&srlist->cnd), /* continue anyway */);
249                        }
250                }
251        }
252       
253        CHECK_POSIX( pthread_mutex_unlock(&srlist->mtx) );
254        return 0;
255}
256
257/* Fetch a request by hbh */
258int fd_p_sr_fetch(struct sr_list * srlist, uint32_t hbh, struct msg **req)
259{
260        struct sentreq * sr;
261        int match;
262       
263        TRACE_ENTRY("%p %x %p", srlist, hbh, req);
264        CHECK_PARAMS(srlist && req);
265       
266        /* Search the request in the list */
267        CHECK_POSIX( pthread_mutex_lock(&srlist->mtx) );
268        srl_dump("Fetching a request, ", &srlist->srs);
269        sr = (struct sentreq *)find_or_next(&srlist->srs, hbh, &match);
270        if (!match) {
271                TRACE_DEBUG(INFO, "There is no saved request with this hop-by-hop id (%x)", hbh);
272                *req = NULL;
273        } else {
274                /* Restore hop-by-hop id */
275                *((uint32_t *)sr->chain.o) = sr->prevhbh;
276                /* Unlink */
277                fd_list_unlink(&sr->chain);
278                fd_list_unlink(&sr->expire);
279                *req = sr->req;
280                free(sr);
281        }
282        CHECK_POSIX( pthread_mutex_unlock(&srlist->mtx) );
283       
284        /* do not stop the expire thread here, it might cause creating/destroying it very often otherwise */
285
286        /* Done */
287        return 0;
288}
289
290/* Failover requests (free or requeue routables) */
291void fd_p_sr_failover(struct sr_list * srlist)
292{
293        CHECK_POSIX_DO( pthread_mutex_lock(&srlist->mtx), /* continue anyway */ );
294        while (!FD_IS_LIST_EMPTY(&srlist->srs)) {
295                struct sentreq * sr = (struct sentreq *)(srlist->srs.next);
296                fd_list_unlink(&sr->chain);
297                fd_list_unlink(&sr->expire);
298                if (fd_msg_is_routable(sr->req)) {
299                        struct msg_hdr * hdr = NULL;
300                        int ret;
301                       
302                        /* Set the 'T' flag */
303                        CHECK_FCT_DO(fd_msg_hdr(sr->req, &hdr), /* continue */);
304                        if (hdr)
305                                hdr->msg_flags |= CMD_FLAG_RETRANSMIT;
306                       
307                        /* Requeue for sending to another peer */
308                        CHECK_FCT_DO( ret = fd_fifo_post(fd_g_outgoing, &sr->req),
309                                {
310                                        fd_msg_log( FD_MSG_LOG_DROPPED, sr->req, "Internal error: error while requeuing during failover: %s", strerror(ret) );
311                                        CHECK_FCT_DO(fd_msg_free(sr->req), /* What can we do more? */)
312                                });
313                } else {
314                        /* Just free the request. */
315                        fd_msg_log( FD_MSG_LOG_DROPPED, sr->req, "Local message discarded during failover" );
316                        CHECK_FCT_DO(fd_msg_free(sr->req), /* Ignore */);
317                }
318                free(sr);
319        }
320        /* The list of expiring requests must be empty now */
321        ASSERT( FD_IS_LIST_EMPTY(&srlist->exp) );
322       
323        CHECK_POSIX_DO( pthread_mutex_unlock(&srlist->mtx), /* continue anyway */ );
324       
325        /* Terminate the expiry thread (must be done when the lock can be taken) */
326        CHECK_FCT_DO( fd_thr_term(&srlist->thr), /* ignore error */ );
327}
328
Note: See TracBrowser for help on using the repository browser.