changeset 14:14cf6daf716d

Some progress on peers module
author Sebastien Decugis <sdecugis@nict.go.jp>
date Thu, 01 Oct 2009 18:24:07 +0900
parents ef9ef3bf4752
children 050f4f6f9f2a
files freeDiameter/extensions.c freeDiameter/fD.h freeDiameter/fdd.y freeDiameter/main.c freeDiameter/p_expiry.c freeDiameter/p_psm.c freeDiameter/peers.c include/freeDiameter/freeDiameter.h include/freeDiameter/libfreeDiameter.h libfreeDiameter/dispatch.c libfreeDiameter/fifo.c libfreeDiameter/init.c libfreeDiameter/libfD.h libfreeDiameter/lists.c libfreeDiameter/sessions.c
diffstat 15 files changed, 426 insertions(+), 217 deletions(-) [+]
line wrap: on
line diff
--- a/freeDiameter/extensions.c	Wed Sep 30 18:25:46 2009 +0900
+++ b/freeDiameter/extensions.c	Thu Oct 01 18:24:07 2009 +0900
@@ -49,15 +49,7 @@
 };
 
 /* list of extensions */
-static struct fd_list ext_list;
-
-/* Initialize the module */
-int fd_ext_init()
-{
-	TRACE_ENTRY();
-	fd_list_init(&ext_list, NULL);
-	return 0;
-}
+static struct fd_list ext_list = FD_LIST_INITIALIZER(ext_list);
 
 /* Add new extension */
 int fd_ext_add( char * filename, char * conffile )
--- a/freeDiameter/fD.h	Wed Sep 30 18:25:46 2009 +0900
+++ b/freeDiameter/fD.h	Thu Oct 01 18:24:07 2009 +0900
@@ -41,6 +41,26 @@
 #include <freeDiameter/freeDiameter-host.h>
 #include <freeDiameter/freeDiameter.h>
 
+/* Timeout for establishing a connection */
+#ifndef CNX_TIMEOUT
+#define  CNX_TIMEOUT	10	/* in seconds */
+#endif /* CNX_TIMEOUT */
+
+/* Timeout for receiving a CER after incoming connection is established */
+#ifndef INCNX_TIMEOUT
+#define  INCNX_TIMEOUT	 20	/* in seconds */
+#endif /* INCNX_TIMEOUT */
+
+/* Timeout for receiving a CEA after CER is sent */
+#ifndef CEA_TIMEOUT
+#define  CEA_TIMEOUT	10	/* in seconds */
+#endif /* CEA_TIMEOUT */
+
+/* The timeout value to wait for answer to a DPR */
+#ifndef DPR_TIMEOUT
+#define DPR_TIMEOUT 	15	/* in seconds */
+#endif /* DPR_TIMEOUT */
+
 /* Configuration */
 int fd_conf_init();
 void fd_conf_dump();
@@ -48,7 +68,6 @@
 int fddparse(struct fd_config * conf); /* yacc generated */
 
 /* Extensions */
-int fd_ext_init();
 int fd_ext_add( char * filename, char * conffile );
 int fd_ext_load();
 void fd_ext_dump(void);
@@ -81,18 +100,10 @@
 	/* Origin of this peer object, for debug */
 	char		*p_dbgorig;
 	
-	/* Mutex that protect this peer structure */
-	pthread_mutex_t	 p_mtx;
-	
-	/* Reference counter -- freed only when this reaches 0 */
-	unsigned	 p_refcount;
-	
 	/* Chaining in peers sublists */
+	struct fd_list	 p_actives;	/* list of peers in the STATE_OPEN state -- faster routing creation */
 	struct fd_list	 p_expiry; 	/* list of expiring peers, ordered by their timeout value */
-	struct fd_list	 p_actives;	/* list of peers in the STATE_OPEN state -- faster routing creation */
-	
-	/* The next hop-by-hop id value for the link */
-	uint32_t	 p_hbh;
+	struct timespec	 p_exp_timer;	/* Timestamp where the peer will expire; updated each time activity is seen on the peer (except DW) */
 	
 	/* Some flags influencing the peer state machine */
 	struct {
@@ -108,7 +119,7 @@
 	}		 p_flags;
 	
 	/* The events queue, peer state machine thread, timer for states timeouts */
-	struct fifo	*p_events;
+	struct fifo	*p_events;	/* The mutex of this FIFO list protects also the state and timer information */
 	pthread_t	 p_psm;
 	struct timespec	 p_psm_timer;
 	
@@ -120,10 +131,13 @@
 	struct fifo	*p_tosend;
 	pthread_t	 p_outthr;
 	
+	/* The next hop-by-hop id value for the link, only read & modified by p_outthr */
+	uint32_t	 p_hbh;
+	
 	/* Sent requests (for fallback), list of struct sentreq ordered by hbh */
 	struct fd_list	 p_sentreq;
 	
-	/* connection context: socket & other metadata */
+	/* connection context: socket, callbacks and so on */
 	struct cnxctx	*p_cnxctx;
 	
 	/* Callback on initial connection success / failure */
@@ -144,7 +158,11 @@
 	
 	/* A message was received in the peer */
 	,FDEVP_MSG_INCOMING
+	
+	/* The PSM state is expired */
+	,FDEVP_PSM_TIMEOUT
 };
+const char * fd_pev_str(int event);
 
 /* Structure to store a sent request */
 struct sentreq {
@@ -153,17 +171,14 @@
 };
 
 /* Functions */
-int fd_peer_init();
 int fd_peer_fini();
 void fd_peer_dump_list(int details);
 /* fd_peer_add declared in freeDiameter.h */
-int fd_peer_rc_decr(struct fd_peer **ptr, int locked);
 
 /* Peer expiry */
 int fd_p_expi_init(void);
 int fd_p_expi_fini(void);
