Mercurial > hg > freeDiameter
changeset 649:5e5d8152c229
Implemented fd_msg_send_timeout to close #10. Not tested yet.
author | Sebastien Decugis <sdecugis@nict.go.jp> |
---|---|
date | Wed, 05 Jan 2011 17:13:34 +0900 |
parents | ae29bf971f20 |
children | 7e9a5e9aad64 |
files | contrib/debian/changelog doc/dbg_interactive.py.sample extensions/dbg_interactive/messages.i freeDiameter/fD.h freeDiameter/p_sr.c freeDiameter/peers.c freeDiameter/routing_dispatch.c include/freeDiameter/libfreeDiameter.h libfreeDiameter/messages.c |
diffstat | 9 files changed, 253 insertions(+), 34 deletions(-) [+] |
line wrap: on
line diff
--- a/contrib/debian/changelog Tue Jan 04 16:21:26 2011 +0900 +++ b/contrib/debian/changelog Wed Jan 05 17:13:34 2011 +0900 @@ -1,6 +1,6 @@ freediameter (1.0.4) UNRELEASED; urgency=low - * Added new option to specify timeout on receiving answer (#10) + * Added new API to specify timeout on receiving answer (#10) * Bumped API version number accordingly. -- Sebastien Decugis <sdecugis@nict.go.jp> Tue, 04 Jan 2011 16:18:53 +0900
--- a/doc/dbg_interactive.py.sample Tue Jan 04 16:21:26 2011 +0900 +++ b/doc/dbg_interactive.py.sample Wed Jan 05 17:13:34 2011 +0900 @@ -463,7 +463,7 @@ mydwr = msg(buf) mydwr.send() -# Optionaly, a callback can be registered when a message is sent, with an optional object. +# Optionaly, a callback can be registered when a request is sent, with an optional object. # This callback takes the answer message as parameter and should return None or a message. (cf. fd_msg_send) def send_callback(msg, obj): print "Received answer:" @@ -475,6 +475,21 @@ mydwr = msg(buf) mydwr.send(send_callback, some_object) +# Again optionaly, a time limit can be specified in this case as follow: +mydwr.send(send_callback, some_object, 10) +# In that case, if no answer / error is received after 10 seconds (the value specified), +# the callback is called with the request as parameter. +# Testing for timeout case is done by using msg.is_request() +def send_callback(msg, obj): + if (msg.is_request()): + print "Request timed out without answer:" + else: + print "Received answer:" + msg.dump() + print "Associated data:" + obj + return None + # Set a result code in an answer message. mydwr = msg(buf)
--- a/extensions/dbg_interactive/messages.i Tue Jan 04 16:21:26 2011 +0900 +++ b/extensions/dbg_interactive/messages.i Wed Jan 05 17:13:34 2011 +0900 @@ -54,29 +54,36 @@ return; } - SWIG_PYTHON_THREAD_BEGIN_BLOCK; - - if (!msg || !*msg) { - PyMsg = Py_None; - } else { - PyMsg = SWIG_NewPointerObj((void *)*msg, SWIGTYPE_p_msg, 0 ); - } + if (l->cb) { - result = PyEval_CallFunction(l->cb, "(OO)", PyMsg, l->data); - Py_XDECREF(l->cb); - Py_XDECREF(l->data); - free(l); - - /* The callback is supposed to return a message or NULL */ - if (!SWIG_IsOK(SWIG_ConvertPtr(result, (void *)msg, SWIGTYPE_p_msg, SWIG_POINTER_DISOWN))) { - fd_log_debug("Error: Cannot convert the return value to message.\n"); - *msg = NULL; + SWIG_PYTHON_THREAD_BEGIN_BLOCK; + + if (!msg || !*msg) { + PyMsg = Py_None; + } else { + PyMsg = SWIG_NewPointerObj((void *)*msg, SWIGTYPE_p_msg, 0 ); + } + + result = PyEval_CallFunction(l->cb, "(OO)", PyMsg, l->data); + Py_XDECREF(l->cb); + Py_XDECREF(l->data); + free(l); + + /* The callback is supposed to return a message or NULL */ + if (!SWIG_IsOK(SWIG_ConvertPtr(result, (void *)msg, SWIGTYPE_p_msg, SWIG_POINTER_DISOWN))) { + fd_log_debug("Error: Cannot convert the return value to message.\n"); + *msg = NULL; + } + + Py_XDECREF(result); + + SWIG_PYTHON_THREAD_END_BLOCK; + } - - Py_XDECREF(result); - - SWIG_PYTHON_THREAD_END_BLOCK; - + /* else */ + /* Only the timeout was specified, without a callback */ + /* in this case, just delete the message */ + /* it actually happens automatically when we do nothing. */ } %} @@ -120,12 +127,12 @@ /* SEND THE MESSAGE */ %delobject send; /* when this has been called, the msg must not be freed anymore */ - void send(PyObject * PyCb = NULL, PyObject * data = NULL) { + void send(PyObject * PyCb = NULL, PyObject * data = NULL, unsigned int timeout = 0) { int ret; struct msg * m = $self; struct anscb_py_layer * l = NULL; - if (PyCb) { + if (PyCb || timeout) { l = malloc(sizeof(struct anscb_py_layer)); if (!l) { DI_ERROR_MALLOC; @@ -138,7 +145,14 @@ l->data = data; } - ret = fd_msg_send(&m, PyCb ? anscb_python : NULL, l); + if (timeout) { + struct timespec ts; + (void) clock_gettime(CLOCK_REALTIME, &ts); + ts.tv_sec += timeout; + ret = fd_msg_send_timeout(&m, anscb_python, l, &ts); + } else { + ret = fd_msg_send(&m, PyCb ? anscb_python : NULL, l); + } if (ret != 0) { DI_ERROR(ret, NULL, NULL); } @@ -302,6 +316,24 @@ return r; } + /* Is request? (shortcut) */ + PyObject * is_request() { + PyObject * r; + int ret; + struct msg_hdr * h; + + ret = fd_msg_hdr($self, &h); + if (ret != 0) { + DI_ERROR(ret, NULL, NULL); + } + if (h->msg_flags & CMD_FLAG_REQUEST) + r = Py_True; + else + r = Py_False; + Py_XINCREF(r); + return r; + } + /* Get the source */ char *source() { char * s = NULL;
--- a/freeDiameter/fD.h Tue Jan 04 16:21:26 2011 +0900 +++ b/freeDiameter/fD.h Wed Jan 05 17:13:34 2011 +0900 @@ -117,8 +117,11 @@ /* Sentinel for the sent requests list */ struct sr_list { - struct fd_list srs; - pthread_mutex_t mtx; + struct fd_list srs; /* requests ordered by hop-by-hop id */ + struct fd_list exp; /* requests that have a timeout set, ordered by timeout */ + pthread_mutex_t mtx; /* mutex to protect these lists */ + pthread_cond_t cnd; /* cond var used by the thread that handles timeouts */ + pthread_t thr; /* the thread that handles timeouts (and calls the anscb) */ }; /* Peers */ @@ -289,6 +292,8 @@ /* Peer sent requests cache */ int fd_p_sr_store(struct sr_list * srlist, struct msg **req, uint32_t *hbhloc, uint32_t hbh_restore); int fd_p_sr_fetch(struct sr_list * srlist, uint32_t hbh, struct msg **req); +int fd_p_sr_start(struct sr_list * srlist); +int fd_p_sr_stop(struct sr_list * srlist); void fd_p_sr_failover(struct sr_list * srlist); /* Local Link messages (CER/CEA, DWR/DWA, DPR/DPA) */
--- a/freeDiameter/p_sr.c Tue Jan 04 16:21:26 2011 +0900 +++ b/freeDiameter/p_sr.c Wed Jan 05 17:13:34 2011 +0900 @@ -44,9 +44,11 @@ struct fd_list chain; /* the "o" field points directly to the hop-by-hop of the request (uint32_t *) */ struct msg *req; /* A request that was sent and not yet answered. */ uint32_t prevhbh;/* The value to set in the hbh header when the message is retrieved */ + struct fd_list expire; /* the list of expiring requests */ + struct timespec added_on; /* the time the request was added */ }; -/* Find an element in the list, or the following one */ +/* Find an element in the hbh list, or the following one */ static struct fd_list * find_or_next(struct fd_list * srlist, uint32_t hbh, int * match) { struct fd_list * li; @@ -65,23 +67,131 @@ static void srl_dump(const char * text, struct fd_list * srlist) { struct fd_list * li; + struct timespec now; if (!TRACE_BOOL(SR_DEBUG_LVL)) return; + CHECK_SYS_DO( clock_gettime(CLOCK_REALTIME, &now), ); fd_log_debug("%sSentReq list @%p:\n", text, srlist); for (li = srlist->next; li != srlist; li = li->next) { struct sentreq * sr = (struct sentreq *)li; uint32_t * nexthbh = li->o; - fd_log_debug(" - Next req (%x):\n", *nexthbh); + fd_log_debug(" - Next req (%x): [since %ld.%06ld sec]\n", *nexthbh, + (now.tv_nsec >= sr->added_on.tv_nsec) ? (now.tv_sec - sr->added_on.tv_sec) : (now.tv_sec - sr->added_on.tv_sec - 1), + (now.tv_nsec >= sr->added_on.tv_nsec) ? (now.tv_nsec - sr->added_on.tv_nsec) / 1000 : (now.tv_nsec - sr->added_on.tv_nsec + 1000000000) / 1000); fd_msg_dump_one(SR_DEBUG_LVL + 1, sr->req); } } +/* (detached) thread that calls the anscb on expired messages. + We do it in a separate thread to avoid blocking the reception of new messages during this time */ +static void * call_anscb_expire(void * arg) { + struct msg * expired_req = arg; + + void (*anscb)(void *, struct msg **); + void * data; + + TRACE_ENTRY("%p", arg); + CHECK_PARAMS_DO( arg, return NULL ); + + /* Set the thread name */ + fd_log_threadname ( "Expired req cb." ); + + /* Log */ + TRACE_DEBUG(INFO, "The expiration timer for a request has been reached, abording this attempt now & calling cb..."); + + /* Retrieve callback in the message */ + CHECK_FCT_DO( fd_msg_anscb_get( expired_req, &anscb, &data ), return NULL); + ASSERT(anscb); + + /* Call it */ + (*anscb)(data, &expired_req); + + /* If the callback did not dispose of the message, do it now */ + if (expired_req) { + CHECK_FCT_DO( fd_msg_free(expired_req), /* ignore */ ); + } + + /* Finish */ + return NULL; +} + +/* thread that handles messages expiring. The thread is started / stopped only when needed */ +static void * sr_expiry_th(void * arg) { + struct sr_list * srlist = arg; + struct msg * expired_req; + pthread_attr_t detached; + + TRACE_ENTRY("%p", arg); + CHECK_PARAMS_DO( arg, return NULL ); + + /* Set the thread name */ + { + char buf[48]; + sprintf(buf, "ReqExp/%.*s", (int)sizeof(buf) - 8, ((struct fd_peer *)(srlist->exp.o))->p_hdr.info.pi_diamid); + fd_log_threadname ( buf ); + } + + CHECK_POSIX_DO( pthread_attr_init(&detached), return NULL ); + CHECK_POSIX_DO( pthread_attr_setdetachstate(&detached, PTHREAD_CREATE_DETACHED), return NULL ); + + CHECK_POSIX_DO( pthread_mutex_lock(&srlist->mtx), return NULL ); + pthread_cleanup_push( fd_cleanup_mutex, &srlist->mtx ); + + do { + struct timespec now, *t; + struct sentreq * first; + pthread_t th; + + /* Check if there are expiring requests available */ + if (FD_IS_LIST_EMPTY(&srlist->exp)) { + /* Just wait for a change or cancelation */ + CHECK_POSIX_DO( pthread_cond_wait( &srlist->cnd, &srlist->mtx ), goto error ); + /* Restart the loop on wakeup */ + continue; + } + + /* Get the pointer to the request that expires first */ + first = (struct sentreq *)(srlist->exp.next->o); + t = fd_msg_anscb_gettimeout( first->req ); + ASSERT(t); + + /* Get the current time */ + CHECK_SYS_DO( clock_gettime(CLOCK_REALTIME, &now), goto error ); + + /* If first request is not expired, we just wait until it happens */ + if ( TS_IS_INFERIOR( &now, t ) ) { + + CHECK_POSIX_DO2( pthread_cond_timedwait( &srlist->cnd, &srlist->mtx, t ), + ETIMEDOUT, /* ETIMEDOUT is a normal return value, continue */, + /* on other error, */ goto error ); + + /* on wakeup, loop */ + continue; + } + + /* Now, the first request in the list is expired; remove it and call the anscb for it in a new thread */ + fd_list_unlink(&first->chain); + fd_list_unlink(&first->expire); + expired_req = first->req; + free(first); + + CHECK_POSIX_DO( pthread_create( &th, &detached, call_anscb_expire, expired_req ), goto error ); + + /* loop */ + } while (1); +error: + pthread_cleanup_pop( 1 ); + return NULL; +} + + /* Store a new sent request */ int fd_p_sr_store(struct sr_list * srlist, struct msg **req, uint32_t *hbhloc, uint32_t hbh_restore) { struct sentreq * sr; struct fd_list * next; int match; + struct timespec * ts; TRACE_ENTRY("%p %p %p %x", srlist, req, hbhloc, hbh_restore); CHECK_PARAMS(srlist && req && *req && hbhloc); @@ -91,6 +201,8 @@ fd_list_init(&sr->chain, hbhloc); sr->req = *req; sr->prevhbh = hbh_restore; + fd_list_init(&sr->expire, sr); + CHECK_SYS( clock_gettime(CLOCK_REALTIME, &sr->added_on) ); /* Search the place in the list */ CHECK_POSIX( pthread_mutex_lock(&srlist->mtx) ); @@ -106,6 +218,35 @@ *req = NULL; fd_list_insert_before(next, &sr->chain); srl_dump("Saved new request, ", &srlist->srs); + + /* In case of request with a timeout, also store in the timeout list */ + ts = fd_msg_anscb_gettimeout( sr->req ); + if (ts) { + struct fd_list * li; + struct timespec * t; + + /* browse srlist->exp from the end */ + for (li = srlist->exp.prev; li != &srlist->exp; li = li->prev) { + struct sentreq * s = (struct sentreq *)(li->o); + t = fd_msg_anscb_gettimeout( s->req ); + ASSERT( t ); /* sanity */ + if (TS_IS_INFERIOR(t, ts)) + break; + } + + fd_list_insert_after(li, &sr->expire); + + /* if the thread does not exist yet, create it */ + if (srlist->thr == (pthread_t)NULL) { + CHECK_POSIX_DO( pthread_create(&srlist->thr, NULL, sr_expiry_th, srlist), /* continue anyway */); + } else { + /* or, if added in first position, signal the condvar to update the sleep time of the thread */ + if (li == &srlist->exp) { + CHECK_POSIX_DO( pthread_cond_signal(&srlist->cnd), /* continue anyway */); + } + } + } + CHECK_POSIX( pthread_mutex_unlock(&srlist->mtx) ); return 0; } @@ -131,10 +272,13 @@ *((uint32_t *)sr->chain.o) = sr->prevhbh; /* Unlink */ fd_list_unlink(&sr->chain); + fd_list_unlink(&sr->expire); *req = sr->req; free(sr); } CHECK_POSIX( pthread_mutex_unlock(&srlist->mtx) ); + + /* do not stop the expire thread here, it might cause creating/destroying it very often otherwise */ /* Done */ return 0; @@ -147,6 +291,7 @@ while (!FD_IS_LIST_EMPTY(&srlist->srs)) { struct sentreq * sr = (struct sentreq *)(srlist->srs.next); fd_list_unlink(&sr->chain); + fd_list_unlink(&sr->expire); if (fd_msg_is_routable(sr->req)) { struct msg_hdr * hdr = NULL; @@ -164,6 +309,12 @@ } free(sr); } + /* The list of expiring requests must be empty now */ + ASSERT( FD_IS_LIST_EMPTY(&srlist->exp) ); + CHECK_POSIX_DO( pthread_mutex_unlock(&srlist->mtx), /* continue anyway */ ); + + /* Terminate the expiry thread (must be done when the lock can be taken) */ + CHECK_FCT_DO( fd_thr_term(&srlist->thr), /* ignore error */ ); }
--- a/freeDiameter/peers.c Tue Jan 04 16:21:26 2011 +0900 +++ b/freeDiameter/peers.c Wed Jan 05 17:13:34 2011 +0900 @@ -78,7 +78,9 @@ p->p_hbh = lrand48(); fd_list_init(&p->p_sr.srs, p); + fd_list_init(&p->p_sr.exp, p); CHECK_POSIX( pthread_mutex_init(&p->p_sr.mtx, NULL) ); + CHECK_POSIX( pthread_cond_init(&p->p_sr.cnd, NULL) ); fd_list_init(&p->p_connparams, p); @@ -250,6 +252,7 @@ CHECK_FCT_DO( fd_fifo_del(&p->p_tosend), /* continue */ ); CHECK_POSIX_DO( pthread_mutex_destroy(&p->p_sr.mtx), /* continue */); + CHECK_POSIX_DO( pthread_cond_destroy(&p->p_sr.cnd), /* continue */); /* If the callback is still around... */ if (p->p_cb)
--- a/freeDiameter/routing_dispatch.c Tue Jan 04 16:21:26 2011 +0900 +++ b/freeDiameter/routing_dispatch.c Wed Jan 05 17:13:34 2011 +0900 @@ -798,7 +798,7 @@ /* Read the message header */ CHECK_FCT( fd_msg_hdr(*pmsg, &hdr) ); is_req = hdr->msg_flags & CMD_FLAG_REQUEST; - + /* For answers, the routing is very easy */ if ( ! is_req ) { struct msg * qry;
--- a/include/freeDiameter/libfreeDiameter.h Tue Jan 04 16:21:26 2011 +0900 +++ b/include/freeDiameter/libfreeDiameter.h Wed Jan 05 17:13:34 2011 +0900 @@ -2146,6 +2146,7 @@ */ int fd_msg_anscb_associate( struct msg * msg, void ( *anscb)(void *, struct msg **), void * data, const struct timespec *timeout ); int fd_msg_anscb_get ( struct msg * msg, void (**anscb)(void *, struct msg **), void ** data ); +struct timespec *fd_msg_anscb_gettimeout( struct msg * msg ); /* returns NULL or a valid non-0 timespec */ /* * FUNCTION: fd_msg_rt_associate, fd_msg_rt_get
--- a/libfreeDiameter/messages.c Tue Jan 04 16:21:26 2011 +0900 +++ b/libfreeDiameter/messages.c Wed Jan 05 17:13:34 2011 +0900 @@ -918,8 +918,6 @@ msg->msg_cb.data = data; if (timeout) { memcpy(&msg->msg_cb.timeout, timeout, sizeof(struct timespec)); - } else { - memset(&msg->msg_cb.timeout, 0, sizeof(struct timespec)); /* clear the area */ } return 0; @@ -937,7 +935,21 @@ *data = msg->msg_cb.data; return 0; -} +} + +struct timespec *fd_msg_anscb_gettimeout( struct msg * msg ) +{ + TRACE_ENTRY("%p", msg); + + /* Check the parameters */ + CHECK_PARAMS_DO( CHECK_MSG(msg), return NULL ); + + if (!msg->msg_cb.timeout.tv_sec) { + return NULL; + } + + return &msg->msg_cb.timeout; +} /* Associate routing lists */ int fd_msg_rt_associate( struct msg * msg, struct rt_data ** rtd )