view libfdcore/p_sr.c @ 1240:0420ccc4671a

Add a counter for the sent requests for which we did not wait for an answer. It might be relevant this value contributes to the load estimate of the remote peer, but it is not very reliable
author Sebastien Decugis <sdecugis@freediameter.net>
date Thu, 10 Oct 2013 16:30:55 +0200
parents b059b9e8ef83
children c9a160b815ea
line wrap: on
line source

/*********************************************************************************************************
* Software License Agreement (BSD License)                                                               *
* Author: Sebastien Decugis <sdecugis@freediameter.net>							 *
*													 *
* Copyright (c) 2013, WIDE Project and NICT								 *
* All rights reserved.											 *
* 													 *
* Redistribution and use of this software in source and binary forms, with or without modification, are  *
* permitted provided that the following conditions are met:						 *
* 													 *
* * Redistributions of source code must retain the above 						 *
*   copyright notice, this list of conditions and the 							 *
*   following disclaimer.										 *
*    													 *
* * Redistributions in binary form must reproduce the above 						 *
*   copyright notice, this list of conditions and the 							 *
*   following disclaimer in the documentation and/or other						 *
*   materials provided with the distribution.								 *
* 													 *
* * Neither the name of the WIDE Project or NICT nor the 						 *
*   names of its contributors may be used to endorse or 						 *
*   promote products derived from this software without 						 *
*   specific prior written permission of WIDE Project and 						 *
*   NICT.												 *
* 													 *
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED *
* WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A *
* PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR *
* ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 	 *
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 	 *
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR *
* TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF   *
* ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.								 *
*********************************************************************************************************/

#include "fdcore-internal.h"

/* Structure to store a sent request */
struct sentreq {
	struct fd_list	chain; 	/* the "o" field points directly to the (new) hop-by-hop of the request (uint32_t *)  */
	struct msg	*req;	/* A request that was sent and not yet answered. */
	uint32_t	prevhbh;/* The value to set back in the hbh header when the message is retrieved */
	struct fd_list  expire; /* the list of expiring requests */
	struct timespec timeout; /* Cache the expire date of the request so that the timeout thread does not need to get it each time. */
	struct timespec added_on; /* the time the request was added */
};

/* Find an element in the hbh list, or the following one */
static struct fd_list * find_or_next(struct fd_list * srlist, uint32_t hbh, int * match)
{
	struct fd_list * li;
	*match = 0;
	for (li = srlist->next; li != srlist; li = li->next) {
		uint32_t * nexthbh = li->o;
		if (*nexthbh < hbh)
			continue;
		if (*nexthbh == hbh)
			*match = 1;
		break;
	}
	return li;
}

/* Similar but start from the end, since we add requests in growing hbh order usually */
static struct fd_list * find_or_prev(struct fd_list * srlist, uint32_t hbh, int * match)
{
	struct fd_list * li;
	*match = 0;
	for (li = srlist->prev; li != srlist; li = li->prev) {
		uint32_t * prevhbh = li->o;
		if (*prevhbh > hbh)
			continue;
		if (*prevhbh == hbh)
			*match = 1;
		break;
	}
	return li;
}

static void srl_dump(const char * text, struct fd_list * srlist)
{
	struct fd_list * li;
	struct timespec now;
	
	LOG_D("%sSentReq list @%p:", text, srlist);
	
	CHECK_SYS_DO( clock_gettime(CLOCK_REALTIME, &now), );
	
	for (li = srlist->next; li != srlist; li = li->next) {
		struct sentreq * sr = (struct sentreq *)li;
		uint32_t * nexthbh = li->o;
		
		LOG_D(" - Next req (hbh:0x%x, prev:0x%x): [since %ld.%06ld sec]", *nexthbh, sr->prevhbh,
			(long)((now.tv_nsec >= sr->added_on.tv_nsec) ? (now.tv_sec - sr->added_on.tv_sec) : (now.tv_sec - sr->added_on.tv_sec - 1)),
			(long)((now.tv_nsec >= sr->added_on.tv_nsec) ? ((now.tv_nsec - sr->added_on.tv_nsec) / 1000) : ((now.tv_nsec - sr->added_on.tv_nsec + 1000000000) / 1000)));
	}
}

