Navigation


Changeset 649:5e5d8152c229 in freeDiameter


Ignore:
Timestamp:
Jan 5, 2011, 5:13:34 PM (13 years ago)
Author:
Sebastien Decugis <sdecugis@nict.go.jp>
Branch:
default
Phase:
public
Message:

Implemented fd_msg_send_timeout to close #10. Not tested yet.

Files:
9 edited

Legend:

Unmodified
Added
Removed
  • contrib/debian/changelog

    r647 r649  
    11freediameter (1.0.4) UNRELEASED; urgency=low
    22
    3   * Added new option to specify timeout on receiving answer (#10)
     3  * Added new API to specify timeout on receiving answer (#10)
    44  * Bumped API version number accordingly.
    55
  • doc/dbg_interactive.py.sample

    r641 r649  
    464464mydwr.send()
    465465
    466 # Optionaly, a callback can be registered when a message is sent, with an optional object.
     466# Optionaly, a callback can be registered when a request is sent, with an optional object.
    467467# This callback takes the answer message as parameter and should return None or a message. (cf. fd_msg_send)
    468468def send_callback(msg, obj):
     
    475475mydwr = msg(buf)
    476476mydwr.send(send_callback, some_object)
     477
     478# Again optionaly, a time limit can be specified in this case as follow:
     479mydwr.send(send_callback, some_object, 10)
     480# In that case, if no answer / error is received after 10 seconds (the value specified),
     481# the callback is called with the request as parameter.
     482# Testing for timeout case is done by using msg.is_request()
     483def send_callback(msg, obj):
     484    if (msg.is_request()):
     485        print "Request timed out without answer:"
     486    else:
     487        print "Received answer:"
     488    msg.dump()
     489    print "Associated data:"
     490    obj
     491    return None
    477492
    478493
  • extensions/dbg_interactive/messages.i

    r640 r649  
    5555        }
    5656       
    57         SWIG_PYTHON_THREAD_BEGIN_BLOCK;
    58        
    59         if (!msg || !*msg) {
    60                 PyMsg = Py_None;
    61         } else {
    62                 PyMsg = SWIG_NewPointerObj((void *)*msg,     SWIGTYPE_p_msg,     0 );
    63         }
    64        
    65         result = PyEval_CallFunction(l->cb, "(OO)", PyMsg, l->data);
    66         Py_XDECREF(l->cb);
    67         Py_XDECREF(l->data);
    68         free(l);
    69        
    70         /* The callback is supposed to return a message or NULL */
    71         if (!SWIG_IsOK(SWIG_ConvertPtr(result, (void *)msg, SWIGTYPE_p_msg, SWIG_POINTER_DISOWN))) {
    72                 fd_log_debug("Error: Cannot convert the return value to message.\n");
    73                 *msg = NULL;
    74         }
    75        
    76         Py_XDECREF(result);
    77        
    78         SWIG_PYTHON_THREAD_END_BLOCK;
    79        
     57        if (l->cb) {
     58       
     59                SWIG_PYTHON_THREAD_BEGIN_BLOCK;
     60
     61                if (!msg || !*msg) {
     62                        PyMsg = Py_None;
     63                } else {
     64                        PyMsg = SWIG_NewPointerObj((void *)*msg,     SWIGTYPE_p_msg,     0 );
     65                }
     66
     67                result = PyEval_CallFunction(l->cb, "(OO)", PyMsg, l->data);
     68                Py_XDECREF(l->cb);
     69                Py_XDECREF(l->data);
     70                free(l);
     71
     72                /* The callback is supposed to return a message or NULL */
     73                if (!SWIG_IsOK(SWIG_ConvertPtr(result, (void *)msg, SWIGTYPE_p_msg, SWIG_POINTER_DISOWN))) {
     74                        fd_log_debug("Error: Cannot convert the return value to message.\n");
     75                        *msg = NULL;
     76                }
     77
     78                Py_XDECREF(result);
     79
     80                SWIG_PYTHON_THREAD_END_BLOCK;
     81               
     82        }
     83        /* else */
     84                /* Only the timeout was specified, without a callback */
     85                /* in this case, just delete the message */
     86                /* it actually happens automatically when we do nothing. */
    8087}
    8188%}
     
    121128        /* SEND THE MESSAGE */
    122129        %delobject send; /* when this has been called, the msg must not be freed anymore */
    123         void send(PyObject * PyCb = NULL, PyObject * data = NULL) {
     130        void send(PyObject * PyCb = NULL, PyObject * data = NULL, unsigned int timeout = 0) {
    124131                int ret;
    125132                struct msg * m = $self;
    126133                struct anscb_py_layer * l = NULL;
    127134               
    128                 if (PyCb) {
     135                if (PyCb || timeout) {
    129136                        l = malloc(sizeof(struct anscb_py_layer));
    130137                        if (!l) {
     
    139146                }
    140147               
    141                 ret = fd_msg_send(&m, PyCb ? anscb_python : NULL, l);
     148                if (timeout) {
     149                        struct timespec ts;
     150                        (void) clock_gettime(CLOCK_REALTIME, &ts);
     151                        ts.tv_sec += timeout;
     152                        ret = fd_msg_send_timeout(&m, anscb_python, l, &ts);
     153                } else {
     154                        ret = fd_msg_send(&m, PyCb ? anscb_python : NULL, l);
     155                }
    142156                if (ret != 0) {
    143157                        DI_ERROR(ret, NULL, NULL);
     
    303317        }
    304318       
     319        /* Is request? (shortcut) */
     320        PyObject * is_request() {
     321                PyObject * r;
     322                int ret;
     323                struct msg_hdr * h;
     324               
     325                ret = fd_msg_hdr($self, &h);
     326                if (ret != 0) {
     327                        DI_ERROR(ret, NULL, NULL);
     328                }
     329                if (h->msg_flags & CMD_FLAG_REQUEST)
     330                        r = Py_True;
     331                else
     332                        r = Py_False;
     333                Py_XINCREF(r);
     334                return r;
     335        }
     336       
    305337        /* Get the source */
    306338        char *source() {
  • freeDiameter/fD.h

    r447 r649  
    118118/* Sentinel for the sent requests list */
    119119struct sr_list {
    120         struct fd_list  srs;
    121         pthread_mutex_t mtx;
     120        struct fd_list  srs; /* requests ordered by hop-by-hop id */
     121        struct fd_list  exp; /* requests that have a timeout set, ordered by timeout */
     122        pthread_mutex_t mtx; /* mutex to protect these lists */
     123        pthread_cond_t  cnd; /* cond var used by the thread that handles timeouts */
     124        pthread_t       thr; /* the thread that handles timeouts (and calls the anscb) */
    122125};
    123126
     
    290293int fd_p_sr_store(struct sr_list * srlist, struct msg **req, uint32_t *hbhloc, uint32_t hbh_restore);
    291294int fd_p_sr_fetch(struct sr_list * srlist, uint32_t hbh, struct msg **req);
     295int fd_p_sr_start(struct sr_list * srlist);
     296int fd_p_sr_stop(struct sr_list * srlist);
    292297void fd_p_sr_failover(struct sr_list * srlist);
    293298
  • freeDiameter/p_sr.c

    r258 r649  
    4545        struct msg      *req;   /* A request that was sent and not yet answered. */
    4646        uint32_t        prevhbh;/* The value to set in the hbh header when the message is retrieved */
     47        struct fd_list  expire; /* the list of expiring requests */
     48        struct timespec added_on; /* the time the request was added */
    4749};
    4850
    49 /* Find an element in the list, or the following one */
     51/* Find an element in the hbh list, or the following one */
    5052static struct fd_list * find_or_next(struct fd_list * srlist, uint32_t hbh, int * match)
    5153{
     
    6668{
    6769        struct fd_list * li;
     70        struct timespec now;
    6871        if (!TRACE_BOOL(SR_DEBUG_LVL))
    6972                return;
     73        CHECK_SYS_DO( clock_gettime(CLOCK_REALTIME, &now), );
    7074        fd_log_debug("%sSentReq list @%p:\n", text, srlist);
    7175        for (li = srlist->next; li != srlist; li = li->next) {
    7276                struct sentreq * sr = (struct sentreq *)li;
    7377                uint32_t * nexthbh = li->o;
    74                 fd_log_debug(" - Next req (%x):\n", *nexthbh);
     78                fd_log_debug(" - Next req (%x): [since %ld.%06ld sec]\n", *nexthbh,
     79                        (now.tv_nsec >= sr->added_on.tv_nsec) ? (now.tv_sec - sr->added_on.tv_sec) : (now.tv_sec - sr->added_on.tv_sec - 1),
     80                        (now.tv_nsec >= sr->added_on.tv_nsec) ? (now.tv_nsec - sr->added_on.tv_nsec) / 1000 : (now.tv_nsec - sr->added_on.tv_nsec + 1000000000) / 1000);
    7581                fd_msg_dump_one(SR_DEBUG_LVL + 1, sr->req);
    7682        }
    7783}
     84
     85/* (detached) thread that calls the anscb on expired messages.
     86  We do it in a separate thread to avoid blocking the reception of new messages during this time */
     87static void * call_anscb_expire(void * arg) {
     88        struct msg * expired_req = arg;
     89       
     90        void (*anscb)(void *, struct msg **);
     91        void * data;
     92       
     93        TRACE_ENTRY("%p", arg);
     94        CHECK_PARAMS_DO( arg, return NULL );
     95       
     96        /* Set the thread name */
     97        fd_log_threadname ( "Expired req cb." );
     98       
     99        /* Log */
     100        TRACE_DEBUG(INFO, "The expiration timer for a request has been reached, abording this attempt now & calling cb...");
     101       
     102        /* Retrieve callback in the message */
     103        CHECK_FCT_DO( fd_msg_anscb_get( expired_req, &anscb, &data ), return NULL);
     104        ASSERT(anscb);
     105
     106        /* Call it */
     107        (*anscb)(data, &expired_req);
     108       
     109        /* If the callback did not dispose of the message, do it now */
     110        if (expired_req) {
     111                CHECK_FCT_DO( fd_msg_free(expired_req), /* ignore */ );
     112        }
     113       
     114        /* Finish */
     115        return NULL;
     116}
     117
     118/* thread that handles messages expiring. The thread is started / stopped only when needed */
     119static void * sr_expiry_th(void * arg) {
     120        struct sr_list * srlist = arg;
     121        struct msg * expired_req;
     122        pthread_attr_t detached;
     123       
     124        TRACE_ENTRY("%p", arg);
     125        CHECK_PARAMS_DO( arg, return NULL );
     126       
     127        /* Set the thread name */
     128        {
     129                char buf[48];
     130                sprintf(buf, "ReqExp/%.*s", (int)sizeof(buf) - 8, ((struct fd_peer *)(srlist->exp.o))->p_hdr.info.pi_diamid);
     131                fd_log_threadname ( buf );
     132        }
     133       
     134        CHECK_POSIX_DO( pthread_attr_init(&detached), return NULL );
     135        CHECK_POSIX_DO( pthread_attr_setdetachstate(&detached, PTHREAD_CREATE_DETACHED), return NULL );
     136       
     137        CHECK_POSIX_DO( pthread_mutex_lock(&srlist->mtx),  return NULL );
     138        pthread_cleanup_push( fd_cleanup_mutex, &srlist->mtx );
     139       
     140        do {
     141                struct timespec now, *t;
     142                struct sentreq * first;
     143                pthread_t th;
     144               
     145                /* Check if there are expiring requests available */
     146                if (FD_IS_LIST_EMPTY(&srlist->exp)) {
     147                        /* Just wait for a change or cancelation */
     148                        CHECK_POSIX_DO( pthread_cond_wait( &srlist->cnd, &srlist->mtx ), goto error );
     149                        /* Restart the loop on wakeup */
     150                        continue;
     151                }
     152               
     153                /* Get the pointer to the request that expires first */
     154                first = (struct sentreq *)(srlist->exp.next->o);
     155                t = fd_msg_anscb_gettimeout( first->req );
     156                ASSERT(t);
     157               
     158                /* Get the current time */
     159                CHECK_SYS_DO(  clock_gettime(CLOCK_REALTIME, &now),  goto error  );
     160
     161                /* If first request is not expired, we just wait until it happens */
     162                if ( TS_IS_INFERIOR( &now, t ) ) {
     163                       
     164                        CHECK_POSIX_DO2(  pthread_cond_timedwait( &srlist->cnd, &srlist->mtx, t ), 
     165                                        ETIMEDOUT, /* ETIMEDOUT is a normal return value, continue */,
     166                                        /* on other error, */ goto error );
     167       
     168                        /* on wakeup, loop */
     169                        continue;
     170                }
     171               
     172                /* Now, the first request in the list is expired; remove it and call the anscb for it in a new thread */
     173                fd_list_unlink(&first->chain);
     174                fd_list_unlink(&first->expire);
     175                expired_req = first->req;
     176                free(first);
     177               
     178                CHECK_POSIX_DO( pthread_create( &th, &detached, call_anscb_expire, expired_req ), goto error );
     179
     180                /* loop */
     181        } while (1);
     182error: 
     183        pthread_cleanup_pop( 1 );
     184        return NULL;
     185}
     186
    78187
    79188/* Store a new sent request */
     
    83192        struct fd_list * next;
    84193        int match;
     194        struct timespec * ts;
    85195       
    86196        TRACE_ENTRY("%p %p %p %x", srlist, req, hbhloc, hbh_restore);
     
    92202        sr->req = *req;
    93203        sr->prevhbh = hbh_restore;
     204        fd_list_init(&sr->expire, sr);
     205        CHECK_SYS( clock_gettime(CLOCK_REALTIME, &sr->added_on) );
    94206       
    95207        /* Search the place in the list */
     
    107219        fd_list_insert_before(next, &sr->chain);
    108220        srl_dump("Saved new request, ", &srlist->srs);
     221       
     222        /* In case of request with a timeout, also store in the timeout list */
     223        ts = fd_msg_anscb_gettimeout( sr->req );
     224        if (ts) {
     225                struct fd_list * li;
     226                struct timespec * t;
     227               
     228                /* browse srlist->exp from the end */
     229                for (li = srlist->exp.prev; li != &srlist->exp; li = li->prev) {
     230                        struct sentreq * s = (struct sentreq *)(li->o);
     231                        t = fd_msg_anscb_gettimeout( s->req );
     232                        ASSERT( t ); /* sanity */
     233                        if (TS_IS_INFERIOR(t, ts))
     234                                break;
     235                }
     236               
     237                fd_list_insert_after(li, &sr->expire);
     238       
     239                /* if the thread does not exist yet, create it */
     240                if (srlist->thr == (pthread_t)NULL) {
     241                        CHECK_POSIX_DO( pthread_create(&srlist->thr, NULL, sr_expiry_th, srlist), /* continue anyway */);
     242                } else {
     243                        /* or, if added in first position, signal the condvar to update the sleep time of the thread */
     244                        if (li == &srlist->exp) {
     245                                CHECK_POSIX_DO( pthread_cond_signal(&srlist->cnd), /* continue anyway */);
     246                        }
     247                }
     248        }
     249       
    109250        CHECK_POSIX( pthread_mutex_unlock(&srlist->mtx) );
    110251        return 0;
     
    132273                /* Unlink */
    133274                fd_list_unlink(&sr->chain);
     275                fd_list_unlink(&sr->expire);
    134276                *req = sr->req;
    135277                free(sr);
    136278        }
    137279        CHECK_POSIX( pthread_mutex_unlock(&srlist->mtx) );
     280       
     281        /* do not stop the expire thread here, it might cause creating/destroying it very often otherwise */
    138282
    139283        /* Done */
     
    148292                struct sentreq * sr = (struct sentreq *)(srlist->srs.next);
    149293                fd_list_unlink(&sr->chain);
     294                fd_list_unlink(&sr->expire);
    150295                if (fd_msg_is_routable(sr->req)) {
    151296                        struct msg_hdr * hdr = NULL;
     
    165310                free(sr);
    166311        }
     312        /* The list of expiring requests must be empty now */
     313        ASSERT( FD_IS_LIST_EMPTY(&srlist->exp) );
     314       
    167315        CHECK_POSIX_DO( pthread_mutex_unlock(&srlist->mtx), /* continue anyway */ );
    168 }
    169 
     316       
     317        /* Terminate the expiry thread (must be done when the lock can be taken) */
     318        CHECK_FCT_DO( fd_thr_term(&srlist->thr), /* ignore error */ );
     319}
     320
  • freeDiameter/peers.c

    r454 r649  
    7979       
    8080        fd_list_init(&p->p_sr.srs, p);
     81        fd_list_init(&p->p_sr.exp, p);
    8182        CHECK_POSIX( pthread_mutex_init(&p->p_sr.mtx, NULL) );
     83        CHECK_POSIX( pthread_cond_init(&p->p_sr.cnd, NULL) );
    8284       
    8385        fd_list_init(&p->p_connparams, p);
     
    251253        CHECK_FCT_DO( fd_fifo_del(&p->p_tosend), /* continue */ );
    252254        CHECK_POSIX_DO( pthread_mutex_destroy(&p->p_sr.mtx), /* continue */);
     255        CHECK_POSIX_DO( pthread_cond_destroy(&p->p_sr.cnd), /* continue */);
    253256       
    254257        /* If the callback is still around... */
  • freeDiameter/routing_dispatch.c

    r563 r649  
    799799        CHECK_FCT( fd_msg_hdr(*pmsg, &hdr) );
    800800        is_req = hdr->msg_flags & CMD_FLAG_REQUEST;
    801                
     801       
    802802        /* For answers, the routing is very easy */
    803803        if ( ! is_req ) {
  • include/freeDiameter/libfreeDiameter.h

    r648 r649  
    21472147int fd_msg_anscb_associate( struct msg * msg, void ( *anscb)(void *, struct msg **), void  * data, const struct timespec *timeout );
    21482148int fd_msg_anscb_get      ( struct msg * msg, void (**anscb)(void *, struct msg **), void ** data );
     2149struct timespec *fd_msg_anscb_gettimeout( struct msg * msg ); /* returns NULL or a valid non-0 timespec */
    21492150
    21502151/*
  • libfreeDiameter/messages.c

    r648 r649  
    919919        if (timeout) {
    920920                memcpy(&msg->msg_cb.timeout, timeout, sizeof(struct timespec));
    921         } else {
    922                 memset(&msg->msg_cb.timeout, 0, sizeof(struct timespec)); /* clear the area */
    923921        }
    924922       
     
    938936       
    939937        return 0;
    940 }       
     938}
     939
     940struct timespec *fd_msg_anscb_gettimeout( struct msg * msg )
     941{
     942        TRACE_ENTRY("%p", msg);
     943       
     944        /* Check the parameters */
     945        CHECK_PARAMS_DO( CHECK_MSG(msg), return NULL );
     946       
     947        if (!msg->msg_cb.timeout.tv_sec) {
     948                return NULL;
     949        }
     950       
     951        return &msg->msg_cb.timeout;
     952}
    941953
    942954/* Associate routing lists */
Note: See TracChangeset for help on using the changeset viewer.