diff freeDiameter/peers.c @ 13:ef9ef3bf4752

Progress on peer state machine
author Sebastien Decugis <sdecugis@nict.go.jp>
date Wed, 30 Sep 2009 18:25:46 +0900
parents 418d2ce80dc8
children 14cf6daf716d
line wrap: on
line diff
--- a/freeDiameter/peers.c	Mon Sep 28 17:29:25 2009 +0900
+++ b/freeDiameter/peers.c	Wed Sep 30 18:25:46 2009 +0900
@@ -50,35 +50,6 @@
 struct fd_list   fd_g_peers;
 pthread_rwlock_t fd_g_peers_rw;
 
-static int started = 0;
-static pthread_mutex_t  started_mtx = PTHREAD_MUTEX_INITIALIZER;
-static pthread_cond_t   started_cnd = PTHREAD_COND_INITIALIZER;
-
-/* Wait for start signal */
-int fd_peer_waitstart()
-{
-	CHECK_POSIX( pthread_mutex_lock(&started_mtx) );
-awake:	
-	if (! started) {
-		pthread_cleanup_push( fd_cleanup_mutex, &started_mtx );
-		CHECK_POSIX( pthread_cond_wait(&started_cnd, &started_mtx) );
-		pthread_cleanup_pop( 0 );
-		goto awake;
-	}
-	CHECK_POSIX( pthread_mutex_unlock(&started_mtx) );
-	return 0;
-}
-
-/* Allow the state machines to start */
-int fd_peer_start()
-{
-	CHECK_POSIX( pthread_mutex_lock(&started_mtx) );
-	started = 1;
-	CHECK_POSIX( pthread_cond_broadcast(&started_cnd) );
-	CHECK_POSIX( pthread_mutex_unlock(&started_mtx) );
-	return 0;
-}
-
 /* Initialize the peers list */
 int fd_peer_init()
 {
@@ -87,6 +58,20 @@
 	fd_list_init(&fd_g_peers, NULL);
 	CHECK_POSIX( pthread_rwlock_init(&fd_g_peers_rw, NULL) );
 	
+	CHECK_FCT(fd_p_expi_init());
+	
+	return 0;
+}
+
+/* Terminate peer module (destroy all peers) */
+int fd_peer_fini()
+{
+	TRACE_ENTRY();
+	
+	CHECK_FCT_DO(fd_p_expi_fini(), /* continue */);
+	
+	TODO("Complete this function")
+	
 	return 0;
 }
 
@@ -118,8 +103,235 @@
 	CHECK_FCT_DO( pthread_rwlock_unlock(&fd_g_peers_rw), /* continue */ );
 }
 
