comparison libfdproto/fifo.c @ 1377:ce257e43085d

fd_fifo_del: check if queue is already (being) destroyed and return success in that case. Improves shutdown behaviour.
author Thomas Klausner <tk@giga.or.at>
date Thu, 20 Jun 2019 12:01:50 +0200
parents f40de74bd1c7
children 2131d6bdf32a
comparison
equal deleted inserted replaced
1376:b8afaf63f65a 1377:ce257e43085d
48 #include "fdproto-internal.h" 48 #include "fdproto-internal.h"
49 49
50 /* Definition of a FIFO queue object */ 50 /* Definition of a FIFO queue object */
51 struct fifo { 51 struct fifo {
52 int eyec; /* An eye catcher, also used to check a queue is valid. FIFO_EYEC */ 52 int eyec; /* An eye catcher, also used to check a queue is valid. FIFO_EYEC */
53 53
54 pthread_mutex_t mtx; /* Mutex protecting this queue */ 54 pthread_mutex_t mtx; /* Mutex protecting this queue */
55 pthread_cond_t cond_pull; /* condition variable for pulling threads */ 55 pthread_cond_t cond_pull; /* condition variable for pulling threads */
56 pthread_cond_t cond_push; /* condition variable for pushing threads */ 56 pthread_cond_t cond_push; /* condition variable for pushing threads */
57 57
58 struct fd_list list; /* sentinel for the list of elements */ 58 struct fd_list list; /* sentinel for the list of elements */
59 int count; /* number of objects in the list */ 59 int count; /* number of objects in the list */
60 int thrs; /* number of threads waiting for a new element (when count is 0) */ 60 int thrs; /* number of threads waiting for a new element (when count is 0) */
61 61
62 int max; /* maximum number of items to accept if not 0 */ 62 int max; /* maximum number of items to accept if not 0 */
63 int thrs_push; /* number of threads waitnig to push an item */ 63 int thrs_push; /* number of threads waitnig to push an item */
64 64
65 uint16_t high; /* High level threshold (see libfreeDiameter.h for details) */ 65 uint16_t high; /* High level threshold (see libfreeDiameter.h for details) */
66 uint16_t low; /* Low level threshhold */ 66 uint16_t low; /* Low level threshhold */
67 void *data; /* Opaque pointer for threshold callbacks */ 67 void *data; /* Opaque pointer for threshold callbacks */
68 void (*h_cb)(struct fifo *, void **); /* The callbacks */ 68 void (*h_cb)(struct fifo *, void **); /* The callbacks */
69 void (*l_cb)(struct fifo *, void **); 69 void (*l_cb)(struct fifo *, void **);
70 int highest;/* The highest count value for which h_cb has been called */ 70 int highest;/* The highest count value for which h_cb has been called */
71 int highest_ever; /* The max count value this queue has reached (for tweaking) */ 71 int highest_ever; /* The max count value this queue has reached (for tweaking) */
72 72
73 long long total_items; /* Cumulated number of items that went through this fifo (excluding current count), always increasing. */ 73 long long total_items; /* Cumulated number of items that went through this fifo (excluding current count), always increasing. */
74 struct timespec total_time; /* Cumulated time all items spent in this queue, including blocking time (always growing, use deltas for monitoring) */ 74 struct timespec total_time; /* Cumulated time all items spent in this queue, including blocking time (always growing, use deltas for monitoring) */
75 struct timespec blocking_time; /* Cumulated time threads trying to post new items were blocked (queue full). */ 75 struct timespec blocking_time; /* Cumulated time threads trying to post new items were blocked (queue full). */
76 struct timespec last_time; /* For the last element retrieved from the queue, how long it take between posting (including blocking) and poping */ 76 struct timespec last_time; /* For the last element retrieved from the queue, how long it take between posting (including blocking) and poping */
77 77
78 }; 78 };
79 79
80 struct fifo_item { 80 struct fifo_item {
81 struct fd_list item; 81 struct fd_list item;
82 struct timespec posted_on; 82 struct timespec posted_on;
91 91
92 /* Create a new queue, with max number of items -- use 0 for no max */ 92 /* Create a new queue, with max number of items -- use 0 for no max */
93 int fd_fifo_new ( struct fifo ** queue, int max ) 93 int fd_fifo_new ( struct fifo ** queue, int max )
94 { 94 {
95 struct fifo * new; 95 struct fifo * new;
96 96
97 TRACE_ENTRY( "%p", queue ); 97 TRACE_ENTRY( "%p", queue );
98 98
99 CHECK_PARAMS( queue ); 99 CHECK_PARAMS( queue );
100 100
101 /* Create a new object */ 101 /* Create a new object */
102 CHECK_MALLOC( new = malloc (sizeof (struct fifo) ) ); 102 CHECK_MALLOC( new = malloc (sizeof (struct fifo) ) );
103 103
104 /* Initialize the content */ 104 /* Initialize the content */
105 memset(new, 0, sizeof(struct fifo)); 105 memset(new, 0, sizeof(struct fifo));
106 106
107 new->eyec = FIFO_EYEC; 107 new->eyec = FIFO_EYEC;
108 CHECK_POSIX( pthread_mutex_init(&new->mtx, NULL) ); 108 CHECK_POSIX( pthread_mutex_init(&new->mtx, NULL) );
109 CHECK_POSIX( pthread_cond_init(&new->cond_pull, NULL) ); 109 CHECK_POSIX( pthread_cond_init(&new->cond_pull, NULL) );
110 CHECK_POSIX( pthread_cond_init(&new->cond_push, NULL) ); 110 CHECK_POSIX( pthread_cond_init(&new->cond_push, NULL) );
111 new->max = max; 111 new->max = max;
112 112
113 fd_list_init(&new->list, NULL); 113 fd_list_init(&new->list, NULL);
114 114
115 /* We're done */ 115 /* We're done */
116 *queue = new; 116 *queue = new;
117 return 0; 117 return 0;
118 } 118 }
119 119
120 /* Dump the content of a queue */ 120 /* Dump the content of a queue */
121 DECLARE_FD_DUMP_PROTOTYPE(fd_fifo_dump, char * name, struct fifo * queue, fd_fifo_dump_item_cb dump_item) 121 DECLARE_FD_DUMP_PROTOTYPE(fd_fifo_dump, char * name, struct fifo * queue, fd_fifo_dump_item_cb dump_item)
122 { 122 {
123 FD_DUMP_HANDLE_OFFSET(); 123 FD_DUMP_HANDLE_OFFSET();
124 124
125 if (name) { 125 if (name) {
126 CHECK_MALLOC_DO( fd_dump_extend( FD_DUMP_STD_PARAMS, "'%s'(@%p): ", name, queue), return NULL); 126 CHECK_MALLOC_DO( fd_dump_extend( FD_DUMP_STD_PARAMS, "'%s'(@%p): ", name, queue), return NULL);
127 } else { 127 } else {
128 CHECK_MALLOC_DO( fd_dump_extend( FD_DUMP_STD_PARAMS, "{fifo}(@%p): ", queue), return NULL); 128 CHECK_MALLOC_DO( fd_dump_extend( FD_DUMP_STD_PARAMS, "{fifo}(@%p): ", queue), return NULL);
129 } 129 }
130 130
131 if (!CHECK_FIFO( queue )) { 131 if (!CHECK_FIFO( queue )) {
132 return fd_dump_extend(FD_DUMP_STD_PARAMS, "INVALID/NULL"); 132 return fd_dump_extend(FD_DUMP_STD_PARAMS, "INVALID/NULL");
133 } 133 }
134 134
135 CHECK_POSIX_DO( pthread_mutex_lock( &queue->mtx ), /* continue */ ); 135 CHECK_POSIX_DO( pthread_mutex_lock( &queue->mtx ), /* continue */ );
136 CHECK_MALLOC_DO( fd_dump_extend( FD_DUMP_STD_PARAMS, "items:%d,%d,%d threads:%d,%d stats:%lld/%ld.%06ld,%ld.%06ld,%ld.%06ld thresholds:%d,%d,%d,%p,%p,%p", 136 CHECK_MALLOC_DO( fd_dump_extend( FD_DUMP_STD_PARAMS, "items:%d,%d,%d threads:%d,%d stats:%lld/%ld.%06ld,%ld.%06ld,%ld.%06ld thresholds:%d,%d,%d,%p,%p,%p",
137 queue->count, queue->highest_ever, queue->max, 137 queue->count, queue->highest_ever, queue->max,
138 queue->thrs, queue->thrs_push, 138 queue->thrs, queue->thrs_push,
139 queue->total_items,(long)queue->total_time.tv_sec,(long)(queue->total_time.tv_nsec/1000),(long)queue->blocking_time.tv_sec,(long)(queue->blocking_time.tv_nsec/1000),(long)queue->last_time.tv_sec,(long)(queue->last_time.tv_nsec/1000), 139 queue->total_items,(long)queue->total_time.tv_sec,(long)(queue->total_time.tv_nsec/1000),(long)queue->blocking_time.tv_sec,(long)(queue->blocking_time.tv_nsec/1000),(long)queue->last_time.tv_sec,(long)(queue->last_time.tv_nsec/1000),
140 queue->high, queue->low, queue->highest, queue->h_cb, queue->l_cb, queue->data), 140 queue->high, queue->low, queue->highest, queue->h_cb, queue->l_cb, queue->data),
141 goto error); 141 goto error);
142 142
143 if (dump_item) { 143 if (dump_item) {
144 struct fd_list * li; 144 struct fd_list * li;
145 int i = 0; 145 int i = 0;
146 for (li = queue->list.next; li != &queue->list; li = li->next) { 146 for (li = queue->list.next; li != &queue->list; li = li->next) {
147 struct fifo_item * fi = (struct fifo_item *)li; 147 struct fifo_item * fi = (struct fifo_item *)li;
148 CHECK_MALLOC_DO( fd_dump_extend( FD_DUMP_STD_PARAMS, "\n [#%i](@%p)@%ld.%06ld: ", 148 CHECK_MALLOC_DO( fd_dump_extend( FD_DUMP_STD_PARAMS, "\n [#%i](@%p)@%ld.%06ld: ",
149 i++, fi->item.o, (long)fi->posted_on.tv_sec,(long)(fi->posted_on.tv_nsec/1000)), 149 i++, fi->item.o, (long)fi->posted_on.tv_sec,(long)(fi->posted_on.tv_nsec/1000)),
150 goto error); 150 goto error);
151 CHECK_MALLOC_DO( (*dump_item)(FD_DUMP_STD_PARAMS, fi->item.o), goto error); 151 CHECK_MALLOC_DO( (*dump_item)(FD_DUMP_STD_PARAMS, fi->item.o), goto error);
152 } 152 }
153 } 153 }
154 CHECK_POSIX_DO( pthread_mutex_unlock( &queue->mtx ), /* continue */ ); 154 CHECK_POSIX_DO( pthread_mutex_unlock( &queue->mtx ), /* continue */ );
155 155
156 return *buf; 156 return *buf;
157 error: 157 error:
158 CHECK_POSIX_DO( pthread_mutex_unlock( &queue->mtx ), /* continue */ ); 158 CHECK_POSIX_DO( pthread_mutex_unlock( &queue->mtx ), /* continue */ );
159 return NULL; 159 return NULL;
160 } 160 }
161 161
162 /* Delete a queue. It must be empty. */ 162 /* Delete a queue. It must be empty. */
163 int fd_fifo_del ( struct fifo ** queue ) 163 int fd_fifo_del ( struct fifo ** queue )
164 { 164 {
165 struct fifo * q; 165 struct fifo * q;
166 int loops = 0; 166 int loops = 0;
167 167
168 TRACE_ENTRY( "%p", queue ); 168 TRACE_ENTRY( "%p", queue );
169 169
170 if (queue && *queue == NULL) {
171 /* Queue already (in the process of being) deleted */
172 return 0;
173 }
174
170 CHECK_PARAMS( queue && CHECK_FIFO( *queue ) ); 175 CHECK_PARAMS( queue && CHECK_FIFO( *queue ) );
171 176
172 q = *queue; 177 q = *queue;
173 178
174 CHECK_POSIX( pthread_mutex_lock( &q->mtx ) ); 179 CHECK_POSIX( pthread_mutex_lock( &q->mtx ) );
175 180
176 if ((q->count != 0) || (q->data != NULL)) { 181 if ((q->count != 0) || (q->data != NULL)) {
177 TRACE_DEBUG(INFO, "The queue cannot be destroyed (%d, %p)", q->count, q->data); 182 TRACE_DEBUG(INFO, "The queue cannot be destroyed (%d, %p)", q->count, q->data);
178 CHECK_POSIX_DO( pthread_mutex_unlock( &q->mtx ), /* no fallback */ ); 183 CHECK_POSIX_DO( pthread_mutex_unlock( &q->mtx ), /* no fallback */ );
179 return EINVAL; 184 return EINVAL;
180 } 185 }
181 186
182 /* Ok, now invalidate the queue */ 187 /* Ok, now invalidate the queue */
183 q->eyec = 0xdead; 188 q->eyec = 0xdead;
184 189
185 /* Have all waiting threads return an error */ 190 /* Have all waiting threads return an error */
186 while (q->thrs) { 191 while (q->thrs) {
187 CHECK_POSIX( pthread_mutex_unlock( &q->mtx )); 192 CHECK_POSIX( pthread_mutex_unlock( &q->mtx ));
188 CHECK_POSIX( pthread_cond_signal(&q->cond_pull) ); 193 CHECK_POSIX( pthread_cond_signal(&q->cond_pull) );
189 usleep(1000); 194 usleep(1000);
190 195
191 CHECK_POSIX( pthread_mutex_lock( &q->mtx ) ); 196 CHECK_POSIX( pthread_mutex_lock( &q->mtx ) );
192 ASSERT( ++loops < 20 ); /* detect infinite loops */ 197 ASSERT( ++loops < 20 ); /* detect infinite loops */
193 } 198 }
194 199
195 /* sanity check */ 200 /* sanity check */
196 ASSERT(FD_IS_LIST_EMPTY(&q->list)); 201 ASSERT(FD_IS_LIST_EMPTY(&q->list));
197 202
198 /* And destroy it */ 203 /* And destroy it */
199 CHECK_POSIX( pthread_mutex_unlock( &q->mtx ) ); 204 CHECK_POSIX( pthread_mutex_unlock( &q->mtx ) );
200 205
201 CHECK_POSIX_DO( pthread_cond_destroy( &q->cond_pull ), ); 206 CHECK_POSIX_DO( pthread_cond_destroy( &q->cond_pull ), );
202 207
203 CHECK_POSIX_DO( pthread_cond_destroy( &q->cond_push ), ); 208 CHECK_POSIX_DO( pthread_cond_destroy( &q->cond_push ), );
204 209
205 CHECK_POSIX_DO( pthread_mutex_destroy( &q->mtx ), ); 210 CHECK_POSIX_DO( pthread_mutex_destroy( &q->mtx ), );
206 211
207 free(q); 212 free(q);
208 *queue = NULL; 213 *queue = NULL;
209 214
210 return 0; 215 return 0;
211 } 216 }
212 217
213 /* Move the content of old into new, and update loc_update atomically. We leave the old queue empty but valid */ 218 /* Move the content of old into new, and update loc_update atomically. We leave the old queue empty but valid */
214 int fd_fifo_move ( struct fifo * old, struct fifo * new, struct fifo ** loc_update ) 219 int fd_fifo_move ( struct fifo * old, struct fifo * new, struct fifo ** loc_update )
215 { 220 {
216 int loops = 0; 221 int loops = 0;
217 222
218 TRACE_ENTRY("%p %p %p", old, new, loc_update); 223 TRACE_ENTRY("%p %p %p", old, new, loc_update);
219 CHECK_PARAMS( CHECK_FIFO( old ) && CHECK_FIFO( new )); 224 CHECK_PARAMS( CHECK_FIFO( old ) && CHECK_FIFO( new ));
220 225
221 CHECK_PARAMS( ! old->data ); 226 CHECK_PARAMS( ! old->data );
222 if (new->high) { 227 if (new->high) {
223 TODO("Implement support for thresholds in fd_fifo_move..."); 228 TODO("Implement support for thresholds in fd_fifo_move...");
224 } 229 }
225 230
226 /* Update loc_update */ 231 /* Update loc_update */
227 if (loc_update) 232 if (loc_update)
228 *loc_update = new; 233 *loc_update = new;
229 234
230 /* Lock the queues */ 235 /* Lock the queues */
231 CHECK_POSIX( pthread_mutex_lock( &old->mtx ) ); 236 CHECK_POSIX( pthread_mutex_lock( &old->mtx ) );
232 237
233 CHECK_PARAMS_DO( (! old->thrs_push), { 238 CHECK_PARAMS_DO( (! old->thrs_push), {
234 pthread_mutex_unlock( &old->mtx ); 239 pthread_mutex_unlock( &old->mtx );
235 return EINVAL; 240 return EINVAL;
236 } ); 241 } );
237 242
238 CHECK_POSIX( pthread_mutex_lock( &new->mtx ) ); 243 CHECK_POSIX( pthread_mutex_lock( &new->mtx ) );
239 244
240 /* Any waiting thread on the old queue returns an error */ 245 /* Any waiting thread on the old queue returns an error */
241 old->eyec = 0xdead; 246 old->eyec = 0xdead;
242 while (old->thrs) { 247 while (old->thrs) {
243 CHECK_POSIX( pthread_mutex_unlock( &old->mtx )); 248 CHECK_POSIX( pthread_mutex_unlock( &old->mtx ));
244 CHECK_POSIX( pthread_cond_signal( &old->cond_pull ) ); 249 CHECK_POSIX( pthread_cond_signal( &old->cond_pull ) );
245 usleep(1000); 250 usleep(1000);
246 251
247 CHECK_POSIX( pthread_mutex_lock( &old->mtx ) ); 252 CHECK_POSIX( pthread_mutex_lock( &old->mtx ) );
248 ASSERT( loops < 20 ); /* detect infinite loops */ 253 ASSERT( loops < 20 ); /* detect infinite loops */
249 } 254 }
250 255
251 /* Move all data from old to new */ 256 /* Move all data from old to new */
252 fd_list_move_end( &new->list, &old->list ); 257 fd_list_move_end( &new->list, &old->list );
253 if (old->count && (!new->count)) { 258 if (old->count && (!new->count)) {
254 CHECK_POSIX( pthread_cond_signal(&new->cond_pull) ); 259 CHECK_POSIX( pthread_cond_signal(&new->cond_pull) );
255 } 260 }
256 new->count += old->count; 261 new->count += old->count;
257 262
258 /* Reset old */ 263 /* Reset old */
259 old->count = 0; 264 old->count = 0;
260 old->eyec = FIFO_EYEC; 265 old->eyec = FIFO_EYEC;
261 266
262 /* Merge the stats in the new queue */ 267 /* Merge the stats in the new queue */
263 new->total_items += old->total_items; 268 new->total_items += old->total_items;
264 old->total_items = 0; 269 old->total_items = 0;
265 270
266 new->total_time.tv_nsec += old->total_time.tv_nsec; 271 new->total_time.tv_nsec += old->total_time.tv_nsec;
267 new->total_time.tv_sec += old->total_time.tv_sec + (new->total_time.tv_nsec / 1000000000); 272 new->total_time.tv_sec += old->total_time.tv_sec + (new->total_time.tv_nsec / 1000000000);
268 new->total_time.tv_nsec %= 1000000000; 273 new->total_time.tv_nsec %= 1000000000;
269 old->total_time.tv_nsec = 0; 274 old->total_time.tv_nsec = 0;
270 old->total_time.tv_sec = 0; 275 old->total_time.tv_sec = 0;
271 276
272 new->blocking_time.tv_nsec += old->blocking_time.tv_nsec; 277 new->blocking_time.tv_nsec += old->blocking_time.tv_nsec;
273 new->blocking_time.tv_sec += old->blocking_time.tv_sec + (new->blocking_time.tv_nsec / 1000000000); 278 new->blocking_time.tv_sec += old->blocking_time.tv_sec + (new->blocking_time.tv_nsec / 1000000000);
274 new->blocking_time.tv_nsec %= 1000000000; 279 new->blocking_time.tv_nsec %= 1000000000;
275 old->blocking_time.tv_nsec = 0; 280 old->blocking_time.tv_nsec = 0;
276 old->blocking_time.tv_sec = 0; 281 old->blocking_time.tv_sec = 0;
277 282
278 /* Unlock, we're done */ 283 /* Unlock, we're done */
279 CHECK_POSIX( pthread_mutex_unlock( &new->mtx ) ); 284 CHECK_POSIX( pthread_mutex_unlock( &new->mtx ) );
280 CHECK_POSIX( pthread_mutex_unlock( &old->mtx ) ); 285 CHECK_POSIX( pthread_mutex_unlock( &old->mtx ) );
281 286
282 return 0; 287 return 0;
283 } 288 }
284 289
285 /* Get the information on the queue */ 290 /* Get the information on the queue */
286 int fd_fifo_getstats( struct fifo * queue, int * current_count, int * limit_count, int * highest_count, long long * total_count, 291 int fd_fifo_getstats( struct fifo * queue, int * current_count, int * limit_count, int * highest_count, long long * total_count,
287 struct timespec * total, struct timespec * blocking, struct timespec * last) 292 struct timespec * total, struct timespec * blocking, struct timespec * last)
288 { 293 {
289 TRACE_ENTRY( "%p %p %p %p %p %p %p %p", queue, current_count, limit_count, highest_count, total_count, total, blocking, last); 294 TRACE_ENTRY( "%p %p %p %p %p %p %p %p", queue, current_count, limit_count, highest_count, total_count, total, blocking, last);
290 295
291 /* Check the parameters */ 296 /* Check the parameters */
292 CHECK_PARAMS( CHECK_FIFO( queue ) ); 297 CHECK_PARAMS( CHECK_FIFO( queue ) );
293 298
294 /* lock the queue */ 299 /* lock the queue */
295 CHECK_POSIX( pthread_mutex_lock( &queue->mtx ) ); 300 CHECK_POSIX( pthread_mutex_lock( &queue->mtx ) );
296 301
297 if (current_count) 302 if (current_count)
298 *current_count = queue->count; 303 *current_count = queue->count;
299 304
300 if (limit_count) 305 if (limit_count)
301 *limit_count = queue->max; 306 *limit_count = queue->max;
302 307
303 if (highest_count) 308 if (highest_count)
304 *highest_count = queue->highest_ever; 309 *highest_count = queue->highest_ever;
305 310
306 if (total_count) 311 if (total_count)
307 *total_count = queue->total_items; 312 *total_count = queue->total_items;
308 313
309 if (total) 314 if (total)
310 memcpy(total, &queue->total_time, sizeof(struct timespec)); 315 memcpy(total, &queue->total_time, sizeof(struct timespec));
311 316
312 if (blocking) 317 if (blocking)
313 memcpy(blocking, &queue->blocking_time, sizeof(struct timespec)); 318 memcpy(blocking, &queue->blocking_time, sizeof(struct timespec));
314 319
315 if (last) 320 if (last)
316 memcpy(last, &queue->last_time, sizeof(struct timespec)); 321 memcpy(last, &queue->last_time, sizeof(struct timespec));
317 322
318 /* Unlock */ 323 /* Unlock */
319 CHECK_POSIX( pthread_mutex_unlock( &queue->mtx ) ); 324 CHECK_POSIX( pthread_mutex_unlock( &queue->mtx ) );
320 325
321 /* Done */ 326 /* Done */
322 return 0; 327 return 0;
323 } 328 }
324 329
325 330
326 /* alternate version with no error checking */ 331 /* alternate version with no error checking */
327 int fd_fifo_length ( struct fifo * queue ) 332 int fd_fifo_length ( struct fifo * queue )
328 { 333 {
329 if ( !CHECK_FIFO( queue ) ) 334 if ( !CHECK_FIFO( queue ) )
330 return 0; 335 return 0;
331 336
332 return queue->count; /* Let's hope it's read atomically, since we are not locking... */ 337 return queue->count; /* Let's hope it's read atomically, since we are not locking... */
333 } 338 }
334 339
335 /* Set the thresholds of the queue */ 340 /* Set the thresholds of the queue */
336 int fd_fifo_setthrhd ( struct fifo * queue, void * data, uint16_t high, void (*h_cb)(struct fifo *, void **), uint16_t low, void (*l_cb)(struct fifo *, void **) ) 341 int fd_fifo_setthrhd ( struct fifo * queue, void * data, uint16_t high, void (*h_cb)(struct fifo *, void **), uint16_t low, void (*l_cb)(struct fifo *, void **) )
337 { 342 {
338 TRACE_ENTRY( "%p %p %hu %p %hu %p", queue, data, high, h_cb, low, l_cb ); 343 TRACE_ENTRY( "%p %p %hu %p %hu %p", queue, data, high, h_cb, low, l_cb );
339 344
340 /* Check the parameters */ 345 /* Check the parameters */
341 CHECK_PARAMS( CHECK_FIFO( queue ) && (high > low) && (queue->data == NULL) ); 346 CHECK_PARAMS( CHECK_FIFO( queue ) && (high > low) && (queue->data == NULL) );
342 347
343 /* lock the queue */ 348 /* lock the queue */
344 CHECK_POSIX( pthread_mutex_lock( &queue->mtx ) ); 349 CHECK_POSIX( pthread_mutex_lock( &queue->mtx ) );
345 350
346 /* Save the values */ 351 /* Save the values */
347 queue->high = high; 352 queue->high = high;
348 queue->low = low; 353 queue->low = low;
349 queue->data = data; 354 queue->data = data;
350 queue->h_cb = h_cb; 355 queue->h_cb = h_cb;
351 queue->l_cb = l_cb; 356 queue->l_cb = l_cb;
352 357
353 /* Unlock */ 358 /* Unlock */
354 CHECK_POSIX( pthread_mutex_unlock( &queue->mtx ) ); 359 CHECK_POSIX( pthread_mutex_unlock( &queue->mtx ) );
355 360
356 /* Done */ 361 /* Done */
357 return 0; 362 return 0;
358 } 363 }
359 364
360 365
361 /* This handler is called when a thread is blocked on a queue, and cancelled */ 366 /* This handler is called when a thread is blocked on a queue, and cancelled */
362 static void fifo_cleanup_push(void * queue) 367 static void fifo_cleanup_push(void * queue)
363 { 368 {
364 struct fifo * q = (struct fifo *)queue; 369 struct fifo * q = (struct fifo *)queue;
365 TRACE_ENTRY( "%p", queue ); 370 TRACE_ENTRY( "%p", queue );
366 371
367 /* The thread has been cancelled, therefore it does not wait on the queue anymore */ 372 /* The thread has been cancelled, therefore it does not wait on the queue anymore */
368 q->thrs_push--; 373 q->thrs_push--;
369 374
370 /* Now unlock the queue, and we're done */ 375 /* Now unlock the queue, and we're done */
371 CHECK_POSIX_DO( pthread_mutex_unlock( &q->mtx ), /* nothing */ ); 376 CHECK_POSIX_DO( pthread_mutex_unlock( &q->mtx ), /* nothing */ );
372 377
373 /* End of cleanup handler */ 378 /* End of cleanup handler */
374 return; 379 return;
375 } 380 }
376 381
377 382
379 int fd_fifo_post_internal ( struct fifo * queue, void ** item, int skip_max ) 384 int fd_fifo_post_internal ( struct fifo * queue, void ** item, int skip_max )
380 { 385 {
381 struct fifo_item * new; 386 struct fifo_item * new;
382 int call_cb = 0; 387 int call_cb = 0;
383 struct timespec posted_on, queued_on; 388 struct timespec posted_on, queued_on;
384 389
385 /* Get the timing of this call */ 390 /* Get the timing of this call */
386 CHECK_SYS( clock_gettime(CLOCK_REALTIME, &posted_on) ); 391 CHECK_SYS( clock_gettime(CLOCK_REALTIME, &posted_on) );
387 392
388 /* lock the queue */ 393 /* lock the queue */
389 CHECK_POSIX( pthread_mutex_lock( &queue->mtx ) ); 394 CHECK_POSIX( pthread_mutex_lock( &queue->mtx ) );
390 395
391 if ((!skip_max) && (queue->max)) { 396 if ((!skip_max) && (queue->max)) {
392 while (queue->count >= queue->max) { 397 while (queue->count >= queue->max) {
393 int ret = 0; 398 int ret = 0;
394 399
395 /* We have to wait for an item to be pulled */ 400 /* We have to wait for an item to be pulled */
396 queue->thrs_push++ ; 401 queue->thrs_push++ ;
397 pthread_cleanup_push( fifo_cleanup_push, queue); 402 pthread_cleanup_push( fifo_cleanup_push, queue);
398 ret = pthread_cond_wait( &queue->cond_push, &queue->mtx ); 403 ret = pthread_cond_wait( &queue->cond_push, &queue->mtx );
399 pthread_cleanup_pop(0); 404 pthread_cleanup_pop(0);
400 queue->thrs_push-- ; 405 queue->thrs_push-- ;
401 406
402 ASSERT( ret == 0 ); 407 ASSERT( ret == 0 );
403 } 408 }
404 } 409 }
405 410
406 /* Create a new list item */ 411 /* Create a new list item */
407 CHECK_MALLOC_DO( new = malloc (sizeof (struct fifo_item)) , { 412 CHECK_MALLOC_DO( new = malloc (sizeof (struct fifo_item)) , {
408 pthread_mutex_unlock( &queue->mtx ); 413 pthread_mutex_unlock( &queue->mtx );
409 return ENOMEM; 414 return ENOMEM;
410 } ); 415 } );
411 416
412 fd_list_init(&new->item, *item); 417 fd_list_init(&new->item, *item);
413 *item = NULL; 418 *item = NULL;
414 419
415 /* Add the new item at the end */ 420 /* Add the new item at the end */
416 fd_list_insert_before( &queue->list, &new->item); 421 fd_list_insert_before( &queue->list, &new->item);
417 queue->count++; 422 queue->count++;
418 if (queue->highest_ever < queue->count) 423 if (queue->highest_ever < queue->count)
419 queue->highest_ever = queue->count; 424 queue->highest_ever = queue->count;
420 if (queue->high && ((queue->count % queue->high) == 0)) { 425 if (queue->high && ((queue->count % queue->high) == 0)) {
421 call_cb = 1; 426 call_cb = 1;
422 queue->highest = queue->count; 427 queue->highest = queue->count;
423 } 428 }
424 429
425 /* store timing */ 430 /* store timing */
426 memcpy(&new->posted_on, &posted_on, sizeof(struct timespec)); 431 memcpy(&new->posted_on, &posted_on, sizeof(struct timespec));
427 432
428 /* update queue timing info "blocking time" */ 433 /* update queue timing info "blocking time" */
429 { 434 {
430 long long blocked_ns; 435 long long blocked_ns;
431 CHECK_SYS( clock_gettime(CLOCK_REALTIME, &queued_on) ); 436 CHECK_SYS( clock_gettime(CLOCK_REALTIME, &queued_on) );
432 blocked_ns = (queued_on.tv_sec - posted_on.tv_sec) * 1000000000; 437 blocked_ns = (queued_on.tv_sec - posted_on.tv_sec) * 1000000000;
433 blocked_ns += (queued_on.tv_nsec - posted_on.tv_nsec); 438 blocked_ns += (queued_on.tv_nsec - posted_on.tv_nsec);
434 blocked_ns += queue->blocking_time.tv_nsec; 439 blocked_ns += queue->blocking_time.tv_nsec;
435 queue->blocking_time.tv_sec += blocked_ns / 1000000000; 440 queue->blocking_time.tv_sec += blocked_ns / 1000000000;
436 queue->blocking_time.tv_nsec = blocked_ns % 1000000000; 441 queue->blocking_time.tv_nsec = blocked_ns % 1000000000;
437 } 442 }
438 443
439 /* Signal if threads are asleep */ 444 /* Signal if threads are asleep */
440 if (queue->thrs > 0) { 445 if (queue->thrs > 0) {
441 CHECK_POSIX( pthread_cond_signal(&queue->cond_pull) ); 446 CHECK_POSIX( pthread_cond_signal(&queue->cond_pull) );
442 } 447 }
443 if (queue->thrs_push > 0) { 448 if (queue->thrs_push > 0) {
444 /* cascade */ 449 /* cascade */
445 CHECK_POSIX( pthread_cond_signal(&queue->cond_push) ); 450 CHECK_POSIX( pthread_cond_signal(&queue->cond_push) );
446 } 451 }
447 452
448 /* Unlock */ 453 /* Unlock */
449 CHECK_POSIX( pthread_mutex_unlock( &queue->mtx ) ); 454 CHECK_POSIX( pthread_mutex_unlock( &queue->mtx ) );
450 455
451 /* Call high-watermark cb as needed */ 456 /* Call high-watermark cb as needed */
452 if (call_cb && queue->h_cb) 457 if (call_cb && queue->h_cb)
453 (*queue->h_cb)(queue, &queue->data); 458 (*queue->h_cb)(queue, &queue->data);
454 459
455 /* Done */ 460 /* Done */
456 return 0; 461 return 0;
457 } 462 }
458 463
/* Post a new item in the queue; the "max" limit of the queue is honoured (skip_max = 0) */
int fd_fifo_post_int ( struct fifo * queue, void ** item )
{
	const int skip_max = 0;

	TRACE_ENTRY( "%p %p", queue, item );

	/* A valid queue and a non-NULL item are required */
	CHECK_PARAMS( CHECK_FIFO( queue ) && item && *item );

	return fd_fifo_post_internal ( queue, item, skip_max );
}
470 475
/* Post a new item in the queue without blocking: the "max" limit is ignored (skip_max = 1) */
int fd_fifo_post_noblock ( struct fifo * queue, void ** item )
{
	const int skip_max = 1;

	TRACE_ENTRY( "%p %p", queue, item );

	/* A valid queue and a non-NULL item are required */
	CHECK_PARAMS( CHECK_FIFO( queue ) && item && *item );

	return fd_fifo_post_internal ( queue, item, skip_max );
}
482 487
483 /* Pop the first item from the queue */ 488 /* Pop the first item from the queue */
484 static void * mq_pop(struct fifo * queue) 489 static void * mq_pop(struct fifo * queue)
485 { 490 {
486 void * ret = NULL; 491 void * ret = NULL;
487 struct fifo_item * fi; 492 struct fifo_item * fi;
488 struct timespec now; 493 struct timespec now;
489 494
490 ASSERT( ! FD_IS_LIST_EMPTY(&queue->list) ); 495 ASSERT( ! FD_IS_LIST_EMPTY(&queue->list) );
491 496
492 fi = (struct fifo_item *)(queue->list.next); 497 fi = (struct fifo_item *)(queue->list.next);
493 ret = fi->item.o; 498 ret = fi->item.o;
494 fd_list_unlink(&fi->item); 499 fd_list_unlink(&fi->item);
495 queue->count--; 500 queue->count--;
496 queue->total_items++; 501 queue->total_items++;
497 502
498 /* Update the timings */ 503 /* Update the timings */
499 CHECK_SYS_DO( clock_gettime(CLOCK_REALTIME, &now), goto skip_timing ); 504 CHECK_SYS_DO( clock_gettime(CLOCK_REALTIME, &now), goto skip_timing );
500 { 505 {
501 long long elapsed = (now.tv_sec - fi->posted_on.tv_sec) * 1000000000; 506 long long elapsed = (now.tv_sec - fi->posted_on.tv_sec) * 1000000000;
502 elapsed += now.tv_nsec - fi->posted_on.tv_nsec; 507 elapsed += now.tv_nsec - fi->posted_on.tv_nsec;
503 508
504 queue->last_time.tv_sec = elapsed / 1000000000; 509 queue->last_time.tv_sec = elapsed / 1000000000;
505 queue->last_time.tv_nsec = elapsed % 1000000000; 510 queue->last_time.tv_nsec = elapsed % 1000000000;
506 511
507 elapsed += queue->total_time.tv_nsec; 512 elapsed += queue->total_time.tv_nsec;
508 queue->total_time.tv_sec += elapsed / 1000000000; 513 queue->total_time.tv_sec += elapsed / 1000000000;
509 queue->total_time.tv_nsec = elapsed % 1000000000; 514 queue->total_time.tv_nsec = elapsed % 1000000000;
510 } 515 }
511 skip_timing: 516 skip_timing:
512 free(fi); 517 free(fi);
513 518
514 if (queue->thrs_push) { 519 if (queue->thrs_push) {
515 CHECK_POSIX_DO( pthread_cond_signal( &queue->cond_push ), ); 520 CHECK_POSIX_DO( pthread_cond_signal( &queue->cond_push ), );
516 } 521 }
517 522
518 return ret; 523 return ret;
519 } 524 }
520 525
521 /* Check if the low watermark callback must be called. */ 526 /* Check if the low watermark callback must be called. */
522 static __inline__ int test_l_cb(struct fifo * queue) 527 static __inline__ int test_l_cb(struct fifo * queue)
523 { 528 {
524 if ((queue->high == 0) || (queue->low == 0) || (queue->l_cb == 0)) 529 if ((queue->high == 0) || (queue->low == 0) || (queue->l_cb == 0))
525 return 0; 530 return 0;
526 531
527 if (((queue->count % queue->high) == queue->low) && (queue->highest > queue->count)) { 532 if (((queue->count % queue->high) == queue->low) && (queue->highest > queue->count)) {
528 queue->highest -= queue->high; 533 queue->highest -= queue->high;
529 return 1; 534 return 1;
530 } 535 }
531 536
532 return 0; 537 return 0;
533 } 538 }
534 539
535 /* Try poping an item */ 540 /* Try poping an item */
536 int fd_fifo_tryget_int ( struct fifo * queue, void ** item ) 541 int fd_fifo_tryget_int ( struct fifo * queue, void ** item )
537 { 542 {
538 int wouldblock = 0; 543 int wouldblock = 0;
539 int call_cb = 0; 544 int call_cb = 0;
540 545
541 TRACE_ENTRY( "%p %p", queue, item ); 546 TRACE_ENTRY( "%p %p", queue, item );
542 547
543 /* Check the parameters */ 548 /* Check the parameters */
544 CHECK_PARAMS( CHECK_FIFO( queue ) && item ); 549 CHECK_PARAMS( CHECK_FIFO( queue ) && item );
545 550
546 /* lock the queue */ 551 /* lock the queue */
547 CHECK_POSIX( pthread_mutex_lock( &queue->mtx ) ); 552 CHECK_POSIX( pthread_mutex_lock( &queue->mtx ) );
548 553
549 /* Check queue status */ 554 /* Check queue status */
550 if (queue->count > 0) { 555 if (queue->count > 0) {
551 got_item: 556 got_item:
552 /* There are elements in the queue, so pick the first one */ 557 /* There are elements in the queue, so pick the first one */
553 *item = mq_pop(queue); 558 *item = mq_pop(queue);
560 usleep(1000); 565 usleep(1000);
561 CHECK_POSIX( pthread_mutex_lock( &queue->mtx ) ); 566 CHECK_POSIX( pthread_mutex_lock( &queue->mtx ) );
562 if (queue->count > 0) 567 if (queue->count > 0)
563 goto got_item; 568 goto got_item;
564 } 569 }
565 570
566 wouldblock = 1; 571 wouldblock = 1;
567 *item = NULL; 572 *item = NULL;
568 } 573 }
569 574
570 /* Unlock */ 575 /* Unlock */
571 CHECK_POSIX( pthread_mutex_unlock( &queue->mtx ) ); 576 CHECK_POSIX( pthread_mutex_unlock( &queue->mtx ) );
572 577
573 /* Call low watermark callback as needed */ 578 /* Call low watermark callback as needed */
574 if (call_cb) 579 if (call_cb)
575 (*queue->l_cb)(queue, &queue->data); 580 (*queue->l_cb)(queue, &queue->data);
576 581
577 /* Done */ 582 /* Done */
578 return wouldblock ? EWOULDBLOCK : 0; 583 return wouldblock ? EWOULDBLOCK : 0;
579 } 584 }
580 585
581 /* This handler is called when a thread is blocked on a queue, and cancelled */ 586 /* This handler is called when a thread is blocked on a queue, and cancelled */
582 static void fifo_cleanup(void * queue) 587 static void fifo_cleanup(void * queue)
583 { 588 {
584 struct fifo * q = (struct fifo *)queue; 589 struct fifo * q = (struct fifo *)queue;
585 TRACE_ENTRY( "%p", queue ); 590 TRACE_ENTRY( "%p", queue );
586 591
587 /* The thread has been cancelled, therefore it does not wait on the queue anymore */ 592 /* The thread has been cancelled, therefore it does not wait on the queue anymore */
588 q->thrs--; 593 q->thrs--;
589 594
590 /* Now unlock the queue, and we're done */ 595 /* Now unlock the queue, and we're done */
591 CHECK_POSIX_DO( pthread_mutex_unlock( &q->mtx ), /* nothing */ ); 596 CHECK_POSIX_DO( pthread_mutex_unlock( &q->mtx ), /* nothing */ );
592 597
593 /* End of cleanup handler */ 598 /* End of cleanup handler */
594 return; 599 return;
595 } 600 }
596 601
597 /* The internal function for fd_fifo_timedget and fd_fifo_get */ 602 /* The internal function for fd_fifo_timedget and fd_fifo_get */
598 static int fifo_tget ( struct fifo * queue, void ** item, int istimed, const struct timespec *abstime) 603 static int fifo_tget ( struct fifo * queue, void ** item, int istimed, const struct timespec *abstime)
599 { 604 {
600 int call_cb = 0; 605 int call_cb = 0;
601 int ret = 0; 606 int ret = 0;
602 607
603 /* Check the parameters */ 608 /* Check the parameters */
604 CHECK_PARAMS( CHECK_FIFO( queue ) && item && (abstime || !istimed) ); 609 CHECK_PARAMS( CHECK_FIFO( queue ) && item && (abstime || !istimed) );
605 610
606 /* Initialize the return value */ 611 /* Initialize the return value */
607 *item = NULL; 612 *item = NULL;
608 613
609 /* lock the queue */ 614 /* lock the queue */
610 CHECK_POSIX( pthread_mutex_lock( &queue->mtx ) ); 615 CHECK_POSIX( pthread_mutex_lock( &queue->mtx ) );
611 616
612 awaken: 617 awaken:
613 /* Check queue status */ 618 /* Check queue status */
614 if (!CHECK_FIFO( queue )) { 619 if (!CHECK_FIFO( queue )) {
615 /* The queue is being destroyed */ 620 /* The queue is being destroyed */
616 CHECK_POSIX( pthread_mutex_unlock( &queue->mtx ) ); 621 CHECK_POSIX( pthread_mutex_unlock( &queue->mtx ) );
617 TRACE_DEBUG(FULL, "The queue is being destroyed -> EPIPE"); 622 TRACE_DEBUG(FULL, "The queue is being destroyed -> EPIPE");
618 return EPIPE; 623 return EPIPE;
619 } 624 }
620 625
621 if (queue->count > 0) { 626 if (queue->count > 0) {
622 /* There are items in the queue, so pick the first one */ 627 /* There are items in the queue, so pick the first one */
623 *item = mq_pop(queue); 628 *item = mq_pop(queue);
624 call_cb = test_l_cb(queue); 629 call_cb = test_l_cb(queue);
625 } else { 630 } else {
633 } 638 }
634 pthread_cleanup_pop(0); 639 pthread_cleanup_pop(0);
635 queue->thrs-- ; 640 queue->thrs-- ;
636 if (ret == 0) 641 if (ret == 0)
637 goto awaken; /* test for spurious wake-ups */ 642 goto awaken; /* test for spurious wake-ups */
638 643
639 /* otherwise (ETIMEDOUT / other error) just continue */ 644 /* otherwise (ETIMEDOUT / other error) just continue */
640 } 645 }
641 646
642 /* Unlock */ 647 /* Unlock */
643 CHECK_POSIX( pthread_mutex_unlock( &queue->mtx ) ); 648 CHECK_POSIX( pthread_mutex_unlock( &queue->mtx ) );
644 649
645 /* Call low watermark callback as needed */ 650 /* Call low watermark callback as needed */
646 if (call_cb) 651 if (call_cb)
647 (*queue->l_cb)(queue, &queue->data); 652 (*queue->l_cb)(queue, &queue->data);
648 653
649 /* Done */ 654 /* Done */
650 return ret; 655 return ret;
651 } 656 }
652 657
653 /* Get the next available item, block until there is one */ 658 /* Get the next available item, block until there is one */
667 /* Test if data is available in the queue, without pulling it */ 672 /* Test if data is available in the queue, without pulling it */
668 int fd_fifo_select ( struct fifo * queue, const struct timespec *abstime ) 673 int fd_fifo_select ( struct fifo * queue, const struct timespec *abstime )
669 { 674 {
670 int ret = 0; 675 int ret = 0;
671 TRACE_ENTRY( "%p %p", queue, abstime ); 676 TRACE_ENTRY( "%p %p", queue, abstime );
672 677
673 CHECK_PARAMS_DO( CHECK_FIFO( queue ), return -EINVAL ); 678 CHECK_PARAMS_DO( CHECK_FIFO( queue ), return -EINVAL );
674 679
675 /* lock the queue */ 680 /* lock the queue */
676 CHECK_POSIX_DO( pthread_mutex_lock( &queue->mtx ), return -__ret__ ); 681 CHECK_POSIX_DO( pthread_mutex_lock( &queue->mtx ), return -__ret__ );
677 682
678 awaken: 683 awaken:
679 ret = (queue->count > 0 ) ? queue->count : 0; 684 ret = (queue->count > 0 ) ? queue->count : 0;
680 if ((ret == 0) && (abstime != NULL)) { 685 if ((ret == 0) && (abstime != NULL)) {
681 /* We have to wait for a new item */ 686 /* We have to wait for a new item */
682 queue->thrs++ ; 687 queue->thrs++ ;
683 pthread_cleanup_push( fifo_cleanup, queue); 688 pthread_cleanup_push( fifo_cleanup, queue);
684 ret = pthread_cond_timedwait( &queue->cond_pull, &queue->mtx, abstime ); 689 ret = pthread_cond_timedwait( &queue->cond_pull, &queue->mtx, abstime );
685 pthread_cleanup_pop(0); 690 pthread_cleanup_pop(0);
686 queue->thrs-- ; 691 queue->thrs-- ;
687 if (ret == 0) 692 if (ret == 0)
688 goto awaken; /* test for spurious wake-ups */ 693 goto awaken; /* test for spurious wake-ups */
689 694
690 if (ret == ETIMEDOUT) 695 if (ret == ETIMEDOUT)
691 ret = 0; 696 ret = 0;
692 else 697 else
693 ret = -ret; 698 ret = -ret;
694 } 699 }
695 700
696 /* Unlock */ 701 /* Unlock */
697 CHECK_POSIX_DO( pthread_mutex_unlock( &queue->mtx ), return -__ret__ ); 702 CHECK_POSIX_DO( pthread_mutex_unlock( &queue->mtx ), return -__ret__ );
698 703
699 return ret; 704 return ret;
700 } 705 }
"Welcome to our mercurial repository"