comparison libfdcore/p_sr.c @ 1232:9e92fa478c23

Performance improvements for fd_p_sr_store, based on Guangming's proposal: http://lists.freediameter.net/pipermail/dev/2013-July/000225.html
author Sebastien Decugis <sdecugis@freediameter.net>
date Sun, 11 Aug 2013 11:52:36 +0200
parents 043b894b0511
children 8f9684264fe0
comparison
equal deleted inserted replaced
1231:d9c48b0e8d97 1232:9e92fa478c23
39 struct sentreq { 39 struct sentreq {
40 struct fd_list chain; /* the "o" field points directly to the (new) hop-by-hop of the request (uint32_t *) */ 40 struct fd_list chain; /* the "o" field points directly to the (new) hop-by-hop of the request (uint32_t *) */
41 struct msg *req; /* A request that was sent and not yet answered. */ 41 struct msg *req; /* A request that was sent and not yet answered. */
42 uint32_t prevhbh;/* The value to set back in the hbh header when the message is retrieved */ 42 uint32_t prevhbh;/* The value to set back in the hbh header when the message is retrieved */
43 struct fd_list expire; /* the list of expiring requests */ 43 struct fd_list expire; /* the list of expiring requests */
44 struct timespec timeout; /* Cache the expire date of the request so that the timeout thread does not need to get it each time. */
44 struct timespec added_on; /* the time the request was added */ 45 struct timespec added_on; /* the time the request was added */
45 }; 46 };
46 47
47 /* Find an element in the hbh list, or the following one */ 48 /* Find an element in the hbh list, or the following one */
48 static struct fd_list * find_or_next(struct fd_list * srlist, uint32_t hbh, int * match) 49 static struct fd_list * find_or_next(struct fd_list * srlist, uint32_t hbh, int * match)
58 break; 59 break;
59 } 60 }
60 return li; 61 return li;
61 } 62 }
62 63
64 /* Similar but start from the end, since we add requests in growing hbh order usually */
65 static struct fd_list * find_or_prev(struct fd_list * srlist, uint32_t hbh, int * match)
66 {
67 struct fd_list * li;
68 *match = 0;
69 for (li = srlist->prev; li != srlist; li = li->prev) {
70 uint32_t * prevhbh = li->o;
71 if (*prevhbh > hbh)
72 continue;
73 if (*prevhbh == hbh)
74 *match = 1;
75 break;
76 }
77 return li;
78 }
79
63 static void srl_dump(const char * text, struct fd_list * srlist) 80 static void srl_dump(const char * text, struct fd_list * srlist)
64 { 81 {
65 struct fd_list * li; 82 struct fd_list * li;
66 struct timespec now; 83 struct timespec now;
67 84
77 (long)((now.tv_nsec >= sr->added_on.tv_nsec) ? (now.tv_sec - sr->added_on.tv_sec) : (now.tv_sec - sr->added_on.tv_sec - 1)), 94 (long)((now.tv_nsec >= sr->added_on.tv_nsec) ? (now.tv_sec - sr->added_on.tv_sec) : (now.tv_sec - sr->added_on.tv_sec - 1)),
78 (long)((now.tv_nsec >= sr->added_on.tv_nsec) ? ((now.tv_nsec - sr->added_on.tv_nsec) / 1000) : ((now.tv_nsec - sr->added_on.tv_nsec + 1000000000) / 1000))); 95 (long)((now.tv_nsec >= sr->added_on.tv_nsec) ? ((now.tv_nsec - sr->added_on.tv_nsec) / 1000) : ((now.tv_nsec - sr->added_on.tv_nsec + 1000000000) / 1000)));
79 } 96 }
80 } 97 }
81 98
82 struct expire_data {
83 struct msg * request;
84 struct fd_peer * sentto;
85 };
86
87 /* (detached) thread that calls the anscb on expired messages.
88 We do it in a separate thread to avoid blocking the reception of new messages during this time */
89 static void * call_expirecb(void * arg) {
90 struct expire_data * ed = arg;
91
92 void (*expirecb)(void *, DiamId_t, size_t, struct msg **);
93 void * data;
94
95 TRACE_ENTRY("%p", arg);
96 CHECK_PARAMS_DO( arg, return NULL );
97
98 /* Set the thread name */
99 fd_log_threadname ( "Expired req cb." );
100
101 /* Log */
102 TRACE_DEBUG(INFO, "The expiration timer for a request has been reached, aborting this attempt now & calling cb...");
103
104 /* Retrieve callback in the message */
105 CHECK_FCT_DO( fd_msg_anscb_get( ed->request, NULL, &expirecb, &data ), return NULL);
106 ASSERT(expirecb);
107
108 /* Clean up this data from the message */
109 CHECK_FCT_DO( fd_msg_anscb_associate( ed->request, NULL, NULL, NULL, NULL ), return NULL);
110
111 /* Call it */
112 (*expirecb)(data, ed->sentto->p_hdr.info.pi_diamid, ed->sentto->p_hdr.info.pi_diamidlen, &ed->request);
113
114 /* If the callback did not dispose of the message, do it now */
115 if (ed->request) {
116 fd_hook_call(HOOK_MESSAGE_DROPPED, ed->request, NULL, "Expiration period completed without an answer, and the expiry callback did not dispose of the message.", fd_msg_pmdl_get(ed->request));
117 CHECK_FCT_DO( fd_msg_free(ed->request), /* ignore */ );
118 }
119
120 free(ed);
121
122 /* Finish */
123 return NULL;
124 }
125
126 /* thread that handles messages expiring. The thread is started only when needed */ 99 /* thread that handles messages expiring. The thread is started only when needed */
127 static void * sr_expiry_th(void * arg) { 100 static void * sr_expiry_th(void * arg) {
128 struct sr_list * srlist = arg; 101 struct sr_list * srlist = arg;
129 pthread_attr_t detached;
130 struct expire_data * ed;
131 102
132 TRACE_ENTRY("%p", arg); 103 TRACE_ENTRY("%p", arg);
133 CHECK_PARAMS_DO( arg, return NULL ); 104 CHECK_PARAMS_DO( arg, return NULL );
134 105
135 /* Set the thread name */ 106 /* Set the thread name */
137 char buf[48]; 108 char buf[48];
138 snprintf(buf, sizeof(buf), "ReqExp/%s", ((struct fd_peer *)(srlist->exp.o))->p_hdr.info.pi_diamid); 109 snprintf(buf, sizeof(buf), "ReqExp/%s", ((struct fd_peer *)(srlist->exp.o))->p_hdr.info.pi_diamid);
139 fd_log_threadname ( buf ); 110 fd_log_threadname ( buf );
140 } 111 }
141 112
142 CHECK_POSIX_DO( pthread_attr_init(&detached), return NULL );
143 CHECK_POSIX_DO( pthread_attr_setdetachstate(&detached, PTHREAD_CREATE_DETACHED), return NULL );
144
145 CHECK_POSIX_DO( pthread_mutex_lock(&srlist->mtx), return NULL );
146 pthread_cleanup_push( fd_cleanup_mutex, &srlist->mtx );
147
148 do { 113 do {
149 struct timespec now, *t; 114 struct timespec now;
150 struct sentreq * first; 115 struct sentreq * first;
151 pthread_t th; 116 struct msg * request;
152 117 struct fd_peer * sentto;
118 void (*expirecb)(void *, DiamId_t, size_t, struct msg **);
119 void (*anscb)(void *, struct msg **);
120 void * data;
121 int no_error;
122
123 CHECK_POSIX_DO( pthread_mutex_lock(&srlist->mtx), return NULL );
124 pthread_cleanup_push( fd_cleanup_mutex, &srlist->mtx );
125
126 loop:
127 no_error = 0;
128
153 /* Check if there are expiring requests available */ 129 /* Check if there are expiring requests available */
154 if (FD_IS_LIST_EMPTY(&srlist->exp)) { 130 if (FD_IS_LIST_EMPTY(&srlist->exp)) {
155 /* Just wait for a change or cancelation */ 131 /* Just wait for a change or cancelation */
156 CHECK_POSIX_DO( pthread_cond_wait( &srlist->cnd, &srlist->mtx ), goto error ); 132 CHECK_POSIX_DO( pthread_cond_wait( &srlist->cnd, &srlist->mtx ), goto unlock );
157 /* Restart the loop on wakeup */ 133 /* Restart the loop on wakeup */
158 continue; 134 goto loop;
159 } 135 }
160 136
161 /* Get the pointer to the request that expires first */ 137 /* Get the pointer to the request that expires first */
162 first = (struct sentreq *)(srlist->exp.next->o); 138 first = (struct sentreq *)(srlist->exp.next->o);
163 t = fd_msg_anscb_gettimeout( first->req );
164 ASSERT(t);
165 139
166 /* Get the current time */ 140 /* Get the current time */
167 CHECK_SYS_DO( clock_gettime(CLOCK_REALTIME, &now), goto error ); 141 CHECK_SYS_DO( clock_gettime(CLOCK_REALTIME, &now), goto unlock );
168 142
169 /* If first request is not expired, we just wait until it happens */ 143 /* If first request is not expired, we just wait until it happens */
170 if ( TS_IS_INFERIOR( &now, t ) ) { 144 if ( TS_IS_INFERIOR( &now, &first->timeout ) ) {
171 145
172 CHECK_POSIX_DO2( pthread_cond_timedwait( &srlist->cnd, &srlist->mtx, t ), 146 CHECK_POSIX_DO2( pthread_cond_timedwait( &srlist->cnd, &srlist->mtx, &first->timeout ),
173 ETIMEDOUT, /* ETIMEDOUT is a normal return value, continue */, 147 ETIMEDOUT, /* ETIMEDOUT is a normal return value, continue */,
174 /* on other error, */ goto error ); 148 /* on other error, */ goto unlock );
175 149
176 /* on wakeup, loop */ 150 /* on wakeup, loop */
177 continue; 151 goto loop;
178 } 152 }
179 153
180 /* Now, the first request in the list is expired; remove it and call the anscb for it in a new thread */ 154 /* Now, the first request in the list is expired; remove it and call the expirecb for it */
181 CHECK_MALLOC_DO( ed = malloc(sizeof(struct expire_data)), goto error ); 155 request = first->req;
182 ed->sentto = first->chain.head->o; 156 sentto = first->chain.head->o;
183 ed->request = first->req; 157
184 *((uint32_t *)first->chain.o) = first->prevhbh; /* Restore the hbhid */ 158 TRACE_DEBUG(FULL, "Request %x was not answered by %s within the timer delay", *((uint32_t *)first->chain.o), sentto->p_hdr.info.pi_diamid);
159
160 /* Restore the hbhid */
161 *((uint32_t *)first->chain.o) = first->prevhbh;
162
163 /* Free the sentreq information */
185 fd_list_unlink(&first->chain); 164 fd_list_unlink(&first->chain);
186 fd_list_unlink(&first->expire); 165 fd_list_unlink(&first->expire);
187 free(first); 166 free(first);
188 167
189 CHECK_POSIX_DO( pthread_create( &th, &detached, call_expirecb, ed ), goto error ); 168 no_error = 1;
190 169 unlock:
191 /* loop */ 170 ; /* pthread_cleanup_pop sometimes expands as "} ..." and the label before this cause some compilers to complain... */
171 pthread_cleanup_pop( 1 ); /* unlock the mutex */
172 if (!no_error)
173 break;
174
175
176 /* Retrieve callback in the message */
177 CHECK_FCT_DO( fd_msg_anscb_get( request, &anscb, &expirecb, &data ), break);
178 ASSERT(expirecb);
179
180 /* Clean up this expirecb from the message */
181 CHECK_FCT_DO( fd_msg_anscb_associate( request, anscb, data, NULL, NULL ), break);
182
183 /* Call it */
184 (*expirecb)(data, sentto->p_hdr.info.pi_diamid, sentto->p_hdr.info.pi_diamidlen, &request);
185
186 /* If the callback did not dispose of the message, do it now */
187 if (request) {
188 fd_hook_call(HOOK_MESSAGE_DROPPED, request, NULL, "Expiration period completed without an answer, and the expiry callback did not dispose of the message.", fd_msg_pmdl_get(request));
189 CHECK_FCT_DO( fd_msg_free(request), /* ignore */ );
190 }
191
192 } while (1); 192 } while (1);
193 error: 193
194 ; /* pthread_cleanup_pop sometimes expands as "} ..." and the label before this cause some compilers to complain... */
195 pthread_cleanup_pop( 1 );
196 ASSERT(0); /* we have encountered a problem, maybe time to signal the framework to terminate? */ 194 ASSERT(0); /* we have encountered a problem, maybe time to signal the framework to terminate? */
197 return NULL; 195 return NULL;
198 } 196 }
199 197
200 198
201 /* Store a new sent request */ 199 /* Store a new sent request */
202 int fd_p_sr_store(struct sr_list * srlist, struct msg **req, uint32_t *hbhloc, uint32_t hbh_restore) 200 int fd_p_sr_store(struct sr_list * srlist, struct msg **req, uint32_t *hbhloc, uint32_t hbh_restore)
203 { 201 {
204 struct sentreq * sr; 202 struct sentreq * sr;
205 struct fd_list * next; 203 struct fd_list * prev;
206 int match; 204 int match;
207 struct timespec * ts; 205 struct timespec * ts;
208 206
209 TRACE_ENTRY("%p %p %p %x", srlist, req, hbhloc, hbh_restore); 207 TRACE_ENTRY("%p %p %p %x", srlist, req, hbhloc, hbh_restore);
210 CHECK_PARAMS(srlist && req && *req && hbhloc); 208 CHECK_PARAMS(srlist && req && *req && hbhloc);
217 fd_list_init(&sr->expire, sr); 215 fd_list_init(&sr->expire, sr);
218 CHECK_SYS( clock_gettime(CLOCK_REALTIME, &sr->added_on) ); 216 CHECK_SYS( clock_gettime(CLOCK_REALTIME, &sr->added_on) );
219 217
220 /* Search the place in the list */ 218 /* Search the place in the list */
221 CHECK_POSIX( pthread_mutex_lock(&srlist->mtx) ); 219 CHECK_POSIX( pthread_mutex_lock(&srlist->mtx) );
222 next = find_or_next(&srlist->srs, *hbhloc, &match); 220 prev = find_or_prev(&srlist->srs, *hbhloc, &match);
223 if (match) { 221 if (match) {
224 TRACE_DEBUG(INFO, "A request with the same hop-by-hop Id (0x%x) was already sent: error", *hbhloc); 222 TRACE_DEBUG(INFO, "A request with the same hop-by-hop Id (0x%x) was already sent: error", *hbhloc);
225 free(sr); 223 free(sr);
226 srl_dump("Current list of SR: ", &srlist->srs); 224 srl_dump("Current list of SR: ", &srlist->srs);
227 CHECK_POSIX_DO( pthread_mutex_unlock(&srlist->mtx), /* ignore */ ); 225 CHECK_POSIX_DO( pthread_mutex_unlock(&srlist->mtx), /* ignore */ );
228 return EINVAL; 226 return EINVAL;
229 } 227 }
230 228
231 /* Save in the list */ 229 /* Save in the list */
232 *req = NULL; 230 *req = NULL;
233 fd_list_insert_before(next, &sr->chain); 231 fd_list_insert_after(prev, &sr->chain);
234 srlist->cnt++; 232 srlist->cnt++;
235 233
236 /* In case of request with a timeout, also store in the timeout list */ 234 /* In case of request with a timeout, also store in the timeout list */
237 ts = fd_msg_anscb_gettimeout( sr->req ); 235 ts = fd_msg_anscb_gettimeout( sr->req );
238 if (ts) { 236 if (ts) {
239 struct fd_list * li; 237 struct fd_list * li;
240 struct timespec * t; 238
239 memcpy(&sr->timeout, ts, sizeof(struct timespec));
241 240
242 /* browse srlist->exp from the end */ 241 /* browse srlist->exp from the end */
243 for (li = srlist->exp.prev; li != &srlist->exp; li = li->prev) { 242 for (li = srlist->exp.prev; li != &srlist->exp; li = li->prev) {
244 struct sentreq * s = (struct sentreq *)(li->o); 243 struct sentreq * s = (struct sentreq *)(li->o);
245 t = fd_msg_anscb_gettimeout( s->req ); 244 if (TS_IS_INFERIOR(&s->timeout, ts))
246 ASSERT( t ); /* sanity */
247 if (TS_IS_INFERIOR(t, ts))
248 break; 245 break;
249 } 246 }
250 247
251 fd_list_insert_after(li, &sr->expire); 248 fd_list_insert_after(li, &sr->expire);
252 249
"Welcome to our mercurial repository"