+/* Alloc / reinit a peer structure. if *ptr is not NULL, it must already point to a valid struct fd_peer. */
+static int fd_sp_reinit(struct fd_peer ** ptr)
+{
+	struct fd_peer *p;
+	
+	TRACE_ENTRY("%p", ptr);
+	CHECK_PARAMS(ptr);
+	
+	if (*ptr) {
+		p = *ptr;
+	} else {
+		CHECK_MALLOC( p = malloc(sizeof(struct fd_peer)) );
+		*ptr = p;
+	}
+	
+	/* Now initialize the content */
+	memset(p, 0, sizeof(struct fd_peer));
+	
+	fd_list_init(&p->p_hdr.chain, p);
+	
+	fd_list_init(&p->p_hdr.info.pi_endpoints, NULL);
+	p->p_hdr.info.pi_state = STATE_DISABLED;
+	fd_list_init(&p->p_hdr.info.pi_apps, NULL);
+	
+	p->p_eyec = EYEC_PEER;
+	CHECK_POSIX( pthread_mutex_init(&p->p_mtx, NULL) );
+	fd_list_init(&p->p_expiry, p);
+	fd_list_init(&p->p_actives, p);
+	p->p_hbh = lrand48();
+	CHECK_FCT( fd_fifo_new(&p->p_events) );
+	CHECK_FCT( fd_fifo_new(&p->p_recv) );
+	CHECK_FCT( fd_fifo_new(&p->p_tosend) );
+	fd_list_init(&p->p_sentreq, p);
+	
+	return 0;
+}
+
+#define free_null( _v ) 	\
+	if (_v) {		\
+		free(_v);	\
+		(_v) = NULL;	\
+	}
+	
+#define free_list( _l ) 						\
+	while (!FD_IS_LIST_EMPTY(_l)) {					\
+		struct fd_list * __li = ((struct fd_list *)(_l))->next;	\
+		fd_list_unlink(__li);					\
+		free(__li);						\
+	}
+
+/* Destroy a structure once all cleanups have been performed */
+static int fd_sp_destroy(struct fd_peer ** ptr)
+{
+	struct fd_peer *p;
+	void * t;
+	
+	TRACE_ENTRY("%p", ptr);
+	CHECK_PARAMS(ptr);
+	p = *ptr;
+	*ptr = NULL;
+	CHECK_PARAMS(p);
+	
+	CHECK_PARAMS( (p->p_refcount == 0) && FD_IS_LIST_EMPTY(&p->p_hdr.chain) );
+	
+	free_null(p->p_hdr.info.pi_diamid); 
+	free_null(p->p_hdr.info.pi_realm); 
+	free_list( &p->p_hdr.info.pi_endpoints );
+	/* Assume the security data is already freed */
+	free_null(p->p_hdr.info.pi_prodname);
+	free_list( &p->p_hdr.info.pi_apps );
+	
+	free_null(p->p_dbgorig);
+	CHECK_POSIX( pthread_mutex_destroy(&p->p_mtx) );
+	ASSERT(FD_IS_LIST_EMPTY(&p->p_expiry));
+	ASSERT(FD_IS_LIST_EMPTY(&p->p_actives));
+	
+	CHECK_FCT( fd_thr_term(&p->p_psm) );
+	while ( fd_fifo_tryget(p->p_events, &t) == 0 ) {
+		struct fd_event * ev = t;
+		TRACE_DEBUG(FULL, "Found event %d(%p) in queue of peer %p being destroyed", ev->code, ev->data, p);
+		free(ev);
+	}
+	CHECK_FCT( fd_fifo_del(&p->p_events) );
+	
+	CHECK_FCT( fd_thr_term(&p->p_inthr) );
+	while ( fd_fifo_tryget(p->p_recv, &t) == 0 ) {
+		struct msg * m = t;
+		TRACE_DEBUG(FULL, "Found message %p in incoming queue of peer %p being destroyed", m, p);
+		/* We simply destroy, the remote peer will re-send to someone else...*/
+		CHECK_FCT(fd_msg_free(m));
+	}
+	CHECK_FCT( fd_fifo_del(&p->p_recv) );
+	
+	CHECK_FCT( fd_thr_term(&p->p_outthr) );
+	while ( fd_fifo_tryget(p->p_tosend, &t) == 0 ) {
+		struct msg * m = t;
+		TRACE_DEBUG(FULL, "Found message %p in outgoing queue of peer %p being destroyed, requeue", m, p);
+		/* We simply requeue in global, the routing thread will re-handle it. */
+		
+	}
+	CHECK_FCT( fd_fifo_del(&p->p_tosend) );
+	
+	while (!FD_IS_LIST_EMPTY(&p->p_sentreq)) {
+		struct sentreq * sr = (struct sentreq *)(p->p_sentreq.next);
+		fd_list_unlink(&sr->chain);
+		TRACE_DEBUG(FULL, "Found message %p in list of sent requests to peer %p being destroyed, requeue (fallback)", sr->req, p);
+		CHECK_FCT(fd_fifo_post(fd_g_outgoing, &sr->req));
+		free(sr);
+	}
+	
+	TRACE_DEBUG(NONE, "TODO: destroy p->p_cnxctx here");
+	
+	if (p->p_cb)
+		(*p->p_cb)(NULL, p->p_cb_data);
+	
+	free(p);
+	
+	return 0;
+}
+
+/* Decrement refcount, delete if 0 */
+int fd_peer_rc_decr(struct fd_peer **ptr, int locked)
+{
+	int count;
+	struct fd_peer *p;
+	TRACE_ENTRY("%p %d", p, locked);
+	
+	CHECK_PARAMS(ptr && CHECK_PEER( *ptr ));
+	p = *ptr;
+	
+	if (!locked) {
+		CHECK_POSIX( pthread_rwlock_rdlock(&fd_g_peers_rw) );
+		CHECK_POSIX( pthread_mutex_lock( &p->p_mtx ) );
+		CHECK_POSIX( pthread_rwlock_unlock(&fd_g_peers_rw) );
+	}
+	
+	count = --(p->p_refcount);
+	
+	if (!locked) {
+		CHECK_POSIX( pthread_mutex_unlock( &p->p_mtx ) );
+	}
+	
+	if (count <= 0) {
+		/* All links have already been removed, we can destroy */
+		CHECK_FCT( fd_sp_destroy(ptr) );
+	}
+	return 0;
+}
+
 /* Add a new peer entry */
 int fd_peer_add ( struct peer_info * info, char * orig_dbg, void (*cb)(struct peer_info *, void *), void * cb_data )
 {
-	return ENOTSUP;
+	struct fd_peer *p = NULL;
+	struct fd_list * li;
+	int ret = 0;
+	TRACE_ENTRY("%p %p %p %p", info, orig_dbg, cb, cb_data);
+	CHECK_PARAMS(info && info->pi_diamid);
+	
+	/* Create a structure to contain the new peer information */
+	CHECK_FCT( fd_sp_reinit(&p) );
+	
+	/* Copy the informations from the parameters received */
+	CHECK_MALLOC( p->p_hdr.info.pi_diamid = strdup(info->pi_diamid) );
+	if (info->pi_realm) {
+		CHECK_MALLOC( p->p_hdr.info.pi_realm = strdup(info->pi_realm) );
+	}
+	
+	p->p_hdr.info.pi_flags.pro3 = info->pi_flags.pro3;
+	p->p_hdr.info.pi_flags.pro4 = info->pi_flags.pro4;
+	p->p_hdr.info.pi_flags.alg  = info->pi_flags.alg;
+	p->p_hdr.info.pi_flags.sec  = info->pi_flags.sec;
+	p->p_hdr.info.pi_flags.exp  = info->pi_flags.exp;
+	
+	p->p_hdr.info.pi_lft     = info->pi_lft;
+	p->p_hdr.info.pi_streams = info->pi_streams;
+	p->p_hdr.info.pi_port    = info->pi_port;
+	p->p_hdr.info.pi_tctimer = info->pi_tctimer;
+	p->p_hdr.info.pi_twtimer = info->pi_twtimer;
+	
+	/* Move the items from one list to the other */
+	while (!FD_IS_LIST_EMPTY( &info->pi_endpoints ) ) {
+		li = info->pi_endpoints.next;
+		fd_list_unlink(li);
+		fd_list_insert_before(&p->p_hdr.info.pi_endpoints, li);
+	}
+	
+	p->p_hdr.info.pi_sec_module = info->pi_sec_module;
+	memcpy(&p->p_hdr.info.pi_sec_data, &info->pi_sec_data, sizeof(info->pi_sec_data));
+	
+	/* The internal data */
+	if (orig_dbg) {
+		CHECK_MALLOC( p->p_dbgorig = strdup(orig_dbg) );
+	} else {
+		CHECK_MALLOC( p->p_dbgorig = strdup("unknown") );
+	}
+	p->p_cb = cb;
+	p->p_cb_data = cb_data;
+	
+	/* Ok, now check if we don't already have an entry with the same Diameter Id, and insert this one */
+	CHECK_POSIX( pthread_rwlock_wrlock(&fd_g_peers_rw) );
+	CHECK_POSIX( pthread_mutex_lock( &p->p_mtx ) );
+	
+	for (li = fd_g_peers.next; li != &fd_g_peers; li = li->next) {
+		struct fd_peer * prev = (struct fd_peer *)li;
+		int cmp = strcasecmp( p->p_hdr.info.pi_diamid, prev->p_hdr.info.pi_diamid );
+		if (cmp < 0)
+			continue;
+		if (cmp == 0)
+			ret = EEXIST;
+		break;
+	}
+	
+	/* We can insert the new peer object */
+	if (! ret) {
+		/* Update expiry list */
+		CHECK_FCT_DO( ret = fd_p_expi_update( p, 1 ), goto out );
+		
+		/* Insert the new element in the list */
+		fd_list_insert_before( li, &p->p_hdr.chain );
+		p->p_refcount++;
+	}
+
+out:	
+	CHECK_POSIX( pthread_mutex_unlock( &p->p_mtx ) );
+	CHECK_POSIX( pthread_rwlock_unlock(&fd_g_peers_rw) );
+	if (ret) {
+		CHECK_FCT( fd_sp_destroy(&p) );
+	} else {
+		CHECK_FCT( fd_psm_start(p) );
+	}
+	return ret;
 }
"Welcome to our mercurial repository"