-int fd_p_expi_update(struct fd_peer * peer, int locked );
-int fd_p_expi_unlink(struct fd_peer * peer, int locked );
+int fd_p_expi_update(struct fd_peer * peer );
 
 /* Peer state machine */
 int fd_psm_start();
--- a/freeDiameter/fdd.y	Wed Sep 30 18:25:46 2009 +0900
+++ b/freeDiameter/fdd.y	Thu Oct 01 18:24:07 2009 +0900
@@ -314,6 +314,8 @@
 			
 connpeer:		{
 				memset(&fddpi, 0, sizeof(fddpi));
+				fd_list_init( &fddpi.pi_endpoints, NULL );
+				fd_list_init( &fddpi.pi_apps, NULL );
 			}
 			CONNPEER '=' QSTRING peerinfo ';'
 			{
--- a/freeDiameter/main.c	Wed Sep 30 18:25:46 2009 +0900
+++ b/freeDiameter/main.c	Thu Oct 01 18:24:07 2009 +0900
@@ -41,6 +41,8 @@
 /* forward declarations */
 static void * sig_hdl(void * arg);
 static int main_cmdline(int argc, char *argv[]);
+static void main_version(void);
+static void main_help( void );
 
 /* The static configuration structure */
 static struct fd_config conf;
@@ -76,10 +78,9 @@
 	CHECK_FCT( fd_dict_base_protocol(fd_g_config->cnf_dict) );
 	
 	/* Initialize other modules */
-	CHECK_FCT(  fd_ext_init()  );
 	CHECK_FCT(  fd_queues_init()  );
 	CHECK_FCT(  fd_msg_init()  );
-	CHECK_FCT(  fd_peer_init()  );
+	CHECK_FCT(  fd_p_expi_init()  );
 	
 	/* Parse the configuration file */
 	CHECK_FCT( fd_conf_parse() );
@@ -132,58 +133,33 @@
 	TRACE_DEBUG(INFO, FD_PROJECT_BINARY " daemon is stopping...");
 	
 	/* cleanups */
-	CHECK_FCT_DO( fd_ext_fini(), /* continue */ );
+	TODO("Stop dispatch thread(s) properly (no cancel yet)");
+	CHECK_FCT_DO( fd_peer_fini(), /* Stop all connections */ );
+	TODO("Stop dispatch & routing threads");
+	CHECK_FCT_DO( fd_ext_fini(), /* Cleaup all extensions */ );
+	TODO("Cleanup queues (dump all remaining messages ?)");
+	
 	CHECK_FCT_DO( fd_thr_term(&sig_th), /* continue */ );
 	
 	return ret;
 }
 
-/* Display package version */
-static void main_version_core(void)
+const char * fd_ev_str(int event)
 {
-	printf("%s, version %d.%d.%d"
-#ifdef HG_VERSION
-		" (r%s"
-# ifdef PACKAGE_HG_REVISION
-		"/%s"
-# endif /* PACKAGE_HG_VERSION */
-		")"
-#endif /* HG_VERSION */
-		"\n", 
-		FD_PROJECT_NAME, FD_PROJECT_VERSION_MAJOR, FD_PROJECT_VERSION_MINOR, FD_PROJECT_VERSION_REV
-#ifdef HG_VERSION
-		, HG_VERSION
-# ifdef PACKAGE_HG_REVISION
-		, PACKAGE_HG_REVISION
-# endif /* PACKAGE_HG_VERSION */
-#endif /* HG_VERSION */
-		);
-}
-
-/* Display package version and general info */
-static void main_version(void)
-{
-	main_version_core();
-	printf( "%s\n", FD_PROJECT_COPYRIGHT);
-	printf( "\nSee " FD_PROJECT_NAME " homepage at http://aaa.koganei.wide.ad.jp/\n"
-		" for information, updates and bug reports on this software.\n");
-}
-
-/* Print command-line options */
-static void main_help( void )
-{
-	main_version_core();
-	printf(	"  This daemon is an implementation of the Diameter protocol\n"
-		"  used for Authentication, Authorization, and Accounting (AAA).\n");
-	printf("\nUsage:  " FD_PROJECT_BINARY " [OPTIONS]...\n");
-	printf( "  -h, --help             Print help and exit\n"
-  		"  -V, --version          Print version and exit\n"
-  		"  -c, --config=filename  Read configuration from this file instead of the \n"
-		"                           default location (%s).\n", DEFAULT_CONF_FILE);
- 	printf( "\nDebug:\n"
-  		"  These options are mostly useful for developers\n"
-  		"  -d, --debug            Increase verbosity of debug messages\n"
-  		"  -q, --quiet            Decrease verbosity then remove debug messages\n");
+	switch (event) {
+	#define case_str( _val )\
+		case _val : return #_val
+		case_str(FDEV_TERMINATE);
+		case_str(FDEV_DUMP_DICT);
+		case_str(FDEV_DUMP_EXT);
+		case_str(FDEV_DUMP_QUEUES);
+		case_str(FDEV_DUMP_CONFIG);
+		case_str(FDEV_DUMP_PEERS);
+		
+		default:
+			TRACE_DEBUG(FULL, "Unknown event : %d", event);
+			return "Unknown event";
+	}
 }
 
 /* Parse the command-line */
@@ -244,7 +220,54 @@
 	}
 		
 	return 0;
