Navigation


source: freeDiameter/libfreeDiameter/fifo.c @ 25:67ca08d5bc48

Last change on this file since 25:67ca08d5bc48 was 25:67ca08d5bc48, checked in by Sebastien Decugis <sdecugis@nict.go.jp>, 11 years ago

Completed connection context files

File size: 14.2 KB
Line 
1/*********************************************************************************************************
2* Software License Agreement (BSD License)                                                               *
3* Author: Sebastien Decugis <sdecugis@nict.go.jp>                                                        *
4*                                                                                                        *
5* Copyright (c) 2008, WIDE Project and NICT                                                              *
6* All rights reserved.                                                                                   *
7*                                                                                                        *
8* Redistribution and use of this software in source and binary forms, with or without modification, are  *
9* permitted provided that the following conditions are met:                                              *
10*                                                                                                        *
11* * Redistributions of source code must retain the above                                                 *
12*   copyright notice, this list of conditions and the                                                    *
13*   following disclaimer.                                                                                *
14*                                                                                                        *
15* * Redistributions in binary form must reproduce the above                                              *
16*   copyright notice, this list of conditions and the                                                    *
17*   following disclaimer in the documentation and/or other                                               *
18*   materials provided with the distribution.                                                            *
19*                                                                                                        *
20* * Neither the name of the WIDE Project or NICT nor the                                                 *
21*   names of its contributors may be used to endorse or                                                  *
22*   promote products derived from this software without                                                  *
23*   specific prior written permission of WIDE Project and                                                *
24*   NICT.                                                                                                *
25*                                                                                                        *
26* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED *
27* WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A *
28* PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR *
29* ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT     *
30* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS    *
31* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR *
32* TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF   *
33* ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.                                                             *
34*********************************************************************************************************/
35
36/* FIFO queues module.
37 *
38 * The threads that call these functions must be in the cancellation state PTHREAD_CANCEL_ENABLE and type PTHREAD_CANCEL_DEFERRED.
39 * This is the default state and type on thread creation.
40 *
41 * In order to destroy properly a queue, the application must:
42 *  -> shutdown any process that can add into the queue first.
43 *  -> pthread_cancel any thread that could be waiting on the queue.
44 *  -> consume any element that is in the queue, using fd_qu_tryget_int.
45 *  -> then destroy the queue using fd_mq_del.
46 */
47
48#include "libfD.h"
49
50/* Definition of a FIFO queue object */
51struct fifo {
52        int             eyec;   /* An eye catcher, also used to check a queue is valid. FIFO_EYEC */
53       
54        pthread_mutex_t mtx;    /* Mutex protecting this queue */
55        pthread_cond_t  cond;   /* condition variable of the list */
56       
57        struct fd_list  list;   /* sentinel for the list of elements */
58        int             count;  /* number of objects in the list */
59        int             thrs;   /* number of threads waiting for a new element (when count is 0) */
60       
61        uint16_t        high;   /* High level threshold (see libfreeDiameter.h for details) */
62        uint16_t        low;    /* Low level threshhold */
63        void            *data;  /* Opaque pointer for threshold callbacks */
64        void            (*h_cb)(struct fifo *, void **); /* The callbacks */
65        void            (*l_cb)(struct fifo *, void **);
66        int             highest;/* The highest count value for which h_cb has been called */
67        int             highest_ever; /* The max count value this queue has reached (for tweaking) */
68};
69
70/* The eye catcher value */
71#define FIFO_EYEC       0xe7ec1130
72
73/* Macro to check a pointer */
74#define CHECK_FIFO( _queue ) (( (_queue) != NULL) && ( (_queue)->eyec == FIFO_EYEC) )
75
76
77/* Create a new queue */
78int fd_fifo_new ( struct fifo ** queue )
79{
80        struct fifo * new;
81       
82        TRACE_ENTRY( "%p", queue );
83       
84        CHECK_PARAMS( queue );
85       
86        /* Create a new object */
87        CHECK_MALLOC( new = malloc (sizeof (struct fifo) )  );
88       
89        /* Initialize the content */
90        memset(new, 0, sizeof(struct fifo));
91       
92        new->eyec = FIFO_EYEC;
93        CHECK_POSIX( pthread_mutex_init(&new->mtx, NULL) );
94        CHECK_POSIX( pthread_cond_init(&new->cond, NULL) );
95       
96        fd_list_init(&new->list, NULL);
97       
98        /* We're done */
99        *queue = new;
100        return 0;
101}
102
103/* Dump the content of a queue */
104void fd_fifo_dump(int level, char * name, struct fifo * queue, void (*dump_item)(int level, void * item))
105{
106        TRACE_ENTRY("%i %p %p %p", level, name, queue, dump_item);
107       
108        if (!TRACE_BOOL(level))
109                return;
110       
111        fd_log_debug("Dumping queue '%s' (%p):\n", name ?: "?", queue);
112        if (!CHECK_FIFO( queue )) {
113                fd_log_debug("  Queue invalid!\n");
114                if (queue)
115                        fd_log_debug("  (%x != %x)\n", queue->eyec, FIFO_EYEC);
116                return;
117        }
118       
119        CHECK_POSIX_DO(  pthread_mutex_lock( &queue->mtx ), /* continue */  );
120        fd_log_debug("   %d elements in queue / %d threads waiting\n", queue->count, queue->thrs);
121        fd_log_debug("   thresholds: %d / %d (h:%d), cb: %p,%p (%p), highest: %d\n",
122                        queue->high, queue->low, queue->highest, 
123                        queue->h_cb, queue->l_cb, queue->data,
124                        queue->highest_ever);
125       
126        if (dump_item) {
127                struct fd_list * li;
128                int i = 0;
129                for (li = queue->list.next; li != &queue->list; li = li->next) {
130                        fd_log_debug("  [%i] item %p in fifo %p:\n", i++, li->o, queue);
131                        (*dump_item)(level, li->o);
132                }
133        }
134        CHECK_POSIX_DO(  pthread_mutex_unlock( &queue->mtx ), /* continue */  );
135       
136}
137
138/* Delete a queue. It must be empty. */ 
139int fd_fifo_del ( struct fifo  ** queue )
140{
141        struct fifo * q;
142        int loops = 0;
143       
144        TRACE_ENTRY( "%p", queue );
145
146        CHECK_PARAMS( queue && CHECK_FIFO( *queue ) );
147       
148        q = *queue;
149       
150        CHECK_POSIX(  pthread_mutex_lock( &q->mtx )  );
151       
152        if ((q->count != 0) || (q->data != NULL)) {
153                TRACE_DEBUG(INFO, "The queue cannot be destroyed (%d, %p)", q->count, q->data);
154                CHECK_POSIX_DO(  pthread_mutex_unlock( &q->mtx ), /* no fallback */  );
155                return EINVAL;
156        }
157       
158        /* Ok, now invalidate the queue */
159        q->eyec = 0xdead;
160       
161        while (q->thrs) {
162                CHECK_POSIX(  pthread_cond_signal(&q->cond)  );
163                CHECK_POSIX(  pthread_mutex_unlock( &q->mtx ));
164                pthread_yield();
165                CHECK_POSIX(  pthread_mutex_lock( &q->mtx )  );
166                ASSERT( ++loops < 10 ); /* detect infinite loops */
167        }
168       
169        /* sanity check */
170        ASSERT(FD_IS_LIST_EMPTY(&q->list));
171       
172        /* And destroy it */
173        CHECK_POSIX(  pthread_mutex_unlock( &q->mtx )  );
174       
175        CHECK_POSIX(  pthread_cond_destroy( &q->cond )  );
176       
177        CHECK_POSIX(  pthread_mutex_destroy( &q->mtx )  );
178       
179        free(q);
180        *queue = NULL;
181       
182        return 0;
183}
184
185/* Move the content of old into new, and update loc_update atomically */
186int fd_fifo_move ( struct fifo ** old, struct fifo * new, struct fifo ** loc_update )
187{
188        struct fifo * q;
189        int loops = 0;
190       
191        TRACE_ENTRY("%p %p %p", old, new, loc_update);
192        CHECK_PARAMS( old && CHECK_FIFO( *old ) && CHECK_FIFO( new ));
193       
194        q = *old;
195        CHECK_PARAMS( ! q->data );
196        if (new->high) {
197                TODO("Implement support for thresholds in fd_fifo_move...");
198        }
199       
200        /* Update loc_update */
201        *old = NULL;
202        if (loc_update)
203                *loc_update = new;
204       
205        /* Lock the queues */
206        CHECK_POSIX(  pthread_mutex_lock( &q->mtx )  );
207        CHECK_POSIX(  pthread_mutex_lock( &new->mtx )  );
208       
209        /* Any waiting thread on the old queue returns an error */
210        q->eyec = 0xdead;
211        while (q->thrs) {
212                CHECK_POSIX(  pthread_cond_signal(&q->cond)  );
213                CHECK_POSIX(  pthread_mutex_unlock( &q->mtx ));
214                pthread_yield();
215                CHECK_POSIX(  pthread_mutex_lock( &q->mtx )  );
216                ASSERT( ++loops < 10 ); /* detect infinite loops */
217        }
218       
219        /* Move all data from old to new */
220        fd_list_move_end( &new->list, &q->list );
221        if (q->count && (!new->count)) {
222                CHECK_POSIX(  pthread_cond_signal(&new->cond)  );
223        }
224        new->count += q->count;
225       
226        /* Destroy old */
227        CHECK_POSIX(  pthread_mutex_unlock( &q->mtx )  );
228        CHECK_POSIX(  pthread_cond_destroy( &q->cond )  );
229        CHECK_POSIX(  pthread_mutex_destroy( &q->mtx )  );
230        free(q);
231       
232        /* Unlock new, we're done */
233        CHECK_POSIX(  pthread_mutex_unlock( &new->mtx )  );
234       
235        return 0;
236}
237
238/* Get the length of the queue */
239int fd_fifo_length ( struct fifo * queue, int * length )
240{
241        TRACE_ENTRY( "%p %p", queue, length );
242       
243        /* Check the parameters */
244        CHECK_PARAMS( CHECK_FIFO( queue ) && length );
245       
246        /* lock the queue */
247        CHECK_POSIX(  pthread_mutex_lock( &queue->mtx )  );
248       
249        /* Retrieve the count */
250        *length = queue->count;
251       
252        /* Unlock */
253        CHECK_POSIX(  pthread_mutex_unlock( &queue->mtx )  );
254       
255        /* Done */
256        return 0;
257}
258
259/* alternate version with no error checking */
260int fd_fifo_length_noerr ( struct fifo * queue )
261{
262        if ( !CHECK_FIFO( queue ) )
263                return 0;
264       
265        return queue->count; /* Let's hope it's read atomically, since we are not locking... */
266}
267
268/* Set the thresholds of the queue */
269int fd_fifo_setthrhd ( struct fifo * queue, void * data, uint16_t high, void (*h_cb)(struct fifo *, void **), uint16_t low, void (*l_cb)(struct fifo *, void **) )
270{
271        TRACE_ENTRY( "%p %p %hu %p %hu %p", queue, data, high, h_cb, low, l_cb );
272       
273        /* Check the parameters */
274        CHECK_PARAMS( CHECK_FIFO( queue ) && (high > low) && (queue->data == NULL) );
275       
276        /* lock the queue */
277        CHECK_POSIX(  pthread_mutex_lock( &queue->mtx )  );
278       
279        /* Save the values */
280        queue->high = high;
281        queue->low  = low;
282        queue->data = data;
283        queue->h_cb = h_cb;
284        queue->l_cb = l_cb;
285       
286        /* Unlock */
287        CHECK_POSIX(  pthread_mutex_unlock( &queue->mtx )  );
288       
289        /* Done */
290        return 0;
291}
292
293/* Post a new item in the queue */
294int fd_fifo_post_int ( struct fifo * queue, void ** item )
295{
296        struct fd_list * new;
297        int call_cb = 0;
298       
299        TRACE_ENTRY( "%p %p", queue, item );
300       
301        /* Check the parameters */
302        CHECK_PARAMS( CHECK_FIFO( queue ) && item && *item );
303       
304        /* Create a new list item */
305        CHECK_MALLOC(  new = malloc (sizeof (struct fd_list))  );
306       
307        fd_list_init(new, *item);
308        *item = NULL;
309       
310        /* lock the queue */
311        CHECK_POSIX(  pthread_mutex_lock( &queue->mtx )  );
312       
313        /* Add the new item at the end */
314        fd_list_insert_before( &queue->list, new);
315        queue->count++;
316        if (queue->highest_ever < queue->count)
317                queue->highest_ever = queue->count;
318        if (queue->high && ((queue->count % queue->high) == 0)) {
319                call_cb = 1;
320                queue->highest = queue->count;
321        }
322       
323        /* Signal if threads are asleep */
324        if (queue->thrs > 0) {
325                CHECK_POSIX(  pthread_cond_signal(&queue->cond)  );
326        }
327       
328        /* Unlock */
329        CHECK_POSIX(  pthread_mutex_unlock( &queue->mtx )  );
330       
331        /* Call high-watermark cb as needed */
332        if (call_cb && queue->h_cb)
333                (*queue->h_cb)(queue, &queue->data);
334       
335        /* Done */
336        return 0;
337}
338
339/* Pop the first item from the queue */
340static void * mq_pop(struct fifo * queue)
341{
342        void * ret = NULL;
343        struct fd_list * li;
344       
345        ASSERT( ! FD_IS_LIST_EMPTY(&queue->list) );
346       
347        fd_list_unlink(li = queue->list.next);
348        queue->count--;
349        ret = li->o;
350        free(li);
351       
352        return ret;
353}
354
355/* Check if the low watermark callback must be called. */
356static __inline__ int test_l_cb(struct fifo * queue)
357{
358        if ((queue->high == 0) || (queue->low == 0) || (queue->l_cb == 0))
359                return 0;
360       
361        if (((queue->count % queue->high) == queue->low) && (queue->highest > queue->count)) {
362                queue->highest -= queue->high;
363                return 1;
364        }
365       
366        return 0;
367}
368
369/* Try poping an item */
370int fd_fifo_tryget_int ( struct fifo * queue, void ** item )
371{
372        int wouldblock = 0;
373        int call_cb = 0;
374       
375        TRACE_ENTRY( "%p %p", queue, item );
376       
377        /* Check the parameters */
378        CHECK_PARAMS( CHECK_FIFO( queue ) && item );
379       
380        /* lock the queue */
381        CHECK_POSIX(  pthread_mutex_lock( &queue->mtx )  );
382       
383        /* Check queue status */
384        if (queue->count > 0) {
385                /* There are elements in the queue, so pick the first one */
386                *item = mq_pop(queue);
387                call_cb = test_l_cb(queue);
388        } else {
389                wouldblock = 1;
390                *item = NULL;
391        }
392               
393        /* Unlock */
394        CHECK_POSIX(  pthread_mutex_unlock( &queue->mtx )  );
395       
396        /* Call low watermark callback as needed */
397        if (call_cb)
398                (*queue->l_cb)(queue, &queue->data);
399       
400        /* Done */
401        return wouldblock ? EWOULDBLOCK : 0;
402}
403
404/* This handler is called when a thread is blocked on a queue, and cancelled */
405static void fifo_cleanup(void * queue)
406{
407        struct fifo * q = (struct fifo *)queue;
408        TRACE_ENTRY( "%p", queue );
409       
410        /* Check the parameter */
411        if ( ! CHECK_FIFO( q )) {
412                TRACE_DEBUG(INFO, "Invalid queue, skipping handler");
413                return;
414        }
415       
416        /* The thread has been cancelled, therefore it does not wait on the queue anymore */
417        q->thrs--;
418       
419        /* Now unlock the queue, and we're done */
420        CHECK_POSIX_DO(  pthread_mutex_unlock( &q->mtx ),  /* nothing */  );
421       
422        /* End of cleanup handler */
423        return;
424}
425
426/* The internal function for fd_fifo_timedget and fd_fifo_get */
427static int fifo_tget ( struct fifo * queue, void ** item, int istimed, const struct timespec *abstime)
428{
429        int timedout = 0;
430        int call_cb = 0;
431       
432        /* Check the parameters */
433        CHECK_PARAMS( CHECK_FIFO( queue ) && item && (abstime || !istimed) );
434       
435        /* Initialize the return value */
436        *item = NULL;
437       
438        /* lock the queue */
439        CHECK_POSIX(  pthread_mutex_lock( &queue->mtx )  );
440       
441awaken:
442        /* Check queue status */
443        if (!CHECK_FIFO( queue )) {
444                /* The queue is being destroyed */
445                CHECK_POSIX(  pthread_mutex_unlock( &queue->mtx )  );
446                TRACE_DEBUG(FULL, "The queue is being destroyed -> EPIPE");
447                return EPIPE;
448        }
449               
450        if (queue->count > 0) {
451                /* There are items in the queue, so pick the first one */
452                *item = mq_pop(queue);
453                call_cb = test_l_cb(queue);
454        } else {
455                int ret = 0;
456                /* We have to wait for a new item */
457                queue->thrs++ ;
458                pthread_cleanup_push( fifo_cleanup, queue);
459                if (istimed) {
460                        ret = pthread_cond_timedwait( &queue->cond, &queue->mtx, abstime );
461                } else {
462                        ret = pthread_cond_wait( &queue->cond, &queue->mtx );
463                }
464                pthread_cleanup_pop(0);
465                queue->thrs-- ;
466                if (ret == 0)
467                        goto awaken;  /* test for spurious wake-ups */
468               
469                if (istimed && (ret == ETIMEDOUT)) {
470                        timedout = 1;
471                } else {
472                        /* Unexpected error condition (means we need to debug) */
473                        ASSERT( ret == 0 /* never true */ );
474                }
475        }
476       
477        /* Unlock */
478        CHECK_POSIX(  pthread_mutex_unlock( &queue->mtx )  );
479       
480        /* Call low watermark callback as needed */
481        if (call_cb)
482                (*queue->l_cb)(queue, &queue->data);
483       
484        /* Done */
485        return timedout ? ETIMEDOUT : 0;
486}
487
488/* Get the next available item, block until there is one */
489int fd_fifo_get_int ( struct fifo * queue, void ** item )
490{
491        TRACE_ENTRY( "%p %p", queue, item );
492        return fifo_tget(queue, item, 0, NULL);
493}
494
495/* Get the next available item, block until there is one, or the timeout expires */
496int fd_fifo_timedget_int ( struct fifo * queue, void ** item, const struct timespec *abstime )
497{
498        TRACE_ENTRY( "%p %p %p", queue, item, abstime );
499        return fifo_tget(queue, item, 1, abstime);
500}
501
Note: See TracBrowser for help on using the repository browser.