Changeset 1377:ce257e43085d in freeDiameter
- Timestamp:
- Jun 20, 2019, 7:01:50 PM (5 years ago)
- Branch:
- default
- Phase:
- public
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
libfdproto/fifo.c
r1188 r1377 51 51 struct fifo { 52 52 int eyec; /* An eye catcher, also used to check a queue is valid. FIFO_EYEC */ 53 53 54 54 pthread_mutex_t mtx; /* Mutex protecting this queue */ 55 55 pthread_cond_t cond_pull; /* condition variable for pulling threads */ 56 56 pthread_cond_t cond_push; /* condition variable for pushing threads */ 57 57 58 58 struct fd_list list; /* sentinel for the list of elements */ 59 59 int count; /* number of objects in the list */ 60 60 int thrs; /* number of threads waiting for a new element (when count is 0) */ 61 61 62 62 int max; /* maximum number of items to accept if not 0 */ 63 63 int thrs_push; /* number of threads waitnig to push an item */ 64 64 65 65 uint16_t high; /* High level threshold (see libfreeDiameter.h for details) */ 66 66 uint16_t low; /* Low level threshhold */ … … 70 70 int highest;/* The highest count value for which h_cb has been called */ 71 71 int highest_ever; /* The max count value this queue has reached (for tweaking) */ 72 72 73 73 long long total_items; /* Cumulated number of items that went through this fifo (excluding current count), always increasing. */ 74 74 struct timespec total_time; /* Cumulated time all items spent in this queue, including blocking time (always growing, use deltas for monitoring) */ 75 75 struct timespec blocking_time; /* Cumulated time threads trying to post new items were blocked (queue full). */ 76 76 struct timespec last_time; /* For the last element retrieved from the queue, how long it take between posting (including blocking) and poping */ 77 77 78 78 }; 79 79 … … 94 94 { 95 95 struct fifo * new; 96 96 97 97 TRACE_ENTRY( "%p", queue ); 98 98 99 99 CHECK_PARAMS( queue ); 100 100 101 101 /* Create a new object */ 102 102 CHECK_MALLOC( new = malloc (sizeof (struct fifo) ) ); 103 103 104 104 /* Initialize the content */ 105 105 memset(new, 0, sizeof(struct fifo)); 106 106 107 107 new->eyec = FIFO_EYEC; 108 108 CHECK_POSIX( pthread_mutex_init(&new->mtx, NULL) ); … … 110 110 CHECK_POSIX( pthread_cond_init(&new->cond_push, NULL) ); 111 111 new->max = max; 112 112 113 113 fd_list_init(&new->list, NULL); 114 114 115 115 /* We're done */ 116 116 *queue = new; … … 122 122 { 123 123 FD_DUMP_HANDLE_OFFSET(); 124 124 125 125 if (name) { 126 CHECK_MALLOC_DO( fd_dump_extend( FD_DUMP_STD_PARAMS, "'%s'(@%p): ", name, queue), return NULL); 126 CHECK_MALLOC_DO( fd_dump_extend( FD_DUMP_STD_PARAMS, "'%s'(@%p): ", name, queue), return NULL); 127 127 } else { 128 128 CHECK_MALLOC_DO( fd_dump_extend( FD_DUMP_STD_PARAMS, "{fifo}(@%p): ", queue), return NULL); 129 129 } 130 130 131 131 if (!CHECK_FIFO( queue )) { 132 132 return fd_dump_extend(FD_DUMP_STD_PARAMS, "INVALID/NULL"); 133 133 } 134 134 135 135 CHECK_POSIX_DO( pthread_mutex_lock( &queue->mtx ), /* continue */ ); 136 CHECK_MALLOC_DO( fd_dump_extend( FD_DUMP_STD_PARAMS, "items:%d,%d,%d threads:%d,%d stats:%lld/%ld.%06ld,%ld.%06ld,%ld.%06ld thresholds:%d,%d,%d,%p,%p,%p", 136 CHECK_MALLOC_DO( fd_dump_extend( FD_DUMP_STD_PARAMS, "items:%d,%d,%d threads:%d,%d stats:%lld/%ld.%06ld,%ld.%06ld,%ld.%06ld thresholds:%d,%d,%d,%p,%p,%p", 137 137 queue->count, queue->highest_ever, queue->max, 138 138 queue->thrs, queue->thrs_push, 139 139 queue->total_items,(long)queue->total_time.tv_sec,(long)(queue->total_time.tv_nsec/1000),(long)queue->blocking_time.tv_sec,(long)(queue->blocking_time.tv_nsec/1000),(long)queue->last_time.tv_sec,(long)(queue->last_time.tv_nsec/1000), 140 queue->high, queue->low, queue->highest, queue->h_cb, queue->l_cb, queue->data), 140 queue->high, queue->low, queue->highest, queue->h_cb, queue->l_cb, queue->data), 141 141 goto error); 142 142 143 143 if (dump_item) { 144 144 struct fd_list * li; … … 146 146 for (li = queue->list.next; li != &queue->list; li = li->next) { 147 147 struct fifo_item * fi = (struct fifo_item *)li; 148 CHECK_MALLOC_DO( fd_dump_extend( FD_DUMP_STD_PARAMS, "\n [#%i](@%p)@%ld.%06ld: ", 149 i++, fi->item.o, (long)fi->posted_on.tv_sec,(long)(fi->posted_on.tv_nsec/1000)), 148 CHECK_MALLOC_DO( fd_dump_extend( FD_DUMP_STD_PARAMS, "\n [#%i](@%p)@%ld.%06ld: ", 149 i++, fi->item.o, (long)fi->posted_on.tv_sec,(long)(fi->posted_on.tv_nsec/1000)), 150 150 goto error); 151 151 CHECK_MALLOC_DO( (*dump_item)(FD_DUMP_STD_PARAMS, fi->item.o), goto error); … … 153 153 } 154 154 CHECK_POSIX_DO( pthread_mutex_unlock( &queue->mtx ), /* continue */ ); 155 155 156 156 return *buf; 157 157 error: … … 160 160 } 161 161 162 /* Delete a queue. It must be empty. */ 162 /* Delete a queue. It must be empty. */ 163 163 int fd_fifo_del ( struct fifo ** queue ) 164 164 { 165 165 struct fifo * q; 166 166 int loops = 0; 167 167 168 168 TRACE_ENTRY( "%p", queue ); 169 169 170 if (queue && *queue == NULL) { 171 /* Queue already (in the process of being) deleted */ 172 return 0; 173 } 174 170 175 CHECK_PARAMS( queue && CHECK_FIFO( *queue ) ); 171 176 172 177 q = *queue; 173 178 174 179 CHECK_POSIX( pthread_mutex_lock( &q->mtx ) ); 175 180 176 181 if ((q->count != 0) || (q->data != NULL)) { 177 182 TRACE_DEBUG(INFO, "The queue cannot be destroyed (%d, %p)", q->count, q->data); … … 179 184 return EINVAL; 180 185 } 181 186 182 187 /* Ok, now invalidate the queue */ 183 188 q->eyec = 0xdead; 184 189 185 190 /* Have all waiting threads return an error */ 186 191 while (q->thrs) { … … 188 193 CHECK_POSIX( pthread_cond_signal(&q->cond_pull) ); 189 194 usleep(1000); 190 195 191 196 CHECK_POSIX( pthread_mutex_lock( &q->mtx ) ); 192 197 ASSERT( ++loops < 20 ); /* detect infinite loops */ 193 198 } 194 199 195 200 /* sanity check */ 196 201 ASSERT(FD_IS_LIST_EMPTY(&q->list)); 197 202 198 203 /* And destroy it */ 199 204 CHECK_POSIX( pthread_mutex_unlock( &q->mtx ) ); 200 205 201 206 CHECK_POSIX_DO( pthread_cond_destroy( &q->cond_pull ), ); 202 207 203 208 CHECK_POSIX_DO( pthread_cond_destroy( &q->cond_push ), ); 204 209 205 210 CHECK_POSIX_DO( pthread_mutex_destroy( &q->mtx ), ); 206 211 207 212 free(q); 208 213 *queue = NULL; 209 214 210 215 return 0; 211 216 } … … 215 220 { 216 221 int loops = 0; 217 222 218 223 TRACE_ENTRY("%p %p %p", old, new, loc_update); 219 224 CHECK_PARAMS( CHECK_FIFO( old ) && CHECK_FIFO( new )); 220 225 221 226 CHECK_PARAMS( ! old->data ); 222 227 if (new->high) { 223 228 TODO("Implement support for thresholds in fd_fifo_move..."); 224 229 } 225 230 226 231 /* Update loc_update */ 227 232 if (loc_update) 228 233 *loc_update = new; 229 234 230 235 /* Lock the queues */ 231 236 CHECK_POSIX( pthread_mutex_lock( &old->mtx ) ); 232 237 233 238 CHECK_PARAMS_DO( (! old->thrs_push), { 234 239 pthread_mutex_unlock( &old->mtx ); 235 240 return EINVAL; 236 241 } ); 237 242 238 243 CHECK_POSIX( pthread_mutex_lock( &new->mtx ) ); 239 244 240 245 /* Any waiting thread on the old queue returns an error */ 241 246 old->eyec = 0xdead; … … 244 249 CHECK_POSIX( pthread_cond_signal( &old->cond_pull ) ); 245 250 usleep(1000); 246 251 247 252 CHECK_POSIX( pthread_mutex_lock( &old->mtx ) ); 248 253 ASSERT( loops < 20 ); /* detect infinite loops */ 249 254 } 250 255 251 256 /* Move all data from old to new */ 252 257 fd_list_move_end( &new->list, &old->list ); … … 255 260 } 256 261 new->count += old->count; 257 262 258 263 /* Reset old */ 259 264 old->count = 0; 260 265 old->eyec = FIFO_EYEC; 261 266 262 267 /* Merge the stats in the new queue */ 263 268 new->total_items += old->total_items; 264 269 old->total_items = 0; 265 270 266 271 new->total_time.tv_nsec += old->total_time.tv_nsec; 267 272 new->total_time.tv_sec += old->total_time.tv_sec + (new->total_time.tv_nsec / 1000000000); … … 269 274 old->total_time.tv_nsec = 0; 270 275 old->total_time.tv_sec = 0; 271 276 272 277 new->blocking_time.tv_nsec += old->blocking_time.tv_nsec; 273 278 new->blocking_time.tv_sec += old->blocking_time.tv_sec + (new->blocking_time.tv_nsec / 1000000000); … … 275 280 old->blocking_time.tv_nsec = 0; 276 281 old->blocking_time.tv_sec = 0; 277 282 278 283 /* Unlock, we're done */ 279 284 CHECK_POSIX( pthread_mutex_unlock( &new->mtx ) ); 280 285 CHECK_POSIX( pthread_mutex_unlock( &old->mtx ) ); 281 286 282 287 return 0; 283 288 } 284 289 285 290 /* Get the information on the queue */ 286 int fd_fifo_getstats( struct fifo * queue, int * current_count, int * limit_count, int * highest_count, long long * total_count, 291 int fd_fifo_getstats( struct fifo * queue, int * current_count, int * limit_count, int * highest_count, long long * total_count, 287 292 struct timespec * total, struct timespec * blocking, struct timespec * last) 288 293 { 289 294 TRACE_ENTRY( "%p %p %p %p %p %p %p %p", queue, current_count, limit_count, highest_count, total_count, total, blocking, last); 290 295 291 296 /* Check the parameters */ 292 297 CHECK_PARAMS( CHECK_FIFO( queue ) ); 293 298 294 299 /* lock the queue */ 295 300 CHECK_POSIX( pthread_mutex_lock( &queue->mtx ) ); 296 301 297 302 if (current_count) 298 303 *current_count = queue->count; 299 304 300 305 if (limit_count) 301 306 *limit_count = queue->max; 302 307 303 308 if (highest_count) 304 309 *highest_count = queue->highest_ever; 305 310 306 311 if (total_count) 307 312 *total_count = queue->total_items; 308 313 309 314 if (total) 310 315 memcpy(total, &queue->total_time, sizeof(struct timespec)); 311 316 312 317 if (blocking) 313 318 memcpy(blocking, &queue->blocking_time, sizeof(struct timespec)); 314 319 315 320 if (last) 316 321 memcpy(last, &queue->last_time, sizeof(struct timespec)); 317 322 318 323 /* Unlock */ 319 324 CHECK_POSIX( pthread_mutex_unlock( &queue->mtx ) ); 320 325 321 326 /* Done */ 322 327 return 0; … … 329 334 if ( !CHECK_FIFO( queue ) ) 330 335 return 0; 331 336 332 337 return queue->count; /* Let's hope it's read atomically, since we are not locking... */ 333 338 } … … 337 342 { 338 343 TRACE_ENTRY( "%p %p %hu %p %hu %p", queue, data, high, h_cb, low, l_cb ); 339 344 340 345 /* Check the parameters */ 341 346 CHECK_PARAMS( CHECK_FIFO( queue ) && (high > low) && (queue->data == NULL) ); 342 347 343 348 /* lock the queue */ 344 349 CHECK_POSIX( pthread_mutex_lock( &queue->mtx ) ); 345 350 346 351 /* Save the values */ 347 352 queue->high = high; … … 350 355 queue->h_cb = h_cb; 351 356 queue->l_cb = l_cb; 352 357 353 358 /* Unlock */ 354 359 CHECK_POSIX( pthread_mutex_unlock( &queue->mtx ) ); 355 360 356 361 /* Done */ 357 362 return 0; … … 364 369 struct fifo * q = (struct fifo *)queue; 365 370 TRACE_ENTRY( "%p", queue ); 366 371 367 372 /* The thread has been cancelled, therefore it does not wait on the queue anymore */ 368 373 q->thrs_push--; 369 374 370 375 /* Now unlock the queue, and we're done */ 371 376 CHECK_POSIX_DO( pthread_mutex_unlock( &q->mtx ), /* nothing */ ); 372 377 373 378 /* End of cleanup handler */ 374 379 return; … … 382 387 int call_cb = 0; 383 388 struct timespec posted_on, queued_on; 384 389 385 390 /* Get the timing of this call */ 386 391 CHECK_SYS( clock_gettime(CLOCK_REALTIME, &posted_on) ); 387 392 388 393 /* lock the queue */ 389 394 CHECK_POSIX( pthread_mutex_lock( &queue->mtx ) ); 390 395 391 396 if ((!skip_max) && (queue->max)) { 392 397 while (queue->count >= queue->max) { 393 398 int ret = 0; 394 399 395 400 /* We have to wait for an item to be pulled */ 396 401 queue->thrs_push++ ; … … 399 404 pthread_cleanup_pop(0); 400 405 queue->thrs_push-- ; 401 406 402 407 ASSERT( ret == 0 ); 403 408 } 404 409 } 405 410 406 411 /* Create a new list item */ 407 412 CHECK_MALLOC_DO( new = malloc (sizeof (struct fifo_item)) , { … … 409 414 return ENOMEM; 410 415 } ); 411 416 412 417 fd_list_init(&new->item, *item); 413 418 *item = NULL; 414 419 415 420 /* Add the new item at the end */ 416 421 fd_list_insert_before( &queue->list, &new->item); … … 422 427 queue->highest = queue->count; 423 428 } 424 429 425 430 /* store timing */ 426 431 memcpy(&new->posted_on, &posted_on, sizeof(struct timespec)); 427 432 428 433 /* update queue timing info "blocking time" */ 429 434 { … … 436 441 queue->blocking_time.tv_nsec = blocked_ns % 1000000000; 437 442 } 438 443 439 444 /* Signal if threads are asleep */ 440 445 if (queue->thrs > 0) { … … 445 450 CHECK_POSIX( pthread_cond_signal(&queue->cond_push) ); 446 451 } 447 452 448 453 /* Unlock */ 449 454 CHECK_POSIX( pthread_mutex_unlock( &queue->mtx ) ); 450 455 451 456 /* Call high-watermark cb as needed */ 452 457 if (call_cb && queue->h_cb) 453 458 (*queue->h_cb)(queue, &queue->data); 454 459 455 460 /* Done */ 456 461 return 0; … … 461 466 { 462 467 TRACE_ENTRY( "%p %p", queue, item ); 463 468 464 469 /* Check the parameters */ 465 470 CHECK_PARAMS( CHECK_FIFO( queue ) && item && *item ); 466 471 467 472 return fd_fifo_post_internal ( queue,item, 0 ); 468 473 469 474 } 470 475 … … 473 478 { 474 479 TRACE_ENTRY( "%p %p", queue, item ); 475 480 476 481 /* Check the parameters */ 477 482 CHECK_PARAMS( CHECK_FIFO( queue ) && item && *item ); 478 483 479 484 return fd_fifo_post_internal ( queue,item, 1 ); 480 485 481 486 } 482 487 … … 487 492 struct fifo_item * fi; 488 493 struct timespec now; 489 494 490 495 ASSERT( ! FD_IS_LIST_EMPTY(&queue->list) ); 491 496 492 497 fi = (struct fifo_item *)(queue->list.next); 493 498 ret = fi->item.o; … … 495 500 queue->count--; 496 501 queue->total_items++; 497 502 498 503 /* Update the timings */ 499 504 CHECK_SYS_DO( clock_gettime(CLOCK_REALTIME, &now), goto skip_timing ); … … 501 506 long long elapsed = (now.tv_sec - fi->posted_on.tv_sec) * 1000000000; 502 507 elapsed += now.tv_nsec - fi->posted_on.tv_nsec; 503 508 504 509 queue->last_time.tv_sec = elapsed / 1000000000; 505 510 queue->last_time.tv_nsec = elapsed % 1000000000; 506 511 507 512 elapsed += queue->total_time.tv_nsec; 508 513 queue->total_time.tv_sec += elapsed / 1000000000; 509 514 queue->total_time.tv_nsec = elapsed % 1000000000; 510 515 } 511 skip_timing: 516 skip_timing: 512 517 free(fi); 513 518 514 519 if (queue->thrs_push) { 515 520 CHECK_POSIX_DO( pthread_cond_signal( &queue->cond_push ), ); 516 521 } 517 522 518 523 return ret; 519 524 } … … 524 529 if ((queue->high == 0) || (queue->low == 0) || (queue->l_cb == 0)) 525 530 return 0; 526 531 527 532 if (((queue->count % queue->high) == queue->low) && (queue->highest > queue->count)) { 528 533 queue->highest -= queue->high; 529 534 return 1; 530 535 } 531 536 532 537 return 0; 533 538 } … … 538 543 int wouldblock = 0; 539 544 int call_cb = 0; 540 545 541 546 TRACE_ENTRY( "%p %p", queue, item ); 542 547 543 548 /* Check the parameters */ 544 549 CHECK_PARAMS( CHECK_FIFO( queue ) && item ); 545 550 546 551 /* lock the queue */ 547 552 CHECK_POSIX( pthread_mutex_lock( &queue->mtx ) ); 548 553 549 554 /* Check queue status */ 550 555 if (queue->count > 0) { … … 563 568 goto got_item; 564 569 } 565 570 566 571 wouldblock = 1; 567 572 *item = NULL; 568 573 } 569 574 570 575 /* Unlock */ 571 576 CHECK_POSIX( pthread_mutex_unlock( &queue->mtx ) ); 572 577 573 578 /* Call low watermark callback as needed */ 574 579 if (call_cb) 575 580 (*queue->l_cb)(queue, &queue->data); 576 581 577 582 /* Done */ 578 583 return wouldblock ? EWOULDBLOCK : 0; … … 584 589 struct fifo * q = (struct fifo *)queue; 585 590 TRACE_ENTRY( "%p", queue ); 586 591 587 592 /* The thread has been cancelled, therefore it does not wait on the queue anymore */ 588 593 q->thrs--; 589 594 590 595 /* Now unlock the queue, and we're done */ 591 596 CHECK_POSIX_DO( pthread_mutex_unlock( &q->mtx ), /* nothing */ ); 592 597 593 598 /* End of cleanup handler */ 594 599 return; … … 600 605 int call_cb = 0; 601 606 int ret = 0; 602 607 603 608 /* Check the parameters */ 604 609 CHECK_PARAMS( CHECK_FIFO( queue ) && item && (abstime || !istimed) ); 605 610 606 611 /* Initialize the return value */ 607 612 *item = NULL; 608 613 609 614 /* lock the queue */ 610 615 CHECK_POSIX( pthread_mutex_lock( &queue->mtx ) ); 611 616 612 617 awaken: 613 618 /* Check queue status */ … … 618 623 return EPIPE; 619 624 } 620 625 621 626 if (queue->count > 0) { 622 627 /* There are items in the queue, so pick the first one */ … … 636 641 if (ret == 0) 637 642 goto awaken; /* test for spurious wake-ups */ 638 643 639 644 /* otherwise (ETIMEDOUT / other error) just continue */ 640 645 } 641 646 642 647 /* Unlock */ 643 648 CHECK_POSIX( pthread_mutex_unlock( &queue->mtx ) ); 644 649 645 650 /* Call low watermark callback as needed */ 646 651 if (call_cb) 647 652 (*queue->l_cb)(queue, &queue->data); 648 653 649 654 /* Done */ 650 655 return ret; … … 670 675 int ret = 0; 671 676 TRACE_ENTRY( "%p %p", queue, abstime ); 672 677 673 678 CHECK_PARAMS_DO( CHECK_FIFO( queue ), return -EINVAL ); 674 679 675 680 /* lock the queue */ 676 681 CHECK_POSIX_DO( pthread_mutex_lock( &queue->mtx ), return -__ret__ ); 677 678 awaken: 682 683 awaken: 679 684 ret = (queue->count > 0 ) ? queue->count : 0; 680 685 if ((ret == 0) && (abstime != NULL)) { … … 687 692 if (ret == 0) 688 693 goto awaken; /* test for spurious wake-ups */ 689 694 690 695 if (ret == ETIMEDOUT) 691 696 ret = 0; 692 else 697 else 693 698 ret = -ret; 694 699 } 695 700 696 701 /* Unlock */ 697 702 CHECK_POSIX_DO( pthread_mutex_unlock( &queue->mtx ), return -__ret__ ); 698 703 699 704 return ret; 700 705 }
Note: See TracChangeset
for help on using the changeset viewer.