Navigation


Changeset 649:5e5d8152c229 in freeDiameter for freeDiameter/p_sr.c


Ignore:
Timestamp:
Jan 5, 2011, 5:13:34 PM (13 years ago)
Author:
Sebastien Decugis <sdecugis@nict.go.jp>
Branch:
default
Phase:
public
Message:

Implemented fd_msg_send_timeout to close #10. Not tested yet.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • freeDiameter/p_sr.c

    r258 r649  
    4545        struct msg      *req;   /* A request that was sent and not yet answered. */
    4646        uint32_t        prevhbh;/* The value to set in the hbh header when the message is retrieved */
     47        struct fd_list  expire; /* the list of expiring requests */
     48        struct timespec added_on; /* the time the request was added */
    4749};
    4850
    49 /* Find an element in the list, or the following one */
     51/* Find an element in the hbh list, or the following one */
    5052static struct fd_list * find_or_next(struct fd_list * srlist, uint32_t hbh, int * match)
    5153{
     
    6668{
    6769        struct fd_list * li;
     70        struct timespec now;
    6871        if (!TRACE_BOOL(SR_DEBUG_LVL))
    6972                return;
     73        CHECK_SYS_DO( clock_gettime(CLOCK_REALTIME, &now), );
    7074        fd_log_debug("%sSentReq list @%p:\n", text, srlist);
    7175        for (li = srlist->next; li != srlist; li = li->next) {
    7276                struct sentreq * sr = (struct sentreq *)li;
    7377                uint32_t * nexthbh = li->o;
    74                 fd_log_debug(" - Next req (%x):\n", *nexthbh);
     78                fd_log_debug(" - Next req (%x): [since %ld.%06ld sec]\n", *nexthbh,
     79                        (now.tv_nsec >= sr->added_on.tv_nsec) ? (now.tv_sec - sr->added_on.tv_sec) : (now.tv_sec - sr->added_on.tv_sec - 1),
     80                        (now.tv_nsec >= sr->added_on.tv_nsec) ? (now.tv_nsec - sr->added_on.tv_nsec) / 1000 : (now.tv_nsec - sr->added_on.tv_nsec + 1000000000) / 1000);
    7581                fd_msg_dump_one(SR_DEBUG_LVL + 1, sr->req);
    7682        }
    7783}
     84
     85/* (detached) thread that calls the anscb on expired messages.
     86  We do it in a separate thread to avoid blocking the reception of new messages during this time */
     87static void * call_anscb_expire(void * arg) {
     88        struct msg * expired_req = arg;
     89       
     90        void (*anscb)(void *, struct msg **);
     91        void * data;
     92       
     93        TRACE_ENTRY("%p", arg);
     94        CHECK_PARAMS_DO( arg, return NULL );
     95       
     96        /* Set the thread name */
     97        fd_log_threadname ( "Expired req cb." );
     98       
     99        /* Log */
     100        TRACE_DEBUG(INFO, "The expiration timer for a request has been reached, abording this attempt now & calling cb...");
     101       
     102        /* Retrieve callback in the message */
     103        CHECK_FCT_DO( fd_msg_anscb_get( expired_req, &anscb, &data ), return NULL);
     104        ASSERT(anscb);
     105
     106        /* Call it */
     107        (*anscb)(data, &expired_req);
     108       
     109        /* If the callback did not dispose of the message, do it now */
     110        if (expired_req) {
     111                CHECK_FCT_DO( fd_msg_free(expired_req), /* ignore */ );
     112        }
     113       
     114        /* Finish */
     115        return NULL;
     116}
     117
     118/* thread that handles messages expiring. The thread is started / stopped only when needed */
     119static void * sr_expiry_th(void * arg) {
     120        struct sr_list * srlist = arg;
     121        struct msg * expired_req;
     122        pthread_attr_t detached;
     123       
     124        TRACE_ENTRY("%p", arg);
     125        CHECK_PARAMS_DO( arg, return NULL );
     126       
     127        /* Set the thread name */
     128        {
     129                char buf[48];
     130                sprintf(buf, "ReqExp/%.*s", (int)sizeof(buf) - 8, ((struct fd_peer *)(srlist->exp.o))->p_hdr.info.pi_diamid);
     131                fd_log_threadname ( buf );
     132        }
     133       
     134        CHECK_POSIX_DO( pthread_attr_init(&detached), return NULL );
     135        CHECK_POSIX_DO( pthread_attr_setdetachstate(&detached, PTHREAD_CREATE_DETACHED), return NULL );
     136       
     137        CHECK_POSIX_DO( pthread_mutex_lock(&srlist->mtx),  return NULL );
     138        pthread_cleanup_push( fd_cleanup_mutex, &srlist->mtx );
     139       
     140        do {
     141                struct timespec now, *t;
     142                struct sentreq * first;
     143                pthread_t th;
     144               
     145                /* Check if there are expiring requests available */
     146                if (FD_IS_LIST_EMPTY(&srlist->exp)) {
     147                        /* Just wait for a change or cancelation */
     148                        CHECK_POSIX_DO( pthread_cond_wait( &srlist->cnd, &srlist->mtx ), goto error );
     149                        /* Restart the loop on wakeup */
     150                        continue;
     151                }
     152               
     153                /* Get the pointer to the request that expires first */
     154                first = (struct sentreq *)(srlist->exp.next->o);
     155                t = fd_msg_anscb_gettimeout( first->req );
     156                ASSERT(t);
     157               
     158                /* Get the current time */
     159                CHECK_SYS_DO(  clock_gettime(CLOCK_REALTIME, &now),  goto error  );
     160
     161                /* If first request is not expired, we just wait until it happens */
     162                if ( TS_IS_INFERIOR( &now, t ) ) {
     163                       
     164                        CHECK_POSIX_DO2(  pthread_cond_timedwait( &srlist->cnd, &srlist->mtx, t ), 
     165                                        ETIMEDOUT, /* ETIMEDOUT is a normal return value, continue */,
     166                                        /* on other error, */ goto error );
     167       
     168                        /* on wakeup, loop */
     169                        continue;
     170                }
     171               
     172                /* Now, the first request in the list is expired; remove it and call the anscb for it in a new thread */
     173                fd_list_unlink(&first->chain);
     174                fd_list_unlink(&first->expire);
     175                expired_req = first->req;
     176                free(first);
     177               
     178                CHECK_POSIX_DO( pthread_create( &th, &detached, call_anscb_expire, expired_req ), goto error );
     179
     180                /* loop */
     181        } while (1);
     182error: 
     183        pthread_cleanup_pop( 1 );
     184        return NULL;
     185}
     186
    78187
    79188/* Store a new sent request */
     
    83192        struct fd_list * next;
    84193        int match;
     194        struct timespec * ts;
    85195       
    86196        TRACE_ENTRY("%p %p %p %x", srlist, req, hbhloc, hbh_restore);
     
    92202        sr->req = *req;
    93203        sr->prevhbh = hbh_restore;
     204        fd_list_init(&sr->expire, sr);
     205        CHECK_SYS( clock_gettime(CLOCK_REALTIME, &sr->added_on) );
    94206       
    95207        /* Search the place in the list */
     
    107219        fd_list_insert_before(next, &sr->chain);
    108220        srl_dump("Saved new request, ", &srlist->srs);
     221       
     222        /* In case of request with a timeout, also store in the timeout list */
     223        ts = fd_msg_anscb_gettimeout( sr->req );
     224        if (ts) {
     225                struct fd_list * li;
     226                struct timespec * t;
     227               
     228                /* browse srlist->exp from the end */
     229                for (li = srlist->exp.prev; li != &srlist->exp; li = li->prev) {
     230                        struct sentreq * s = (struct sentreq *)(li->o);
     231                        t = fd_msg_anscb_gettimeout( s->req );
     232                        ASSERT( t ); /* sanity */
     233                        if (TS_IS_INFERIOR(t, ts))
     234                                break;
     235                }
     236               
     237                fd_list_insert_after(li, &sr->expire);
     238       
     239                /* if the thread does not exist yet, create it */
     240                if (srlist->thr == (pthread_t)NULL) {
     241                        CHECK_POSIX_DO( pthread_create(&srlist->thr, NULL, sr_expiry_th, srlist), /* continue anyway */);
     242                } else {
     243                        /* or, if added in first position, signal the condvar to update the sleep time of the thread */
     244                        if (li == &srlist->exp) {
     245                                CHECK_POSIX_DO( pthread_cond_signal(&srlist->cnd), /* continue anyway */);
     246                        }
     247                }
     248        }
     249       
    109250        CHECK_POSIX( pthread_mutex_unlock(&srlist->mtx) );
    110251        return 0;
     
    132273                /* Unlink */
    133274                fd_list_unlink(&sr->chain);
     275                fd_list_unlink(&sr->expire);
    134276                *req = sr->req;
    135277                free(sr);
    136278        }
    137279        CHECK_POSIX( pthread_mutex_unlock(&srlist->mtx) );
     280       
     281        /* do not stop the expire thread here, it might cause creating/destroying it very often otherwise */
    138282
    139283        /* Done */
     
    148292                struct sentreq * sr = (struct sentreq *)(srlist->srs.next);
    149293                fd_list_unlink(&sr->chain);
     294                fd_list_unlink(&sr->expire);
    150295                if (fd_msg_is_routable(sr->req)) {
    151296                        struct msg_hdr * hdr = NULL;
     
    165310                free(sr);
    166311        }
     312        /* The list of expiring requests must be empty now */
     313        ASSERT( FD_IS_LIST_EMPTY(&srlist->exp) );
     314       
    167315        CHECK_POSIX_DO( pthread_mutex_unlock(&srlist->mtx), /* continue anyway */ );
    168 }
    169 
     316       
     317        /* Terminate the expiry thread (must be done when the lock can be taken) */
     318        CHECK_FCT_DO( fd_thr_term(&srlist->thr), /* ignore error */ );
     319}
     320
Note: See TracChangeset for help on using the changeset viewer.