/* thread that handles messages expiring. The thread is started only when needed */
static void * sr_expiry_th(void * arg) {
	struct sr_list * srlist = arg;
	
	TRACE_ENTRY("%p", arg);
	CHECK_PARAMS_DO( arg, return NULL );
	
	/* Set the thread name */
	{
		char buf[48];
		snprintf(buf, sizeof(buf), "ReqExp/%s", ((struct fd_peer *)(srlist->exp.o))->p_hdr.info.pi_diamid);
		fd_log_threadname ( buf );
	}
	
	do {
		struct timespec	now;
		struct sentreq * first;
		struct msg * request;
		struct fd_peer * sentto;
		void (*expirecb)(void *, DiamId_t, size_t, struct msg **);
		void (*anscb)(void *, struct msg **);
		void * data;
		int no_error;

		CHECK_POSIX_DO( pthread_mutex_lock(&srlist->mtx),  return NULL );
		pthread_cleanup_push( fd_cleanup_mutex, &srlist->mtx );

loop:	
		no_error = 0;

		/* Check if there are expiring requests available */
		if (FD_IS_LIST_EMPTY(&srlist->exp)) {
			/* Just wait for a change or cancelation */
			CHECK_POSIX_DO( pthread_cond_wait( &srlist->cnd, &srlist->mtx ), goto unlock );
			/* Restart the loop on wakeup */
			goto loop;
		}
		
		/* Get the pointer to the request that expires first */
		first = (struct sentreq *)(srlist->exp.next->o);
		
		/* Get the current time */
		CHECK_SYS_DO(  clock_gettime(CLOCK_REALTIME, &now),  goto unlock  );

		/* If first request is not expired, we just wait until it happens */
		if ( TS_IS_INFERIOR( &now, &first->timeout ) ) {
			
			CHECK_POSIX_DO2(  pthread_cond_timedwait( &srlist->cnd, &srlist->mtx, &first->timeout ),  
					ETIMEDOUT, /* ETIMEDOUT is a normal return value, continue */,
					/* on other error, */ goto unlock );
	
			/* on wakeup, loop */
			goto loop;
		}
		
		/* Now, the first request in the list is expired; remove it and call the expirecb for it */
		request = first->req;
		sentto = first->chain.head->o;
		
		TRACE_DEBUG(FULL, "Request %x was not answered by %s within the timer delay", *((uint32_t *)first->chain.o), sentto->p_hdr.info.pi_diamid);
		
		/* Restore the hbhid */
		*((uint32_t *)first->chain.o) = first->prevhbh; 
		
		/* Free the sentreq information */
		fd_list_unlink(&first->chain);
		srlist->cnt--;
		srlist->cnt_lost++; /* We are not waiting for this answer anymore, but the remote peer may still be processing it. */
		fd_list_unlink(&first->expire);
		free(first);
		
		no_error = 1;
unlock:
		; /* pthread_cleanup_pop sometimes expands as "} ..." and the label before this cause some compilers to complain... */
		pthread_cleanup_pop( 1 ); /* unlock the mutex */
		if (!no_error)
			break;

		
		/* Retrieve callback in the message */
		CHECK_FCT_DO( fd_msg_anscb_get( request, &anscb, &expirecb, &data ), break);
		ASSERT(expirecb);
	
		/* Clean up this expirecb from the message */
		CHECK_FCT_DO( fd_msg_anscb_associate( request, anscb, data, NULL, NULL ), break);

		/* Call it */
		(*expirecb)(data, sentto->p_hdr.info.pi_diamid, sentto->p_hdr.info.pi_diamidlen, &request);
	
		/* If the callback did not dispose of the message, do it now */
		if (request) {
			fd_hook_call(HOOK_MESSAGE_DROPPED, request, NULL, "Expiration period completed without an answer, and the expiry callback did not dispose of the message.", fd_msg_pmdl_get(request));
			CHECK_FCT_DO( fd_msg_free(request), /* ignore */ );
		}
	
	} while (1);
	
	ASSERT(0); /* we have encountered a problem, maybe time to signal the framework to terminate? */
	return NULL;
}


/* Store a new sent request */
int fd_p_sr_store(struct sr_list * srlist, struct msg **req, uint32_t *hbhloc, uint32_t hbh_restore)
{
	struct sentreq * sr;
	struct fd_list * prev;
	int match;
	struct timespec * ts;
	
	TRACE_ENTRY("%p %p %p %x", srlist, req, hbhloc, hbh_restore);
	CHECK_PARAMS(srlist && req && *req && hbhloc);
	
	CHECK_MALLOC( sr = malloc(sizeof(struct sentreq)) );
	memset(sr, 0, sizeof(struct sentreq));
	fd_list_init(&sr->chain, hbhloc);
	sr->req = *req;
	sr->prevhbh = hbh_restore;
	fd_list_init(&sr->expire, sr);
	CHECK_SYS( clock_gettime(CLOCK_REALTIME, &sr->added_on) );
	
	/* Search the place in the list */
	CHECK_POSIX( pthread_mutex_lock(&srlist->mtx) );
	prev = find_or_prev(&srlist->srs, *hbhloc, &match);
	if (match) {
		TRACE_DEBUG(INFO, "A request with the same hop-by-hop Id (0x%x) was already sent: error", *hbhloc);
		free(sr);
		srl_dump("Current list of SR: ", &srlist->srs);
		CHECK_POSIX_DO( pthread_mutex_unlock(&srlist->mtx), /* ignore */ );
		return EINVAL;
	}
	
	/* Save in the list */
	*req = NULL;
	fd_list_insert_after(prev, &sr->chain);
	srlist->cnt++;
	
	/* In case of request with a timeout, also store in the timeout list */
	ts = fd_msg_anscb_gettimeout( sr->req );
	if (ts) {
		struct fd_list * li;
		
		memcpy(&sr->timeout, ts, sizeof(struct timespec));
		
		/* browse srlist->exp from the end */
		for (li = srlist->exp.prev; li != &srlist->exp; li = li->prev) {
			struct sentreq * s = (struct sentreq *)(li->o);
			if (TS_IS_INFERIOR(&s->timeout, ts))
				break;
		}
		
		fd_list_insert_after(li, &sr->expire);
	
		/* if the thread does not exist yet, create it */
		if (srlist->thr == (pthread_t)NULL) {
			CHECK_POSIX_DO( pthread_create(&srlist->thr, NULL, sr_expiry_th, srlist), /* continue anyway */);
		} else {
			/* or, if added in first position, signal the condvar to update the sleep time of the thread */
			if (li == &srlist->exp) {
				CHECK_POSIX_DO( pthread_cond_signal(&srlist->cnd), /* continue anyway */);
			}
		}
	}
	
	CHECK_POSIX( pthread_mutex_unlock(&srlist->mtx) );
	return 0;
}