-	
+}
+
+/* Display package version */
+static void main_version_core(void)
+{
+	printf("%s, version %d.%d.%d"
+#ifdef HG_VERSION
+		" (r%s"
+# ifdef PACKAGE_HG_REVISION
+		"/%s"
+# endif /* PACKAGE_HG_VERSION */
+		")"
+#endif /* HG_VERSION */
+		"\n", 
+		FD_PROJECT_NAME, FD_PROJECT_VERSION_MAJOR, FD_PROJECT_VERSION_MINOR, FD_PROJECT_VERSION_REV
+#ifdef HG_VERSION
+		, HG_VERSION
+# ifdef PACKAGE_HG_REVISION
+		, PACKAGE_HG_REVISION
+# endif /* PACKAGE_HG_VERSION */
+#endif /* HG_VERSION */
+		);
+}
+
+/* Display package version and general info */
+static void main_version(void)
+{
+	main_version_core();
+	printf( "%s\n", FD_PROJECT_COPYRIGHT);
+	printf( "\nSee " FD_PROJECT_NAME " homepage at http://aaa.koganei.wide.ad.jp/\n"
+		" for information, updates and bug reports on this software.\n");
+}
+
+/* Print command-line options */
+static void main_help( void )
+{
+	main_version_core();
+	printf(	"  This daemon is an implementation of the Diameter protocol\n"
+		"  used for Authentication, Authorization, and Accounting (AAA).\n");
+	printf("\nUsage:  " FD_PROJECT_BINARY " [OPTIONS]...\n");
+	printf( "  -h, --help             Print help and exit\n"
+  		"  -V, --version          Print version and exit\n"
+  		"  -c, --config=filename  Read configuration from this file instead of the \n"
+		"                           default location (%s).\n", DEFAULT_CONF_FILE);
+ 	printf( "\nDebug:\n"
+  		"  These options are mostly useful for developers\n"
+  		"  -d, --debug            Increase verbosity of debug messages\n"
+  		"  -q, --quiet            Decrease verbosity then remove debug messages\n");
 }
 
 #ifdef HAVE_SIGNALENT_H
--- a/freeDiameter/p_expiry.c	Wed Sep 30 18:25:46 2009 +0900
+++ b/freeDiameter/p_expiry.c	Thu Oct 01 18:24:07 2009 +0900
@@ -35,44 +35,120 @@
 
 #include "fD.h"
 
+static pthread_t       exp_thr;
+static struct fd_list  exp_list = FD_LIST_INITIALIZER( exp_list );
+static pthread_cond_t  exp_cnd  = PTHREAD_COND_INITIALIZER;
+static pthread_mutex_t exp_mtx  = PTHREAD_MUTEX_INITIALIZER;
 
+static void * exp_th_fct(void * arg)
+{
+	fd_log_threadname ( "Peers/expire" );
+	TRACE_ENTRY( "" );
+	
+	CHECK_POSIX_DO( pthread_mutex_lock(&exp_mtx),  goto error );
+	pthread_cleanup_push( fd_cleanup_mutex, &exp_mtx );
+	
+	do {
+		struct timespec	now;
+		struct fd_peer * first;
+		
+		/* Check if there are expiring sessions available */
+		if (FD_IS_LIST_EMPTY(&exp_list)) {
+			/* Just wait for a change or cancelation */
+			CHECK_POSIX_DO( pthread_cond_wait( &exp_cnd, &exp_mtx ), goto error );
+			/* Restart the loop on wakeup */
+			continue;
+		}
+		
+		/* Get the pointer to the peer that expires first */
+		first = (struct fd_peer *)(exp_list.next->o);
+		ASSERT( CHECK_PEER(first) );
+		
+		/* Get the current time */
+		CHECK_SYS_DO(  clock_gettime(CLOCK_REALTIME, &now),  goto error  );
 
+		/* If first peer is not expired, we just wait until it happens */
+		if ( TS_IS_INFERIOR( &now, &first->p_exp_timer ) ) {
+			
+			CHECK_POSIX_DO2(  pthread_cond_timedwait( &exp_cnd, &exp_mtx, &first->p_exp_timer ),  
+					ETIMEDOUT, /* ETIMEDOUT is a normal error, continue */,
+					/* on other error, */ goto error );
+	
+			/* on wakeup, loop */
+			continue;
+		}
+		
+		/* Now, the first peer in the list is expired; signal it */
+		fd_list_unlink( &first->p_expiry );
+		CHECK_FCT_DO( fd_event_send(first->p_events, FDEVP_TERMINATE, NULL), goto error );
+		
+	} while (1);
+	
+	pthread_cleanup_pop( 1 );
+error:
+	TRACE_DEBUG(INFO, "An error occurred in peers module! Expiry thread is terminating...");
+	ASSERT(0);
+	CHECK_FCT_DO(fd_event_send(fd_g_config->cnf_main_ev, FDEV_TERMINATE, NULL), );
+	return NULL;
+}
 
 /* Initialize peers expiry mechanism */
 int fd_p_expi_init(void)
 {
-	TODO("");
-	return ENOTSUP;
+	TRACE_ENTRY();
+	CHECK_FCT( pthread_create( &exp_thr, NULL, exp_th_fct, NULL ) );
+	return 0;
 }
 
 /* Finish peers expiry mechanism */
 int fd_p_expi_fini(void)
 {
-	TODO("");
-	return ENOTSUP;
+	CHECK_FCT_DO( fd_thr_term(&exp_thr), );
+	CHECK_POSIX( pthread_mutex_lock(&exp_mtx) );
+	
+	while (!FD_IS_LIST_EMPTY(&exp_list)) {
+		struct fd_peer * peer = (struct fd_peer *)(exp_list.next->o);
+		fd_list_unlink(&peer->p_expiry );
+	}
+	
+	CHECK_POSIX( pthread_mutex_unlock(&exp_mtx) );
+	return 0;
 }
 
