Navigation


Changeset 649:5e5d8152c229 in freeDiameter


Ignore:
Timestamp:
Jan 5, 2011 5:13:34 PM (2 years ago)
Author:
Sebastien Decugis <sdecugis@nict.go.jp>
Branch:
default
Message:

Implemented fd_msg_send_timeout to close #10. Not tested yet.

Files:
9 edited

Legend:

Unmodified
Added
Removed
  • contrib/debian/changelog

    r647 r649  
    11freediameter (1.0.4) UNRELEASED; urgency=low 
    22 
    3   * Added new option to specify timeout on receiving answer (#10)  
     3  * Added new API to specify timeout on receiving answer (#10)  
    44  * Bumped API version number accordingly. 
    55 
  • doc/dbg_interactive.py.sample

    r641 r649  
    464464mydwr.send() 
    465465 
    466 # Optionaly, a callback can be registered when a message is sent, with an optional object. 
     466# Optionaly, a callback can be registered when a request is sent, with an optional object. 
    467467# This callback takes the answer message as parameter and should return None or a message. (cf. fd_msg_send) 
    468468def send_callback(msg, obj): 
     
    475475mydwr = msg(buf) 
    476476mydwr.send(send_callback, some_object) 
     477 
     478# Again optionaly, a time limit can be specified in this case as follow: 
     479mydwr.send(send_callback, some_object, 10) 
     480# In that case, if no answer / error is received after 10 seconds (the value specified),  
     481# the callback is called with the request as parameter. 
     482# Testing for timeout case is done by using msg.is_request() 
     483def send_callback(msg, obj): 
     484    if (msg.is_request()): 
     485        print "Request timed out without answer:" 
     486    else: 
     487        print "Received answer:" 
     488    msg.dump() 
     489    print "Associated data:" 
     490    obj 
     491    return None 
    477492 
    478493 
  • extensions/dbg_interactive/messages.i

    r640 r649  
    5555        } 
    5656         
    57         SWIG_PYTHON_THREAD_BEGIN_BLOCK; 
    58          
    59         if (!msg || !*msg) { 
    60                 PyMsg = Py_None; 
    61         } else { 
    62                 PyMsg = SWIG_NewPointerObj((void *)*msg,     SWIGTYPE_p_msg,     0 ); 
    63         } 
    64          
    65         result = PyEval_CallFunction(l->cb, "(OO)", PyMsg, l->data); 
    66         Py_XDECREF(l->cb); 
    67         Py_XDECREF(l->data); 
    68         free(l); 
    69          
    70         /* The callback is supposed to return a message or NULL */ 
    71         if (!SWIG_IsOK(SWIG_ConvertPtr(result, (void *)msg, SWIGTYPE_p_msg, SWIG_POINTER_DISOWN))) { 
    72                 fd_log_debug("Error: Cannot convert the return value to message.\n"); 
    73                 *msg = NULL; 
    74         } 
    75          
    76         Py_XDECREF(result); 
    77          
    78         SWIG_PYTHON_THREAD_END_BLOCK; 
    79          
     57        if (l->cb) { 
     58         
     59                SWIG_PYTHON_THREAD_BEGIN_BLOCK; 
     60 
     61                if (!msg || !*msg) { 
     62                        PyMsg = Py_None; 
     63                } else { 
     64                        PyMsg = SWIG_NewPointerObj((void *)*msg,     SWIGTYPE_p_msg,     0 ); 
     65                } 
     66 
     67                result = PyEval_CallFunction(l->cb, "(OO)", PyMsg, l->data); 
     68                Py_XDECREF(l->cb); 
     69                Py_XDECREF(l->data); 
     70                free(l); 
     71 
     72                /* The callback is supposed to return a message or NULL */ 
     73                if (!SWIG_IsOK(SWIG_ConvertPtr(result, (void *)msg, SWIGTYPE_p_msg, SWIG_POINTER_DISOWN))) { 
     74                        fd_log_debug("Error: Cannot convert the return value to message.\n"); 
     75                        *msg = NULL; 
     76                } 
     77 
     78                Py_XDECREF(result); 
     79 
     80                SWIG_PYTHON_THREAD_END_BLOCK; 
     81                 
     82        } 
     83        /* else */ 
     84                /* Only the timeout was specified, without a callback */ 
     85                /* in this case, just delete the message */ 
     86                /* it actually happens automatically when we do nothing. */ 
    8087} 
    8188%} 
     
    121128        /* SEND THE MESSAGE */ 
    122129        %delobject send; /* when this has been called, the msg must not be freed anymore */ 
    123         void send(PyObject * PyCb = NULL, PyObject * data = NULL) { 
     130        void send(PyObject * PyCb = NULL, PyObject * data = NULL, unsigned int timeout = 0) { 
    124131                int ret; 
    125132                struct msg * m = $self; 
    126133                struct anscb_py_layer * l = NULL; 
    127134                 
    128                 if (PyCb) { 
     135                if (PyCb || timeout) { 
    129136                        l = malloc(sizeof(struct anscb_py_layer)); 
    130137                        if (!l) { 
     
    139146                } 
    140147                 
    141                 ret = fd_msg_send(&m, PyCb ? anscb_python : NULL, l); 
     148                if (timeout) { 
     149                        struct timespec ts; 
     150                        (void) clock_gettime(CLOCK_REALTIME, &ts); 
     151                        ts.tv_sec += timeout; 
     152                        ret = fd_msg_send_timeout(&m, anscb_python, l, &ts); 
     153                } else { 
     154                        ret = fd_msg_send(&m, PyCb ? anscb_python : NULL, l); 
     155                } 
    142156                if (ret != 0) { 
    143157                        DI_ERROR(ret, NULL, NULL); 
     
    303317        } 
    304318         
     319        /* Is request? (shortcut) */ 
     320        PyObject * is_request() { 
     321                PyObject * r; 
     322                int ret; 
     323                struct msg_hdr * h; 
     324                 
     325                ret = fd_msg_hdr($self, &h); 
     326                if (ret != 0) { 
     327                        DI_ERROR(ret, NULL, NULL); 
     328                } 
     329                if (h->msg_flags & CMD_FLAG_REQUEST)  
     330                        r = Py_True; 
     331                else 
     332                        r = Py_False; 
     333                Py_XINCREF(r); 
     334                return r; 
     335        } 
     336         
    305337        /* Get the source */ 
    306338        char *source() { 
  • freeDiameter/fD.h

    r447 r649  
    118118/* Sentinel for the sent requests list */ 
    119119struct sr_list { 
    120         struct fd_list  srs; 
    121         pthread_mutex_t mtx; 
     120        struct fd_list  srs; /* requests ordered by hop-by-hop id */ 
     121        struct fd_list  exp; /* requests that have a timeout set, ordered by timeout */ 
     122        pthread_mutex_t mtx; /* mutex to protect these lists */ 
     123        pthread_cond_t  cnd; /* cond var used by the thread that handles timeouts */ 
     124        pthread_t       thr; /* the thread that handles timeouts (and calls the anscb) */ 
    122125}; 
    123126 
     
    290293int fd_p_sr_store(struct sr_list * srlist, struct msg **req, uint32_t *hbhloc, uint32_t hbh_restore); 
    291294int fd_p_sr_fetch(struct sr_list * srlist, uint32_t hbh, struct msg **req); 
     295int fd_p_sr_start(struct sr_list * srlist); 
     296int fd_p_sr_stop(struct sr_list * srlist); 
    292297void fd_p_sr_failover(struct sr_list * srlist); 
    293298 
  • freeDiameter/p_sr.c

    r258 r649  
    4545        struct msg      *req;   /* A request that was sent and not yet answered. */ 
    4646        uint32_t        prevhbh;/* The value to set in the hbh header when the message is retrieved */ 
     47        struct fd_list  expire; /* the list of expiring requests */ 
     48        struct timespec added_on; /* the time the request was added */ 
    4749}; 
    4850 
    49 /* Find an element in the list, or the following one */ 
     51/* Find an element in the hbh list, or the following one */ 
    5052static struct fd_list * find_or_next(struct fd_list * srlist, uint32_t hbh, int * match) 
    5153{ 
     
    6668{ 
    6769        struct fd_list * li; 
     70        struct timespec now; 
    6871        if (!TRACE_BOOL(SR_DEBUG_LVL)) 
    6972                return; 
     73        CHECK_SYS_DO( clock_gettime(CLOCK_REALTIME, &now), ); 
    7074        fd_log_debug("%sSentReq list @%p:\n", text, srlist); 
    7175        for (li = srlist->next; li != srlist; li = li->next) { 
    7276                struct sentreq * sr = (struct sentreq *)li; 
    7377                uint32_t * nexthbh = li->o; 
    74                 fd_log_debug(" - Next req (%x):\n", *nexthbh); 
     78                fd_log_debug(" - Next req (%x): [since %ld.%06ld sec]\n", *nexthbh,  
     79                        (now.tv_nsec >= sr->added_on.tv_nsec) ? (now.tv_sec - sr->added_on.tv_sec) : (now.tv_sec - sr->added_on.tv_sec - 1), 
     80                        (now.tv_nsec >= sr->added_on.tv_nsec) ? (now.tv_nsec - sr->added_on.tv_nsec) / 1000 : (now.tv_nsec - sr->added_on.tv_nsec + 1000000000) / 1000); 
    7581                fd_msg_dump_one(SR_DEBUG_LVL + 1, sr->req); 
    7682        } 
    7783} 
     84 
     85/* (detached) thread that calls the anscb on expired messages.  
     86  We do it in a separate thread to avoid blocking the reception of new messages during this time */ 
     87static void * call_anscb_expire(void * arg) { 
     88        struct msg * expired_req = arg; 
     89         
     90        void (*anscb)(void *, struct msg **); 
     91        void * data; 
     92         
     93        TRACE_ENTRY("%p", arg); 
     94        CHECK_PARAMS_DO( arg, return NULL ); 
     95         
     96        /* Set the thread name */ 
     97        fd_log_threadname ( "Expired req cb." ); 
     98         
     99        /* Log */ 
     100        TRACE_DEBUG(INFO, "The expiration timer for a request has been reached, abording this attempt now & calling cb..."); 
     101         
     102        /* Retrieve callback in the message */ 
     103        CHECK_FCT_DO( fd_msg_anscb_get( expired_req, &anscb, &data ), return NULL); 
     104        ASSERT(anscb); 
     105 
     106        /* Call it */ 
     107        (*anscb)(data, &expired_req); 
     108         
     109        /* If the callback did not dispose of the message, do it now */ 
     110        if (expired_req) { 
     111                CHECK_FCT_DO( fd_msg_free(expired_req), /* ignore */ ); 
     112        } 
     113         
     114        /* Finish */ 
     115        return NULL; 
     116} 
     117 
     118/* thread that handles messages expiring. The thread is started / stopped only when needed */ 
     119static void * sr_expiry_th(void * arg) { 
     120        struct sr_list * srlist = arg; 
     121        struct msg * expired_req; 
     122        pthread_attr_t detached; 
     123         
     124        TRACE_ENTRY("%p", arg); 
     125        CHECK_PARAMS_DO( arg, return NULL ); 
     126         
     127        /* Set the thread name */ 
     128        { 
     129                char buf[48]; 
     130                sprintf(buf, "ReqExp/%.*s", (int)sizeof(buf) - 8, ((struct fd_peer *)(srlist->exp.o))->p_hdr.info.pi_diamid); 
     131                fd_log_threadname ( buf ); 
     132        } 
     133         
     134        CHECK_POSIX_DO( pthread_attr_init(&detached), return NULL ); 
     135        CHECK_POSIX_DO( pthread_attr_setdetachstate(&detached, PTHREAD_CREATE_DETACHED), return NULL ); 
     136         
     137        CHECK_POSIX_DO( pthread_mutex_lock(&srlist->mtx),  return NULL ); 
     138        pthread_cleanup_push( fd_cleanup_mutex, &srlist->mtx ); 
     139         
     140        do { 
     141                struct timespec now, *t; 
     142                struct sentreq * first; 
     143                pthread_t th; 
     144                 
     145                /* Check if there are expiring requests available */ 
     146                if (FD_IS_LIST_EMPTY(&srlist->exp)) { 
     147                        /* Just wait for a change or cancelation */ 
     148                        CHECK_POSIX_DO( pthread_cond_wait( &srlist->cnd, &srlist->mtx ), goto error ); 
     149                        /* Restart the loop on wakeup */ 
     150                        continue; 
     151                } 
     152                 
     153                /* Get the pointer to the request that expires first */ 
     154                first = (struct sentreq *)(srlist->exp.next->o); 
     155                t = fd_msg_anscb_gettimeout( first->req ); 
     156                ASSERT(t); 
     157                 
     158                /* Get the current time */ 
     159                CHECK_SYS_DO(  clock_gettime(CLOCK_REALTIME, &now),  goto error  ); 
     160 
     161                /* If first request is not expired, we just wait until it happens */ 
     162                if ( TS_IS_INFERIOR( &now, t ) ) { 
     163                         
     164                        CHECK_POSIX_DO2(  pthread_cond_timedwait( &srlist->cnd, &srlist->mtx, t ),   
     165                                        ETIMEDOUT, /* ETIMEDOUT is a normal return value, continue */, 
     166                                        /* on other error, */ goto error ); 
     167         
     168                        /* on wakeup, loop */ 
     169                        continue; 
     170                } 
     171                 
     172                /* Now, the first request in the list is expired; remove it and call the anscb for it in a new thread */ 
     173                fd_list_unlink(&first->chain); 
     174                fd_list_unlink(&first->expire); 
     175                expired_req = first->req; 
     176                free(first); 
     177                 
     178                CHECK_POSIX_DO( pthread_create( &th, &detached, call_anscb_expire, expired_req ), goto error ); 
     179 
     180                /* loop */ 
     181        } while (1); 
     182error:   
     183        pthread_cleanup_pop( 1 ); 
     184        return NULL; 
     185} 
     186 
    78187 
    79188/* Store a new sent request */ 
     
    83192        struct fd_list * next; 
    84193        int match; 
     194        struct timespec * ts; 
    85195         
    86196        TRACE_ENTRY("%p %p %p %x", srlist, req, hbhloc, hbh_restore); 
     
    92202        sr->req = *req; 
    93203        sr->prevhbh = hbh_restore; 
     204        fd_list_init(&sr->expire, sr); 
     205        CHECK_SYS( clock_gettime(CLOCK_REALTIME, &sr->added_on) ); 
    94206         
    95207        /* Search the place in the list */ 
     
    107219        fd_list_insert_before(next, &sr->chain); 
    108220        srl_dump("Saved new request, ", &srlist->srs); 
     221         
     222        /* In case of request with a timeout, also store in the timeout list */ 
     223        ts = fd_msg_anscb_gettimeout( sr->req ); 
     224        if (ts) { 
     225                struct fd_list * li; 
     226                struct timespec * t; 
     227                 
     228                /* browse srlist->exp from the end */ 
     229                for (li = srlist->exp.prev; li != &srlist->exp; li = li->prev) { 
     230                        struct sentreq * s = (struct sentreq *)(li->o); 
     231                        t = fd_msg_anscb_gettimeout( s->req ); 
     232                        ASSERT( t ); /* sanity */ 
     233                        if (TS_IS_INFERIOR(t, ts)) 
     234                                break; 
     235                } 
     236                 
     237                fd_list_insert_after(li, &sr->expire); 
     238         
     239                /* if the thread does not exist yet, create it */ 
     240                if (srlist->thr == (pthread_t)NULL) { 
     241                        CHECK_POSIX_DO( pthread_create(&srlist->thr, NULL, sr_expiry_th, srlist), /* continue anyway */); 
     242                } else { 
     243                        /* or, if added in first position, signal the condvar to update the sleep time of the thread */ 
     244                        if (li == &srlist->exp) { 
     245                                CHECK_POSIX_DO( pthread_cond_signal(&srlist->cnd), /* continue anyway */); 
     246                        } 
     247                } 
     248        } 
     249         
    109250        CHECK_POSIX( pthread_mutex_unlock(&srlist->mtx) ); 
    110251        return 0; 
     
    132273                /* Unlink */ 
    133274                fd_list_unlink(&sr->chain); 
     275                fd_list_unlink(&sr->expire); 
    134276                *req = sr->req; 
    135277                free(sr); 
    136278        } 
    137279        CHECK_POSIX( pthread_mutex_unlock(&srlist->mtx) ); 
     280         
     281        /* do not stop the expire thread here, it might cause creating/destroying it very often otherwise */ 
    138282 
    139283        /* Done */ 
     
    148292                struct sentreq * sr = (struct sentreq *)(srlist->srs.next); 
    149293                fd_list_unlink(&sr->chain); 
     294                fd_list_unlink(&sr->expire); 
    150295                if (fd_msg_is_routable(sr->req)) { 
    151296                        struct msg_hdr * hdr = NULL; 
     
    165310                free(sr); 
    166311        } 
     312        /* The list of expiring requests must be empty now */ 
     313        ASSERT( FD_IS_LIST_EMPTY(&srlist->exp) ); 
     314         
    167315        CHECK_POSIX_DO( pthread_mutex_unlock(&srlist->mtx), /* continue anyway */ ); 
    168 } 
    169  
     316         
     317        /* Terminate the expiry thread (must be done when the lock can be taken) */ 
     318        CHECK_FCT_DO( fd_thr_term(&srlist->thr), /* ignore error */ ); 
     319} 
     320 
  • freeDiameter/peers.c

    r454 r649  
    7979         
    8080        fd_list_init(&p->p_sr.srs, p); 
     81        fd_list_init(&p->p_sr.exp, p); 
    8182        CHECK_POSIX( pthread_mutex_init(&p->p_sr.mtx, NULL) ); 
     83        CHECK_POSIX( pthread_cond_init(&p->p_sr.cnd, NULL) ); 
    8284         
    8385        fd_list_init(&p->p_connparams, p); 
     
    251253        CHECK_FCT_DO( fd_fifo_del(&p->p_tosend), /* continue */ ); 
    252254        CHECK_POSIX_DO( pthread_mutex_destroy(&p->p_sr.mtx), /* continue */); 
     255        CHECK_POSIX_DO( pthread_cond_destroy(&p->p_sr.cnd), /* continue */); 
    253256         
    254257        /* If the callback is still around... */ 
  • freeDiameter/routing_dispatch.c

    r563 r649  
    799799        CHECK_FCT( fd_msg_hdr(*pmsg, &hdr) ); 
    800800        is_req = hdr->msg_flags & CMD_FLAG_REQUEST; 
    801                  
     801         
    802802        /* For answers, the routing is very easy */ 
    803803        if ( ! is_req ) { 
  • include/freeDiameter/libfreeDiameter.h

    r648 r649  
    21472147int fd_msg_anscb_associate( struct msg * msg, void ( *anscb)(void *, struct msg **), void  * data, const struct timespec *timeout ); 
    21482148int fd_msg_anscb_get      ( struct msg * msg, void (**anscb)(void *, struct msg **), void ** data ); 
     2149struct timespec *fd_msg_anscb_gettimeout( struct msg * msg ); /* returns NULL or a valid non-0 timespec */ 
    21492150 
    21502151/* 
  • libfreeDiameter/messages.c

    r648 r649  
    919919        if (timeout) { 
    920920                memcpy(&msg->msg_cb.timeout, timeout, sizeof(struct timespec)); 
    921         } else { 
    922                 memset(&msg->msg_cb.timeout, 0, sizeof(struct timespec)); /* clear the area */ 
    923921        } 
    924922         
     
    938936         
    939937        return 0; 
    940 }        
     938} 
     939 
     940struct timespec *fd_msg_anscb_gettimeout( struct msg * msg ) 
     941{ 
     942        TRACE_ENTRY("%p", msg); 
     943         
     944        /* Check the parameters */ 
     945        CHECK_PARAMS_DO( CHECK_MSG(msg), return NULL ); 
     946         
     947        if (!msg->msg_cb.timeout.tv_sec) { 
     948                return NULL; 
     949        } 
     950         
     951        return &msg->msg_cb.timeout; 
     952} 
    941953 
    942954/* Associate routing lists */ 
Note: See TracChangeset for help on using the changeset viewer.