Navigation


Changeset 8:3e143f047f78 in freeDiameter for libfreeDiameter/queues.c


Ignore:
Timestamp:
Sep 18, 2009, 6:54:07 PM (15 years ago)
Author:
Sebastien Decugis <sdecugis@nict.go.jp>
Branch:
default
Phase:
public
Message:

Backup for the week-end

File:
1 moved

Legend:

Unmodified
Added
Removed
  • libfreeDiameter/queues.c

    r1 r8  
    3434*********************************************************************************************************/
    3535
    36 /* Messages queues module.
     36/* FIFO queues module.
    3737 *
    3838 * The threads that call these functions must be in the cancellation state PTHREAD_CANCEL_ENABLE and type PTHREAD_CANCEL_DEFERRED.
     
    4242 *  -> shutdown any process that can add into the queue first.
    4343 *  -> pthread_cancel any thread that could be waiting on the queue.
    44  *  -> consume any message that is in the queue, using meq_tryget.
    45  *  -> then destroy the queue using meq_del.
     44 *  -> consume any element that is in the queue, using fd_qu_tryget_int.
     45 *  -> then destroy the queue using fd_mq_del.
    4646 */
    4747
    4848#include "libfD.h"
    4949
    50 /* Definition of a message queue object */
    51 struct mqueue {
    52         int             eyec;   /* An eye catcher, also used to check a queue is valid. MQ_EYEC */
     50/* Definition of a FIFO queue object */
     51struct fifo {
     52        int             eyec;   /* An eye catcher, also used to check a queue is valid. FIFO_EYEC */
    5353       
    5454        pthread_mutex_t mtx;    /* Mutex protecting this queue */
    5555        pthread_cond_t  cond;   /* condition variable of the list */
    5656       
    57         struct fd_list  list;   /* sentinel for the list of messages */
     57        struct fd_list  list;   /* sentinel for the list of elements */
    5858        int             count;  /* number of objects in the list */
    59         int             thrs;   /* number of threads waiting for a new message (when count is 0) */
     59        int             thrs;   /* number of threads waiting for a new element (when count is 0) */
    6060       
    6161        uint16_t        high;   /* High level threshold (see libfreeDiameter.h for details) */
    6262        uint16_t        low;    /* Low level threshhold */
    6363        void            *data;  /* Opaque pointer for threshold callbacks */
    64         void            (*h_cb)(struct mqueue *, void **); /* The callbacks */
    65         void            (*l_cb)(struct mqueue *, void **);
     64        void            (*h_cb)(struct fifo *, void **); /* The callbacks */
     65        void            (*l_cb)(struct fifo *, void **);
    6666        int             highest;/* The highest count value for which h_cb has been called */
    6767};
    6868
    6969/* The eye catcher value */
    70 #define MQ_EYEC 0xe7ec1130
     70#define FIFO_EYEC       0xe7ec1130
    7171
    7272/* Macro to check a pointer */
    73 #define CHECK_QUEUE( _queue ) (( (_queue) != NULL) && ( (_queue)->eyec == MQ_EYEC) )
    74 
    75 
    76 /* Create a new message queue */
    77 int fd_mq_new ( struct mqueue ** queue )
    78 {
    79         struct mqueue * new;
     73#define CHECK_FIFO( _queue ) (( (_queue) != NULL) && ( (_queue)->eyec == FIFO_EYEC) )
     74
     75
     76/* Create a new queue */
     77int fd_fifo_new ( struct fifo ** queue )
     78{
     79        struct fifo * new;
    8080       
    8181        TRACE_ENTRY( "%p", queue );
     
    8484       
    8585        /* Create a new object */
    86         CHECK_MALLOC( new = malloc (sizeof (struct mqueue) )  );
     86        CHECK_MALLOC( new = malloc (sizeof (struct fifo) )  );
    8787       
    8888        /* Initialize the content */
    89         memset(new, 0, sizeof(struct mqueue));
    90        
    91         new->eyec = MQ_EYEC;
     89        memset(new, 0, sizeof(struct fifo));
     90       
     91        new->eyec = FIFO_EYEC;
    9292        CHECK_POSIX( pthread_mutex_init(&new->mtx, NULL) );
    9393        CHECK_POSIX( pthread_cond_init(&new->cond, NULL) );
     
    100100}
    101101
    102 /* Delete a message queue. It must be unused. */
    103 int fd_mq_del ( struct mqueue  ** queue )
    104 {
    105         struct mqueue * q;
     102/* Delete a queue. It must be unused. */
     103int fd_fifo_del ( struct fifo  ** queue )
     104{
     105        struct fifo * q;
    106106       
    107107        TRACE_ENTRY( "%p", queue );
    108108
    109         CHECK_PARAMS( queue && CHECK_QUEUE( *queue ) );
     109        CHECK_PARAMS( queue && CHECK_FIFO( *queue ) );
    110110       
    111111        q = *queue;
     
    139139
    140140/* Get the length of the queue */
    141 int fd_mq_length ( struct mqueue * queue, int * length )
     141int fd_fifo_length ( struct fifo * queue, int * length )
    142142{
    143143        TRACE_ENTRY( "%p %p", queue, length );
    144144       
    145145        /* Check the parameters */
    146         CHECK_PARAMS( CHECK_QUEUE( queue ) && length );
     146        CHECK_PARAMS( CHECK_FIFO( queue ) && length );
    147147       
    148148        /* lock the queue */
     
    160160
    161161/* alternate version with no error checking */
    162 int fd_mq_length_noerr ( struct mqueue * queue )
    163 {
    164         if ( !CHECK_QUEUE( queue ) )
     162int fd_fifo_length_noerr ( struct fifo * queue )
     163{
     164        if ( !CHECK_FIFO( queue ) )
    165165                return 0;
    166166       
     
    169169
    170170/* Set the thresholds of the queue */
    171 int fd_mq_setthrhd ( struct mqueue * queue, void * data, uint16_t high, void (*h_cb)(struct mqueue *, void **), uint16_t low, void (*l_cb)(struct mqueue *, void **) )
     171int fd_fifo_setthrhd ( struct fifo * queue, void * data, uint16_t high, void (*h_cb)(struct fifo *, void **), uint16_t low, void (*l_cb)(struct fifo *, void **) )
    172172{
    173173        TRACE_ENTRY( "%p %p %hu %p %hu %p", queue, data, high, h_cb, low, l_cb );
    174174       
    175175        /* Check the parameters */
    176         CHECK_PARAMS( CHECK_QUEUE( queue ) && (high > low) && (queue->data == NULL) );
     176        CHECK_PARAMS( CHECK_FIFO( queue ) && (high > low) && (queue->data == NULL) );
    177177       
    178178        /* lock the queue */
     
    193193}
    194194
    195 /* Post a new message in the queue */
    196 int fd_mq_post ( struct mqueue * queue, struct msg ** msg )
     195/* Post a new item in the queue */
     196int fd_fifo_post_int ( struct fifo * queue, void ** item )
    197197{
    198198        struct fd_list * new;
    199199        int call_cb = 0;
    200200       
    201         TRACE_ENTRY( "%p %p", queue, msg );
    202        
    203         /* Check the parameters */
    204         CHECK_PARAMS( CHECK_QUEUE( queue ) && msg && *msg );
     201        TRACE_ENTRY( "%p %p", queue, item );
     202       
     203        /* Check the parameters */
     204        CHECK_PARAMS( CHECK_FIFO( queue ) && item && *item );
    205205       
    206206        /* Create a new list item */
    207207        CHECK_MALLOC(  new = malloc (sizeof (struct fd_list))  );
    208208       
    209         fd_list_init(new, *msg);
    210         *msg = NULL;
    211        
    212         /* lock the queue */
    213         CHECK_POSIX(  pthread_mutex_lock( &queue->mtx )  );
    214        
    215         /* Add the new message at the end */
     209        fd_list_init(new, *item);
     210        *item = NULL;
     211       
     212        /* lock the queue */
     213        CHECK_POSIX(  pthread_mutex_lock( &queue->mtx )  );
     214       
     215        /* Add the new item at the end */
    216216        fd_list_insert_before( &queue->list, new);
    217217        queue->count++;
     
    237237}
    238238
    239 /* Pop the first message from the queue */
    240 static struct msg * mq_pop(struct mqueue * queue)
    241 {
    242         struct msg * ret = NULL;
     239/* Pop the first item from the queue */
     240static void * mq_pop(struct fifo * queue)
     241{
     242        void * ret = NULL;
    243243        struct fd_list * li;
    244244       
     
    247247        fd_list_unlink(li = queue->list.next);
    248248        queue->count--;
    249         ret = (struct msg *)(li->o);
     249        ret = li->o;
    250250        free(li);
    251251       
     
    254254
    255255/* Check if the low watermark callback must be called. */
    256 static int test_l_cb(struct mqueue * queue)
     256static __inline__ int test_l_cb(struct fifo * queue)
    257257{
    258258        if ((queue->high == 0) || (queue->low == 0) || (queue->l_cb == 0))
     
    267267}
    268268
    269 /* Try poping a message */
    270 int fd_mq_tryget ( struct mqueue * queue, struct msg ** msg )
     269/* Try poping an item */
     270int fd_fifo_tryget_int ( struct fifo * queue, void ** item )
    271271{
    272272        int wouldblock = 0;
    273273        int call_cb = 0;
    274274       
    275         TRACE_ENTRY( "%p %p", queue, msg );
    276        
    277         /* Check the parameters */
    278         CHECK_PARAMS( CHECK_QUEUE( queue ) && msg );
     275        TRACE_ENTRY( "%p %p", queue, item );
     276       
     277        /* Check the parameters */
     278        CHECK_PARAMS( CHECK_FIFO( queue ) && item );
    279279       
    280280        /* lock the queue */
     
    283283        /* Check queue status */
    284284        if (queue->count > 0) {
    285                 /* There are messages in the queue, so pick the first one */
    286                 *msg = mq_pop(queue);
     285                /* There are elements in the queue, so pick the first one */
     286                *item = mq_pop(queue);
    287287                call_cb = test_l_cb(queue);
    288288        } else {
    289289                wouldblock = 1;
    290                 *msg = NULL;
     290                *item = NULL;
    291291        }
    292292               
     
    303303
    304304/* This handler is called when a thread is blocked on a queue, and cancelled */
    305 static void mq_cleanup(void * queue)
    306 {
    307         struct mqueue * q = (struct mqueue *)queue;
     305static void fifo_cleanup(void * queue)
     306{
     307        struct fifo * q = (struct fifo *)queue;
    308308        TRACE_ENTRY( "%p", queue );
    309309       
    310310        /* Check the parameter */
    311         if ( ! CHECK_QUEUE( q )) {
     311        if ( ! CHECK_FIFO( q )) {
    312312                TRACE_DEBUG(INFO, "Invalid queue, skipping handler");
    313313                return;
     
    324324}
    325325
    326 /* The internal function for meq_timedget and meq_get */
    327 static int mq_tget ( struct mqueue * queue, struct msg ** msg, int istimed, const struct timespec *abstime)
     326/* The internal function for fd_fifo_timedget and fd_fifo_get */
     327static int fifo_tget ( struct fifo * queue, void ** item, int istimed, const struct timespec *abstime)
    328328{
    329329        int timedout = 0;
     
    331331       
    332332        /* Check the parameters */
    333         CHECK_PARAMS( CHECK_QUEUE( queue ) && msg && (abstime || !istimed) );
    334        
    335         /* Initialize the msg value */
    336         *msg = NULL;
     333        CHECK_PARAMS( CHECK_FIFO( queue ) && item && (abstime || !istimed) );
     334       
     335        /* Initialize the return value */
     336        *item = NULL;
    337337       
    338338        /* lock the queue */
     
    342342        /* Check queue status */
    343343        if (queue->count > 0) {
    344                 /* There are messages in the queue, so pick the first one */
    345                 *msg = mq_pop(queue);
     344                /* There are items in the queue, so pick the first one */
     345                *item = mq_pop(queue);
    346346                call_cb = test_l_cb(queue);
    347347        } else {
    348348                int ret = 0;
    349                 /* We have to wait for a new message */
     349                /* We have to wait for a new item */
    350350                queue->thrs++ ;
    351                 pthread_cleanup_push( mq_cleanup, queue);
     351                pthread_cleanup_push( fifo_cleanup, queue);
    352352                if (istimed) {
    353353                        ret = pthread_cond_timedwait( &queue->cond, &queue->mtx, abstime );
     
    379379}
    380380
    381 /* Get the next available message, block until there is one */
    382 int fd_mq_get ( struct mqueue * queue, struct msg ** msg )
    383 {
    384         TRACE_ENTRY( "%p %p", queue, msg );
    385         return mq_tget(queue, msg, 0, NULL);
    386 }
    387 
    388 /* Get the next available message, block until there is one, or the timeout expires */
    389 int fd_mq_timedget ( struct mqueue * queue, struct msg ** msg, const struct timespec *abstime )
    390 {
    391         TRACE_ENTRY( "%p %p %p", queue, msg, abstime );
    392         return mq_tget(queue, msg, 1, abstime);
    393 }
    394 
     381/* Get the next available item, block until there is one */
     382int fd_fifo_get_int ( struct fifo * queue, void ** item )
     383{
     384        TRACE_ENTRY( "%p %p", queue, item );
     385        return fifo_tget(queue, item, 0, NULL);
     386}
     387
     388/* Get the next available item, block until there is one, or the timeout expires */
     389int fd_fifo_timedget_int ( struct fifo * queue, void ** item, const struct timespec *abstime )
     390{
     391        TRACE_ENTRY( "%p %p %p", queue, item, abstime );
     392        return fifo_tget(queue, item, 1, abstime);
     393}
     394
Note: See TracChangeset for help on using the changeset viewer.