/* Fetch a request by hbh */
int fd_p_sr_fetch(struct sr_list * srlist, uint32_t hbh, struct msg **req)
{
	struct sentreq * sr;
	int match;
	
	TRACE_ENTRY("%p %x %p", srlist, hbh, req);
	CHECK_PARAMS(srlist && req);
	
	/* Search the request in the list */
	CHECK_POSIX( pthread_mutex_lock(&srlist->mtx) );
	sr = (struct sentreq *)find_or_next(&srlist->srs, hbh, &match);
	if (!match) {
		TRACE_DEBUG(INFO, "There is no saved request with this hop-by-hop id (%x)", hbh);
		srl_dump("Current list of SR: ", &srlist->srs);
		*req = NULL;
		if (srlist->cnt_lost > 0) {
			srlist->cnt_lost--; /* This is probably an answer for a request we already timedout. */
		} /* else, probably a bug in the remote peer */
	} else {
		/* Restore hop-by-hop id */
		*((uint32_t *)sr->chain.o) = sr->prevhbh;
		/* Unlink */
		fd_list_unlink(&sr->chain);
		srlist->cnt--;
		fd_list_unlink(&sr->expire);
		*req = sr->req;
		free(sr);
	}
	CHECK_POSIX( pthread_mutex_unlock(&srlist->mtx) );
	
	/* do not stop the expire thread here, it might cause creating/destroying it very often otherwise */

	/* Done */
	return 0;
}

/* Failover requests (free or requeue routables) */
void fd_p_sr_failover(struct sr_list * srlist)
{
	CHECK_POSIX_DO( pthread_mutex_lock(&srlist->mtx), /* continue anyway */ );
	while (!FD_IS_LIST_EMPTY(&srlist->srs)) {
		struct sentreq * sr = (struct sentreq *)(srlist->srs.next);
		fd_list_unlink(&sr->chain);
		srlist->cnt--;
		fd_list_unlink(&sr->expire);
		if (fd_msg_is_routable(sr->req)) {
			struct msg_hdr * hdr = NULL;
			int ret;
			
			/* Set the 'T' flag */
			CHECK_FCT_DO(fd_msg_hdr(sr->req, &hdr), /* continue */);
			if (hdr)
				hdr->msg_flags |= CMD_FLAG_RETRANSMIT;
			
			/* Restore the original hop-by-hop id of the request */
			*((uint32_t *)sr->chain.o) = sr->prevhbh;
			
			fd_hook_call(HOOK_MESSAGE_FAILOVER, sr->req, (struct fd_peer *)srlist->srs.o, NULL, fd_msg_pmdl_get(sr->req));
			
			/* Requeue for sending to another peer */
			CHECK_FCT_DO( ret = fd_fifo_post_noblock(fd_g_outgoing, (void *)&sr->req),
				{
					char buf[256];
					snprintf(buf, sizeof(buf), "Internal error: error while requeuing during failover: %s", strerror(ret));
					fd_hook_call(HOOK_MESSAGE_DROPPED, sr->req, NULL, buf, fd_msg_pmdl_get(sr->req));
					CHECK_FCT_DO(fd_msg_free(sr->req), /* What can we do more? */)
				});
		} else {
			/* Just free the request. */
			/* fd_hook_call(HOOK_MESSAGE_DROPPED, sr->req, NULL, "Sent & unanswered local message discarded during failover.", fd_msg_pmdl_get(sr->req)); */
			CHECK_FCT_DO(fd_msg_free(sr->req), /* Ignore */);
		}
		free(sr);
	}
	/* The list of expiring requests must be empty now */
	ASSERT( FD_IS_LIST_EMPTY(&srlist->exp) );
	ASSERT( srlist->cnt == 0 ); /* debug the counter management if needed */
	
	CHECK_POSIX_DO( pthread_mutex_unlock(&srlist->mtx), /* continue anyway */ );
	
	/* Terminate the expiry thread (must be done when the lock can be taken) */
	CHECK_FCT_DO( fd_thr_term(&srlist->thr), /* ignore error */ );
}

"Welcome to our mercurial repository"