comparison libfdcore/p_psm.c @ 1103:d8591b1c56cd

Implemented a few hooks
author Sebastien Decugis <sdecugis@freediameter.net>
date Fri, 10 May 2013 18:48:57 +0800
parents 7d7266115a34
children 757df62cadb6
comparison
equal deleted inserted replaced
1102:1d7b3ebda27f 1103:d8591b1c56cd
480 480
481 /* A message was received */ 481 /* A message was received */
482 if (event == FDEVP_CNX_MSG_RECV) { 482 if (event == FDEVP_CNX_MSG_RECV) {
483 struct msg * msg = NULL; 483 struct msg * msg = NULL;
484 struct msg_hdr * hdr; 484 struct msg_hdr * hdr;
485 struct fd_cnx_rcvdata rcv_data;
486 struct fd_msg_pmdl * pmdl = NULL;
487
488 rcv_data.buffer = ev_data;
489 rcv_data.length = ev_sz;
490 pmdl = fd_msg_pmdl_get_inbuf(rcv_data.buffer, rcv_data.length);
485 491
486 /* Parse the received buffer */ 492 /* Parse the received buffer */
487 CHECK_FCT_DO( fd_msg_parse_buffer( (void *)&ev_data, ev_sz, &msg), 493 CHECK_FCT_DO( fd_msg_parse_buffer( (void *)&ev_data, ev_sz, &msg),
488 { 494 {
489 fd_log_debug("Received invalid data from peer '%s', closing the connection", peer->p_hdr.info.pi_diamid); 495 fd_hook_call(HOOK_MESSAGE_PARSING_ERROR, NULL, peer, &rcv_data, pmdl );
490 free(ev_data); 496 free(ev_data);
491 CHECK_FCT_DO( fd_event_send(peer->p_events, FDEVP_CNX_ERROR, 0, NULL), goto psm_reset ); 497 CHECK_FCT_DO( fd_event_send(peer->p_events, FDEVP_CNX_ERROR, 0, NULL), goto psm_reset );
492 goto psm_loop; 498 goto psm_loop;
493 } ); 499 } );
494 500
501 fd_hook_associate(msg, pmdl);
502
495 /* If the current state does not allow receiving messages, just drop it */ 503 /* If the current state does not allow receiving messages, just drop it */
496 if (cur_state == STATE_CLOSED) { 504 if (cur_state == STATE_CLOSED) {
497 /* In such case, just discard the message */ 505 /* In such case, just discard the message */
498 //fd_msg_log( FD_MSG_LOG_DROPPED, msg, "Purged from peer '%s''s queue (CLOSED state).", peer->p_hdr.info.pi_diamid ); 506 fd_hook_call(HOOK_MESSAGE_DROPPED, msg, peer, "Message purged from queue, peer in CLOSED state", fd_msg_pmdl_get(msg));
499 fd_msg_free(msg); 507 fd_msg_free(msg);
500 goto psm_loop; 508 goto psm_loop;
501 } 509 }
502 510
503 /* Log incoming message */
504 //fd_msg_log( FD_MSG_LOG_RECEIVED, msg, "Received %zdb from '%s' (%s)", ev_sz, peer->p_hdr.info.pi_diamid, STATE_STR(cur_state) );
505
506 /* Extract the header */ 511 /* Extract the header */
507 CHECK_FCT_DO( fd_msg_hdr(msg, &hdr), goto psm_end ); 512 CHECK_FCT_DO( fd_msg_hdr(msg, &hdr), goto psm_end );
508 513
509 /* If it is an answer, associate with the request or drop */ 514 /* If it is an answer, associate with the request or drop */
510 if (!(hdr->msg_flags & CMD_FLAG_REQUEST)) { 515 if (!(hdr->msg_flags & CMD_FLAG_REQUEST)) {
511 struct msg * req; 516 struct msg * req;
512 /* Search matching request (same hbhid) */ 517 /* Search matching request (same hbhid) */
513 CHECK_FCT_DO( fd_p_sr_fetch(&peer->p_sr, hdr->msg_hbhid, &req), goto psm_end ); 518 CHECK_FCT_DO( fd_p_sr_fetch(&peer->p_sr, hdr->msg_hbhid, &req), goto psm_end );
514 if (req == NULL) { 519 if (req == NULL) {
515 //fd_msg_log( FD_MSG_LOG_DROPPED, msg, "Answer received with no corresponding sent request." ); 520 fd_hook_call(HOOK_MESSAGE_DROPPED, msg, peer, "Answer received with no corresponding sent request.", fd_msg_pmdl_get(msg));
516 fd_msg_free(msg); 521 fd_msg_free(msg);
517 goto psm_loop; 522 goto psm_loop;
518 } 523 }
519 524
520 /* Associate */ 525 /* Associate */
521 CHECK_FCT_DO( fd_msg_answ_associate( msg, req ), goto psm_end ); 526 CHECK_FCT_DO( fd_msg_answ_associate( msg, req ), goto psm_end );
522 527
523 /* Display the delay to receive the answer */
524 {
525 //fd_msg_log( FD_MSG_LOG_TIMING, msg, "Answer received in %ld.%6.6ld sec.", (long)delay.tv_sec, delay.tv_nsec / 1000 );
526 }
527 } else { 528 } else {
528 /* Mark the incoming request so that we know we have pending answers for this peer */ 529 /* Mark the incoming request so that we know we have pending answers for this peer */
529 CHECK_POSIX_DO( pthread_mutex_lock(&peer->p_state_mtx), goto psm_end ); 530 CHECK_POSIX_DO( pthread_mutex_lock(&peer->p_state_mtx), goto psm_end );
530 peer->p_reqin_count++; 531 peer->p_reqin_count++;
531 CHECK_POSIX_DO( pthread_mutex_unlock(&peer->p_state_mtx), goto psm_end ); 532 CHECK_POSIX_DO( pthread_mutex_unlock(&peer->p_state_mtx), goto psm_end );
532 } 533 }
534
535 /* Log incoming message */
536 fd_hook_call(HOOK_MESSAGE_RECEIVED, msg, peer, NULL, fd_msg_pmdl_get(msg));
533 537
534 if (cur_state == STATE_OPEN_NEW) { 538 if (cur_state == STATE_OPEN_NEW) {
535 /* OK, we have received something, so the connection is supposedly now in OPEN state at the remote site */ 539 /* OK, we have received something, so the connection is supposedly now in OPEN state at the remote site */
536 fd_psm_change_state(peer, STATE_OPEN ); 540 fd_psm_change_state(peer, STATE_OPEN );
537 } 541 }
566 /* In other states, we discard the message, it is either old or invalid to send it for the remote peer */ 570 /* In other states, we discard the message, it is either old or invalid to send it for the remote peer */
567 case STATE_WAITCNXACK: 571 case STATE_WAITCNXACK:
568 case STATE_WAITCNXACK_ELEC: 572 case STATE_WAITCNXACK_ELEC:
569 case STATE_WAITCEA: 573 case STATE_WAITCEA:
570 case STATE_CLOSED: 574 case STATE_CLOSED:
571 default: 575 default: {
572 /* In such case, just discard the message */ 576 /* In such case, just discard the message */
573 //fd_msg_log( FD_MSG_LOG_DROPPED, msg, "Received from peer '%s' while connection was not in state %s.", peer->p_hdr.info.pi_diamid, STATE_STR(cur_state) ); 577 char buf[128];
578 snprintf(buf, sizeof(buf), "Received while peer state machine was in state %s.", STATE_STR(cur_state));
579 fd_hook_call(HOOK_MESSAGE_DROPPED, msg, peer, buf, fd_msg_pmdl_get(msg));
574 fd_msg_free(msg); 580 fd_msg_free(msg);
581 }
575 } 582 }
576 goto psm_loop; 583 goto psm_loop;
577 } 584 }
578 585
579 /* Link-local message: They must be understood by our dictionary, otherwise we return an error */ 586 /* Link-local message: They must be understood by our dictionary, otherwise we return an error */
580 { 587 {
581 int ret = fd_msg_parse_or_error( &msg ); 588 struct msg * error = NULL;
589 int ret = fd_msg_parse_or_error( &msg, &error );
582 if (ret != EBADMSG) { 590 if (ret != EBADMSG) {
583 CHECK_FCT_DO( ret, goto psm_end ); 591 CHECK_FCT_DO( ret,
592 {
593 LOG_E("%s: An unexpected error occurred while parsing a link-local message", peer->p_hdr.info.pi_diamid);
594 fd_msg_free(msg);
595 goto psm_end;
596 } );
584 } else { 597 } else {
585 if (msg) { 598 if (msg == NULL) {
586 /* Send the error back to the peer */ 599 /* Send the error back to the peer */
587 CHECK_FCT_DO( ret = fd_out_send(&msg, NULL, peer, FD_CNX_ORDERED), ); 600 CHECK_FCT_DO( ret = fd_out_send(&error, NULL, peer, FD_CNX_ORDERED), );
588 if (msg) { 601 if (error) {
589 /* Only if an error occurred & the message was not saved / dumped */ 602 /* Only if an error occurred & the message was not saved / dumped */
590 //fd_msg_log( FD_MSG_LOG_DROPPED, msg, "Internal error: Problem while sending (%s)", strerror(ret) ); 603 LOG_E("%s: error sending a message", peer->p_hdr.info.pi_diamid);
591 CHECK_FCT_DO( fd_msg_free(msg), goto psm_end); 604 CHECK_FCT_DO( fd_msg_free(error), goto psm_end);
592 } 605 }
593 } else { 606 } else {
594 /* We received an invalid answer, let's disconnect */ 607 /* We received an invalid answer, let's disconnect */
608 LOG_E("%s: Received invalid answer to Base protocol message, disconnecting...", peer->p_hdr.info.pi_diamid);
609 CHECK_FCT_DO( fd_msg_free(msg), goto psm_end);
595 CHECK_FCT_DO( fd_event_send(peer->p_events, FDEVP_CNX_ERROR, 0, NULL), goto psm_reset ); 610 CHECK_FCT_DO( fd_event_send(peer->p_events, FDEVP_CNX_ERROR, 0, NULL), goto psm_reset );
596 } 611 }
597 goto psm_loop; 612 goto psm_loop;
598 } 613 }
599 } 614 }
600 615
601 /* Handle the LL message and update the expiry timer appropriately */ 616 /* Handle the LL message and update the expiry timer appropriately */
602 switch (hdr->msg_code) { 617 switch (hdr->msg_code) {
603 case CC_CAPABILITIES_EXCHANGE: 618 case CC_CAPABILITIES_EXCHANGE:
604 CHECK_FCT_DO( fd_p_ce_msgrcv(&msg, (hdr->msg_flags & CMD_FLAG_REQUEST), peer), goto psm_reset ); 619 CHECK_FCT_DO( fd_p_ce_msgrcv(&msg, (hdr->msg_flags & CMD_FLAG_REQUEST), peer),
620 {
621 if (msg)
622 CHECK_FCT_DO( fd_msg_free(msg), );
623 goto psm_reset;
624 } );
605 break; 625 break;
606 626
607 case CC_DISCONNECT_PEER: 627 case CC_DISCONNECT_PEER:
608 CHECK_FCT_DO( fd_p_dp_handle(&msg, (hdr->msg_flags & CMD_FLAG_REQUEST), peer), goto psm_reset ); 628 CHECK_FCT_DO( fd_p_dp_handle(&msg, (hdr->msg_flags & CMD_FLAG_REQUEST), peer), goto psm_reset );
609 if (fd_peer_getstate(peer) == STATE_CLOSING) 629 if (fd_peer_getstate(peer) == STATE_CLOSING)
668 case STATE_WAITCNXACK: 688 case STATE_WAITCNXACK:
669 case STATE_SUSPECT: 689 case STATE_SUSPECT:
670 default: 690 default:
671 /* Mark the connection problem */ 691 /* Mark the connection problem */
672 peer->p_flags.pf_cnx_pb = 1; 692 peer->p_flags.pf_cnx_pb = 1;
693
694 fd_hook_call(HOOK_PEER_CONNECT_FAILED, NULL, peer, "The connection was broken", NULL);
673 695
674 /* Destroy the connection, restart the timer to a new connection attempt */ 696 /* Destroy the connection, restart the timer to a new connection attempt */
675 fd_psm_next_timeout(peer, 1, peer->p_hdr.info.config.pic_tctimer ?: fd_g_config->cnf_timer_tc); 697 fd_psm_next_timeout(peer, 1, peer->p_hdr.info.config.pic_tctimer ?: fd_g_config->cnf_timer_tc);
676 698
677 case STATE_CLOSED: 699 case STATE_CLOSED:
729 if (params->cnx) { 751 if (params->cnx) {
730 fd_cnx_destroy(params->cnx); 752 fd_cnx_destroy(params->cnx);
731 params->cnx = NULL; 753 params->cnx = NULL;
732 } 754 }
733 if (params->cer) { 755 if (params->cer) {
734 //fd_msg_log( FD_MSG_LOG_DROPPED, params->cer, "Internal error: this CER was not handled as expected." );
735 CHECK_FCT_DO( fd_msg_free(params->cer), ); 756 CHECK_FCT_DO( fd_msg_free(params->cer), );
736 params->cer = NULL; 757 params->cer = NULL;
737 } 758 }
738 759
739 /* Loop */ 760 /* Loop */
750 peer->p_ini_thr = (pthread_t)NULL; 771 peer->p_ini_thr = (pthread_t)NULL;
751 772
752 switch (cur_state) { 773 switch (cur_state) {
753 case STATE_WAITCNXACK_ELEC: 774 case STATE_WAITCNXACK_ELEC:
754 case STATE_WAITCNXACK: 775 case STATE_WAITCNXACK:
776 LOG_D("%s: Connection established", peer->p_hdr.info.pi_diamid);
755 fd_p_ce_handle_newcnx(peer, cnx); 777 fd_p_ce_handle_newcnx(peer, cnx);
756 break; 778 break;
757 779
758 default: 780 default:
759 /* Just abort the attempt and continue */ 781 /* Just abort the attempt and continue */
778 /* Process the receiver side */ 800 /* Process the receiver side */
779 CHECK_FCT_DO( fd_p_ce_process_receiver(peer), goto psm_end ); 801 CHECK_FCT_DO( fd_p_ce_process_receiver(peer), goto psm_end );
780 break; 802 break;
781 803
782 case STATE_WAITCNXACK: 804 case STATE_WAITCNXACK:
805 LOG_D("%s: Connection attempt failed", peer->p_hdr.info.pi_diamid);
783 /* Go back to CLOSE */ 806 /* Go back to CLOSE */
784 fd_psm_next_timeout(peer, 1, peer->p_hdr.info.config.pic_tctimer ?: fd_g_config->cnf_timer_tc); 807 fd_psm_next_timeout(peer, 1, peer->p_hdr.info.config.pic_tctimer ?: fd_g_config->cnf_timer_tc);
785 goto psm_reset; 808 goto psm_reset;
786 809
787 default: 810 default:
800 case STATE_OPEN_NEW: 823 case STATE_OPEN_NEW:
801 CHECK_FCT_DO( fd_p_dw_timeout(peer), goto psm_end ); 824 CHECK_FCT_DO( fd_p_dw_timeout(peer), goto psm_end );
802 goto psm_loop; 825 goto psm_loop;
803 826
804 case STATE_CLOSED: 827 case STATE_CLOSED:
828 LOG_D("%s: Connecting...", peer->p_hdr.info.pi_diamid);
805 CHECK_FCT_DO( fd_psm_change_state(peer, STATE_WAITCNXACK), goto psm_end ); 829 CHECK_FCT_DO( fd_psm_change_state(peer, STATE_WAITCNXACK), goto psm_end );
806 fd_psm_next_timeout(peer, 0, CNX_TIMEOUT); 830 fd_psm_next_timeout(peer, 0, CNX_TIMEOUT);
807 CHECK_FCT_DO( fd_p_cnx_init(peer), goto psm_end ); 831 CHECK_FCT_DO( fd_p_cnx_init(peer), goto psm_end );
808 goto psm_loop; 832 goto psm_loop;
809 833
810 case STATE_SUSPECT: 834 case STATE_SUSPECT:
811 /* Mark the connection problem */ 835 /* Mark the connection problem */
812 peer->p_flags.pf_cnx_pb = 1; 836 peer->p_flags.pf_cnx_pb = 1;
813 case STATE_CLOSING:
814 case STATE_WAITCNXACK: 837 case STATE_WAITCNXACK:
815 case STATE_WAITCEA: 838 case STATE_WAITCEA:
839 fd_hook_call(HOOK_PEER_CONNECT_FAILED, NULL, peer, "Timeout while waiting for remote peer", NULL);
840 case STATE_CLOSING:
816 /* Destroy the connection, restart the timer to a new connection attempt */ 841 /* Destroy the connection, restart the timer to a new connection attempt */
817 fd_psm_next_timeout(peer, 1, peer->p_hdr.info.config.pic_tctimer ?: fd_g_config->cnf_timer_tc); 842 fd_psm_next_timeout(peer, 1, peer->p_hdr.info.config.pic_tctimer ?: fd_g_config->cnf_timer_tc);
818 goto psm_reset; 843 goto psm_reset;
819 844
820 case STATE_CLOSING_GRACE: 845 case STATE_CLOSING_GRACE:
846 goto psm_end; 871 goto psm_end;
847 fd_psm_cleanup(peer, 0); 872 fd_psm_cleanup(peer, 0);
848 goto psm_loop; 873 goto psm_loop;
849 874
850 psm_end: 875 psm_end:
876 LOG_N("%s: Going to ZOMBIE state (no more activity)", peer->p_hdr.info.pi_diamid);
851 fd_psm_cleanup(peer, 1); 877 fd_psm_cleanup(peer, 1);
852 TRACE_DEBUG(INFO, "'%s'\t-> STATE_ZOMBIE (terminated)\t'%s'", 878 TRACE_DEBUG(INFO, "'%s'\t-> STATE_ZOMBIE (terminated)\t'%s'",
853 STATE_STR(fd_peer_getstate(peer)), 879 STATE_STR(fd_peer_getstate(peer)),
854 peer->p_hdr.info.pi_diamid); 880 peer->p_hdr.info.pi_diamid);
855 pthread_cleanup_pop(1); /* set STATE_ZOMBIE */ 881 pthread_cleanup_pop(1); /* set STATE_ZOMBIE */
"Welcome to our mercurial repository"