Changeset 767:c47c16436f71 in freeDiameter for libfdproto/fifo.c
- Timestamp:
- Oct 24, 2011, 6:43:32 AM (13 years ago)
- Branch:
- default
- Phase:
- public
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
libfdproto/fifo.c
r740 r767 53 53 54 54 pthread_mutex_t mtx; /* Mutex protecting this queue */ 55 pthread_cond_t cond; /* condition variable of the list */ 55 pthread_cond_t cond_pull; /* condition variable for pulling threads */ 56 pthread_cond_t cond_push; /* condition variable for pushing threads */ 56 57 57 58 struct fd_list list; /* sentinel for the list of elements */ 58 59 int count; /* number of objects in the list */ 59 60 int thrs; /* number of threads waiting for a new element (when count is 0) */ 61 62 int max; /* maximum number of items to accept if not 0 */ 63 int thrs_push; /* number of threads waitnig to push an item */ 60 64 61 65 uint16_t high; /* High level threshold (see libfreeDiameter.h for details) */ … … 75 79 76 80 77 /* Create a new queue */78 int fd_fifo_new ( struct fifo ** queue )81 /* Create a new queue, with max number of items -- use 0 for no max */ 82 int fd_fifo_new ( struct fifo ** queue, int max ) 79 83 { 80 84 struct fifo * new; … … 92 96 new->eyec = FIFO_EYEC; 93 97 CHECK_POSIX( pthread_mutex_init(&new->mtx, NULL) ); 94 CHECK_POSIX( pthread_cond_init(&new->cond, NULL) ); 98 CHECK_POSIX( pthread_cond_init(&new->cond_pull, NULL) ); 99 CHECK_POSIX( pthread_cond_init(&new->cond_push, NULL) ); 100 new->max = max; 95 101 96 102 fd_list_init(&new->list, NULL); … … 119 125 CHECK_POSIX_DO( pthread_mutex_lock( &queue->mtx ), /* continue */ ); 120 126 fd_log_debug(" %d elements in queue / %d threads waiting\n", queue->count, queue->thrs); 127 fd_log_debug(" %d elements max / %d threads waiting to push\n", queue->max, queue->thrs_push); 121 128 fd_log_debug(" thresholds: %d / %d (h:%d), cb: %p,%p (%p), highest: %d\n", 122 129 queue->high, queue->low, queue->highest, … … 162 169 while (q->thrs) { 163 170 CHECK_POSIX( pthread_mutex_unlock( &q->mtx )); 164 CHECK_POSIX( pthread_cond_signal(&q->cond) ); 165 sched_yield(); 166 if (loops >= 10) 167 /* sleep for a few milliseconds */ 168 usleep(50000); 171 CHECK_POSIX( pthread_cond_signal(&q->cond_pull) ); 172 usleep(1000); 169 173 170 174 CHECK_POSIX( pthread_mutex_lock( &q->mtx ) ); … … 178 182 CHECK_POSIX( pthread_mutex_unlock( &q->mtx ) ); 179 183 180 CHECK_POSIX_DO( pthread_cond_destroy( &q->cond ), ); 184 CHECK_POSIX_DO( pthread_cond_destroy( &q->cond_pull ), ); 185 186 CHECK_POSIX_DO( pthread_cond_destroy( &q->cond_push ), ); 181 187 182 188 CHECK_POSIX_DO( pthread_mutex_destroy( &q->mtx ), ); … … 207 213 /* Lock the queues */ 208 214 CHECK_POSIX( pthread_mutex_lock( &old->mtx ) ); 215 216 CHECK_PARAMS_DO( (! old->thrs_push), { 217 pthread_mutex_unlock( &old->mtx ); 218 return EINVAL; 219 } ); 220 209 221 CHECK_POSIX( pthread_mutex_lock( &new->mtx ) ); 210 222 … … 213 225 while (old->thrs) { 214 226 CHECK_POSIX( pthread_mutex_unlock( &old->mtx )); 215 CHECK_POSIX( pthread_cond_signal(&old->cond) ); 216 sched_yield(); 217 if (loops >= 10) 218 /* sleep for a few milliseconds */ 219 usleep(50000); 227 CHECK_POSIX( pthread_cond_signal( &old->cond_pull ) ); 228 usleep(1000); 220 229 221 230 CHECK_POSIX( pthread_mutex_lock( &old->mtx ) ); 222 ASSERT( ++loops < 20 ); /* detect infinite loops */231 ASSERT( loops < 20 ); /* detect infinite loops */ 223 232 } 224 233 … … 226 235 fd_list_move_end( &new->list, &old->list ); 227 236 if (old->count && (!new->count)) { 228 CHECK_POSIX( pthread_cond_signal(&new->cond ) );237 CHECK_POSIX( pthread_cond_signal(&new->cond_pull) ); 229 238 } 230 239 new->count += old->count; … … 296 305 } 297 306 307 308 /* This handler is called when a thread is blocked on a queue, and cancelled */ 309 static void fifo_cleanup_push(void * queue) 310 { 311 struct fifo * q = (struct fifo *)queue; 312 TRACE_ENTRY( "%p", queue ); 313 314 /* The thread has been cancelled, therefore it does not wait on the queue anymore */ 315 q->thrs_push--; 316 317 /* Now unlock the queue, and we're done */ 318 CHECK_POSIX_DO( pthread_mutex_unlock( &q->mtx ), /* nothing */ ); 319 320 /* End of cleanup handler */ 321 return; 322 } 323 324 298 325 /* Post a new item in the queue */ 299 326 int fd_fifo_post_int ( struct fifo * queue, void ** item ) … … 307 334 CHECK_PARAMS( CHECK_FIFO( queue ) && item && *item ); 308 335 336 /* lock the queue */ 337 CHECK_POSIX( pthread_mutex_lock( &queue->mtx ) ); 338 339 if (queue->max) { 340 while (queue->count >= queue->max) { 341 int ret = 0; 342 343 /* We have to wait for an item to be pulled */ 344 queue->thrs_push++ ; 345 pthread_cleanup_push( fifo_cleanup_push, queue); 346 ret = pthread_cond_wait( &queue->cond_push, &queue->mtx ); 347 pthread_cleanup_pop(0); 348 queue->thrs_push-- ; 349 350 ASSERT( ret == 0 ); 351 } 352 } 353 309 354 /* Create a new list item */ 310 CHECK_MALLOC( new = malloc (sizeof (struct fd_list)) ); 355 CHECK_MALLOC_DO( new = malloc (sizeof (struct fd_list)) , { 356 pthread_mutex_unlock( &queue->mtx ); 357 } ); 311 358 312 359 fd_list_init(new, *item); 313 360 *item = NULL; 314 315 /* lock the queue */316 CHECK_POSIX( pthread_mutex_lock( &queue->mtx ) );317 361 318 362 /* Add the new item at the end */ … … 328 372 /* Signal if threads are asleep */ 329 373 if (queue->thrs > 0) { 330 CHECK_POSIX( pthread_cond_signal(&queue->cond) ); 374 CHECK_POSIX( pthread_cond_signal(&queue->cond_pull) ); 375 } 376 if (queue->thrs_push > 0) { 377 /* cascade */ 378 CHECK_POSIX( pthread_cond_signal(&queue->cond_push) ); 331 379 } 332 380 … … 355 403 free(li); 356 404 405 if (queue->thrs_push) { 406 CHECK_POSIX_DO( pthread_cond_signal( &queue->cond_push ), ); 407 } 408 357 409 return ret; 358 410 } … … 388 440 /* Check queue status */ 389 441 if (queue->count > 0) { 442 got_item: 390 443 /* There are elements in the queue, so pick the first one */ 391 444 *item = mq_pop(queue); 392 445 call_cb = test_l_cb(queue); 393 446 } else { 447 if (queue->thrs_push > 0) { 448 /* A thread is trying to push something, let's give it a chance */ 449 CHECK_POSIX( pthread_mutex_unlock( &queue->mtx ) ); 450 CHECK_POSIX( pthread_cond_signal( &queue->cond_push ) ); 451 usleep(1000); 452 CHECK_POSIX( pthread_mutex_lock( &queue->mtx ) ); 453 if (queue->count > 0) 454 goto got_item; 455 } 456 394 457 wouldblock = 1; 395 458 *item = NULL; … … 457 520 pthread_cleanup_push( fifo_cleanup, queue); 458 521 if (istimed) { 459 ret = pthread_cond_timedwait( &queue->cond , &queue->mtx, abstime );522 ret = pthread_cond_timedwait( &queue->cond_pull, &queue->mtx, abstime ); 460 523 } else { 461 ret = pthread_cond_wait( &queue->cond , &queue->mtx );524 ret = pthread_cond_wait( &queue->cond_pull, &queue->mtx ); 462 525 } 463 526 pthread_cleanup_pop(0);
Note: See TracChangeset
for help on using the changeset viewer.