Mercurial > hg > freeDiameter
comparison libfdproto/fifo.c @ 1377:ce257e43085d
fd_fifo_del: check if queue is already (being) destroyed and return success in that case.
Improves shutdown behaviour.
author | Thomas Klausner <tk@giga.or.at> |
---|---|
date | Thu, 20 Jun 2019 12:01:50 +0200 |
parents | f40de74bd1c7 |
children | 2131d6bdf32a |
comparison
equal
deleted
inserted
replaced
1376:b8afaf63f65a | 1377:ce257e43085d |
---|---|
48 #include "fdproto-internal.h" | 48 #include "fdproto-internal.h" |
49 | 49 |
50 /* Definition of a FIFO queue object */ | 50 /* Definition of a FIFO queue object */ |
51 struct fifo { | 51 struct fifo { |
52 int eyec; /* An eye catcher, also used to check a queue is valid. FIFO_EYEC */ | 52 int eyec; /* An eye catcher, also used to check a queue is valid. FIFO_EYEC */ |
53 | 53 |
54 pthread_mutex_t mtx; /* Mutex protecting this queue */ | 54 pthread_mutex_t mtx; /* Mutex protecting this queue */ |
55 pthread_cond_t cond_pull; /* condition variable for pulling threads */ | 55 pthread_cond_t cond_pull; /* condition variable for pulling threads */ |
56 pthread_cond_t cond_push; /* condition variable for pushing threads */ | 56 pthread_cond_t cond_push; /* condition variable for pushing threads */ |
57 | 57 |
58 struct fd_list list; /* sentinel for the list of elements */ | 58 struct fd_list list; /* sentinel for the list of elements */ |
59 int count; /* number of objects in the list */ | 59 int count; /* number of objects in the list */ |
60 int thrs; /* number of threads waiting for a new element (when count is 0) */ | 60 int thrs; /* number of threads waiting for a new element (when count is 0) */ |
61 | 61 |
62 int max; /* maximum number of items to accept if not 0 */ | 62 int max; /* maximum number of items to accept if not 0 */ |
63 int thrs_push; /* number of threads waitnig to push an item */ | 63 int thrs_push; /* number of threads waitnig to push an item */ |
64 | 64 |
65 uint16_t high; /* High level threshold (see libfreeDiameter.h for details) */ | 65 uint16_t high; /* High level threshold (see libfreeDiameter.h for details) */ |
66 uint16_t low; /* Low level threshhold */ | 66 uint16_t low; /* Low level threshhold */ |
67 void *data; /* Opaque pointer for threshold callbacks */ | 67 void *data; /* Opaque pointer for threshold callbacks */ |
68 void (*h_cb)(struct fifo *, void **); /* The callbacks */ | 68 void (*h_cb)(struct fifo *, void **); /* The callbacks */ |
69 void (*l_cb)(struct fifo *, void **); | 69 void (*l_cb)(struct fifo *, void **); |
70 int highest;/* The highest count value for which h_cb has been called */ | 70 int highest;/* The highest count value for which h_cb has been called */ |
71 int highest_ever; /* The max count value this queue has reached (for tweaking) */ | 71 int highest_ever; /* The max count value this queue has reached (for tweaking) */ |
72 | 72 |
73 long long total_items; /* Cumulated number of items that went through this fifo (excluding current count), always increasing. */ | 73 long long total_items; /* Cumulated number of items that went through this fifo (excluding current count), always increasing. */ |
74 struct timespec total_time; /* Cumulated time all items spent in this queue, including blocking time (always growing, use deltas for monitoring) */ | 74 struct timespec total_time; /* Cumulated time all items spent in this queue, including blocking time (always growing, use deltas for monitoring) */ |
75 struct timespec blocking_time; /* Cumulated time threads trying to post new items were blocked (queue full). */ | 75 struct timespec blocking_time; /* Cumulated time threads trying to post new items were blocked (queue full). */ |
76 struct timespec last_time; /* For the last element retrieved from the queue, how long it take between posting (including blocking) and poping */ | 76 struct timespec last_time; /* For the last element retrieved from the queue, how long it take between posting (including blocking) and poping */ |
77 | 77 |
78 }; | 78 }; |
79 | 79 |
80 struct fifo_item { | 80 struct fifo_item { |
81 struct fd_list item; | 81 struct fd_list item; |
82 struct timespec posted_on; | 82 struct timespec posted_on; |
91 | 91 |
92 /* Create a new queue, with max number of items -- use 0 for no max */ | 92 /* Create a new queue, with max number of items -- use 0 for no max */ |
93 int fd_fifo_new ( struct fifo ** queue, int max ) | 93 int fd_fifo_new ( struct fifo ** queue, int max ) |
94 { | 94 { |
95 struct fifo * new; | 95 struct fifo * new; |
96 | 96 |
97 TRACE_ENTRY( "%p", queue ); | 97 TRACE_ENTRY( "%p", queue ); |
98 | 98 |
99 CHECK_PARAMS( queue ); | 99 CHECK_PARAMS( queue ); |
100 | 100 |
101 /* Create a new object */ | 101 /* Create a new object */ |
102 CHECK_MALLOC( new = malloc (sizeof (struct fifo) ) ); | 102 CHECK_MALLOC( new = malloc (sizeof (struct fifo) ) ); |
103 | 103 |
104 /* Initialize the content */ | 104 /* Initialize the content */ |
105 memset(new, 0, sizeof(struct fifo)); | 105 memset(new, 0, sizeof(struct fifo)); |
106 | 106 |
107 new->eyec = FIFO_EYEC; | 107 new->eyec = FIFO_EYEC; |
108 CHECK_POSIX( pthread_mutex_init(&new->mtx, NULL) ); | 108 CHECK_POSIX( pthread_mutex_init(&new->mtx, NULL) ); |
109 CHECK_POSIX( pthread_cond_init(&new->cond_pull, NULL) ); | 109 CHECK_POSIX( pthread_cond_init(&new->cond_pull, NULL) ); |
110 CHECK_POSIX( pthread_cond_init(&new->cond_push, NULL) ); | 110 CHECK_POSIX( pthread_cond_init(&new->cond_push, NULL) ); |
111 new->max = max; | 111 new->max = max; |
112 | 112 |
113 fd_list_init(&new->list, NULL); | 113 fd_list_init(&new->list, NULL); |
114 | 114 |
115 /* We're done */ | 115 /* We're done */ |
116 *queue = new; | 116 *queue = new; |
117 return 0; | 117 return 0; |
118 } | 118 } |
119 | 119 |
120 /* Dump the content of a queue */ | 120 /* Dump the content of a queue */ |
121 DECLARE_FD_DUMP_PROTOTYPE(fd_fifo_dump, char * name, struct fifo * queue, fd_fifo_dump_item_cb dump_item) | 121 DECLARE_FD_DUMP_PROTOTYPE(fd_fifo_dump, char * name, struct fifo * queue, fd_fifo_dump_item_cb dump_item) |
122 { | 122 { |
123 FD_DUMP_HANDLE_OFFSET(); | 123 FD_DUMP_HANDLE_OFFSET(); |
124 | 124 |
125 if (name) { | 125 if (name) { |
126 CHECK_MALLOC_DO( fd_dump_extend( FD_DUMP_STD_PARAMS, "'%s'(@%p): ", name, queue), return NULL); | 126 CHECK_MALLOC_DO( fd_dump_extend( FD_DUMP_STD_PARAMS, "'%s'(@%p): ", name, queue), return NULL); |
127 } else { | 127 } else { |
128 CHECK_MALLOC_DO( fd_dump_extend( FD_DUMP_STD_PARAMS, "{fifo}(@%p): ", queue), return NULL); | 128 CHECK_MALLOC_DO( fd_dump_extend( FD_DUMP_STD_PARAMS, "{fifo}(@%p): ", queue), return NULL); |
129 } | 129 } |
130 | 130 |
131 if (!CHECK_FIFO( queue )) { | 131 if (!CHECK_FIFO( queue )) { |
132 return fd_dump_extend(FD_DUMP_STD_PARAMS, "INVALID/NULL"); | 132 return fd_dump_extend(FD_DUMP_STD_PARAMS, "INVALID/NULL"); |
133 } | 133 } |
134 | 134 |
135 CHECK_POSIX_DO( pthread_mutex_lock( &queue->mtx ), /* continue */ ); | 135 CHECK_POSIX_DO( pthread_mutex_lock( &queue->mtx ), /* continue */ ); |
136 CHECK_MALLOC_DO( fd_dump_extend( FD_DUMP_STD_PARAMS, "items:%d,%d,%d threads:%d,%d stats:%lld/%ld.%06ld,%ld.%06ld,%ld.%06ld thresholds:%d,%d,%d,%p,%p,%p", | 136 CHECK_MALLOC_DO( fd_dump_extend( FD_DUMP_STD_PARAMS, "items:%d,%d,%d threads:%d,%d stats:%lld/%ld.%06ld,%ld.%06ld,%ld.%06ld thresholds:%d,%d,%d,%p,%p,%p", |
137 queue->count, queue->highest_ever, queue->max, | 137 queue->count, queue->highest_ever, queue->max, |
138 queue->thrs, queue->thrs_push, | 138 queue->thrs, queue->thrs_push, |
139 queue->total_items,(long)queue->total_time.tv_sec,(long)(queue->total_time.tv_nsec/1000),(long)queue->blocking_time.tv_sec,(long)(queue->blocking_time.tv_nsec/1000),(long)queue->last_time.tv_sec,(long)(queue->last_time.tv_nsec/1000), | 139 queue->total_items,(long)queue->total_time.tv_sec,(long)(queue->total_time.tv_nsec/1000),(long)queue->blocking_time.tv_sec,(long)(queue->blocking_time.tv_nsec/1000),(long)queue->last_time.tv_sec,(long)(queue->last_time.tv_nsec/1000), |
140 queue->high, queue->low, queue->highest, queue->h_cb, queue->l_cb, queue->data), | 140 queue->high, queue->low, queue->highest, queue->h_cb, queue->l_cb, queue->data), |
141 goto error); | 141 goto error); |
142 | 142 |
143 if (dump_item) { | 143 if (dump_item) { |
144 struct fd_list * li; | 144 struct fd_list * li; |
145 int i = 0; | 145 int i = 0; |
146 for (li = queue->list.next; li != &queue->list; li = li->next) { | 146 for (li = queue->list.next; li != &queue->list; li = li->next) { |
147 struct fifo_item * fi = (struct fifo_item *)li; | 147 struct fifo_item * fi = (struct fifo_item *)li; |
148 CHECK_MALLOC_DO( fd_dump_extend( FD_DUMP_STD_PARAMS, "\n [#%i](@%p)@%ld.%06ld: ", | 148 CHECK_MALLOC_DO( fd_dump_extend( FD_DUMP_STD_PARAMS, "\n [#%i](@%p)@%ld.%06ld: ", |
149 i++, fi->item.o, (long)fi->posted_on.tv_sec,(long)(fi->posted_on.tv_nsec/1000)), | 149 i++, fi->item.o, (long)fi->posted_on.tv_sec,(long)(fi->posted_on.tv_nsec/1000)), |
150 goto error); | 150 goto error); |
151 CHECK_MALLOC_DO( (*dump_item)(FD_DUMP_STD_PARAMS, fi->item.o), goto error); | 151 CHECK_MALLOC_DO( (*dump_item)(FD_DUMP_STD_PARAMS, fi->item.o), goto error); |
152 } | 152 } |
153 } | 153 } |
154 CHECK_POSIX_DO( pthread_mutex_unlock( &queue->mtx ), /* continue */ ); | 154 CHECK_POSIX_DO( pthread_mutex_unlock( &queue->mtx ), /* continue */ ); |
155 | 155 |
156 return *buf; | 156 return *buf; |
157 error: | 157 error: |
158 CHECK_POSIX_DO( pthread_mutex_unlock( &queue->mtx ), /* continue */ ); | 158 CHECK_POSIX_DO( pthread_mutex_unlock( &queue->mtx ), /* continue */ ); |
159 return NULL; | 159 return NULL; |
160 } | 160 } |
161 | 161 |
162 /* Delete a queue. It must be empty. */ | 162 /* Delete a queue. It must be empty. */ |
163 int fd_fifo_del ( struct fifo ** queue ) | 163 int fd_fifo_del ( struct fifo ** queue ) |
164 { | 164 { |
165 struct fifo * q; | 165 struct fifo * q; |
166 int loops = 0; | 166 int loops = 0; |
167 | 167 |
168 TRACE_ENTRY( "%p", queue ); | 168 TRACE_ENTRY( "%p", queue ); |
169 | 169 |
170 if (queue && *queue == NULL) { | |
171 /* Queue already (in the process of being) deleted */ | |
172 return 0; | |
173 } | |
174 | |
170 CHECK_PARAMS( queue && CHECK_FIFO( *queue ) ); | 175 CHECK_PARAMS( queue && CHECK_FIFO( *queue ) ); |
171 | 176 |
172 q = *queue; | 177 q = *queue; |
173 | 178 |
174 CHECK_POSIX( pthread_mutex_lock( &q->mtx ) ); | 179 CHECK_POSIX( pthread_mutex_lock( &q->mtx ) ); |
175 | 180 |
176 if ((q->count != 0) || (q->data != NULL)) { | 181 if ((q->count != 0) || (q->data != NULL)) { |
177 TRACE_DEBUG(INFO, "The queue cannot be destroyed (%d, %p)", q->count, q->data); | 182 TRACE_DEBUG(INFO, "The queue cannot be destroyed (%d, %p)", q->count, q->data); |
178 CHECK_POSIX_DO( pthread_mutex_unlock( &q->mtx ), /* no fallback */ ); | 183 CHECK_POSIX_DO( pthread_mutex_unlock( &q->mtx ), /* no fallback */ ); |
179 return EINVAL; | 184 return EINVAL; |
180 } | 185 } |
181 | 186 |
182 /* Ok, now invalidate the queue */ | 187 /* Ok, now invalidate the queue */ |
183 q->eyec = 0xdead; | 188 q->eyec = 0xdead; |
184 | 189 |
185 /* Have all waiting threads return an error */ | 190 /* Have all waiting threads return an error */ |
186 while (q->thrs) { | 191 while (q->thrs) { |
187 CHECK_POSIX( pthread_mutex_unlock( &q->mtx )); | 192 CHECK_POSIX( pthread_mutex_unlock( &q->mtx )); |
188 CHECK_POSIX( pthread_cond_signal(&q->cond_pull) ); | 193 CHECK_POSIX( pthread_cond_signal(&q->cond_pull) ); |
189 usleep(1000); | 194 usleep(1000); |
190 | 195 |
191 CHECK_POSIX( pthread_mutex_lock( &q->mtx ) ); | 196 CHECK_POSIX( pthread_mutex_lock( &q->mtx ) ); |
192 ASSERT( ++loops < 20 ); /* detect infinite loops */ | 197 ASSERT( ++loops < 20 ); /* detect infinite loops */ |
193 } | 198 } |
194 | 199 |
195 /* sanity check */ | 200 /* sanity check */ |
196 ASSERT(FD_IS_LIST_EMPTY(&q->list)); | 201 ASSERT(FD_IS_LIST_EMPTY(&q->list)); |
197 | 202 |
198 /* And destroy it */ | 203 /* And destroy it */ |
199 CHECK_POSIX( pthread_mutex_unlock( &q->mtx ) ); | 204 CHECK_POSIX( pthread_mutex_unlock( &q->mtx ) ); |
200 | 205 |
201 CHECK_POSIX_DO( pthread_cond_destroy( &q->cond_pull ), ); | 206 CHECK_POSIX_DO( pthread_cond_destroy( &q->cond_pull ), ); |
202 | 207 |
203 CHECK_POSIX_DO( pthread_cond_destroy( &q->cond_push ), ); | 208 CHECK_POSIX_DO( pthread_cond_destroy( &q->cond_push ), ); |
204 | 209 |
205 CHECK_POSIX_DO( pthread_mutex_destroy( &q->mtx ), ); | 210 CHECK_POSIX_DO( pthread_mutex_destroy( &q->mtx ), ); |
206 | 211 |
207 free(q); | 212 free(q); |
208 *queue = NULL; | 213 *queue = NULL; |
209 | 214 |
210 return 0; | 215 return 0; |
211 } | 216 } |
212 | 217 |
213 /* Move the content of old into new, and update loc_update atomically. We leave the old queue empty but valid */ | 218 /* Move the content of old into new, and update loc_update atomically. We leave the old queue empty but valid */ |
214 int fd_fifo_move ( struct fifo * old, struct fifo * new, struct fifo ** loc_update ) | 219 int fd_fifo_move ( struct fifo * old, struct fifo * new, struct fifo ** loc_update ) |
215 { | 220 { |
216 int loops = 0; | 221 int loops = 0; |
217 | 222 |
218 TRACE_ENTRY("%p %p %p", old, new, loc_update); | 223 TRACE_ENTRY("%p %p %p", old, new, loc_update); |
219 CHECK_PARAMS( CHECK_FIFO( old ) && CHECK_FIFO( new )); | 224 CHECK_PARAMS( CHECK_FIFO( old ) && CHECK_FIFO( new )); |
220 | 225 |
221 CHECK_PARAMS( ! old->data ); | 226 CHECK_PARAMS( ! old->data ); |
222 if (new->high) { | 227 if (new->high) { |
223 TODO("Implement support for thresholds in fd_fifo_move..."); | 228 TODO("Implement support for thresholds in fd_fifo_move..."); |
224 } | 229 } |
225 | 230 |
226 /* Update loc_update */ | 231 /* Update loc_update */ |
227 if (loc_update) | 232 if (loc_update) |
228 *loc_update = new; | 233 *loc_update = new; |
229 | 234 |
230 /* Lock the queues */ | 235 /* Lock the queues */ |
231 CHECK_POSIX( pthread_mutex_lock( &old->mtx ) ); | 236 CHECK_POSIX( pthread_mutex_lock( &old->mtx ) ); |
232 | 237 |
233 CHECK_PARAMS_DO( (! old->thrs_push), { | 238 CHECK_PARAMS_DO( (! old->thrs_push), { |
234 pthread_mutex_unlock( &old->mtx ); | 239 pthread_mutex_unlock( &old->mtx ); |
235 return EINVAL; | 240 return EINVAL; |
236 } ); | 241 } ); |
237 | 242 |
238 CHECK_POSIX( pthread_mutex_lock( &new->mtx ) ); | 243 CHECK_POSIX( pthread_mutex_lock( &new->mtx ) ); |
239 | 244 |
240 /* Any waiting thread on the old queue returns an error */ | 245 /* Any waiting thread on the old queue returns an error */ |
241 old->eyec = 0xdead; | 246 old->eyec = 0xdead; |
242 while (old->thrs) { | 247 while (old->thrs) { |
243 CHECK_POSIX( pthread_mutex_unlock( &old->mtx )); | 248 CHECK_POSIX( pthread_mutex_unlock( &old->mtx )); |
244 CHECK_POSIX( pthread_cond_signal( &old->cond_pull ) ); | 249 CHECK_POSIX( pthread_cond_signal( &old->cond_pull ) ); |
245 usleep(1000); | 250 usleep(1000); |
246 | 251 |
247 CHECK_POSIX( pthread_mutex_lock( &old->mtx ) ); | 252 CHECK_POSIX( pthread_mutex_lock( &old->mtx ) ); |
248 ASSERT( loops < 20 ); /* detect infinite loops */ | 253 ASSERT( loops < 20 ); /* detect infinite loops */ |
249 } | 254 } |
250 | 255 |
251 /* Move all data from old to new */ | 256 /* Move all data from old to new */ |
252 fd_list_move_end( &new->list, &old->list ); | 257 fd_list_move_end( &new->list, &old->list ); |
253 if (old->count && (!new->count)) { | 258 if (old->count && (!new->count)) { |
254 CHECK_POSIX( pthread_cond_signal(&new->cond_pull) ); | 259 CHECK_POSIX( pthread_cond_signal(&new->cond_pull) ); |
255 } | 260 } |
256 new->count += old->count; | 261 new->count += old->count; |
257 | 262 |
258 /* Reset old */ | 263 /* Reset old */ |
259 old->count = 0; | 264 old->count = 0; |
260 old->eyec = FIFO_EYEC; | 265 old->eyec = FIFO_EYEC; |
261 | 266 |
262 /* Merge the stats in the new queue */ | 267 /* Merge the stats in the new queue */ |
263 new->total_items += old->total_items; | 268 new->total_items += old->total_items; |
264 old->total_items = 0; | 269 old->total_items = 0; |
265 | 270 |
266 new->total_time.tv_nsec += old->total_time.tv_nsec; | 271 new->total_time.tv_nsec += old->total_time.tv_nsec; |
267 new->total_time.tv_sec += old->total_time.tv_sec + (new->total_time.tv_nsec / 1000000000); | 272 new->total_time.tv_sec += old->total_time.tv_sec + (new->total_time.tv_nsec / 1000000000); |
268 new->total_time.tv_nsec %= 1000000000; | 273 new->total_time.tv_nsec %= 1000000000; |
269 old->total_time.tv_nsec = 0; | 274 old->total_time.tv_nsec = 0; |
270 old->total_time.tv_sec = 0; | 275 old->total_time.tv_sec = 0; |
271 | 276 |
272 new->blocking_time.tv_nsec += old->blocking_time.tv_nsec; | 277 new->blocking_time.tv_nsec += old->blocking_time.tv_nsec; |
273 new->blocking_time.tv_sec += old->blocking_time.tv_sec + (new->blocking_time.tv_nsec / 1000000000); | 278 new->blocking_time.tv_sec += old->blocking_time.tv_sec + (new->blocking_time.tv_nsec / 1000000000); |
274 new->blocking_time.tv_nsec %= 1000000000; | 279 new->blocking_time.tv_nsec %= 1000000000; |
275 old->blocking_time.tv_nsec = 0; | 280 old->blocking_time.tv_nsec = 0; |
276 old->blocking_time.tv_sec = 0; | 281 old->blocking_time.tv_sec = 0; |
277 | 282 |
278 /* Unlock, we're done */ | 283 /* Unlock, we're done */ |
279 CHECK_POSIX( pthread_mutex_unlock( &new->mtx ) ); | 284 CHECK_POSIX( pthread_mutex_unlock( &new->mtx ) ); |
280 CHECK_POSIX( pthread_mutex_unlock( &old->mtx ) ); | 285 CHECK_POSIX( pthread_mutex_unlock( &old->mtx ) ); |
281 | 286 |
282 return 0; | 287 return 0; |
283 } | 288 } |
284 | 289 |
285 /* Get the information on the queue */ | 290 /* Get the information on the queue */ |
286 int fd_fifo_getstats( struct fifo * queue, int * current_count, int * limit_count, int * highest_count, long long * total_count, | 291 int fd_fifo_getstats( struct fifo * queue, int * current_count, int * limit_count, int * highest_count, long long * total_count, |
287 struct timespec * total, struct timespec * blocking, struct timespec * last) | 292 struct timespec * total, struct timespec * blocking, struct timespec * last) |
288 { | 293 { |
289 TRACE_ENTRY( "%p %p %p %p %p %p %p %p", queue, current_count, limit_count, highest_count, total_count, total, blocking, last); | 294 TRACE_ENTRY( "%p %p %p %p %p %p %p %p", queue, current_count, limit_count, highest_count, total_count, total, blocking, last); |
290 | 295 |
291 /* Check the parameters */ | 296 /* Check the parameters */ |
292 CHECK_PARAMS( CHECK_FIFO( queue ) ); | 297 CHECK_PARAMS( CHECK_FIFO( queue ) ); |
293 | 298 |
294 /* lock the queue */ | 299 /* lock the queue */ |
295 CHECK_POSIX( pthread_mutex_lock( &queue->mtx ) ); | 300 CHECK_POSIX( pthread_mutex_lock( &queue->mtx ) ); |
296 | 301 |
297 if (current_count) | 302 if (current_count) |
298 *current_count = queue->count; | 303 *current_count = queue->count; |
299 | 304 |
300 if (limit_count) | 305 if (limit_count) |
301 *limit_count = queue->max; | 306 *limit_count = queue->max; |
302 | 307 |
303 if (highest_count) | 308 if (highest_count) |
304 *highest_count = queue->highest_ever; | 309 *highest_count = queue->highest_ever; |
305 | 310 |
306 if (total_count) | 311 if (total_count) |
307 *total_count = queue->total_items; | 312 *total_count = queue->total_items; |
308 | 313 |
309 if (total) | 314 if (total) |
310 memcpy(total, &queue->total_time, sizeof(struct timespec)); | 315 memcpy(total, &queue->total_time, sizeof(struct timespec)); |
311 | 316 |
312 if (blocking) | 317 if (blocking) |
313 memcpy(blocking, &queue->blocking_time, sizeof(struct timespec)); | 318 memcpy(blocking, &queue->blocking_time, sizeof(struct timespec)); |
314 | 319 |
315 if (last) | 320 if (last) |
316 memcpy(last, &queue->last_time, sizeof(struct timespec)); | 321 memcpy(last, &queue->last_time, sizeof(struct timespec)); |
317 | 322 |
318 /* Unlock */ | 323 /* Unlock */ |
319 CHECK_POSIX( pthread_mutex_unlock( &queue->mtx ) ); | 324 CHECK_POSIX( pthread_mutex_unlock( &queue->mtx ) ); |
320 | 325 |
321 /* Done */ | 326 /* Done */ |
322 return 0; | 327 return 0; |
323 } | 328 } |
324 | 329 |
325 | 330 |
326 /* alternate version with no error checking */ | 331 /* alternate version with no error checking */ |
327 int fd_fifo_length ( struct fifo * queue ) | 332 int fd_fifo_length ( struct fifo * queue ) |
328 { | 333 { |
329 if ( !CHECK_FIFO( queue ) ) | 334 if ( !CHECK_FIFO( queue ) ) |
330 return 0; | 335 return 0; |
331 | 336 |
332 return queue->count; /* Let's hope it's read atomically, since we are not locking... */ | 337 return queue->count; /* Let's hope it's read atomically, since we are not locking... */ |
333 } | 338 } |
334 | 339 |
335 /* Set the thresholds of the queue */ | 340 /* Set the thresholds of the queue */ |
336 int fd_fifo_setthrhd ( struct fifo * queue, void * data, uint16_t high, void (*h_cb)(struct fifo *, void **), uint16_t low, void (*l_cb)(struct fifo *, void **) ) | 341 int fd_fifo_setthrhd ( struct fifo * queue, void * data, uint16_t high, void (*h_cb)(struct fifo *, void **), uint16_t low, void (*l_cb)(struct fifo *, void **) ) |
337 { | 342 { |
338 TRACE_ENTRY( "%p %p %hu %p %hu %p", queue, data, high, h_cb, low, l_cb ); | 343 TRACE_ENTRY( "%p %p %hu %p %hu %p", queue, data, high, h_cb, low, l_cb ); |
339 | 344 |
340 /* Check the parameters */ | 345 /* Check the parameters */ |
341 CHECK_PARAMS( CHECK_FIFO( queue ) && (high > low) && (queue->data == NULL) ); | 346 CHECK_PARAMS( CHECK_FIFO( queue ) && (high > low) && (queue->data == NULL) ); |
342 | 347 |
343 /* lock the queue */ | 348 /* lock the queue */ |
344 CHECK_POSIX( pthread_mutex_lock( &queue->mtx ) ); | 349 CHECK_POSIX( pthread_mutex_lock( &queue->mtx ) ); |
345 | 350 |
346 /* Save the values */ | 351 /* Save the values */ |
347 queue->high = high; | 352 queue->high = high; |
348 queue->low = low; | 353 queue->low = low; |
349 queue->data = data; | 354 queue->data = data; |
350 queue->h_cb = h_cb; | 355 queue->h_cb = h_cb; |
351 queue->l_cb = l_cb; | 356 queue->l_cb = l_cb; |
352 | 357 |
353 /* Unlock */ | 358 /* Unlock */ |
354 CHECK_POSIX( pthread_mutex_unlock( &queue->mtx ) ); | 359 CHECK_POSIX( pthread_mutex_unlock( &queue->mtx ) ); |
355 | 360 |
356 /* Done */ | 361 /* Done */ |
357 return 0; | 362 return 0; |
358 } | 363 } |
359 | 364 |
360 | 365 |
361 /* This handler is called when a thread is blocked on a queue, and cancelled */ | 366 /* This handler is called when a thread is blocked on a queue, and cancelled */ |
362 static void fifo_cleanup_push(void * queue) | 367 static void fifo_cleanup_push(void * queue) |
363 { | 368 { |
364 struct fifo * q = (struct fifo *)queue; | 369 struct fifo * q = (struct fifo *)queue; |
365 TRACE_ENTRY( "%p", queue ); | 370 TRACE_ENTRY( "%p", queue ); |
366 | 371 |
367 /* The thread has been cancelled, therefore it does not wait on the queue anymore */ | 372 /* The thread has been cancelled, therefore it does not wait on the queue anymore */ |
368 q->thrs_push--; | 373 q->thrs_push--; |
369 | 374 |
370 /* Now unlock the queue, and we're done */ | 375 /* Now unlock the queue, and we're done */ |
371 CHECK_POSIX_DO( pthread_mutex_unlock( &q->mtx ), /* nothing */ ); | 376 CHECK_POSIX_DO( pthread_mutex_unlock( &q->mtx ), /* nothing */ ); |
372 | 377 |
373 /* End of cleanup handler */ | 378 /* End of cleanup handler */ |
374 return; | 379 return; |
375 } | 380 } |
376 | 381 |
377 | 382 |
379 int fd_fifo_post_internal ( struct fifo * queue, void ** item, int skip_max ) | 384 int fd_fifo_post_internal ( struct fifo * queue, void ** item, int skip_max ) |
380 { | 385 { |
381 struct fifo_item * new; | 386 struct fifo_item * new; |
382 int call_cb = 0; | 387 int call_cb = 0; |
383 struct timespec posted_on, queued_on; | 388 struct timespec posted_on, queued_on; |
384 | 389 |
385 /* Get the timing of this call */ | 390 /* Get the timing of this call */ |
386 CHECK_SYS( clock_gettime(CLOCK_REALTIME, &posted_on) ); | 391 CHECK_SYS( clock_gettime(CLOCK_REALTIME, &posted_on) ); |
387 | 392 |
388 /* lock the queue */ | 393 /* lock the queue */ |
389 CHECK_POSIX( pthread_mutex_lock( &queue->mtx ) ); | 394 CHECK_POSIX( pthread_mutex_lock( &queue->mtx ) ); |
390 | 395 |
391 if ((!skip_max) && (queue->max)) { | 396 if ((!skip_max) && (queue->max)) { |
392 while (queue->count >= queue->max) { | 397 while (queue->count >= queue->max) { |
393 int ret = 0; | 398 int ret = 0; |
394 | 399 |
395 /* We have to wait for an item to be pulled */ | 400 /* We have to wait for an item to be pulled */ |
396 queue->thrs_push++ ; | 401 queue->thrs_push++ ; |
397 pthread_cleanup_push( fifo_cleanup_push, queue); | 402 pthread_cleanup_push( fifo_cleanup_push, queue); |
398 ret = pthread_cond_wait( &queue->cond_push, &queue->mtx ); | 403 ret = pthread_cond_wait( &queue->cond_push, &queue->mtx ); |
399 pthread_cleanup_pop(0); | 404 pthread_cleanup_pop(0); |
400 queue->thrs_push-- ; | 405 queue->thrs_push-- ; |
401 | 406 |
402 ASSERT( ret == 0 ); | 407 ASSERT( ret == 0 ); |
403 } | 408 } |
404 } | 409 } |
405 | 410 |
406 /* Create a new list item */ | 411 /* Create a new list item */ |
407 CHECK_MALLOC_DO( new = malloc (sizeof (struct fifo_item)) , { | 412 CHECK_MALLOC_DO( new = malloc (sizeof (struct fifo_item)) , { |
408 pthread_mutex_unlock( &queue->mtx ); | 413 pthread_mutex_unlock( &queue->mtx ); |
409 return ENOMEM; | 414 return ENOMEM; |
410 } ); | 415 } ); |
411 | 416 |
412 fd_list_init(&new->item, *item); | 417 fd_list_init(&new->item, *item); |
413 *item = NULL; | 418 *item = NULL; |
414 | 419 |
415 /* Add the new item at the end */ | 420 /* Add the new item at the end */ |
416 fd_list_insert_before( &queue->list, &new->item); | 421 fd_list_insert_before( &queue->list, &new->item); |
417 queue->count++; | 422 queue->count++; |
418 if (queue->highest_ever < queue->count) | 423 if (queue->highest_ever < queue->count) |
419 queue->highest_ever = queue->count; | 424 queue->highest_ever = queue->count; |
420 if (queue->high && ((queue->count % queue->high) == 0)) { | 425 if (queue->high && ((queue->count % queue->high) == 0)) { |
421 call_cb = 1; | 426 call_cb = 1; |
422 queue->highest = queue->count; | 427 queue->highest = queue->count; |
423 } | 428 } |
424 | 429 |
425 /* store timing */ | 430 /* store timing */ |
426 memcpy(&new->posted_on, &posted_on, sizeof(struct timespec)); | 431 memcpy(&new->posted_on, &posted_on, sizeof(struct timespec)); |
427 | 432 |
428 /* update queue timing info "blocking time" */ | 433 /* update queue timing info "blocking time" */ |
429 { | 434 { |
430 long long blocked_ns; | 435 long long blocked_ns; |
431 CHECK_SYS( clock_gettime(CLOCK_REALTIME, &queued_on) ); | 436 CHECK_SYS( clock_gettime(CLOCK_REALTIME, &queued_on) ); |
432 blocked_ns = (queued_on.tv_sec - posted_on.tv_sec) * 1000000000; | 437 blocked_ns = (queued_on.tv_sec - posted_on.tv_sec) * 1000000000; |
433 blocked_ns += (queued_on.tv_nsec - posted_on.tv_nsec); | 438 blocked_ns += (queued_on.tv_nsec - posted_on.tv_nsec); |
434 blocked_ns += queue->blocking_time.tv_nsec; | 439 blocked_ns += queue->blocking_time.tv_nsec; |
435 queue->blocking_time.tv_sec += blocked_ns / 1000000000; | 440 queue->blocking_time.tv_sec += blocked_ns / 1000000000; |
436 queue->blocking_time.tv_nsec = blocked_ns % 1000000000; | 441 queue->blocking_time.tv_nsec = blocked_ns % 1000000000; |
437 } | 442 } |
438 | 443 |
439 /* Signal if threads are asleep */ | 444 /* Signal if threads are asleep */ |
440 if (queue->thrs > 0) { | 445 if (queue->thrs > 0) { |
441 CHECK_POSIX( pthread_cond_signal(&queue->cond_pull) ); | 446 CHECK_POSIX( pthread_cond_signal(&queue->cond_pull) ); |
442 } | 447 } |
443 if (queue->thrs_push > 0) { | 448 if (queue->thrs_push > 0) { |
444 /* cascade */ | 449 /* cascade */ |
445 CHECK_POSIX( pthread_cond_signal(&queue->cond_push) ); | 450 CHECK_POSIX( pthread_cond_signal(&queue->cond_push) ); |
446 } | 451 } |
447 | 452 |
448 /* Unlock */ | 453 /* Unlock */ |
449 CHECK_POSIX( pthread_mutex_unlock( &queue->mtx ) ); | 454 CHECK_POSIX( pthread_mutex_unlock( &queue->mtx ) ); |
450 | 455 |
451 /* Call high-watermark cb as needed */ | 456 /* Call high-watermark cb as needed */ |
452 if (call_cb && queue->h_cb) | 457 if (call_cb && queue->h_cb) |
453 (*queue->h_cb)(queue, &queue->data); | 458 (*queue->h_cb)(queue, &queue->data); |
454 | 459 |
455 /* Done */ | 460 /* Done */ |
456 return 0; | 461 return 0; |
457 } | 462 } |
458 | 463 |
/*
 * Post a new item in the queue.
 * The maximum configured on the queue is honoured (skip_max is passed as 0).
 * queue must be valid, and item must point to a non-NULL item.
 */
int fd_fifo_post_int ( struct fifo * queue, void ** item )
{
	int rc;

	TRACE_ENTRY( "%p %p", queue, item );

	/* Check the parameters */
	CHECK_PARAMS( CHECK_FIFO( queue ) && item && *item );

	rc = fd_fifo_post_internal ( queue, item, 0 );
	return rc;
}
470 | 475 |
/*
 * Post a new item in the queue, not blocking.
 * The maximum configured on the queue is ignored (skip_max is passed as 1).
 * queue must be valid, and item must point to a non-NULL item.
 */
int fd_fifo_post_noblock ( struct fifo * queue, void ** item )
{
	int rc;

	TRACE_ENTRY( "%p %p", queue, item );

	/* Check the parameters */
	CHECK_PARAMS( CHECK_FIFO( queue ) && item && *item );

	rc = fd_fifo_post_internal ( queue, item, 1 );
	return rc;
}
482 | 487 |
483 /* Pop the first item from the queue */ | 488 /* Pop the first item from the queue */ |
484 static void * mq_pop(struct fifo * queue) | 489 static void * mq_pop(struct fifo * queue) |
485 { | 490 { |
486 void * ret = NULL; | 491 void * ret = NULL; |
487 struct fifo_item * fi; | 492 struct fifo_item * fi; |
488 struct timespec now; | 493 struct timespec now; |
489 | 494 |
490 ASSERT( ! FD_IS_LIST_EMPTY(&queue->list) ); | 495 ASSERT( ! FD_IS_LIST_EMPTY(&queue->list) ); |
491 | 496 |
492 fi = (struct fifo_item *)(queue->list.next); | 497 fi = (struct fifo_item *)(queue->list.next); |
493 ret = fi->item.o; | 498 ret = fi->item.o; |
494 fd_list_unlink(&fi->item); | 499 fd_list_unlink(&fi->item); |
495 queue->count--; | 500 queue->count--; |
496 queue->total_items++; | 501 queue->total_items++; |
497 | 502 |
498 /* Update the timings */ | 503 /* Update the timings */ |
499 CHECK_SYS_DO( clock_gettime(CLOCK_REALTIME, &now), goto skip_timing ); | 504 CHECK_SYS_DO( clock_gettime(CLOCK_REALTIME, &now), goto skip_timing ); |
500 { | 505 { |
501 long long elapsed = (now.tv_sec - fi->posted_on.tv_sec) * 1000000000; | 506 long long elapsed = (now.tv_sec - fi->posted_on.tv_sec) * 1000000000; |
502 elapsed += now.tv_nsec - fi->posted_on.tv_nsec; | 507 elapsed += now.tv_nsec - fi->posted_on.tv_nsec; |
503 | 508 |
504 queue->last_time.tv_sec = elapsed / 1000000000; | 509 queue->last_time.tv_sec = elapsed / 1000000000; |
505 queue->last_time.tv_nsec = elapsed % 1000000000; | 510 queue->last_time.tv_nsec = elapsed % 1000000000; |
506 | 511 |
507 elapsed += queue->total_time.tv_nsec; | 512 elapsed += queue->total_time.tv_nsec; |
508 queue->total_time.tv_sec += elapsed / 1000000000; | 513 queue->total_time.tv_sec += elapsed / 1000000000; |
509 queue->total_time.tv_nsec = elapsed % 1000000000; | 514 queue->total_time.tv_nsec = elapsed % 1000000000; |
510 } | 515 } |
511 skip_timing: | 516 skip_timing: |
512 free(fi); | 517 free(fi); |
513 | 518 |
514 if (queue->thrs_push) { | 519 if (queue->thrs_push) { |
515 CHECK_POSIX_DO( pthread_cond_signal( &queue->cond_push ), ); | 520 CHECK_POSIX_DO( pthread_cond_signal( &queue->cond_push ), ); |
516 } | 521 } |
517 | 522 |
518 return ret; | 523 return ret; |
519 } | 524 } |
520 | 525 |
521 /* Check if the low watermark callback must be called. */ | 526 /* Check if the low watermark callback must be called. */ |
522 static __inline__ int test_l_cb(struct fifo * queue) | 527 static __inline__ int test_l_cb(struct fifo * queue) |
523 { | 528 { |
524 if ((queue->high == 0) || (queue->low == 0) || (queue->l_cb == 0)) | 529 if ((queue->high == 0) || (queue->low == 0) || (queue->l_cb == 0)) |
525 return 0; | 530 return 0; |
526 | 531 |
527 if (((queue->count % queue->high) == queue->low) && (queue->highest > queue->count)) { | 532 if (((queue->count % queue->high) == queue->low) && (queue->highest > queue->count)) { |
528 queue->highest -= queue->high; | 533 queue->highest -= queue->high; |
529 return 1; | 534 return 1; |
530 } | 535 } |
531 | 536 |
532 return 0; | 537 return 0; |
533 } | 538 } |
534 | 539 |
535 /* Try popping an item */ | 540 /* Try popping an item */ |
536 int fd_fifo_tryget_int ( struct fifo * queue, void ** item ) | 541 int fd_fifo_tryget_int ( struct fifo * queue, void ** item ) |
537 { | 542 { |
538 int wouldblock = 0; | 543 int wouldblock = 0; |
539 int call_cb = 0; | 544 int call_cb = 0; |
540 | 545 |
541 TRACE_ENTRY( "%p %p", queue, item ); | 546 TRACE_ENTRY( "%p %p", queue, item ); |
542 | 547 |
543 /* Check the parameters */ | 548 /* Check the parameters */ |
544 CHECK_PARAMS( CHECK_FIFO( queue ) && item ); | 549 CHECK_PARAMS( CHECK_FIFO( queue ) && item ); |
545 | 550 |
546 /* lock the queue */ | 551 /* lock the queue */ |
547 CHECK_POSIX( pthread_mutex_lock( &queue->mtx ) ); | 552 CHECK_POSIX( pthread_mutex_lock( &queue->mtx ) ); |
548 | 553 |
549 /* Check queue status */ | 554 /* Check queue status */ |
550 if (queue->count > 0) { | 555 if (queue->count > 0) { |
551 got_item: | 556 got_item: |
552 /* There are elements in the queue, so pick the first one */ | 557 /* There are elements in the queue, so pick the first one */ |
553 *item = mq_pop(queue); | 558 *item = mq_pop(queue); |
560 usleep(1000); | 565 usleep(1000); |
561 CHECK_POSIX( pthread_mutex_lock( &queue->mtx ) ); | 566 CHECK_POSIX( pthread_mutex_lock( &queue->mtx ) ); |
562 if (queue->count > 0) | 567 if (queue->count > 0) |
563 goto got_item; | 568 goto got_item; |
564 } | 569 } |
565 | 570 |
566 wouldblock = 1; | 571 wouldblock = 1; |
567 *item = NULL; | 572 *item = NULL; |
568 } | 573 } |
569 | 574 |
570 /* Unlock */ | 575 /* Unlock */ |
571 CHECK_POSIX( pthread_mutex_unlock( &queue->mtx ) ); | 576 CHECK_POSIX( pthread_mutex_unlock( &queue->mtx ) ); |
572 | 577 |
573 /* Call low watermark callback as needed */ | 578 /* Call low watermark callback as needed */ |
574 if (call_cb) | 579 if (call_cb) |
575 (*queue->l_cb)(queue, &queue->data); | 580 (*queue->l_cb)(queue, &queue->data); |
576 | 581 |
577 /* Done */ | 582 /* Done */ |
578 return wouldblock ? EWOULDBLOCK : 0; | 583 return wouldblock ? EWOULDBLOCK : 0; |
579 } | 584 } |
580 | 585 |
581 /* This handler is called when a thread is blocked on a queue, and cancelled */ | 586 /* This handler is called when a thread is blocked on a queue, and cancelled */ |
582 static void fifo_cleanup(void * queue) | 587 static void fifo_cleanup(void * queue) |
583 { | 588 { |
584 struct fifo * q = (struct fifo *)queue; | 589 struct fifo * q = (struct fifo *)queue; |
585 TRACE_ENTRY( "%p", queue ); | 590 TRACE_ENTRY( "%p", queue ); |
586 | 591 |
587 /* The thread has been cancelled, therefore it does not wait on the queue anymore */ | 592 /* The thread has been cancelled, therefore it does not wait on the queue anymore */ |
588 q->thrs--; | 593 q->thrs--; |
589 | 594 |
590 /* Now unlock the queue, and we're done */ | 595 /* Now unlock the queue, and we're done */ |
591 CHECK_POSIX_DO( pthread_mutex_unlock( &q->mtx ), /* nothing */ ); | 596 CHECK_POSIX_DO( pthread_mutex_unlock( &q->mtx ), /* nothing */ ); |
592 | 597 |
593 /* End of cleanup handler */ | 598 /* End of cleanup handler */ |
594 return; | 599 return; |
595 } | 600 } |
596 | 601 |
597 /* The internal function for fd_fifo_timedget and fd_fifo_get */ | 602 /* The internal function for fd_fifo_timedget and fd_fifo_get */ |
598 static int fifo_tget ( struct fifo * queue, void ** item, int istimed, const struct timespec *abstime) | 603 static int fifo_tget ( struct fifo * queue, void ** item, int istimed, const struct timespec *abstime) |
599 { | 604 { |
600 int call_cb = 0; | 605 int call_cb = 0; |
601 int ret = 0; | 606 int ret = 0; |
602 | 607 |
603 /* Check the parameters */ | 608 /* Check the parameters */ |
604 CHECK_PARAMS( CHECK_FIFO( queue ) && item && (abstime || !istimed) ); | 609 CHECK_PARAMS( CHECK_FIFO( queue ) && item && (abstime || !istimed) ); |
605 | 610 |
606 /* Initialize the return value */ | 611 /* Initialize the return value */ |
607 *item = NULL; | 612 *item = NULL; |
608 | 613 |
609 /* lock the queue */ | 614 /* lock the queue */ |
610 CHECK_POSIX( pthread_mutex_lock( &queue->mtx ) ); | 615 CHECK_POSIX( pthread_mutex_lock( &queue->mtx ) ); |
611 | 616 |
612 awaken: | 617 awaken: |
613 /* Check queue status */ | 618 /* Check queue status */ |
614 if (!CHECK_FIFO( queue )) { | 619 if (!CHECK_FIFO( queue )) { |
615 /* The queue is being destroyed */ | 620 /* The queue is being destroyed */ |
616 CHECK_POSIX( pthread_mutex_unlock( &queue->mtx ) ); | 621 CHECK_POSIX( pthread_mutex_unlock( &queue->mtx ) ); |
617 TRACE_DEBUG(FULL, "The queue is being destroyed -> EPIPE"); | 622 TRACE_DEBUG(FULL, "The queue is being destroyed -> EPIPE"); |
618 return EPIPE; | 623 return EPIPE; |
619 } | 624 } |
620 | 625 |
621 if (queue->count > 0) { | 626 if (queue->count > 0) { |
622 /* There are items in the queue, so pick the first one */ | 627 /* There are items in the queue, so pick the first one */ |
623 *item = mq_pop(queue); | 628 *item = mq_pop(queue); |
624 call_cb = test_l_cb(queue); | 629 call_cb = test_l_cb(queue); |
625 } else { | 630 } else { |
633 } | 638 } |
634 pthread_cleanup_pop(0); | 639 pthread_cleanup_pop(0); |
635 queue->thrs-- ; | 640 queue->thrs-- ; |
636 if (ret == 0) | 641 if (ret == 0) |
637 goto awaken; /* test for spurious wake-ups */ | 642 goto awaken; /* test for spurious wake-ups */ |
638 | 643 |
639 /* otherwise (ETIMEDOUT / other error) just continue */ | 644 /* otherwise (ETIMEDOUT / other error) just continue */ |
640 } | 645 } |
641 | 646 |
642 /* Unlock */ | 647 /* Unlock */ |
643 CHECK_POSIX( pthread_mutex_unlock( &queue->mtx ) ); | 648 CHECK_POSIX( pthread_mutex_unlock( &queue->mtx ) ); |
644 | 649 |
645 /* Call low watermark callback as needed */ | 650 /* Call low watermark callback as needed */ |
646 if (call_cb) | 651 if (call_cb) |
647 (*queue->l_cb)(queue, &queue->data); | 652 (*queue->l_cb)(queue, &queue->data); |
648 | 653 |
649 /* Done */ | 654 /* Done */ |
650 return ret; | 655 return ret; |
651 } | 656 } |
652 | 657 |
653 /* Get the next available item, block until there is one */ | 658 /* Get the next available item, block until there is one */ |
667 /* Test if data is available in the queue, without pulling it */ | 672 /* Test if data is available in the queue, without pulling it */ |
668 int fd_fifo_select ( struct fifo * queue, const struct timespec *abstime ) | 673 int fd_fifo_select ( struct fifo * queue, const struct timespec *abstime ) |
669 { | 674 { |
670 int ret = 0; | 675 int ret = 0; |
671 TRACE_ENTRY( "%p %p", queue, abstime ); | 676 TRACE_ENTRY( "%p %p", queue, abstime ); |
672 | 677 |
673 CHECK_PARAMS_DO( CHECK_FIFO( queue ), return -EINVAL ); | 678 CHECK_PARAMS_DO( CHECK_FIFO( queue ), return -EINVAL ); |
674 | 679 |
675 /* lock the queue */ | 680 /* lock the queue */ |
676 CHECK_POSIX_DO( pthread_mutex_lock( &queue->mtx ), return -__ret__ ); | 681 CHECK_POSIX_DO( pthread_mutex_lock( &queue->mtx ), return -__ret__ ); |
677 | 682 |
678 awaken: | 683 awaken: |
679 ret = (queue->count > 0 ) ? queue->count : 0; | 684 ret = (queue->count > 0 ) ? queue->count : 0; |
680 if ((ret == 0) && (abstime != NULL)) { | 685 if ((ret == 0) && (abstime != NULL)) { |
681 /* We have to wait for a new item */ | 686 /* We have to wait for a new item */ |
682 queue->thrs++ ; | 687 queue->thrs++ ; |
683 pthread_cleanup_push( fifo_cleanup, queue); | 688 pthread_cleanup_push( fifo_cleanup, queue); |
684 ret = pthread_cond_timedwait( &queue->cond_pull, &queue->mtx, abstime ); | 689 ret = pthread_cond_timedwait( &queue->cond_pull, &queue->mtx, abstime ); |
685 pthread_cleanup_pop(0); | 690 pthread_cleanup_pop(0); |
686 queue->thrs-- ; | 691 queue->thrs-- ; |
687 if (ret == 0) | 692 if (ret == 0) |
688 goto awaken; /* test for spurious wake-ups */ | 693 goto awaken; /* test for spurious wake-ups */ |
689 | 694 |
690 if (ret == ETIMEDOUT) | 695 if (ret == ETIMEDOUT) |
691 ret = 0; | 696 ret = 0; |
692 else | 697 else |
693 ret = -ret; | 698 ret = -ret; |
694 } | 699 } |
695 | 700 |
696 /* Unlock */ | 701 /* Unlock */ |
697 CHECK_POSIX_DO( pthread_mutex_unlock( &queue->mtx ), return -__ret__ ); | 702 CHECK_POSIX_DO( pthread_mutex_unlock( &queue->mtx ), return -__ret__ ); |
698 | 703 |
699 return ret; | 704 return ret; |
700 } | 705 } |