Changeset 1344:a5c072798f1a in freeDiameter for libfdcore
- Timestamp:
- Apr 12, 2019, 8:15:04 PM (5 years ago)
- Branch:
- default
- Phase:
- public
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
libfdcore/sctp3436.c
r1190 r1344 51 51 - the push function sends the data on a certain stream. 52 52 We also have a demux thread that reads the socket and store received data in the appropriate fifo 53 53 54 54 We have one gnutls_session per stream pair, and as many threads that read the gnutls records and save incoming data to the target queue. 55 55 56 56 This complexity is required because we cannot read a socket for a given stream only; we can only get the next message and find its stream. 57 57 */ … … 71 71 int event; 72 72 uint16_t strid; 73 73 74 74 TRACE_ENTRY("%p", arg); 75 75 CHECK_PARAMS_DO(conn && (conn->cc_socket > 0), goto out); 76 76 77 77 /* Set the thread name */ 78 78 { 79 char buf[ 48];79 char buf[100]; 80 80 snprintf(buf, sizeof(buf), "Demuxer (%d:%s)", conn->cc_socket, conn->cc_remid); 81 81 fd_log_threadname ( buf ); 82 82 } 83 83 84 84 ASSERT( conn->cc_proto == IPPROTO_SCTP ); 85 85 ASSERT( fd_cnx_target_queue(conn) ); 86 86 ASSERT( conn->cc_sctp3436_data.array ); 87 87 88 88 do { 89 89 CHECK_FCT_DO( fd_sctp_recvmeta(conn, &strid, &buf, &bufsz, &event), goto fatal ); … … 98 98 } 99 99 break; 100 100 101 101 case FDEVP_CNX_EP_CHANGE: 102 102 /* Send this event to the target queue */ 103 103 CHECK_FCT_DO( fd_event_send( fd_cnx_target_queue(conn), event, bufsz, buf), goto fatal ); 104 104 break; 105 105 106 106 case FDEVP_CNX_ERROR: 107 107 goto out; 108 108 109 109 case FDEVP_CNX_SHUTDOWN: 110 110 /* Just ignore the notification for now, we will get another error later anyway */ 111 111 continue; 112 112 113 113 default: 114 114 goto fatal; 115 115 } 116 116 117 117 } while (conn->cc_loop); 118 118 119 119 out: 120 120 /* Signal termination of the connection to all decipher threads */ … … 125 125 } 126 126 fd_cnx_markerror(conn); 127 TRACE_DEBUG(FULL, "Thread terminated"); 127 TRACE_DEBUG(FULL, "Thread terminated"); 128 128 return NULL; 129 129 130 130 fatal: 131 131 /* An unrecoverable error occurred, stop the daemon */ … … 139 139 struct sctp3436_ctx * ctx = arg; 140 140 struct cnxctx *cnx; 141 141 142 142 TRACE_ENTRY("%p", arg); 143 143 CHECK_PARAMS_DO(ctx && ctx->raw_recv && ctx->parent, goto error); 144 144 cnx = ctx->parent; 145 145 ASSERT( fd_cnx_target_queue(cnx) ); 146 146 147 147 /* Set the thread name */ 148 148 { 149 char buf[ 48];149 char buf[100]; 150 150 snprintf(buf, sizeof(buf), "Decipher (%hu@%d:%s)", ctx->strid, cnx->cc_socket, cnx->cc_remid); 151 151 fd_log_threadname ( buf ); 152 152 } 153 153 154 154 /* The next function loops while there is no error */ 155 155 CHECK_FCT_DO(fd_tls_rcvthr_core(cnx, ctx->strid ? ctx->session : cnx->cc_tls_para.session), /* continue */); 156 156 error: 157 157 fd_cnx_markerror(cnx); 158 TRACE_DEBUG(FULL, "Thread terminated"); 158 TRACE_DEBUG(FULL, "Thread terminated"); 159 159 return NULL; 160 160 } … … 171 171 struct timespec tsstore, *ts = NULL; 172 172 int ret; 173 173 174 174 TRACE_ENTRY("%p %d", tr, ms); 175 175 176 176 if (ctx->partial.buf) 177 177 return 1; /* data is already available for pull */ 178 178 179 179 if (ms) { 180 180 CHECK_SYS_DO( clock_gettime(CLOCK_REALTIME, &tsstore), return -1 ); … … 184 184 ts = &tsstore; 185 185 } 186 186 187 187 ret = fd_fifo_select ( ctx->raw_recv, ts ); 188 188 if (ret < 0) { … … 190 190 ret = -1; 191 191 } 192 192 193 193 return ret; 194 194 } … … 201 201 struct sctp3436_ctx * ctx = (struct sctp3436_ctx *) tr; 202 202 struct iovec iov; 203 203 204 204 TRACE_ENTRY("%p %p %zd", tr, data, len); 205 205 CHECK_PARAMS_DO( tr && data, { errno = EINVAL; return -1; } ); 206 206 207 207 iov.iov_base = (void *)data; 208 208 iov.iov_len = len; 209 209 210 210 return fd_sctp_sendstrv(ctx->parent, ctx->strid, &iov, 1); 211 211 } … … 214 214 { 215 215 struct sctp3436_ctx * ctx = (struct sctp3436_ctx *) tr; 216 216 217 217 TRACE_ENTRY("%p %p %d", tr, iov, iovcnt); 218 218 CHECK_PARAMS_DO( tr && iov, { errno = EINVAL; return -1; } ); 219 219 220 220 return fd_sctp_sendstrv(ctx->parent, ctx->strid, (const struct iovec *)iov, iovcnt); 221 221 } … … 228 228 size_t pulled = 0; 229 229 int emptied; 230 230 231 231 TRACE_ENTRY("%p %p %zd", tr, buf, len); 232 232 CHECK_PARAMS_DO( tr && buf, { errno = EINVAL; goto error; } ); 233 233 234 234 /* If we don't have data available now, pull new message from the fifo -- this is blocking (until the queue is destroyed) */ 235 235 if (!ctx->partial.buf) { … … 241 241 } 242 242 } 243 243 244 244 pulled = ctx->partial.bufsz - ctx->partial.offset; 245 245 if (pulled <= len) { … … 264 264 /* We are done */ 265 265 return pulled; 266 266 267 267 error: 268 268 gnutls_transport_set_errno (ctx->session, errno); … … 278 278 /* Set the transport pointer passed to push & pull callbacks */ 279 279 GNUTLS_TRACE( gnutls_transport_set_ptr( session, (gnutls_transport_ptr_t) ctx ) ); 280 280 281 281 /* Reset the low water value, since we don't use sockets */ 282 282 #ifndef GNUTLS_VERSION_300 … … 287 287 GNUTLS_TRACE( gnutls_transport_set_pull_timeout_function( session, sctp3436_pull_timeout ) ); 288 288 #endif /* GNUTLS_VERSION_300 */ 289 289 290 290 /* Set the push and pull callbacks */ 291 291 GNUTLS_TRACE( gnutls_transport_set_pull_function(session, sctp3436_pull) ); … … 325 325 TRACE_ENTRY("%p", conn); 326 326 CHECK_PARAMS( conn && !conn->cc_sctp3436_data.sess_store ); 327 327 328 328 CHECK_MALLOC( conn->cc_sctp3436_data.sess_store = malloc(sizeof(struct sr_store)) ); 329 329 memset(conn->cc_sctp3436_data.sess_store, 0, sizeof(struct sr_store)); 330 330 331 331 fd_list_init(&conn->cc_sctp3436_data.sess_store->list, NULL); 332 332 CHECK_POSIX( pthread_rwlock_init(&conn->cc_sctp3436_data.sess_store->lock, NULL) ); 333 333 conn->cc_sctp3436_data.sess_store->parent = conn; 334 334 335 335 return 0; 336 336 } … … 342 342 TRACE_ENTRY("%p", conn); 343 343 CHECK_PARAMS_DO( conn, return ); 344 344 345 345 if (!conn->cc_sctp3436_data.sess_store) 346 346 return; 347 347 348 348 CHECK_POSIX_DO( pthread_rwlock_destroy(&conn->cc_sctp3436_data.sess_store->lock), /* continue */ ); 349 349 350 350 while (!FD_IS_LIST_EMPTY(&conn->cc_sctp3436_data.sess_store->list)) { 351 351 struct sr_data * sr = (struct sr_data *) conn->cc_sctp3436_data.sess_store->list.next; … … 355 355 free(sr); 356 356 } 357 357 358 358 free(conn->cc_sctp3436_data.sess_store); 359 359 conn->cc_sctp3436_data.sess_store = NULL; … … 366 366 struct fd_list * ret; 367 367 *match = 0; 368 368 369 369 for (ret = sto->list.next; ret != &sto->list; ret = ret->next) { 370 370 int cmp = 0; 371 371 struct sr_data * sr = (struct sr_data *)ret; 372 372 373 373 cmp = fd_os_cmp(key.data, key.size, sr->key.data, sr->key.size); 374 374 if (cmp > 0) 375 375 continue; 376 376 377 377 if (cmp == 0) 378 378 *match = 1; 379 379 380 380 break; 381 381 } 382 382 383 383 return ret; 384 384 } … … 392 392 int match = 0; 393 393 int ret = 0; 394 394 395 395 TRACE_DEBUG( GNUTLS_DBG_LEVEL, "GNUTLS Callback: %s", __PRETTY_FUNCTION__ ); 396 396 CHECK_PARAMS_DO( sto && key.data && data.data, return -1 ); 397 397 398 398 CHECK_POSIX_DO( pthread_rwlock_wrlock(&sto->lock), return -1 ); 399 399 TRACE_BUFFER(FD_LOG_DEBUG, GNUTLS_DBG_LEVEL, "Session store [key ", key.data, key.size, "]"); 400 400 401 401 li = find_or_next(sto, key, &match); 402 402 if (match) { 403 403 sr = (struct sr_data *)li; 404 404 405 405 /* Check the data is the same */ 406 406 if ((data.size != sr->data.size) || memcmp(data.data, sr->data.data, data.size)) { … … 409 409 TRACE_BUFFER(FD_LOG_DEBUG, INFO, " -- old data [", sr->data.data, sr->data.size, "]"); 410 410 TRACE_BUFFER(FD_LOG_DEBUG, INFO, " -- new data [", data.data, data.size, "]"); 411 411 412 412 ret = -1; 413 413 } else { … … 416 416 goto out; 417 417 } 418 418 419 419 /* Create a new entry */ 420 420 CHECK_MALLOC_DO( sr = malloc(sizeof(struct sr_data)), { ret = -1; goto out; } ); … … 430 430 sr->data.size = data.size; 431 431 memcpy(sr->data.data, data.data, data.size); 432 432 433 433 /* Save this new entry in the list, we are done */ 434 434 fd_list_insert_before(li, &sr->chain); 435 435 436 out: 436 out: 437 437 CHECK_POSIX_DO( pthread_rwlock_unlock(&sto->lock), return -1 ); 438 438 return ret; … … 446 446 int match = 0; 447 447 int ret = 0; 448 448 449 449 TRACE_DEBUG( GNUTLS_DBG_LEVEL, "GNUTLS Callback: %s", __PRETTY_FUNCTION__ ); 450 450 CHECK_PARAMS_DO( sto && key.data, return -1 ); 451 451 452 452 CHECK_POSIX_DO( pthread_rwlock_wrlock(&sto->lock), return -1 ); 453 453 TRACE_BUFFER(FD_LOG_DEBUG, GNUTLS_DBG_LEVEL, "Session delete [key ", key.data, key.size, "]"); 454 454 455 455 li = find_or_next(sto, key, &match); 456 456 if (match) { 457 457 sr = (struct sr_data *)li; 458 458 459 459 /* Destroy this data */ 460 460 fd_list_unlink(li); … … 466 466 ret = -1; 467 467 } 468 468 469 469 CHECK_POSIX_DO( pthread_rwlock_unlock(&sto->lock), return -1 ); 470 470 return ret; … … 485 485 CHECK_POSIX_DO( pthread_rwlock_rdlock(&sto->lock), return error ); 486 486 TRACE_BUFFER(FD_LOG_DEBUG, GNUTLS_DBG_LEVEL, "Session fetch [key ", key.data, key.size, "]"); 487 487 488 488 li = find_or_next(sto, key, &match); 489 489 if (match) { … … 493 493 memcpy(res.data, sr->data.data, res.size); 494 494 } 495 out: 495 out: 496 496 TRACE_DEBUG(GNUTLS_DBG_LEVEL, "Fetched (%p, %d) from store %p", res.data, res.size, sto); 497 497 CHECK_POSIX_DO( pthread_rwlock_unlock(&sto->lock), return error); … … 503 503 { 504 504 TRACE_ENTRY("%p", conn); 505 505 506 506 GNUTLS_TRACE( gnutls_db_set_retrieve_function(session, sr_fetch)); 507 507 GNUTLS_TRACE( gnutls_db_set_remove_function (session, sr_remove)); 508 508 GNUTLS_TRACE( gnutls_db_set_store_function (session, sr_store)); 509 509 GNUTLS_TRACE( gnutls_db_set_ptr (session, conn->cc_sctp3436_data.sess_store)); 510 510 511 511 return; 512 512 } … … 517 517 struct sctp3436_ctx * ctx = (struct sctp3436_ctx *) arg; 518 518 int resumed; 519 519 520 520 TRACE_ENTRY("%p", arg); 521 521 522 522 /* Set the thread name */ 523 523 { … … 526 526 fd_log_threadname ( buf ); 527 527 } 528 528 529 529 TRACE_DEBUG(FULL, "Starting TLS resumed handshake on stream %hu", ctx->strid); 530 530 531 531 CHECK_GNUTLS_DO( gnutls_handshake( ctx->session ), return NULL); 532 532 533 533 GNUTLS_TRACE( resumed = gnutls_session_is_resumed(ctx->session) ); 534 534 #ifndef GNUTLS_VERSION_300 … … 545 545 } 546 546 } 547 547 548 548 /* Finished, OK */ 549 549 return arg; … … 559 559 { 560 560 uint16_t i; 561 561 562 562 TRACE_ENTRY("%p", conn); 563 563 CHECK_PARAMS( conn && (conn->cc_sctp_para.pairs > 1) && (!conn->cc_sctp3436_data.array) ); 564 564 565 565 /* First, alloc the array and initialize the non-TLS data */ 566 566 CHECK_MALLOC( conn->cc_sctp3436_data.array = calloc(conn->cc_sctp_para.pairs, sizeof(struct sctp3436_ctx)) ); … … 570 570 CHECK_FCT( fd_fifo_new(&conn->cc_sctp3436_data.array[i].raw_recv, 10) ); 571 571 } 572 572 573 573 /* Set push/pull functions in the master session, using fifo in array[0] */ 574 574 set_sess_transport(conn->cc_tls_para.session, &conn->cc_sctp3436_data.array[0]); 575 575 576 576 /* For server side, we also initialize the resuming capabilities */ 577 577 if (conn->cc_tls_para.mode == GNUTLS_SERVER) { 578 578 579 579 /* Prepare the store for sessions data */ 580 580 CHECK_FCT( store_init(conn) ); 581 581 582 582 /* Set the callbacks for resuming in the master session */ 583 583 set_resume_callbacks(conn->cc_tls_para.session, conn); … … 586 586 /* Start the demux thread */ 587 587 CHECK_POSIX( pthread_create( &conn->cc_rcvthr, NULL, demuxer, conn ) ); 588 588 589 589 return 0; 590 590 } … … 596 596 int errors = 0; 597 597 gnutls_datum_t master_data; 598 598 599 599 TRACE_ENTRY("%p %p", conn, priority); 600 600 CHECK_PARAMS( conn && (conn->cc_sctp_para.pairs > 1) && conn->cc_sctp3436_data.array ); … … 602 602 /* Server side: we set all the parameters, the resume callback will take care of resuming the session */ 603 603 /* Client side: we duplicate the parameters of the master session, then set the transport pointer */ 604 604 605 605 /* For client side, retrieve the master session parameters */ 606 606 if (conn->cc_tls_para.mode == GNUTLS_CLIENT) { … … 614 614 } 615 615 } 616 616 617 617 /* Initialize the session objects and start the handshake in a separate thread */ 618 618 for (i = 1; i < conn->cc_sctp_para.pairs; i++) { 619 619 /* Set credentials and priority */ 620 620 CHECK_FCT( fd_tls_prepare(&conn->cc_sctp3436_data.array[i].session, conn->cc_tls_para.mode, 0, priority, alt_creds) ); 621 621 622 622 /* additional initialization for gnutls 3.x */ 623 623 #ifdef GNUTLS_VERSION_300 … … 644 644 set_resume_callbacks(conn->cc_sctp3436_data.array[i].session, conn); 645 645 } 646 646 647 647 /* Set transport parameters */ 648 648 set_sess_transport(conn->cc_sctp3436_data.array[i].session, &conn->cc_sctp3436_data.array[i]); 649 649 650 650 /* Start the handshake thread */ 651 651 CHECK_POSIX( pthread_create( &conn->cc_sctp3436_data.array[i].thr, NULL, handshake_resume_th, &conn->cc_sctp3436_data.array[i] ) ); 652 652 } 653 653 654 654 /* We can now release the memory of master session data if any */ 655 655 if (conn->cc_tls_para.mode == GNUTLS_CLIENT) { 656 656 GNUTLS_TRACE( gnutls_free(master_data.data) ); 657 657 } 658 658 659 659 /* Now wait for all handshakes to finish */ 660 660 for (i = 1; i < conn->cc_sctp_para.pairs; i++) { … … 666 666 } 667 667 } 668 668 669 669 if (errors) { 670 670 TRACE_DEBUG(INFO, "Handshake failed on %d/%hd stream pairs", errors, conn->cc_sctp_para.pairs); … … 672 672 return ENOTCONN; 673 673 } 674 674 675 675 return 0; 676 676 } … … 680 680 { 681 681 uint16_t i; 682 682 683 683 TRACE_ENTRY("%p", conn); 684 684 CHECK_PARAMS( conn && conn->cc_sctp3436_data.array ); 685 685 686 686 if (others) { 687 687 for (i = 1; i < conn->cc_sctp_para.pairs; i++) { … … 700 700 { 701 701 uint16_t i; 702 702 703 703 CHECK_PARAMS_DO( conn && conn->cc_sctp3436_data.array, return ); 704 704 705 705 /* End all TLS sessions, in series (not as efficient as paralel, but simpler) */ 706 706 for (i = 1; i < conn->cc_sctp_para.pairs; i++) { … … 715 715 { 716 716 uint16_t i; 717 717 718 718 TRACE_ENTRY("%p", conn); 719 719 CHECK_PARAMS_DO( conn && conn->cc_sctp3436_data.array, return ); 720 720 721 721 for (i = 0; i < conn->cc_sctp_para.pairs; i++) { 722 722 if (conn->cc_sctp3436_data.array[i].thr != (pthread_t)NULL) { … … 732 732 { 733 733 uint16_t i; 734 734 735 735 TRACE_ENTRY("%p", conn); 736 736 CHECK_PARAMS_DO( conn && conn->cc_sctp3436_data.array, return ); 737 737 738 738 for (i = 1; i < conn->cc_sctp_para.pairs; i++) { 739 739 if (conn->cc_sctp3436_data.array[i].session) { … … 749 749 { 750 750 uint16_t i; 751 751 752 752 TRACE_ENTRY("%p", conn); 753 753 CHECK_PARAMS_DO( conn && conn->cc_sctp3436_data.array, return ); 754 754 755 755 for (i = 0; i < conn->cc_sctp_para.pairs; i++) { 756 756 CHECK_FCT_DO( fd_thr_term(&conn->cc_sctp3436_data.array[i].thr), /* continue */ ); … … 763 763 { 764 764 uint16_t i; 765 765 766 766 CHECK_PARAMS_DO( conn && conn->cc_sctp3436_data.array, return ); 767 767 768 768 /* Terminate all receiving threads in case we did not do it yet */ 769 769 fd_sctp3436_stopthreads(conn); 770 770 771 771 /* Now, stop the demux thread */ 772 772 CHECK_FCT_DO( fd_thr_term(&conn->cc_rcvthr), /* continue */ ); 773 773 774 774 /* Free remaining data in the array */ 775 775 for (i = 0; i < conn->cc_sctp_para.pairs; i++) { … … 782 782 } 783 783 } 784 784 785 785 /* Free the array itself now */ 786 786 free(conn->cc_sctp3436_data.array); 787 787 conn->cc_sctp3436_data.array = NULL; 788 788 789 789 /* Delete the store of sessions */ 790 790 store_destroy(conn); 791 791 792 792 return ; 793 793 }
Note: See TracChangeset
for help on using the changeset viewer.