-/* Add a peer in the expiry list if needed */
-int fd_p_expi_update(struct fd_peer * peer, int locked )
+/* Add / requeue a peer in the expiry list */
+int fd_p_expi_update(struct fd_peer * peer )
 {
-	TODO("");
+	TRACE_ENTRY("%p", peer);
+	CHECK_PARAMS( CHECK_PEER(peer) );
+	
+	CHECK_POSIX( pthread_mutex_lock(&exp_mtx) );
+	
+	fd_list_unlink(&peer->p_expiry );
 	
 	/* if peer expires */
-		/* add to the expiry list in appropriate position */
-		/* increment peer refcount */
+	if (peer->p_hdr.info.pi_flags.exp) {
+		struct fd_list * li;
+		
+		/* update the p_exp_timer value */
+		CHECK_SYS(  clock_gettime(CLOCK_REALTIME, &peer->p_exp_timer)  );
+		peer->p_exp_timer.tv_sec += peer->p_hdr.info.pi_lft;
+		
+		/* add to the expiry list in appropriate position (probably around the end) */
+		for (li = exp_list.prev; li != &exp_list; li = li->prev) {
+			struct fd_peer * p = (struct fd_peer *)(li->o);
+			if (TS_IS_INFERIOR( &p->p_exp_timer, &peer->p_exp_timer ) )
+				break;
+		}
+		
+		fd_list_insert_after(li, &peer->p_expiry);
+		
 		/* signal the expiry thread if we added in first position */
+		if (li == &exp_list) {
+			CHECK_POSIX( pthread_cond_signal(&exp_cnd) );
+		}
+	}
 	
-	return ENOTSUP;
+	CHECK_POSIX( pthread_mutex_unlock(&exp_mtx) );
+	return 0;
 }
 
-/* Remove a peer from expiry list if needed */
-int fd_p_expi_unlink(struct fd_peer * peer, int locked )
-{
-	TODO("");
-	/* if peer is in expiry list */
-		/* remove from the list */
-		/* decrement peer refcount */
-		/* no need to signal the expiry thread ... */
-	
-	return ENOTSUP;
-}
--- a/freeDiameter/p_psm.c	Wed Sep 30 18:25:46 2009 +0900
+++ b/freeDiameter/p_psm.c	Thu Oct 01 18:24:07 2009 +0900
@@ -35,6 +35,35 @@
 
 #include "fD.h"
 
+const char *peer_state_str[] = { 
+	  "STATE_ZOMBIE"
+	, "STATE_OPEN"
+	, "STATE_CLOSED"
+	, "STATE_CLOSING"
+	, "STATE_WAITCNXACK"
+	, "STATE_WAITCNXACK_ELEC"
+	, "STATE_WAITCEA"
+	, "STATE_SUSPECT"
+	, "STATE_REOPEN"
+	};
+
+const char * fd_pev_str(int event)
+{
+	switch (event) {
+	#define case_str( _val )\
+		case _val : return #_val
+		case_str(FDEVP_TERMINATE);
+		case_str(FDEVP_DUMP_ALL);
+		case_str(FDEVP_MSG_INCOMING);
+		case_str(FDEVP_PSM_TIMEOUT);
+		
+		default:
+			TRACE_DEBUG(FULL, "Unknown event : %d", event);
+			return "Unknown event";
+	}
+}
+
+
 static int started = 0;
 static pthread_mutex_t  started_mtx = PTHREAD_MUTEX_INITIALIZER;
 static pthread_cond_t   started_cnd = PTHREAD_COND_INITIALIZER;
@@ -55,16 +84,121 @@
 	return 0;
 }
 
-/* Allow the state machines to start */
-int fd_psm_start()
+/* Cancelation cleanup : set ZOMBIE state in the peer */
+void cleanup_state(void * arg) 
+{
+	struct fd_peer * peer = (struct fd_peer *)arg;
+	CHECK_PARAMS_DO( CHECK_PEER(peer), return );
+	peer->p_hdr.info.pi_state = STATE_ZOMBIE;
+	return;
+}
+
+/* Set timeout timer of next event */
+static void psm_next_timeout(struct fd_peer * peer, int add_random, int delay)
+{
+	/* Initialize the timer */
+	CHECK_POSIX_DO(  clock_gettime( CLOCK_REALTIME,  &peer->p_psm_timer ), ASSERT(0) );
+	
+	if (add_random) {
+		if (delay > 2)
+			delay -= 2;
+		else
+			delay = 0;
+
+		/* Add a random value between 0 and 4sec */
+		peer->p_psm_timer.tv_sec += random() % 4;
+		peer->p_psm_timer.tv_nsec+= random() % 1000000000L;
+		if (peer->p_psm_timer.tv_nsec > 1000000000L) {
+			peer->p_psm_timer.tv_nsec -= 1000000000L;
+			peer->p_psm_timer.tv_sec ++;
+		}
+	}
+	
+	peer->p_psm_timer.tv_sec += delay;
+	
+#if 0
+	/* temporary for debug */
+	peer->p_psm_timer.tv_sec += 10;
+#endif
+}
+
+static int psm_ev_timedget(struct fd_peer * peer, int *code, void ** data)
 {
-	TRACE_ENTRY("");
-	CHECK_POSIX( pthread_mutex_lock(&started_mtx) );
-	started = 1;
-	CHECK_POSIX( pthread_cond_broadcast(&started_cnd) );
-	CHECK_POSIX( pthread_mutex_unlock(&started_mtx) );
+	struct fd_event * ev;
+	int ret = 0;
+	
+	TRACE_ENTRY("%p %p %p", peer, code, data);
+	
+	ret = fd_fifo_timedget(peer->p_events, &ev, &peer->p_psm_timer);
+	if (ret == ETIMEDOUT) {
+		*code = FDEVP_PSM_TIMEOUT;
+		*data = NULL;
+	} else {
+		CHECK_FCT( ret );
+		*code = ev->code;
+		*data = ev->data;
+		free(ev);
+	}
+	
 	return 0;
-}
+}	
+
+/* The state machine thread */
+static void * p_psm_th( void * arg )
+{
+	struct fd_peer * peer = (struct fd_peer *)arg;
+	int created_started = started;
+	
+	CHECK_PARAMS_DO( CHECK_PEER(peer), ASSERT(0) );
+	
+	pthread_cleanup_push( cleanup_state, arg );
+	
+	/* Set the thread name */
+	{
+		char buf[48];
+		sprintf(buf, "PSM/%.*s", sizeof(buf) - 5, peer->p_hdr.info.pi_diamid);
+		fd_log_threadname ( buf );
+	}
+	
+	/* Wait that the PSM are authorized to start in the daemon */
+	CHECK_FCT_DO( fd_psm_waitstart(), goto end );
+	
+	/* The state machine starts in CLOSED state */
+	peer->p_hdr.info.pi_state = STATE_CLOSED;
+	
+	/* Initialize the timer */
+	if (peer->p_flags.pf_responder) {
+		psm_next_timeout(peer, 0, INCNX_TIMEOUT);
+	} else {
+		psm_next_timeout(peer, created_started ? 0 : 1, 0);
+	}
+	
+psm:
+	do {
+		int event;
+		void * ev_data;
+		
+		/* Get next event */
+		CHECK_FCT_DO( psm_ev_timedget(peer, &event, &ev_data), goto end );
+		TRACE_DEBUG(FULL, "'%s'\t<-- '%s'\t(%p)\t'%s'",
+				STATE_STR(peer->p_hdr.info.pi_state),
+				fd_pev_str(event), ev_data,
+				peer->p_hdr.info.pi_diamid);
+		
+		/* Now, the action depends on the current state and the incoming event */
+		
+	
+	} while (1);	
+	
+	
+end:	
+	/* set STATE_ZOMBIE */
+	pthread_cleanup_pop(1);
+	return NULL;
+}	
+	
+	
+
 
 /* Create the PSM thread of one peer structure */
 int fd_psm_begin(struct fd_peer * peer )
