Changeset 25:67ca08d5bc48 in freeDiameter for freeDiameter/cnxctx.c
- Timestamp:
- Oct 26, 2009, 4:00:49 PM (15 years ago)
- Branch:
- default
- Phase:
- public
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
freeDiameter/cnxctx.c
r24 r25 35 35 36 36 #include "fD.h" 37 #include "cnxctx.h" 38 39 /* The maximum size of Diameter message we accept to receive (<= 2^24) to avoid too big mallocs in case of trashed headers */ 40 #ifndef DIAMETER_MSG_SIZE_MAX 41 #define DIAMETER_MSG_SIZE_MAX 65535 /* in bytes */ 42 #endif /* DIAMETER_MSG_SIZE_MAX */ 37 43 38 44 /* Connections contexts (cnxctx) in freeDiameter are wrappers around the sockets and TLS operations . … … 57 63 * 58 64 * 3) Usage 59 * - fd_cnx_receive, fd_cnx_send : exchange messages on this connection (send is synchronous, receive is not ).65 * - fd_cnx_receive, fd_cnx_send : exchange messages on this connection (send is synchronous, receive is not, but blocking). 60 66 * - fd_cnx_recv_setaltfifo : when a message is received, the event is sent to an external fifo list. fd_cnx_receive does not work when the alt_fifo is set. 61 67 * - fd_cnx_getid : retrieve a descriptive string for the connection (for debug) … … 68 74 */ 69 75 70 /* The connection context structure */ 71 struct cnxctx { 72 char cc_id[60]; /* The name of this connection */ 73 char cc_remid[60]; /* Id of remote peer */ 74 75 int cc_socket; /* The socket object of the connection -- <=0 if no socket is created */ 76 77 int cc_proto; /* IPPROTO_TCP or IPPROTO_SCTP */ 78 int cc_tls; /* Is TLS already started ? */ 79 80 struct fifo * cc_events; /* Events occuring on the connection */ 81 pthread_t cc_mgr; /* manager thread for the connection */ 82 struct fifo * cc_incoming; /* FIFO queue of messages received on the connection */ 83 struct fifo * cc_alt; /* alternate fifo to send FDEVP_CNX_MSG_RECV events to. */ 84 85 /* If cc_proto == SCTP */ 86 struct { 87 int str_out;/* Out streams */ 88 int str_in; /* In streams */ 89 int pairs; /* max number of pairs ( = min(in, out)) */ 90 int next; /* # of stream the next message will be sent to */ 91 } cc_sctp_para; 92 93 /* If cc_tls == true */ 94 struct { 95 int mode; /* GNUTLS_CLIENT / GNUTLS_SERVER */ 96 gnutls_session_t session; /* Session object (stream #0 in case of SCTP) */ 97 } cc_tls_para; 98 99 /* If both conditions */ 100 struct { 101 gnutls_session_t *res_sessions; /* Sessions of other pairs of streams, resumed from the first */ 102 /* Buffers, threads, ... */ 103 } cc_sctp_tls_para; 104 }; 105 76 77 /*******************************************/ 78 /* Creation of a connection object */ 79 /*******************************************/ 106 80 107 81 /* Initialize a context structure */ … … 116 90 117 91 if (full) { 118 CHECK_FCT_DO( fd_fifo_new ( &conn->cc_events ), return NULL );119 92 CHECK_FCT_DO( fd_fifo_new ( &conn->cc_incoming ), return NULL ); 120 93 } … … 352 325 } 353 326 354 /* Same for SCTP, accepts a list of remote addresses to connect to (see sctp_connectx ) */327 /* Same for SCTP, accepts a list of remote addresses to connect to (see sctp_connectx for how they are used) */ 355 328 struct cnxctx * fd_cnx_cli_connect_sctp(int no_ip6, uint16_t port, struct fd_list * list) 356 329 { … … 382 355 cnx->cc_sctp_para.pairs = cnx->cc_sctp_para.str_in; 383 356 357 if (TRACE_BOOL(INFO)) { 358 fd_log_debug("Connection established to server '"); 359 sSA_DUMP_NODE_SERV( &primary, NI_NUMERICSERV); 360 fd_log_debug("' (SCTP:%d, %d/%d streams).\n", sock, cnx->cc_sctp_para.str_in, cnx->cc_sctp_para.str_out); 361 } 362 384 363 /* Generate the names for the object */ 385 364 { … … 403 382 } 404 383 405 if (TRACE_BOOL(INFO)) {406 fd_log_debug("Connection established to server '");407 sSA_DUMP_NODE_SERV( &primary, NI_NUMERICSERV);408 fd_log_debug("' (SCTP:%d).\n", sock);409 }410 411 384 return cnx; 412 385 … … 424 397 } 425 398 399 /* Get the list of endpoints (IP addresses) of the local and remote peers on this connection */ 400 int fd_cnx_getendpoints(struct cnxctx * conn, struct fd_list * local, struct fd_list * remote) 401 { 402 TRACE_ENTRY("%p %p %p", conn, local, remote); 403 CHECK_PARAMS(conn); 404 405 if (local) { 406 /* Retrieve the local endpoint(s) of the connection */ 407 switch (conn->cc_proto) { 408 case IPPROTO_TCP: { 409 sSS ss; 410 socklen_t sl; 411 CHECK_FCT(fd_tcp_get_local_ep(conn->cc_socket, &ss, &sl)); 412 CHECK_FCT(fd_ep_add_merge( local, (sSA *)&ss, sl, EP_FL_LL | EP_FL_PRIMARY)); 413 } 414 break; 415 416 #ifndef DISABLE_SCTP 417 case IPPROTO_SCTP: { 418 CHECK_FCT(fd_sctp_get_local_ep(conn->cc_socket, local)); 419 } 420 break; 421 #endif /* DISABLE_SCTP */ 422 423 default: 424 CHECK_PARAMS(0); 425 } 426 } 427 428 if (remote) { 429 /* Check we have a full connection object, not a listening socket (with no remote) */ 430 CHECK_PARAMS( conn->cc_incoming ); 431 432 /* Retrieve the peer endpoint(s) of the connection */ 433 switch (conn->cc_proto) { 434 case IPPROTO_TCP: { 435 sSS ss; 436 socklen_t sl; 437 CHECK_FCT(fd_tcp_get_remote_ep(conn->cc_socket, &ss, &sl)); 438 CHECK_FCT(fd_ep_add_merge( remote, (sSA *)&ss, sl, EP_FL_LL | EP_FL_PRIMARY )); 439 } 440 break; 441 442 #ifndef DISABLE_SCTP 443 case IPPROTO_SCTP: { 444 CHECK_FCT(fd_sctp_get_remote_ep(conn->cc_socket, remote)); 445 } 446 break; 447 #endif /* DISABLE_SCTP */ 448 449 default: 450 CHECK_PARAMS(0); 451 } 452 } 453 454 return 0; 455 } 456 457 458 /* Get a string describing the remote peer address (ip address or fqdn) */ 459 char * fd_cnx_getremoteid(struct cnxctx * conn) 460 { 461 CHECK_PARAMS_DO( conn, return "" ); 462 return conn->cc_remid; 463 } 464 465 466 /**************************************/ 467 /* Use of a connection object */ 468 /**************************************/ 469 470 /* Receiver thread (TCP & noTLS) : incoming message is directly saved into the target queue */ 471 static void * rcvthr_notls_tcp(void * arg) 472 { 473 struct cnxctx * conn = arg; 474 475 TRACE_ENTRY("%p", arg); 476 477 CHECK_PARAMS_DO(conn && (conn->cc_socket > 0), goto out); 478 ASSERT( conn->cc_proto == IPPROTO_TCP ); 479 ASSERT( conn->cc_tls == 0 ); 480 ASSERT( Target_Queue(conn) ); 481 482 /* Receive from a TCP connection: we have to rebuild the message boundaries */ 483 do { 484 uint8_t header[4]; 485 uint8_t * newmsg; 486 size_t length; 487 ssize_t ret = 0; 488 size_t received = 0; 489 490 do { 491 ret = recv(conn->cc_socket, &header[received], sizeof(header) - received, 0); 492 if (ret <= 0) { 493 CHECK_SYS_DO(ret, /* continue */); 494 goto error; /* Stop the thread, the recipient of the event will cleanup */ 495 } 496 497 received += ret; 498 } while (received < sizeof(header)); 499 500 length = ((size_t)header[1] << 16) + ((size_t)header[2] << 8) + (size_t)header[3]; 501 502 /* Check the received word is a valid begining of a Diameter message */ 503 if ((header[0] != DIAMETER_VERSION) /* defined in <libfreeDiameter.h> */ 504 || (length > DIAMETER_MSG_SIZE_MAX)) { /* to avoid too big mallocs */ 505 /* The message is suspect */ 506 TRACE_DEBUG(INFO, "Received suspect header [ver: %d, size: %g], assume disconnection", (int)header[0], length); 507 goto error; /* Stop the thread, the recipient of the event will cleanup */ 508 } 509 510 /* Ok, now we can really receive the data */ 511 CHECK_MALLOC_DO( newmsg = malloc( length ), goto error ); 512 memcpy(newmsg, header, sizeof(header)); 513 514 while (received < length) { 515 pthread_cleanup_push(free, newmsg); /* In case we are canceled, clean the partialy built buffer */ 516 ret = recv(conn->cc_socket, newmsg + received, length - received, 0); 517 pthread_cleanup_pop(0); 518 519 if (ret <= 0) { 520 CHECK_SYS_DO(ret, /* continue */); 521 free(newmsg); 522 goto error; /* Stop the thread, the recipient of the event will cleanup */ 523 } 524 received += ret; 525 } 526 527 /* We have received a complete message, send it */ 528 CHECK_FCT_DO( fd_event_send( Target_Queue(conn), FDEVP_CNX_MSG_RECV, length, newmsg), /* continue or destroy everything? */); 529 530 } while (conn->cc_loop); 531 532 out: 533 TRACE_DEBUG(FULL, "Thread terminated"); 534 return NULL; 535 error: 536 CHECK_FCT_DO( fd_event_send( Target_Queue(conn), FDEVP_CNX_ERROR, 0, NULL), /* continue or destroy everything? */); 537 goto out; 538 } 539 540 #ifndef DISABLE_SCTP 541 /* Receiver thread (SCTP & noTLS) : incoming message is directly saved into cc_incoming, no need to care for the stream ID */ 542 static void * rcvthr_notls_sctp(void * arg) 543 { 544 struct cnxctx * conn = arg; 545 uint8_t * buf; 546 size_t bufsz; 547 int event; 548 549 TRACE_ENTRY("%p", arg); 550 551 CHECK_PARAMS_DO(conn && (conn->cc_socket > 0), goto out); 552 ASSERT( conn->cc_proto == IPPROTO_SCTP ); 553 ASSERT( conn->cc_tls == 0 ); 554 ASSERT( Target_Queue(conn) ); 555 556 do { 557 CHECK_FCT_DO( fd_sctp_recvmeta(conn->cc_socket, NULL, &buf, &bufsz, &event), goto error ); 558 if (event == FDEVP_CNX_ERROR) { 559 goto error; 560 } 561 562 CHECK_FCT_DO( fd_event_send( Target_Queue(conn), event, bufsz, buf), goto error ); 563 564 } while (conn->cc_loop); 565 566 out: 567 TRACE_DEBUG(FULL, "Thread terminated"); 568 return NULL; 569 error: 570 CHECK_FCT_DO( fd_event_send( Target_Queue(conn), FDEVP_CNX_ERROR, 0, NULL), /* continue or destroy everything? */); 571 goto out; 572 } 573 #endif /* DISABLE_SCTP */ 574 575 /* Returns 0 on error, received data size otherwise (always >= 0) */ 576 static ssize_t fd_tls_recv_handle_error(struct cnxctx * conn, gnutls_session_t session, void * data, size_t sz) 577 { 578 ssize_t ret; 579 again: 580 CHECK_GNUTLS_DO( ret = gnutls_record_recv(session, data, sz), 581 { 582 switch (ret) { 583 case GNUTLS_E_REHANDSHAKE: 584 CHECK_GNUTLS_DO( ret = gnutls_handshake(session), 585 { 586 if (TRACE_BOOL(INFO)) { 587 fd_log_debug("TLS re-handshake failed on socket %d (%s) : %s\n", conn->cc_socket, conn->cc_id, gnutls_strerror(ret)); 588 } 589 ret = 0; 590 goto end; 591 } ); 592 593 case GNUTLS_E_AGAIN: 594 case GNUTLS_E_INTERRUPTED: 595 goto again; 596 597 default: 598 TRACE_DEBUG(INFO, "This TLS error is not handled, assume unrecoverable error"); 599 ret = 0; 600 } 601 } ); 602 end: 603 return ret; 604 } 605 606 /* The function that receives TLS data and re-builds a Diameter message -- it exits only on error or cancelation */ 607 int fd_tls_rcvthr_core(struct cnxctx * conn, gnutls_session_t session) 608 { 609 /* No guaranty that GnuTLS preserves the message boundaries, so we re-build it as in TCP */ 610 do { 611 uint8_t header[4]; 612 uint8_t * newmsg; 613 size_t length; 614 ssize_t ret = 0; 615 size_t received = 0; 616 617 do { 618 ret = fd_tls_recv_handle_error(conn, conn->cc_tls_para.session, &header[received], sizeof(header) - received); 619 if (ret == 0) { 620 /* The connection is closed */ 621 goto out; 622 } 623 received += ret; 624 } while (received < sizeof(header)); 625 626 length = ((size_t)header[1] << 16) + ((size_t)header[2] << 8) + (size_t)header[3]; 627 628 /* Check the received word is a valid beginning of a Diameter message */ 629 if ((header[0] != DIAMETER_VERSION) /* defined in <libfreeDiameter.h> */ 630 || (length > DIAMETER_MSG_SIZE_MAX)) { /* to avoid too big mallocs */ 631 /* The message is suspect */ 632 TRACE_DEBUG(INFO, "Received suspect header [ver: %d, size: %g], assume disconnection", (int)header[0], length); 633 goto out; 634 } 635 636 /* Ok, now we can really receive the data */ 637 CHECK_MALLOC( newmsg = malloc( length ) ); 638 memcpy(newmsg, header, sizeof(header)); 639 640 while (received < length) { 641 pthread_cleanup_push(free, newmsg); /* In case we are canceled, clean the partialy built buffer */ 642 ret = fd_tls_recv_handle_error(conn, conn->cc_tls_para.session, newmsg + received, length - received); 643 pthread_cleanup_pop(0); 644 645 if (ret == 0) { 646 free(newmsg); 647 goto out; /* Stop the thread, the recipient of the event will cleanup */ 648 } 649 received += ret; 650 } 651 652 /* We have received a complete message, send it */ 653 CHECK_FCT_DO( fd_event_send( Target_Queue(conn), FDEVP_CNX_MSG_RECV, length, newmsg), /* continue or destroy everything? */); 654 655 } while (1); 656 out: 657 return ENOTCONN; 658 } 659 660 /* Receiver thread (TLS & 1 stream SCTP or TCP) : gnutls directly handles the socket, save records into the target queue */ 661 static void * rcvthr_tls_single(void * arg) 662 { 663 struct cnxctx * conn = arg; 664 665 TRACE_ENTRY("%p", arg); 666 667 CHECK_PARAMS_DO(conn && (conn->cc_socket > 0), goto error); 668 ASSERT( conn->cc_tls == 1 ); 669 ASSERT( Target_Queue(conn) ); 670 671 CHECK_FCT_DO(fd_tls_rcvthr_core(conn, conn->cc_tls_para.session), /* continue */); 672 error: 673 CHECK_FCT_DO( fd_event_send( Target_Queue(conn), FDEVP_CNX_ERROR, 0, NULL), /* continue or destroy everything? */); 674 TRACE_DEBUG(FULL, "Thread terminated"); 675 return NULL; 676 } 677 426 678 /* Start receving messages in clear (no TLS) on the connection */ 427 int fd_cnx_start_clear(struct cnxctx * conn) 428 { 429 430 TODO("..."); 431 return ENOTSUP; 679 int fd_cnx_start_clear(struct cnxctx * conn, int loop) 680 { 681 TRACE_ENTRY("%p %i", conn, loop); 682 683 CHECK_PARAMS( conn && Target_Queue(conn) && (!conn->cc_tls) && (!conn->cc_loop)); 684 685 /* Save the loop request */ 686 conn->cc_loop = loop; 687 688 /* Release resources in case of a previous call was already made */ 689 CHECK_FCT_DO( fd_thr_term(&conn->cc_rcvthr), /* continue */); 690 691 switch (conn->cc_proto) { 692 case IPPROTO_TCP: 693 /* Start the tcp_notls thread */ 694 CHECK_POSIX( pthread_create( &conn->cc_rcvthr, NULL, rcvthr_notls_tcp, conn ) ); 695 break; 696 #ifndef DISABLE_SCTP 697 case IPPROTO_SCTP: 698 /* Start the tcp_notls thread */ 699 CHECK_POSIX( pthread_create( &conn->cc_rcvthr, NULL, rcvthr_notls_sctp, conn ) ); 700 break; 701 #endif /* DISABLE_SCTP */ 702 default: 703 TRACE_DEBUG(INFO, "Unknown protocol: %d", conn->cc_proto); 704 return ENOTSUP; 705 } 706 707 return 0; 708 } 709 710 /* Prepare a gnutls session object for handshake */ 711 int fd_tls_prepare(gnutls_session_t * session, int mode, char * priority) 712 { 713 /* Create the master session context */ 714 CHECK_GNUTLS_DO( gnutls_init (session, mode), return ENOMEM ); 715 716 /* Set the algorithm suite */ 717 if (priority) { 718 const char * errorpos; 719 CHECK_GNUTLS_DO( gnutls_priority_set_direct( *session, priority, &errorpos ), 720 { TRACE_DEBUG(INFO, "Error in priority string '%s' at position: '%s'\n", priority, errorpos); return EINVAL; } ); 721 } else { 722 CHECK_GNUTLS_DO( gnutls_priority_set( *session, fd_g_config->cnf_sec_data.prio_cache ), return EINVAL ); 723 } 724 725 /* Set the credentials of this side of the connection */ 726 CHECK_GNUTLS_DO( gnutls_credentials_set (*session, GNUTLS_CRD_CERTIFICATE, fd_g_config->cnf_sec_data.credentials), return EINVAL ); 727 728 /* Request the remote credentials as well */ 729 if (mode == GNUTLS_SERVER) { 730 gnutls_certificate_server_set_request (*session, GNUTLS_CERT_REQUIRE); 731 } 732 733 return 0; 432 734 } 433 735 … … 436 738 { 437 739 TRACE_ENTRY( "%p %d", conn, mode); 438 CHECK_PARAMS( conn && ( (mode == GNUTLS_CLIENT) || (mode == GNUTLS_SERVER)) );740 CHECK_PARAMS( conn && (!conn->cc_tls) && ( (mode == GNUTLS_CLIENT) || (mode == GNUTLS_SERVER) ) && (!conn->cc_loop) ); 439 741 440 742 /* Save the mode */ 441 743 conn->cc_tls_para.mode = mode; 442 443 /* Create the master session context */ 444 CHECK_GNUTLS_DO( gnutls_init (&conn->cc_tls_para.session, mode), return ENOMEM ); 445 446 /* Set the algorithm suite */ 447 TODO("Use overwrite priority if non NULL"); 448 CHECK_GNUTLS_DO( gnutls_priority_set( conn->cc_tls_para.session, fd_g_config->cnf_sec_data.prio_cache ), return EINVAL ); 449 450 /* Set the credentials of this side of the connection */ 451 CHECK_GNUTLS_DO( gnutls_credentials_set (conn->cc_tls_para.session, GNUTLS_CRD_CERTIFICATE, fd_g_config->cnf_sec_data.credentials), return EINVAL ); 452 453 /* Request the remote credentials as well */ 454 if (mode == GNUTLS_SERVER) { 455 gnutls_certificate_server_set_request (conn->cc_tls_para.session, GNUTLS_CERT_REQUIRE); 456 } 457 458 /* Set the socket info in the session */ 459 gnutls_transport_set_ptr (conn->cc_tls_para.session, (gnutls_transport_ptr_t) conn->cc_socket); 744 745 /* Cancel receiving thread if any -- it should already be terminated anyway, we just release the resources */ 746 CHECK_FCT_DO( fd_thr_term(&conn->cc_rcvthr), /* continue */); 747 748 /* Once TLS handshake is done, we don't stop after the first message */ 749 conn->cc_loop = 1; 750 751 /* Prepare the master session credentials and priority */ 752 CHECK_FCT( fd_tls_prepare(&conn->cc_tls_para.session, mode, priority) ); 460 753 461 754 /* Special case: multi-stream TLS is not natively managed in GNU TLS, we use a wrapper library */ 462 if ( (conn->cc_proto == IPPROTO_SCTP) && (conn->cc_sctp_para.pairs > 0)) {463 #if ndef DISABLE_SCTP464 TODO("Initialize the SCTP TLS wrapper");465 TODO("Set the lowat, push and pull functions");755 if (conn->cc_sctp_para.pairs > 1) { 756 #ifdef DISABLE_SCTP 757 ASSERT(0); 758 CHECK_FCT( ENOTSUP ); 466 759 #else /* DISABLE_SCTP */ 467 ASSERT(0); 760 /* Initialize the wrapper, start the demux thread */ 761 CHECK_FCT( fd_sctps_init(conn) ); 468 762 #endif /* DISABLE_SCTP */ 763 } else { 764 /* Set the socket info in the session */ 765 gnutls_transport_set_ptr (conn->cc_tls_para.session, (gnutls_transport_ptr_t) conn->cc_socket); 469 766 } 470 767 … … 500 797 } 501 798 502 /* Other sessions in case of multi-stream SCTP are resumed from the master*/503 if ( (conn->cc_proto == IPPROTO_SCTP) && (conn->cc_sctp_para.pairs > 0)) {799 /* Multi-stream TLS: handshake other streams as well */ 800 if (conn->cc_sctp_para.pairs > 1) { 504 801 #ifndef DISABLE_SCTP 505 TODO("Init and resume all additional sessions from the master one."); 802 /* Resume all additional sessions from the master one. */ 803 CHECK_FCT(fd_sctps_handshake_others(conn, priority)); 804 805 /* Start decrypting the messages from all threads and queuing them in target queue */ 806 CHECK_FCT(fd_sctps_startthreads(conn)); 506 807 #endif /* DISABLE_SCTP */ 507 } 508 509 TODO("Start the connection state machine thread"); 808 } else { 809 /* Start decrypting the data */ 810 CHECK_POSIX( pthread_create( &conn->cc_rcvthr, NULL, rcvthr_tls_single, conn ) ); 811 } 510 812 511 813 return 0; … … 515 817 int fd_cnx_getcred(struct cnxctx * conn, const gnutls_datum_t **cert_list, unsigned int *cert_list_size) 516 818 { 517 518 TODO("..."); 519 return ENOTSUP; 520 } 521 522 /* Get the list of endpoints (IP addresses) of the local and remote peers on this connection */ 523 int fd_cnx_getendpoints(struct cnxctx * conn, struct fd_list * local, struct fd_list * remote) 524 { 525 TRACE_ENTRY("%p %p %p", conn, local, remote); 526 CHECK_PARAMS(conn); 527 528 if (local) { 529 /* Retrieve the local endpoint(s) of the connection */ 530 switch (conn->cc_proto) { 531 case IPPROTO_TCP: { 532 sSS ss; 533 socklen_t sl; 534 CHECK_FCT(fd_tcp_get_local_ep(conn->cc_socket, &ss, &sl)); 535 CHECK_FCT(fd_ep_add_merge( local, (sSA *)&ss, sl, EP_FL_LL | EP_FL_PRIMARY)); 536 } 819 TRACE_ENTRY("%p %p %p", conn, cert_list, cert_list_size); 820 CHECK_PARAMS( conn && (conn->cc_tls) && cert_list && cert_list_size ); 821 822 /* This function only works for X.509 certificates. */ 823 CHECK_PARAMS( gnutls_certificate_type_get (conn->cc_tls_para.session) == GNUTLS_CRT_X509 ); 824 825 *cert_list = gnutls_certificate_get_peers (conn->cc_tls_para.session, cert_list_size); 826 if (*cert_list == NULL) { 827 TRACE_DEBUG(INFO, "No certificate was provided by remote peer / an error occurred."); 828 return EINVAL; 829 } 830 831 TRACE_DEBUG( FULL, "Remote peer provided %d certificates.\n", *cert_list_size); 832 833 return 0; 834 } 835 836 /* Receive next message. if timeout is not NULL, wait only until timeout. This function only pulls from a queue, mgr thread is filling that queue aynchrounously. */ 837 int fd_cnx_receive(struct cnxctx * conn, struct timespec * timeout, unsigned char **buf, size_t * len) 838 { 839 int ev; 840 size_t ev_sz; 841 void * ev_data; 842 843 TRACE_ENTRY("%p %p %p %p", conn, timeout, buf, len); 844 CHECK_PARAMS(conn && (conn->cc_socket > 0) && buf && len); 845 CHECK_PARAMS(conn->cc_rcvthr != (pthread_t)NULL); 846 CHECK_PARAMS(conn->cc_alt == NULL); 847 848 /* Now, pull the first event */ 849 get_next: 850 if (timeout) { 851 CHECK_FCT( fd_event_timedget(conn->cc_incoming, timeout, FDEVP_PSM_TIMEOUT, &ev, &ev_sz, &ev_data) ); 852 } else { 853 CHECK_FCT( fd_event_get(conn->cc_incoming, &ev, &ev_sz, &ev_data) ); 854 } 855 856 switch (ev) { 857 case FDEVP_CNX_MSG_RECV: 858 /* We got one */ 859 *len = ev_sz; 860 *buf = ev_data; 861 return 0; 862 863 case FDEVP_PSM_TIMEOUT: 864 TRACE_DEBUG(FULL, "Timeout event received"); 865 return ETIMEDOUT; 866 867 case FDEVP_CNX_EP_CHANGE: 868 /* We ignore this event */ 869 goto get_next; 870 871 case FDEVP_CNX_ERROR: 872 TRACE_DEBUG(FULL, "Received ERROR event on the connection"); 873 return ENOTCONN; 874 } 875 876 TRACE_DEBUG(INFO, "Received unexpected event %d (%s)", ev, fd_pev_str(ev)); 877 return EINVAL; 878 } 879 880 /* Set an alternate FIFO list to send FDEVP_CNX_* events to */ 881 int fd_cnx_recv_setaltfifo(struct cnxctx * conn, struct fifo * alt_fifo) 882 { 883 TRACE_ENTRY( "%p %p", conn, alt_fifo ); 884 CHECK_PARAMS( conn && alt_fifo && conn->cc_incoming ); 885 886 /* The magic function does it all */ 887 CHECK_FCT( fd_fifo_move( &conn->cc_incoming, alt_fifo, &conn->cc_alt ) ); 888 889 return 0; 890 } 891 892 /* Send function when no multi-stream is involved, or sending on stream #0 (send() always use stream 0)*/ 893 static int send_simple(struct cnxctx * conn, unsigned char * buf, size_t len) 894 { 895 ssize_t ret; 896 size_t sent = 0; 897 TRACE_ENTRY("%p %p %g", conn, buf, len); 898 do { 899 if (conn->cc_tls) { 900 CHECK_GNUTLS_DO( ret = gnutls_record_send (conn->cc_tls_para.session, buf + sent, len - sent), return ENOTCONN ); 901 } else { 902 CHECK_SYS( ret = send(conn->cc_socket, buf + sent, len - sent, 0) ); /* better to replace with sendmsg for atomic sending? */ 903 } 904 sent += ret; 905 } while ( sent < len ); 906 return 0; 907 } 908 909 /* Send a message -- this is synchronous -- and we assume it's never called by several threads at the same time, so we don't protect. */ 910 int fd_cnx_send(struct cnxctx * conn, unsigned char * buf, size_t len) 911 { 912 TRACE_ENTRY("%p %p %g", conn, buf, len); 913 914 CHECK_PARAMS(conn && (conn->cc_socket > 0) && buf && len); 915 916 TRACE_DEBUG(FULL, "Sending %gb %sdata on connection %s", len, conn->cc_tls ? "TLS-protected ":"", conn->cc_id); 917 918 switch (conn->cc_proto) { 919 case IPPROTO_TCP: 920 CHECK_FCT( send_simple(conn, buf, len) ); 537 921 break; 538 539 #ifndef DISABLE_SCTP 540 case IPPROTO_SCTP: { 541 CHECK_FCT(fd_sctp_get_local_ep(conn->cc_socket, local)); 542 } 543 break; 544 #endif /* DISABLE_SCTP */ 545 546 default: 547 CHECK_PARAMS(0); 548 } 549 } 550 551 if (remote) { 552 /* Check we have a full connection object, not a listening socket (with no remote) */ 553 CHECK_PARAMS( conn->cc_events ); 554 555 /* Retrieve the peer endpoint(s) of the connection */ 556 switch (conn->cc_proto) { 557 case IPPROTO_TCP: { 558 sSS ss; 559 socklen_t sl; 560 CHECK_FCT(fd_tcp_get_remote_ep(conn->cc_socket, &ss, &sl)); 561 CHECK_FCT(fd_ep_add_merge( remote, (sSA *)&ss, sl, EP_FL_LL | EP_FL_PRIMARY )); 562 } 563 break; 564 565 #ifndef DISABLE_SCTP 566 case IPPROTO_SCTP: { 567 CHECK_FCT(fd_sctp_get_remote_ep(conn->cc_socket, remote)); 568 } 569 break; 570 #endif /* DISABLE_SCTP */ 571 572 default: 573 CHECK_PARAMS(0); 574 } 575 } 576 922 923 #ifndef DISABLE_SCTP 924 case IPPROTO_SCTP: { 925 int multistr = 0; 926 927 if ((conn->cc_sctp_para.str_out > 1) && ((! conn->cc_tls) || (conn->cc_sctp_para.pairs > 1))) { 928 /* Update the id of the stream we will send this message on */ 929 conn->cc_sctp_para.next += 1; 930 conn->cc_sctp_para.next %= (conn->cc_tls ? conn->cc_sctp_para.pairs : conn->cc_sctp_para.str_out); 931 multistr = 1; 932 } 933 934 if ((!multistr) || (conn->cc_sctp_para.next == 0)) { 935 CHECK_FCT( send_simple(conn, buf, len) ); 936 } else { 937 if (!conn->cc_tls) { 938 CHECK_FCT( fd_sctp_sendstr(conn->cc_socket, conn->cc_sctp_para.next, buf, len) ); 939 } else { 940 /* push the record to the appropriate session */ 941 ssize_t ret; 942 size_t sent = 0; 943 ASSERT(conn->cc_sctps_data.array != NULL); 944 do { 945 CHECK_GNUTLS_DO( ret = gnutls_record_send (conn->cc_sctps_data.array[conn->cc_sctp_para.next - 1].session, buf + sent, len - sent), { TODO("Handle error (re-handshake, etc.."); return ENOTCONN; } ); 946 sent += ret; 947 } while ( sent < len ); 948 } 949 } 950 } 951 break; 952 #endif /* DISABLE_SCTP */ 953 954 default: 955 TRACE_DEBUG(INFO, "Unknwon protocol: %d", conn->cc_proto); 956 return ENOTSUP; /* or EINVAL... */ 957 } 958 577 959 return 0; 578 960 } 579 961 580 962 581 /* Get a string describing the remote peer address (ip address or fqdn) */ 582 char * fd_cnx_getremoteid(struct cnxctx * conn) 583 { 584 CHECK_PARAMS_DO( conn, return "" ); 585 return conn->cc_remid; 586 } 587 588 589 /* Receive next message. if timeout is not NULL, wait only until timeout */ 590 int fd_cnx_receive(struct cnxctx * conn, struct timespec * timeout, unsigned char **buf, size_t * len) 591 { 592 593 TODO("..."); 594 return ENOTSUP; 595 } 596 597 /* Set / reset alternate FIFO list to send FDEVP_CNX_MSG_RECV to when message is received */ 598 int fd_cnx_recv_setaltfifo(struct cnxctx * conn, struct fifo * alt_fifo) 599 { 600 TRACE_ENTRY( "%p %p", conn, alt_fifo ); 601 CHECK_PARAMS( conn ); 602 603 /* Let's cross fingers that there is no race condition here... */ 604 conn->cc_alt = alt_fifo; 605 606 return 0; 607 } 608 609 /* Send a message */ 610 int fd_cnx_send(struct cnxctx * conn, unsigned char * buf, size_t len) 611 { 612 613 TODO("..."); 614 return ENOTSUP; 615 } 616 963 /**************************************/ 964 /* Destruction of connection */ 965 /**************************************/ 617 966 618 967 /* Destroy a conn structure, and shutdown the socket */ … … 623 972 CHECK_PARAMS_DO(conn, return); 624 973 625 TODO("End TLS session(s) if started"); 626 627 TODO("Stop manager thread if running"); 974 /* In case of TLS, stop receiver thread, then close properly the gnutls session */ 975 if ((conn->cc_tls) && (conn->cc_sctp_para.pairs > 1)) { 976 #ifndef DISABLE_SCTP 977 /* Multi-stream TLS: Stop all decipher threads, but not the demux thread */ 978 fd_sctps_stopthreads(conn); 979 #endif /* DISABLE_SCTP */ 980 } else { 981 /* Stop the decoding thread */ 982 CHECK_FCT_DO( fd_thr_term(&conn->cc_rcvthr), /* continue */ ); 983 } 984 985 /* Terminate properly the TLS session(s) */ 986 if (conn->cc_tls) { 987 /* Master session */ 988 CHECK_GNUTLS_DO( gnutls_bye(conn->cc_tls_para.session, GNUTLS_SHUT_RDWR), /* Continue */ ); 989 gnutls_deinit(conn->cc_tls_para.session); 990 991 #ifndef DISABLE_SCTP 992 if (conn->cc_sctp_para.pairs > 1) { 993 /* Multi-stream TLS: destroy the wrapper and stop the demux thread */ 994 fd_sctps_destroy(conn); 995 } 996 #endif /* DISABLE_SCTP */ 997 998 } 628 999 629 1000 /* Shut the connection down */ … … 632 1003 } 633 1004 634 TODO("Empty FIFO queues"); 635 636 /* Destroy FIFO lists */ 637 if (conn->cc_events) 638 CHECK_FCT_DO( fd_fifo_del ( &conn->cc_events ), /* continue */ ); 639 if (conn->cc_incoming) 640 CHECK_FCT_DO( fd_fifo_del ( &conn->cc_incoming ), /* continue */ ); 1005 /* Empty and destroy FIFO list */ 1006 if (conn->cc_incoming) { 1007 fd_event_destroy( &conn->cc_incoming, free ); 1008 } 641 1009 642 1010 /* Free the object */
Note: See TracChangeset
for help on using the changeset viewer.