Mercurial > hg > freeDiameter
diff freeDiameter/p_sr.c @ 649:5e5d8152c229
Implemented fd_msg_send_timeout to close #10. Not tested yet.
author | Sebastien Decugis <sdecugis@nict.go.jp> |
---|---|
date | Wed, 05 Jan 2011 17:13:34 +0900 |
parents | 5df55136361b |
children | 7e9a5e9aad64 |
line wrap: on
line diff
--- a/freeDiameter/p_sr.c Tue Jan 04 16:21:26 2011 +0900 +++ b/freeDiameter/p_sr.c Wed Jan 05 17:13:34 2011 +0900 @@ -44,9 +44,11 @@ struct fd_list chain; /* the "o" field points directly to the hop-by-hop of the request (uint32_t *) */ struct msg *req; /* A request that was sent and not yet answered. */ uint32_t prevhbh;/* The value to set in the hbh header when the message is retrieved */ + struct fd_list expire; /* the list of expiring requests */ + struct timespec added_on; /* the time the request was added */ }; -/* Find an element in the list, or the following one */ +/* Find an element in the hbh list, or the following one */ static struct fd_list * find_or_next(struct fd_list * srlist, uint32_t hbh, int * match) { struct fd_list * li; @@ -65,23 +67,131 @@ static void srl_dump(const char * text, struct fd_list * srlist) { struct fd_list * li; + struct timespec now; if (!TRACE_BOOL(SR_DEBUG_LVL)) return; + CHECK_SYS_DO( clock_gettime(CLOCK_REALTIME, &now), ); fd_log_debug("%sSentReq list @%p:\n", text, srlist); for (li = srlist->next; li != srlist; li = li->next) { struct sentreq * sr = (struct sentreq *)li; uint32_t * nexthbh = li->o; - fd_log_debug(" - Next req (%x):\n", *nexthbh); + fd_log_debug(" - Next req (%x): [since %ld.%06ld sec]\n", *nexthbh, + (now.tv_nsec >= sr->added_on.tv_nsec) ? (now.tv_sec - sr->added_on.tv_sec) : (now.tv_sec - sr->added_on.tv_sec - 1), + (now.tv_nsec >= sr->added_on.tv_nsec) ? (now.tv_nsec - sr->added_on.tv_nsec) / 1000 : (now.tv_nsec - sr->added_on.tv_nsec + 1000000000) / 1000); fd_msg_dump_one(SR_DEBUG_LVL + 1, sr->req); } } +/* (detached) thread that calls the anscb on expired messages. + We do it in a separate thread to avoid blocking the reception of new messages during this time */ +static void * call_anscb_expire(void * arg) { + struct msg * expired_req = arg; + + void (*anscb)(void *, struct msg **); + void * data; + + TRACE_ENTRY("%p", arg); + CHECK_PARAMS_DO( arg, return NULL ); + + /* Set the thread name */ + fd_log_threadname ( "Expired req cb." ); + + /* Log */ + TRACE_DEBUG(INFO, "The expiration timer for a request has been reached, abording this attempt now & calling cb..."); + + /* Retrieve callback in the message */ + CHECK_FCT_DO( fd_msg_anscb_get( expired_req, &anscb, &data ), return NULL); + ASSERT(anscb); + + /* Call it */ + (*anscb)(data, &expired_req); + + /* If the callback did not dispose of the message, do it now */ + if (expired_req) { + CHECK_FCT_DO( fd_msg_free(expired_req), /* ignore */ ); + } + + /* Finish */ + return NULL; +} + +/* thread that handles messages expiring. The thread is started / stopped only when needed */ +static void * sr_expiry_th(void * arg) { + struct sr_list * srlist = arg; + struct msg * expired_req; + pthread_attr_t detached; + + TRACE_ENTRY("%p", arg); + CHECK_PARAMS_DO( arg, return NULL ); + + /* Set the thread name */ + { + char buf[48]; + sprintf(buf, "ReqExp/%.*s", (int)sizeof(buf) - 8, ((struct fd_peer *)(srlist->exp.o))->p_hdr.info.pi_diamid); + fd_log_threadname ( buf ); + } + + CHECK_POSIX_DO( pthread_attr_init(&detached), return NULL ); + CHECK_POSIX_DO( pthread_attr_setdetachstate(&detached, PTHREAD_CREATE_DETACHED), return NULL ); + + CHECK_POSIX_DO( pthread_mutex_lock(&srlist->mtx), return NULL ); + pthread_cleanup_push( fd_cleanup_mutex, &srlist->mtx ); + + do { + struct timespec now, *t; + struct sentreq * first; + pthread_t th; + + /* Check if there are expiring requests available */ + if (FD_IS_LIST_EMPTY(&srlist->exp)) { + /* Just wait for a change or cancelation */ + CHECK_POSIX_DO( pthread_cond_wait( &srlist->cnd, &srlist->mtx ), goto error ); + /* Restart the loop on wakeup */ + continue; + } + + /* Get the pointer to the request that expires first */ + first = (struct sentreq *)(srlist->exp.next->o); + t = fd_msg_anscb_gettimeout( first->req ); + ASSERT(t); + + /* Get the current time */ + CHECK_SYS_DO( clock_gettime(CLOCK_REALTIME, &now), goto error ); + + /* If first request is not expired, we just wait until it happens */ + if ( TS_IS_INFERIOR( &now, t ) ) { + + CHECK_POSIX_DO2( pthread_cond_timedwait( &srlist->cnd, &srlist->mtx, t ), + ETIMEDOUT, /* ETIMEDOUT is a normal return value, continue */, + /* on other error, */ goto error ); + + /* on wakeup, loop */ + continue; + } + + /* Now, the first request in the list is expired; remove it and call the anscb for it in a new thread */ + fd_list_unlink(&first->chain); + fd_list_unlink(&first->expire); + expired_req = first->req; + free(first); + + CHECK_POSIX_DO( pthread_create( &th, &detached, call_anscb_expire, expired_req ), goto error ); + + /* loop */ + } while (1); +error: + pthread_cleanup_pop( 1 ); + return NULL; +} + + /* Store a new sent request */ int fd_p_sr_store(struct sr_list * srlist, struct msg **req, uint32_t *hbhloc, uint32_t hbh_restore) { struct sentreq * sr; struct fd_list * next; int match; + struct timespec * ts; TRACE_ENTRY("%p %p %p %x", srlist, req, hbhloc, hbh_restore); CHECK_PARAMS(srlist && req && *req && hbhloc); @@ -91,6 +201,8 @@ fd_list_init(&sr->chain, hbhloc); sr->req = *req; sr->prevhbh = hbh_restore; + fd_list_init(&sr->expire, sr); + CHECK_SYS( clock_gettime(CLOCK_REALTIME, &sr->added_on) ); /* Search the place in the list */ CHECK_POSIX( pthread_mutex_lock(&srlist->mtx) ); @@ -106,6 +218,35 @@ *req = NULL; fd_list_insert_before(next, &sr->chain); srl_dump("Saved new request, ", &srlist->srs); + + /* In case of request with a timeout, also store in the timeout list */ + ts = fd_msg_anscb_gettimeout( sr->req ); + if (ts) { + struct fd_list * li; + struct timespec * t; + + /* browse srlist->exp from the end */ + for (li = srlist->exp.prev; li != &srlist->exp; li = li->prev) { + struct sentreq * s = (struct sentreq *)(li->o); + t = fd_msg_anscb_gettimeout( s->req ); + ASSERT( t ); /* sanity */ + if (TS_IS_INFERIOR(t, ts)) + break; + } + + fd_list_insert_after(li, &sr->expire); + + /* if the thread does not exist yet, create it */ + if (srlist->thr == (pthread_t)NULL) { + CHECK_POSIX_DO( pthread_create(&srlist->thr, NULL, sr_expiry_th, srlist), /* continue anyway */); + } else { + /* or, if added in first position, signal the condvar to update the sleep time of the thread */ + if (li == &srlist->exp) { + CHECK_POSIX_DO( pthread_cond_signal(&srlist->cnd), /* continue anyway */); + } + } + } + CHECK_POSIX( pthread_mutex_unlock(&srlist->mtx) ); return 0; } @@ -131,10 +272,13 @@ *((uint32_t *)sr->chain.o) = sr->prevhbh; /* Unlink */ fd_list_unlink(&sr->chain); + fd_list_unlink(&sr->expire); *req = sr->req; free(sr); } CHECK_POSIX( pthread_mutex_unlock(&srlist->mtx) ); + + /* do not stop the expire thread here, it might cause creating/destroying it very often otherwise */ /* Done */ return 0; @@ -147,6 +291,7 @@ while (!FD_IS_LIST_EMPTY(&srlist->srs)) { struct sentreq * sr = (struct sentreq *)(srlist->srs.next); fd_list_unlink(&sr->chain); + fd_list_unlink(&sr->expire); if (fd_msg_is_routable(sr->req)) { struct msg_hdr * hdr = NULL; @@ -164,6 +309,12 @@ } free(sr); } + /* The list of expiring requests must be empty now */ + ASSERT( FD_IS_LIST_EMPTY(&srlist->exp) ); + CHECK_POSIX_DO( pthread_mutex_unlock(&srlist->mtx), /* continue anyway */ ); + + /* Terminate the expiry thread (must be done when the lock can be taken) */ + CHECK_FCT_DO( fd_thr_term(&srlist->thr), /* ignore error */ ); }