Changeset 8:3e143f047f78 in freeDiameter for libfreeDiameter/queues.c
- Timestamp:
- Sep 18, 2009, 6:54:07 PM (15 years ago)
- Branch:
- default
- Phase:
- public
- File:
-
- 1 moved
Legend:
- Unmodified
- Added
- Removed
-
libfreeDiameter/queues.c
r1 r8 34 34 *********************************************************************************************************/ 35 35 36 /* Messagesqueues module.36 /* FIFO queues module. 37 37 * 38 38 * The threads that call these functions must be in the cancellation state PTHREAD_CANCEL_ENABLE and type PTHREAD_CANCEL_DEFERRED. … … 42 42 * -> shutdown any process that can add into the queue first. 43 43 * -> pthread_cancel any thread that could be waiting on the queue. 44 * -> consume any message that is in the queue, using meq_tryget.45 * -> then destroy the queue using meq_del.44 * -> consume any element that is in the queue, using fd_qu_tryget_int. 45 * -> then destroy the queue using fd_mq_del. 46 46 */ 47 47 48 48 #include "libfD.h" 49 49 50 /* Definition of a messagequeue object */51 struct mqueue{52 int eyec; /* An eye catcher, also used to check a queue is valid. MQ_EYEC */50 /* Definition of a FIFO queue object */ 51 struct fifo { 52 int eyec; /* An eye catcher, also used to check a queue is valid. FIFO_EYEC */ 53 53 54 54 pthread_mutex_t mtx; /* Mutex protecting this queue */ 55 55 pthread_cond_t cond; /* condition variable of the list */ 56 56 57 struct fd_list list; /* sentinel for the list of messages */57 struct fd_list list; /* sentinel for the list of elements */ 58 58 int count; /* number of objects in the list */ 59 int thrs; /* number of threads waiting for a new message(when count is 0) */59 int thrs; /* number of threads waiting for a new element (when count is 0) */ 60 60 61 61 uint16_t high; /* High level threshold (see libfreeDiameter.h for details) */ 62 62 uint16_t low; /* Low level threshhold */ 63 63 void *data; /* Opaque pointer for threshold callbacks */ 64 void (*h_cb)(struct mqueue*, void **); /* The callbacks */65 void (*l_cb)(struct mqueue*, void **);64 void (*h_cb)(struct fifo *, void **); /* The callbacks */ 65 void (*l_cb)(struct fifo *, void **); 66 66 int highest;/* The highest count value for which h_cb has been called */ 67 67 }; 68 68 69 69 /* The eye catcher value */ 70 #define MQ_EYEC 0xe7ec113070 #define FIFO_EYEC 0xe7ec1130 71 71 72 72 /* Macro to check a pointer */ 73 #define CHECK_ QUEUE( _queue ) (( (_queue) != NULL) && ( (_queue)->eyec == MQ_EYEC) )74 75 76 /* Create a new messagequeue */77 int fd_ mq_new ( struct mqueue** queue )78 { 79 struct mqueue* new;73 #define CHECK_FIFO( _queue ) (( (_queue) != NULL) && ( (_queue)->eyec == FIFO_EYEC) ) 74 75 76 /* Create a new queue */ 77 int fd_fifo_new ( struct fifo ** queue ) 78 { 79 struct fifo * new; 80 80 81 81 TRACE_ENTRY( "%p", queue ); … … 84 84 85 85 /* Create a new object */ 86 CHECK_MALLOC( new = malloc (sizeof (struct mqueue) ) );86 CHECK_MALLOC( new = malloc (sizeof (struct fifo) ) ); 87 87 88 88 /* Initialize the content */ 89 memset(new, 0, sizeof(struct mqueue));90 91 new->eyec = MQ_EYEC;89 memset(new, 0, sizeof(struct fifo)); 90 91 new->eyec = FIFO_EYEC; 92 92 CHECK_POSIX( pthread_mutex_init(&new->mtx, NULL) ); 93 93 CHECK_POSIX( pthread_cond_init(&new->cond, NULL) ); … … 100 100 } 101 101 102 /* Delete a messagequeue. It must be unused. */103 int fd_ mq_del ( struct mqueue** queue )104 { 105 struct mqueue* q;102 /* Delete a queue. It must be unused. */ 103 int fd_fifo_del ( struct fifo ** queue ) 104 { 105 struct fifo * q; 106 106 107 107 TRACE_ENTRY( "%p", queue ); 108 108 109 CHECK_PARAMS( queue && CHECK_ QUEUE( *queue ) );109 CHECK_PARAMS( queue && CHECK_FIFO( *queue ) ); 110 110 111 111 q = *queue; … … 139 139 140 140 /* Get the length of the queue */ 141 int fd_ mq_length ( struct mqueue* queue, int * length )141 int fd_fifo_length ( struct fifo * queue, int * length ) 142 142 { 143 143 TRACE_ENTRY( "%p %p", queue, length ); 144 144 145 145 /* Check the parameters */ 146 CHECK_PARAMS( CHECK_ QUEUE( queue ) && length );146 CHECK_PARAMS( CHECK_FIFO( queue ) && length ); 147 147 148 148 /* lock the queue */ … … 160 160 161 161 /* alternate version with no error checking */ 162 int fd_ mq_length_noerr ( struct mqueue* queue )163 { 164 if ( !CHECK_ QUEUE( queue ) )162 int fd_fifo_length_noerr ( struct fifo * queue ) 163 { 164 if ( !CHECK_FIFO( queue ) ) 165 165 return 0; 166 166 … … 169 169 170 170 /* Set the thresholds of the queue */ 171 int fd_ mq_setthrhd ( struct mqueue * queue, void * data, uint16_t high, void (*h_cb)(struct mqueue *, void **), uint16_t low, void (*l_cb)(struct mqueue*, void **) )171 int fd_fifo_setthrhd ( struct fifo * queue, void * data, uint16_t high, void (*h_cb)(struct fifo *, void **), uint16_t low, void (*l_cb)(struct fifo *, void **) ) 172 172 { 173 173 TRACE_ENTRY( "%p %p %hu %p %hu %p", queue, data, high, h_cb, low, l_cb ); 174 174 175 175 /* Check the parameters */ 176 CHECK_PARAMS( CHECK_ QUEUE( queue ) && (high > low) && (queue->data == NULL) );176 CHECK_PARAMS( CHECK_FIFO( queue ) && (high > low) && (queue->data == NULL) ); 177 177 178 178 /* lock the queue */ … … 193 193 } 194 194 195 /* Post a new messagein the queue */196 int fd_ mq_post ( struct mqueue * queue, struct msg ** msg)195 /* Post a new item in the queue */ 196 int fd_fifo_post_int ( struct fifo * queue, void ** item ) 197 197 { 198 198 struct fd_list * new; 199 199 int call_cb = 0; 200 200 201 TRACE_ENTRY( "%p %p", queue, msg);202 203 /* Check the parameters */ 204 CHECK_PARAMS( CHECK_ QUEUE( queue ) && msg && *msg);201 TRACE_ENTRY( "%p %p", queue, item ); 202 203 /* Check the parameters */ 204 CHECK_PARAMS( CHECK_FIFO( queue ) && item && *item ); 205 205 206 206 /* Create a new list item */ 207 207 CHECK_MALLOC( new = malloc (sizeof (struct fd_list)) ); 208 208 209 fd_list_init(new, * msg);210 * msg= NULL;211 212 /* lock the queue */ 213 CHECK_POSIX( pthread_mutex_lock( &queue->mtx ) ); 214 215 /* Add the new messageat the end */209 fd_list_init(new, *item); 210 *item = NULL; 211 212 /* lock the queue */ 213 CHECK_POSIX( pthread_mutex_lock( &queue->mtx ) ); 214 215 /* Add the new item at the end */ 216 216 fd_list_insert_before( &queue->list, new); 217 217 queue->count++; … … 237 237 } 238 238 239 /* Pop the first messagefrom the queue */240 static struct msg * mq_pop(struct mqueue* queue)241 { 242 struct msg* ret = NULL;239 /* Pop the first item from the queue */ 240 static void * mq_pop(struct fifo * queue) 241 { 242 void * ret = NULL; 243 243 struct fd_list * li; 244 244 … … 247 247 fd_list_unlink(li = queue->list.next); 248 248 queue->count--; 249 ret = (struct msg *)(li->o);249 ret = li->o; 250 250 free(li); 251 251 … … 254 254 255 255 /* Check if the low watermark callback must be called. */ 256 static int test_l_cb(struct mqueue* queue)256 static __inline__ int test_l_cb(struct fifo * queue) 257 257 { 258 258 if ((queue->high == 0) || (queue->low == 0) || (queue->l_cb == 0)) … … 267 267 } 268 268 269 /* Try poping a message*/270 int fd_ mq_tryget ( struct mqueue * queue, struct msg ** msg)269 /* Try poping an item */ 270 int fd_fifo_tryget_int ( struct fifo * queue, void ** item ) 271 271 { 272 272 int wouldblock = 0; 273 273 int call_cb = 0; 274 274 275 TRACE_ENTRY( "%p %p", queue, msg);276 277 /* Check the parameters */ 278 CHECK_PARAMS( CHECK_ QUEUE( queue ) && msg);275 TRACE_ENTRY( "%p %p", queue, item ); 276 277 /* Check the parameters */ 278 CHECK_PARAMS( CHECK_FIFO( queue ) && item ); 279 279 280 280 /* lock the queue */ … … 283 283 /* Check queue status */ 284 284 if (queue->count > 0) { 285 /* There are messages in the queue, so pick the first one */286 * msg= mq_pop(queue);285 /* There are elements in the queue, so pick the first one */ 286 *item = mq_pop(queue); 287 287 call_cb = test_l_cb(queue); 288 288 } else { 289 289 wouldblock = 1; 290 * msg= NULL;290 *item = NULL; 291 291 } 292 292 … … 303 303 304 304 /* This handler is called when a thread is blocked on a queue, and cancelled */ 305 static void mq_cleanup(void * queue)306 { 307 struct mqueue * q = (struct mqueue*)queue;305 static void fifo_cleanup(void * queue) 306 { 307 struct fifo * q = (struct fifo *)queue; 308 308 TRACE_ENTRY( "%p", queue ); 309 309 310 310 /* Check the parameter */ 311 if ( ! CHECK_ QUEUE( q )) {311 if ( ! CHECK_FIFO( q )) { 312 312 TRACE_DEBUG(INFO, "Invalid queue, skipping handler"); 313 313 return; … … 324 324 } 325 325 326 /* The internal function for meq_timedget and meq_get */327 static int mq_tget ( struct mqueue * queue, struct msg ** msg, int istimed, const struct timespec *abstime)326 /* The internal function for fd_fifo_timedget and fd_fifo_get */ 327 static int fifo_tget ( struct fifo * queue, void ** item, int istimed, const struct timespec *abstime) 328 328 { 329 329 int timedout = 0; … … 331 331 332 332 /* Check the parameters */ 333 CHECK_PARAMS( CHECK_ QUEUE( queue ) && msg&& (abstime || !istimed) );334 335 /* Initialize the msgvalue */336 * msg= NULL;333 CHECK_PARAMS( CHECK_FIFO( queue ) && item && (abstime || !istimed) ); 334 335 /* Initialize the return value */ 336 *item = NULL; 337 337 338 338 /* lock the queue */ … … 342 342 /* Check queue status */ 343 343 if (queue->count > 0) { 344 /* There are messages in the queue, so pick the first one */345 * msg= mq_pop(queue);344 /* There are items in the queue, so pick the first one */ 345 *item = mq_pop(queue); 346 346 call_cb = test_l_cb(queue); 347 347 } else { 348 348 int ret = 0; 349 /* We have to wait for a new message*/349 /* We have to wait for a new item */ 350 350 queue->thrs++ ; 351 pthread_cleanup_push( mq_cleanup, queue);351 pthread_cleanup_push( fifo_cleanup, queue); 352 352 if (istimed) { 353 353 ret = pthread_cond_timedwait( &queue->cond, &queue->mtx, abstime ); … … 379 379 } 380 380 381 /* Get the next available message, block until there is one */382 int fd_ mq_get ( struct mqueue * queue, struct msg ** msg)383 { 384 TRACE_ENTRY( "%p %p", queue, msg);385 return mq_tget(queue, msg, 0, NULL);386 } 387 388 /* Get the next available message, block until there is one, or the timeout expires */389 int fd_ mq_timedget ( struct mqueue * queue, struct msg ** msg, const struct timespec *abstime )390 { 391 TRACE_ENTRY( "%p %p %p", queue, msg, abstime );392 return mq_tget(queue, msg, 1, abstime);393 } 394 381 /* Get the next available item, block until there is one */ 382 int fd_fifo_get_int ( struct fifo * queue, void ** item ) 383 { 384 TRACE_ENTRY( "%p %p", queue, item ); 385 return fifo_tget(queue, item, 0, NULL); 386 } 387 388 /* Get the next available item, block until there is one, or the timeout expires */ 389 int fd_fifo_timedget_int ( struct fifo * queue, void ** item, const struct timespec *abstime ) 390 { 391 TRACE_ENTRY( "%p %p %p", queue, item, abstime ); 392 return fifo_tget(queue, item, 1, abstime); 393 } 394
Note: See TracChangeset
for help on using the changeset viewer.