Changeset 767:c47c16436f71 in freeDiameter
- Timestamp:
- Oct 24, 2011, 6:43:32 AM (13 years ago)
- Branch:
- default
- Phase:
- public
- Files:
-
- 12 edited
Legend:
- Unmodified
- Added
- Removed
-
extensions/app_radgw/rgw_worker.c
r741 r767 327 327 memset(workers, 0, sizeof(workers)); 328 328 329 CHECK_FCT( fd_fifo_new ( &work_stack ) );329 CHECK_FCT( fd_fifo_new ( &work_stack, 30 ) ); 330 330 331 331 /* Create the worker thread(s) */ -
extensions/dbg_interactive/queues.i
r741 r767 42 42 43 43 %extend fifo { 44 fifo( ) {44 fifo(int max = 0) { 45 45 struct fifo * q = NULL; 46 int ret = fd_fifo_new(&q );46 int ret = fd_fifo_new(&q, max); 47 47 if (ret != 0) { 48 48 DI_ERROR(ret, NULL, NULL); … … 147 147 ret = fd_fifo_tryget($self, &obj); 148 148 if (ret == EWOULDBLOCK) { 149 Py_ XINCREF(Py_None);149 Py_INCREF(Py_None); 150 150 return Py_None; 151 151 } … … 182 182 ret = fd_fifo_timedget($self, &obj, &ts); 183 183 if (ret == ETIMEDOUT) { 184 Py_ XINCREF(Py_None);184 Py_INCREF(Py_None); 185 185 return Py_None; 186 186 } -
include/freeDiameter/libfdproto.h
r764 r767 2759 2759 * PARAMETERS: 2760 2760 * queue : Upon success, a pointer to the new queue is saved here. 2761 * max : max number of items in the queue. Above this number, adding a new item becomes a 2762 * blocking operation. Use 0 to disable this maximum. 2761 2763 * 2762 2764 * DESCRIPTION: … … 2768 2770 * ENOMEM : Not enough memory to complete the creation. 2769 2771 */ 2770 int fd_fifo_new ( struct fifo ** queue );2772 int fd_fifo_new ( struct fifo ** queue, int max ); 2771 2773 2772 2774 /* -
libfdcore/cnxctx.c
r740 r767 90 90 91 91 if (full) { 92 CHECK_FCT_DO( fd_fifo_new ( &conn->cc_incoming ), return NULL );92 CHECK_FCT_DO( fd_fifo_new ( &conn->cc_incoming, 5 ), return NULL ); 93 93 } 94 94 -
libfdcore/config.c
r740 r767 69 69 70 70 CHECK_FCT( fd_dict_init(&fd_g_config->cnf_dict) ); 71 CHECK_FCT( fd_fifo_new(&fd_g_config->cnf_main_ev ) );71 CHECK_FCT( fd_fifo_new(&fd_g_config->cnf_main_ev, 0) ); 72 72 73 73 /* TLS parameters */ -
libfdcore/p_psm.c
r740 r767 856 856 857 857 /* Create the FIFO for events */ 858 CHECK_FCT( fd_fifo_new(&peer->p_events ) );858 CHECK_FCT( fd_fifo_new(&peer->p_events, 0) ); 859 859 860 860 /* Create the PSM controler thread */ -
libfdcore/peers.c
r749 r767 77 77 fd_list_init(&p->p_actives, p); 78 78 fd_list_init(&p->p_expiry, p); 79 CHECK_FCT( fd_fifo_new(&p->p_tosend ) );79 CHECK_FCT( fd_fifo_new(&p->p_tosend, 5) ); 80 80 p->p_hbh = lrand48(); 81 81 -
libfdcore/queues.c
r740 r767 45 45 { 46 46 TRACE_ENTRY(); 47 CHECK_FCT( fd_fifo_new ( &fd_g_incoming ) );48 CHECK_FCT( fd_fifo_new ( &fd_g_outgoing ) );49 CHECK_FCT( fd_fifo_new ( &fd_g_local ) );47 CHECK_FCT( fd_fifo_new ( &fd_g_incoming, 20 ) ); 48 CHECK_FCT( fd_fifo_new ( &fd_g_outgoing, 30 ) ); 49 CHECK_FCT( fd_fifo_new ( &fd_g_local, 25 ) ); 50 50 return 0; 51 51 } -
libfdcore/sctps.c
r740 r767 497 497 conn->cc_sctps_data.array[i].parent = conn; 498 498 conn->cc_sctps_data.array[i].strid = i; 499 CHECK_FCT( fd_fifo_new(&conn->cc_sctps_data.array[i].raw_recv ) );499 CHECK_FCT( fd_fifo_new(&conn->cc_sctps_data.array[i].raw_recv, 10) ); 500 500 } 501 501 -
libfdproto/fifo.c
r740 r767 53 53 54 54 pthread_mutex_t mtx; /* Mutex protecting this queue */ 55 pthread_cond_t cond; /* condition variable of the list */ 55 pthread_cond_t cond_pull; /* condition variable for pulling threads */ 56 pthread_cond_t cond_push; /* condition variable for pushing threads */ 56 57 57 58 struct fd_list list; /* sentinel for the list of elements */ 58 59 int count; /* number of objects in the list */ 59 60 int thrs; /* number of threads waiting for a new element (when count is 0) */ 61 62 int max; /* maximum number of items to accept if not 0 */ 63 int thrs_push; /* number of threads waitnig to push an item */ 60 64 61 65 uint16_t high; /* High level threshold (see libfreeDiameter.h for details) */ … … 75 79 76 80 77 /* Create a new queue */78 int fd_fifo_new ( struct fifo ** queue )81 /* Create a new queue, with max number of items -- use 0 for no max */ 82 int fd_fifo_new ( struct fifo ** queue, int max ) 79 83 { 80 84 struct fifo * new; … … 92 96 new->eyec = FIFO_EYEC; 93 97 CHECK_POSIX( pthread_mutex_init(&new->mtx, NULL) ); 94 CHECK_POSIX( pthread_cond_init(&new->cond, NULL) ); 98 CHECK_POSIX( pthread_cond_init(&new->cond_pull, NULL) ); 99 CHECK_POSIX( pthread_cond_init(&new->cond_push, NULL) ); 100 new->max = max; 95 101 96 102 fd_list_init(&new->list, NULL); … … 119 125 CHECK_POSIX_DO( pthread_mutex_lock( &queue->mtx ), /* continue */ ); 120 126 fd_log_debug(" %d elements in queue / %d threads waiting\n", queue->count, queue->thrs); 127 fd_log_debug(" %d elements max / %d threads waiting to push\n", queue->max, queue->thrs_push); 121 128 fd_log_debug(" thresholds: %d / %d (h:%d), cb: %p,%p (%p), highest: %d\n", 122 129 queue->high, queue->low, queue->highest, … … 162 169 while (q->thrs) { 163 170 CHECK_POSIX( pthread_mutex_unlock( &q->mtx )); 164 CHECK_POSIX( pthread_cond_signal(&q->cond) ); 165 sched_yield(); 166 if (loops >= 10) 167 /* sleep for a few milliseconds */ 168 usleep(50000); 171 CHECK_POSIX( pthread_cond_signal(&q->cond_pull) ); 172 usleep(1000); 169 173 170 174 CHECK_POSIX( pthread_mutex_lock( &q->mtx ) ); … … 178 182 CHECK_POSIX( pthread_mutex_unlock( &q->mtx ) ); 179 183 180 CHECK_POSIX_DO( pthread_cond_destroy( &q->cond ), ); 184 CHECK_POSIX_DO( pthread_cond_destroy( &q->cond_pull ), ); 185 186 CHECK_POSIX_DO( pthread_cond_destroy( &q->cond_push ), ); 181 187 182 188 CHECK_POSIX_DO( pthread_mutex_destroy( &q->mtx ), ); … … 207 213 /* Lock the queues */ 208 214 CHECK_POSIX( pthread_mutex_lock( &old->mtx ) ); 215 216 CHECK_PARAMS_DO( (! old->thrs_push), { 217 pthread_mutex_unlock( &old->mtx ); 218 return EINVAL; 219 } ); 220 209 221 CHECK_POSIX( pthread_mutex_lock( &new->mtx ) ); 210 222 … … 213 225 while (old->thrs) { 214 226 CHECK_POSIX( pthread_mutex_unlock( &old->mtx )); 215 CHECK_POSIX( pthread_cond_signal(&old->cond) ); 216 sched_yield(); 217 if (loops >= 10) 218 /* sleep for a few milliseconds */ 219 usleep(50000); 227 CHECK_POSIX( pthread_cond_signal( &old->cond_pull ) ); 228 usleep(1000); 220 229 221 230 CHECK_POSIX( pthread_mutex_lock( &old->mtx ) ); 222 ASSERT( ++loops < 20 ); /* detect infinite loops */231 ASSERT( loops < 20 ); /* detect infinite loops */ 223 232 } 224 233 … … 226 235 fd_list_move_end( &new->list, &old->list ); 227 236 if (old->count && (!new->count)) { 228 CHECK_POSIX( pthread_cond_signal(&new->cond ) );237 CHECK_POSIX( pthread_cond_signal(&new->cond_pull) ); 229 238 } 230 239 new->count += old->count; … … 296 305 } 297 306 307 308 /* This handler is called when a thread is blocked on a queue, and cancelled */ 309 static void fifo_cleanup_push(void * queue) 310 { 311 struct fifo * q = (struct fifo *)queue; 312 TRACE_ENTRY( "%p", queue ); 313 314 /* The thread has been cancelled, therefore it does not wait on the queue anymore */ 315 q->thrs_push--; 316 317 /* Now unlock the queue, and we're done */ 318 CHECK_POSIX_DO( pthread_mutex_unlock( &q->mtx ), /* nothing */ ); 319 320 /* End of cleanup handler */ 321 return; 322 } 323 324 298 325 /* Post a new item in the queue */ 299 326 int fd_fifo_post_int ( struct fifo * queue, void ** item ) … … 307 334 CHECK_PARAMS( CHECK_FIFO( queue ) && item && *item ); 308 335 336 /* lock the queue */ 337 CHECK_POSIX( pthread_mutex_lock( &queue->mtx ) ); 338 339 if (queue->max) { 340 while (queue->count >= queue->max) { 341 int ret = 0; 342 343 /* We have to wait for an item to be pulled */ 344 queue->thrs_push++ ; 345 pthread_cleanup_push( fifo_cleanup_push, queue); 346 ret = pthread_cond_wait( &queue->cond_push, &queue->mtx ); 347 pthread_cleanup_pop(0); 348 queue->thrs_push-- ; 349 350 ASSERT( ret == 0 ); 351 } 352 } 353 309 354 /* Create a new list item */ 310 CHECK_MALLOC( new = malloc (sizeof (struct fd_list)) ); 355 CHECK_MALLOC_DO( new = malloc (sizeof (struct fd_list)) , { 356 pthread_mutex_unlock( &queue->mtx ); 357 } ); 311 358 312 359 fd_list_init(new, *item); 313 360 *item = NULL; 314 315 /* lock the queue */316 CHECK_POSIX( pthread_mutex_lock( &queue->mtx ) );317 361 318 362 /* Add the new item at the end */ … … 328 372 /* Signal if threads are asleep */ 329 373 if (queue->thrs > 0) { 330 CHECK_POSIX( pthread_cond_signal(&queue->cond) ); 374 CHECK_POSIX( pthread_cond_signal(&queue->cond_pull) ); 375 } 376 if (queue->thrs_push > 0) { 377 /* cascade */ 378 CHECK_POSIX( pthread_cond_signal(&queue->cond_push) ); 331 379 } 332 380 … … 355 403 free(li); 356 404 405 if (queue->thrs_push) { 406 CHECK_POSIX_DO( pthread_cond_signal( &queue->cond_push ), ); 407 } 408 357 409 return ret; 358 410 } … … 388 440 /* Check queue status */ 389 441 if (queue->count > 0) { 442 got_item: 390 443 /* There are elements in the queue, so pick the first one */ 391 444 *item = mq_pop(queue); 392 445 call_cb = test_l_cb(queue); 393 446 } else { 447 if (queue->thrs_push > 0) { 448 /* A thread is trying to push something, let's give it a chance */ 449 CHECK_POSIX( pthread_mutex_unlock( &queue->mtx ) ); 450 CHECK_POSIX( pthread_cond_signal( &queue->cond_push ) ); 451 usleep(1000); 452 CHECK_POSIX( pthread_mutex_lock( &queue->mtx ) ); 453 if (queue->count > 0) 454 goto got_item; 455 } 456 394 457 wouldblock = 1; 395 458 *item = NULL; … … 457 520 pthread_cleanup_push( fifo_cleanup, queue); 458 521 if (istimed) { 459 ret = pthread_cond_timedwait( &queue->cond , &queue->mtx, abstime );522 ret = pthread_cond_timedwait( &queue->cond_pull, &queue->mtx, abstime ); 460 523 } else { 461 ret = pthread_cond_wait( &queue->cond , &queue->mtx );524 ret = pthread_cond_wait( &queue->cond_pull, &queue->mtx ); 462 525 } 463 526 pthread_cleanup_pop(0); -
tests/testcnx.c
r740 r767 1499 1499 /* fd_cnx_recv_setaltfifo */ 1500 1500 CHECK( 0, fd_cnx_send(client_side, cer_buf, cer_sz, 0)); 1501 CHECK( 0, fd_fifo_new(&myfifo ) );1501 CHECK( 0, fd_fifo_new(&myfifo, 0) ); 1502 1502 CHECK( 0, fd_cnx_recv_setaltfifo(server_side, myfifo) ); 1503 1503 CHECK( 0, clock_gettime(CLOCK_REALTIME, &now) ); … … 1591 1591 /* fd_cnx_recv_setaltfifo */ 1592 1592 CHECK( 0, fd_cnx_send(client_side, cer_buf, cer_sz, 0)); 1593 CHECK( 0, fd_fifo_new(&myfifo ) );1593 CHECK( 0, fd_fifo_new(&myfifo, 0) ); 1594 1594 CHECK( 0, fd_cnx_recv_setaltfifo(server_side, myfifo) ); 1595 1595 CHECK( 0, clock_gettime(CLOCK_REALTIME, &now) ); -
tests/testfifo.c
r751 r767 111 111 } 112 112 113 /* The test function, to be threaded */ 114 static int iter = 0; 115 static void * test_fct2(void * data) 116 { 117 int i; 118 int * item; 119 struct test_data * td = (struct test_data *) data; 120 121 for (i=0; i< td->nbr; i++) { 122 item = malloc(sizeof(int)); 123 CHECK( 1, item ? 1 : 0 ); 124 *item = i; 125 CHECK( 0, fd_fifo_post(td->queue, &item) ); 126 iter++; 127 } 128 129 return NULL; 130 } 131 113 132 114 133 /* Main test routine */ … … 145 164 146 165 /* Create the queue */ 147 CHECK( 0, fd_fifo_new(&queue ) );166 CHECK( 0, fd_fifo_new(&queue, 0) ); 148 167 149 168 /* Check the count is 0 */ … … 233 252 234 253 /* Create the queue */ 235 CHECK( 0, fd_fifo_new(&queue ) );254 CHECK( 0, fd_fifo_new(&queue, 0) ); 236 255 237 256 /* Create the barrier */ … … 303 322 304 323 /* Create the queue */ 305 CHECK( 0, fd_fifo_new(&queue ) );324 CHECK( 0, fd_fifo_new(&queue, 0) ); 306 325 307 326 /* Create the barrier */ … … 370 389 371 390 /* Create the queue */ 372 CHECK( 0, fd_fifo_new(&queue ) );391 CHECK( 0, fd_fifo_new(&queue, 0) ); 373 392 374 393 /* Prepare the test data */ … … 442 461 } 443 462 463 /* Test max queue limit */ 464 { 465 struct fifo *queue = NULL; 466 struct test_data td; 467 pthread_t th; 468 int * item, i; 469 470 /* Create the queue */ 471 CHECK( 0, fd_fifo_new(&queue, 10) ); 472 473 /* Initialize the test data structures */ 474 td.queue = queue; 475 td.nbr = 15; 476 477 CHECK( 0, pthread_create( &th, NULL, test_fct2, &td ) ); 478 479 usleep(1000); /* 1 millisec */ 480 481 CHECK( 10, iter ); 482 483 CHECK( 0, fd_fifo_tryget(queue, &item) ); 484 CHECK( 0, *item); 485 free(item); 486 487 usleep(1000); /* 1 millisec */ 488 489 CHECK( 11, iter ); 490 491 for (i=1; i<4; i++) { 492 CHECK( 0, fd_fifo_get(queue, &item) ); 493 CHECK( i, *item); 494 free(item); 495 } 496 497 usleep(1000); /* 1 millisec */ 498 499 CHECK( 14, iter ); 500 501 /* fd_fifo_dump(0, "test", queue, NULL); */ 502 503 for (; i < td.nbr; i++) { 504 CHECK( 0, fd_fifo_tryget(queue, &item) ); 505 CHECK( i, *item); 506 free(item); 507 } 508 509 CHECK( 0, pthread_join( th, NULL ) ); 510 CHECK( 15, iter ); 511 512 } 513 444 514 /* Delete the messages */ 445 515 CHECK( 0, fd_msg_free( msg1 ) );
Note: See TracChangeset
for help on using the changeset viewer.