@@ -78,15 +212,30 @@
 int fd_psm_terminate(struct fd_peer * peer )
 {
 	TRACE_ENTRY("%p", peer);
-	TODO("");
-	return ENOTSUP;
+	CHECK_PARAMS( CHECK_PEER(peer) );
+	CHECK_FCT( fd_event_send(peer->p_events, FDEVP_TERMINATE, NULL) );
+	return 0;
 }
 
 /* End the PSM violently */
 void fd_psm_abord(struct fd_peer * peer )
 {
 	TRACE_ENTRY("%p", peer);
-	TODO("");
+	TODO("Cancel PSM thread");
+	TODO("Cancel IN thread");
+	TODO("Cancel OUT thread");
+	TODO("Cleanup the connection");
 	return;
 }
 
+/* Allow the state machines to start */
+int fd_psm_start()
+{
+	TRACE_ENTRY("");
+	CHECK_POSIX( pthread_mutex_lock(&started_mtx) );
+	started = 1;
+	CHECK_POSIX( pthread_cond_broadcast(&started_cnd) );
+	CHECK_POSIX( pthread_mutex_unlock(&started_mtx) );
+	return 0;
+}
+
--- a/freeDiameter/peers.c	Wed Sep 30 18:25:46 2009 +0900
+++ b/freeDiameter/peers.c	Thu Oct 01 18:24:07 2009 +0900
@@ -35,42 +35,29 @@
 
 #include "fD.h"
 
-const char *peer_state_str[] = { "<error>"
-	, "STATE_DISABLED"
-	, "STATE_OPEN"
-	, "STATE_CLOSED"
-	, "STATE_CLOSING"
-	, "STATE_WAITCNXACK"
-	, "STATE_WAITCNXACK_ELEC"
-	, "STATE_WAITCEA"
-	, "STATE_SUSPECT"
-	, "STATE_REOPEN"
-	};
-
-struct fd_list   fd_g_peers;
-pthread_rwlock_t fd_g_peers_rw;
-
-/* Initialize the peers list */
-int fd_peer_init()
-{
-	TRACE_ENTRY();
-	
-	fd_list_init(&fd_g_peers, NULL);
-	CHECK_POSIX( pthread_rwlock_init(&fd_g_peers_rw, NULL) );
-	
-	CHECK_FCT(fd_p_expi_init());
-	
-	return 0;
-}
+struct fd_list   fd_g_peers = FD_LIST_INITIALIZER(fd_g_peers);
+pthread_rwlock_t fd_g_peers_rw = PTHREAD_RWLOCK_INITIALIZER;
 
 /* Terminate peer module (destroy all peers) */
 int fd_peer_fini()
 {
+	struct fd_list * li;
 	TRACE_ENTRY();
 	
 	CHECK_FCT_DO(fd_p_expi_fini(), /* continue */);
 	
-	TODO("Complete this function")
+	TRACE_DEBUG(INFO, "Sending signal to terminate to all peer connections");
+	
+	CHECK_FCT_DO( pthread_rwlock_rdlock(&fd_g_peers_rw), /* continue */ );
+	/* For each peer in the list, ... */
+	for (li = fd_g_peers.next; li != &fd_g_peers; li = li->next) {
+		struct fd_peer * np = (struct fd_peer *)li;
+		CHECK_FCT_DO( fd_psm_terminate(np), /* continue */ );
+	}
+	CHECK_FCT_DO( pthread_rwlock_unlock(&fd_g_peers_rw), /* continue */ );
+	
+	TODO("Give some time to all PSM, then destroy remaining threads");
+	/* fd_psm_abord(struct fd_peer * peer ) */
 	
 	return 0;
 }
@@ -124,11 +111,9 @@
 	fd_list_init(&p->p_hdr.chain, p);
 	
 	fd_list_init(&p->p_hdr.info.pi_endpoints, NULL);
-	p->p_hdr.info.pi_state = STATE_DISABLED;
 	fd_list_init(&p->p_hdr.info.pi_apps, NULL);
 	
 	p->p_eyec = EYEC_PEER;
-	CHECK_POSIX( pthread_mutex_init(&p->p_mtx, NULL) );
 	fd_list_init(&p->p_expiry, p);
 	fd_list_init(&p->p_actives, p);
 	p->p_hbh = lrand48();
@@ -165,7 +150,7 @@
 	*ptr = NULL;
 	CHECK_PARAMS(p);
 	
