Changeset 649:5e5d8152c229 in freeDiameter
- Timestamp:
- Jan 5, 2011, 5:13:34 PM (13 years ago)
- Branch:
- default
- Phase:
- public
- Files:
-
- 9 edited
Legend:
- Unmodified
- Added
- Removed
-
contrib/debian/changelog
r647 r649 1 1 freediameter (1.0.4) UNRELEASED; urgency=low 2 2 3 * Added new optionto specify timeout on receiving answer (#10)3 * Added new API to specify timeout on receiving answer (#10) 4 4 * Bumped API version number accordingly. 5 5 -
doc/dbg_interactive.py.sample
r641 r649 464 464 mydwr.send() 465 465 466 # Optionaly, a callback can be registered when a messageis sent, with an optional object.466 # Optionaly, a callback can be registered when a request is sent, with an optional object. 467 467 # This callback takes the answer message as parameter and should return None or a message. (cf. fd_msg_send) 468 468 def send_callback(msg, obj): … … 475 475 mydwr = msg(buf) 476 476 mydwr.send(send_callback, some_object) 477 478 # Again optionaly, a time limit can be specified in this case as follow: 479 mydwr.send(send_callback, some_object, 10) 480 # In that case, if no answer / error is received after 10 seconds (the value specified), 481 # the callback is called with the request as parameter. 482 # Testing for timeout case is done by using msg.is_request() 483 def send_callback(msg, obj): 484 if (msg.is_request()): 485 print "Request timed out without answer:" 486 else: 487 print "Received answer:" 488 msg.dump() 489 print "Associated data:" 490 obj 491 return None 477 492 478 493 -
extensions/dbg_interactive/messages.i
r640 r649 55 55 } 56 56 57 SWIG_PYTHON_THREAD_BEGIN_BLOCK; 58 59 if (!msg || !*msg) { 60 PyMsg = Py_None; 61 } else { 62 PyMsg = SWIG_NewPointerObj((void *)*msg, SWIGTYPE_p_msg, 0 ); 63 } 64 65 result = PyEval_CallFunction(l->cb, "(OO)", PyMsg, l->data); 66 Py_XDECREF(l->cb); 67 Py_XDECREF(l->data); 68 free(l); 69 70 /* The callback is supposed to return a message or NULL */ 71 if (!SWIG_IsOK(SWIG_ConvertPtr(result, (void *)msg, SWIGTYPE_p_msg, SWIG_POINTER_DISOWN))) { 72 fd_log_debug("Error: Cannot convert the return value to message.\n"); 73 *msg = NULL; 74 } 75 76 Py_XDECREF(result); 77 78 SWIG_PYTHON_THREAD_END_BLOCK; 79 57 if (l->cb) { 58 59 SWIG_PYTHON_THREAD_BEGIN_BLOCK; 60 61 if (!msg || !*msg) { 62 PyMsg = Py_None; 63 } else { 64 PyMsg = SWIG_NewPointerObj((void *)*msg, SWIGTYPE_p_msg, 0 ); 65 } 66 67 result = PyEval_CallFunction(l->cb, "(OO)", PyMsg, l->data); 68 Py_XDECREF(l->cb); 69 Py_XDECREF(l->data); 70 free(l); 71 72 /* The callback is supposed to return a message or NULL */ 73 if (!SWIG_IsOK(SWIG_ConvertPtr(result, (void *)msg, SWIGTYPE_p_msg, SWIG_POINTER_DISOWN))) { 74 fd_log_debug("Error: Cannot convert the return value to message.\n"); 75 *msg = NULL; 76 } 77 78 Py_XDECREF(result); 79 80 SWIG_PYTHON_THREAD_END_BLOCK; 81 82 } 83 /* else */ 84 /* Only the timeout was specified, without a callback */ 85 /* in this case, just delete the message */ 86 /* it actually happens automatically when we do nothing. */ 80 87 } 81 88 %} … … 121 128 /* SEND THE MESSAGE */ 122 129 %delobject send; /* when this has been called, the msg must not be freed anymore */ 123 void send(PyObject * PyCb = NULL, PyObject * data = NULL ) {130 void send(PyObject * PyCb = NULL, PyObject * data = NULL, unsigned int timeout = 0) { 124 131 int ret; 125 132 struct msg * m = $self; 126 133 struct anscb_py_layer * l = NULL; 127 134 128 if (PyCb ) {135 if (PyCb || timeout) { 129 136 l = malloc(sizeof(struct anscb_py_layer)); 130 137 if (!l) { … … 139 146 } 140 147 141 ret = fd_msg_send(&m, PyCb ? anscb_python : NULL, l); 148 if (timeout) { 149 struct timespec ts; 150 (void) clock_gettime(CLOCK_REALTIME, &ts); 151 ts.tv_sec += timeout; 152 ret = fd_msg_send_timeout(&m, anscb_python, l, &ts); 153 } else { 154 ret = fd_msg_send(&m, PyCb ? anscb_python : NULL, l); 155 } 142 156 if (ret != 0) { 143 157 DI_ERROR(ret, NULL, NULL); … … 303 317 } 304 318 319 /* Is request? (shortcut) */ 320 PyObject * is_request() { 321 PyObject * r; 322 int ret; 323 struct msg_hdr * h; 324 325 ret = fd_msg_hdr($self, &h); 326 if (ret != 0) { 327 DI_ERROR(ret, NULL, NULL); 328 } 329 if (h->msg_flags & CMD_FLAG_REQUEST) 330 r = Py_True; 331 else 332 r = Py_False; 333 Py_XINCREF(r); 334 return r; 335 } 336 305 337 /* Get the source */ 306 338 char *source() { -
freeDiameter/fD.h
r447 r649 118 118 /* Sentinel for the sent requests list */ 119 119 struct sr_list { 120 struct fd_list srs; 121 pthread_mutex_t mtx; 120 struct fd_list srs; /* requests ordered by hop-by-hop id */ 121 struct fd_list exp; /* requests that have a timeout set, ordered by timeout */ 122 pthread_mutex_t mtx; /* mutex to protect these lists */ 123 pthread_cond_t cnd; /* cond var used by the thread that handles timeouts */ 124 pthread_t thr; /* the thread that handles timeouts (and calls the anscb) */ 122 125 }; 123 126 … … 290 293 int fd_p_sr_store(struct sr_list * srlist, struct msg **req, uint32_t *hbhloc, uint32_t hbh_restore); 291 294 int fd_p_sr_fetch(struct sr_list * srlist, uint32_t hbh, struct msg **req); 295 int fd_p_sr_start(struct sr_list * srlist); 296 int fd_p_sr_stop(struct sr_list * srlist); 292 297 void fd_p_sr_failover(struct sr_list * srlist); 293 298 -
freeDiameter/p_sr.c
r258 r649 45 45 struct msg *req; /* A request that was sent and not yet answered. */ 46 46 uint32_t prevhbh;/* The value to set in the hbh header when the message is retrieved */ 47 struct fd_list expire; /* the list of expiring requests */ 48 struct timespec added_on; /* the time the request was added */ 47 49 }; 48 50 49 /* Find an element in the list, or the following one */51 /* Find an element in the hbh list, or the following one */ 50 52 static struct fd_list * find_or_next(struct fd_list * srlist, uint32_t hbh, int * match) 51 53 { … … 66 68 { 67 69 struct fd_list * li; 70 struct timespec now; 68 71 if (!TRACE_BOOL(SR_DEBUG_LVL)) 69 72 return; 73 CHECK_SYS_DO( clock_gettime(CLOCK_REALTIME, &now), ); 70 74 fd_log_debug("%sSentReq list @%p:\n", text, srlist); 71 75 for (li = srlist->next; li != srlist; li = li->next) { 72 76 struct sentreq * sr = (struct sentreq *)li; 73 77 uint32_t * nexthbh = li->o; 74 fd_log_debug(" - Next req (%x):\n", *nexthbh); 78 fd_log_debug(" - Next req (%x): [since %ld.%06ld sec]\n", *nexthbh, 79 (now.tv_nsec >= sr->added_on.tv_nsec) ? (now.tv_sec - sr->added_on.tv_sec) : (now.tv_sec - sr->added_on.tv_sec - 1), 80 (now.tv_nsec >= sr->added_on.tv_nsec) ? (now.tv_nsec - sr->added_on.tv_nsec) / 1000 : (now.tv_nsec - sr->added_on.tv_nsec + 1000000000) / 1000); 75 81 fd_msg_dump_one(SR_DEBUG_LVL + 1, sr->req); 76 82 } 77 83 } 84 85 /* (detached) thread that calls the anscb on expired messages. 86 We do it in a separate thread to avoid blocking the reception of new messages during this time */ 87 static void * call_anscb_expire(void * arg) { 88 struct msg * expired_req = arg; 89 90 void (*anscb)(void *, struct msg **); 91 void * data; 92 93 TRACE_ENTRY("%p", arg); 94 CHECK_PARAMS_DO( arg, return NULL ); 95 96 /* Set the thread name */ 97 fd_log_threadname ( "Expired req cb." ); 98 99 /* Log */ 100 TRACE_DEBUG(INFO, "The expiration timer for a request has been reached, abording this attempt now & calling cb..."); 101 102 /* Retrieve callback in the message */ 103 CHECK_FCT_DO( fd_msg_anscb_get( expired_req, &anscb, &data ), return NULL); 104 ASSERT(anscb); 105 106 /* Call it */ 107 (*anscb)(data, &expired_req); 108 109 /* If the callback did not dispose of the message, do it now */ 110 if (expired_req) { 111 CHECK_FCT_DO( fd_msg_free(expired_req), /* ignore */ ); 112 } 113 114 /* Finish */ 115 return NULL; 116 } 117 118 /* thread that handles messages expiring. The thread is started / stopped only when needed */ 119 static void * sr_expiry_th(void * arg) { 120 struct sr_list * srlist = arg; 121 struct msg * expired_req; 122 pthread_attr_t detached; 123 124 TRACE_ENTRY("%p", arg); 125 CHECK_PARAMS_DO( arg, return NULL ); 126 127 /* Set the thread name */ 128 { 129 char buf[48]; 130 sprintf(buf, "ReqExp/%.*s", (int)sizeof(buf) - 8, ((struct fd_peer *)(srlist->exp.o))->p_hdr.info.pi_diamid); 131 fd_log_threadname ( buf ); 132 } 133 134 CHECK_POSIX_DO( pthread_attr_init(&detached), return NULL ); 135 CHECK_POSIX_DO( pthread_attr_setdetachstate(&detached, PTHREAD_CREATE_DETACHED), return NULL ); 136 137 CHECK_POSIX_DO( pthread_mutex_lock(&srlist->mtx), return NULL ); 138 pthread_cleanup_push( fd_cleanup_mutex, &srlist->mtx ); 139 140 do { 141 struct timespec now, *t; 142 struct sentreq * first; 143 pthread_t th; 144 145 /* Check if there are expiring requests available */ 146 if (FD_IS_LIST_EMPTY(&srlist->exp)) { 147 /* Just wait for a change or cancelation */ 148 CHECK_POSIX_DO( pthread_cond_wait( &srlist->cnd, &srlist->mtx ), goto error ); 149 /* Restart the loop on wakeup */ 150 continue; 151 } 152 153 /* Get the pointer to the request that expires first */ 154 first = (struct sentreq *)(srlist->exp.next->o); 155 t = fd_msg_anscb_gettimeout( first->req ); 156 ASSERT(t); 157 158 /* Get the current time */ 159 CHECK_SYS_DO( clock_gettime(CLOCK_REALTIME, &now), goto error ); 160 161 /* If first request is not expired, we just wait until it happens */ 162 if ( TS_IS_INFERIOR( &now, t ) ) { 163 164 CHECK_POSIX_DO2( pthread_cond_timedwait( &srlist->cnd, &srlist->mtx, t ), 165 ETIMEDOUT, /* ETIMEDOUT is a normal return value, continue */, 166 /* on other error, */ goto error ); 167 168 /* on wakeup, loop */ 169 continue; 170 } 171 172 /* Now, the first request in the list is expired; remove it and call the anscb for it in a new thread */ 173 fd_list_unlink(&first->chain); 174 fd_list_unlink(&first->expire); 175 expired_req = first->req; 176 free(first); 177 178 CHECK_POSIX_DO( pthread_create( &th, &detached, call_anscb_expire, expired_req ), goto error ); 179 180 /* loop */ 181 } while (1); 182 error: 183 pthread_cleanup_pop( 1 ); 184 return NULL; 185 } 186 78 187 79 188 /* Store a new sent request */ … … 83 192 struct fd_list * next; 84 193 int match; 194 struct timespec * ts; 85 195 86 196 TRACE_ENTRY("%p %p %p %x", srlist, req, hbhloc, hbh_restore); … … 92 202 sr->req = *req; 93 203 sr->prevhbh = hbh_restore; 204 fd_list_init(&sr->expire, sr); 205 CHECK_SYS( clock_gettime(CLOCK_REALTIME, &sr->added_on) ); 94 206 95 207 /* Search the place in the list */ … … 107 219 fd_list_insert_before(next, &sr->chain); 108 220 srl_dump("Saved new request, ", &srlist->srs); 221 222 /* In case of request with a timeout, also store in the timeout list */ 223 ts = fd_msg_anscb_gettimeout( sr->req ); 224 if (ts) { 225 struct fd_list * li; 226 struct timespec * t; 227 228 /* browse srlist->exp from the end */ 229 for (li = srlist->exp.prev; li != &srlist->exp; li = li->prev) { 230 struct sentreq * s = (struct sentreq *)(li->o); 231 t = fd_msg_anscb_gettimeout( s->req ); 232 ASSERT( t ); /* sanity */ 233 if (TS_IS_INFERIOR(t, ts)) 234 break; 235 } 236 237 fd_list_insert_after(li, &sr->expire); 238 239 /* if the thread does not exist yet, create it */ 240 if (srlist->thr == (pthread_t)NULL) { 241 CHECK_POSIX_DO( pthread_create(&srlist->thr, NULL, sr_expiry_th, srlist), /* continue anyway */); 242 } else { 243 /* or, if added in first position, signal the condvar to update the sleep time of the thread */ 244 if (li == &srlist->exp) { 245 CHECK_POSIX_DO( pthread_cond_signal(&srlist->cnd), /* continue anyway */); 246 } 247 } 248 } 249 109 250 CHECK_POSIX( pthread_mutex_unlock(&srlist->mtx) ); 110 251 return 0; … … 132 273 /* Unlink */ 133 274 fd_list_unlink(&sr->chain); 275 fd_list_unlink(&sr->expire); 134 276 *req = sr->req; 135 277 free(sr); 136 278 } 137 279 CHECK_POSIX( pthread_mutex_unlock(&srlist->mtx) ); 280 281 /* do not stop the expire thread here, it might cause creating/destroying it very often otherwise */ 138 282 139 283 /* Done */ … … 148 292 struct sentreq * sr = (struct sentreq *)(srlist->srs.next); 149 293 fd_list_unlink(&sr->chain); 294 fd_list_unlink(&sr->expire); 150 295 if (fd_msg_is_routable(sr->req)) { 151 296 struct msg_hdr * hdr = NULL; … … 165 310 free(sr); 166 311 } 312 /* The list of expiring requests must be empty now */ 313 ASSERT( FD_IS_LIST_EMPTY(&srlist->exp) ); 314 167 315 CHECK_POSIX_DO( pthread_mutex_unlock(&srlist->mtx), /* continue anyway */ ); 168 } 169 316 317 /* Terminate the expiry thread (must be done when the lock can be taken) */ 318 CHECK_FCT_DO( fd_thr_term(&srlist->thr), /* ignore error */ ); 319 } 320 -
freeDiameter/peers.c
r454 r649 79 79 80 80 fd_list_init(&p->p_sr.srs, p); 81 fd_list_init(&p->p_sr.exp, p); 81 82 CHECK_POSIX( pthread_mutex_init(&p->p_sr.mtx, NULL) ); 83 CHECK_POSIX( pthread_cond_init(&p->p_sr.cnd, NULL) ); 82 84 83 85 fd_list_init(&p->p_connparams, p); … … 251 253 CHECK_FCT_DO( fd_fifo_del(&p->p_tosend), /* continue */ ); 252 254 CHECK_POSIX_DO( pthread_mutex_destroy(&p->p_sr.mtx), /* continue */); 255 CHECK_POSIX_DO( pthread_cond_destroy(&p->p_sr.cnd), /* continue */); 253 256 254 257 /* If the callback is still around... */ -
freeDiameter/routing_dispatch.c
r563 r649 799 799 CHECK_FCT( fd_msg_hdr(*pmsg, &hdr) ); 800 800 is_req = hdr->msg_flags & CMD_FLAG_REQUEST; 801 801 802 802 /* For answers, the routing is very easy */ 803 803 if ( ! is_req ) { -
include/freeDiameter/libfreeDiameter.h
r648 r649 2147 2147 int fd_msg_anscb_associate( struct msg * msg, void ( *anscb)(void *, struct msg **), void * data, const struct timespec *timeout ); 2148 2148 int fd_msg_anscb_get ( struct msg * msg, void (**anscb)(void *, struct msg **), void ** data ); 2149 struct timespec *fd_msg_anscb_gettimeout( struct msg * msg ); /* returns NULL or a valid non-0 timespec */ 2149 2150 2150 2151 /* -
libfreeDiameter/messages.c
r648 r649 919 919 if (timeout) { 920 920 memcpy(&msg->msg_cb.timeout, timeout, sizeof(struct timespec)); 921 } else {922 memset(&msg->msg_cb.timeout, 0, sizeof(struct timespec)); /* clear the area */923 921 } 924 922 … … 938 936 939 937 return 0; 940 } 938 } 939 940 struct timespec *fd_msg_anscb_gettimeout( struct msg * msg ) 941 { 942 TRACE_ENTRY("%p", msg); 943 944 /* Check the parameters */ 945 CHECK_PARAMS_DO( CHECK_MSG(msg), return NULL ); 946 947 if (!msg->msg_cb.timeout.tv_sec) { 948 return NULL; 949 } 950 951 return &msg->msg_cb.timeout; 952 } 941 953 942 954 /* Associate routing lists */
Note: See TracChangeset
for help on using the changeset viewer.