Navigation


source: freeDiameter/libfdproto/fifo.c

Last change on this file was 1554:566bb46cc73f, checked in by Sebastien Decugis <sdecugis@freediameter.net>, 5 months ago

Updated copyright information

File size: 21.2 KB
Line 
1/*********************************************************************************************************
2* Software License Agreement (BSD License)                                                               *
3* Author: Sebastien Decugis <sdecugis@freediameter.net>                                                  *
4*                                                                                                        *
5* Copyright (c) 2020, WIDE Project and NICT                                                              *
6* All rights reserved.                                                                                   *
7*                                                                                                        *
8* Redistribution and use of this software in source and binary forms, with or without modification, are  *
9* permitted provided that the following conditions are met:                                              *
10*                                                                                                        *
11* * Redistributions of source code must retain the above                                                 *
12*   copyright notice, this list of conditions and the                                                    *
13*   following disclaimer.                                                                                *
14*                                                                                                        *
15* * Redistributions in binary form must reproduce the above                                              *
16*   copyright notice, this list of conditions and the                                                    *
17*   following disclaimer in the documentation and/or other                                               *
18*   materials provided with the distribution.                                                            *
19*                                                                                                        *
20* * Neither the name of the WIDE Project or NICT nor the                                                 *
21*   names of its contributors may be used to endorse or                                                  *
22*   promote products derived from this software without                                                  *
23*   specific prior written permission of WIDE Project and                                                *
24*   NICT.                                                                                                *
25*                                                                                                        *
26* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED *
27* WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A *
28* PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR *
29* ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT     *
30* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS    *
31* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR *
32* TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF   *
33* ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.                                                             *
34*********************************************************************************************************/
35
36/* FIFO queues module.
37 *
38 * The threads that call these functions must be in the cancellation state PTHREAD_CANCEL_ENABLE and type PTHREAD_CANCEL_DEFERRED.
39 * This is the default state and type on thread creation.
40 *
41 * In order to destroy properly a queue, the application must:
42 *  -> shutdown any process that can add into the queue first.
43 *  -> pthread_cancel any thread that could be waiting on the queue.
44 *  -> consume any element that is in the queue, using fd_qu_tryget_int.
45 *  -> then destroy the queue using fd_mq_del.
46 */
47
48#include "fdproto-internal.h"
49
50/* Definition of a FIFO queue object */
51struct fifo {
52        int             eyec;   /* An eye catcher, also used to check a queue is valid. FIFO_EYEC */
53
54        pthread_mutex_t mtx;    /* Mutex protecting this queue */
55        pthread_cond_t  cond_pull;      /* condition variable for pulling threads */
56        pthread_cond_t  cond_push;      /* condition variable for pushing threads */
57
58        struct fd_list  list;   /* sentinel for the list of elements */
59        int             count;  /* number of objects in the list */
60        int             thrs;   /* number of threads waiting for a new element (when count is 0) */
61
62        int             max;    /* maximum number of items to accept if not 0 */
63        int             thrs_push; /* number of threads waitnig to push an item */
64
65        uint16_t        high;   /* High level threshold (see libfreeDiameter.h for details) */
66        uint16_t        low;    /* Low level threshhold */
67        void            *data;  /* Opaque pointer for threshold callbacks */
68        void            (*h_cb)(struct fifo *, void **); /* The callbacks */
69        void            (*l_cb)(struct fifo *, void **);
70        int             highest;/* The highest count value for which h_cb has been called */
71        int             highest_ever; /* The max count value this queue has reached (for tweaking) */
72
73        long long       total_items;   /* Cumulated number of items that went through this fifo (excluding current count), always increasing. */
74        struct timespec total_time;    /* Cumulated time all items spent in this queue, including blocking time (always growing, use deltas for monitoring) */
75        struct timespec blocking_time; /* Cumulated time threads trying to post new items were blocked (queue full). */
76        struct timespec last_time;     /* For the last element retrieved from the queue, how long it take between posting (including blocking) and poping */
77
78};
79
80struct fifo_item {
81        struct fd_list   item;
82        struct timespec  posted_on;
83};
84
85/* The eye catcher value */
86#define FIFO_EYEC       0xe7ec1130
87
88/* Macro to check a pointer */
89#define CHECK_FIFO( _queue ) (( (_queue) != NULL) && ( (_queue)->eyec == FIFO_EYEC) )
90
91
92/* Create a new queue, with max number of items -- use 0 for no max */
93int fd_fifo_new ( struct fifo ** queue, int max )
94{
95        struct fifo * new;
96
97        TRACE_ENTRY( "%p", queue );
98
99        CHECK_PARAMS( queue );
100
101        /* Create a new object */
102        CHECK_MALLOC( new = malloc (sizeof (struct fifo) )  );
103
104        /* Initialize the content */
105        memset(new, 0, sizeof(struct fifo));
106
107        new->eyec = FIFO_EYEC;
108        CHECK_POSIX( pthread_mutex_init(&new->mtx, NULL) );
109        CHECK_POSIX( pthread_cond_init(&new->cond_pull, NULL) );
110        CHECK_POSIX( pthread_cond_init(&new->cond_push, NULL) );
111        new->max = max;
112
113        fd_list_init(&new->list, NULL);
114
115        /* We're done */
116        *queue = new;
117        return 0;
118}
119
120int fd_fifo_set_max (struct fifo * queue, int max)
121{
122    queue->max = max;
123    return 0;
124}
125
126
127/* Dump the content of a queue */
128DECLARE_FD_DUMP_PROTOTYPE(fd_fifo_dump, char * name, struct fifo * queue, fd_fifo_dump_item_cb dump_item)
129{
130        FD_DUMP_HANDLE_OFFSET();
131
132        if (name) {
133                CHECK_MALLOC_DO( fd_dump_extend( FD_DUMP_STD_PARAMS, "'%s'(@%p): ", name, queue), return NULL);
134        } else {
135                CHECK_MALLOC_DO( fd_dump_extend( FD_DUMP_STD_PARAMS, "{fifo}(@%p): ", queue), return NULL);
136        }
137
138        if (!CHECK_FIFO( queue )) {
139                return fd_dump_extend(FD_DUMP_STD_PARAMS, "INVALID/NULL");
140        }
141
142        CHECK_POSIX_DO(  pthread_mutex_lock( &queue->mtx ), /* continue */  );
143        CHECK_MALLOC_DO( fd_dump_extend( FD_DUMP_STD_PARAMS, "items:%d,%d,%d threads:%d,%d stats:%lld/%ld.%06ld,%ld.%06ld,%ld.%06ld thresholds:%d,%d,%d,%p,%p,%p",
144                                                queue->count, queue->highest_ever, queue->max,
145                                                queue->thrs, queue->thrs_push,
146                                                queue->total_items,(long)queue->total_time.tv_sec,(long)(queue->total_time.tv_nsec/1000),(long)queue->blocking_time.tv_sec,(long)(queue->blocking_time.tv_nsec/1000),(long)queue->last_time.tv_sec,(long)(queue->last_time.tv_nsec/1000),
147                                                queue->high, queue->low, queue->highest, queue->h_cb, queue->l_cb, queue->data),
148                         goto error);
149
150        if (dump_item) {
151                struct fd_list * li;
152                int i = 0;
153                for (li = queue->list.next; li != &queue->list; li = li->next) {
154                        struct fifo_item * fi = (struct fifo_item *)li;
155                        CHECK_MALLOC_DO( fd_dump_extend( FD_DUMP_STD_PARAMS, "\n [#%i](@%p)@%ld.%06ld: ",
156                                                i++, fi->item.o, (long)fi->posted_on.tv_sec,(long)(fi->posted_on.tv_nsec/1000)),
157                                         goto error);
158                        CHECK_MALLOC_DO( (*dump_item)(FD_DUMP_STD_PARAMS, fi->item.o), goto error);
159                }
160        }
161        CHECK_POSIX_DO(  pthread_mutex_unlock( &queue->mtx ), /* continue */  );
162
163        return *buf;
164error:
165        CHECK_POSIX_DO(  pthread_mutex_unlock( &queue->mtx ), /* continue */  );
166        return NULL;
167}
168
169/* Delete a queue. It must be empty. */
170int fd_fifo_del ( struct fifo  ** queue )
171{
172        struct fifo * q;
173#ifndef NDEBUG
174        int loops = 0;
175#endif
176
177        TRACE_ENTRY( "%p", queue );
178
179        if (queue && *queue == NULL) {
180                /* Queue already (in the process of being) deleted */
181                return 0;
182        }
183
184        CHECK_PARAMS( queue && CHECK_FIFO( *queue ) );
185
186        q = *queue;
187
188        CHECK_POSIX(  pthread_mutex_lock( &q->mtx )  );
189
190        if ((q->count != 0) || (q->data != NULL)) {
191                TRACE_DEBUG(INFO, "The queue cannot be destroyed (%d, %p)", q->count, q->data);
192                CHECK_POSIX_DO(  pthread_mutex_unlock( &q->mtx ), /* no fallback */  );
193                return EINVAL;
194        }
195
196        /* Ok, now invalidate the queue */
197        q->eyec = 0xdead;
198
199        /* Have all waiting threads return an error */
200        while (q->thrs) {
201                CHECK_POSIX(  pthread_mutex_unlock( &q->mtx ));
202                CHECK_POSIX(  pthread_cond_signal(&q->cond_pull)  );
203                usleep(1000);
204
205                CHECK_POSIX(  pthread_mutex_lock( &q->mtx )  );
206                ASSERT( ++loops < 200 ); /* detect infinite loops */
207        }
208
209        /* sanity check */
210        ASSERT(FD_IS_LIST_EMPTY(&q->list));
211
212        /* And destroy it */
213        CHECK_POSIX(  pthread_mutex_unlock( &q->mtx )  );
214
215        CHECK_POSIX_DO(  pthread_cond_destroy( &q->cond_pull ),  );
216
217        CHECK_POSIX_DO(  pthread_cond_destroy( &q->cond_push ),  );
218
219        CHECK_POSIX_DO(  pthread_mutex_destroy( &q->mtx ),  );
220
221        free(q);
222        *queue = NULL;
223
224        return 0;
225}
226
227/* Move the content of old into new, and update loc_update atomically. We leave the old queue empty but valid */
228int fd_fifo_move ( struct fifo * old, struct fifo * new, struct fifo ** loc_update )
229{
230#ifndef NDEBUG
231        int loops = 0;
232#endif
233
234        TRACE_ENTRY("%p %p %p", old, new, loc_update);
235        CHECK_PARAMS( CHECK_FIFO( old ) && CHECK_FIFO( new ));
236
237        CHECK_PARAMS( ! old->data );
238        if (new->high) {
239                TODO("Implement support for thresholds in fd_fifo_move...");
240        }
241
242        /* Update loc_update */
243        if (loc_update)
244                *loc_update = new;
245
246        /* Lock the queues */
247        CHECK_POSIX(  pthread_mutex_lock( &old->mtx )  );
248
249        CHECK_PARAMS_DO( (! old->thrs_push), {
250                        pthread_mutex_unlock( &old->mtx );
251                        return EINVAL;
252                } );
253
254        CHECK_POSIX(  pthread_mutex_lock( &new->mtx )  );
255
256        /* Any waiting thread on the old queue returns an error */
257        old->eyec = 0xdead;
258        while (old->thrs) {
259                CHECK_POSIX(  pthread_mutex_unlock( &old->mtx ));
260                CHECK_POSIX(  pthread_cond_signal( &old->cond_pull )  );
261                usleep(1000);
262
263                CHECK_POSIX(  pthread_mutex_lock( &old->mtx )  );
264                ASSERT( loops < 20 ); /* detect infinite loops */
265        }
266
267        /* Move all data from old to new */
268        fd_list_move_end( &new->list, &old->list );
269        if (old->count && (!new->count)) {
270                CHECK_POSIX(  pthread_cond_signal(&new->cond_pull)  );
271        }
272        new->count += old->count;
273
274        /* Reset old */
275        old->count = 0;
276        old->eyec = FIFO_EYEC;
277
278        /* Merge the stats in the new queue */
279        new->total_items += old->total_items;
280        old->total_items = 0;
281
282        new->total_time.tv_nsec += old->total_time.tv_nsec;
283        new->total_time.tv_sec += old->total_time.tv_sec + (new->total_time.tv_nsec / 1000000000);
284        new->total_time.tv_nsec %= 1000000000;
285        old->total_time.tv_nsec = 0;
286        old->total_time.tv_sec = 0;
287
288        new->blocking_time.tv_nsec += old->blocking_time.tv_nsec;
289        new->blocking_time.tv_sec += old->blocking_time.tv_sec + (new->blocking_time.tv_nsec / 1000000000);
290        new->blocking_time.tv_nsec %= 1000000000;
291        old->blocking_time.tv_nsec = 0;
292        old->blocking_time.tv_sec = 0;
293
294        /* Unlock, we're done */
295        CHECK_POSIX(  pthread_mutex_unlock( &new->mtx )  );
296        CHECK_POSIX(  pthread_mutex_unlock( &old->mtx )  );
297
298        return 0;
299}
300
301/* Get the information on the queue */
302int fd_fifo_getstats( struct fifo * queue, int * current_count, int * limit_count, int * highest_count, long long * total_count,
303                                           struct timespec * total, struct timespec * blocking, struct timespec * last)
304{
305        TRACE_ENTRY( "%p %p %p %p %p %p %p %p", queue, current_count, limit_count, highest_count, total_count, total, blocking, last);
306
307        if (queue == NULL) {
308                /* It is not an error if the queue is not available; happens e.g. when peers disappear */
309                return 0;
310        }
311
312        /* Check the parameters */
313        CHECK_PARAMS( CHECK_FIFO( queue ) );
314
315        /* lock the queue */
316        CHECK_POSIX(  pthread_mutex_lock( &queue->mtx )  );
317
318        if (current_count)
319                *current_count = queue->count;
320
321        if (limit_count)
322                *limit_count = queue->max;
323
324        if (highest_count)
325                *highest_count = queue->highest_ever;
326
327        if (total_count)
328                *total_count = queue->total_items;
329
330        if (total)
331                memcpy(total, &queue->total_time, sizeof(struct timespec));
332
333        if (blocking)
334                memcpy(blocking, &queue->blocking_time, sizeof(struct timespec));
335
336        if (last)
337                memcpy(last, &queue->last_time, sizeof(struct timespec));
338
339        /* Unlock */
340        CHECK_POSIX(  pthread_mutex_unlock( &queue->mtx )  );
341
342        /* Done */
343        return 0;
344}
345
346
347/* alternate version with no error checking */
348int fd_fifo_length ( struct fifo * queue )
349{
350        if ( !CHECK_FIFO( queue ) )
351                return 0;
352
353        return queue->count; /* Let's hope it's read atomically, since we are not locking... */
354}
355
356/* Set the thresholds of the queue */
357int fd_fifo_setthrhd ( struct fifo * queue, void * data, uint16_t high, void (*h_cb)(struct fifo *, void **), uint16_t low, void (*l_cb)(struct fifo *, void **) )
358{
359        TRACE_ENTRY( "%p %p %hu %p %hu %p", queue, data, high, h_cb, low, l_cb );
360
361        /* Check the parameters */
362        CHECK_PARAMS( CHECK_FIFO( queue ) && (high > low) && (queue->data == NULL) );
363
364        /* lock the queue */
365        CHECK_POSIX(  pthread_mutex_lock( &queue->mtx )  );
366
367        /* Save the values */
368        queue->high = high;
369        queue->low  = low;
370        queue->data = data;
371        queue->h_cb = h_cb;
372        queue->l_cb = l_cb;
373
374        /* Unlock */
375        CHECK_POSIX(  pthread_mutex_unlock( &queue->mtx )  );
376
377        /* Done */
378        return 0;
379}
380
381
382/* This handler is called when a thread is blocked on a queue, and cancelled */
383static void fifo_cleanup_push(void * queue)
384{
385        struct fifo * q = (struct fifo *)queue;
386        TRACE_ENTRY( "%p", queue );
387
388        /* The thread has been cancelled, therefore it does not wait on the queue anymore */
389        q->thrs_push--;
390
391        /* Now unlock the queue, and we're done */
392        CHECK_POSIX_DO(  pthread_mutex_unlock( &q->mtx ),  /* nothing */  );
393
394        /* End of cleanup handler */
395        return;
396}
397
398
399/* Post a new item in the queue */
400int fd_fifo_post_internal ( struct fifo * queue, void ** item, int skip_max )
401{
402        struct fifo_item * new;
403        int call_cb = 0;
404        struct timespec posted_on, queued_on;
405
406        /* Get the timing of this call */
407        CHECK_SYS(  clock_gettime(CLOCK_REALTIME, &posted_on)  );
408
409        /* lock the queue */
410        CHECK_POSIX(  pthread_mutex_lock( &queue->mtx )  );
411
412        if ((!skip_max) && (queue->max)) {
413                while (queue->count >= queue->max) {
414                        int ret = 0;
415
416                        /* We have to wait for an item to be pulled */
417                        queue->thrs_push++ ;
418                        pthread_cleanup_push( fifo_cleanup_push, queue);
419                        ret = pthread_cond_wait( &queue->cond_push, &queue->mtx );
420                        pthread_cleanup_pop(0);
421                        queue->thrs_push-- ;
422
423#ifdef NDEBUG
424                        (void)ret;
425#endif
426                        ASSERT( ret == 0 );
427                }
428        }
429
430        /* Create a new list item */
431        CHECK_MALLOC_DO(  new = malloc (sizeof (struct fifo_item)) , {
432                        pthread_mutex_unlock( &queue->mtx );
433                        return ENOMEM;
434                } );
435
436        fd_list_init(&new->item, *item);
437        *item = NULL;
438
439        /* Add the new item at the end */
440        fd_list_insert_before( &queue->list, &new->item);
441        queue->count++;
442        if (queue->highest_ever < queue->count)
443                queue->highest_ever = queue->count;
444        if (queue->high && ((queue->count % queue->high) == 0)) {
445                call_cb = 1;
446                queue->highest = queue->count;
447        }
448
449        /* store timing */
450        memcpy(&new->posted_on, &posted_on, sizeof(struct timespec));
451
452        /* update queue timing info "blocking time" */
453        {
454                long long blocked_ns;
455                CHECK_SYS(  clock_gettime(CLOCK_REALTIME, &queued_on)  );
456                blocked_ns = (queued_on.tv_sec - posted_on.tv_sec) * 1000000000;
457                blocked_ns += (queued_on.tv_nsec - posted_on.tv_nsec);
458                blocked_ns += queue->blocking_time.tv_nsec;
459                queue->blocking_time.tv_sec += blocked_ns / 1000000000;
460                queue->blocking_time.tv_nsec = blocked_ns % 1000000000;
461        }
462
463        /* Signal if threads are asleep */
464        if (queue->thrs > 0) {
465                CHECK_POSIX(  pthread_cond_signal(&queue->cond_pull)  );
466        }
467        if (queue->thrs_push > 0) {
468                /* cascade */
469                CHECK_POSIX(  pthread_cond_signal(&queue->cond_push)  );
470        }
471
472        /* Unlock */
473        CHECK_POSIX(  pthread_mutex_unlock( &queue->mtx )  );
474
475        /* Call high-watermark cb as needed */
476        if (call_cb && queue->h_cb)
477                (*queue->h_cb)(queue, &queue->data);
478
479        /* Done */
480        return 0;
481}
482
483/* Post a new item in the queue */
484int fd_fifo_post_int ( struct fifo * queue, void ** item )
485{
486        TRACE_ENTRY( "%p %p", queue, item );
487
488        /* Check the parameters */
489        CHECK_PARAMS( CHECK_FIFO( queue ) && item && *item );
490
491        return fd_fifo_post_internal ( queue,item, 0 );
492
493}
494
495/* Post a new item in the queue, not blocking */
496int fd_fifo_post_noblock ( struct fifo * queue, void ** item )
497{
498        TRACE_ENTRY( "%p %p", queue, item );
499
500        /* Check the parameters */
501        CHECK_PARAMS( CHECK_FIFO( queue ) && item && *item );
502
503        return fd_fifo_post_internal ( queue,item, 1 );
504
505}
506
507/* Pop the first item from the queue */
508static void * mq_pop(struct fifo * queue)
509{
510        void * ret = NULL;
511        struct fifo_item * fi;
512        struct timespec now;
513
514        ASSERT( ! FD_IS_LIST_EMPTY(&queue->list) );
515
516        fi = (struct fifo_item *)(queue->list.next);
517        ret = fi->item.o;
518        fd_list_unlink(&fi->item);
519        queue->count--;
520        queue->total_items++;
521
522        /* Update the timings */
523        CHECK_SYS_DO(  clock_gettime(CLOCK_REALTIME, &now), goto skip_timing  );
524        {
525                long long elapsed = (now.tv_sec - fi->posted_on.tv_sec) * 1000000000;
526                elapsed += now.tv_nsec - fi->posted_on.tv_nsec;
527
528                queue->last_time.tv_sec = elapsed / 1000000000;
529                queue->last_time.tv_nsec = elapsed % 1000000000;
530
531                elapsed += queue->total_time.tv_nsec;
532                queue->total_time.tv_sec += elapsed / 1000000000;
533                queue->total_time.tv_nsec = elapsed % 1000000000;
534        }
535skip_timing:
536        free(fi);
537
538        if (queue->thrs_push) {
539                CHECK_POSIX_DO( pthread_cond_signal( &queue->cond_push ), );
540        }
541
542        return ret;
543}
544
545/* Check if the low watermark callback must be called. */
546static __inline__ int test_l_cb(struct fifo * queue)
547{
548        if ((queue->high == 0) || (queue->low == 0) || (queue->l_cb == 0))
549                return 0;
550
551        if (((queue->count % queue->high) == queue->low) && (queue->highest > queue->count)) {
552                queue->highest -= queue->high;
553                return 1;
554        }
555
556        return 0;
557}
558
559/* Try poping an item */
560int fd_fifo_tryget_int ( struct fifo * queue, void ** item )
561{
562        int wouldblock = 0;
563        int call_cb = 0;
564
565        TRACE_ENTRY( "%p %p", queue, item );
566
567        /* Check the parameters */
568        CHECK_PARAMS( CHECK_FIFO( queue ) && item );
569
570        /* lock the queue */
571        CHECK_POSIX(  pthread_mutex_lock( &queue->mtx )  );
572
573        /* Check queue status */
574        if (queue->count > 0) {
575got_item:
576                /* There are elements in the queue, so pick the first one */
577                *item = mq_pop(queue);
578                call_cb = test_l_cb(queue);
579        } else {
580                if (queue->thrs_push > 0) {
581                        /* A thread is trying to push something, let's give it a chance */
582                        CHECK_POSIX(  pthread_mutex_unlock( &queue->mtx )  );
583                        CHECK_POSIX(  pthread_cond_signal( &queue->cond_push )  );
584                        usleep(1000);
585                        CHECK_POSIX(  pthread_mutex_lock( &queue->mtx )  );
586                        if (queue->count > 0)
587                                goto got_item;
588                }
589
590                wouldblock = 1;
591                *item = NULL;
592        }
593
594        /* Unlock */
595        CHECK_POSIX(  pthread_mutex_unlock( &queue->mtx )  );
596
597        /* Call low watermark callback as needed */
598        if (call_cb)
599                (*queue->l_cb)(queue, &queue->data);
600
601        /* Done */
602        return wouldblock ? EWOULDBLOCK : 0;
603}
604
605/* This handler is called when a thread is blocked on a queue, and cancelled */
606static void fifo_cleanup(void * queue)
607{
608        struct fifo * q = (struct fifo *)queue;
609        TRACE_ENTRY( "%p", queue );
610
611        /* The thread has been cancelled, therefore it does not wait on the queue anymore */
612        q->thrs--;
613
614        /* Now unlock the queue, and we're done */
615        CHECK_POSIX_DO(  pthread_mutex_unlock( &q->mtx ),  /* nothing */  );
616
617        /* End of cleanup handler */
618        return;
619}
620
621/* The internal function for fd_fifo_timedget and fd_fifo_get */
622static int fifo_tget ( struct fifo * queue, void ** item, int istimed, const struct timespec *abstime)
623{
624        int call_cb = 0;
625        int ret = 0;
626
627        /* Check the parameters */
628        CHECK_PARAMS( CHECK_FIFO( queue ) && item && (abstime || !istimed) );
629
630        /* Initialize the return value */
631        *item = NULL;
632
633        /* lock the queue */
634        CHECK_POSIX(  pthread_mutex_lock( &queue->mtx )  );
635
636awaken:
637        /* Check queue status */
638        if (!CHECK_FIFO( queue )) {
639                /* The queue is being destroyed */
640                CHECK_POSIX(  pthread_mutex_unlock( &queue->mtx )  );
641                TRACE_DEBUG(FULL, "The queue is being destroyed -> EPIPE");
642                return EPIPE;
643        }
644
645        if (queue->count > 0) {
646                /* There are items in the queue, so pick the first one */
647                *item = mq_pop(queue);
648                call_cb = test_l_cb(queue);
649        } else {
650                /* We have to wait for a new item */
651                queue->thrs++ ;
652                pthread_cleanup_push( fifo_cleanup, queue);
653                if (istimed) {
654                        ret = pthread_cond_timedwait( &queue->cond_pull, &queue->mtx, abstime );
655                } else {
656                        ret = pthread_cond_wait( &queue->cond_pull, &queue->mtx );
657                }
658                pthread_cleanup_pop(0);
659                queue->thrs-- ;
660                if (ret == 0)
661                        goto awaken;  /* test for spurious wake-ups */
662
663                /* otherwise (ETIMEDOUT / other error) just continue */
664        }
665
666        /* Unlock */
667        CHECK_POSIX(  pthread_mutex_unlock( &queue->mtx )  );
668
669        /* Call low watermark callback as needed */
670        if (call_cb)
671                (*queue->l_cb)(queue, &queue->data);
672
673        /* Done */
674        return ret;
675}
676
677/* Get the next available item, block until there is one */
678int fd_fifo_get_int ( struct fifo * queue, void ** item )
679{
680        TRACE_ENTRY( "%p %p", queue, item );
681        return fifo_tget(queue, item, 0, NULL);
682}
683
684/* Get the next available item, block until there is one, or the timeout expires */
685int fd_fifo_timedget_int ( struct fifo * queue, void ** item, const struct timespec *abstime )
686{
687        TRACE_ENTRY( "%p %p %p", queue, item, abstime );
688        return fifo_tget(queue, item, 1, abstime);
689}
690
691/* Test if data is available in the queue, without pulling it */
692int fd_fifo_select ( struct fifo * queue, const struct timespec *abstime )
693{
694        int ret = 0;
695        TRACE_ENTRY( "%p %p", queue, abstime );
696
697        CHECK_PARAMS_DO( CHECK_FIFO( queue ), return -EINVAL );
698
699        /* lock the queue */
700        CHECK_POSIX_DO(  pthread_mutex_lock( &queue->mtx ), return -__ret__  );
701
702awaken:
703        ret = (queue->count > 0 ) ? queue->count : 0;
704        if ((ret == 0) && (abstime != NULL)) {
705                /* We have to wait for a new item */
706                queue->thrs++ ;
707                pthread_cleanup_push( fifo_cleanup, queue);
708                ret = pthread_cond_timedwait( &queue->cond_pull, &queue->mtx, abstime );
709                pthread_cleanup_pop(0);
710                queue->thrs-- ;
711                if (ret == 0)
712                        goto awaken;  /* test for spurious wake-ups */
713
714                if (ret == ETIMEDOUT)
715                        ret = 0;
716                else
717                        ret = -ret;
718        }
719
720        /* Unlock */
721        CHECK_POSIX_DO(  pthread_mutex_unlock( &queue->mtx ), return -__ret__  );
722
723        return ret;
724}
Note: See TracBrowser for help on using the repository browser.