-	CHECK_PARAMS( (p->p_refcount == 0) && FD_IS_LIST_EMPTY(&p->p_hdr.chain) );
+	CHECK_PARAMS( FD_IS_LIST_EMPTY(&p->p_hdr.chain) );
 	
 	free_null(p->p_hdr.info.pi_diamid); 
 	free_null(p->p_hdr.info.pi_realm); 
@@ -175,7 +160,6 @@
 	free_list( &p->p_hdr.info.pi_apps );
 	
 	free_null(p->p_dbgorig);
-	CHECK_POSIX( pthread_mutex_destroy(&p->p_mtx) );
 	ASSERT(FD_IS_LIST_EMPTY(&p->p_expiry));
 	ASSERT(FD_IS_LIST_EMPTY(&p->p_actives));
 	
@@ -213,7 +197,9 @@
 		free(sr);
 	}
 	
-	TRACE_DEBUG(NONE, "TODO: destroy p->p_cnxctx here");
+	if (p->p_cnxctx) {
+		TODO("destroy p->p_cnxctx");
+	}
 	
 	if (p->p_cb)
 		(*p->p_cb)(NULL, p->p_cb_data);
@@ -223,34 +209,6 @@
 	return 0;
 }
 
-/* Decrement refcount, delete if 0 */
-int fd_peer_rc_decr(struct fd_peer **ptr, int locked)
-{
-	int count;
-	struct fd_peer *p;
-	TRACE_ENTRY("%p %d", p, locked);
-	
-	CHECK_PARAMS(ptr && CHECK_PEER( *ptr ));
-	p = *ptr;
-	
-	if (!locked) {
-		CHECK_POSIX( pthread_rwlock_rdlock(&fd_g_peers_rw) );
-		CHECK_POSIX( pthread_mutex_lock( &p->p_mtx ) );
-		CHECK_POSIX( pthread_rwlock_unlock(&fd_g_peers_rw) );
-	}
-	
-	count = --(p->p_refcount);
-	
-	if (!locked) {
-		CHECK_POSIX( pthread_mutex_unlock( &p->p_mtx ) );
-	}
-	
-	if (count <= 0) {
-		/* All links have already been removed, we can destroy */
-		CHECK_FCT( fd_sp_destroy(ptr) );
-	}
-	return 0;
-}
 
 /* Add a new peer entry */
 int fd_peer_add ( struct peer_info * info, char * orig_dbg, void (*cb)(struct peer_info *, void *), void * cb_data )
@@ -283,11 +241,12 @@
 	p->p_hdr.info.pi_twtimer = info->pi_twtimer;
 	
 	/* Move the items from one list to the other */
-	while (!FD_IS_LIST_EMPTY( &info->pi_endpoints ) ) {
-		li = info->pi_endpoints.next;
-		fd_list_unlink(li);
-		fd_list_insert_before(&p->p_hdr.info.pi_endpoints, li);
-	}
+	if (info->pi_endpoints.next)
+		while (!FD_IS_LIST_EMPTY( &info->pi_endpoints ) ) {
+			li = info->pi_endpoints.next;
+			fd_list_unlink(li);
+			fd_list_insert_before(&p->p_hdr.info.pi_endpoints, li);
+		}
 	
 	p->p_hdr.info.pi_sec_module = info->pi_sec_module;
 	memcpy(&p->p_hdr.info.pi_sec_data, &info->pi_sec_data, sizeof(info->pi_sec_data));
@@ -303,7 +262,6 @@
 	
 	/* Ok, now check if we don't already have an entry with the same Diameter Id, and insert this one */
 	CHECK_POSIX( pthread_rwlock_wrlock(&fd_g_peers_rw) );
-	CHECK_POSIX( pthread_mutex_lock( &p->p_mtx ) );
 	
 	for (li = fd_g_peers.next; li != &fd_g_peers; li = li->next) {
 		struct fd_peer * prev = (struct fd_peer *)li;
@@ -318,20 +276,18 @@
 	/* We can insert the new peer object */
 	if (! ret) {
 		/* Update expiry list */
-		CHECK_FCT_DO( ret = fd_p_expi_update( p, 1 ), goto out );
+		CHECK_FCT_DO( ret = fd_p_expi_update( p ), goto out );
 		
 		/* Insert the new element in the list */
 		fd_list_insert_before( li, &p->p_hdr.chain );
-		p->p_refcount++;
 	}
 
 out:	
-	CHECK_POSIX( pthread_mutex_unlock( &p->p_mtx ) );
 	CHECK_POSIX( pthread_rwlock_unlock(&fd_g_peers_rw) );
 	if (ret) {
 		CHECK_FCT( fd_sp_destroy(&p) );
 	} else {
-		CHECK_FCT( fd_psm_start(p) );
+		CHECK_FCT( fd_psm_begin(p) );
 	}
 	return ret;
 }
--- a/include/freeDiameter/freeDiameter.h	Wed Sep 30 18:25:46 2009 +0900
+++ b/include/freeDiameter/freeDiameter.h	Thu Oct 01 18:24:07 2009 +0900
@@ -140,7 +140,7 @@
 	,FDEV_DUMP_CONFIG	/* Dump the configuration */
 	,FDEV_DUMP_PEERS	/* Dump the list of peers */
 };
-
+const char * fd_ev_str(int event);
 
 
 /***************************************/
@@ -150,7 +150,7 @@
 /* States of a peer */
 enum peer_state {
 	/* Stable states */
-	STATE_DISABLED = 1,	/* No connexion must be attempted / only this state means that the peer PSM thread is not running */
+	STATE_ZOMBIE = 0,	/* The threads handling the peer are not running for some reason */
 	STATE_OPEN,		/* Connexion established */
 	
 	/* Peer state machine */
@@ -166,11 +166,11 @@
 	
 	/* Failover state machine */
 	STATE_SUSPECT,		/* A DWR was sent and not answered within TwTime. Failover in progress. */
-	STATE_REOPEN		/* Connection has been re-established, waiting for 3 DWR/DWA exchanges before putting back to service */
+	STATE_REOPEN,		/* Connection has been re-established, waiting for 3 DWR/DWA exchanges before putting back to service */
 };
 extern const char *peer_state_str[];
 #define STATE_STR(state) \
