Changeset 649:5e5d8152c229 in freeDiameter for freeDiameter/p_sr.c
- Timestamp:
- Jan 5, 2011, 5:13:34 PM (13 years ago)
- Branch:
- default
- Phase:
- public
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
freeDiameter/p_sr.c
r258 r649 45 45 struct msg *req; /* A request that was sent and not yet answered. */ 46 46 uint32_t prevhbh;/* The value to set in the hbh header when the message is retrieved */ 47 struct fd_list expire; /* the list of expiring requests */ 48 struct timespec added_on; /* the time the request was added */ 47 49 }; 48 50 49 /* Find an element in the list, or the following one */51 /* Find an element in the hbh list, or the following one */ 50 52 static struct fd_list * find_or_next(struct fd_list * srlist, uint32_t hbh, int * match) 51 53 { … … 66 68 { 67 69 struct fd_list * li; 70 struct timespec now; 68 71 if (!TRACE_BOOL(SR_DEBUG_LVL)) 69 72 return; 73 CHECK_SYS_DO( clock_gettime(CLOCK_REALTIME, &now), ); 70 74 fd_log_debug("%sSentReq list @%p:\n", text, srlist); 71 75 for (li = srlist->next; li != srlist; li = li->next) { 72 76 struct sentreq * sr = (struct sentreq *)li; 73 77 uint32_t * nexthbh = li->o; 74 fd_log_debug(" - Next req (%x):\n", *nexthbh); 78 fd_log_debug(" - Next req (%x): [since %ld.%06ld sec]\n", *nexthbh, 79 (now.tv_nsec >= sr->added_on.tv_nsec) ? (now.tv_sec - sr->added_on.tv_sec) : (now.tv_sec - sr->added_on.tv_sec - 1), 80 (now.tv_nsec >= sr->added_on.tv_nsec) ? (now.tv_nsec - sr->added_on.tv_nsec) / 1000 : (now.tv_nsec - sr->added_on.tv_nsec + 1000000000) / 1000); 75 81 fd_msg_dump_one(SR_DEBUG_LVL + 1, sr->req); 76 82 } 77 83 } 84 85 /* (detached) thread that calls the anscb on expired messages. 86 We do it in a separate thread to avoid blocking the reception of new messages during this time */ 87 static void * call_anscb_expire(void * arg) { 88 struct msg * expired_req = arg; 89 90 void (*anscb)(void *, struct msg **); 91 void * data; 92 93 TRACE_ENTRY("%p", arg); 94 CHECK_PARAMS_DO( arg, return NULL ); 95 96 /* Set the thread name */ 97 fd_log_threadname ( "Expired req cb." ); 98 99 /* Log */ 100 TRACE_DEBUG(INFO, "The expiration timer for a request has been reached, abording this attempt now & calling cb..."); 101 102 /* Retrieve callback in the message */ 103 CHECK_FCT_DO( fd_msg_anscb_get( expired_req, &anscb, &data ), return NULL); 104 ASSERT(anscb); 105 106 /* Call it */ 107 (*anscb)(data, &expired_req); 108 109 /* If the callback did not dispose of the message, do it now */ 110 if (expired_req) { 111 CHECK_FCT_DO( fd_msg_free(expired_req), /* ignore */ ); 112 } 113 114 /* Finish */ 115 return NULL; 116 } 117 118 /* thread that handles messages expiring. The thread is started / stopped only when needed */ 119 static void * sr_expiry_th(void * arg) { 120 struct sr_list * srlist = arg; 121 struct msg * expired_req; 122 pthread_attr_t detached; 123 124 TRACE_ENTRY("%p", arg); 125 CHECK_PARAMS_DO( arg, return NULL ); 126 127 /* Set the thread name */ 128 { 129 char buf[48]; 130 sprintf(buf, "ReqExp/%.*s", (int)sizeof(buf) - 8, ((struct fd_peer *)(srlist->exp.o))->p_hdr.info.pi_diamid); 131 fd_log_threadname ( buf ); 132 } 133 134 CHECK_POSIX_DO( pthread_attr_init(&detached), return NULL ); 135 CHECK_POSIX_DO( pthread_attr_setdetachstate(&detached, PTHREAD_CREATE_DETACHED), return NULL ); 136 137 CHECK_POSIX_DO( pthread_mutex_lock(&srlist->mtx), return NULL ); 138 pthread_cleanup_push( fd_cleanup_mutex, &srlist->mtx ); 139 140 do { 141 struct timespec now, *t; 142 struct sentreq * first; 143 pthread_t th; 144 145 /* Check if there are expiring requests available */ 146 if (FD_IS_LIST_EMPTY(&srlist->exp)) { 147 /* Just wait for a change or cancelation */ 148 CHECK_POSIX_DO( pthread_cond_wait( &srlist->cnd, &srlist->mtx ), goto error ); 149 /* Restart the loop on wakeup */ 150 continue; 151 } 152 153 /* Get the pointer to the request that expires first */ 154 first = (struct sentreq *)(srlist->exp.next->o); 155 t = fd_msg_anscb_gettimeout( first->req ); 156 ASSERT(t); 157 158 /* Get the current time */ 159 CHECK_SYS_DO( clock_gettime(CLOCK_REALTIME, &now), goto error ); 160 161 /* If first request is not expired, we just wait until it happens */ 162 if ( TS_IS_INFERIOR( &now, t ) ) { 163 164 CHECK_POSIX_DO2( pthread_cond_timedwait( &srlist->cnd, &srlist->mtx, t ), 165 ETIMEDOUT, /* ETIMEDOUT is a normal return value, continue */, 166 /* on other error, */ goto error ); 167 168 /* on wakeup, loop */ 169 continue; 170 } 171 172 /* Now, the first request in the list is expired; remove it and call the anscb for it in a new thread */ 173 fd_list_unlink(&first->chain); 174 fd_list_unlink(&first->expire); 175 expired_req = first->req; 176 free(first); 177 178 CHECK_POSIX_DO( pthread_create( &th, &detached, call_anscb_expire, expired_req ), goto error ); 179 180 /* loop */ 181 } while (1); 182 error: 183 pthread_cleanup_pop( 1 ); 184 return NULL; 185 } 186 78 187 79 188 /* Store a new sent request */ … … 83 192 struct fd_list * next; 84 193 int match; 194 struct timespec * ts; 85 195 86 196 TRACE_ENTRY("%p %p %p %x", srlist, req, hbhloc, hbh_restore); … … 92 202 sr->req = *req; 93 203 sr->prevhbh = hbh_restore; 204 fd_list_init(&sr->expire, sr); 205 CHECK_SYS( clock_gettime(CLOCK_REALTIME, &sr->added_on) ); 94 206 95 207 /* Search the place in the list */ … … 107 219 fd_list_insert_before(next, &sr->chain); 108 220 srl_dump("Saved new request, ", &srlist->srs); 221 222 /* In case of request with a timeout, also store in the timeout list */ 223 ts = fd_msg_anscb_gettimeout( sr->req ); 224 if (ts) { 225 struct fd_list * li; 226 struct timespec * t; 227 228 /* browse srlist->exp from the end */ 229 for (li = srlist->exp.prev; li != &srlist->exp; li = li->prev) { 230 struct sentreq * s = (struct sentreq *)(li->o); 231 t = fd_msg_anscb_gettimeout( s->req ); 232 ASSERT( t ); /* sanity */ 233 if (TS_IS_INFERIOR(t, ts)) 234 break; 235 } 236 237 fd_list_insert_after(li, &sr->expire); 238 239 /* if the thread does not exist yet, create it */ 240 if (srlist->thr == (pthread_t)NULL) { 241 CHECK_POSIX_DO( pthread_create(&srlist->thr, NULL, sr_expiry_th, srlist), /* continue anyway */); 242 } else { 243 /* or, if added in first position, signal the condvar to update the sleep time of the thread */ 244 if (li == &srlist->exp) { 245 CHECK_POSIX_DO( pthread_cond_signal(&srlist->cnd), /* continue anyway */); 246 } 247 } 248 } 249 109 250 CHECK_POSIX( pthread_mutex_unlock(&srlist->mtx) ); 110 251 return 0; … … 132 273 /* Unlink */ 133 274 fd_list_unlink(&sr->chain); 275 fd_list_unlink(&sr->expire); 134 276 *req = sr->req; 135 277 free(sr); 136 278 } 137 279 CHECK_POSIX( pthread_mutex_unlock(&srlist->mtx) ); 280 281 /* do not stop the expire thread here, it might cause creating/destroying it very often otherwise */ 138 282 139 283 /* Done */ … … 148 292 struct sentreq * sr = (struct sentreq *)(srlist->srs.next); 149 293 fd_list_unlink(&sr->chain); 294 fd_list_unlink(&sr->expire); 150 295 if (fd_msg_is_routable(sr->req)) { 151 296 struct msg_hdr * hdr = NULL; … … 165 310 free(sr); 166 311 } 312 /* The list of expiring requests must be empty now */ 313 ASSERT( FD_IS_LIST_EMPTY(&srlist->exp) ); 314 167 315 CHECK_POSIX_DO( pthread_mutex_unlock(&srlist->mtx), /* continue anyway */ ); 168 } 169 316 317 /* Terminate the expiry thread (must be done when the lock can be taken) */ 318 CHECK_FCT_DO( fd_thr_term(&srlist->thr), /* ignore error */ ); 319 } 320
Note: See TracChangeset
for help on using the changeset viewer.