changeset 86:e3e22d89e023

Started routing module
author Sebastien Decugis <sdecugis@nict.go.jp>
date Thu, 03 Dec 2009 17:36:35 +0900
parents e5fcd672caff
children c1c0f8a45c67
files freeDiameter/peers.c freeDiameter/routing.c include/freeDiameter/freeDiameter.h
diffstat 3 files changed, 297 insertions(+), 6 deletions(-) [+]
line wrap: on
line diff
--- a/freeDiameter/peers.c	Thu Dec 03 14:59:23 2009 +0900
+++ b/freeDiameter/peers.c	Thu Dec 03 17:36:35 2009 +0900
@@ -158,6 +158,32 @@
 	return ret;
 }
 
+/* Search for a peer */
+int fd_peer_getbyid( char * diamid, struct peer_hdr ** peer )
+{
+	struct fd_list * li;
+	
+	TRACE_ENTRY("%p %p", diamid, peer);
+	CHECK_PARAMS( diamid && peer );
+	
+	*peer = NULL;
+	
+	/* Search in the list */
+	CHECK_POSIX( pthread_rwlock_rdlock(&fd_g_peers_rw) );
+	for (li = fd_g_peers.next; li != &fd_g_peers; li = li->next) {
+		struct fd_peer * next = (struct fd_peer *)li;
+		int cmp = strcasecmp( diamid, next->p_hdr.info.pi_diamid );
+		if (cmp > 0)
+			continue;
+		if (cmp == 0)
+			*peer = &next->p_hdr;
+		break;
+	}
+	CHECK_POSIX( pthread_rwlock_unlock(&fd_g_peers_rw) );
+	
+	return 0;
+}
+
 
 #define free_null( _v ) 	\
 	if (_v) {		\
--- a/freeDiameter/routing.c	Thu Dec 03 14:59:23 2009 +0900
+++ b/freeDiameter/routing.c	Thu Dec 03 17:36:35 2009 +0900
@@ -35,25 +35,273 @@
 
 #include "fD.h"
 
+/********************************************************************************/
+/*              First part : handling the extensions callbacks                  */
+/********************************************************************************/
+
+/* Lists of the callbacks, and locks to protect them */
+static pthread_rwlock_t rt_fwd_lock = PTHREAD_RWLOCK_INITIALIZER;
+static struct fd_list 	rt_fwd_list = FD_LIST_INITIALIZER_O(rt_fwd_list, &rt_fwd_lock);
+
+static pthread_rwlock_t rt_out_lock = PTHREAD_RWLOCK_INITIALIZER;
+static struct fd_list 	rt_out_list = FD_LIST_INITIALIZER_O(rt_out_list, &rt_out_lock);
+
+/* Items in the lists are the same */
+struct rt_hdl {
+	struct fd_list	chain;	/* link in the rt_fwd_list or rt_out_list */
+	void *		cbdata;	/* the registered data */
+	union {
+		int	order;	/* This value is used to sort the list */
+		int 	dir;	/* It is the direction for FWD handlers */
+		int	prio;	/* and the priority for OUT handlers */
+	};
+	union {
+		int (*rt_fwd_cb)(void * cbdata, struct msg ** msg);
+		int (*rt_out_cb)(void * cbdata, struct msg * msg, struct fd_list * candidates);
+	};
+};	
+
+/* Add a new entry in the list */
+static int add_ordered(struct rt_hdl * new, struct fd_list * list)
+{
+	/* The list is ordered by prio parameter */
+	struct fd_list * li;
+	
+	CHECK_POSIX( pthread_rwlock_wrlock(list->o) );
+	
+	for (li = list->next; li != list; li = li->next) {
+		struct rt_hdl * h = (struct rt_hdl *) li;
+		if (new->order <= h->order)
+			break;
+	}
+	
+	fd_list_insert_before(li, &new->chain);
+	
+	CHECK_POSIX( pthread_rwlock_unlock(list->o) );
+}
+
+/* Register a new FWD callback */
+int fd_rt_fwd_register ( int (*rt_fwd_cb)(void * cbdata, struct msg ** msg), void * cbdata, enum fd_rt_fwd_dir dir, struct fd_rt_fwd_hdl ** handler )
+{
+	struct rt_hdl * new;
+	
+	TRACE_ENTRY("%p %p %d %p", rt_fwd_cb, cbdata, dir, handler);
+	CHECK_PARAMS( rt_fwd_cb );
+	CHECK_PARAMS( (dir >= RT_FWD_REQ) && ( dir <= RT_FWD_ANS) );
+	
+	/* Create a new container */
+	CHECK_MALLOC(new = malloc(sizeof(struct rt_hdl)));
+	memset(new, 0, sizeof(struct rt_hdl));
+	
+	/* Write the content */
+	fd_list_init(&new->chain, NULL);
+	new->cbdata 	= cbdata;
+	new->dir    	= dir;
+	new->rt_fwd_cb 	= rt_fwd_cb;
+	
+	/* Save this in the list */
+	CHECK_FCT( add_ordered(new, &rt_fwd_list) );
+	
+	/* Give it back to the extension if needed */
+	if (handler)
+		*handler = (void *)new;
+	
+	return 0;
+}
+
+/* Remove it */
+int fd_rt_fwd_unregister ( struct fd_rt_fwd_hdl * handler, void ** cbdata )
+{
+	struct rt_hdl * del;
+	TRACE_ENTRY( "%p %p", handler, cbdata);
+	CHECK_PARAMS( handler );
+	
+	del = (struct rt_hdl *)handler;
+	CHECK_PARAMS( del->chain.head == &rt_fwd_list );
+	
+	/* Unlink */
+	CHECK_POSIX( pthread_rwlock_wrlock(&rt_fwd_lock) );
+	fd_list_unlink(&del->chain);
+	CHECK_POSIX( pthread_rwlock_unlock(&rt_fwd_lock) );
+	
+	if (cbdata)
+		*cbdata = del->cbdata;
+	
+	free(del);
+	return 0;
+}
+
+/* Register a new OUT callback */
+int fd_rt_out_register ( int (*rt_out_cb)(void * cbdata, struct msg * msg, struct fd_list * candidates), void * cbdata, int priority, struct fd_rt_out_hdl ** handler )
+{
+	struct rt_hdl * new;
+	
+	TRACE_ENTRY("%p %p %d %p", rt_out_cb, cbdata, priority, handler);
+	CHECK_PARAMS( rt_out_cb );
+	
+	/* Create a new container */
+	CHECK_MALLOC(new = malloc(sizeof(struct rt_hdl)));
+	memset(new, 0, sizeof(struct rt_hdl));
+	
+	/* Write the content */
+	fd_list_init(&new->chain, NULL);
+	new->cbdata 	= cbdata;
+	new->prio    	= priority;
+	new->rt_out_cb 	= rt_out_cb;
+	
+	/* Save this in the list */
+	CHECK_FCT( add_ordered(new, &rt_out_list) );
+	
+	/* Give it back to the extension if needed */
+	if (handler)
+		*handler = (void *)new;
+	
+	return 0;
+}
+
+/* Remove it */
+int fd_rt_out_unregister ( struct fd_rt_out_hdl * handler, void ** cbdata )
+{
+	struct rt_hdl * del;
+	TRACE_ENTRY( "%p %p", handler, cbdata);
+	CHECK_PARAMS( handler );
+	
+	del = (struct rt_hdl *)handler;
+	CHECK_PARAMS( del->chain.head == &rt_out_list );
+	
+	/* Unlink */
+	CHECK_POSIX( pthread_rwlock_wrlock(&rt_out_lock) );
+	fd_list_unlink(&del->chain);
+	CHECK_POSIX( pthread_rwlock_unlock(&rt_out_lock) );
+	
+	if (cbdata)
+		*cbdata = del->cbdata;
+	
+	free(del);
+	return 0;
+}
+
+/********************************************************************************/
+/*                   Second part : the routing threads                          */
+/********************************************************************************/
+
+/* Note: in the first version, we only create one thread of each kind.
+ We could improve the scalability by using the threshold feature of the queues fd_g_incoming and fd_g_outgoing
+ ( fd_g_local is managed by the dispatch thread ) to create additional threads if a queue is filling up.
+ */
+
+/* Function to return an error to an incoming request */
+static int return_error(struct msg * msg, char * error_code)
+{
+	struct fd_peer * peer;
+
+	/* Get the source of the message */
+	{
+		char * id;
+		CHECK_FCT( fd_msg_source_get( msg, &id ) );
+		
+		/* Search the peer with this id */
+		CHECK_FCT( fd_peer_getbyid( id, (void *)&peer ) );
+		
+		if (!peer) {
+			TRACE_DEBUG(INFO, "Unable to send error '%s' to deleted peer '%s' in reply to:", error_code, id);
+			fd_msg_dump_walk(INFO, msg);
+			fd_msg_free(msg);
+			return 0;
+		}
+	}
+	
+	/* Create the error message */
+	CHECK_FCT( fd_msg_new_answer_from_req ( fd_g_config->cnf_dict, &msg, MSGFL_ANSW_ERROR ) );
+
+	/* Set the error code */
+	CHECK_FCT( fd_msg_rescode_set(msg, error_code, NULL, NULL, 1 ) );
+
+	/* Send the answer */
+	CHECK_FCT( fd_out_send(&msg, NULL, peer) );
+	
+	/* Done */
+	return 0;
+}
+
+
+/* The (routing-in) thread -- see description in freeDiameter.h */
+static void * routing_in_thr(void * arg)
+{
+	TRACE_ENTRY("%p", arg);
+	
+	/* Set the thread name */
+	if (arg) {
+		char buf[48];
+		snprintf(buf, sizeof(buf), "Routing-IN %p", arg);
+		fd_log_threadname ( buf );
+	} else {
+		fd_log_threadname ( "Routing-IN" );
+	}
+	
+	/* Main thread loop */
+	do {
+		struct msg * msg;
+		struct msg_hdr * hdr;
+		int is_req = 0;
+		int is_err = 0;
+		
+		/* Test if we were told to stop */
+		pthread_testcancel();
+		
+		/* Get the next message from the incoming queue */
+		CHECK_FCT_DO( fd_fifo_get ( fd_g_incoming, &msg ), goto fatal_error );
+		
+		if (TRACE_BOOL(FULL)) {
+			TRACE_DEBUG(FULL, "Picked next message:");
+			fd_msg_dump_one(FULL, msg);
+		}
+		
+		/* Read the message header */
+		CHECK_FCT_DO( fd_msg_hdr(msg, &hdr), goto fatal_error );
+		is_req = hdr->msg_flags & CMD_FLAG_REQUEST;
+		is_err = hdr->msg_flags & CMD_FLAG_ERROR;
+		
+		/* Handle incorrect bits */
+		if (is_req && is_err) {
+			CHECK_FCT_DO( return_error( msg, "DIAMETER_INVALID_HDR_BITS"), goto fatal_error );
+			continue;
+		}
+		
+		
+		
+
+	
+	
+	} while (1);
+	
+fatal_error:
+	TRACE_DEBUG(INFO, "An error occurred in routing module! IN thread is terminating...");
+	CHECK_FCT_DO(fd_event_send(fd_g_config->cnf_main_ev, FDEV_TERMINATE, 0, NULL), );
+	return NULL;
+}
+
 /* Note: after testing if the message is to be handled locally, we should test for decorated NAI 
   (draft-ietf-dime-nai-routing-04 section 4.4) */
+  
 /* Note2: if the message is still for local delivery, we should test for duplicate
   (draft-asveren-dime-dupcons-00). This may conflict with path validation decisions, no clear answer yet */
 
+
+
 /* Initialize the routing module */
 int fd_rt_init(void)
 {
+	TODO("Start the routing threads");
 	return ENOTSUP;
 }
 
 /* Terminate the routing module */
 int fd_rt_fini(void)
 {
+	TODO("Stop the routing threads");
 	return ENOTSUP;
 }
 
-int fd_rt_fwd_register ( int (*rt_fwd_cb)(void * cbdata, struct msg ** msg), void * cbdata, enum fd_rt_fwd_dir dir, struct fd_rt_fwd_hdl ** handler );
-int fd_rt_fwd_unregister ( struct fd_rt_fwd_hdl * handler, void ** cbdata );
 
-int fd_rt_out_register ( int (*rt_out_cb)(void * cbdata, struct msg * msg, struct fd_list * candidates), void * cbdata, int priority, struct fd_rt_out_hdl ** handler );
-int fd_rt_out_unregister ( struct fd_rt_out_hdl * handler, void ** cbdata );
+
--- a/include/freeDiameter/freeDiameter.h	Thu Dec 03 14:59:23 2009 +0900
+++ b/include/freeDiameter/freeDiameter.h	Thu Dec 03 17:36:35 2009 +0900
@@ -285,6 +285,22 @@
 int fd_peer_add ( struct peer_info * info, char * orig_dbg, void (*cb)(struct peer_info *, void *), void * cb_data );
 
 /*
+ * FUNCTION:	fd_peer_getbyid
+ *
+ * PARAMETERS:
+ *  diamid 	: A \0 terminated string.
+ *  peer	: The peer is stored here if it exists.
+ *
+ * DESCRIPTION: 
+ *   Search a peer by its Diameter-Id.
+ *
+ * RETURN VALUE:
+ *  0   : *peer has been updated (to NULL if the peer is not found).
+ * !0	: An error occurred.
+ */
+int fd_peer_getbyid( char * diamid, struct peer_hdr ** peer );
+
+/*
  * FUNCTION:	peer_validate_register
  *
  * PARAMETERS:
@@ -480,8 +496,8 @@
 /* Message direction for the callback */
 enum fd_rt_fwd_dir {
 	RT_FWD_REQ = 1,	/* The callback will be called on forwarded requests only */
-	RT_FWD_ANS,	/* The callback will be called on answers and errors only */
-	RT_FWD_ALL,	/* The callback will be called on all forwarded messages */
+	RT_FWD_ALL = 2,	/* The callback will be called on all forwarded messages (requests and answers )*/
+	RT_FWD_ANS = 3	/* The callback will be called on answers and errors only */
 };	
 
 /*
@@ -496,6 +512,7 @@
  *
  * DESCRIPTION: 
  *   Register a new callback for forwarded messages. See explanations above. 
+ * Note that there is no guaranteed order for the callbacks calls.
  *
  * RETURN VALUE:
  *  0      	: The callback is registered.
"Welcome to our mercurial repository"