view waaad/peer-out.c @ 371:e86dba02630a

Updated copyright information
author Sebastien Decugis <sdecugis@nict.go.jp>
date Mon, 25 May 2009 14:51:46 +0900
parents b9b74d6ac29e
children
line wrap: on
line source

/*********************************************************************************************************
* Software License Agreement (BSD License)                                                               *
* Author: Sebastien Decugis <sdecugis@nict.go.jp>							 *
*													 *
* Copyright (c) 2009, WIDE Project and NICT								 *
* All rights reserved.											 *
* 													 *
* Redistribution and use of this software in source and binary forms, with or without modification, are  *
* permitted provided that the following conditions are met:						 *
* 													 *
* * Redistributions of source code must retain the above 						 *
*   copyright notice, this list of conditions and the 							 *
*   following disclaimer.										 *
*    													 *
* * Redistributions in binary form must reproduce the above 						 *
*   copyright notice, this list of conditions and the 							 *
*   following disclaimer in the documentation and/or other						 *
*   materials provided with the distribution.								 *
* 													 *
* * Neither the name of the WIDE Project or NICT nor the 						 *
*   names of its contributors may be used to endorse or 						 *
*   promote products derived from this software without 						 *
*   specific prior written permission of WIDE Project and 						 *
*   NICT.												 *
* 													 *
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED *
* WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A *
* PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR *
* ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 	 *
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 	 *
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR *
* TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF   *
* ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.								 *
*********************************************************************************************************/

/* Peers facility.
 *
 * This file contains the code to send messages pending on a peer.
 *
 */

#include "waaad-internal.h"
#include "peer-internal.h"

/* Context handed to cleanup_requeue() while the sending thread holds a message that it
 * has pulled out of the outgoing queue. */
struct requeue_data {
	  msg_t * msg;	/* the message currently held by the thread; NULL when there is nothing to requeue */
	_peer_t * peer;	/* the peer whose p_out_q queue gets the message back */
};

/* Cancellation cleanup handler: put the message the thread was working on back into the
 * outgoing queue of its peer, so that it is not lost when the thread is canceled.
 * data points to a struct requeue_data. If the message cannot be posted, it is freed instead. */
static void cleanup_requeue(void * data)
{
	struct requeue_data * ctx = (struct requeue_data *)data;
	
	TRACE_ENTRY( "%p", data);
	
	CHECK_PARAMS_DO( data,  return );
	
	/* The thread holds no message at the moment: nothing to give back */
	if (ctx->msg == NULL)
		return;
	
	/* Give the message back to the queue; discard it if even that fails */
	CHECK_FCT_DO(  meq_post(ctx->peer->p_out_q, ctx->msg),  { (void) msg_free(ctx->msg, 1); return; }  );
	
	/* The queue holds the message again */
	ctx->msg = NULL;
}


/* Render a msg_t structure into a buffer and write it to the peer, using its security module.
 *
 * Parameters:
 *   peer : the destination peer; it must have a security module providing sec_send_protect.
 *   msg  : the message to send. This function never frees it: the caller keeps ownership of
 *          the message whatever the outcome.
 *
 * Returns:
 *   0 when the message was handed to the security module, and also when the message could
 *   not be rendered (that failure is only logged, as before); otherwise the error code
 *   returned by sec_send_protect.
 */
int _peer_out_sendmsg(_peer_t * peer, msg_t * msg)
{
	unsigned char *buf = NULL;
	size_t len;
	int ret = 0;
	
	/* Check the peer before it is dereferenced for the first time */
	ASSERT( peer && peer->p_secmod && peer->p_secmod->sec_send_protect );
	
	TRACE_DEBUG(FULL, "Sending message to peer '%s':", peer->p_diamid);
	msg_dump_walk(FULL, msg);

	/* Render the message as a buffer */
	CHECK_FCT_DO(  msg_bufferize ( msg, &buf, &len ),
		{
			/* The message cannot be sent: log the problem and give up on it.
			 * The message is NOT freed here. The sending thread still uses its header after
			 * this call, frees answers itself, and keeps requests in the sent list of the
			 * peer, so freeing here would cause a use-after-free or a double free. */
			log_error("An error occurred while generating a message for sending to peer '%s', message dropped.\n", peer->p_diamid);
			return 0;
		}  );

	/* From here on the buffer must be released, including when the thread is canceled */
	pthread_cleanup_push( cleanup_buffer, buf );

	/* Now send the message using the appropriate security module.
	 * We must not return from inside the push/pop pair (POSIX leaves the effect undefined,
	 * and the buffer would not be released), so the result is only recorded here. */
	CHECK_FCT_DO(  ret = (*peer->p_secmod->sec_send_protect) ( &peer->p_sec_session, &peer->p_ext_session, buf, len ),  /* reported to the caller below */  );

	/* Run the cleanup handler on the buffer, on success and on failure alike */
	pthread_cleanup_pop( 1 );
	
	return ret;
}