-	peer_state_str[ ((unsigned)(state)) <= STATE_REOPEN ? ((unsigned)(state)) : 0 ]
+	(((unsigned)(state)) <= STATE_REOPEN ? peer_state_str[((unsigned)(state)) ] : "<Invalid>")
 
 /* Information about a remote peer. Same structure is used for creating a new entry, but not all fields are meaningful in that case */
 struct peer_info {
@@ -237,7 +237,7 @@
 		} 	other;
 	} 		pi_sec_data;
 	
-	/* The remaining information is read-only, not used for peer creation */
+	/* The remaining information must not be modified, and is not used for peer creation */
 	enum peer_state	pi_state;
 	uint32_t	pi_vendorid;	/* Content of the Vendor-Id AVP, or 0 by default */
 	uint32_t	pi_orstate;	/* Origin-State-Id value */
--- a/include/freeDiameter/libfreeDiameter.h	Wed Sep 30 18:25:46 2009 +0900
+++ b/include/freeDiameter/libfreeDiameter.h	Thu Oct 01 18:24:07 2009 +0900
@@ -192,7 +192,7 @@
 /* For development only, to keep track of TODO locations in the code */
 #ifndef ERRORS_ON_TODO
 #define TODO( _msg, _args... ) \
-	TRACE_DEBUG(NONE, _msg , ##_args);
+	TRACE_DEBUG(NONE, "TODO: " _msg , ##_args);
 #else /* ERRORS_ON_TODO */
 #define TODO( _msg, _args... ) \
 	"TODO" = _msg ## _args; /* just a stupid compilation error to spot the todo */
@@ -445,13 +445,15 @@
 	void		*o;    /* additional avialbe pointer used for start of the parento object or other purpose */
 };
 
-#define FD_LIST( _li ) ((struct fd_list *)( _li ))
-
 /* Initialize a list element */
+#define FD_LIST_INITIALIZER( _list_name ) \
+	{ .next = & _list_name, .prev = & _list_name, .head = & _list_name, .o = NULL }
+#define FD_LIST_INITIALIZER_O( _list_name, _obj ) \
+	{ .next = & _list_name, .prev = & _list_name, .head = & _list_name, .o = _obj }
 void fd_list_init ( struct fd_list * list, void *obj );
 
 /* Return boolean, true if the list is empty */
-#define FD_IS_LIST_EMPTY( _list ) (((FD_LIST(_list))->head == (_list)) && ((FD_LIST(_list))->next == (_list)))
+#define FD_IS_LIST_EMPTY( _list ) ((((struct fd_list *)(_list))->head == (_list)) && (((struct fd_list *)(_list))->next == (_list)))
 
 /* Insert an item in a list at known position */
 void fd_list_insert_after  ( struct fd_list * ref, struct fd_list * item );
--- a/libfreeDiameter/dispatch.c	Wed Sep 30 18:25:46 2009 +0900
+++ b/libfreeDiameter/dispatch.c	Thu Oct 01 18:24:07 2009 +0900
@@ -43,10 +43,10 @@
 pthread_rwlock_t fd_disp_lock = PTHREAD_RWLOCK_INITIALIZER;
 
 /* List of all registered handlers -- useful if we want to cleanup properly at some point... */
-static struct fd_list all_handlers;
+static struct fd_list all_handlers = FD_LIST_INITIALIZER( all_handlers );
 
 /* List of handlers registered for DISP_HOW_ANY. Other handlers are stored in the dictionary */
-static struct fd_list any_handlers;
+static struct fd_list any_handlers = FD_LIST_INITIALIZER( any_handlers );
 
 /* The structure to store a callback */
 struct disp_hdl {
@@ -63,14 +63,6 @@
 	( ( ( _hdl ) != NULL ) && ( ((struct disp_hdl *)( _hdl ))->eyec == DISP_EYEC ) )
 
 /**************************************************************************************/
-/* Initialize the module lists */
-void fd_disp_init(void)
-{
-	TRACE_ENTRY();
-	fd_list_init(&all_handlers, NULL);
-	fd_list_init(&any_handlers, NULL);
-	/* if PTHREAD_RWLOCK_INITIALIZER is not supported on all platforms, we may initialize the lock here */
-}
 
 /* Call CBs from a given list (any_handlers if cb_list is NULL) -- must have locked fd_disp_lock before */
 int fd_disp_call_cb_int( struct fd_list * cb_list, struct msg ** msg, struct avp *avp, struct session *sess, enum disp_action *action, 
--- a/libfreeDiameter/fifo.c	Wed Sep 30 18:25:46 2009 +0900
+++ b/libfreeDiameter/fifo.c	Thu Oct 01 18:24:07 2009 +0900
@@ -64,6 +64,7 @@
 	void		(*h_cb)(struct fifo *, void **); /* The callbacks */
 	void		(*l_cb)(struct fifo *, void **);
 	int 		highest;/* The highest count value for which h_cb has been called */
+	int		highest_ever; /* The max count value this queue has reached (for tweaking) */
 };
 
 /* The eye catcher value */
@@ -117,10 +118,10 @@
 	
 	CHECK_POSIX_DO(  pthread_mutex_lock( &queue->mtx ), /* continue */  );
 	fd_log_debug("   %d elements in queue / %d threads waiting\n", queue->count, queue->thrs);
-	fd_log_debug("   thresholds: %d / %d, cb: %p / %p (%p), highest: %d\n",
-			queue->high, queue->low,
+	fd_log_debug("   thresholds: %d / %d (h:%d), cb: %p,%p (%p), highest: %d\n",
+			queue->high, queue->low, queue->highest, 
 			queue->h_cb, queue->l_cb, queue->data,
-			queue->highest);
+			queue->highest_ever);
 	
 	if (dump_item) {
 		struct fd_list * li;
@@ -250,6 +251,8 @@
 	/* Add the new item at the end */
 	fd_list_insert_before( &queue->list, new);
 	queue->count++;
+	if (queue->highest_ever < queue->count)
+		queue->highest_ever = queue->count;
 	if (queue->high && ((queue->count % queue->high) == 0)) {
 		call_cb = 1;
 		queue->highest = queue->count;
--- a/libfreeDiameter/init.c	Wed Sep 30 18:25:46 2009 +0900
+++ b/libfreeDiameter/init.c	Thu Oct 01 18:24:07 2009 +0900
@@ -48,7 +48,6 @@
 	
 	/* Initialize the modules that need it */
 	fd_msg_eteid_init();
-	fd_disp_init();
 	CHECK_FCT( fd_sess_init() );
 	
 	return 0;
--- a/libfreeDiameter/libfD.h	Wed Sep 30 18:25:46 2009 +0900
+++ b/libfreeDiameter/libfD.h	Thu Oct 01 18:24:07 2009 +0900
@@ -44,7 +44,6 @@
 /* Internal to the library */
 extern const char * type_base_name[];
 void fd_msg_eteid_init(void);
-void fd_disp_init(void);
 int fd_sess_init(void);
 
 /* Iterator on the rules of a parent object */
--- a/libfreeDiameter/lists.c	Wed Sep 30 18:25:46 2009 +0900
+++ b/libfreeDiameter/lists.c	Thu Oct 01 18:24:07 2009 +0900
@@ -46,9 +46,9 @@
 }
 
 #define CHECK_SINGLE( li ) {			\
-	ASSERT( FD_LIST(li)->next == (li) );	\
-	ASSERT( FD_LIST(li)->prev == (li) );	\
-	ASSERT( FD_LIST(li)->head == (li) );	\
+	ASSERT( ((struct fd_list *)(li))->next == (li) );	\
+	ASSERT( ((struct fd_list *)(li))->prev == (li) );	\
+	ASSERT( ((struct fd_list *)(li))->head == (li) );	\
 }
 
 /* insert after a reference, checks done */
--- a/libfreeDiameter/sessions.c	Wed Sep 30 18:25:46 2009 +0900
+++ b/libfreeDiameter/sessions.c	Thu Oct 01 18:24:07 2009 +0900
@@ -116,7 +116,7 @@
 static pthread_mutex_t 	sid_lock = PTHREAD_MUTEX_INITIALIZER;
 
 /* Expiring sessions management */
-static struct fd_list	exp_sentinel;	/* list of sessions ordered by their timeout date */
+static struct fd_list	exp_sentinel = FD_LIST_INITIALIZER(exp_sentinel);	/* list of sessions ordered by their timeout date */
 static pthread_mutex_t	exp_lock = PTHREAD_MUTEX_INITIALIZER;	/* lock protecting the list. */
 static pthread_cond_t	exp_cond = PTHREAD_COND_INITIALIZER;	/* condvar used by the expiry mecahinsm. */
 static pthread_t	exp_thr; 	/* The expiry thread that handles cleanup of expired sessions */
@@ -230,8 +230,7 @@
 		CHECK_POSIX(  pthread_mutex_init(&sess_hash[i].lock, NULL)  );
 	}
 	
-	/* Initialize expiry management */
-	fd_list_init( &exp_sentinel, NULL );
+	/* Start session garbage collector (expiry) */
 	CHECK_POSIX(  pthread_create(&exp_thr, NULL, exp_fct, NULL)  );
 	
 	return 0;
@@ -265,7 +264,8 @@
 int fd_sess_handler_destroy ( struct session_handler ** handler )
 {
 	struct session_handler * del;
-	struct fd_list deleted_states; /* Save the list of states to be cleaned up. We do it after finding them to avoid deadlocks. the "o" field becomes a copy of the sid. */
+	/* place to save the list of states to be cleaned up. We do it after finding them to avoid deadlocks. the "o" field becomes a copy of the sid. */
+	struct fd_list deleted_states = FD_LIST_INITIALIZER( deleted_states );
 	int i;
 	
 	TRACE_ENTRY("%p", handler);
@@ -273,7 +273,6 @@
 	
 	del = *handler;
 	*handler = NULL;
-	fd_list_init(&deleted_states, NULL);
 	
 	del->eyec = 0xdead; /* The handler is not valid anymore for any other operation */
 	
@@ -412,6 +411,11 @@
 		}
 		fd_list_insert_after( li, &sess->expire );
 
+		/* We added a new expiring element, we must signal */
+		if (li == &exp_sentinel) {
+			CHECK_POSIX( pthread_cond_signal(&exp_cond) );
+		}
+		
 		#if 0
 		if (TRACE_BOOL(ANNOYING)) {	
 			TRACE_DEBUG(FULL, "-- Updated session expiry list --");
@@ -423,9 +427,6 @@
 		}
 		#endif
 		
-		/* We added a new expiring element, we must signal */
-		CHECK_POSIX( pthread_cond_signal(&exp_cond) );
-		
 		/* We're done */
 		CHECK_POSIX( pthread_mutex_unlock( &exp_lock ) );
 	}
@@ -505,8 +506,10 @@
 	}
 	fd_list_insert_before( li, &session->expire );
 
-	/* We added a new expiring element, we must signal */
-	CHECK_POSIX( pthread_cond_signal(&exp_cond) );
+	/* We added a new expiring element, we must signal if it was in first position */
+	if (session->expire.prev == &exp_sentinel) {
+		CHECK_POSIX( pthread_cond_signal(&exp_cond) );
+	}
 
 	#if 0
 	if (TRACE_BOOL(ANNOYING)) {	
@@ -587,8 +590,6 @@
 	return 0;
 }
 
-
-
 /* Save a state information with a session */
 int fd_sess_state_store_internal ( struct session_handler * handler, struct session * session, session_state ** state )
 {
"Welcome to our mercurial repository"