/* Remember a request in the list of messages sent to a peer (peer->p_sent), together with
 * the hop-by-hop id hbh it is sent with. The new entry is placed after the last entry whose
 * id is lower than hbh. The sending thread of this file calls this function with
 * peer->p_lock held.
 * Returns 0 on success; an allocation failure is handled by CHECK_MALLOC. */
int _peer_out_savesentreq(_peer_t * peer, msg_t * msg, uint32_t hbh)
{
	p_sent_t   * entry;
	uti_list_t * pos;
	
	/* Create the list element that references the message */
	CHECK_MALLOC( entry = (p_sent_t *)malloc(sizeof(p_sent_t)) );
	uti_list_init(&entry->chain, msg);
	entry->hbh = hbh;
	
	/* Update the internal "routable" data now, so that the failover operation is faster */
	(void)msg_is_routable(msg);

	/* Find where the element belongs. The hop-by-hop id is an incrementing counter, so we
	   walk the list from its tail: unless the counter has wrapped around, the very first
	   comparison succeeds and the element goes to the last position. */
	pos = &peer->p_sent;
	while ( (pos->prev != &peer->p_sent) && ! (((p_sent_t *)(pos->prev))->hbh < hbh) )
		pos = pos->prev;

	/* Link the element at that position */
	uti_list_insert_before(pos, &entry->chain);

	TRACE_DEBUG(FULL, "Request (hbh %x) saved in list for peer '%s'", hbh, peer->p_diamid);
	
	return 0;
}

/* Outcome of the preparation of a message by the p_out_th thread */
enum {
	OUT_TH_SEND = 0,	/* the message is ready to be written to the peer */
	OUT_TH_SKIP,		/* the message was dropped; go on with the next one */
	OUT_TH_ABORT		/* unrecoverable error; the thread must terminate */
};

/* Prepare the message held in rd->msg for sending: retrieve its header and, if it is a
 * request, pick its hop-by-hop id and save it in the sent list of the peer.
 *
 * This step runs while the cleanup_requeue handler is installed. It lives in its own
 * function so that it can leave early with a plain "return": POSIX leaves the effect of
 * leaving a pthread_cleanup_push / pthread_cleanup_pop block with return, goto, break or
 * continue undefined.
 *
 * Parameters:
 *   rd  : the peer and the message being processed. rd->msg is reset to NULL whenever the
 *         message is freed here, so that the cleanup handler never requeues a freed message.
 *   hdr : (out) the header data of the message.
 *   hbh : (out) the hop-by-hop id to use when sending the message on the next hop.
 *
 * Returns one of the OUT_TH_* values above.
 */
static int out_th_prepare(struct requeue_data * rd, msg_data_t ** hdr, uint32_t * hbh)
{
	/* Get the header information of the message */
	CHECK_FCT_DO(  msg_data(rd->msg, hdr),
		{
			CHECK_FCT_DO(msg_free(rd->msg, 1), );
			rd->msg = NULL;
			return OUT_TH_ABORT;
		}  );
	
	*hbh = (*hdr)->msg_hbhid;
	
	/* Only requests have to be saved; answers keep the hop-by-hop id they came with */
	if ( ! ((*hdr)->msg_flags & CMD_FLAG_REQUEST))
		return OUT_TH_SEND;
	
	/* Get the next available hbh id -- no need to lock since we're the only thread modifying this value. */
	*hbh = rd->peer->p_hbh;
	rd->peer->p_hbh = *hbh + 1;
	
	/* Lock the peer. As before, the message is neither freed nor requeued if this fails. */
	CHECK_POSIX_DO(  pthread_mutex_lock(&rd->peer->p_lock),  return OUT_TH_ABORT  );
	
	/* Save the message in the sent queue first, so that if we are cancelled the failover will resend it */
	CHECK_FCT_DO( _peer_out_savesentreq( rd->peer, rd->msg, *hbh ),
		{
			CHECK_POSIX_DO(  pthread_mutex_unlock(&rd->peer->p_lock),  /* continue */  );
			log_error("An error occurred while saving a message to send to peer '%s', message dropped.", rd->peer->p_diamid);
			CHECK_FCT_DO(msg_free(rd->msg, 1), );
			rd->msg = NULL;
			return OUT_TH_SKIP;
		} );
	
	/* and unlock */
	CHECK_POSIX_DO(  pthread_mutex_unlock(&rd->peer->p_lock),  return OUT_TH_ABORT  );
	
	return OUT_TH_SEND;
}

/* The code of the p_out_th thread */
/* This thread pulls messages from the p_out_q queue of a peer, and deals with them:
 * requests are saved in the sent list of the peer and then sent; answers are sent and then
 * freed. arg is the peer. The thread loops until it is canceled or hits an unrecoverable
 * error; in the latter case it sends the PEVENT_TH_TERM_OUT event to the peer before
 * returning. The return value is always NULL. */
static void * _peer_out_th(void * arg)
{
	struct requeue_data rd;
	
	THREAD_NAME_PEER( "Outgoing", arg );
	TRACE_ENTRY( "%p", arg );
	
	CHECK_PARAMS_DO(  VALIDATE_PEER(arg), return NULL  );
	
	rd.peer = _P(arg);
	rd.msg  = NULL;
	
	/* This thread loops until it is canceled */
	while (1) {
		msg_data_t * hdr = NULL;
		uint32_t hbh = 0; /* the hbh to use for sending the message on next hop */
		int outcome;
		
		pthread_testcancel();
			
		/* Get the next message to be sent. */
		CHECK_FCT_DO(  meq_get ( rd.peer->p_out_q, &(rd.msg) ),  goto error );
		
		/* We removed the message from the queue, we need to reput it if we're interrupted now.
		 * Nothing between the push and the pop may jump out of the block: with
		 * implementations where the pair expands to a do { } while (0) block, a "continue"
		 * placed here would even bind to that inner block instead of the outer loop. */
		pthread_cleanup_push( cleanup_requeue, &rd );
		
		outcome = out_th_prepare( &rd, &hdr, &hbh );
		
		/* The message is now safe (or gone), we don't need to requeue it anymore */
		pthread_cleanup_pop( 0 );
		
		if (outcome == OUT_TH_ABORT)
			goto error;
		
		if (outcome == OUT_TH_SKIP)
			continue;
		
		{
			uint32_t prev = hdr->msg_hbhid;
			
			/* The message goes on the wire with the id chosen for this hop */
			hdr->msg_hbhid = hbh;
			
			/* Send the message */
			CHECK_FCT_DO( _peer_out_sendmsg(rd.peer, rd.msg),
					{
						log_error("An error occurred while sending a message to peer '%s'.", (rd.peer)->p_diamid);
					} );
			
			/* Restore */
			hdr->msg_hbhid = prev;
		}

		/* If it was an answer, we can discard the message */
		if ( ! (hdr->msg_flags & CMD_FLAG_REQUEST)) {
			
			CHECK_FCT_DO(msg_free(rd.msg, 1), /* should we fail? */ );
			rd.msg = NULL;
		}
		
		/* loop to the next message */
	}
	
error:
	/* Signal the parent thread that we have exited */
	CHECK_POSIX_DO(   _peer_events_send(rd.peer, PEVENT_TH_TERM_OUT, NULL),  /* ignore */ );

	/* Return */
	TRACE_DEBUG (FULL, "Thread is terminated.");
	return NULL;
}

/* Start the emitter thread for a peer.
 *
 * The peer must be valid, have a security module, a connection in its security session and
 * an outgoing queue, and must not already have an emitter thread (p_out_th is NULL).
 *
 * Returns 0 when the thread was created. The parameter checks are handled by CHECK_PARAMS;
 * if the thread cannot be created, the error code of pthread_create is returned and
 * p_out_th is left NULL.
 */
int _peer_out_start(_peer_t * peer)
{
	int ret;
	
	CHECK_PARAMS( VALIDATE_PEER(peer) );
	CHECK_PARAMS( peer->p_secmod );
	CHECK_PARAMS( peer->p_sec_session.conn );
	
	CHECK_PARAMS( peer->p_out_q );
	CHECK_PARAMS( peer->p_out_th == (pthread_t)NULL );
	
	/* Ok the peer is in appropriate state, let's start the thread */
	CHECK_POSIX_DO(  ret = pthread_create( &peer->p_out_th, NULL, _peer_out_th, peer ),
		{
			/* POSIX leaves the content of the thread id undefined when the creation fails.
			 * Reset it, so that _peer_out_stop() does not try to terminate a thread that
			 * does not exist, and so that a later start attempt is accepted. */
			peer->p_out_th = (pthread_t)NULL;
			return ret;
		}  );
	
	/* done */
	return 0;
}

/* Stop the emitter thread of a peer.
 * Returns 0 when the peer has no emitter thread (p_out_th is NULL); otherwise returns
 * the result of _thread_term() on that thread. */
int _peer_out_stop(_peer_t * peer)
{
	CHECK_PARAMS( VALIDATE_PEER(peer) );

	/* No emitter thread recorded for this peer: nothing to stop */
	if (peer->p_out_th == (pthread_t)NULL)
		return 0;

	return _thread_term(&peer->p_out_th);
}
"Welcome to our mercurial repository"